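1. Introduction
When the leader or the ISR of a topic partition changes, the Controller sends a LeaderAndIsr request to the brokers. The request carries the partition's metadata (leader, isr, controller_epoch, leader_epoch, zk_version, replicas, and so on). The Controller sends these requests in batches, so a single request may contain the state of several partitions.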
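To make the shape of the request concrete, here is a minimal Scala model of one batched LeaderAndIsr request. The case classes (`TopicPartition`, `PartitionStateInfo`, `LeaderAndIsrRequestModel`) are hypothetical simplifications named after the fields listed above. They are local stand-ins, not Kafka's own request classes.

```scala
// Hypothetical, simplified model of a LeaderAndIsr request (not Kafka's classes).
final case class TopicPartition(topic: String, partition: Int)

// Per-partition state carried in the request.
final case class PartitionStateInfo(
  controllerEpoch: Int, // epoch of the controller that produced this state
  leader: Int,          // broker id of the leader
  leaderEpoch: Int,     // bumped when a new leader is elected
  isr: List[Int],       // broker ids of the in-sync replicas
  zkVersion: Int,       // ZooKeeper version of the partition state
  replicas: List[Int]   // broker ids of all assigned replicas
)

// One request batches the states of many partitions.
final case class LeaderAndIsrRequestModel(
  controllerId: Int,
  controllerEpoch: Int,
  partitionStates: Map[TopicPartition, PartitionStateInfo]
)

val request = LeaderAndIsrRequestModel(
  controllerId = 1,
  controllerEpoch = 5,
  partitionStates = Map(
    TopicPartition("orders", 0) -> PartitionStateInfo(5, 1, 3, List(1, 2), 7, List(1, 2, 3)),
    TopicPartition("orders", 1) -> PartitionStateInfo(5, 2, 4, List(2, 3), 9, List(1, 2, 3))
  )
)

// Partitions in this batch that broker 1 is asked to lead.
val ledByBroker1 = request.partitionStates.collect { case (tp, s) if s.leader == 1 => tp }.toSet
```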
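2. Flow
After a broker receives a LeaderAndIsr request, it processes it as follows:
1. Validate the controller epoch. If the request comes from a stale controller, return Errors.STALE_CONTROLLER_EPOCH.
2. Get the Partition object. If it does not exist, create it and put it into the allPartitions cache.
3. Validate the leader epoch. A partition whose leader epoch in the request is not newer than the local one is rejected with STALE_CONTROLLER_EPOCH; a partition whose replica list does not include this broker is rejected with UNKNOWN_TOPIC_OR_PARTITION.
4. Determine which replicas will become leaders and which will become followers.
5. Turn the replicas that are to become leaders into leaders:
5.1. Remove the replica fetcher threads for these partitions.
5.2. Make the replica the leader:
5.2.1. Get all replicas; create any that do not exist and put them into the assignedReplicaMap cache.
5.2.1.1. For the local replica, create both a Log and a Replica object:
5.2.1.1.1. Create the Log object:
5.2.1.1.1.1. Create a log segment:
5.2.1.1.1.1.1. Create the log file (name ends in .log).
5.2.1.1.1.1.2. Create the offset index file (name ends in .index).
5.2.1.1.1.1.3. Create the time index file (name ends in .timeindex).
5.2.1.1.1.2. Compute the offset of the next message to be written.
5.2.1.1.2. Create the Replica object.
5.2.1.2. For a remote replica, create only a Replica object, no Log object.
5.2.2. Remove replicas that are no longer assigned to the partition.
5.2.3. Update the state of every follower replica of the partition:
● lastFetchLeaderLogEndOffset: the leader's LEO at the time of the last fetch
● lastFetchTimeMs: the time of the last fetch
● _lastCaughtUpTimeMs: the last time the replica caught up with the leader
5.2.4. If this broker is a new leader, initialize the leader replica's high watermark (HW).
5.2.5. If this broker is a new leader, reset the LEO of the remote replicas.
5.2.6. Try to advance the high watermark (HW):
5.2.6.1. Collect the LEOs of the ISR replicas, plus any replica that caught up with the leader within replica.lag.time.max.ms.
5.2.6.2. Use the smallest of these LEOs as the new high watermark.
5.2.7. If the HW advanced, try to complete all delayed requests.
5.2.8. Return false if the replica was already the leader, true otherwise.
6. Turn the replicas that are to become followers into followers:
6.1. Make the replica a follower:
6.1.1. Get all replicas; create any that do not exist and put them into the assignedReplicaMap cache.
6.1.1.1. For the local replica, create both a Log and a Replica object:
6.1.1.1.1. Create the Log object:
6.1.1.1.1.1. Create a log segment:
6.1.1.1.1.1.1. Create the log file (name ends in .log).
6.1.1.1.1.1.2. Create the offset index file (name ends in .index).
6.1.1.1.1.1.3. Create the time index file (name ends in .timeindex).
6.1.1.1.1.2. Compute the offset of the next message to be written.
6.1.1.1.2. Create the Replica object.
6.1.1.2. For a remote replica, create only a Replica object, no Log object.
6.1.2. Remove replicas that are no longer assigned to the partition.
6.1.3. Return false if the leader has not changed, true otherwise.
6.2. Remove the replica fetcher threads for these partitions.
6.3. Truncate the log to the replica's high watermark.
6.4. Complete pending delayed requests (Produce and Fetch).
6.5. Start replica fetcher threads to replicate data from the leader.
7. Start the high-watermark checkpoint thread (only once).
8. Shut down idle fetcher threads.
9. Invoke the callback: if leadership of a __consumer_offsets partition changed, the GroupCoordinator has to react.
10. Build and return the LeaderAndIsrResponse.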
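3. Internals
3.1 Entry point
After receiving a LeaderAndIsr request, the broker handles it in KafkaApis.handleLeaderAndIsrRequest:
- Ask the replica manager to process the leader and follower replicas.
- Invoke the callback: if leadership of a __consumer_offsets partition changed, the GroupCoordinator has to react.
- Build and return the LeaderAndIsrResponse.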
def handleLeaderAndIsrRequest(request: RequestChannel.Request) {
  val correlationId = request.header.correlationId
  val leaderAndIsrRequest = request.body.asInstanceOf[LeaderAndIsrRequest]
  try {
    // Callback: if leadership of a __consumer_offsets partition changed, the GroupCoordinator must react
    def onLeadershipChange(updatedLeaders: Iterable[Partition], updatedFollowers: Iterable[Partition]) {
      // This broker became leader of an offsets partition: take over the consumer groups stored in it
      updatedLeaders.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupImmigration(partition.partitionId)
      }
      // This broker became follower of an offsets partition: hand those consumer groups off
      updatedFollowers.foreach { partition =>
        if (partition.topic == Topic.GroupMetadataTopicName)
          coordinator.handleGroupEmigration(partition.partitionId)
      }
    }
    val leaderAndIsrResponse =
      ...
      // Ask the replica manager to process the leader and follower replicas
      val result = replicaManager.becomeLeaderOrFollower(correlationId, leaderAndIsrRequest, metadataCache, onLeadershipChange)
      // Build the response
      new LeaderAndIsrResponse(result.errorCode, result.responseMap.mapValues(new JShort(_)).asJava)
      ...
    requestChannel.sendResponse(new Response(request, leaderAndIsrResponse))
  } catch {
    ...
  }
}
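The callback only cares about partitions of the internal `__consumer_offsets` topic. A minimal sketch of that dispatch, assuming a hypothetical `GroupCoordinatorLike` trait in place of the real GroupCoordinator and a hypothetical `PartitionRef` case class in place of `Partition`:

```scala
import scala.collection.mutable.ListBuffer

val GroupMetadataTopicName = "__consumer_offsets"

// Hypothetical stand-ins for Kafka's Partition and GroupCoordinator.
final case class PartitionRef(topic: String, partitionId: Int)

trait GroupCoordinatorLike {
  def handleGroupImmigration(partitionId: Int): Unit // this broker now owns the groups in the partition
  def handleGroupEmigration(partitionId: Int): Unit  // this broker no longer owns them
}

def onLeadershipChange(coordinator: GroupCoordinatorLike)(
    updatedLeaders: Iterable[PartitionRef],
    updatedFollowers: Iterable[PartitionRef]): Unit = {
  // Partitions of ordinary topics are ignored.
  updatedLeaders.filter(_.topic == GroupMetadataTopicName).foreach(p => coordinator.handleGroupImmigration(p.partitionId))
  updatedFollowers.filter(_.topic == GroupMetadataTopicName).foreach(p => coordinator.handleGroupEmigration(p.partitionId))
}

// A coordinator that only records what it was asked to do.
final class RecordingCoordinator extends GroupCoordinatorLike {
  val events = ListBuffer.empty[String]
  def handleGroupImmigration(partitionId: Int): Unit = events += s"immigrate-$partitionId"
  def handleGroupEmigration(partitionId: Int): Unit = events += s"emigrate-$partitionId"
}

val recorder = new RecordingCoordinator
onLeadershipChange(recorder)(
  updatedLeaders = List(PartitionRef(GroupMetadataTopicName, 3), PartitionRef("orders", 0)),
  updatedFollowers = List(PartitionRef(GroupMetadataTopicName, 7)))
```

The `orders` partition produces no event; only the two offsets-topic partitions reach the coordinator.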
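3.2 Replica manager
The replica manager processes the leader and follower replicas as follows:
- Validate the controller epoch. If the request comes from a stale controller, return Errors.STALE_CONTROLLER_EPOCH.
- Get the Partition object. If it does not exist, create it and put it into the allPartitions cache.
- Validate the leader epoch.
- Determine which replicas will become leaders and which will become followers.
- Turn the replicas that are to become leaders into leaders.
- Turn the replicas that are to become followers into followers.
- Start the high-watermark checkpoint thread.
- Shut down idle fetcher threads.
- Invoke the callback.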
def becomeLeaderOrFollower(correlationId: Int, leaderAndISRRequest: LeaderAndIsrRequest,
                           metadataCache: MetadataCache,
                           onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {
  replicaStateChangeLock synchronized {
    val responseMap = new mutable.HashMap[TopicPartition, Short]
    // 1. Validate the controller epoch; reject a request from a stale controller with Errors.STALE_CONTROLLER_EPOCH
    if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
      BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
    } else {
      val controllerId = leaderAndISRRequest.controllerId
      controllerEpoch = leaderAndISRRequest.controllerEpoch
      val partitionState = new mutable.HashMap[Partition, PartitionState]()
      leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
        // 2. Get the Partition object, creating it and caching it in allPartitions if absent
        val partition = getOrCreatePartition(topicPartition)
        val partitionLeaderEpoch = partition.getLeaderEpoch
        // 3. Validate the leader epoch: only a leader epoch newer than the local one is accepted
        if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
          if (stateInfo.replicas.contains(localBrokerId))
            partitionState.put(partition, stateInfo)
          else {
            // This broker is not in the partition's replica list
            responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
          }
        } else {
          // Stale leader epoch
          responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
        }
      }
      // 4. Partitions for which this broker becomes the leader
      val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) =>
        stateInfo.leader == localBrokerId
      }
      // 5. Partitions for which this broker becomes a follower
      val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys
      val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
        // 6. Make the replicas leaders
        makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
      else
        Set.empty[Partition]
      val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
        // 7. Make the replicas followers
        makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
      else
        Set.empty[Partition]
      // 8. Start the high-watermark checkpoint thread (only on the first request)
      if (!hwThreadInitialized) {
        startHighWaterMarksCheckPointThread()
        hwThreadInitialized = true
      }
      // 9. Shut down idle fetcher threads
      replicaFetcherManager.shutdownIdleFetcherThreads()
      // 10. Invoke the callback
      onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)
      BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
    }
  }
}
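Steps 1 to 5 boil down to a pure decision per partition. The following is a sketch under simplifying assumptions: `classify`, the `Outcome` types and the string error codes are hypothetical, and the default leader epoch of -1 for a partition the broker has never seen is an assumption of this sketch.

```scala
// Hypothetical, simplified types (not Kafka's classes).
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionStateInfo(leader: Int, leaderEpoch: Int, replicas: Set[Int])

sealed trait Outcome
case object BecomeLeader extends Outcome
case object BecomeFollower extends Outcome
final case class Rejected(error: String) extends Outcome

def classify(localBrokerId: Int,
             currentControllerEpoch: Int,
             requestControllerEpoch: Int,
             localLeaderEpochs: Map[TopicPartition, Int],
             states: Map[TopicPartition, PartitionStateInfo]): Either[String, Map[TopicPartition, Outcome]] =
  if (requestControllerEpoch < currentControllerEpoch)
    Left("STALE_CONTROLLER_EPOCH") // step 1: the whole request is rejected
  else
    Right(states.map { case (tp, s) =>
      val localEpoch = localLeaderEpochs.getOrElse(tp, -1) // assumed default for an unseen partition
      val outcome: Outcome =
        if (localEpoch >= s.leaderEpoch) Rejected("STALE_CONTROLLER_EPOCH")  // step 3: stale leader epoch
        else if (!s.replicas.contains(localBrokerId)) Rejected("UNKNOWN_TOPIC_OR_PARTITION")
        else if (s.leader == localBrokerId) BecomeLeader                     // step 4
        else BecomeFollower                                                  // step 5
      tp -> outcome
    })

val tpA = TopicPartition("orders", 0)
val tpB = TopicPartition("orders", 1)
val tpC = TopicPartition("orders", 2)
val decision = classify(localBrokerId = 1, currentControllerEpoch = 5, requestControllerEpoch = 5,
  localLeaderEpochs = Map(tpA -> 3, tpC -> 4),
  states = Map(
    tpA -> PartitionStateInfo(leader = 1, leaderEpoch = 4, replicas = Set(1, 2)),
    tpB -> PartitionStateInfo(leader = 2, leaderEpoch = 1, replicas = Set(1, 2)),
    tpC -> PartitionStateInfo(leader = 2, leaderEpoch = 4, replicas = Set(1, 2))))
```

Keeping the decision pure makes the epoch rules easy to test; in the real method the same checks are interleaved with creating Partition objects and filling responseMap.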
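3.3 Making replicas leaders
3.3.1 makeLeaders
makeLeaders turns the replicas that are to become leaders into leaders:
1. Remove the replica fetcher threads for these partitions.
2. Make each replica the leader:
2.1. Get all replicas; create any that do not exist and put them into the cache.
2.2. Remove replicas that are no longer assigned.
2.3. Update the state of every follower replica of the partition (lastFetchLeaderLogEndOffset, set to the leader's LEO; lastFetchTimeMs; _lastCaughtUpTimeMs).
2.4. If this broker is a new leader, initialize the leader replica's high watermark (HW).
2.5. If this broker is a new leader, reset the LEO of the remote replicas.
2.6. Try to advance the high watermark (HW):
2.6.1. Collect the LEOs of the ISR replicas, plus any replica that caught up with the leader within replica.lag.time.max.ms.
2.6.2. Use the smallest of these LEOs as the new high watermark.
2.6.3. If the HW increased, update it and return true; otherwise return false.
2.7. If the HW increased, try to complete all delayed requests.
2.8. Return false if the partition's leader was already on this broker, true otherwise.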
private def makeLeaders(controllerId: Int,
                        epoch: Int,
                        partitionState: Map[Partition, PartitionState],
                        correlationId: Int,
                        responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)
  val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
  try {
    // 1. Stop the replica fetchers for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))
    // 2. Make each replica the leader; makeLeader returns true only if this broker is a new leader
    partitionState.foreach { case (partition, partitionStateInfo) =>
      if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
        partitionsToMakeLeaders += partition
      else
        ...
    }
    ...
    }
  } catch {
    ...
  }
  partitionsToMakeLeaders
}
3.3.2 makeLeader
// Make this replica the leader; returns false if it was already the leader, true otherwise
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    controllerEpoch = partitionStateInfo.controllerEpoch
    // 1. Get all replicas, creating and caching any that do not exist
    allReplicas.foreach(replica => getOrCreateReplica(replica))
    // New ISR
    val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet
    // 2. Remove replicas that are no longer assigned
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    inSyncReplicas = newInSyncReplicas
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // 3. Is this a new leader?
    val isNewLeader =
      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
        // The partition's leader was already on this broker: return false
        false
      } else {
        // The leader was not on this broker before, so this is a new leader: return true
        leaderReplicaIdOpt = Some(localBrokerId)
        true
      }
    // Leader replica (the local replica)
    val leaderReplica = getReplica().get
    // Leader's LEO
    val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset
    val curTimeMs = time.milliseconds
    // 4. Update the state of every follower replica:
    // 4.1 lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
    // 4.2 lastFetchTimeMs = curTimeMs
    // 4.3 _lastCaughtUpTimeMs = lastCaughtUpTimeMs (curTimeMs if the replica is in the ISR, 0 otherwise)
    (assignedReplicas - leaderReplica).foreach { replica =>
      val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
      replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
    }
    if (isNewLeader) {
      // 5. Initialize the leader's HW (turn the HW offset into full offset metadata)
      leaderReplica.convertHWToLocalOffsetMetadata()
      // 6. Reset the LEO of the remote replicas
      assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
    }
    // 7. Try to advance the HW; true if it increased, false otherwise
    (maybeIncrementLeaderHW(leaderReplica), isNewLeader)
  }
  if (leaderHWIncremented)
    // 8. Try to complete all delayed requests
    tryCompleteDelayedRequests()
  isNewLeader
}
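Two small pieces of makeLeader are worth isolating: the "new leader" test and the reset of follower bookkeeping. A sketch with a hypothetical `FollowerState` case class standing in for the relevant fields of Kafka's `Replica`, and hypothetical helper names:

```scala
// Hypothetical stand-in for the follower fields reset by resetLastCaughtUpTime.
final case class FollowerState(
  brokerId: Int,
  lastFetchLeaderLogEndOffset: Long,
  lastFetchTimeMs: Long,
  lastCaughtUpTimeMs: Long)

// True when the leader was NOT already on this broker.
def isNewLeader(previousLeader: Option[Int], localBrokerId: Int): Boolean =
  !previousLeader.contains(localBrokerId)

// ISR members are treated as caught up "now"; other followers as never caught up (0).
def resetFollowers(followerIds: Seq[Int], isr: Set[Int], leaderLeo: Long, nowMs: Long): Seq[FollowerState] =
  followerIds.map { id =>
    val caughtUp = if (isr.contains(id)) nowMs else 0L
    FollowerState(id, leaderLeo, nowMs, caughtUp)
  }

val followers = resetFollowers(followerIds = Seq(2, 3), isr = Set(1, 2), leaderLeo = 100L, nowMs = 5000L)
```

The effect of this reset is that ISR members start with a fresh replica.lag.time.max.ms window under the new leader, while replicas outside the ISR start from 0 and have to catch up before they count as caught up.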
// Try to advance the high watermark (HW)
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
  // LEOs of the ISR replicas plus any replica that caught up within replica.lag.time.max.ms
  val allLogEndOffsets = assignedReplicas.filter { replica =>
    curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
  }.map(_.logEndOffset)
  // The smallest LEO becomes the new HW
  val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)
  // Old HW
  val oldHighWatermark = leaderReplica.highWatermark
  // If the HW increased (or the old HW sits on an older segment), update it and return true; otherwise return false
  if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
    leaderReplica.highWatermark = newHighWatermark
    true
  } else {
    false
  }
}
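The HW rule itself can be exercised without any Kafka classes. This sketch uses plain `Long` offsets instead of `LogOffsetMetadata`, so the `onOlderSegment` branch is deliberately left out; `ReplicaView` and `maybeIncrementHW` are hypothetical names.

```scala
// Hypothetical view of a replica: just what the HW rule needs.
final case class ReplicaView(brokerId: Int, logEndOffset: Long, lastCaughtUpTimeMs: Long)

// Returns Some(newHw) if the HW advances, None otherwise.
def maybeIncrementHW(oldHw: Long,
                     replicas: Seq[ReplicaView],
                     isr: Set[Int],
                     nowMs: Long,
                     replicaLagTimeMaxMs: Long): Option[Long] = {
  // Candidates: ISR members plus any replica that caught up within the lag window.
  // The leader is always in the ISR, so this is never empty in practice.
  val candidates = replicas.filter { r =>
    nowMs - r.lastCaughtUpTimeMs <= replicaLagTimeMaxMs || isr.contains(r.brokerId)
  }
  val newHw = candidates.map(_.logEndOffset).min
  // The HW only moves forward.
  if (newHw > oldHw) Some(newHw) else None
}

val replicas = Seq(ReplicaView(1, 120L, 20000L), ReplicaView(2, 110L, 19000L), ReplicaView(3, 105L, 15000L))
val hw = maybeIncrementHW(100L, replicas, isr = Set(1, 2), nowMs = 20000L, replicaLagTimeMaxMs = 10000L)
```

Here replica 3 is outside the ISR but caught up 5 s ago, so its LEO (105) rather than 110 becomes the new HW.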
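3.4 Making replicas followers
3.4.1 makeFollowers
makeFollowers turns the replicas that are to become followers into followers:
1. Make each replica a follower (only for partitions whose new leader is an alive broker):
1.1. Get all replicas; create any that do not exist and put them into the cache.
1.2. Remove replicas that are no longer assigned.
1.3. Return false if the leader has not changed, true otherwise.
2. Remove the replica fetcher threads for these partitions.
3. Truncate the log to the replica's high watermark.
4. Complete pending delayed requests (Produce and Fetch).
5. Start replica fetcher threads to replicate data from the leader.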
private def makeFollowers(controllerId: Int,
                          epoch: Int,
                          partitionState: Map[Partition, PartitionState],
                          correlationId: Int,
                          responseMap: mutable.Map[TopicPartition, Short],
                          metadataCache: MetadataCache): Set[Partition] = {
  for (partition <- partitionState.keys)
    responseMap.put(partition.topicPartition, Errors.NONE.code)
  val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
  try {
    partitionState.foreach { case (partition, partitionStateInfo) =>
      val newLeaderBrokerId = partitionStateInfo.leader
      // Only proceed if the new leader is an alive broker
      metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
        case Some(_) =>
          // 1. Make the replica a follower; returns false if the leader did not change, true otherwise
          if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
            partitionsToMakeFollower += partition
          else
            ...
        case None =>
          ...
          partition.getOrCreateReplica()
      }
    }
    // 2. Stop the replica fetchers for these partitions
    replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))
    // 3. Truncate each log to the replica's high watermark
    logManager.truncateTo(partitionsToMakeFollower.map { partition =>
      (partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
    }.toMap)
    // 4. Complete pending delayed requests (Produce and Fetch)
    partitionsToMakeFollower.foreach { partition =>
      val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
      tryCompleteDelayedProduce(topicPartitionOperationKey)
      tryCompleteDelayedFetch(topicPartitionOperationKey)
    }
    if (isShuttingDown.get()) {
      ...
    }
    else {
      // For each partition: the leader's endpoint and the initial fetch offset (the local LEO)
      val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
        partition.topicPartition -> BrokerAndInitialOffset(
          metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
          partition.getReplica().get.logEndOffset.messageOffset)).toMap
      // 5. Start replica fetchers to replicate data from the leader
      replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
    }
  } catch {
    case e: Throwable =>
      ...
      throw e
  }
  ...
  partitionsToMakeFollower
}
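Steps 3 and 5 go together: records above the HW are not yet committed, so the new leader may not have them; the follower therefore drops them and then fetches from its (now shorter) LEO. A toy model of one partition, where `FollowerLog` and `prepareFollower` are hypothetical and the real work is done by `LogManager.truncateTo` and the fetcher threads:

```scala
// Hypothetical toy log: just the offsets of the messages it holds.
final case class FollowerLog(offsets: Vector[Long]) {
  // LEO: offset of the next message to be written.
  def logEndOffset: Long = offsets.lastOption.map(_ + 1).getOrElse(0L)
  // Drop every message at or above targetOffset.
  def truncateTo(targetOffset: Long): FollowerLog = FollowerLog(offsets.filter(_ < targetOffset))
}

// Truncate to the HW, then report where the fetcher should start.
def prepareFollower(log: FollowerLog, highWatermark: Long): (FollowerLog, Long) = {
  val truncated = log.truncateTo(highWatermark)
  (truncated, truncated.logEndOffset)
}

// A follower holding offsets 0..9 whose HW is 7.
val (truncated, fetchFrom) = prepareFollower(FollowerLog(Vector.range(0L, 10L)), highWatermark = 7L)
```

After truncation the log holds offsets 0 to 6, and fetching resumes at offset 7.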
3.4.2 makeFollower
// Make this replica a follower; returns false if the leader did not change, true otherwise
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
  inWriteLock(leaderIsrUpdateLock) {
    val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
    val newLeaderBrokerId: Int = partitionStateInfo.leader
    controllerEpoch = partitionStateInfo.controllerEpoch
    // 1. Get all replicas, creating and caching any that do not exist
    allReplicas.foreach(r => getOrCreateReplica(r))
    // 2. Remove replicas that are no longer assigned
    (assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)
    // Only the leader maintains the ISR, so a follower clears it
    inSyncReplicas = Set.empty[Replica]
    leaderEpoch = partitionStateInfo.leaderEpoch
    zkVersion = partitionStateInfo.zkVersion
    // 3. Return false if the leader did not change, true otherwise
    if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
      false
    }
    else {
      leaderReplicaIdOpt = Some(newLeaderBrokerId)
      true
    }
  }
}
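Steps 1 and 2 of makeFollower (and of makeLeader) reconcile the cached replica map with the replica list in the request, and step 3 reports whether the leader changed. A mutable sketch, assuming hypothetical `ReplicaStub` and `PartitionModel` classes in place of Kafka's `Replica` and `Partition`:

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Replica and Partition.
final case class ReplicaStub(brokerId: Int)

final class PartitionModel {
  val assignedReplicaMap: mutable.Map[Int, ReplicaStub] = mutable.Map.empty
  var leaderReplicaIdOpt: Option[Int] = None

  // Mirrors makeFollower: returns true if the leader changed.
  def applyAssignment(allReplicas: Seq[Int], newLeader: Int): Boolean = {
    // 1. Create any replica that is not cached yet
    allReplicas.foreach(id => assignedReplicaMap.getOrElseUpdate(id, ReplicaStub(id)))
    // 2. Drop cached replicas that are no longer assigned (snapshot the keys before mutating)
    (assignedReplicaMap.keySet.toSet -- allReplicas).foreach(id => assignedReplicaMap.remove(id))
    // 3. Report whether the leader changed
    if (leaderReplicaIdOpt.contains(newLeader)) false
    else {
      leaderReplicaIdOpt = Some(newLeader)
      true
    }
  }
}

val p = new PartitionModel
val first = p.applyAssignment(Seq(1, 2, 3), newLeader = 1) // true: no leader was known before
val again = p.applyAssignment(Seq(1, 2), newLeader = 1)    // false: same leader, replica 3 is dropped
```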
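3.5 Creating replicas
Partition creates a replica and puts it into its replica cache as follows:
1. For the local replica, create both a Log and a Replica object:
1.1. Create the Log object.
1.2. Create the Replica object, with its initial HW taken from the high-watermark checkpoint and capped at the log end offset.
2. For a remote replica, create only a Replica object, no Log object.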
// Get the replica, or create it and put it into the cache
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
  assignedReplicaMap.getAndMaybePut(replicaId, {
    // 1. Local replica: create both a Log and a Replica object
    if (isReplicaLocal(replicaId)) {
      val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))
      // 1.1 Create the Log object
      val log = logManager.createLog(topicPartition, config)
      // HW checkpoint file (replication-offset-checkpoint) of the data directory holding this log
      val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
      val offsetMap = checkpoint.read
      if (!offsetMap.contains(topicPartition))
        info(s"No checkpointed highwatermark is found for partition $topicPartition")
      // Initial HW: the checkpointed value (0 if absent), capped at the log end offset
      val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)
      // 1.2 Create the Replica object
      new Replica(replicaId, this, time, offset, Some(log))
    // 2. Remote replica: create only a Replica object, no Log object
    } else new Replica(replicaId, this, time)
  })
}
// In the Replica class: turn the plain HW offset into full offset metadata using the local log
def convertHWToLocalOffsetMetadata() = {
  if (isLocal) {
    highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
  } else {
    throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
  }
}
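The starting HW of a local replica is the only arithmetic in getOrCreateReplica. A sketch, assuming the checkpoint file has already been parsed into a `Map`; `initialHighWatermark` is a hypothetical name and `TopicPartition` is a local stand-in.

```scala
// Local stand-in for Kafka's TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Checkpointed HW (0 if the partition has no entry), never above the log end offset.
def initialHighWatermark(checkpoint: Map[TopicPartition, Long], tp: TopicPartition, logEndOffset: Long): Long =
  math.min(checkpoint.getOrElse(tp, 0L), logEndOffset)

val tp = TopicPartition("orders", 0)
val checkpoint = Map(tp -> 500L)
```

Capping at the LEO keeps the invariant HW ≤ LEO even when the checkpoint is ahead of the data actually present in the log.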
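3.6 Creating the log
Creating a Log involves:
1. Create a log segment (if none exists yet):
1.1. Create the log file (name ends in .log).
1.2. Create the offset index file (name ends in .index).
1.3. Create the time index file (name ends in .timeindex).
2. Compute the offset of the next message to be written.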
// In LogManager: create the log for a partition, or return it if it already exists
def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
  logCreationOrDeletionLock synchronized {
    getLog(topicPartition).getOrElse {
      val dataDir = nextLogDir()
      // Log directory: "<topic>-<partition>" under the chosen data directory
      val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
      dir.mkdirs()
      // Create the log
      val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
      logs.put(topicPartition, log)
      log
    }
  }
}
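createLog is an idempotent get-or-create guarded by a lock. A sketch of that pattern with a single data directory and a hypothetical `LogStub` in place of `Log`; unlike the real method it does not call `mkdirs()` or choose among several data directories through nextLogDir().

```scala
import java.io.File
import scala.collection.mutable

// Hypothetical stand-ins (not Kafka's classes).
final case class TopicPartition(topic: String, partition: Int)
final class LogStub(val dir: File)

final class LogRegistry(dataDir: File) {
  private val lock = new Object
  private val logs = mutable.Map.empty[TopicPartition, LogStub]

  // Returns the existing log, or creates and registers a new one, under the lock.
  def createLog(tp: TopicPartition): LogStub = lock.synchronized {
    logs.getOrElse(tp, {
      // Directory name "<topic>-<partition>", as in createLog above.
      val dir = new File(dataDir, tp.topic + "-" + tp.partition)
      val log = new LogStub(dir) // the real code also creates the directory and loads segments
      logs.put(tp, log)
      log
    })
  }
}

val registry = new LogRegistry(new File("/tmp/kafka-logs"))
val orders3 = registry.createLog(TopicPartition("orders", 3))
```

Holding the lock across the lookup and the insert is what guarantees that two concurrent callers end up with the same log instance.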
// In the Log class body, executed when a Log is constructed
locally {
  val startMs = time.milliseconds
  // Load the log segments, creating one if none exist
  loadSegments()
  // Calculate the offset of the next message to be written
  nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset(), activeSegment.baseOffset, activeSegment.size.toInt)
}
// Load the log segments, creating one if none exist
private def loadSegments() {
  ...
  // No segments: create the first segment, starting at offset 0
  if (logSegments.isEmpty) {
    segments.put(0L, new LogSegment(dir = dir,
      startOffset = 0,
      indexIntervalBytes = config.indexInterval,
      maxIndexSize = config.maxIndexSize,
      rollJitterMs = config.randomSegmentJitter,
      time = time,
      fileAlreadyExists = false,
      initFileSize = this.initFileSize(),
      preallocate = config.preallocate))
  }
  ...
}
// In the LogSegment class: auxiliary constructor that creates a segment's files
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
  this(
    // Log file, name ends in .log
    FileRecords.open(Log.logFile(dir, startOffset), fileAlreadyExists, initFileSize, preallocate),
    // Offset index file, name ends in .index
    new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
    // Time index file, name ends in .timeindex
    new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
    // Base (starting) offset of the segment
    startOffset,
    // Bytes appended between two index entries (this is what makes the index sparse)
    indexIntervalBytes,
    rollJitterMs,
    time)
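indexIntervalBytes is what makes the offset index sparse. The sketch below assumes the rule used when appending to a LogSegment in this generation of Kafka: an entry is written only once more than indexIntervalBytes bytes have been appended since the previous entry. `SparseIndexModel` is hypothetical and tracks only (offset, position) pairs, one message per append.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical model of a segment's offset index (not Kafka's LogSegment).
final class SparseIndexModel(indexIntervalBytes: Int) {
  val entries = ArrayBuffer.empty[(Long, Int)] // (offset, byte position in the .log file)
  private var position = 0
  private var bytesSinceLastIndexEntry = 0

  def append(offset: Long, sizeInBytes: Int): Unit = {
    // Index this message only if enough bytes were written since the last entry.
    if (bytesSinceLastIndexEntry > indexIntervalBytes) {
      entries += ((offset, position))
      bytesSinceLastIndexEntry = 0
    }
    position += sizeInBytes
    bytesSinceLastIndexEntry += sizeInBytes
  }
}

val idx = new SparseIndexModel(indexIntervalBytes = 100)
(0L until 10L).foreach(offset => idx.append(offset, sizeInBytes = 60))
// idx.entries: (2,120), (4,240), (6,360), (8,480)
```

A lookup then finds the nearest entry at or below the target offset and scans the .log file forward from that position, trading a little scanning for a much smaller index.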
// In the Log companion object:
// File object for the log file (name ends in .log)
def logFile(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
// File object for the offset index file (name ends in .index)
def indexFilename(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
// File object for the time index file (name ends in .timeindex)
def timeIndexFilename(dir: File, offset: Long) = new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
// Segment file name prefix: the segment's base offset (startOffset) as a 20-digit number, zero-padded on the left
def filenamePrefixFromOffset(offset: Long): String = {
  val nf = NumberFormat.getInstance()
  nf.setMinimumIntegerDigits(20)
  nf.setMaximumFractionDigits(0)
  nf.setGroupingUsed(false)
  nf.format(offset)
}
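The naming scheme is easy to check in isolation. This sketch swaps NumberFormat for Scala's `f` interpolator with `%020d`, which yields the same 20-digit zero-padded prefix for non-negative offsets; `segmentFiles` is a hypothetical helper listing the three files of one segment.

```scala
import java.io.File

// Base offset as a 20-digit, zero-padded decimal (offsets are non-negative).
def filenamePrefixFromOffset(offset: Long): String = f"$offset%020d"

// Hypothetical helper: the three files that make up one segment.
def segmentFiles(dir: File, baseOffset: Long): Seq[File] =
  Seq(".log", ".index", ".timeindex").map(suffix => new File(dir, filenamePrefixFromOffset(baseOffset) + suffix))

val names = segmentFiles(new File("/data/orders-0"), 368769L).map(_.getName)
// names: 00000000000000368769.log, 00000000000000368769.index, 00000000000000368769.timeindex
```

Because the prefix has a fixed width, sorting segment files by name also sorts them by base offset.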