Broker Startup
When a broker starts, it creates and starts a replica manager (ReplicaManager):
def startup() {
  ...
  // 1. Create the ReplicaManager
  replicaManager = new ReplicaManager(...)
  // 2. Start the ReplicaManager
  replicaManager.startup()
}
ReplicaManager Creation
// Epoch of the controller
var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
// Id of the local broker
private val localBrokerId = config.brokerId
// All partitions hosted on this broker
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
  new Partition(tp.topic, tp.partition, time, this)))
private val replicaStateChangeLock = new Object
// Manages the fetcher threads that follower replicas use to replicate from leaders
val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
// One high-watermark checkpoint file per configured log directory
val highWatermarkCheckpoints = config.logDirs.map(dir =>
  (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
// Partitions whose ISR changed since the last propagation
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
// Purgatory for delayed PRODUCE requests
val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
  purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
// Purgatory for delayed FETCH requests
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
  purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)
ReplicaManager Startup
When the replica manager starts, it schedules two ISR-related periodic tasks.
def startup() {
  // 1. Periodic task: check whether any follower has fallen behind the leader and must be
  //    removed from the ISR (period = replica.lag.time.max.ms / 2, i.e. 5s by default)
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  // 2. Periodic task: check whether any partition's ISR has changed and needs to be propagated (period = 2.5s)
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}
1. maybeShrinkIsr
Iterates over all partitions and checks whether each ISR needs to shrink.
private def maybeShrinkIsr(): Unit = {
  // Iterate over all partitions and check whether their ISR needs to shrink
  allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      // Only do the following if the local replica is the leader
      case Some(leaderReplica) =>
        // Find the replicas that must be removed from the ISR (out-of-sync replicas)
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if (outOfSyncReplicas.nonEmpty) {
          // Compute the new ISR
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          // Update the ISR in ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state) and in the cache
          updateIsr(newInSyncReplicas)
          replicaManager.isrShrinkRate.mark()
          // Try to advance the high watermark (HW)
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }
      case None => false
    }
  }
  // If the HW was advanced,
  if (leaderHWIncremented)
    // try to complete all delayed requests (fetch, produce, deleteRecords, ...)
    tryCompleteDelayedRequests()
}
// Is the local replica the leader?
def leaderReplicaIfLocal: Option[Replica] =
  leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)

// Find the replicas that must be removed from the ISR
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  val candidateReplicas = inSyncReplicas - leaderReplica
  // A replica is considered out of sync in two cases:
  // 1. the follower is stuck and has not sent a fetch request to the leader for a while
  // 2. the follower keeps fetching but is too slow to catch up with the leader
  // Both cases are captured by lastCaughtUpTimeMs; the threshold is replica.lag.time.max.ms (10s by default)
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  laggingReplicas
}
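Both failure modes above collapse into a single check on lastCaughtUpTimeMs: a follower that stops fetching and a follower that fetches but never reaches the leader's LEO both end up with a stale lastCaughtUpTimeMs. The following is a minimal, self-contained sketch of that filter; it is not Kafka code and the names are illustrative.
object IsrLagCheckSketch {
  // lastCaughtUpTimeMs: last time the follower's fetch had read up to the leader's log end offset
  case class FollowerState(brokerId: Int, lastCaughtUpTimeMs: Long)

  // Same predicate as getOutOfSyncReplicas: a stale lastCaughtUpTimeMs means out of sync
  def outOfSync(followers: Set[FollowerState], nowMs: Long, maxLagMs: Long): Set[FollowerState] =
    followers.filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    val followers = Set(
      FollowerState(brokerId = 1, lastCaughtUpTimeMs = now - 2000),  // caught up 2s ago  -> stays in ISR
      FollowerState(brokerId = 2, lastCaughtUpTimeMs = now - 15000)) // caught up 15s ago -> removed
    println(outOfSync(followers, now, maxLagMs = 10000).map(_.brokerId)) // Set(2)
  }
}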
private def updateIsr(newIsr: Set[Replica]) {
  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
  // Update ZooKeeper (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
  val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
    newLeaderAndIsr, controllerEpoch, zkVersion)
  if (updateSucceeded) {
    // Record that this partition's ISR has changed
    replicaManager.recordIsrChange(topicPartition)
    inSyncReplicas = newIsr
    zkVersion = newVersion
  } else {
    // The cached zkVersion no longer matches ZooKeeper, so the update is skipped
  }
}
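For context, the partition state znode that ReplicationUtils.updateLeaderAndIsr rewrites stores a small JSON document roughly of the following shape; the field values here are made up for illustration only:
{"controller_epoch":3,"leader":1,"version":1,"leader_epoch":7,"isr":[1,2]}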
def recordIsrChange(topicPartition: TopicPartition) {
  isrChangeSet synchronized {
    // Set of partitions whose ISR has changed
    isrChangeSet += topicPartition
    // Time of the most recent ISR change
    lastIsrChangeMs.set(System.currentTimeMillis())
  }
}
/**
 * Check and possibly advance the partition's high watermark.
 * This is triggered in two cases:
 *
 * 1. the partition's ISR changed
 * 2. the LEO of any replica changed
 */
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  // The new HW is the smallest LEO among the ISR and the replicas still considered able to catch up
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffset)
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    true
  } else {
    false
  }
}
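In other words, the leader's HW is the minimum LEO over the ISR (plus any replica still considered able to catch up), and it only ever moves forward. The following simplified sketch keeps only the ISR part of the filter; it is not Kafka code.
object HighWatermarkSketch {
  // Simplified rule: minimum log end offset across the ISR (the real code also admits
  // replicas whose lastCaughtUpTimeMs is within replica.lag.time.max.ms)
  def newHighWatermark(logEndOffsets: Map[Int, Long], isr: Set[Int]): Long =
    logEndOffsets.filter { case (brokerId, _) => isr.contains(brokerId) }.values.min

  def main(args: Array[String]): Unit = {
    // leader (broker 0) LEO = 10, follower 1 LEO = 9 (in ISR), follower 2 LEO = 5 (not in ISR)
    val leos = Map(0 -> 10L, 1 -> 9L, 2 -> 5L)
    println(newHighWatermark(leos, isr = Set(0, 1))) // 9: offsets below 9 are committed
  }
}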
2. maybePropagateIsrChanges
Writes the accumulated ISR changes (isrChangeSet) to ZooKeeper (/isr_change_notification/isr_change_xxx), so that the Controller learns which partitions' ISRs have changed.
def maybePropagateIsrChanges() {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // There are partitions with pending ISR changes,
    if (isrChangeSet.nonEmpty &&
      // and either the last ISR change (lastIsrChangeMs) was more than 5s ago
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        // or the last propagation (lastIsrPropagationMs) was more than 60s ago
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // Write isrChangeSet to ZooKeeper (/isr_change_notification/isr_change_xxx)
      ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
      // Clear isrChangeSet
      isrChangeSet.clear()
      // Record the time of this propagation (lastIsrPropagationMs)
      lastIsrPropagationMs.set(now)
    }
  }
}
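The two time guards serve different purposes: the 5s black-out waits for a burst of ISR changes to settle before writing to ZooKeeper, while the 60s interval caps how long any change can be delayed. Below is a standalone sketch of that condition, not Kafka code; the constants mirror ReplicaManager.IsrChangePropagationBlackOut and ReplicaManager.IsrChangePropagationInterval.
object IsrPropagationGuardSketch {
  val BlackOutMs = 5000L   // no ISR change for 5s -> the batch of changes has settled
  val IntervalMs = 60000L  // or no propagation for 60s -> do not delay any longer

  def shouldPropagate(pendingChanges: Int, lastIsrChangeMs: Long, lastPropagationMs: Long, nowMs: Long): Boolean =
    pendingChanges > 0 &&
      (lastIsrChangeMs + BlackOutMs < nowMs || lastPropagationMs + IntervalMs < nowMs)

  def main(args: Array[String]): Unit = {
    val now = System.currentTimeMillis()
    // ISR has been quiet for 6s -> propagate
    println(shouldPropagate(pendingChanges = 3, lastIsrChangeMs = now - 6000, lastPropagationMs = now - 10000, nowMs = now)) // true
    // ISR is still changing and we propagated only 10s ago -> wait
    println(shouldPropagate(pendingChanges = 3, lastIsrChangeMs = now - 1000, lastPropagationMs = now - 10000, nowMs = now)) // false
  }
}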
The replica manager manages its replicas indirectly, by operating on the Partition objects. Which of those replicas are leaders and which are followers is not fixed; it changes over time. When a change happens, the Controller notifies the affected brokers by sending a LeaderAndIsrRequest. On receiving the request, the broker calls the replica manager's becomeLeaderOrFollower method, which then runs the "become leader" or "become follower" logic for each partition.
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  // ensureTopicExists is only for client facing requests
  // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
  // stop serving data to clients for the topic being deleted
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
  try {
    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
      // for each new leader or follower, call coordinator to handle consumer group migration.
      // this callback is invoked under the replica state change lock to ensure proper order of
      // leadership changes
      updatedLeaders.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupImmigration(partition.partitionId)
      }
      updatedFollowers.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupEmigration(partition.partitionId)
      }
    }

    val leaderAndIsrResponse =
      if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
        val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
        new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
      } else {
        val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
        new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
      }

    requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
  } catch {
    case e: KafkaStorageException =>
      fatal("Disk error during leadership change.", e)
      Runtime.getRuntime.halt(1)
  }
}
def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  replicaStateChangeLock synchronized {
    val responseMap = new mutable.HashMap[TopicPartition, Short]
    // 1. Validate the controller epoch; if the requesting controller is stale, return Errors.STALE_CONTROLLER_EPOCH
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
      BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
    } else {
      val controllerId = leaderAndISRRequest.controllerId
      controllerEpoch = leaderAndISRRequest.controllerEpoch
      // 2. Validate the leader epoch of every partition in the request
      // 2.1 If the leader epoch is not newer than the local one, record Errors.STALE_CONTROLLER_EPOCH for that partition
      // 2.2 If the leader epoch is valid but this broker is not a replica of the partition, record Errors.UNKNOWN_TOPIC_OR_PARTITION
      // 2.3 If the leader epoch is valid and this broker is a replica of the partition, the partition passes validation
      val partitionState = new mutable.HashMap[Partition, PartitionState]()
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        val partition = getOrCreatePartition(topicPartition)
        val partitionLeaderEpoch = partition.getLeaderEpoch
        if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
          if (stateInfo.replicas.contains(localBrokerId))
            partitionState.put(partition, stateInfo)
          else {
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
          }
        } else {
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
        }
      }
      // Partitions for which this broker becomes the leader
      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.leader == localBrokerId
      }
      // Partitions for which this broker becomes a follower
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
      // Become leader
      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      // Become follower
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
      else
        Set.empty[Partition]
      if (!hwThreadInitialized) {
        // Start the high-watermark checkpoint thread, which periodically writes the HW of every
        // online partition on this broker to the checkpoint file
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      replicaFetcherManager.shutdownIdleFetcherThreads()
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
    }
  }
}
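The core of the split above is a single comparison: a partition becomes a leader on this broker exactly when the requested leader id equals the local broker id, otherwise the broker becomes a follower for it. The following tiny standalone sketch illustrates the idea; it is not Kafka code and the names are illustrative.
object LeaderFollowerSplitSketch {
  case class RequestedState(leader: Int)

  // Split the requested partition states into (become leader, become follower) on this broker
  def split(states: Map[String, RequestedState], localBrokerId: Int): (Set[String], Set[String]) = {
    val (leaders, followers) = states.partition { case (_, state) => state.leader == localBrokerId }
    (leaders.keySet, followers.keySet)
  }

  def main(args: Array[String]): Unit = {
    val states = Map("topicA-0" -> RequestedState(leader = 1), "topicA-1" -> RequestedState(leader = 2))
    // On broker 1: topicA-0 -> become leader, topicA-1 -> become follower
    println(split(states, localBrokerId = 1))
  }
}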
makeLeaders
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
  // Build the response entries
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

  try {
    // First stop fetchers for all the partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
    // Update the partition information to be the leader
    // Collect the partitions that actually became leader on this broker
    partitionState.foreach { case (partition, partitionStateInfo) =>
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        partitionsToMakeLeaders += partition
      else
        stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is already the leader for the partition.")
          .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }
    partitionsToMakeLeaders.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
    }
  } catch {
    case e: Throwable =>
      partitionState.keys.foreach { partition =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
          " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
        stateChangeLogger.error(errorMsg, e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
  }

  partitionsToMakeLeaders
}
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    // The new ISR
    val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
    // Remove every assigned replica that is no longer part of the new assignment
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    inSyncReplicas = newInSyncReplicas
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // Is this broker becoming the leader of this partition for the first time?
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
        false
      } else {
        leaderReplicaIdOpt = Some(localBrokerId)
        true
      }
    val leaderReplica = getReplica().get
    val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
    val curTimeMs = time.milliseconds
    // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
    (assignedReplicas - leaderReplica).foreach { replica =>
      val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
      replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
    }
    // we may need to increment high watermark since ISR could be down to 1
    if (isNewLeader) {
      // construct the high watermark metadata for the new leader replica
      leaderReplica.convertHWToLocalOffsetMetadata()
      // reset log end offset for remote replicas
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  // some delayed operations may be unblocked after HW changed
  if (leaderHWIncremented)
    tryCompleteDelayedRequests()
  isNewLeader
}
makeFollowers
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache): Set[Partition] = {
  // Build the response entries
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionState.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.leader
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        // Only change partition state when the leader is available
        case Some(_) =>
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
          else
            ...
        case None =>
          partition.getOrCreateReplica()
      }
    }
    // Remove the fetcher threads for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    // Truncate the local log to the high watermark
    logManager.truncateTo(partitionsToMakeFollower.map { partition =>
      (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
    }.toMap)
    // The HW may have changed; try to complete delayed requests
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }

    if (isShuttingDown.get()) {
      // The broker is shutting down; skip adding the fetchers
      ...
    } else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      // Reset the fetch offset and register the partitions with the fetcher manager
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        partition.topicPartition -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    case e: Throwable =>
      throw e
  }

  partitionsToMakeFollower
}
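The truncateTo call above is the reason a replica never serves uncommitted data after a leader change: when it becomes a follower, it first cuts its log back to its own high watermark and then re-fetches from the new leader. A minimal sketch of the effect follows; it is not Kafka code.
object FollowerTruncationSketch {
  // Offsets at or beyond the HW were never committed, so the new leader may not have them;
  // truncating to the HW makes the follower consistent before it starts fetching again.
  def truncateToHw(logEndOffset: Long, highWatermark: Long): Long = math.min(logEndOffset, highWatermark)

  def main(args: Array[String]): Unit = {
    val newFetchOffset = truncateToHw(logEndOffset = 10L, highWatermark = 8L)
    println(newFetchOffset) // 8: offsets 8 and 9 are discarded and re-fetched from the new leader
  }
}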
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    val newLeaderBrokerId: Int = partitionStateInfo.leader
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    allReplicas.foreach(r => getOrCreateReplica(r))
    // Remove the replicas that are no longer assigned
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    // Followers do not maintain the ISR; only the leader does
    inSyncReplicas = Set.empty[Replica]
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // Return true only if the leader actually changed
    if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
      false
    } else {
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}