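1. Partition states
The controller's partition state machine manages the state of every partition in the cluster. It defines four states:
- NonExistentPartition
The partition does not exist or has already been deleted.
- NewPartition
Entered after the partition is created. The partition is not usable yet: no leader or ISR has been elected.
- OnlinePartition
Entered once the partition's leader replica has been elected. The partition is online and works normally.
- OfflinePartition
Entered when the broker hosting the leader goes down. The partition cannot serve requests, and a leader election is triggered to produce a new leader.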

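2. State machine initialization
The flow is:
- Initialize the state of every partition
1.1. Iterate over all partitions
1.2. Check whether each partition has leader and ISR information
- Trigger the transition to OnlinePartition
2.1. Skip topics that are queued for deletion
2.2. Move partitions in OfflinePartition or NewPartition to OnlinePartition
Source code: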
def startup() {
  // 1. Initialize the state of every partition
  initializePartitionState()
  // 2. Trigger the transition to OnlinePartition
  triggerOnlinePartitionStateChange()
}

private def initializePartitionState() {
  // 1.1 Iterate over all partitions
  for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
    // 1.2 Check whether the partition has leader and ISR information
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      // 1.2.1 Leader and ISR information exists
      case Some(currentLeaderIsrAndEpoch) =>
        if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
          // 1.2.1.1 The leader is alive: set the state to OnlinePartition
          partitionState.put(topicPartition, OnlinePartition)
        else
          // 1.2.1.2 The leader is not alive: set the state to OfflinePartition
          partitionState.put(topicPartition, OfflinePartition)
      // 1.2.2 No leader and ISR information: set the state to NewPartition
      case None =>
        partitionState.put(topicPartition, NewPartition)
    }
  }
}

def triggerOnlinePartitionStateChange() {
  try {
    brokerRequestBatch.newBatch()
    for ((topicAndPartition, partitionState) <- partitionState
         // 2.1 Skip topics that are queued for deletion
         if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
      // 2.2 Move partitions in OfflinePartition or NewPartition to OnlinePartition
      if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition,
          controller.offlinePartitionSelector, (new CallbackBuilder).build)
    }
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    case e: Throwable => error(...)
  }
}
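To see what the two startup steps compute, the sketch below reduces them to pure functions over plain maps. It is a simplified model under stated assumptions: `TP` and `LeaderIsr` are hypothetical stand-ins for `TopicAndPartition` and the cached leader/ISR info, the names `initialStates` and `needsElection` are made up for illustration, and no ZooKeeper access or broker requests are modeled.

```scala
// Simplified model of the four partition states (names follow the Kafka source).
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

// Hypothetical stand-ins for TopicAndPartition and the cached leader/ISR info.
final case class TP(topic: String, partition: Int)
final case class LeaderIsr(leader: Int, isr: List[Int])

// Step 1: derive each partition's initial state from the cached metadata.
def initialStates(
    assignment: Map[TP, Seq[Int]],
    leadership: Map[TP, LeaderIsr],
    liveBrokers: Set[Int]): Map[TP, PartitionState] =
  assignment.keys.map { tp =>
    val state: PartitionState = leadership.get(tp) match {
      case Some(info) if liveBrokers.contains(info.leader) => OnlinePartition
      case Some(_) => OfflinePartition // leader info exists, but its broker is down
      case None    => NewPartition     // no leader/ISR recorded yet
    }
    tp -> state
  }.toMap

// Step 2: partitions that would be moved to OnlinePartition,
// skipping topics queued for deletion.
def needsElection(states: Map[TP, PartitionState], deleting: Set[String]): Set[TP] =
  states.collect {
    case (tp, s) if !deleting.contains(tp.topic) && (s == OfflinePartition || s == NewPartition) => tp
  }.toSet
```

With partitions `a` (leader 1, alive), `b` (leader 3, down) and `c` (no leader info) and live brokers {1, 2}, `initialStates` yields OnlinePartition, OfflinePartition and NewPartition respectively; `needsElection` then returns `b` and `c`, or only `b` if `c`'s topic is queued for deletion.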
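A state transition (handleStateChange) works as follows:
- Look up the partition's current state; if the partition is not known yet, initialize its state to NonExistentPartition
- Perform the transition between states
2.1. Validate the partition's previous state
2.2. Apply the transition
Source code for the state transition: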
private def handleStateChange(topic: String, partition: Int, targetState: PartitionState,
                              leaderSelector: PartitionLeaderSelector, callbacks: Callbacks) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  // 1. Current state of the partition; if the partition is unknown, initialize it to NonExistentPartition
  val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
  try {
    // 2. Transition between states (currState --> targetState)
    targetState match {
      case NewPartition =>
        // 2.1 Validate the previous state (currState --> NewPartition)
        // 2.2 Apply the transition (currState --> NewPartition)
      case OnlinePartition =>
        // ...
      case OfflinePartition =>
        // ...
      case NonExistentPartition =>
        // ...
    }
  } catch {
    case t: Throwable =>
      stateChangeLogger.error(...)
  }
}
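The `targetState match` above hides the per-state validation. Collecting the allowed previous states from sections 3.1 to 3.4 gives the whole transition table, which the following sketch encodes. `MiniPartitionStateMachine` and its `transition` method are hypothetical: they return `Left` on an illegal transition instead of throwing the way `assertValidPreviousStates` does, and they skip leader election entirely.

```scala
import scala.collection.mutable

sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

// Allowed previous states for each target state (sections 3.1 to 3.4).
val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
  NewPartition         -> Set[PartitionState](NonExistentPartition),
  OnlinePartition      -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  OfflinePartition     -> Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition),
  NonExistentPartition -> Set[PartitionState](OfflinePartition)
)

// Hypothetical, simplified state holder (not Kafka's class).
final class MiniPartitionStateMachine {
  private val states = mutable.Map.empty[String, PartitionState]

  // Unknown partitions start out as NonExistentPartition, like getOrElseUpdate above.
  def stateOf(partition: String): PartitionState =
    states.getOrElseUpdate(partition, NonExistentPartition)

  // Validate the previous state, then record the new one.
  def transition(partition: String, target: PartitionState): Either[String, PartitionState] = {
    val current = stateOf(partition)
    if (validPreviousStates(target).contains(current)) {
      states(partition) = target
      Right(target)
    } else Left(s"illegal transition $current -> $target for $partition")
  }
}
```

A brand-new partition cannot jump straight to OnlinePartition: it has to pass through NewPartition first, and it can only be removed (NonExistentPartition) after going OfflinePartition.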
3. State transitions
3.1 Transition to NewPartition
Triggers:
- A topic is created (TopicChangeListener watches /brokers/topics)
- Partitions are added to a topic (PartitionModificationsListener watches /brokers/topics/<TOPIC_NAME>)
The flow is:
- Validate the previous state (it must be NonExistentPartition)
- Move the partition to NewPartition
Source code:
// 1. Validate the previous state (it must be NonExistentPartition)
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
// 2. Move the partition to NewPartition
partitionState.put(topicAndPartition, NewPartition)
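Both triggers come down to comparing what ZooKeeper now holds with what the controller has cached: topics listed under /brokers/topics but unknown to the cache are new, and partitions present in a topic's assignment but not cached are new. A minimal sketch of that diff, assuming plain `Seq`/`Map` inputs in place of the ZooKeeper child list and assignment data; `diffTopics`, `newPartitions` and `partitionsOfNewTopics` are hypothetical names. Every partition found this way would then go through the NonExistentPartition --> NewPartition transition shown above.

```scala
// Hypothetical helper: diff the children of /brokers/topics against the cached topic set.
// Returns (topics that appeared, topics that disappeared).
def diffTopics(zkChildren: Seq[String], cachedTopics: Set[String]): (Set[String], Set[String]) = {
  val current = zkChildren.toSet
  (current -- cachedTopics, cachedTopics -- current)
}

// Hypothetical helper: partition ids present in a topic's assignment in ZooKeeper
// (partition id -> replica list) but not yet in the controller's cache.
def newPartitions(zkAssignment: Map[Int, Seq[Int]], cachedPartitions: Set[Int]): Set[Int] =
  zkAssignment.keySet -- cachedPartitions

// For each new topic, every partition in its assignment is a NewPartition candidate.
def partitionsOfNewTopics(
    newTopics: Set[String],
    assignments: Map[String, Map[Int, Seq[Int]]]): Set[(String, Int)] =
  for {
    t <- newTopics
    p <- assignments.getOrElse(t, Map.empty[Int, Seq[Int]]).keySet
  } yield (t, p)
```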
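3.2 Transition to OnlinePartition
Triggers:
- After a topic is created: the state first moves to NewPartition, then to OnlinePartition (the first live replica in the AR becomes leader)
- After partitions are added to a topic: the state first moves to NewPartition, then to OnlinePartition (the first live replica in the AR becomes leader)
- When the partition state machine initializes, for every partition:
3.1. If the partition has LeaderAndIsr information and its leader is alive, the state is set directly to OnlinePartition
3.2. If the partition has LeaderAndIsr information but its leader is not alive, the state is first set to OfflinePartition and then moved to OnlinePartition, triggering an election (OfflinePartitionLeaderSelector)
3.3. If the partition has no LeaderAndIsr information, the state is first set to NewPartition and then moved to OnlinePartition, triggering an election (the first live replica in the AR)
- Partition reassignment:
4.1. If the new assignment does not contain the current leader, the state moves to OnlinePartition, triggering an election (ReassignedPartitionLeaderSelector)
4.2. If the new assignment contains the current leader but the leader's broker is down, the state moves to OnlinePartition, triggering an election (ReassignedPartitionLeaderSelector)
- Preferred replica election: the state moves to OnlinePartition, triggering an election (PreferredReplicaPartitionLeaderSelector)
- Broker failure (BrokerChangeListener watches /brokers/ids): each partition whose leader is on the failed broker first moves to OfflinePartition and then to OnlinePartition, triggering an election (OfflinePartitionLeaderSelector)
- Controlled shutdown of a broker (ControlledShutdown): every leader replica on that broker must give up leadership; the state moves to OnlinePartition, triggering an election (ControlledShutdownLeaderSelector)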
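The triggers above can be summarized as a lookup from trigger to the states entered and the leader-election rule used. This is only a restatement of the list in code: the `Trigger` cases and `route` are hypothetical names, while the selector names are the Kafka classes mentioned in the text.

```scala
// Hypothetical names for the triggers listed above.
sealed trait Trigger
case object TopicCreated extends Trigger
case object PartitionsAdded extends Trigger
case object StartupLeaderAlive extends Trigger
case object StartupLeaderDown extends Trigger
case object StartupNoLeaderInfo extends Trigger
case object ReassignmentNeedsNewLeader extends Trigger
case object PreferredReplicaElection extends Trigger
case object BrokerFailure extends Trigger
case object ControlledShutdown extends Trigger

// Returns (states entered, in order; how the leader is chosen).
def route(t: Trigger): (List[String], String) = t match {
  case TopicCreated | PartitionsAdded | StartupNoLeaderInfo =>
    (List("NewPartition", "OnlinePartition"), "first live replica in the AR")
  case StartupLeaderAlive =>
    (List("OnlinePartition"), "no election: state set directly")
  case StartupLeaderDown | BrokerFailure =>
    (List("OfflinePartition", "OnlinePartition"), "OfflinePartitionLeaderSelector")
  case ReassignmentNeedsNewLeader =>
    (List("OnlinePartition"), "ReassignedPartitionLeaderSelector")
  case PreferredReplicaElection =>
    (List("OnlinePartition"), "PreferredReplicaPartitionLeaderSelector")
  case ControlledShutdown =>
    (List("OnlinePartition"), "ControlledShutdownLeaderSelector")
}
```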
The flow is:
- Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
- Perform the transition: elect the leader and ISR, then update ZooKeeper and the local cache
2.1. NewPartition --> OnlinePartition: the leader is the first live replica in the AR
2.2. OfflinePartition --> OnlinePartition: the leader is elected with OfflinePartitionLeaderSelector
2.3. OnlinePartition --> OnlinePartition: the leader is elected with ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, or ControlledShutdownLeaderSelector
- Move the partition to OnlinePartition
Source code:
// 1. Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
// 2. Perform the transition
partitionState(topicAndPartition) match {
  // 2.1 NewPartition --> OnlinePartition
  case NewPartition =>
    initializeLeaderAndIsrForPartition(topicAndPartition)
  // 2.2 OfflinePartition --> OnlinePartition
  case OfflinePartition =>
    electLeaderForPartition(topic, partition, leaderSelector)
  // 2.3 OnlinePartition --> OnlinePartition
  case OnlinePartition =>
    electLeaderForPartition(topic, partition, leaderSelector)
  case _ => // unreachable: illegal previous states were rejected above
}
// 3. Move the partition to OnlinePartition
partitionState.put(topicAndPartition, OnlinePartition)
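3.2.1 NewPartition --> OnlinePartition
Triggers:
- After a topic is created, the state first moves to NewPartition and then to OnlinePartition
- After partitions are added to a topic, the state first moves to NewPartition and then to OnlinePartition
The flow is:
- The first live replica in the AR is elected leader
- The live replicas in the AR form the ISR
- Create a persistent ZooKeeper node, /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
- Write the partition's metadata into it (leader, isr, leader_epoch, controller_epoch, version)
- Update the local cache
- Send LeaderAndIsr requests to the relevant brokers and UpdateMetadata requests to all brokers
Source code: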
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
  val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
  val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
  liveAssignedReplicas.size match {
    // No replica in the AR is alive: the partition cannot come online
    case 0 =>
      throw new StateChangeFailedException(failMsg)
    case _ =>
      // 1. The first live replica in the AR is elected leader
      val leader = liveAssignedReplicas.head
      // 2. The live replicas in the AR form the ISR
      val leaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(
        new LeaderAndIsr(leader, liveAssignedReplicas.toList), controller.epoch)
      try {
        // 3. Create the persistent node /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
        // 4. Write the partition metadata into it (leader, isr, leader_epoch, controller_epoch, version)
        zkUtils.createPersistentPath(
          getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
          zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
        // 5. Update the locally cached partition metadata
        controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
        // 6. Send LeaderAndIsr requests to the relevant brokers and UpdateMetadata requests to all brokers
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(liveAssignedReplicas, topicAndPartition.topic,
          topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
      } catch {
        // The state node already exists, so the partition cannot be initialized as new
        case _: ZkNodeExistsException =>
          throw new StateChangeFailedException(failMsg)
      }
  }
}
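The leader/ISR choice in `initializeLeaderAndIsrForPartition` and the payload written to the state node can be reproduced in isolation. A sketch under several assumptions: `LeaderAndIsrData` is a simplified stand-in (not Kafka's `LeaderAndIsr`), the initial leader epoch is assumed to be 0, the `version` field is assumed to be 1, and the JSON key order is chosen for readability, so the exact bytes Kafka writes may differ. `None` plays the role of the `case 0` branch where no replica in the AR is alive.

```scala
// Simplified stand-in for Kafka's LeaderAndIsr (field names are assumptions).
final case class LeaderAndIsrData(leader: Int, leaderEpoch: Int, isr: List[Int])

// NewPartition --> OnlinePartition: the first live replica in the AR becomes leader,
// and all live replicas in the AR (in AR order) form the ISR.
def initLeaderAndIsr(assignment: Seq[Int], liveBrokers: Set[Int]): Option[LeaderAndIsrData] = {
  val live = assignment.filter(liveBrokers.contains)
  live.headOption.map(leader => LeaderAndIsrData(leader, leaderEpoch = 0, isr = live.toList))
}

// Renders the fields listed above (leader, isr, leader_epoch, controller_epoch, version)
// as the JSON payload of the state node; "version":1 is an assumed value.
def stateNodeJson(d: LeaderAndIsrData, controllerEpoch: Int): String =
  "{" +
    "\"controller_epoch\":" + controllerEpoch +
    ",\"leader\":" + d.leader +
    ",\"version\":1" +
    ",\"leader_epoch\":" + d.leaderEpoch +
    ",\"isr\":[" + d.isr.mkString(",") + "]}"
```

For an AR of `Seq(3, 1, 2)` with only brokers 1 and 2 alive, replica 3 is skipped: broker 1 becomes leader and the ISR is `List(1, 2)`.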
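3.2.2 OfflinePartition --> OnlinePartition
The flow is:
- Read the partition metadata from ZooKeeper (from the /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state node)
- If the controller epoch stored in ZooKeeper is larger than this controller's epoch, a newer controller has taken over and this one is stale: abort
- Elect a leader for the partition (with OfflinePartitionLeaderSelector)
- Update the partition metadata in ZooKeeper; the write is conditional on the zkVersion just read, and if it fails the steps above are retried
- Update the local cache
- Send LeaderAndIsr requests to the relevant brokers and UpdateMetadata requests to all brokers
Source code: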
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
  val topicAndPartition = TopicAndPartition(topic, partition)
  try {
    var zookeeperPathUpdateSucceeded: Boolean = false
    var newLeaderAndIsr: LeaderAndIsr = null
    var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
    while (!zookeeperPathUpdateSucceeded) {
      // 1. Read the partition metadata from ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
      val currentLeaderIsrAndEpoch = getLeaderIsrAndEpochOrThrowException(topic, partition)
      val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
      val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
      // 2. A larger controller epoch in ZooKeeper means a newer controller exists: this one is stale, abort
      if (controllerEpoch > controller.epoch) {
        throw new StateChangeFailedException(failMsg)
      }
      // 3. Elect a leader with the given selector (OfflinePartitionLeaderSelector for OfflinePartition --> OnlinePartition)
      val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
      // 4. Update the partition metadata in ZooKeeper, conditioned on currentLeaderAndIsr.zkVersion;
      //    if the node changed in the meantime, updateSucceeded is false and the loop retries
      val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partition,
        leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
      newLeaderAndIsr = leaderAndIsr
      newLeaderAndIsr.zkVersion = newVersion
      zookeeperPathUpdateSucceeded = updateSucceeded
      replicasForThisPartition = replicas
    }
    // 5. Update the local cache
    val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
    controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
    val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
    // 6. Send LeaderAndIsr requests to the relevant brokers and UpdateMetadata requests to all brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
      newLeaderIsrAndControllerEpoch, replicas)
  } catch {
    case e: Throwable =>
      // exception handling elided
  }
}
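The `while (!zookeeperPathUpdateSucceeded)` loop is an optimistic-concurrency retry: read the node together with its version, fence on the controller epoch, compute a new leader, then write only if the version is unchanged. The sketch below replays that loop against a hypothetical in-memory `FakeZkNode` instead of ZooKeeper. Its `conditionalUpdate` roughly plays the role of the version-checked write behind `ReplicationUtils.updateLeaderAndIsr`, `StaleControllerException` stands in for the `StateChangeFailedException` thrown in step 2, and the `beforeWrite` hook exists only so that a test can inject a concurrent write. All of these names are assumptions for illustration.

```scala
// Simplified stand-ins; not Kafka's classes.
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: List[Int])
final case class StateNode(data: LeaderAndIsr, controllerEpoch: Int, version: Int)

// Hypothetical in-memory stand-in for one state znode with version-checked writes.
final class FakeZkNode(initial: StateNode) {
  private var node = initial
  def read(): StateNode = synchronized(node)
  // Succeeds only if the stored version still equals expectedVersion.
  def conditionalUpdate(data: LeaderAndIsr, controllerEpoch: Int, expectedVersion: Int): (Boolean, Int) =
    synchronized {
      if (node.version != expectedVersion) (false, -1)
      else {
        node = StateNode(data, controllerEpoch, node.version + 1)
        (true, node.version)
      }
    }
  // Simulates another writer changing the node (and thus bumping its version).
  def externalWrite(data: LeaderAndIsr): Unit = synchronized {
    node = node.copy(data = data, version = node.version + 1)
  }
}

final class StaleControllerException(msg: String) extends RuntimeException(msg)

def electLeader(
    zk: FakeZkNode,
    myEpoch: Int,
    select: LeaderAndIsr => LeaderAndIsr,
    beforeWrite: () => Unit = () => ()): (LeaderAndIsr, Int) = {
  var result: Option[(LeaderAndIsr, Int)] = None
  while (result.isEmpty) {
    // Read the current data together with its version.
    val current = zk.read()
    // Epoch fencing: a larger epoch in the node means a newer controller exists.
    if (current.controllerEpoch > myEpoch)
      throw new StaleControllerException(s"node epoch ${current.controllerEpoch} > controller epoch $myEpoch")
    // Compute the new leader/ISR from what was just read.
    val proposed = select(current.data)
    beforeWrite()
    // Write only if nobody changed the node since the read; otherwise loop and retry.
    val (ok, newVersion) = zk.conditionalUpdate(proposed, myEpoch, current.version)
    if (ok) result = Some((proposed, newVersion))
  }
  result.get
}
```

If another writer bumps the node's version between the read and the write, the first attempt fails and the second attempt, built from the freshly read ISR, succeeds; a node carrying a larger controller epoch aborts the election instead of retrying.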
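3.2.3 OnlinePartition --> OnlinePartition
The flow is:
- Read the partition metadata from ZooKeeper (from the /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state node)
- If the controller epoch stored in ZooKeeper is larger than this controller's epoch, a newer controller has taken over and this one is stale: abort
- Elect a leader for the partition (with ReassignedPartitionLeaderSelector, PreferredReplicaPartitionLeaderSelector, or ControlledShutdownLeaderSelector, depending on the trigger)
- Update the partition metadata in ZooKeeper
- Update the local cache
- Send LeaderAndIsr requests to the relevant brokers and UpdateMetadata requests to all brokers
The source is the same as above (electLeaderForPartition); only the leaderSelector argument differs.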
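Since OfflinePartition --> OnlinePartition and OnlinePartition --> OnlinePartition share `electLeaderForPartition`, the only thing that varies is the `PartitionLeaderSelector` passed in, which is the strategy pattern. The sketch shows that shape with a hypothetical `LeaderSelector` trait and three deliberately simplified rules. They are illustrations, not the actual rules of Kafka's selectors, which, as the `selectLeader` call above shows, return a new LeaderAndIsr together with the replicas to notify.

```scala
// Hypothetical, simplified view of a partition: AR order and current ISR.
final case class PartitionView(assigned: Seq[Int], isr: List[Int])

// Hypothetical strategy interface in the role of PartitionLeaderSelector.
trait LeaderSelector {
  def select(p: PartitionView, live: Set[Int]): Option[Int]
}

// Simplified rule: first replica in AR order that is alive and in the ISR.
object FirstLiveInIsr extends LeaderSelector {
  def select(p: PartitionView, live: Set[Int]): Option[Int] =
    p.assigned.find(r => live.contains(r) && p.isr.contains(r))
}

// Simplified rule: same as above, but never pick `excluded` (e.g. a broker being shut down).
final class AvoidBroker(excluded: Int) extends LeaderSelector {
  def select(p: PartitionView, live: Set[Int]): Option[Int] =
    FirstLiveInIsr.select(p.copy(isr = p.isr.filterNot(_ == excluded)), live - excluded)
}

// Simplified rule: the first replica in the AR (the "preferred" replica),
// only if it is alive and in the ISR.
object Preferred extends LeaderSelector {
  def select(p: PartitionView, live: Set[Int]): Option[Int] =
    p.assigned.headOption.filter(r => live.contains(r) && p.isr.contains(r))
}

// Shared election routine: one code path, the selector decides.
def elect(p: PartitionView, live: Set[Int], selector: LeaderSelector): Option[Int] =
  selector.select(p, live)
```

Adding a new kind of election then means adding a selector, not another copy of the read/fence/write loop.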
3.3 Transition to OfflinePartition
Triggers:
- When a topic is deleted, its partitions first move to OfflinePartition and then to NonExistentPartition
- When a broker goes down (BrokerChangeListener watches /brokers/ids), each partition whose leader is on that broker first moves to OfflinePartition and then to OnlinePartition, triggering a leader election (OfflinePartitionLeaderSelector)
The flow is:
- Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
- Move the partition to OfflinePartition
Source code:
// 1. Validate the previous state (it must be NewPartition, OnlinePartition, or OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// 2. Move the partition to OfflinePartition
partitionState.put(topicAndPartition, OfflinePartition)
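For the broker-failure trigger, the first step selects the partitions whose leader lived on a dead broker and moves them to OfflinePartition, subject to the previous-state check above. A sketch of just that step, assuming plain maps for the state table and the leader cache; `markOffline` and `TP` are hypothetical names, partitions in a disallowed previous state are skipped here instead of raising an error as `assertValidPreviousStates` would, and the follow-up OfflinePartition --> OnlinePartition election is not modeled.

```scala
sealed trait PartitionState
case object NewPartition extends PartitionState
case object OnlinePartition extends PartitionState
case object OfflinePartition extends PartitionState
case object NonExistentPartition extends PartitionState

// Hypothetical stand-in for TopicAndPartition.
final case class TP(topic: String, partition: Int)

// Previous states from which OfflinePartition may be entered.
val validBeforeOffline: Set[PartitionState] =
  Set[PartitionState](NewPartition, OnlinePartition, OfflinePartition)

// Move every partition whose leader is on a dead broker to OfflinePartition.
def markOffline(
    states: Map[TP, PartitionState],
    leaders: Map[TP, Int],   // partition -> broker id of its current leader
    deadBrokers: Set[Int]): Map[TP, PartitionState] =
  states.map { case (tp, s) =>
    val leaderDead = leaders.get(tp).exists(deadBrokers.contains)
    val next: PartitionState =
      if (leaderDead && validBeforeOffline.contains(s)) OfflinePartition else s
    tp -> next
  }
```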
3.4 Transition to NonExistentPartition
Triggers:
- When a topic is deleted, its partitions first move to OfflinePartition and then to NonExistentPartition
The flow is:
- Validate the previous state (it must be OfflinePartition)
- Move the partition to NonExistentPartition
Source code:
// 1. Validate the previous state (it must be OfflinePartition)
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
// 2. Move the partition to NonExistentPartition
partitionState.put(topicAndPartition, NonExistentPartition)