
Kafka Topic Creation, Deletion, and Modification Internals: Understanding the Mechanics Behind the Operations

1. Topic Creation

1.1. Commands

Use the script that ships with Kafka to create a topic named topic_a with 3 partitions and a replication factor of 3. The commands are as follows:

Option 1

Specify 3 partitions and 3 replicas and let Kafka generate the partition replica assignment automatically:

bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2184 --partitions 3 --replication-factor 3

Option 2

Provide a custom partition replica assignment (3 partitions, 3 replicas), for example:

sh bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2184 --replica-assignment 1:2:0,2:0:1,0:1:2

Running the script returns:

Created topic "topic_a".


1.2. Data

After topic_a is created, data shows up in two places:

1. in ZooKeeper (the topic's config and assignment nodes), and

2. in the Kafka log directories on disk.
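
A minimal sketch of what this typically looks like (broker IDs match the example above; the log directory /tmp/kafka-logs and the segment file names are illustrative):

# ZK nodes created for the topic
/config/topics/topic_a
/brokers/topics/topic_a
/brokers/topics/topic_a/partitions/0/state
/brokers/topics/topic_a/partitions/1/state
/brokers/topics/topic_a/partitions/2/state

# On each broker that hosts a replica: one directory per partition
$ ls /tmp/kafka-logs | grep topic_a
topic_a-0
topic_a-1
topic_a-2

$ ls /tmp/kafka-logs/topic_a-0
00000000000000000000.index
00000000000000000000.log
00000000000000000000.timeindex
leader-epoch-checkpoint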


1.3. How It Works

1.3.1. Running the Script

The kafka-topics.sh script itself is trivial:

exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"

It ultimately invokes the createTopic method of kafka.admin.TopicCommand to create the topic.
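
In other words, the wrapper only assembles a classpath and runs the class; expanded, the create command from above amounts to something like this (classpath abbreviated, $KAFKA_HOME is illustrative):

java -cp "$KAFKA_HOME/libs/*" kafka.admin.TopicCommand \
  --create --topic topic_a --zookeeper localhost:2184 \
  --partitions 3 --replication-factor 3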


1.3.2. Determining the Partition Replica Assignment

1.3.2.1. Flow

The partition replica assignment can either be generated automatically by an algorithm or specified explicitly with the --replica-assignment parameter. The flow is:

  1. If an assignment was specified
    1.1. First parse and validate it
      ● Partitions are separated by ","
      ● Replicas of the same partition are separated by ":"; the first one is the broker that hosts the leader replica
      ● Replicas of the same partition cannot be placed on the same broker
      ● All partitions of a topic must have the same number of replicas
    1.2. Then write the assignment to ZK
     1.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
     1.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment
  2. If no assignment was specified
    2.1. First generate one with the algorithm
     2.1.1. Validate the parameters
      ● The partition count must be greater than 0
      ● The replication factor must be greater than 0
      ● The replication factor cannot exceed the number of brokers
     2.1.2. Generate the assignment automatically
    2.2. Then write the assignment to ZK
     2.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
     2.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment

The resulting ZK data is structured as described below.
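
A rough sketch of the two nodes (contents are illustrative; the partition map depends on the topic):

/config/topics/<TOPIC_NAME>   -> {"version":1,"config":{}}
/brokers/topics/<TOPIC_NAME>  -> {"version":1,"partitions":{"<PARTITION_ID>":[<REPLICA_BROKER_IDS>], ...}}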


1.3.2.2. Source Code

createTopic checks whether an assignment was supplied with --replica-assignment. If so, it parses and validates the assignment and then writes it to ZK; if not, it generates an assignment with the algorithm and then writes that to ZK.

def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topic = opts.options.valueOf(opts.topicOpt)
  val configs = parseTopicConfigsToBeAdded(opts)
  val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
  if (Topic.hasCollisionChars(topic))
    println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
  try {
    // Was a replica assignment specified (parameter --replica-assignment)?
    if (opts.options.has(opts.replicaAssignmentOpt)) {
      // 1. An assignment was specified
      // 1.1. Parse and validate it first
      val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
      // 1.2. Then write the assignment to ZK
      AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
    } else {
      // 2. No assignment was specified
      // 2.1. Generate one with the algorithm first
      CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
      val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
      val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
      val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
                          else RackAwareMode.Enforced
      // 2.2. Then write the assignment to ZK
      AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
    }
    println("Created topic \"%s\".".format(topic))
  } catch {
    case e: TopicExistsException => if (!ifNotExists) throw e
  }
}

1.3.2.2.1. Assignment Specified

If an assignment was specified, it is parsed and validated first:

// Parse and validate the assignment
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
  // Partitions are separated by ","
  val partitionList = replicaAssignmentList.split(",")
  val ret = new mutable.HashMap[Int, List[Int]]()
  for (i <- 0 until partitionList.size) {
    // Replicas of the same partition are separated by ":"; the first is the broker that hosts the leader replica
    val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
    val duplicateBrokers = CoreUtils.duplicates(brokerList)
    // Replicas of the same partition cannot be placed on the same broker
    if (duplicateBrokers.nonEmpty)
      throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
    ret.put(i, brokerList.toList)
    // All partitions of a topic must have the same replication factor
    if (ret(i).size != ret(0).size)
      throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
  }
  ret.toMap
}

The assignment is then written to ZK:

def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
                                                   topic: String,
                                                   partitionReplicaAssignment: Map[Int, Seq[Int]],
                                                   config: Properties = new Properties,
                                                   update: Boolean = false) {
  validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)
  if (!update) {
    // 1.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
    writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
  }
  // 1.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment
  writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}

1.3.2.2.2. Assignment Not Specified

If no assignment was specified, one is generated by the algorithm and then written to ZK:

def createTopic(zkUtils: ZkUtils,
                topic: String,
                partitions: Int,
                replicationFactor: Int,
                topicConfig: Properties = new Properties,
                rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  // Fetch broker metadata
  val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
  // 2.1. Generate the assignment with the algorithm
  val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
  // 2.2. Write the assignment to ZK
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}

The assignment-generation algorithm:

def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
                            nPartitions: Int,
                            replicationFactor: Int,
                            fixedStartIndex: Int = -1,
                            startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
  // The partition count must be greater than 0
  if (nPartitions <= 0)
    throw new InvalidPartitionsException("number of partitions must be larger than 0")
  // The replication factor must be greater than 0
  if (replicationFactor <= 0)
    throw new InvalidReplicationFactorException("replication factor must be larger than 0")
  // The replication factor cannot exceed the number of brokers
  if (replicationFactor > brokerMetadatas.size)
    throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")

  if (brokerMetadatas.forall(_.rack.isEmpty))
    // Assignment algorithm without rack awareness
    assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
      startPartitionId)
  else {
    if (brokerMetadatas.exists(_.rack.isEmpty))
      throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
    // Assignment algorithm with rack awareness
    assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
      startPartitionId)
  }
}


// Assignment algorithm without rack awareness
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,       // partition count
                                               replicationFactor: Int, // replication factor
                                               brokerList: Seq[Int],   // broker list
                                               fixedStartIndex: Int,   // -1
                                               startPartitionId: Int): Map[Int, Seq[Int]] = { // -1
  // Holds the resulting assignment
  val ret = mutable.Map[Int, Seq[Int]]()
  // Broker array
  val brokerArray = brokerList.toArray
  // fixedStartIndex defaults to -1, so startIndex is a random number, e.g. 1
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // startPartitionId defaults to -1, so currentPartitionId is 0
  var currentPartitionId = math.max(0, startPartitionId)
  // fixedStartIndex defaults to -1, so nextReplicaShift is a random number, e.g. 2
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)

  // Iterate over all partitions and compute the replica assignment for each
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

// Once the first replica of a partition is placed, this computes the placement of the remaining replicas
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
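
The walkthrough below fixes startIndex = 1 and nextReplicaShift = 2 (the values drawn at random in this example). If you want to reproduce the numbers yourself, the following standalone sketch (my own illustration, not the Kafka source; the object name and parameters are made up) mirrors the algorithm above:

object AssignmentDemo extends App {
  // Same shape as replicaIndex above: places the remaining replicas of a partition
  def replicaIndex(first: Int, shiftBase: Int, j: Int, nBrokers: Int): Int = {
    val shift = 1 + (shiftBase + j) % (nBrokers - 1)
    (first + shift) % nBrokers
  }

  // Rack-unaware assignment with startIndex and the initial shift passed in explicitly
  def assign(nPartitions: Int, replicationFactor: Int, brokers: IndexedSeq[Int],
             startIndex: Int, initialShift: Int): Map[Int, Seq[Int]] = {
    var nextReplicaShift = initialShift
    (0 until nPartitions).map { p =>
      if (p > 0 && p % brokers.length == 0) nextReplicaShift += 1
      val first = (p + startIndex) % brokers.length
      val others = (0 until replicationFactor - 1).map(j => brokers(replicaIndex(first, nextReplicaShift, j, brokers.length)))
      p -> (brokers(first) +: others)
    }.toMap
  }

  // 3 partitions, 3 replicas, brokers 0/1/2, startIndex = 1, nextReplicaShift = 2
  assign(3, 3, Vector(0, 1, 2), startIndex = 1, initialShift = 2)
    .toSeq.sortBy(_._1)
    .foreach { case (p, rs) => println(s"partition $p -> ${rs.mkString("[", ", ", "]")}") }
  // partition 0 -> [1, 2, 0]
  // partition 1 -> [2, 0, 1]
  // partition 2 -> [0, 1, 2]
}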

Computing the first partition (partition 0) yields the replica set [1, 2, 0]:

// currentPartitionId = 0
  // startIndex = 1
  // nextReplicaShift = 2

  if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
    nextReplicaShift += 1

  // firstReplicaIndex = (0 + 1) % 3 = 1
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length

  // replicaBuffer = [1]
  val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))

  for (j <- 0 until replicationFactor - 1)
    replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

  // replicaBuffer = [1, 2, 0]
  // ret = {0:[1, 2, 0]}
  ret.put(currentPartitionId, replicaBuffer)

  // currentPartitionId = 1
  currentPartitionId += 1


################ replicaIndex: first iteration ###################
  // firstReplicaIndex = 1
  // secondReplicaShift = 2
  // replicaIndex = 0
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (1 + 1) % 3 = 2
  (firstReplicaIndex + shift) % nBrokers
}

################ replicaIndex: second iteration ###################
  // firstReplicaIndex = 1
  // secondReplicaShift = 2
  // replicaIndex = 1
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (1 + 2) % 3 = 0
  (firstReplicaIndex + shift) % nBrokers
}

Computing the second partition (partition 1) yields the replica set [2, 0, 1]:

// currentPartitionId = 1
  // startIndex = 1
  // nextReplicaShift = 2

  if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
    nextReplicaShift += 1

  // firstReplicaIndex = (1 + 1) % 3 = 2
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length

  // replicaBuffer = [2]
  val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))

  for (j <- 0 until replicationFactor - 1)
    replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

  // replicaBuffer = [2, 0, 1]
  // ret = {1:[2, 0, 1], 0:[1, 2, 0]}
  ret.put(currentPartitionId, replicaBuffer)

  // currentPartitionId = 2
  currentPartitionId += 1


################ replicaIndex: first iteration ###################
  // firstReplicaIndex = 2
  // secondReplicaShift = 2
  // replicaIndex = 0
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (2 + 1) % 3 = 0
  (firstReplicaIndex + shift) % nBrokers
}

################ replicaIndex: second iteration ###################
  // firstReplicaIndex = 2
  // secondReplicaShift = 2
  // replicaIndex = 1
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (2 + 2) % 3 = 1
  (firstReplicaIndex + shift) % nBrokers
}

Computing the third partition (partition 2) yields the replica set [0, 1, 2]:

// currentPartitionId = 2
  // startIndex = 1
  // nextReplicaShift = 2

  if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
    nextReplicaShift += 1

  // firstReplicaIndex = (2 + 1) % 3 = 0
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length

  // replicaBuffer = [0]
  val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))

  for (j <- 0 until replicationFactor - 1)
    replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

  // replicaBuffer = [0, 1, 2]
  // ret = {2:[0, 1, 2], 1:[2, 0, 1], 0:[1, 2, 0]}
  ret.put(currentPartitionId, replicaBuffer)

  // currentPartitionId = 3
  currentPartitionId += 1


################ replicaIndex: first iteration ###################
  // firstReplicaIndex = 0
  // secondReplicaShift = 2
  // replicaIndex = 0
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (0 + 1) % 3 = 1
  (firstReplicaIndex + shift) % nBrokers
}

################ replicaIndex: second iteration ###################
  // firstReplicaIndex = 0
  // secondReplicaShift = 2
  // replicaIndex = 1
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (0 + 2) % 3 = 2
  (firstReplicaIndex + shift) % nBrokers
}

The final replica assignment is:

"2" : [ 0, 1, 2 ],
"1" : [ 2, 0, 1 ],
"0" : [ 1, 2, 0 ]


The assignment is then written to ZK under /brokers/topics/<TOPIC_NAME>.
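
The node content then looks roughly like this (a sketch; key order may differ):

{"version":1,"partitions":{"2":[0,1,2],"1":[2,0,1],"0":[1,2,0]}}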


1.3.3. The Listener

After the Controller is elected, it registers a listener (TopicChangeListener) on the ZK node /brokers/topics. When a new topic is created, its partition replica assignment is written to ZK (/brokers/topics/<TOPIC_NAME>); the Controller is notified of the change and triggers the topic-creation logic.


1.3.3.1. Flow

The topic-creation flow is:

  1. Read the partition replica assignment from the ZK node /brokers/topics/<TOPIC_NAME>
  2. Update the Controller's cache
  3. The Controller creates the new topic
    3.1. Register a partition-change listener for the new topic (watching the ZK node /brokers/topics/<TOPIC_NAME>)
    3.2. The partition state machine moves the partitions to NewPartition
    3.3. The replica state machine moves the replicas to NewReplica
    3.4. The first live replica in the AR is elected leader
    3.5. The live replicas in the AR become the ISR
    3.6. A persistent ZK node is created: /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
    3.7. The partition metadata (leader, isr, leader_epoch, controller_epoch, version) is written to that node (see the sketch after this list)
    3.8. The locally cached metadata is updated
    3.9. The Controller sends LeaderAndIsr requests to all brokers
    3.10. The partition state machine moves the partitions to OnlinePartition (available)
    3.11. The replica state machine moves the replicas to OnlineReplica (available)
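
A sketch of what such a state node typically contains (the values are illustrative, matching partition 0 of the example topic):

{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,0]}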

1.3.3.2. Source Code

class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {

  protected def logName = "TopicChangeListener"

  def doHandleChildChange(parentPath: String, children: Seq[String]) {
    inLock(controllerContext.controllerLock) {
      if (hasStarted.get) {
        try {
          val currentChildren = {
            debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
            children.toSet
          }

          // Topics to be added
          val newTopics = currentChildren -- controllerContext.allTopics
          // Topics to be deleted
          val deletedTopics = controllerContext.allTopics -- currentChildren

          controllerContext.allTopics = currentChildren

          // 1. Read the partition replica assignment from the ZK node /brokers/topics/<TOPIC_NAME>
          val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)

          // 2. Update the cache
          controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
            !deletedTopics.contains(p._1.topic))
          controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)

          // 3. The Controller handles the newly created topics
          if (newTopics.nonEmpty)
            controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
        } catch {
          case e: Throwable => error("Error while handling new topic", e)
        }
      }
    }
  }
}

// Handle creation of a new topic
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  // Register a partition-change listener (watching the ZK node /brokers/topics/<TOPIC_NAME>)
  topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  // Handle creation of the new partitions
  onNewPartitionCreation(newPartitions)
}

// Handle creation of new partitions
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  // The partition state machine moves the partitions to NewPartition
  partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
  // The replica state machine moves the replicas to NewReplica
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  // The partition state machine moves the partitions to OnlinePartition (determines the leader and ISR, writes them to ZK, updates the cache, and sends LeaderAndIsr requests to the relevant brokers)
  partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  // The replica state machine moves the replicas to OnlineReplica
  replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

2. Listing Topics

To list all topics, run:

sh bin/kafka-topics.sh --list --zookeeper localhost:2184

Running the script returns:

topic_a
topic_b
topic_c


How it works:

  1. Read all topics under the ZK node /brokers/topics
  2. For each topic, check whether it is marked for deletion (/admin/delete_topics/<TOPIC_NAME>) and flag it in the output

def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  // 1. Read all topics under the ZK node /brokers/topics
  val topics = getTopics(zkUtils, opts)
  // 2. For each topic, flag the ones marked for deletion (/admin/delete_topics/<TOPIC_NAME>)
  for (topic <- topics) {
    if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
      println("%s - marked for deletion".format(topic))
    } else {
      println(topic)
    }
  }
}

3. Describing a Topic

To describe a topic, run:

sh bin/kafka-topics.sh --describe --topic topic_a  --zookeeper localhost:2184

Running the script returns:

Topic:topic_a PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic_a Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic_a Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic_a Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2


How it works:

def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topics = getTopics(zkUtils, opts)
  val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
  val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
  val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
  // Read the broker list from ZK (/brokers/ids)
  val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
  for (topic <- topics) {
    // Read the partition replica assignment from ZK (/brokers/topics/<TOPIC_NAME>)
    zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
      case Some(topicPartitionAssignment) =>
        val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
        val describePartitions: Boolean = !reportOverriddenConfigs
        val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
        if (describeConfigs) {
          val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
          if (!reportOverriddenConfigs || configs.nonEmpty) {
            val numPartitions = topicPartitionAssignment.size
            val replicationFactor = topicPartitionAssignment.head._2.size
            println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
              .format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
          }
        }
        if (describePartitions) {
          // Iterate over all partitions of the topic
          for ((partitionId, assignedReplicas) <- sortedPartitions) {
            // Read the leader and ISR from ZK (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
            val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
            val leader = zkUtils.getLeaderForPartition(topic, partitionId)
            if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
              (reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
              (reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
              print("\tTopic: " + topic)
              print("\tPartition: " + partitionId)
              print("\tLeader: " + (if (leader.isDefined) leader.get else "none"))
              print("\tReplicas: " + assignedReplicas.mkString(","))
              println("\tIsr: " + inSyncReplicas.mkString(","))
            }
          }
        }
      case None =>
        println("Topic " + topic + " doesn't exist!")
    }
  }
}

4. Altering a Topic

4.1. Adding Partitions

4.1.1. Command

Increase the partition count of topic_a to 4:

sh bin/kafka-topics.sh --alter --topic topic_a  --zookeeper localhost:2184 --partitions 4

Running the script returns:

Adding partitions succeeded!


4.1.2. How It Works

def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  ...
  // Target partition count
  val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
  // Replica assignment (if any)
  val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
  // Add partitions
  AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
  println("Adding partitions succeeded!")
  ...
}

def addPartitions(zkUtils: ZkUtils,
                  topic: String,
                  numPartitions: Int = 1,  // 4
                  replicaAssignmentStr: String = "",  // null
                  checkBrokerAvailable: Boolean = true,
                  rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
  // Read the existing partition replica assignment from ZK (/brokers/topics/<TOPIC_NAME>)
  val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
  // Number of partitions to add
  val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
  // The partition count can only be increased, never decreased
  if (partitionsToAdd <= 0)
    throw new AdminOperationException("The number of partitions for a topic can only be increased")
  ...

  // Generate the assignment for the new partitions with the algorithm
  val newPartitionReplicaList = ...
  partitionReplicaList ++= newPartitionReplicaList

  // Write the new (combined) assignment to ZK
  AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
}
// Assignment algorithm
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,       // partitions to add: 1
                                               replicationFactor: Int, // replication factor: 3
                                               brokerList: Seq[Int],   // broker list
                                               fixedStartIndex: Int,   // 1
                                               startPartitionId: Int): Map[Int, Seq[Int]] = { // 3
  // Holds the resulting assignment
  val ret = mutable.Map[Int, Seq[Int]]()
  // Broker array
  val brokerArray = brokerList.toArray
  // startIndex = 1 (fixedStartIndex is 1)
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  // currentPartitionId = 3 (startPartitionId is 3)
  var currentPartitionId = math.max(0, startPartitionId)
  // nextReplicaShift = 1 (fixedStartIndex is 1)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)

  // Iterate over the new partitions and compute the replica assignment for each
  for (_ <- 0 until nPartitions) {
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

// Once the first replica of a partition is placed, this computes the placement of the remaining replicas
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}

Computing the new partition (partition 3) yields the replica set [1, 2, 0]:

// currentPartitionId = 3
  // startIndex = 1
  // nextReplicaShift = 1

  // (3 > 0 && (3 % 3 == 0)) true
  if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
    // nextReplicaShift = 2
    nextReplicaShift += 1

  // firstReplicaIndex = (3 + 1) % 3 = 1
  val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length

  // replicaBuffer = [1]
  val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))

  for (j <- 0 until replicationFactor - 1)
    replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))

  // replicaBuffer = [1, 2, 0]
  // ret = {3:[1, 2, 0]}
  ret.put(currentPartitionId, replicaBuffer)

  // currentPartitionId = 4
  currentPartitionId += 1


################ replicaIndex: first iteration ###################
  // firstReplicaIndex = 1
  // secondReplicaShift = 2
  // replicaIndex = 0
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (1 + 1) % 3 = 2
  (firstReplicaIndex + shift) % nBrokers
}

################ replicaIndex: second iteration ###################
  // firstReplicaIndex = 1
  // secondReplicaShift = 2
  // replicaIndex = 1
  // nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  // shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  // (1 + 2) % 3 = 0
  (firstReplicaIndex + shift) % nBrokers
}

Finally, the updated assignment is written to ZK under /brokers/topics/<TOPIC_NAME>.
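
The node content then looks roughly like this (a sketch; the three original partitions are unchanged and partition 3 is appended):

{"version":1,"partitions":{"0":[1,2,0],"1":[2,0,1],"2":[0,1,2],"3":[1,2,0]}}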


4.2. Reducing Partitions

Try to reduce the partition count of topic_a to 1:

sh bin/kafka-topics.sh --alter --topic topic_a  --zookeeper localhost:2184 --partitions 1

The script fails with:

kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased

Kafka does not support reducing the number of partitions; the relevant code is:

// Partitions to add = new partition count - existing partition count
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
  throw new AdminOperationException("The number of partitions for a topic can only be increased")

5. Topic Deletion

5.1. Command

To delete a topic, run:

sh bin/kafka-topics.sh --delete --topic topic_c --zookeeper localhost:2184

Running the script returns:

Topic topic_c is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
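
At this point nothing has actually been deleted; the command only created a marker node in ZK. A quick way to check is the ZooKeeper shell that ships with Kafka (piping the command in avoids the interactive prompt; output abbreviated):

echo "ls /admin/delete_topics" | sh bin/zookeeper-shell.sh localhost:2184
...
[topic_c]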


5.2. How It Works

5.2.1. Running the Script

The script ultimately invokes the deleteTopic method of kafka.admin.TopicCommand to delete the topic.

The flow is:

  1. Validation: Kafka's internal topics (__consumer_offsets) cannot be deleted
  2. Create the ZK node /admin/delete_topics/<TOPIC_NAME>

def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
  val topics = getTopics(zkUtils, opts)
  val ifExists = opts.options.has(opts.ifExistsOpt)
  if (topics.isEmpty && !ifExists) {
    throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
                                                                                        opts.options.valueOf(opts.zkConnectOpt)))
  }
  topics.foreach { topic =>
    try {
      // Kafka's internal topics (__consumer_offsets) cannot be deleted
      if (Topic.isInternal(topic)) {
        throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
      } else {
        // Create the ZK node /admin/delete_topics/<TOPIC_NAME>
        zkUtils.createPersistentPath(getDeleteTopicPath(topic))
        println("Topic %s is marked for deletion.".format(topic))
        println("Note: This will have no impact if delete.topic.enable is not set to true.")
      }
    } catch {
      case _: ZkNodeExistsException =>
        println("Topic %s is already marked for deletion.".format(topic))
      case e: AdminOperationException =>
        throw e
      case _: Throwable =>
        throw new AdminOperationException("Error while deleting topic %s".format(topic))
    }
  }
}

5.2.2. The Listener

After the Controller is elected, it registers a listener (TopicDeletionListener) on the ZK node /admin/delete_topics. When a topic is deleted, a child node named after the topic is created under that path; the Controller is notified of the change and triggers the topic-deletion logic.

5.2.2.1. Flow

The topic-deletion flow is:

  1. Check whether all replicas of the topic have already been deleted successfully (replica state: ReplicaDeletionSuccessful)
    1.1. If so, remove the listeners for the topic's partitions and delete the topic's data from ZK and from the Controller cache
     1.1.1. Deregister the partition-change listener
     1.1.2. The replica state machine moves the replicas to NonExistentReplica
     1.1.3. The partition state machine first moves the partitions to OfflinePartition
     1.1.4. The partition state machine then moves the partitions to NonExistentPartition
     1.1.5. Delete the ZK node /brokers/topics/<TOPIC_NAME>
     1.1.6. Delete the ZK node /config/topics/<TOPIC_NAME>
     1.1.7. Delete the ZK node /admin/delete_topics/<TOPIC_NAME>
     1.1.8. Remove the topic's data from the Controller context
    1.2. If some replicas failed to delete (ReplicaDeletionIneligible), move them back to OfflineReplica so that deletion is retried later
  2. If the topic is still eligible for deletion, delete all of its replicas
    2.1. The replica state machine moves dead replicas to ReplicaDeletionIneligible (cannot be deleted)
    2.2. The replica state machine moves live replicas to OfflineReplica (can be deleted)
    2.3. The replica state machine moves the deletable live replicas to ReplicaDeletionStarted
    2.4. StopReplica requests are sent to the brokers to delete the replicas
    2.5. A callback handles the result of each deletion
     2.5.1. If a replica was deleted successfully, its state is set to ReplicaDeletionSuccessful
     2.5.2. If a replica failed to delete, its state is set to ReplicaDeletionIneligible
     2.5.3. The callback then re-runs step 1

5.2.2.2. Source Code

The logic triggered by TopicDeletionListener is as follows:

case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
  def state = ControllerState.TopicDeletion
  override def process(): Unit = {
    if (!isActive) return

    val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
    if (nonExistentTopics.nonEmpty) {
      nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
    }
    // Topics that actually need to be deleted
    topicsToBeDeleted --= nonExistentTopics
    // Is topic deletion enabled? (delete.topic.enable, false by default)
    if (config.deleteTopicEnable) {
      if (topicsToBeDeleted.nonEmpty) {
        topicsToBeDeleted.foreach { topic =>
          val partitionReassignmentInProgress =
            controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
          if (partitionReassignmentInProgress)
            // If the topic's partitions are being reassigned, mark the topic as ineligible for deletion
            topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
        }
        // Add the topics to the set of topics to be deleted (topicsToBeDeleted)
        topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
      }
    } else {
      // If topic deletion is disabled, just remove the nodes under /admin/delete_topics in ZK
      for (topic <- topicsToBeDeleted) {
        zkUtils.zkClient.delete(getDeleteTopicPath(topic))
      }
    }
  }
}

TopicDeletionManager is then called to enqueue the topic and its partitions for deletion:

def enqueueTopicsForDeletion(topics: Set[String]) {
  if(isDeleteTopicEnabled) {
    // Add the topics to the set of topics to be deleted (topicsToBeDeleted)
    topicsToBeDeleted ++= topics
    // Add their partitions to the set of partitions to be deleted (partitionsToBeDeleted)
    partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
    // Kick off topic deletion
    resumeDeletions()
  }
}

private def resumeDeletions(): Unit = {
  val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
  // Topics queued for deletion
  topicsQueuedForDeletion.foreach { topic =>
    // 1. Check whether all replicas of the topic have already been deleted successfully (replica state: ReplicaDeletionSuccessful)
    if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
      // 1.1. If so, remove the listeners for the topic and delete its data from ZK and from the Controller cache
      completeDeleteTopic(topic)
    } else {
      ...
      // 1.2. If some replicas failed to delete (ReplicaDeletionIneligible), move them back to OfflineReplica so deletion is retried later
      if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
        markTopicForDeletionRetry(topic)
      }
      ...
    }

    if(isTopicEligibleForDeletion(topic)) {
      // 2. Delete all replicas of the topic
      onTopicDeletion(Set(topic))
    } else if(isTopicIneligibleForDeletion(topic)) {
      info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
    }
  }
}

// Topic deletion
private def onTopicDeletion(topics: Set[String]) {
  val partitions = topics.flatMap(controllerContext.partitionsForTopic)
  controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
  topics.foreach { topic =>
    // Partition deletion
    onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
  }
}

// Partition deletion
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
  val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
  // Replica deletion
  startReplicaDeletion(replicasPerPartition)
}

// Replica deletion
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
  replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
    val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
    val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
    val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
    // 2.1. The replica state machine moves dead replicas to ReplicaDeletionIneligible (cannot be deleted)
    replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
    // 2.2. The replica state machine moves live replicas to OfflineReplica (can be deleted)
    replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
    // 2.3. The replica state machine moves the deletable replicas to ReplicaDeletionStarted
    // 2.4. StopReplica requests are sent to the brokers to stop the replicas
    // 2.5. A callback is registered to handle the deletion result
    controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
      new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
        eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
    if (deadReplicasForTopic.nonEmpty) {
      markTopicIneligibleForDeletion(Set(topic))
    }
  }
}


// Handles the result of a StopReplica (replica deletion) request
case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
  def state = ControllerState.TopicDeletion
  override def process(): Unit = {
    import JavaConverters._
    if (!isActive) return
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    val responseMap = stopReplicaResponse.responses.asScala
    val partitionsInError =
      if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
      else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    // Replica deletion failed
    topicDeletionManager.failReplicaDeletion(replicasInError)
    if (replicasInError.size != responseMap.size) {
      val deletedReplicas = responseMap.keySet -- partitionsInError
      // Replica deletion succeeded
      topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
    }
  }
}

// Replica deletion succeeded
def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
  val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
  // 2.5.1. Deletion succeeded: set the replica state to ReplicaDeletionSuccessful
  controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
  // Call resumeDeletions again
  resumeDeletions()
}

// Replica deletion failed
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
  if(isDeleteTopicEnabled) {
    val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
    if(replicasThatFailedToDelete.nonEmpty) {
      val topics = replicasThatFailedToDelete.map(_.topic)
      // 2.5.2. Deletion failed: set the replica state to ReplicaDeletionIneligible
      controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
      markTopicIneligibleForDeletion(topics)
      // Call resumeDeletions again
      resumeDeletions()
    }
  }
}


// All replicas were deleted successfully: remove the listeners for the topic and delete its data from ZK and from the Controller cache
private def completeDeleteTopic(topic: String) {
  // 1.1.1. Deregister the partition-change listener
  controller.deregisterPartitionModificationsListener(topic)
  val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
  // 1.1.2. The replica state machine moves the replicas to NonExistentReplica
  replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
  val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
  // 1.1.3. The partition state machine first moves the partitions to OfflinePartition
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
  // 1.1.4. The partition state machine then moves the partitions to NonExistentPartition
  partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
  topicsToBeDeleted -= topic
  partitionsToBeDeleted.retain(_.topic != topic)
  val zkUtils = controllerContext.zkUtils
  // 1.1.5. Delete the ZK node /brokers/topics/<TOPIC_NAME>
  zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
  // 1.1.6. Delete the ZK node /config/topics/<TOPIC_NAME>
  zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
  // 1.1.7. Delete the ZK node /admin/delete_topics/<TOPIC_NAME>
  zkUtils.zkClient.delete(getDeleteTopicPath(topic))
  // 1.1.8. Remove the topic's data from the Controller context
  controllerContext.removeTopic(topic)
}

private def markTopicForDeletionRetry(topic: String) {
  val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
  // The replica state machine moves the replicas from ReplicaDeletionIneligible back to OfflineReplica
  controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}

The Controller sends StopReplica requests to the brokers to stop the replicas. When a broker receives a StopReplica request, the main processing steps are:

  1. Rename the log directory (append a .uuid-delete suffix)
  2. Add the Log to the logsToBeDeleted queue for asynchronous deletion

def asyncDelete(topicPartition: TopicPartition): Log = {
  val removedLog: Log = logCreationOrDeletionLock synchronized {
    logs.remove(topicPartition)
  }
  if (removedLog != null) {
    if (cleaner != null) {
      cleaner.abortCleaning(topicPartition)
      cleaner.updateCheckpoints(removedLog.dir.getParentFile)
    }
    // 1. Rename the log directory by appending a .uuid-delete suffix
    //    (e.g. topic_a-2 becomes topic_a-2.d0de6b09d9604f6d9ab89bf7bd413502-delete)
    val dirName = Log.logDeleteDirName(removedLog.name)
    removedLog.close()
    val renamedDir = new File(removedLog.dir.getParent, dirName)
    val renameSuccessful = removedLog.dir.renameTo(renamedDir)
    if (renameSuccessful) {
      checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
      removedLog.dir = renamedDir
      removedLog.logSegments.foreach(_.updateDir(renamedDir))

      // 2. Add the Log to the logsToBeDeleted queue; it is deleted asynchronously
      logsToBeDeleted.add(removedLog)
      removedLog.removeLogMetrics()
    } else {
      throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
    }
  }
  removedLog
}
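
On disk, the result of the rename looks roughly like this (the log directory and the UUID are illustrative):

$ ls /tmp/kafka-logs | grep topic_c
topic_c-0.d0de6b09d9604f6d9ab89bf7bd413502-delete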


def logDeleteDirName(logName: String): String = {
  val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
  s"$logName.$uniqueId$DeleteDirSuffix"
}

When a broker starts, it starts the log manager, which schedules a periodic log-deletion task:

def startup() {
  ...
  // Periodic log-deletion task (default period: 60 s)
  scheduler.schedule("kafka-delete-logs",
                     deleteLogs _,
                     delay = InitialTaskDelayMs,
                     period = defaultConfig.fileDeleteDelayMs,
                     TimeUnit.MILLISECONDS)
  ...
}
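
The period is driven by the broker setting file.delete.delay.ms (60000 ms by default), so the delay before the renamed directories actually disappear can be tuned in the broker configuration, for example:

# server.properties
file.delete.delay.ms=60000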

The log-deletion flow is:

  1. Take the next Log from the logsToBeDeleted queue
  2. Delete the Log
    2.1. Delete each log segment
     2.1.1. Delete the .log file
     2.1.2. Delete the .index file
     2.1.3. Delete the .timeindex file
     2.1.4. Delete the .txnindex file
    2.2. Clear the caches

The code:

private def deleteLogs(): Unit = {
  try {
    var failed = 0
    while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
      // 1. Take the next Log from the logsToBeDeleted queue
      val removedLog = logsToBeDeleted.take()
      if (removedLog != null) {
        try {
          // 2. Delete the Log
          removedLog.delete()
          info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
        } catch {
          case e: Throwable =>
            error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
            failed = failed + 1
            logsToBeDeleted.put(removedLog)
        }
      }
    }
  } catch {
    case e: Throwable =>
      error(s"Exception in kafka-delete-logs thread.", e)
  }
}


private[log] def delete() {
  lock synchronized {
    // 2.1. Delete each log segment
    logSegments.foreach(_.delete())
    // 2.2. Clear the caches
    segments.clear()
    leaderEpochCache.clear()
    Utils.delete(dir)
  }
}

def delete() {
  // 2.1.1. Delete the .log file
  val deletedLog = log.delete()
  // 2.1.2. Delete the .index file
  val deletedIndex = index.delete
  // 2.1.3. Delete the .timeindex file
  val deletedTimeIndex = timeIndex.delete()
  // 2.1.4. Delete the .txnindex file
  val deletedTxnIndex = txnIndex.delete()
  ...
}