
Kafka Partition State Machine

1. Partition States

The partition state machine manages the state of every partition in the cluster. It defines four states:

  • NonExistentPartition

The partition does not exist or has been deleted.

  • NewPartition

A partition enters this state right after it is created. It is not yet usable, because no leader or ISR has been elected.

  • OnlinePartition

A partition enters this state once its leader replica has been elected. The partition is online and serves requests normally.

  • OfflinePartition

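The broker hosting the leader has gone down. The partition cannot serve requests in this state, and a leader election is triggered to choose a new leader.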
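As a minimal sketch, the four states can be modeled as a sealed class hierarchy with one case object per state, using the names that appear in the controller code below. The `available` flag is a hypothetical field added only to encode the descriptions above; it is not part of Kafka's `PartitionState`.

```scala
// Sketch of the four partition states.
// `available` is a hypothetical field for illustration, not a Kafka field.
sealed abstract class PartitionState(val available: Boolean)

// Partition does not exist or has been deleted
case object NonExistentPartition extends PartitionState(false)
// Created, but no leader/ISR elected yet
case object NewPartition extends PartitionState(false)
// Leader elected; partition serves requests
case object OnlinePartition extends PartitionState(true)
// Leader's broker is down; waiting for a new leader
case object OfflinePartition extends PartitionState(false)

val allStates: List[PartitionState] =
  List(NonExistentPartition, NewPartition, OnlinePartition, OfflinePartition)
```

Only OnlinePartition is usable; the other three states all mean the partition cannot currently serve reads or writes.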


2. State Machine Initialization

Flow:

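  1. Initialize the state of every partition
    1.1. Iterate over all partitions
    1.2. Check whether each partition has leader and ISR information
      1.2.1. If it does:
        1.2.1.1. If the leader is alive, set the state to OnlinePartition
        1.2.1.2. If the leader is not alive, set the state to OfflinePartition
      1.2.2. If it does not, set the state to NewPartition
  2. Trigger the transition to OnlinePartition
    2.1. Skip topics that are queued for deletion
    2.2. Move partitions in the OfflinePartition or NewPartition state to OnlinePartition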

Source code:

def startup() {
  // 1. Initialize the state of every partition
  initializePartitionState()
  // 2. Trigger the transition to OnlinePartition
  triggerOnlinePartitionStateChange()
}

private def initializePartitionState() {
  // 1.1 Iterate over all partitions
  for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
    // 1.2 Check whether the partition has leader and ISR information
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      // 1.2.1 Leader and ISR information exists
      case Some(currentLeaderIsrAndEpoch) =>
        if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
          // 1.2.1.1 Leader is alive: set the state to OnlinePartition
          partitionState.put(topicPartition, OnlinePartition)
        else
          // 1.2.1.2 Leader is not alive: set the state to OfflinePartition
          partitionState.put(topicPartition, OfflinePartition)
      // 1.2.2 No leader and ISR information: set the state to NewPartition
      case None =>
        partitionState.put(topicPartition, NewPartition)
    }
  }
}


def triggerOnlinePartitionStateChange() {
  try {
    brokerRequestBatch.newBatch()
    for ((topicAndPartition, partitionState) <- partitionState
      // 2.1 Skip topics that are queued for deletion
         if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
      // 2.2 Move partitions in the OfflinePartition or NewPartition state to OnlinePartition
      if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
          (new CallbackBuilder).build)
    }
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    case e: Throwable => error(...)
  }
}
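Without ZooKeeper and the controller context, both steps reduce to pure functions. The sketch below assumes plain immutable maps in place of `controllerContext` (`assignment` for the replica assignment, `leaders` for partitions that have leader/ISR information) and a hypothetical `TopicPartition` case class standing in for `TopicAndPartition`. It classifies partitions as `initializePartitionState` does, then selects the partitions that `triggerOnlinePartitionStateChange` would move to OnlinePartition.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Hypothetical stand-in for Kafka's TopicAndPartition
final case class TopicPartition(topic: String, partition: Int)

// Step 1: classify every assigned partition (mirrors initializePartitionState).
def initializeStates(
    assignment: Map[TopicPartition, Seq[Int]], // partition -> assigned replicas (AR)
    leaders: Map[TopicPartition, Int],         // partition -> leader, only if leader/ISR info exists
    liveBrokers: Set[Int]
): Map[TopicPartition, PartitionState] =
  assignment.keys.map { tp =>
    val state: PartitionState = leaders.get(tp) match {
      case Some(leader) if liveBrokers.contains(leader) => OnlinePartition  // 1.2.1.1
      case Some(_)                                      => OfflinePartition // 1.2.1.2
      case None                                         => NewPartition     // 1.2.2
    }
    tp -> state
  }.toMap

// Step 2: New/Offline partitions of topics not queued for deletion need to go online.
def partitionsToBringOnline(
    states: Map[TopicPartition, PartitionState],
    topicsQueuedForDeletion: Set[String]
): Set[TopicPartition] = {
  val needsTransition = Set[PartitionState](OfflinePartition, NewPartition)
  states.collect {
    case (tp, s) if needsTransition(s) && !topicsQueuedForDeletion(tp.topic) => tp
  }.toSet
}
```

Partitions already classified as OnlinePartition are left alone; only the other two states go through `handleStateChange`.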

State transition flow:

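  1. If the partition is not yet known, initialize its state to NonExistentPartition
  2. Transition between states
    2.1. Validate the partition's previous state
    2.2. Perform the transition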

Source code of the state transition:

private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,leaderSelector: PartitionLeaderSelector, callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  // 1. Current state of the partition; an unknown partition is initialized to NonExistentPartition
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    // 2. Transition from currState to targetState
    targetState match {
      case NewPartition =>
          // 2.1 Validate the previous state (currState --> NewPartition)
          // 2.2 Transition logic (currState --> NewPartition)
      case OnlinePartition =>
          // ...
      case OfflinePartition =>
          // ...
      case NonExistentPartition =>
          // ...
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error(...)
  }
}
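The validate-then-record skeleton can be exercised in isolation. This sketch assumes a plain mutable map keyed by a `"topic-partition"` string and throws `IllegalStateException` for an illegal transition, in the spirit of `assertValidPreviousStates`. The allowed previous states are taken from sections 3.1 to 3.4; leader election and request sending are deliberately left out.

```scala
import scala.collection.mutable

sealed trait PartitionState
case object NonExistentPartition extends PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState

// Allowed previous states for each target state (sections 3.1-3.4).
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NewPartition         -> Set[PartitionState](NonExistentPartition),
  OnlinePartition      -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition     -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set[PartitionState](OfflinePartition)
)

// In-memory state table, keyed by "topic-partition" for brevity.
val partitionState = mutable.Map.empty[String, PartitionState]

def handleStateChange(tp: String, target: PartitionState): Unit = {
  // 1. Unknown partitions start out as NonExistentPartition
  val current = partitionState.getOrElseUpdate(tp, NonExistentPartition)
  // 2.1 Reject transitions whose previous state is not allowed
  if (!validPreviousStates(target).contains(current))
    throw new IllegalStateException(s"$tp: illegal transition $current -> $target")
  // 2.2 Record the new state (side effects omitted)
  partitionState(tp) = target
}
```

For example, topic deletion has to pass through OfflinePartition: moving an online partition straight to NonExistentPartition is rejected and leaves its state unchanged.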

3. State Transitions

3.1 Transition to NewPartition

Triggers:

  1. A new topic is created (TopicChangeListener watches /brokers/topics)
  2. Partitions are added to an existing topic (PartitionModificationsListener watches /brokers/topics/<TOPIC_NAME>)

Flow:

  1. Validate the previous state (it must be NonExistentPartition)
  2. Move the partition to NewPartition

Source code:

// 1. Validate the previous state (must be NonExistentPartition)
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
// 2. Move the partition to NewPartition
partitionState.put(topicAndPartition, NewPartition)

3.2 Transition to OnlinePartition

Triggers:

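  1. After a topic is created, its partitions move to NewPartition and then to OnlinePartition (the first live replica in the AR, the assigned replicas, becomes leader)
  2. After partitions are added to a topic, they move to NewPartition and then to OnlinePartition (the first live replica in the AR becomes leader)
  3. When the partition state machine starts up, it iterates over all partitions:
    3.1. If a partition has LeaderAndIsr information and its leader is alive, its state is set directly to OnlinePartition
    3.2. If a partition has LeaderAndIsr information but its leader is dead, its state is first set to OfflinePartition and then moved to OnlinePartition, which triggers an election (OfflinePartitionLeaderSelector)
    3.3. If a partition has no LeaderAndIsr information, its state is first set to NewPartition and then moved to OnlinePartition, which triggers an election (the first live replica in the AR)
  4. Partition reassignment
    4.1. If the new assignment does not contain the current leader, the partition moves to OnlinePartition and an election is triggered (ReassignedPartitionLeaderSelector)
    4.2. If the new assignment contains the current leader but the leader's broker is down, the partition moves to OnlinePartition and an election is triggered (ReassignedPartitionLeaderSelector)
  5. During a preferred replica election, the partition moves to OnlinePartition and an election is triggered (PreferredReplicaPartitionLeaderSelector)
  6. After a broker fails (BrokerChangeListener watches /brokers/ids), every partition whose leader was on that broker moves to OfflinePartition and then to OnlinePartition, which triggers an election (OfflinePartitionLeaderSelector)
  7. During a controlled shutdown (ControlledShutdown) of a broker, the leader replicas on that broker are taken offline; their partitions move to OnlinePartition and an election is triggered (ControlledShutdownLeaderSelector)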
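The triggers above boil down to two transition paths plus a choice of leader selector. The lookup below is only a summary of that list expressed as data; the `Trigger` names and the `Plan` type are hypothetical, while the selector names are the ones used by the controller.

```scala
// Hypothetical trigger names summarizing the list above.
sealed trait Trigger
case object TopicOrPartitionCreated extends Trigger  // triggers 1, 2 (and startup case 3.3)
case object LeaderBrokerFailed extends Trigger       // trigger 6 (and startup case 3.2)
case object ReassignmentMovesLeader extends Trigger  // trigger 4
case object PreferredReplicaElection extends Trigger // trigger 5
case object ControlledShutdown extends Trigger       // trigger 7

// States visited, starting from the state before the change, plus the leader choice.
final case class Plan(path: List[String], selector: String)

def planFor(t: Trigger): Plan = t match {
  case TopicOrPartitionCreated =>
    Plan(List("NonExistentPartition", "NewPartition", "OnlinePartition"), "first live replica in AR")
  case LeaderBrokerFailed =>
    Plan(List("OnlinePartition", "OfflinePartition", "OnlinePartition"), "OfflinePartitionLeaderSelector")
  case ReassignmentMovesLeader =>
    Plan(List("OnlinePartition", "OnlinePartition"), "ReassignedPartitionLeaderSelector")
  case PreferredReplicaElection =>
    Plan(List("OnlinePartition", "OnlinePartition"), "PreferredReplicaPartitionLeaderSelector")
  case ControlledShutdown =>
    Plan(List("OnlinePartition", "OnlinePartition"), "ControlledShutdownLeaderSelector")
}
```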

Flow:

  1. Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
  2. Perform the transition: elect the leader and ISR, then update ZooKeeper and the local cache
    2.1. NewPartition --> OnlinePartition: the first live replica in the AR becomes leader
    2.2. OfflinePartition --> OnlinePartition: elect with OfflinePartitionLeaderSelector
    2.3. OnlinePartition --> OnlinePartition: elect with ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, or ControlledShutdownLeaderSelector
  3. Move the partition to OnlinePartition

Source code:

// 1. Validate the previous state (must be NewPartition, OnlinePartition, or OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
// 2. Perform the transition
partitionState(topicAndPartition) match {
  // 2.1 NewPartition --> OnlinePartition
  case NewPartition =>
    initializeLeaderAndIsrForPartition(topicAndPartition)
  // 2.2 OfflinePartition --> OnlinePartition
  case OfflinePartition =>
    electLeaderForPartition(topic, partition, leaderSelector)
  // 2.3 OnlinePartition --> OnlinePartition
  case OnlinePartition =>
    electLeaderForPartition(topic, partition, leaderSelector)
  case _ => // unreachable: illegal previous states were rejected above
}
// 3. Move the partition to OnlinePartition
partitionState.put(topicAndPartition, OnlinePartition)

3.2.1 NewPartition-->OnlinePartition

Triggers:

  1. After a topic is created, its partitions move to NewPartition and then to OnlinePartition
  2. After partitions are added to a topic, they move to NewPartition and then to OnlinePartition

Flow:

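  1. The first live replica in the AR is elected leader
  2. The set of live replicas in the AR becomes the ISR
  3. Create a persistent ZooKeeper node at /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
  4. Write the partition metadata to it (leader, isr, leader_epoch, controller_epoch, version)
  5. Update the local cache
  6. Send a LeaderAndIsr request to the live replicas of the partition and an UpdateMetadata request to all brokers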

Source code:

private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
  val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
  liveAssignedReplicas.size match {
    case 0 =>
      throw new StateChangeFailedException(failMsg)
    case _ =>
      // 1. The first live replica in the AR is elected leader
      val leader = liveAssignedReplicas.head
      // 2. The live replicas in the AR form the ISR
      val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), controller.epoch)
      try {
        // 3. Create a persistent ZooKeeper node at /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
        // 4. Write the partition metadata (leader, isr, leader_epoch, controller_epoch, version)
        zkUtils.createPersistentPath(
          getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
          zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))

        // 5. Update the partition metadata in the local cache
        controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
        // 6. Queue LeaderAndIsr requests for the live replicas and UpdateMetadata requests for all brokers
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
          topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
      } catch {
        case _: ZkNodeExistsException =>
          throw new StateChangeFailedException(failMsg)
      }
  }
}
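Steps 1, 2 and 4 can be reproduced without ZooKeeper. The sketch below picks the leader and ISR the same way `initializeLeaderAndIsrForPartition` does (filter the AR by live brokers, take the head) and renders the five fields from step 4 as JSON. The `LeaderAndIsr` case class is a simplified stand-in, and the field order, the `version` value 1 and the initial `leader_epoch` of 0 are assumptions for illustration; in Kafka the payload is produced by `zkUtils.leaderAndIsrZkData`.

```scala
// Simplified stand-in for LeaderAndIsr; initial leader epoch 0 is an assumption.
final case class LeaderAndIsr(leader: Int, isr: List[Int], leaderEpoch: Int = 0)

// Steps 1-2: first live AR replica becomes leader, all live AR replicas form the ISR.
// Left(...) corresponds to the StateChangeFailedException thrown when no replica is alive.
def initLeaderAndIsr(ar: Seq[Int], liveBrokers: Set[Int]): Either[String, LeaderAndIsr] = {
  val liveAssigned = ar.filter(liveBrokers.contains)
  if (liveAssigned.isEmpty) Left(s"no live replica among AR $ar")
  else Right(LeaderAndIsr(liveAssigned.head, liveAssigned.toList))
}

// Step 4: JSON payload for /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
def stateJson(lai: LeaderAndIsr, controllerEpoch: Int): String = {
  val fields = List(
    "controller_epoch" -> controllerEpoch.toString,
    "leader"           -> lai.leader.toString,
    "version"          -> "1",
    "leader_epoch"     -> lai.leaderEpoch.toString,
    "isr"              -> lai.isr.mkString("[", ",", "]")
  )
  fields.map { case (k, v) => "\"" + k + "\":" + v }.mkString("{", ",", "}")
}
```

Note that replica order in the AR matters: with AR `(2, 1, 3)` and brokers 1 and 3 alive, broker 1 becomes leader because it is the first live entry, not because it has the smallest id.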

3.2.2 OfflinePartition-->OnlinePartition

Flow:

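  1. Read the partition metadata from ZooKeeper (the /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state node)
  2. If the controller epoch stored in ZooKeeper is greater than this controller's cached epoch, a newer controller has been elected and this one is stale, so abort
  3. Elect a leader for the partition (with OfflinePartitionLeaderSelector)
  4. Update the partition metadata in ZooKeeper; if the conditional update fails because the node's zkVersion has changed, start again from step 1
  5. Update the local cache
  6. Send LeaderAndIsr requests to the replicas returned by the selector and UpdateMetadata requests to all brokers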

Source code:

def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  try {
    var zookeeperPathUpdateSucceeded: Boolean = false
    var newLeaderAndIsr: LeaderAndIsr = null
    var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
    while(!zookeeperPathUpdateSucceeded) {
      // 1. Read the partition metadata from ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
      val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
      val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
      val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch

      // 2. If the controller epoch in ZooKeeper is newer than ours, this controller is stale: abort
      if (controllerEpoch > controller.epoch) {
        throw new StateChangeFailedException(failMsg)
      }
      // 3. Elect a leader with the supplied selector (OfflinePartitionLeaderSelector for Offline --> Online)
      val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
      // 4. Conditionally update the metadata in ZooKeeper; fails (and the loop retries) if zkVersion has changed
      val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
                                                                              leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
      newLeaderAndIsr = leaderAndIsr
      newLeaderAndIsr.zkVersion = newVersion
      zookeeperPathUpdateSucceeded = updateSucceeded
      replicasForThisPartition = replicas
    }
    // 5. Update the local cache
    val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
    controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
    val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
    // 6. Queue LeaderAndIsr requests for the selected replicas and UpdateMetadata requests for all brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
                                                        newLeaderIsrAndControllerEpoch, replicas)
  } catch {
    case e: Throwable => // exception handling omitted in this excerpt
  }
}
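The `while (!zookeeperPathUpdateSucceeded)` loop is optimistic concurrency control: the write to the state node only succeeds if the node still carries the `zkVersion` that was read; otherwise the controller re-reads and elects again. The sketch below models this with a hypothetical in-memory `VersionedStore` whose `conditionalSet` behaves like a version-checked ZooKeeper `setData`. The selector is passed in as a function, and the stale-controller check from step 2 raises a hypothetical `StaleControllerException`. None of these types are Kafka's.

```scala
// Hypothetical stand-ins; not Kafka types.
final case class LeaderAndIsr(leader: Int, isr: List[Int], leaderEpoch: Int)
final case class Node(data: LeaderAndIsr, controllerEpoch: Int, version: Int)

// In-memory node with a version-checked write.
final class VersionedStore(private var node: Node) {
  def read: Node = synchronized(node)
  // Writes only if the stored version equals expectedVersion; returns (succeeded, newVersion).
  def conditionalSet(data: LeaderAndIsr, controllerEpoch: Int, expectedVersion: Int): (Boolean, Int) =
    synchronized {
      if (node.version != expectedVersion) (false, -1)
      else { node = Node(data, controllerEpoch, node.version + 1); (true, node.version) }
    }
}

final class StaleControllerException(msg: String) extends RuntimeException(msg)

// Retry loop mirroring steps 1-4 of electLeaderForPartition.
def electLeader(store: VersionedStore, myEpoch: Int, select: LeaderAndIsr => LeaderAndIsr): LeaderAndIsr = {
  var result: Option[LeaderAndIsr] = None
  while (result.isEmpty) {
    val current = store.read                          // 1. read the current node
    if (current.controllerEpoch > myEpoch)            // 2. a newer controller exists
      throw new StaleControllerException(s"epoch ${current.controllerEpoch} > $myEpoch")
    val proposed = select(current.data)               // 3. choose leader and ISR
    val (ok, _) = store.conditionalSet(proposed, myEpoch, current.version) // 4. versioned write
    if (ok) result = Some(proposed)                   // otherwise loop and re-read
  }
  result.get
}
```

If another writer changes the node between the read and the write, `conditionalSet` fails and the loop starts over with fresh data, which is why the Kafka code passes `currentLeaderAndIsr.zkVersion` to the update.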

3.2.3 OnlinePartition-->OnlinePartition

Flow:

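  1. Read the partition metadata from ZooKeeper (the /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state node)
  2. If the controller epoch stored in ZooKeeper is greater than this controller's cached epoch, this controller is stale, so abort
  3. Elect a leader for the partition (with ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, or ControlledShutdownLeaderSelector)
  4. Update the partition metadata in ZooKeeper
  5. Update the local cache
  6. Send LeaderAndIsr requests to the replicas returned by the selector and UpdateMetadata requests to all brokers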

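The source code is the same as above (electLeaderForPartition); only the leader selector passed in differs.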
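Since only the selector differs between the Online --> Online cases, here is a simplified model of two of them. It rests on stated assumptions and is not a copy of Kafka's selectors: the preferred replica is taken to be the first replica in the AR and is chosen only if it is alive, in the ISR and not already the leader; for a controlled shutdown, the shutting-down brokers are dropped from the ISR and the first live AR replica remaining in that ISR becomes leader. Both bump the leader epoch, and `None` stands for "no eligible leader or nothing to change".

```scala
// Simplified stand-in for LeaderAndIsr.
final case class LeaderAndIsr(leader: Int, isr: List[Int], leaderEpoch: Int)

// Preferred replica election (simplified): move leadership to AR.head when possible.
def selectPreferred(ar: List[Int], cur: LeaderAndIsr, live: Set[Int]): Option[LeaderAndIsr] = {
  ar.headOption
    .filter(p => p != cur.leader && live.contains(p) && cur.isr.contains(p))
    .map(p => cur.copy(leader = p, leaderEpoch = cur.leaderEpoch + 1))
}

// Controlled shutdown (simplified): shrink the ISR, then pick a live AR replica from it.
def selectForShutdown(ar: List[Int], cur: LeaderAndIsr, live: Set[Int], shuttingDown: Set[Int]): Option[LeaderAndIsr] = {
  val newIsr = cur.isr.filterNot(shuttingDown.contains)
  ar.find(r => live.contains(r) && !shuttingDown.contains(r) && newIsr.contains(r))
    .map(l => LeaderAndIsr(l, newIsr, cur.leaderEpoch + 1))
}
```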


3.3 Transition to OfflinePartition

Triggers:

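  1. When a topic is deleted, its partitions move to OfflinePartition and then to NonExistentPartition
  2. After a broker fails (BrokerChangeListener watches /brokers/ids), every partition whose leader was on that broker moves to OfflinePartition and then to OnlinePartition, which triggers a leader election (OfflinePartitionLeaderSelector)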
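For trigger 2, the controller first has to work out which partitions lost their leader. Assuming the cached leadership info is reduced to a map from a partition name to its leader's broker id (a hypothetical simplification of `partitionLeadershipInfo`), the affected partitions are a simple filter:

```scala
// Partitions whose current leader is hosted on one of the failed brokers.
// `leaders` is a hypothetical map: "topic-partition" -> leader broker id.
def partitionsWithDeadLeader(leaders: Map[String, Int], deadBrokers: Set[Int]): Set[String] =
  leaders.collect { case (tp, leader) if deadBrokers(leader) => tp }.toSet
```

Each partition in the result then goes through OfflinePartition and back to OnlinePartition; partitions that merely had a follower on the failed broker keep their leader.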

Flow:

  1. Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
  2. Move the partition to OfflinePartition

Source code:

// 1. Validate the previous state (must be NewPartition, OnlinePartition, or OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// 2. Move the partition to OfflinePartition
partitionState.put(topicAndPartition, OfflinePartition)

3.4 Transition to NonExistentPartition

Trigger:

  • When a topic is deleted, its partitions move to OfflinePartition and then to NonExistentPartition

Flow:

  1. Validate the previous state (it must be OfflinePartition)
  2. Move the partition to NonExistentPartition

Source code:

// 1. Validate the previous state (must be OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
// 2. Move the partition to NonExistentPartition
partitionState.put(topicAndPartition, NonExistentPartition)