Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

Kafka副本管理器

Broker 启动

Broker在启动时,会启动一个副本管理器,代码如下:

def startup() {
  ...
  // 1、ReplicaManager创建
  replicaManager = new ReplicaManager(...)
  // 2、ReplicaManager启动
  replicaManager.startup()
}

ReplicaManager 创建

// 控制器的纪元
var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
// 当前broker的id
private val localBrokerId = config.brokerId
// 当前broker上所有分区
private val allPartitions = new Pool[TopicPartition, Partition](valueFactory = Some(tp =>
  new Partition(tp.topic, tp.partition, time, this)))
private val replicaStateChangeLock = new Object
val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
val highWatermarkCheckpoints = config.logDirs.map(dir => (new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
private var hwThreadInitialized = false
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
// 延时PRODUCE请求
val delayedProducePurgatory = DelayedOperationPurgatory[DelayedProduce](
  purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
// 延时FETCH请求
val delayedFetchPurgatory = DelayedOperationPurgatory[DelayedFetch](
  purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)

ReplicaManager 启动

副本管理器启动时,会启动两个 ISR 相关的定时任务线程。

def startup() {
  // 1、定时任务,检查是否有follower落后leader,需要从ISR中移除(执行周期为5s)
  scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  // 2、定时任务,检查是否有分区的ISR有变动(执行周期为2.5秒)
  scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}

1、maybeShrinkIsr

遍历所有的分区,检查ISR是否需要收缩。

private def maybeShrinkIsr(): Unit = {
  // 遍历所有的分区,检查ISR是否需要收缩
  allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs))
}
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
  val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
    leaderReplicaIfLocal match {
      // 本地副本如果是leader,才需要做以下操作
      case Some(leaderReplica) =>
        // 获取需要从ISR中移除的副本(失效副本)
        val outOfSyncReplicas = getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
        if(outOfSyncReplicas.nonEmpty) {
          // 计算新的ISR
          val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
          // 更新ZK(/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)和缓存中的ISR
          updateIsr(newInSyncReplicas)
          replicaManager.isrShrinkRate.mark()
          // 尝试更新HW高水位
          maybeIncrementLeaderHW(leaderReplica)
        } else {
          false
        }

      case None => false 
    }
  }

  // 如果HW做了修改
  if (leaderHWIncremented)
    // 尝试完成所有延迟的请求(fetch、produce、deleteRecords等延迟请求)
    tryCompleteDelayedRequests()
}


// 本地副本是否是leader
def leaderReplicaIfLocal: Option[Replica] =
  leaderReplicaIdOpt.filter(_ == localBrokerId).flatMap(getReplica)


// 获取需要从ISR中移除的副本
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  val candidateReplicas = inSyncReplicas - leaderReplica
  // 副本失效的两种情况:
  // 1、follower卡住了,在一段时间内没有向leader副本发起同步请求
  // 2、follower同步缓慢,在一段时间内无法追上leader
  // replica.lag.time.max.ms,默认10s
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  laggingReplicas
}
private def updateIsr(newIsr: Set[Replica]) {
  val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, newIsr.map(r => r.brokerId).toList, zkVersion)
  // 更新ZK(/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
  val (updateSucceeded,newVersion) = ReplicationUtils.updateLeaderAndIsr(zkUtils, topic, partitionId,
    newLeaderAndIsr, controllerEpoch, zkVersion)

  if(updateSucceeded) {
    // 记录哪些分区的ISR有变动
    replicaManager.recordIsrChange(topicPartition)  
    inSyncReplicas = newIsr
    zkVersion = newVersion
  } else {
  }
}

def recordIsrChange(topicPartition: TopicPartition) {
  isrChangeSet synchronized {
    // ISR有变动的分区集合
    isrChangeSet += topicPartition
    // ISR最近一次变动时间
    lastIsrChangeMs.set(System.currentTimeMillis())
  }
}
/**
 * 检查并可能增加分区的高水位
 * 这个方法将在两种情况下触发:
 *
 * 1. 分区 ISR 变动
 * 2. 任一副本LEO 的改变
 */
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  // 从ISR和认为能追得上的副本选择最小的LEO,作为新的HW
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffset)
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  val oldHighWatermark = leaderReplica.highWatermark
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    true
  } else {
    false
  }
}

2、maybePropagateIsrChanges

