生产者发送消息后,Kafka server 端在收到 Produce 请求时,会交由 KafkaApis 进行处理。
KafkaApis 是 server 端处理所有请求的入口,它会负责将请求的具体处理交给相应的组件进行处理。
/**
 * Entry point for every request the broker receives: inspects the API key in
 * the request header and dispatches to the dedicated handler for that API.
 * Produce requests are routed to handleProduceRequest(). Some admin APIs are
 * wrapped in maybeForwardToController so they can be forwarded to the
 * controller instead of being handled locally.
 */
override def handle(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
try {
......
// Dispatch on the API key carried in the request header.
request.header.apiKey match {
case ApiKeys.PRODUCE => handleProduceRequest(request, requestLocal)
case ApiKeys.FETCH => handleFetchRequest(request)
case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
case ApiKeys.METADATA => handleTopicMetadataRequest(request)
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request, requestLocal)
case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request, requestLocal)
case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
case ApiKeys.FIND_COORDINATOR => handleFindCoordinatorRequest(request)
case ApiKeys.JOIN_GROUP => handleJoinGroupRequest(request, requestLocal)
case ApiKeys.HEARTBEAT => handleHeartbeatRequest(request)
case ApiKeys.LEAVE_GROUP => handleLeaveGroupRequest(request)
case ApiKeys.SYNC_GROUP => handleSyncGroupRequest(request, requestLocal)
case ApiKeys.DESCRIBE_GROUPS => handleDescribeGroupRequest(request)
case ApiKeys.LIST_GROUPS => handleListGroupsRequest(request)
case ApiKeys.SASL_HANDSHAKE => handleSaslHandshakeRequest(request)
case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
case ApiKeys.CREATE_TOPICS => maybeForwardToController(request, handleCreateTopicsRequest)
case ApiKeys.DELETE_TOPICS => maybeForwardToController(request, handleDeleteTopicsRequest)
case ApiKeys.DELETE_RECORDS => handleDeleteRecordsRequest(request)
case ApiKeys.INIT_PRODUCER_ID => handleInitProducerIdRequest(request, requestLocal)
case ApiKeys.OFFSET_FOR_LEADER_EPOCH => handleOffsetForLeaderEpochRequest(request)
case ApiKeys.ADD_PARTITIONS_TO_TXN => handleAddPartitionToTxnRequest(request, requestLocal)
case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal)
case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal)
case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal)
case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal)
case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request)
case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls)
case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls)
case ApiKeys.ALTER_CONFIGS => maybeForwardToController(request, handleAlterConfigsRequest)
case ApiKeys.DESCRIBE_CONFIGS => handleDescribeConfigsRequest(request)
case ApiKeys.ALTER_REPLICA_LOG_DIRS => handleAlterReplicaLogDirsRequest(request)
case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
case ApiKeys.DESCRIBE_DELEGATION_TOKEN => handleDescribeTokensRequest(request)
case ApiKeys.DELETE_GROUPS => handleDeleteGroupsRequest(request, requestLocal)
case ApiKeys.ELECT_LEADERS => maybeForwardToController(request, handleElectLeaders)
case ApiKeys.INCREMENTAL_ALTER_CONFIGS => maybeForwardToController(request, handleIncrementalAlterConfigsRequest)
case ApiKeys.ALTER_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleAlterPartitionReassignmentsRequest)
case ApiKeys.LIST_PARTITION_REASSIGNMENTS => maybeForwardToController(request, handleListPartitionReassignmentsRequest)
case ApiKeys.OFFSET_DELETE => handleOffsetDeleteRequest(request, requestLocal)
case ApiKeys.DESCRIBE_CLIENT_QUOTAS => handleDescribeClientQuotasRequest(request)
case ApiKeys.ALTER_CLIENT_QUOTAS => maybeForwardToController(request, handleAlterClientQuotasRequest)
case ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS => handleDescribeUserScramCredentialsRequest(request)
case ApiKeys.ALTER_USER_SCRAM_CREDENTIALS => maybeForwardToController(request, handleAlterUserScramCredentialsRequest)
case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
case ApiKeys.UPDATE_FEATURES => maybeForwardToController(request, handleUpdateFeatures)
case ApiKeys.ENVELOPE => handleEnvelope(request, requestLocal)
case ApiKeys.DESCRIBE_CLUSTER => handleDescribeCluster(request)
case ApiKeys.DESCRIBE_PRODUCERS => handleDescribeProducersRequest(request)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToControllerOrFail(request)
case _ => throw new IllegalStateException(s"No handler for request api key ${request.header.apiKey}")
}
} catch {
// A FatalExitError must bring the broker down; rethrow it untouched.
case e: FatalExitError => throw e
case e: Throwable =>
error(s"Unexpected error handling request ${request.requestDesc(true)} " +
s"with context ${request.context}", e)
// For any other failure, send an error response back to the client.
requestHelper.handleError(request, e)
} finally {
// Attempt to complete delayed operations that this request may have unblocked.
replicaManager.tryCompleteActions()
// Record the local-completion timestamp if the handler did not set it itself.
if (request.apiLocalCompleteTimeNanos < 0)
request.apiLocalCompleteTimeNanos = time.nanoseconds
}
}
KafkaApis 处理 Produce 请求是在 KafkaApis 的 handleProduceRequest() 方法中完成的,具体如下:
/**
 * Handles a Produce request: splits the request's per-partition records into
 * unauthorized / non-existing / invalid / valid buckets, then hands the valid
 * records to the ReplicaManager to be appended to the leader logs.
 */
def handleProduceRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = {
......
// Error responses for topics the client is not authorized to write to
val unauthorizedTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
// Error responses for topics unknown to this broker's metadata cache
val nonExistingTopicResponses = mutable.Map[TopicPartition, PartitionResponse]()
// Error responses for partitions whose records failed validation
val invalidRequestResponses = mutable.Map[TopicPartition, PartitionResponse]()
// Records that passed all checks, keyed by topic-partition
val authorizedRequestInfo = mutable.Map[TopicPartition, MemoryRecords]()
......
produceRequest.data.topicData.forEach(topic => topic.partitionData.forEach { partition =>
val topicPartition = new TopicPartition(topic.name, partition.index)
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicPartition.topic))
// Client lacks write authorization on this topic
unauthorizedTopicResponses += topicPartition -> new PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
else if (!metadataCache.contains(topicPartition))
// Topic-partition is not present in the metadata cache
nonExistingTopicResponses += topicPartition -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
try {
// Valid request: record the (topicPartition -> memoryRecords) pair in authorizedRequestInfo
ProduceRequest.validateRecords(request.header.apiVersion, memoryRecords)
authorizedRequestInfo += (topicPartition -> memoryRecords)
} catch {
case e: ApiException =>
// Record validation failed for this topic-partition; map the exception to an error code
invalidRequestResponses += topicPartition -> new PartitionResponse(Errors.forException(e))
}
})
if (authorizedRequestInfo.isEmpty)
// Nothing valid to append: respond immediately with the collected error responses.
sendResponseCallback(Map.empty)
else {
// Ask the replica manager to append the validated records to the leader replicas.
replicaManager.appendRecords(
timeout = produceRequest.timeout.toLong,
requiredAcks = produceRequest.acks,
internalTopicsAllowed = internalTopicsAllowed,
origin = AppendOrigin.Client,
entriesPerPartition = authorizedRequestInfo,
requestLocal = requestLocal,
responseCallback = sendResponseCallback,
recordConversionStatsCallback = processingStatsCallback)
// The per-partition record buffers are no longer needed; release them for GC.
produceRequest.clearPartitionRecords()
}
}
其中 authorizedRequestInfo 保存的是通过授权与校验的 <TopicPartition, MemoryRecords> 映射。
调用副本管理器向本地的leader副本log追加数据,然后判断ack参数是否为-1,若ack=-1,需要等待isr的所有follower都写入成功才能返回结果。若ack不为-1,立即返回结果。源码如下:
/**
 * Appends records to the leader replicas of the given partitions and invokes
 * responseCallback once the required acknowledgement level is satisfied.
 * With acks = -1 the response is delayed until all in-sync replicas have
 * replicated the data; for acks = 0 or 1 the callback fires as soon as the
 * local append completes.
 */
