
Kafka Replica State Machine

1. Replica States

Kafka currently defines seven replica states, along with the rules for moving between them. The states are:

  • NewReplica

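The initial state of a replica when the Controller creates it. In this state, the replica can only become a follower.

  • OnlineReplica

The state a replica moves to once it has been started. In this state, the replica can become either a follower or the leader.

  • OfflineReplica

A replica moves to this state when the broker hosting it crashes.

  • ReplicaDeletionStarted

When a topic is deleted, every replica of every partition of that topic is deleted. At that point the replica enters this state.

  • ReplicaDeletionSuccessful

A replica enters this state when it responds successfully to the replica deletion request.

  • ReplicaDeletionIneligible

A replica enters this state when its deletion fails.

  • NonExistentReplica

A replica enters this state once it has been deleted successfully.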
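The seven states and the previous-state checks listed in section 3 can be captured in a small lookup table. This is a minimal Scala sketch, not Kafka's code: the case objects reuse Kafka's state names, while `validPreviousStates` and `isValidTransition` are hypothetical helpers introduced only for illustration.

```scala
// Kafka's seven replica state names, modelled as a sealed ADT
sealed trait ReplicaState extends Product with Serializable
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState
case object NonExistentReplica extends ReplicaState

// Hypothetical table: target state -> states a replica must be in before moving there
// (taken from the previous-state checks in section 3)
val validPreviousStates: Map[ReplicaState, Set[ReplicaState]] = Map(
  NewReplica -> Set[ReplicaState](NonExistentReplica),
  OnlineReplica -> Set[ReplicaState](NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
  OfflineReplica -> Set[ReplicaState](NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
  ReplicaDeletionStarted -> Set[ReplicaState](OfflineReplica),
  ReplicaDeletionSuccessful -> Set[ReplicaState](ReplicaDeletionStarted),
  ReplicaDeletionIneligible -> Set[ReplicaState](ReplicaDeletionStarted),
  NonExistentReplica -> Set[ReplicaState](ReplicaDeletionSuccessful)
)

// True if a replica currently in `from` may move to `to`
def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
  validPreviousStates(to).contains(from)
```

For example, `isValidTransition(OfflineReplica, ReplicaDeletionStarted)` is true, while a NewReplica cannot jump straight to ReplicaDeletionStarted.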



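2. State Machine Initialization

The flow is as follows:

  1. Initialize the state of every replica
    1.1. If the replica's broker is alive, set its state to OnlineReplica
    1.2. If the replica's broker is not alive, set its state to ReplicaDeletionIneligible
  2. Move all live replicas to OnlineReplica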
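Step 1 decides an initial state purely from broker liveness. A simplified sketch of that decision, assuming the assignment is a plain map from `(topic, partition)` to broker ids; `PartitionAndReplica` mirrors the name used in the source below, but the function signature here is hypothetical:

```scala
// Only the two states that initialization can assign are modelled here
sealed trait ReplicaState extends Product with Serializable
case object OnlineReplica extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState

final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

// Hypothetical signature: assignment maps (topic, partition) to the broker ids hosting its replicas
def initializeReplicaState(
    assignment: Map[(String, Int), Seq[Int]],
    liveBrokerIds: Set[Int]): Map[PartitionAndReplica, ReplicaState] =
  assignment.toSeq.flatMap { case ((topic, partition), replicas) =>
    replicas.map { replicaId =>
      // 1.1 live broker -> OnlineReplica; 1.2 dead broker -> ReplicaDeletionIneligible
      val state: ReplicaState =
        if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
      PartitionAndReplica(topic, partition, replicaId) -> state
    }
  }.toMap
```

Step 2 then re-applies OnlineReplica to the live replicas; that is accepted because OnlineReplica is a valid previous state of OnlineReplica.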

Source code:

def startup() {
  // 1. Initialize the state of every replica
  initializeReplicaState()
  // 2. Move all live replicas to OnlineReplica
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
}

private def initializeReplicaState() {
  for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      // 1.1 If the replica's broker is alive, set the state to OnlineReplica
      if (controllerContext.liveBrokerIds.contains(replicaId))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
      // 1.2 If the replica's broker is not alive, set the state to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}

def handleStateChanges(...) {
  if(replicas.nonEmpty) {
    try {
      brokerRequestBatch.newBatch()
      // 2.1 Apply the state transition to each replica
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // 2.2 Send the accumulated requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error(...)
    }
  }
}
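`handleStateChanges` does not send one request per replica: requests are collected while the batch is open and sent together by `sendRequestsToBrokers`. The sketch below illustrates that batching idea with a hypothetical `RequestBatch` class in which requests are plain strings. The empty-batch check in `newBatch()` is assumed to mirror the behaviour of Kafka's `ControllerBrokerRequestBatch`; the class itself is not Kafka's.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the controller's request batch
final class RequestBatch {
  // broker id -> requests queued for that broker, in insertion order
  private val pending = mutable.LinkedHashMap.empty[Int, mutable.ArrayBuffer[String]]

  // Starting a batch while requests are still pending would lose them, so refuse
  def newBatch(): Unit =
    require(pending.isEmpty, "previous batch was not sent")

  // Queue a request; nothing is sent yet
  def add(brokerId: Int, request: String): Unit = {
    pending.getOrElseUpdate(brokerId, mutable.ArrayBuffer.empty[String]) += request
    ()
  }

  // Return what would be sent (one entry per broker) and clear the batch
  def sendRequestsToBrokers(): Map[Int, List[String]] = {
    val sent = pending.iterator.map { case (id, reqs) => id -> reqs.toList }.toMap
    pending.clear()
    sent
  }
}
```

Grouping by broker means three transitions touching two brokers produce two network sends instead of three.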

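The state transition flow is as follows:

  1. Look up the replica's current state; if the replica is not yet known, initialize it to NonExistentReplica
  2. Transition between states
    2.1. Validate the previous state
    2.2. Perform the transition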
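The two steps can be illustrated with a stripped-down state machine. This is a hypothetical sketch rather than Kafka's implementation: replicas are keyed by a string, only NewReplica and OnlineReplica are modelled as targets, and a failed check throws to the caller, whereas the real `handleStateChange` catches the exception and logs it.

```scala
import scala.collection.mutable

sealed trait ReplicaState extends Product with Serializable
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object NonExistentReplica extends ReplicaState

final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

// Hypothetical cache: replica id (e.g. "topic-partition-broker") -> current state
val replicaState = mutable.Map.empty[String, ReplicaState]

def assertValidPreviousStates(replica: String, from: Set[ReplicaState], target: ReplicaState): Unit =
  if (!from.contains(replicaState(replica)))
    throw new StateChangeFailedException(s"$replica: ${replicaState(replica)} -> $target is not allowed")

def handleStateChange(replica: String, target: ReplicaState): Unit = {
  // 1. An unknown replica starts in NonExistentReplica
  replicaState.getOrElseUpdate(replica, NonExistentReplica)
  // 2. Validate the previous state, then apply the transition
  target match {
    case NewReplica =>
      assertValidPreviousStates(replica, Set(NonExistentReplica), target)
      replicaState.put(replica, NewReplica)
    case OnlineReplica =>
      // Subset of the real list, since only three states exist in this sketch
      assertValidPreviousStates(replica, Set(NewReplica, OnlineReplica), target)
      replicaState.put(replica, OnlineReplica)
    case NonExistentReplica =>
      throw new StateChangeFailedException("not modelled in this sketch")
  }
}
```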

Source code for the state transition:

def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  
  // 1. Current state of the replica; a replica not seen before is initialized to NonExistentReplica
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  
    // 2. Transition between states (currState --> targetState)
    targetState match {
      case NewReplica =>
          // 2.1 Validate the previous state (currState --> NewReplica)
          // 2.2 Transition logic (currState --> NewReplica)
      case ReplicaDeletionStarted =>
          ...
      case ReplicaDeletionIneligible =>
          ...
      case ReplicaDeletionSuccessful =>
          ...
      case NonExistentReplica =>
          ...
      case OnlineReplica =>
          ...
      case OfflineReplica =>
          ...
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error(...)
  }
}

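3. State Transitions

3.1 Transition to NewReplica

Triggers:

  1. A new topic is created (TopicChangeListener watches /brokers/topics)
  2. Partitions are added to a topic (PartitionModificationsListener watches /brokers/topics/<TOPIC_NAME>)
  3. Replicas are newly assigned to a partition (for example, during partition reassignment)

The flow is as follows:

  1. Validate the previous state (it must be NonExistentReplica)
  2. Read the partition state (leader, ISR, epoch) from ZooKeeper at /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
  3. Handle the result
    3.1. If partition state exists
     3.1.1. If this replica is the leader, throw an exception
     3.1.2. Otherwise, send a LeaderAndIsr request to the broker hosting this replica and an UpdateMetadata request to all brokers
    3.2. If no partition state exists yet, send nothing
  4. Move the replica to NewReplica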
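The decision in step 3 can be sketched as a pure function that returns the requests the controller would queue. The request classes, the `zkState` parameter (standing in for the ZooKeeper read in step 2), and `requestsForNewReplica` are all hypothetical simplifications, not Kafka's API:

```scala
// Hypothetical partition state and request types; Kafka's real classes carry more fields
final case class LeaderAndIsr(leader: Int, isr: List[Int], leaderEpoch: Int)
sealed trait Request extends Product with Serializable
final case class LeaderAndIsrRequest(toBroker: Int, topic: String, partition: Int, state: LeaderAndIsr) extends Request
final case class UpdateMetadataRequest(toBroker: Int, topic: String, partition: Int) extends Request

final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

// zkState stands in for the partition state read from ZooKeeper (None = no state yet)
def requestsForNewReplica(
    topic: String,
    partition: Int,
    replicaId: Int,
    zkState: Option[LeaderAndIsr],
    liveBrokers: Seq[Int]): List[Request] =
  zkState match {
    case Some(state) =>
      // 3.1.1 A replica entering NewReplica may only become a follower
      if (state.leader == replicaId)
        throw new StateChangeFailedException(s"replica $replicaId of $topic-$partition is the leader")
      // 3.1.2 LeaderAndIsr to the new replica's broker, UpdateMetadata to every broker
      LeaderAndIsrRequest(replicaId, topic, partition, state) ::
        liveBrokers.map(b => UpdateMetadataRequest(b, topic, partition)).toList
    case None =>
      // 3.2 No partition state yet: nothing to send, the replica still moves to NewReplica
      Nil
  }
```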

Source code:

// 1. Validate the previous state (it must be NonExistentReplica)
assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
// 2. Read the partition state (leader, ISR, epoch) from ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
// 3. Handle the result
leaderIsrAndControllerEpochOpt match {
  // 3.1 Partition state exists
  case Some(leaderIsrAndControllerEpoch) =>
    // 3.1.1 A replica entering NewReplica must not be the leader
    if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
      throw new StateChangeFailedException(...)
    // 3.1.2 Send LeaderAndIsr to this replica's broker and UpdateMetadata to all brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                        topic, partition, leaderIsrAndControllerEpoch,
                                                        replicaAssignment)
  // 3.2 No partition state yet: nothing to send, continue with step 4
  case None =>
}
// 4. Move the replica to NewReplica
replicaState.put(partitionAndReplica, NewReplica)

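3.2 Transition to OnlineReplica

Triggers:

  1. After a new topic is created: replicas move to NewReplica first, then to OnlineReplica
  2. After partitions are added to a topic: replicas move to NewReplica first, then to OnlineReplica
  3. When a broker starts, the replicas on that broker move to OnlineReplica
  4. During partition reassignment, the replicas in RAR (the reassigned replica set) move to OnlineReplica
  5. When the replica state machine starts up, all live replicas move to OnlineReplica

The flow is as follows:

  1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
  2. Perform the transition
    2.1. If the replica is currently in NewReplica
     2.1.1. Get the partition's replica list from the cache
     2.1.2. If the list does not contain this replica, append it
    2.2. If the replica is in any other state
     2.2.1. Look up the partition's leadership metadata
     2.2.2. If it exists, send a LeaderAndIsr request to the broker hosting this replica
  3. Move the replica to OnlineReplica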
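A compact sketch of the two branches in step 2, assuming hypothetical in-memory maps in place of the controller caches: partitions are keyed by a string such as `"t-0"`, the leadership map only records a leader id, and the previous-state check from step 1 is omitted.

```scala
import scala.collection.mutable

sealed trait ReplicaState extends Product with Serializable
case object NewReplica extends ReplicaState
case object OnlineReplica extends ReplicaState
case object OfflineReplica extends ReplicaState

// Hypothetical controller caches, keyed by "topic-partition"
val partitionReplicaAssignment = mutable.Map("t-0" -> Seq(1, 2))
val partitionLeadershipInfo = mutable.Map("t-0" -> 1) // partition -> leader broker id
val replicaState = mutable.Map.empty[(String, Int), ReplicaState]
val sentLeaderAndIsr = mutable.ArrayBuffer.empty[(Int, String)] // (target broker, partition)

def toOnline(partition: String, replicaId: Int): Unit = {
  replicaState((partition, replicaId)) match {
    case NewReplica =>
      // 2.1 A brand-new replica: make sure it is listed in the assignment cache
      val current = partitionReplicaAssignment(partition)
      if (!current.contains(replicaId))
        partitionReplicaAssignment.put(partition, current :+ replicaId)
    case _ =>
      // 2.2 Existing replica: resend LeaderAndIsr only if leadership info is cached
      if (partitionLeadershipInfo.contains(partition))
        sentLeaderAndIsr += ((replicaId, partition))
  }
  // 3. Both branches end in OnlineReplica
  replicaState.put((partition, replicaId), OnlineReplica)
}
```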

Source code:

// 1. Validate the previous state (NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
assertValidPreviousStates(partitionAndReplica, List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// 2. Perform the transition
replicaState(partitionAndReplica) match {
  // 2.1 The replica is currently in NewReplica
  case NewReplica =>
    // 2.1.1 Get the partition's replica list from the cache
    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // 2.1.2 If the list does not contain this replica, append it
    if (!currentAssignedReplicas.contains(replicaId))
      controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
  // 2.2 The replica is in any other state
  case _ =>
    // 2.2.1 Look up the partition's leadership metadata
    controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
      // 2.2.2 If it exists, send a LeaderAndIsr request to the broker hosting this replica
      case Some(leaderIsrAndControllerEpoch) =>
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
        replicaState.put(partitionAndReplica, OnlineReplica)
      // No leadership info is cached for this partition, so nothing is sent
      case None =>
    }
}
// 3. Move the replica to OnlineReplica
replicaState.put(partitionAndReplica, OnlineReplica)

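3.3 Transition to OfflineReplica

Triggers:

  1. Topic deletion
  2. When a broker goes down, the replicas on that broker move to OfflineReplica
  3. When a broker shuts down gracefully (ControlledShutdown), the replicas on that broker move to OfflineReplica
  4. During partition reassignment, replicas that are being removed move to OfflineReplica
  5. Replicas whose deletion failed move to OfflineReplica so that deletion can be retried

The flow is as follows:

  1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
  2. Send a StopReplica request to the broker hosting the replica so that it stops replicating
  3. Remove the replica from the ISR and update the ZooKeeper node /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
  4. Send LeaderAndIsr requests to the brokers hosting the remaining replicas so they pick up the new metadata (skipped if the partition is being deleted)
  5. Move the replica to OfflineReplica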
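Steps 2 to 4 can be sketched as a function from the old partition state to the new one plus the queued requests. All types are hypothetical, and the topic-deletion check from step 4 is ignored. One further assumption: if the replica being taken offline is the leader, the sketch marks the partition leaderless with `-1`; electing a new leader is handled by other parts of the controller and is not modelled here.

```scala
// Hypothetical view of one partition: leader, ISR, and all assigned replicas (broker ids)
final case class PartitionState(leader: Int, isr: List[Int], assigned: List[Int])

sealed trait Request extends Product with Serializable
final case class StopReplica(toBroker: Int, deletePartition: Boolean) extends Request
final case class LeaderAndIsr(toBroker: Int, leader: Int, isr: List[Int]) extends Request

// Assumption: -1 marks "no leader" when the leader itself goes offline
val NoLeader = -1

def takeReplicaOffline(state: PartitionState, replicaId: Int): (PartitionState, List[Request]) = {
  // Step 2: stop replication but keep the data (deletePartition = false)
  val stop = StopReplica(replicaId, deletePartition = false)
  // Step 3: drop the replica from the ISR (and from leadership, if it was the leader)
  val newLeader = if (state.leader == replicaId) NoLeader else state.leader
  val newState = state.copy(leader = newLeader, isr = state.isr.filterNot(_ == replicaId))
  // Step 4: tell the brokers hosting the remaining replicas about the shrunken ISR
  val notifications = state.assigned.filterNot(_ == replicaId)
    .map(b => LeaderAndIsr(b, newState.leader, newState.isr))
  (newState, stop :: notifications)
}
```

The assigned replica list is left untouched: an offline replica still belongs to the partition, it is just no longer in sync.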

Source code:

// 1. Validate the previous state (NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
assertValidPreviousStates(partitionAndReplica, List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// 2. Send StopReplica to the replica's broker so it stops replicating (deletePartition = false keeps the data)
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
val leaderAndIsrIsEmpty: Boolean =
  controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
    case Some(_) =>
      // 3. Remove the replica from the ISR and update /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
      controller.removeReplicaFromIsr(topic, partition, replicaId) match {
        case Some(updatedLeaderIsrAndControllerEpoch) =>
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
            // 4. Send LeaderAndIsr to the brokers hosting the remaining replicas
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
              topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
          }
          // 5. Move the replica to OfflineReplica
          replicaState.put(partitionAndReplica, OfflineReplica)
          false
        case None =>
          true
      }
    case None =>
      true
  }

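3.4 Transition to ReplicaDeletionStarted

Triggers:

  1. Topic deletion
  2. Partition reassignment, for replicas that are being removed

The flow is as follows:

  1. Validate the previous state (it must be OfflineReplica)
  2. Move the replica to ReplicaDeletionStarted
  3. Send a StopReplica request (deletePartition = true) to the broker hosting the replica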
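The ordering matters: the state changes to ReplicaDeletionStarted immediately, while the StopReplica request is only queued and carries a callback for the broker's response. A hypothetical sketch of that ordering; the `StopReplica` case class, the error-code convention, and `startDeletion` are illustrative, not Kafka's API:

```scala
import scala.collection.mutable

sealed trait ReplicaState extends Product with Serializable
case object OfflineReplica extends ReplicaState
case object ReplicaDeletionStarted extends ReplicaState

// Hypothetical queued request; the callback receives the broker's error code (assumed 0 = none)
final case class StopReplica(toBroker: Int, partition: String, deletePartition: Boolean, callback: Int => Unit)

val replicaState = mutable.Map.empty[(String, Int), ReplicaState]
val queued = mutable.ArrayBuffer.empty[StopReplica]

def startDeletion(partition: String, replicaId: Int, onResponse: Int => Unit): Unit = {
  // 1. Only an offline replica may start deletion
  require(replicaState.get((partition, replicaId)).contains(OfflineReplica),
    "previous state must be OfflineReplica")
  // 2. The state changes first
  replicaState.put((partition, replicaId), ReplicaDeletionStarted)
  // 3. The request is only queued; it goes out with the rest of the batch
  queued += StopReplica(replicaId, partition, deletePartition = true, onResponse)
}
```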

Source code:

// 1. Validate the previous state (it must be OfflineReplica)
assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
// 2. Move the replica to ReplicaDeletionStarted
replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// 3. Send StopReplica to the replica's broker with deletePartition = true; the callback handles the response
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  callbacks.stopReplicaResponseCallback)

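3.5 Transition to ReplicaDeletionSuccessful

Triggers:

  1. Topic deletion
  2. Partition reassignment, for replicas that are being removed

The flow is as follows:

  1. Validate the previous state (it must be ReplicaDeletionStarted)
  2. Move the replica to ReplicaDeletionSuccessful

Source code: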

// 1. Validate the previous state (it must be ReplicaDeletionStarted)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 2. Move the replica to ReplicaDeletionSuccessful
replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)

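3.6 Transition to ReplicaDeletionIneligible

Triggers:

  1. Topic deletion
  2. Partition reassignment, for replicas that are being removed

The flow is as follows:

  1. Validate the previous state (it must be ReplicaDeletionStarted)
  2. Move the replica to ReplicaDeletionIneligible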
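ReplicaDeletionSuccessful and ReplicaDeletionIneligible share the same previous state; which one applies depends on whether the broker managed to delete the replica, as described in section 1. A minimal sketch of that routing, assuming an error code of 0 means success. In Kafka the response is handled by the topic deletion logic rather than by a function like this hypothetical `onStopReplicaResponse`.

```scala
sealed trait ReplicaState extends Product with Serializable
case object ReplicaDeletionStarted extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState
case object ReplicaDeletionIneligible extends ReplicaState

// Hypothetical: decide the next state from the StopReplica response's error code
def onStopReplicaResponse(current: ReplicaState, errorCode: Int): ReplicaState = {
  // Both targets require the replica to still be in ReplicaDeletionStarted
  require(current == ReplicaDeletionStarted, s"unexpected previous state $current")
  if (errorCode == 0) ReplicaDeletionSuccessful else ReplicaDeletionIneligible
}
```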

Source code:

// 1. Validate the previous state (it must be ReplicaDeletionStarted)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 2. Move the replica to ReplicaDeletionIneligible
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)

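3.7 Transition to NonExistentReplica

Triggers:

  1. Topic deletion
  2. Partition reassignment, for replicas that are being removed

The flow is as follows:

  1. Validate the previous state (it must be ReplicaDeletionSuccessful)
  2. Remove the replica from the partition's assigned replica list and drop its state entry from the cache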
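NonExistentReplica is represented by removing the replica from the caches rather than by storing a state value. A hypothetical immutable sketch of that cleanup; `Caches` and `toNonExistent` are illustrative names, not Kafka's:

```scala
sealed trait ReplicaState extends Product with Serializable
case object OnlineReplica extends ReplicaState
case object ReplicaDeletionSuccessful extends ReplicaState

// Hypothetical immutable stand-ins for the controller caches
final case class Caches(
    assignment: Map[String, Seq[Int]],               // partition -> assigned replica ids
    replicaState: Map[(String, Int), ReplicaState])  // (partition, replica id) -> state

def toNonExistent(c: Caches, partition: String, replicaId: Int): Caches = {
  // 1. Only a successfully deleted replica may become NonExistentReplica
  require(c.replicaState.get((partition, replicaId)).contains(ReplicaDeletionSuccessful),
    "previous state must be ReplicaDeletionSuccessful")
  Caches(
    // 2a. Drop the replica from the partition's assignment
    assignment = c.assignment.updated(partition, c.assignment(partition).filterNot(_ == replicaId)),
    // 2b. Remove the state entry: "NonExistentReplica" is the absence of an entry
    replicaState = c.replicaState - ((partition, replicaId)))
}
```

This is also why `handleStateChange` uses `getOrElseUpdate(..., NonExistentReplica)`: a replica with no entry is treated as NonExistentReplica.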

Source code:

// 1. Validate the previous state (it must be ReplicaDeletionSuccessful)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 2. Remove the replica from the assignment cache and drop its state entry
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
replicaState.remove(partitionAndReplica)