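1. Replica States
Kafka currently defines 7 replica states, along with rules that govern the transitions between them:
- NewReplica
The initial state of a replica when the Controller creates it. In this state, the replica can only become a follower.
- OnlineReplica
The state a replica enters once it has been started. In this state, the replica can become either a follower or a leader.
- OfflineReplica
A replica moves to this state when the broker hosting it crashes.
- ReplicaDeletionStarted
When a topic is deleted, every replica of every partition of that topic is deleted. A replica enters this state when its deletion starts.
- ReplicaDeletionSuccessful
A replica enters this state once it has responded successfully to the delete-replica request.
- ReplicaDeletionIneligible
A replica enters this state if its deletion fails.
- NonExistentReplica
A replica enters this state once it has been deleted successfully. It is also the default state of any replica the state machine has no record of.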
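The transition rules can be summarized as a table from each target state to its valid previous states, using the preconditions listed in section 3. The sketch below is a hypothetical Scala model for illustration, not Kafka's own `ReplicaState` class; only the state names come from the source.

```scala
// Hypothetical model of the seven replica states and their valid previous states
// (taken from the preconditions in section 3); not Kafka's own ReplicaState class.
object ReplicaStates {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // Target state -> states from which the transition is allowed.
  def validPreviousStates(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica => Set(NonExistentReplica)
    case OnlineReplica | OfflineReplica =>
      Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted => Set(OfflineReplica)
    case ReplicaDeletionSuccessful | ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica => Set(ReplicaDeletionSuccessful)
  }

  def isValidTransition(from: ReplicaState, to: ReplicaState): Boolean =
    validPreviousStates(to).contains(from)
}
```

Because an unknown replica is treated as NonExistentReplica, NewReplica is the only target that such a replica can move to first.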

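2. State Machine Initialization
The startup flow is:
1. Initialize the state of every replica
  1.1. If the replica's broker is alive, set the replica's state to OnlineReplica
  1.2. Otherwise, set its state to ReplicaDeletionIneligible
2. Move every live replica to OnlineReplica
  2.1. Transition each replica to the target state
  2.2. Send the batched requests to the brokers
The source code is as follows: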
def startup() {
  // 1. Initialize the state of every replica
  initializeReplicaState()
  // 2. Move all live replicas to OnlineReplica
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
}
private def initializeReplicaState() {
  for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      // 1.1. If the replica's broker is alive, set the replica to OnlineReplica
      if (controllerContext.liveBrokerIds.contains(replicaId))
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        // 1.2. Otherwise, set it to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
def handleStateChanges(...) {
  if (replicas.nonEmpty) {
    try {
      brokerRequestBatch.newBatch()
      // 2.1. Transition each replica to the target state
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      // 2.2. Send the batched requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error(...)
    }
  }
}
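The initialization rule can be reproduced on plain collections. In this minimal sketch the assignment is a `Map` from `(topic, partition)` to broker ids and liveness is a `Set` of broker ids; both types are assumptions standing in for `controllerContext`, and the object name `ReplicaInit` is hypothetical.

```scala
import scala.collection.mutable

// Minimal model of initializeReplicaState(); the Map/Set types are assumptions
// standing in for controllerContext.partitionReplicaAssignment and liveBrokerIds.
object ReplicaInit {
  sealed trait ReplicaState
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Step 1: give every assigned replica an initial state based on broker liveness.
  def initializeReplicaState(
      assignment: Map[(String, Int), Seq[Int]],
      liveBrokerIds: Set[Int]): mutable.Map[PartitionAndReplica, ReplicaState] = {
    val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    for {
      ((topic, partition), replicas) <- assignment
      replicaId <- replicas
    } {
      val key = PartitionAndReplica(topic, partition, replicaId)
      // 1.1 live broker -> OnlineReplica; 1.2 dead broker -> ReplicaDeletionIneligible
      replicaState(key) =
        if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
    }
    replicaState
  }

  // Step 2 only targets replicas hosted on live brokers (allLiveReplicas() in the source).
  def allLiveReplicas(
      assignment: Map[(String, Int), Seq[Int]],
      liveBrokerIds: Set[Int]): Set[PartitionAndReplica] =
    (for {
      ((topic, partition), replicas) <- assignment
      replicaId <- replicas
      if liveBrokerIds.contains(replicaId)
    } yield PartitionAndReplica(topic, partition, replicaId)).toSet
}
```

For example, with two partitions replicated on brokers 1, 2 and 3 and only brokers 1 and 2 alive, all six replicas get a state, but only the four on live brokers are passed to step 2.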
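The flow of a single state transition is:
1. Read the replica's current state; if the replica is unknown, initialize its state to NonExistentReplica
2. Transition the replica to the target state
  2.1. Validate the previous state
  2.2. Perform the transition
The state transition source code is as follows: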
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
  val topic = partitionAndReplica.topic
  val partition = partitionAndReplica.partition
  val replicaId = partitionAndReplica.replica
  val topicAndPartition = TopicAndPartition(topic, partition)
  // 1. Current state of the replica; an unknown replica is initialized to NonExistentReplica
  val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
  try {
    val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // 2. Transition between states (currState --> targetState)
    targetState match {
      case NewReplica =>
        // 2.1. Validate the previous state (currState --> NewReplica)
        // 2.2. Transition logic (currState --> NewReplica)
      case ReplicaDeletionStarted =>
        ...
      case ReplicaDeletionIneligible =>
        ...
      case ReplicaDeletionSuccessful =>
        ...
      case NonExistentReplica =>
        ...
      case OnlineReplica =>
        ...
      case OfflineReplica =>
        ...
    }
  }
  catch {
    case t: Throwable =>
      stateChangeLogger.error(...)
  }
}
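The skeleton above combines three ideas: one request batch per `handleStateChanges` call, lazy initialization of unknown replicas to NonExistentReplica, and a per-replica `try`/`catch`, so an invalid transition is logged without aborting the rest of the batch. The sketch below models only three states and records "requests" as strings; `pending`, `sentBatches`, and `errors` are hypothetical stand-ins for `brokerRequestBatch` and `stateChangeLogger`.

```scala
import scala.collection.mutable

// Hypothetical skeleton of handleStateChanges/handleStateChange with three states only.
// `pending`, `sentBatches`, and `errors` stand in for brokerRequestBatch and stateChangeLogger.
object StateChangeSkeleton {
  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState

  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)
  final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

  final class ReplicaStateMachine {
    val replicaState = mutable.Map.empty[PartitionAndReplica, ReplicaState]
    val errors = mutable.ListBuffer.empty[String]
    val sentBatches = mutable.ListBuffer.empty[List[String]]
    private val pending = mutable.ListBuffer.empty[String]

    // Valid previous states, restricted to the three states modeled here.
    private def validPrevious(target: ReplicaState): Set[ReplicaState] = target match {
      case NewReplica         => Set(NonExistentReplica)
      case OnlineReplica      => Set(NewReplica, OnlineReplica)
      case NonExistentReplica => Set.empty
    }

    def handleStateChanges(replicas: Seq[PartitionAndReplica], target: ReplicaState): Unit =
      if (replicas.nonEmpty) {
        pending.clear()                                     // like brokerRequestBatch.newBatch()
        replicas.foreach(r => handleStateChange(r, target))
        sentBatches += pending.toList                       // like sendRequestsToBrokers(...)
      }

    def handleStateChange(r: PartitionAndReplica, target: ReplicaState): Unit = {
      // Unknown replicas start out as NonExistentReplica.
      val curr = replicaState.getOrElseUpdate(r, NonExistentReplica)
      try {
        if (!validPrevious(target).contains(curr))
          throw new StateChangeFailedException(s"$r: invalid transition $curr -> $target")
        pending += s"$target request for $r" // placeholder for transition-specific requests
        replicaState(r) = target
      } catch {
        // The source catches Throwable; the error is logged and the batch continues.
        case e: StateChangeFailedException => errors += e.getMessage
      }
    }
  }
}
```

Moving one unknown replica and one OnlineReplica to NewReplica in the same call yields one sent batch with a single request and one logged error.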
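3. State Transitions
3.1. Transition to NewReplica
Triggers:
- A new topic is created (TopicChangeListener watches /brokers/topics)
- Partitions are added to a topic (PartitionModificationsListener watches /brokers/topics/<TOPIC_NAME>)
- Replicas are newly assigned (e.g., by a partition reassignment)
The flow is:
1. Validate the previous state (it must be NonExistentReplica)
2. Read the partition's metadata (leader, ISR, epoch) from ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
3. Handle the result
  3.1. If metadata exists:
    3.1.1. If this replica is the leader, throw an exception (a NewReplica can only be a follower)
    3.1.2. Otherwise, send a LeaderAndIsr request to the broker hosting this replica and an UpdateMetadata request to all brokers
  3.2. If no metadata exists, send nothing and continue
4. Move the replica to NewReplica
The source code is as follows: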
// 1. Validate the previous state (it must be NonExistentReplica)
assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
// 2. Read the partition's metadata (leader, ISR, epoch) from ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
// 3. Handle the result
leaderIsrAndControllerEpochOpt match {
  // 3.1. Metadata exists
  case Some(leaderIsrAndControllerEpoch) =>
    // 3.1.1. If this replica is the leader, throw an exception
    if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
      throw new StateChangeFailedException(...)
    // 3.1.2. Otherwise, send a LeaderAndIsr request to this replica's broker and an UpdateMetadata request to all brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
      topic, partition, leaderIsrAndControllerEpoch,
      replicaAssignment)
  // 3.2. No metadata: nothing to send, continue
  case None =>
}
// 4. Move the replica to NewReplica
replicaState.put(partitionAndReplica, NewReplica)
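The NewReplica branch reduces to a decision on the leader/ISR data read from ZooKeeper. This sketch returns which brokers would receive LeaderAndIsr and UpdateMetadata requests instead of queuing them; `LeaderIsr`, `Requests`, and `partitionStatePath` are hypothetical helpers, not Kafka APIs, and the live-broker list is passed in explicitly.

```scala
// Hypothetical sketch of the NewReplica branch. LeaderIsr, Requests and
// partitionStatePath are illustrative helpers, not Kafka APIs.
object NewReplicaTransition {
  final case class LeaderIsr(leader: Int, isr: List[Int], controllerEpoch: Int)
  final case class Requests(leaderAndIsrTo: List[Int], updateMetadataTo: List[Int])
  final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

  // ZooKeeper node holding a partition's leader/ISR state.
  def partitionStatePath(topic: String, partition: Int): String =
    s"/brokers/topics/$topic/partitions/$partition/state"

  // Decide which requests to queue for replicaId, given what was read from ZooKeeper.
  def toNewReplica(replicaId: Int, zkLeaderIsr: Option[LeaderIsr], liveBrokers: List[Int]): Requests =
    zkLeaderIsr match {
      case Some(li) if li.leader == replicaId =>
        // 3.1.1 a NewReplica can only be a follower
        throw new StateChangeFailedException(s"replica $replicaId is the leader; cannot become NewReplica")
      case Some(_) =>
        // 3.1.2 LeaderAndIsr to this replica's broker, UpdateMetadata to every broker
        Requests(List(replicaId), liveBrokers)
      case None =>
        // 3.2 no leader/ISR yet: nothing to send; the state still becomes NewReplica
        Requests(Nil, Nil)
    }
}
```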
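3.2. Transition to OnlineReplica
Triggers:
- After a new topic is created, its replicas move to NewReplica and then to OnlineReplica
- After partitions are added to a topic, their replicas move to NewReplica and then to OnlineReplica
- After a broker starts up, the replicas on that broker move to OnlineReplica
- During partition reassignment, the replicas in RAR (the reassigned replica set) move to OnlineReplica
- When the replica state machine starts up, all live replicas move to OnlineReplica
The flow is:
1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
2. Transition the replica
  2.1. If its current state is NewReplica:
    2.1.1. Get the partition's replica list from the controller cache
    2.1.2. If the list does not contain this replica, append it
  2.2. If its current state is anything else:
    2.2.1. Look up the partition's leader/ISR metadata in the controller cache
    2.2.2. If the metadata exists, send a LeaderAndIsr request to the broker hosting this replica
3. Move the replica to OnlineReplica
The source code is as follows: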
// 1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
assertValidPreviousStates(partitionAndReplica, List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// 2. Transition the replica
replicaState(partitionAndReplica) match {
  // 2.1. Current state is NewReplica
  case NewReplica =>
    // 2.1.1. Get the partition's replica list from the controller cache
    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    // 2.1.2. If the list does not contain this replica, append it
    if (!currentAssignedReplicas.contains(replicaId))
      controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
  // 2.2. Current state is any other state
  case _ =>
    // 2.2.1. Look up the partition's leader/ISR metadata in the controller cache
    controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
      // 2.2.2. If the metadata exists, send a LeaderAndIsr request to this replica's broker
      case Some(leaderIsrAndControllerEpoch) =>
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
        replicaState.put(partitionAndReplica, OnlineReplica)
      // The partition has never had a leader: nothing to send
      case None =>
    }
}
// 3. Move the replica to OnlineReplica
replicaState.put(partitionAndReplica, OnlineReplica)
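The OnlineReplica branch behaves differently depending on the replica's current state. In this sketch over a hypothetical mutable `Cache` (standing in for `partitionReplicaAssignment` and `partitionLeadershipInfo`), the function returns the brokers that would get a LeaderAndIsr request, and the caller is assumed to record OnlineReplica afterwards.

```scala
import scala.collection.mutable

// Hypothetical sketch of the OnlineReplica branch; Cache stands in for
// controllerContext.partitionReplicaAssignment and partitionLeadershipInfo.
object OnlineReplicaTransition {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState

  final case class LeaderIsr(leader: Int, isr: List[Int])

  final class Cache {
    val assignment = mutable.Map.empty[(String, Int), List[Int]]
    val leadership = mutable.Map.empty[(String, Int), LeaderIsr]
  }

  // Returns the brokers that would receive a LeaderAndIsr request;
  // the caller then records OnlineReplica unconditionally.
  def toOnlineReplica(cache: Cache, tp: (String, Int), replicaId: Int, curr: ReplicaState): List[Int] =
    curr match {
      case NewReplica =>
        // 2.1 make sure the replica appears in the cached assignment; no request here
        val assigned = cache.assignment.getOrElse(tp, Nil)
        if (!assigned.contains(replicaId)) cache.assignment(tp) = assigned :+ replicaId
        Nil
      case _ =>
        // 2.2 send LeaderAndIsr only if the partition has cached leader/ISR info
        if (cache.leadership.contains(tp)) List(replicaId) else Nil
    }
}
```

A partition without cached leader info yields no request, yet the replica still ends up OnlineReplica, matching the unconditional `put` at the end of the source.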
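3.3. Transition to OfflineReplica
Triggers:
- A topic is deleted
- A broker goes offline: the replicas on that broker move to OfflineReplica
- A broker shuts down gracefully (ControlledShutdown): the replicas on that broker move to OfflineReplica
- During partition reassignment, replicas that are being removed move to OfflineReplica
- Replicas whose deletion failed move to OfflineReplica so that their deletion can be retried
The flow is:
1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
2. Send a StopReplica request (deletePartition = false) to the broker hosting the replica, so it stops replicating from the leader
3. Remove the replica from the ISR and update the ZooKeeper node (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
4. Unless the partition is being deleted, send a LeaderAndIsr request with the updated metadata to the brokers hosting the remaining replicas
5. Move the replica to OfflineReplica
The source code is as follows: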
// 1. Validate the previous state (it must be NewReplica, OnlineReplica, OfflineReplica, or ReplicaDeletionIneligible)
assertValidPreviousStates(partitionAndReplica, List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// 2. Send a StopReplica request to the replica's broker so it stops replicating
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
val leaderAndIsrIsEmpty: Boolean =
  controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
    case Some(_) =>
      // 3. Remove the replica from the ISR and update the ZooKeeper node (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
      controller.removeReplicaFromIsr(topic, partition, replicaId) match {
        case Some(updatedLeaderIsrAndControllerEpoch) =>
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
            // 4. Send a LeaderAndIsr request to the brokers hosting the remaining replicas
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
              topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
          }
          // 5. Move the replica to OfflineReplica
          replicaState.put(partitionAndReplica, OfflineReplica)
          false
        case None =>
          true
      }
    case None =>
      true
  }
// 6. No leader/ISR metadata for a partition that is not being deleted: the transition fails
if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
  throw new StateChangeFailedException(...)
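The OfflineReplica branch can be modeled as a pure function that returns the queued requests together with the shrunken leader/ISR. `removeFromIsr` is a hypothetical stand-in for `controller.removeReplicaFromIsr`: it assumes the leader is cleared (set to -1) when the removed replica was the leader and that the leader epoch is bumped, and it ignores the ZooKeeper write and versioning entirely.

```scala
// Hypothetical sketch of the OfflineReplica branch as a pure function. removeFromIsr is an
// assumed simplification of controller.removeReplicaFromIsr (no ZooKeeper write/versioning).
object OfflineReplicaTransition {
  final case class LeaderIsr(leader: Int, isr: List[Int], leaderEpoch: Int)

  sealed trait Request
  final case class StopReplica(broker: Int, deletePartition: Boolean) extends Request
  final case class LeaderAndIsr(broker: Int, state: LeaderIsr) extends Request

  // Assumed rule: drop the replica from the ISR, clear the leader (-1) if it was
  // the leader, and bump the leader epoch.
  def removeFromIsr(li: LeaderIsr, replicaId: Int): LeaderIsr = {
    val newLeader = if (li.leader == replicaId) -1 else li.leader
    LeaderIsr(newLeader, li.isr.filterNot(_ == replicaId), li.leaderEpoch + 1)
  }

  // Returns the queued requests and the updated leader/ISR; None plays the role of
  // leaderAndIsrIsEmpty = true in the source.
  def toOfflineReplica(
      replicaId: Int,
      assigned: List[Int],
      leadership: Option[LeaderIsr],
      partitionBeingDeleted: Boolean): (List[Request], Option[LeaderIsr]) = {
    // 2. StopReplica without deleting the log: the replica just stops fetching
    val stop = StopReplica(replicaId, deletePartition = false)
    leadership match {
      case Some(li) =>
        // 3. shrink the ISR
        val updated = removeFromIsr(li, replicaId)
        // 4. tell the remaining replicas, unless the partition is being deleted
        val leaderAndIsrRequests =
          if (partitionBeingDeleted) Nil
          else assigned.filterNot(_ == replicaId).map(b => LeaderAndIsr(b, updated))
        (stop :: leaderAndIsrRequests, Some(updated))
      case None =>
        (List(stop), None)
    }
  }
}
```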
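3.4. Transition to ReplicaDeletionStarted
Triggers:
- A topic is deleted
- During partition reassignment, for replicas that are being removed
The flow is:
1. Validate the previous state (it must be OfflineReplica)
2. Move the replica to ReplicaDeletionStarted
3. Send a StopReplica request (deletePartition = true) to the broker hosting the replica
The source code is as follows: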
// 1. Validate the previous state (it must be OfflineReplica)
assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
// 2. Move the replica to ReplicaDeletionStarted
replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// 3. Send a StopReplica request with deletePartition = true to the replica's broker
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  callbacks.stopReplicaResponseCallback)
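In this branch the new state is recorded before the StopReplica request is queued, and the request carries a callback for the broker's response. The sketch uses `require` in place of `assertValidPreviousStates` and a hypothetical `StopReplica` case class whose `Boolean => Unit` callback is assumed to receive `true` when the deletion succeeded.

```scala
import scala.collection.mutable

// Hypothetical sketch of the ReplicaDeletionStarted branch. `require` stands in for
// assertValidPreviousStates; StopReplica and its callback type are illustrative only.
object DeletionStartedTransition {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState

  // callback receives true if the broker reports that the replica was deleted (assumption)
  final case class StopReplica(broker: Int, topic: String, partition: Int,
                               deletePartition: Boolean, callback: Boolean => Unit)

  final class Machine {
    val state = mutable.Map.empty[(String, Int, Int), ReplicaState]
    val queued = mutable.ListBuffer.empty[StopReplica]

    def toDeletionStarted(topic: String, partition: Int, replicaId: Int,
                          onResponse: Boolean => Unit): Unit = {
      val key = (topic, partition, replicaId)
      // 1. only an OfflineReplica may start deletion
      require(state.get(key).contains(OfflineReplica), s"$key is not in OfflineReplica")
      // 2. record the new state first
      state(key) = ReplicaDeletionStarted
      // 3. queue StopReplica with deletePartition = true and the response callback
      queued += StopReplica(replicaId, topic, partition, deletePartition = true, onResponse)
    }
  }
}
```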
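3.5. Transition to ReplicaDeletionSuccessful
Triggers:
- A topic is deleted
- During partition reassignment, for replicas that are being removed
The flow is:
1. Validate the previous state (it must be ReplicaDeletionStarted)
2. Move the replica to ReplicaDeletionSuccessful
The source code is as follows: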
// 1. Validate the previous state (it must be ReplicaDeletionStarted)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 2. Move the replica to ReplicaDeletionSuccessful
replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
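3.6. Transition to ReplicaDeletionIneligible
Triggers:
- A topic is deleted
- During partition reassignment, for replicas that are being removed
The flow is:
1. Validate the previous state (it must be ReplicaDeletionStarted)
2. Move the replica to ReplicaDeletionIneligible
The source code is as follows: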
// 1. Validate the previous state (it must be ReplicaDeletionStarted)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 2. Move the replica to ReplicaDeletionIneligible
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
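Sections 3.5 and 3.6 are the two possible outcomes of a deletion: per section 1, a successful response to the delete request leads to ReplicaDeletionSuccessful and a failed deletion to ReplicaDeletionIneligible, and section 3.3 notes that failed replicas go back to OfflineReplica so the deletion can be retried. The handler below is a hypothetical illustration of that routing; the code that actually reacts to the StopReplica response is not shown in this article.

```scala
// Hypothetical routing of a StopReplica(deletePartition = true) response to the next state.
// Both outcomes require the replica to be in ReplicaDeletionStarted (sections 3.5 and 3.6).
object DeletionResponse {
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  def onStopReplicaResponse(curr: ReplicaState, succeeded: Boolean): ReplicaState = {
    require(curr == ReplicaDeletionStarted, s"unexpected previous state $curr")
    if (succeeded) ReplicaDeletionSuccessful else ReplicaDeletionIneligible
  }

  // Section 3.3: an ineligible replica moves back to OfflineReplica so deletion can be
  // retried (OfflineReplica -> ReplicaDeletionStarted -> ...).
  def retry(curr: ReplicaState): ReplicaState = {
    require(curr == ReplicaDeletionIneligible, s"only ineligible replicas are retried, got $curr")
    OfflineReplica
  }
}
```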
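3.7. Transition to NonExistentReplica
Triggers:
- A topic is deleted
- During partition reassignment, for replicas that are being removed
The flow is:
1. Validate the previous state (it must be ReplicaDeletionSuccessful)
2. Remove the replica from the partition's replica list in the controller cache and drop its state entry
The source code is as follows: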
// 1. Validate the previous state (it must be ReplicaDeletionSuccessful)
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 2. Remove the replica from the cached replica list and drop its state entry
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
replicaState.remove(partitionAndReplica)
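The NonExistentReplica branch cleans up two caches. The sketch below uses a hypothetical `Cache` with mutable maps for the assignment and the state table; because the state entry is removed rather than overwritten, a later lookup falls back to NonExistentReplica, the same default that `handleStateChange` applies through `getOrElseUpdate`.

```scala
import scala.collection.mutable

// Hypothetical sketch of the NonExistentReplica branch over simplified mutable caches.
object NonExistentTransition {
  sealed trait ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  final class Cache {
    val assignment = mutable.Map.empty[(String, Int), List[Int]]
    val state = mutable.Map.empty[(String, Int, Int), ReplicaState]
  }

  def toNonExistent(cache: Cache, topic: String, partition: Int, replicaId: Int): Unit = {
    val key = (topic, partition, replicaId)
    // 1. only ReplicaDeletionSuccessful may move to NonExistentReplica
    require(cache.state.get(key).contains(ReplicaDeletionSuccessful), s"$key not deleted yet")
    // 2. remove the replica from the cached assignment and forget its state
    val tp = (topic, partition)
    cache.assignment(tp) = cache.assignment(tp).filterNot(_ == replicaId)
    cache.state.remove(key)
  }

  // A replica with no state entry is reported as NonExistentReplica.
  def currentState(cache: Cache, key: (String, Int, Int)): ReplicaState =
    cache.state.getOrElse(key, NonExistentReplica)
}
```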