def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (),
requestLocal: RequestLocal = RequestLocal.NoCaching): Unit = {
// Validate acks: only 0, 1 and -1 are legal values
if (isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
// Append the records to the local leader replica's log
val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,origin, entriesPerPartition, requiredAcks, requestLocal)
// acks = -1: delay the response until every ISR follower has replicated the data
if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
......
} else {
// acks != -1: the response can be sent right away
......
}
} else {
......
}
追加日志的实际操作是在 ReplicaManager的appendToLocalLog()方法中完成的,具体实现如下:
/**
 * Appends the given records to the local leader log of each partition and
 * returns the per-partition append result (or the error that occurred).
 */
private def appendToLocalLog(internalTopicsAllowed: Boolean,
origin: AppendOrigin,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
requiredAcks: Short,
requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = {
......
entriesPerPartition.map { case (topicPartition, records) =>
// Writes to Kafka internal topics are rejected unless explicitly allowed
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
// Look up the partition and append the records to its leader replica
val partition = getPartitionOrException(topicPartition)
val info = partition.appendRecordsToLeader(records, origin, requiredAcks, requestLocal)
val numAppendedMessages = info.numMessages
......
} catch {
......
}
}
}
}
ReplicaManager 在往leader副本日志追加 records 时,调用的是 Partition 的 appendRecordsToLeader() 方法,具体实现如下:
/**
 * Appends records to this partition's leader log (when this broker hosts the
 * leader replica) and reports whether the append advanced the leader high
 * watermark. Throws NotEnoughReplicasException when acks = -1 and the ISR has
 * shrunk below min.insync.replicas; throws NotLeaderOrFollowerException when
 * the leader replica is not local.
 */
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int,
requestLocal: RequestLocal): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderLogIfLocal match {
case Some(leaderLog) =>
val minIsr = leaderLog.config.minInSyncReplicas
// Number of replicas currently in the ISR
val inSyncSize = isrState.isr.size
// With acks = -1, fail fast if the ISR is smaller than min.insync.replicas
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(s"The size of the current ISR ${isrState.isr} " +
s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
}
// Append the records to the log backing the leader replica
val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
interBrokerProtocolVersion, requestLocal)
// Check whether the leader high watermark can now be advanced
(info, maybeIncrementLeaderHW(leaderLog))
case None =>
throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
.format(topicPartition, localBrokerId))
}
}
info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
/**
 * Appends the given records to this log as the leader replica.
 *
 * Batches replicated through Raft already carry their final offsets, so
 * offset assignment and validation are skipped for AppendOrigin.RaftLeader;
 * every other origin is validated and assigned offsets by append().
 */
def appendAsLeader(records: MemoryRecords,
                   leaderEpoch: Int,
                   origin: AppendOrigin = AppendOrigin.Client,
                   interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion,
                   requestLocal: RequestLocal = RequestLocal.NoCaching): LogAppendInfo = {
  // Only Raft-replicated batches keep their pre-assigned offsets.
  val assignOffsets = origin != AppendOrigin.RaftLeader
  append(records, origin, interBrokerProtocolVersion,
    validateAndAssignOffsets = assignOffsets,
    leaderEpoch = leaderEpoch,
    requestLocal = Some(requestLocal),
    ignoreRecordSize = false)
}
向 active segment 追加 log,必要的情况下,滚动创建新的 segment
/**
 * Core append path of the log: validates the incoming batch, optionally
 * assigns offsets, enforces the max.message.bytes and segment.bytes limits,
 * rolls a new segment when needed, and finally writes the records to the
 * local log.
 */