将 ISR的变动信息(isrChangeSet)写入 ZK (/isr_change_notification/isr_change_xxx),这样 Controller会感知到哪些分区的ISR发生了变动。

def maybePropagateIsrChanges() {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // 有ISR变动的分区
    if (isrChangeSet.nonEmpty &&
        // 距离上一次的时间(lastIsrChange)超过了5s
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        // 距离上一次的时间(lastIsrPropagation)超过了60s
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // 将isrChangeSet数据写入ZK(/isr_change_notification/isr_change_xxx)
      ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
      // 清空isrChangeSet
      isrChangeSet.clear()
      // 记录最近一次触发的时间(lastIsrPropagation)
      lastIsrPropagationMs.set(now)
    }
  }
}

副本管理器通过直接操作分区对象来间接管理下属的副本对象。这些副本中,哪些是 Leader 副本、哪些是 Follower 副本。不是一直不变的,而是随着时间的推移不断变化的。当发生变化时,Controller 会给 Broker 发送 LeaderAndIsrRequest 请求来告知变化,当 Broker 收到请求后,会调用副本管理器的 becomeLeaderOrFollower 方法来处理,然后执行“成为 Leader 副本”或 “成为 Follower 副本”的逻辑。

def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
    // ensureTopicExists is only for client facing requests
    // We can't have the ensureTopicExists check here since the controller sends it as an advisory to all brokers so they
    // stop serving data to clients for the topic being deleted
    val correlationId = request.header.correlationId
    val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]

    try {
      def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
        // for each new leader or follower, call coordinator to handle consumer group migration.
        // this callback is invoked under the replica state change lock to ensure proper order of
        // leadership changes
        updatedLeaders.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupImmigration(partition.partitionId)
        }
        updatedFollowers.foreach { partition =>
          if (partition.topic == Topic.GroupMetadataTopicName)
            coordinator.handleGroupEmigration(partition.partitionId)
        }
      }

      val leaderAndIsrResponse =
        if (authorize(request.session, ClusterAction, Resource.ClusterResource)) {
          val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
          new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
        } else {
          val result = leaderAndIsrRequest.partitionStates.asScala.keys.map((_, new JShort(Errors.CLUSTER_AUTHORIZATION_FAILED.code))).toMap
          new LeaderAndIsrResponse(Errors.CLUSTER_AUTHORIZATION_FAILED.code, result.asJava)
        }

      requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
    } catch {
      case e: KafkaStorageException =>
        fatal("Disk error during leadership change.", e)
        Runtime.getRuntime.halt(1)
    }
  }
def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                             metadataCache: MetadataCache,
                             onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
    replicaStateChangeLock synchronized {
      val responseMap = new mutable.HashMap[TopicPartition, Short]

      // 1、校检controller epoch,若请求的controller已过期,返回错误(Errors.STALE_CONTROLLER_EPOCH)
      if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
        BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
      } else {
        val controllerId = leaderAndISRRequest.controllerId
        controllerEpoch = leaderAndISRRequest.controllerEpoch

        // 2、校检所有分区的 leader epoch
        // 2.1、若leader epoch错误,返回错误(Errors.STALE_CONTROLLER_EPOCH)
        // 2.2、若leader epoch正确,但本地不包含该partition,返回错误(Errors.UNKNOWN_TOPIC_OR_PARTITION)
        // 2.3、若leader epoch正确,且本地包含该partition,则通过校检
        val partitionState = new mutable.HashMap[Partition, PartitionState]()
        leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
          val partition = getOrCreatePartition(topicPartition)
          val partitionLeaderEpoch = partition.getLeaderEpoch
          if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
            if (stateInfo.replicas.contains(localBrokerId))
              partitionState.put(partition, stateInfo)
            else {
              responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
            }
          } else {
            responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
          }
        }

        // 哪些副本是Leader副本
        val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
          stateInfo.leader == localBrokerId
        }
        // 哪些副本是Follower副本
        val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
        // 成为Leader副本
        val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
          makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
        else
          Set.empty[Partition]
        // 成为Follower副本
        val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty                  
          makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
        else
          Set.empty[Partition]    

        if (!hwThreadInitialized) {
          // 启动高水位检查点专属线程,定期将Broker上所有非Offline分区的高水位值写入到检查点文件
          startHighWaterMarksCheckPointThread()
          hwThreadInitialized = true
        }
                                     
        replicaFetcherManager.shutdownIdleFetcherThreads()

        onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
        BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
      }
    }
  }