private def append(records: MemoryRecords,
origin: AppendOrigin,
interBrokerProtocolVersion: ApiVersion,
validateAndAssignOffsets: Boolean,
leaderEpoch: Int,
requestLocal: Option[RequestLocal],
ignoreRecordSize: Boolean): LogAppendInfo = {
val appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, leaderEpoch)
// Nothing to append: return the (empty) analysis result as-is.
if (appendInfo.shallowCount == 0) appendInfo
else {
// Trim any invalid trailing bytes found during analysis
var validRecords = trimInvalidBytes(records, appendInfo)
lock synchronized {
maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
localLog.checkIfMemoryMappedBufferClosed()
if (validateAndAssignOffsets) {
// Assign offsets starting from the current log end offset
val offset = new LongRef(localLog.logEndOffset)
......
// Use the validated MemoryRecords from here on
validRecords = validateAndOffsetAssignResult.validatedRecords
appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
appendInfo.lastOffset = offset.value - 1
appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
appendInfo.logAppendTime = now
if (!ignoreRecordSize && validateAndOffsetAssignResult.messageSizeMaybeChanged) {
validRecords.batches.forEach { batch =>
// A batch that grew past max.message.bytes (default ~1MB) during conversion is rejected
if (batch.sizeInBytes > config.maxMessageSize) {
throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
}
}
}
} else {
......
}
......
// The whole record set must fit into one segment (segment.bytes, default 1GB)
if (validRecords.sizeInBytes > config.segmentSize) {
throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
}
// Get a segment with room for this batch, rolling a new one if necessary
val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
......
maybeDuplicate match {
......
// Write the validated records to the local log
localLog.append(appendInfo.lastOffset, appendInfo.maxTimestamp, appendInfo.offsetOfMaxTimestamp, validRecords)
......
}
}
}
}
}
LocalLog 的 append() 方法会把数据写入当前活跃的 segment,并更新 LEO:
/**
 * Writes the already-validated records into the currently active segment,
 * then advances the log end offset (LEO) to one past the last written offset.
 */
private[log] def append(lastOffset: Long, largestTimestamp: Long, shallowOffsetOfMaxTimestamp: Long, records: MemoryRecords): Unit = {
  // All appends go to the active (newest) segment.
  val active = segments.activeSegment
  active.append(
    largestOffset = lastOffset,
    largestTimestamp = largestTimestamp,
    shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp,
    records = records)
  // The LEO always points at the offset of the next record to be appended.
  updateLogEndOffset(lastOffset + 1)
}
LogSegment 的 append() 方法真正把数据写入 segment 的数据文件,并按需写入索引,源码如下:
/**
 * Appends the records at the end of this segment's data file, updates the
 * in-memory max timestamp, and sparsely adds entries to the offset and time
 * indexes (roughly one entry per indexIntervalBytes of appended data).
 */
def append(largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit = {
if (records.sizeInBytes > 0) {
trace(s"Inserting ${records.sizeInBytes} bytes at end offset $largestOffset at position ${log.sizeInBytes} " +
s"with largest timestamp $largestTimestamp at shallow offset $shallowOffsetOfMaxTimestamp")
val physicalPosition = log.sizeInBytes()
// First write into an empty segment: remember its timestamp, which is used
// for time-based segment rolling.
if (physicalPosition == 0)
rollingBasedTimestamp = Some(largestTimestamp)
// Verify the offset fits within this segment's addressable offset range.
ensureOffsetInRange(largestOffset)
// Append the record bytes to the segment's data (.log) file
val appendedBytes = log.append(records)
// Update the in memory max timestamp and corresponding offset.
if (largestTimestamp > maxTimestampSoFar) {
maxTimestampAndOffsetSoFar = TimestampOffset(largestTimestamp, shallowOffsetOfMaxTimestamp)
}
// Indexes are sparse: every append goes to the data file, but an index
// entry is only added once indexIntervalBytes of data has accumulated
// since the last entry.
if (bytesSinceLastIndexEntry > indexIntervalBytes) {
// Add an entry to the offset index
offsetIndex.append(largestOffset, physicalPosition)
// Possibly add an entry to the time index
timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
bytesSinceLastIndexEntry = 0
}
bytesSinceLastIndexEntry += records.sizeInBytes
}
}