makeLeaders

private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
  // 构造返回结果
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()

  try {
    // First stop fetchers for all the partitions
    // 停止Fetcher线程 
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
  
    // Update the partition information to be the leader
    // 构造新增leader partition集合
    partitionState.foreach { case (partition, partitionStateInfo) =>
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        partitionsToMakeLeaders += partition
      else
        stateChangeLogger.info(("Broker %d skipped the become-leader state change after marking its partition as leader with correlation id %d from " +
          "controller %d epoch %d for partition %s since it is already the leader for the partition.")
          .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
    }
    partitionsToMakeLeaders.foreach { partition =>
      stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-leader request from controller " +
        "%d epoch %d with correlation id %d for partition %s")
        .format(localBrokerId, controllerId, epoch, correlationId, partition.topicPartition))
    }
  } catch {
    case e: Throwable =>
      partitionState.keys.foreach { partition =>
        val errorMsg = ("Error on broker %d while processing LeaderAndIsr request correlationId %d received from controller %d" +
          " epoch %d for partition %s").format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition)
        stateChangeLogger.error(errorMsg, e)
      }
      // Re-throw the exception for it to be caught in KafkaApis
      throw e
  }

  partitionState.keys.foreach { partition =>
    stateChangeLogger.trace(("Broker %d completed LeaderAndIsr request correlationId %d from controller %d epoch %d " +
      "for the become-leader transition for partition %s")
      .format(localBrokerId, correlationId, controllerId, epoch, partition.topicPartition))
  }

  partitionsToMakeLeaders
}
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
    val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
      val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
      // to maintain the decision maker controller's epoch in the zookeeper path
      controllerEpoch = partitionStateInfo.controllerEpoch
      // add replicas that are new
      allReplicas.foreach(replica => getOrCreateReplica(replica))
  
      // 新ISR
      val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
      // 移除所有不在新ISR中的副本
      (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
  
      inSyncReplicas = newInSyncReplicas
      leaderEpoch = partitionStateInfo.leaderEpoch
      zkVersion = partitionStateInfo.zkVersion

      // 是否第一次成为该partition的leader
      val isNewLeader =
        if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
          false
        } else {
          leaderReplicaIdOpt = Some(localBrokerId)
          true
        }
  
      val leaderReplica = getReplica().get
      val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
      val curTimeMs = time.milliseconds
      // initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and lastFetchLeaderLogEndOffset.
      (assignedReplicas - leaderReplica).foreach { replica =>
        val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
        replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
      }

  
      // we may need to increment high watermark since ISR could be down to 1
      if (isNewLeader) {
        // construct the high watermark metadata for the new leader replica
        leaderReplica.convertHWToLocalOffsetMetadata()
        // reset log end offset for remote replicas
        assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
      }
      (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
    }
    // some delayed operations may be unblocked after HW changed
    if (leaderHWIncremented)
      tryCompleteDelayedRequests()
    isNewLeader
  }
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache): Set[Partition] = {
  // 构造返回结果
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)

  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()

  try {
    // TODO: Delete leaders from LeaderAndIsrRequest
    partitionState.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.leader
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        // Only change partition state when the leader is available
        case Some(_) =>
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
          else
             ...
        case None =>
          partition.getOrCreateReplica()
      }
    }

    // 移除Fetcher线程
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))

    // 根据新hw进行truncate
    logManager.truncateTo(partitionsToMakeFollower.map { partition =>
      (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
    }.toMap)

    // hw更新,尝试处理请求
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }
    else {
      // we do not need to check if the leader exists again since this has been done at the beginning of this process
      // 重置fetch位置,加入Fetcher
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        partition.topicPartition -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    case e: Throwable =>
      throw e
  }
  partitionsToMakeFollower
}
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    val newLeaderBrokerId: Int = partitionStateInfo.leader
    // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
    // to maintain the decision maker controller's epoch in the zookeeper path
    controllerEpoch = partitionStateInfo.controllerEpoch
    // add replicas that are new
    allReplicas.foreach(r => getOrCreateReplica(r))
    // 移除掉不需要的副本
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    inSyncReplicas = Set.empty[Replica]
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion

    if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
      false
    }
    else {
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}