1. Creating a Topic
1.1. Usage
Use Kafka's bundled script to create a topic named topic_a with 3 partitions and a replication factor of 3:
Option 1
Specify 3 partitions and 3 replicas, and let Kafka generate the replica assignment with its algorithm:
bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2184 --partitions 3 --replication-factor 3
Option 2
Specify the replica assignment yourself (3 partitions, 3 replicas), for example:
sh bin/kafka-topics.sh --create --topic topic_a --zookeeper localhost:2184 --replica-assignment 1:2:0,2:0:1,0:1:2
The script prints:
Created topic "topic_a".
1.2. Resulting Data
After topic_a is created, data appears in two places:
1. In ZooKeeper (the topic's config node and replica assignment).
2. In the Kafka log directories (one folder per replica).
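Roughly like this (the log paths are illustrative and depend on the broker's log.dirs setting):
# ZooKeeper nodes
/config/topics/topic_a
/brokers/topics/topic_a
# Kafka log directories on each broker hosting a replica
/tmp/kafka-logs/topic_a-0
/tmp/kafka-logs/topic_a-1
/tmp/kafka-logs/topic_a-2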
1.3. How It Works
1.3.1. Script entry point
The kafka-topics.sh script is a thin wrapper:
exec $(dirname $0)/kafka-run-class.sh kafka.admin.TopicCommand "$@"
It ultimately invokes the createTopic method of kafka.admin.TopicCommand to create the topic.
1.3.2. Determining the replica assignment
1.3.2.1. Flow
The replica assignment can be generated automatically by an algorithm, or specified explicitly with --replica-assignment. The flow is:
- If an assignment is specified
1.1. Parse and validate the assignment
● Partitions are separated by ","
● Replicas of the same partition are separated by ":"; the first one is the broker hosting the Leader replica
● Replicas of the same partition must not land on the same broker
● All partitions of a topic must have the same number of replicas
1.2. Write the assignment to ZK
1.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
1.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment
- If no assignment is specified
2.1. Generate an assignment with the algorithm
2.1.1. Validate the parameters
● The partition count must be greater than 0
● The replication factor must be greater than 0
● The replication factor must not exceed the number of brokers
2.1.2. Generate the assignment
2.2. Write the assignment to ZK
2.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
2.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment
The resulting ZK data looks like this (a sketch; the partitions JSON follows the standard layout of /brokers/topics/<TOPIC_NAME>):
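/config/topics/topic_a  → {"version":1,"config":{}}
/brokers/topics/topic_a → {"version":1,"partitions":{"0":[1,2,0],"1":[2,0,1],"2":[0,1,2]}}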
1.3.2.2. Source code
createTopic checks whether --replica-assignment was supplied. If it was, the assignment is parsed and validated and then written to ZK; if not, an assignment is generated by the algorithm and then written to ZK.
def createTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topic = opts.options.valueOf(opts.topicOpt)
val configs = parseTopicConfigsToBeAdded(opts)
val ifNotExists = opts.options.has(opts.ifNotExistsOpt)
if (Topic.hasCollisionChars(topic))
println("WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.")
try {
// Was a replica assignment specified (--replica-assignment)?
if (opts.options.has(opts.replicaAssignmentOpt)) {
// 1. An assignment was specified
// 1.1. Parse and validate it first
val assignment = parseReplicaAssignment(opts.options.valueOf(opts.replicaAssignmentOpt))
// 1.2. Then write it to ZK
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, assignment, configs, update = false)
} else {
// 2. No assignment was specified
// 2.1. Read the partition count and replication factor; the assignment itself is generated inside AdminUtils.createTopic
CommandLineUtils.checkRequiredArgs(opts.parser, opts.options, opts.partitionsOpt, opts.replicationFactorOpt)
val partitions = opts.options.valueOf(opts.partitionsOpt).intValue
val replicas = opts.options.valueOf(opts.replicationFactorOpt).intValue
val rackAwareMode = if (opts.options.has(opts.disableRackAware)) RackAwareMode.Disabled
else RackAwareMode.Enforced
// 2.2. Generate the assignment and write it to ZK (see AdminUtils.createTopic below)
AdminUtils.createTopic(zkUtils, topic, partitions, replicas, configs, rackAwareMode)
}
println("Created topic \"%s\".".format(topic))
} catch {
case e: TopicExistsException => if (!ifNotExists) throw e
}
}
1.3.2.2.1. When an assignment is specified
If an assignment was specified, it is parsed and validated first:
// Parse and validate the assignment
def parseReplicaAssignment(replicaAssignmentList: String): Map[Int, List[Int]] = {
// Partitions are separated by ","
val partitionList = replicaAssignmentList.split(",")
val ret = new mutable.HashMap[Int, List[Int]]()
for (i <- 0 until partitionList.size) {
// Replicas of one partition are separated by ":"; the first is the broker hosting the Leader
val brokerList = partitionList(i).split(":").map(s => s.trim().toInt)
val duplicateBrokers = CoreUtils.duplicates(brokerList)
// Replicas of one partition must not land on the same broker
if (duplicateBrokers.nonEmpty)
throw new AdminCommandFailedException("Partition replica lists may not contain duplicate entries: %s".format(duplicateBrokers.mkString(",")))
ret.put(i, brokerList.toList)
// All partitions of a topic must have the same replication factor
if (ret(i).size != ret(0).size)
throw new AdminOperationException("Partition " + i + " has different replication factor: " + brokerList)
}
ret.toMap
}
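For example, the assignment string from Option 2 parses into the following map (a quick sanity check, not output from the source):
// parseReplicaAssignment("1:2:0,2:0:1,0:1:2")
// => Map(0 -> List(1, 2, 0), 1 -> List(2, 0, 1), 2 -> List(0, 1, 2))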
Then the assignment is written to ZK:
def createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils: ZkUtils,
topic: String,
partitionReplicaAssignment: Map[Int, Seq[Int]],
config: Properties = new Properties,
update: Boolean = false) {
validateCreateOrUpdateTopic(zkUtils, topic, partitionReplicaAssignment, config, update)
if (!update) {
// 1.2.1. ZK node /config/topics/<TOPIC_NAME>: write {"version":1,"config":{}}
writeEntityConfig(zkUtils, getEntityConfigPath(ConfigType.Topic, topic), config)
}
// 1.2.2. ZK node /brokers/topics/<TOPIC_NAME>: write the replica assignment
writeTopicPartitionAssignment(zkUtils, topic, partitionReplicaAssignment, update)
}
1.3.2.2.2. When no assignment is specified
If no assignment was specified, one is generated by the algorithm and then written to ZK:
def createTopic(zkUtils: ZkUtils,
topic: String,
partitions: Int,
replicationFactor: Int,
topicConfig: Properties = new Properties,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
// Fetch the broker metadata
val brokerMetadatas = getBrokerMetadatas(zkUtils, rackAwareMode)
// 2.1. Generate the assignment with the algorithm
val replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerMetadatas, partitions, replicationFactor)
// 2.2. Write the assignment to ZK
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, replicaAssignment, topicConfig)
}
The algorithm that generates the assignment:
def assignReplicasToBrokers(brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]] = {
// The partition count must be greater than 0
if (nPartitions <= 0)
throw new InvalidPartitionsException("number of partitions must be larger than 0")
// The replication factor must be greater than 0
if (replicationFactor <= 0)
throw new InvalidReplicationFactorException("replication factor must be larger than 0")
// The replication factor must not exceed the broker count
if (replicationFactor > brokerMetadatas.size)
throw new InvalidReplicationFactorException(s"replication factor: $replicationFactor larger than available brokers: ${brokerMetadatas.size}")
if (brokerMetadatas.forall(_.rack.isEmpty))
// Assignment algorithm (no rack awareness)
assignReplicasToBrokersRackUnaware(nPartitions, replicationFactor, brokerMetadatas.map(_.id), fixedStartIndex,
startPartitionId)
else {
if (brokerMetadatas.exists(_.rack.isEmpty))
throw new AdminOperationException("Not all brokers have rack information for replica rack aware assignment")
// Assignment algorithm (rack aware)
assignReplicasToBrokersRackAware(nPartitions, replicationFactor, brokerMetadatas, fixedStartIndex,
startPartitionId)
}
}
// Assignment algorithm (no rack awareness)
private def assignReplicasToBrokersRackUnaware(nPartitions: Int, // number of partitions
replicationFactor: Int, // replication factor
brokerList: Seq[Int], // broker list
fixedStartIndex: Int, // -1
startPartitionId: Int): Map[Int, Seq[Int]] = { // -1
// Holds the resulting assignment
val ret = mutable.Map[Int, Seq[Int]]()
// Broker array
val brokerArray = brokerList.toArray
// fixedStartIndex defaults to -1, so startIndex gets a random value, say 1
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// startPartitionId defaults to -1, so currentPartitionId starts at 0
var currentPartitionId = math.max(0, startPartitionId)
// fixedStartIndex defaults to -1, so nextReplicaShift gets a random value, say 2
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// For each partition, compute its replica list
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
// Given a partition's first replica, computes where each remaining replica goes
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
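Before walking through each partition by hand, here is a minimal, self-contained sketch (not Kafka source; the object name is ours) that reruns the algorithm with the random values assumed above (startIndex = 1, nextReplicaShift = 2) for brokers [0, 1, 2] and reproduces the result:
object AssignmentDemo extends App {
  // Fixed values assumed in this walkthrough
  val brokerArray = Array(0, 1, 2)
  val startIndex = 1
  var nextReplicaShift = 2
  val replicationFactor = 3

  // Same formula as AdminUtils.replicaIndex above
  def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
    val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
    (firstReplicaIndex + shift) % nBrokers
  }

  for (currentPartitionId <- 0 until 3) {
    if (currentPartitionId > 0 && currentPartitionId % brokerArray.length == 0)
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicas = scala.collection.mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicas += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    println(s"partition $currentPartitionId -> ${replicas.mkString("[", ", ", "]")}")
  }
  // Prints:
  // partition 0 -> [1, 2, 0]
  // partition 1 -> [2, 0, 1]
  // partition 2 -> [0, 1, 2]
}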
Walking through partition 0 with these values yields the replica list [1, 2, 0]:
// currentPartitionId = 0
// startIndex = 1
// nextReplicaShift = 2
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
// firstReplicaIndex = (0 + 1) % 3 = 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
// replicaBuffer = [1]
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
// replicaBuffer = [1, 2, 0]
// ret = {0:[1, 2, 0]}
ret.put(currentPartitionId, replicaBuffer)
// currentPartitionId = 1
currentPartitionId += 1
################ replicaIndex: first iteration ###################
// firstReplicaIndex = 1
// secondReplicaShift = 2
// replicaIndex = 0
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (1 + 1) % 3 = 2
(firstReplicaIndex + shift) % nBrokers
}
################ replicaIndex: second iteration ###################
// firstReplicaIndex = 1
// secondReplicaShift = 2
// replicaIndex = 1
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (1 + 2) % 3 = 0
(firstReplicaIndex + shift) % nBrokers
}
Walking through partition 1 yields the replica list [2, 0, 1]:
// currentPartitionId = 1
// startIndex = 1
// nextReplicaShift = 2
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
// firstReplicaIndex = (1 + 1) % 3 = 2
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
// replicaBuffer = [2]
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
// replicaBuffer = [2, 0, 1]
// ret = {1:[2, 0, 1], 0:[1, 2, 0]}
ret.put(currentPartitionId, replicaBuffer)
// currentPartitionId = 2
currentPartitionId += 1
################ replicaIndex: first iteration ###################
// firstReplicaIndex = 2
// secondReplicaShift = 2
// replicaIndex = 0
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (2 + 1) % 3 = 0
(firstReplicaIndex + shift) % nBrokers
}
################ replicaIndex: second iteration ###################
// firstReplicaIndex = 2
// secondReplicaShift = 2
// replicaIndex = 1
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (2 + 2) % 3 = 1
(firstReplicaIndex + shift) % nBrokers
}
Walking through partition 2 yields the replica list [0, 1, 2]:
// currentPartitionId = 2
// startIndex = 1
// nextReplicaShift = 2
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
// firstReplicaIndex = (2 + 1) % 3 = 0
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
// replicaBuffer = [0]
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
// replicaBuffer = [0, 1, 2]
// ret = {2:[0, 1, 2], 1:[2, 0, 1], 0:[1, 2, 0]}
ret.put(currentPartitionId, replicaBuffer)
// currentPartitionId = 3
currentPartitionId += 1
################ replicaIndex: first iteration ###################
// firstReplicaIndex = 0
// secondReplicaShift = 2
// replicaIndex = 0
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (0 + 1) % 3 = 1
(firstReplicaIndex + shift) % nBrokers
}
################ replicaIndex: second iteration ###################
// firstReplicaIndex = 0
// secondReplicaShift = 2
// replicaIndex = 1
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (0 + 2) % 3 = 2
(firstReplicaIndex + shift) % nBrokers
}
This gives the final replica assignment:
"2" : [ 0, 1, 2 ],
"1" : [ 2, 0, 1 ],
"0" : [ 1, 2, 0 ]
Finally the assignment is written to the ZK node /brokers/topics/<TOPIC_NAME>; its data (in the node's standard JSON layout) looks like this:
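{"version":1,"partitions":{"2":[0,1,2],"1":[2,0,1],"0":[1,2,0]}}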
1.3.3. The Controller listener
Once elected, the Controller registers a listener (TopicChangeListener) on the ZK node /brokers/topics. When a topic is created, its replica assignment is written to /brokers/topics/<TOPIC_NAME>; the Controller observes the change and triggers the topic-creation logic.
1.3.3.1. Flow
The creation flow is:
- Read the replica assignment from the ZK node /brokers/topics/<TOPIC_NAME>
- Update the Controller's cache
- The Controller brings the new topic online
3.1. Register a partition-change listener for the topic (on ZK node /brokers/topics/<TOPIC_NAME>)
3.2. The partition state machine moves the partitions to NewPartition
3.3. The replica state machine moves the replicas to NewReplica
3.4. The first live replica in the AR is elected Leader
3.5. The live replicas in the AR become the ISR
3.6. Create the persistent ZK node /brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state
3.7. Write the partition metadata (leader, isr, leader_epoch, controller_epoch, version) to that node (see the example after this list)
3.8. Update the locally cached metadata
3.9. The Controller sends LeaderAndIsr requests to all brokers
3.10. The partition state machine moves the partitions to OnlinePartition (usable)
3.11. The replica state machine moves the replicas to OnlineReplica (usable)
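As an illustration, the state node for partition 0 of topic_a would hold JSON of roughly this shape (the epoch values here are made up):
/brokers/topics/topic_a/partitions/0/state
{"controller_epoch":1,"leader":1,"version":1,"leader_epoch":0,"isr":[1,2,0]}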
1.3.3.2. Source code
class TopicChangeListener(protected val controller: KafkaController) extends ControllerZkChildListener {
protected def logName = "TopicChangeListener"
def doHandleChildChange(parentPath: String, children: Seq[String]) {
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
val currentChildren = {
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
children.toSet
}
// Topics to add
val newTopics = currentChildren -- controllerContext.allTopics
// Topics to delete
val deletedTopics = controllerContext.allTopics -- currentChildren
controllerContext.allTopics = currentChildren
// 1. Read the replica assignment from the ZK node /brokers/topics/<TOPIC_NAME>
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
// 2. Update the cache
controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
!deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
// 3. Let the Controller handle the newly created topics
if (newTopics.nonEmpty)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
} catch {
case e: Throwable => error("Error while handling new topic", e)
}
}
}
}
}
// Handle newly created topics
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
// Register a partition-change listener (on ZK node /brokers/topics/<TOPIC_NAME>)
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
// Create the new partitions
onNewPartitionCreation(newPartitions)
}
// Handle new partitions
def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
// Partition state machine: move to NewPartition
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
// Replica state machine: move to NewReplica
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
// Partition state machine: move to OnlinePartition (pick the Leader and ISR, write them to ZK, update the cache, send LeaderAndIsr requests to the brokers involved)
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
// Replica state machine: move to OnlineReplica
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}
2. Listing Topics
To list all topics, run:
sh bin/kafka-topics.sh --list --zookeeper localhost:2184
The script prints:
topic_a
topic_b
topic_c
How it works:
- Read all topics under the ZK node /brokers/topics
- For each topic, check /admin/delete_topics/<TOPIC_NAME> and flag topics that are marked for deletion
def listTopics(zkUtils: ZkUtils, opts: TopicCommandOptions) {
// 1. Read all topics under the ZK node /brokers/topics
val topics = getTopics(zkUtils, opts)
// 2. Flag topics that are marked for deletion (/admin/delete_topics/<TOPIC_NAME>)
for (topic <- topics) {
if (zkUtils.pathExists(getDeleteTopicPath(topic))) {
println("%s - marked for deletion".format(topic))
} else {
println(topic)
}
}
}
3. Describing a Topic
To describe a topic, run:
sh bin/kafka-topics.sh --describe --topic topic_a --zookeeper localhost:2184
The script prints (for the 3-partition, 3-replica topic_a created above):
Topic:topic_a PartitionCount:3 ReplicationFactor:3 Configs:
Topic: topic_a Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: topic_a Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic_a Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
How it works:
def describeTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val reportUnderReplicatedPartitions = opts.options.has(opts.reportUnderReplicatedPartitionsOpt)
val reportUnavailablePartitions = opts.options.has(opts.reportUnavailablePartitionsOpt)
val reportOverriddenConfigs = opts.options.has(opts.topicsWithOverridesOpt)
// Read the broker list from ZK (/brokers/ids)
val liveBrokers = zkUtils.getAllBrokersInCluster().map(_.id).toSet
for (topic <- topics) {
// Read the topic's replica assignment from ZK (/brokers/topics/<TOPIC_NAME>)
zkUtils.getPartitionAssignmentForTopics(List(topic)).get(topic) match {
case Some(topicPartitionAssignment) =>
val describeConfigs: Boolean = !reportUnavailablePartitions && !reportUnderReplicatedPartitions
val describePartitions: Boolean = !reportOverriddenConfigs
val sortedPartitions = topicPartitionAssignment.toList.sortWith((m1, m2) => m1._1 < m2._1)
if (describeConfigs) {
val configs = AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic).asScala
if (!reportOverriddenConfigs || configs.nonEmpty) {
val numPartitions = topicPartitionAssignment.size
val replicationFactor = topicPartitionAssignment.head._2.size
println("Topic:%s\tPartitionCount:%d\tReplicationFactor:%d\tConfigs:%s"
.format(topic, numPartitions, replicationFactor, configs.map(kv => kv._1 + "=" + kv._2).mkString(",")))
}
}
if (describePartitions) {
// Iterate over the topic's partitions
for ((partitionId, assignedReplicas) <- sortedPartitions) {
// Read the Leader and ISR from ZK (/brokers/topics/<TOPIC_NAME>/partitions/<PARTITION_ID>/state)
val inSyncReplicas = zkUtils.getInSyncReplicasForPartition(topic, partitionId)
val leader = zkUtils.getLeaderForPartition(topic, partitionId)
if ((!reportUnderReplicatedPartitions && !reportUnavailablePartitions) ||
(reportUnderReplicatedPartitions && inSyncReplicas.size < assignedReplicas.size) ||
(reportUnavailablePartitions && (leader.isEmpty || !liveBrokers.contains(leader.get)))) {
print("\tTopic: " + topic)
print("\tPartition: " + partitionId)
print("\tLeader: " + (if (leader.isDefined) leader.get else "none"))
print("\tReplicas: " + assignedReplicas.mkString(","))
println("\tIsr: " + inSyncReplicas.mkString(","))
}
}
}
case None =>
println("Topic " + topic + " doesn't exist!")
}
}
}
4. Altering a Topic
4.1. Adding partitions
4.1.1. Usage
Increase topic_a's partition count to 4:
sh bin/kafka-topics.sh --alter --topic topic_a --zookeeper localhost:2184 --partitions 4
The script prints:
Adding partitions succeeded!
4.1.2. How it works
def alterTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
...
// Partition count
val nPartitions = opts.options.valueOf(opts.partitionsOpt).intValue
// Replica assignment (if one was given)
val replicaAssignmentStr = opts.options.valueOf(opts.replicaAssignmentOpt)
// Add the partitions
AdminUtils.addPartitions(zkUtils, topic, nPartitions, replicaAssignmentStr)
println("Adding partitions succeeded!")
...
}
def addPartitions(zkUtils: ZkUtils,
topic: String,
numPartitions: Int = 1, // 4
replicaAssignmentStr: String = "", // null
checkBrokerAvailable: Boolean = true,
rackAwareMode: RackAwareMode = RackAwareMode.Enforced) {
// Read the existing replica assignment from ZK (/brokers/topics/<TOPIC_NAME>)
val existingPartitionsReplicaList = zkUtils.getReplicaAssignmentForTopics(List(topic))
// Number of partitions to add
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
// The partition count can only be increased, never reduced
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
...
// Generate an assignment for the new partitions with the algorithm
val newPartitionReplicaList = ...
partitionReplicaList ++= newPartitionReplicaList
// Write the merged assignment back to ZK
AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, partitionReplicaList, update = true)
}
In the elided code, fixedStartIndex is derived from the first replica of partition 0 (broker 1, i.e. index 1 here) and startPartitionId is the existing partition count (3), which explains the values annotated below.
// Assignment algorithm (same function as before, now entered with fixed values)
private def assignReplicasToBrokersRackUnaware(nPartitions: Int, // number of partitions: 1
replicationFactor: Int, // replication factor: 3
brokerList: Seq[Int], // broker list
fixedStartIndex: Int, // 1
startPartitionId: Int): Map[Int, Seq[Int]] = { // 3
// Holds the resulting assignment
val ret = mutable.Map[Int, Seq[Int]]()
// Broker array
val brokerArray = brokerList.toArray
// startIndex = 1 (fixedStartIndex is 1)
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// currentPartitionId = 3 (startPartitionId is 3)
var currentPartitionId = math.max(0, startPartitionId)
// nextReplicaShift = 1 (fixedStartIndex is 1)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
// For each new partition, compute its replica list
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
// Given a partition's first replica, computes where each remaining replica goes
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
Walking through the new partition (partition 3) yields the replica list [1, 2, 0]:
// currentPartitionId = 3
// startIndex = 1
// nextReplicaShift = 1
// (3 > 0 && (3 % 3 == 0)) true
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
// nextReplicaShift = 2
nextReplicaShift += 1
// firstReplicaIndex = (3 + 1) % 3 = 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
// replicaBuffer = [1]
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
// replicaBuffer = [1, 2, 0]
// ret = {3:[1, 2, 0]}
ret.put(currentPartitionId, replicaBuffer)
// currentPartitionId = 4
currentPartitionId += 1
################ replicaIndex: first iteration ###################
// firstReplicaIndex = 1
// secondReplicaShift = 2
// replicaIndex = 0
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 0) % (3 - 1) = 1 + 0 = 1
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (1 + 1) % 3 = 2
(firstReplicaIndex + shift) % nBrokers
}
################ replicaIndex: second iteration ###################
// firstReplicaIndex = 1
// secondReplicaShift = 2
// replicaIndex = 1
// nBrokers = 3
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
// shift = 1 + (2 + 1) % (3 - 1) = 1 + 1 = 2
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
// (1 + 2) % 3 = 0
(firstReplicaIndex + shift) % nBrokers
}
Finally the merged assignment is written to the ZK node /brokers/topics/<TOPIC_NAME>; its data now looks like this (same JSON layout as before):
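{"version":1,"partitions":{"3":[1,2,0],"2":[0,1,2],"1":[2,0,1],"0":[1,2,0]}}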
4.2. Reducing partitions
Try to reduce topic_a's partition count to 1:
sh bin/kafka-topics.sh --alter --topic topic_a --zookeeper localhost:2184 --partitions 1
The script fails with:
kafka.admin.AdminOperationException: The number of partitions for a topic can only be increased
Kafka does not support reducing the partition count (existing data would have to be redistributed, and keyed messages would map to different partitions). The check is:
// partitions to add = requested partition count - existing partition count
val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
if (partitionsToAdd <= 0)
throw new AdminOperationException("The number of partitions for a topic can only be increased")
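If fewer partitions are genuinely needed, a common workaround (not part of kafka-topics.sh) is to create a new, smaller topic and re-produce the data into it, for example:
sh bin/kafka-topics.sh --create --topic topic_a_small --zookeeper localhost:2184 --partitions 1 --replication-factor 3
# then copy the data across, e.g. with a consumer/producer pipeline or MirrorMaker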
5. Deleting a Topic
5.1. Usage
To delete a topic, run:
sh bin/kafka-topics.sh --delete --topic topic_c --zookeeper localhost:2184
The script prints:
Topic topic_c is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
5.2. How it works
5.2.1. Script entry point
The script ultimately calls the deleteTopic method of kafka.admin.TopicCommand. The flow is:
- Validate: Kafka's internal topics (__consumer_offsets) cannot be deleted
- Create the ZK node /admin/delete_topics/<TOPIC_NAME>
def deleteTopic(zkUtils: ZkUtils, opts: TopicCommandOptions) {
val topics = getTopics(zkUtils, opts)
val ifExists = opts.options.has(opts.ifExistsOpt)
if (topics.isEmpty && !ifExists) {
throw new IllegalArgumentException("Topic %s does not exist on ZK path %s".format(opts.options.valueOf(opts.topicOpt),
opts.options.valueOf(opts.zkConnectOpt)))
}
topics.foreach { topic =>
try {
// Kafka internal topics (__consumer_offsets) cannot be deleted
if (Topic.isInternal(topic)) {
throw new AdminOperationException("Topic %s is a kafka internal topic and is not allowed to be marked for deletion.".format(topic))
} else {
// Create the ZK node /admin/delete_topics/<TOPIC_NAME>
zkUtils.createPersistentPath(getDeleteTopicPath(topic))
println("Topic %s is marked for deletion.".format(topic))
println("Note: This will have no impact if delete.topic.enable is not set to true.")
}
} catch {
case _: ZkNodeExistsException =>
println("Topic %s is already marked for deletion.".format(topic))
case e: AdminOperationException =>
throw e
case _: Throwable =>
throw new AdminOperationException("Error while deleting topic %s".format(topic))
}
}
}
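Once the command has run, the marker node can be inspected with ZooKeeper's CLI, e.g. zkCli.sh (illustrative output):
ls /admin/delete_topics
[topic_c]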
5.2.2. The Controller listener
Once elected, the Controller registers a listener (TopicDeletionListener) on the ZK node /admin/delete_topics. When a topic is deleted, a child node named after the topic appears there; the Controller observes the change and triggers the topic-deletion logic.
5.2.2.1. Flow
The deletion flow is:
- Check whether all replicas of the topic have already been deleted (replica state: ReplicaDeletionSuccessful)
1.1. If so, remove the topic's listeners and delete its data from ZK and the Controller cache
1.1.1. Deregister the partition-change listener
1.1.2. Replica state machine: move the replicas to NonExistentReplica
1.1.3. Partition state machine: move the partitions to OfflinePartition
1.1.4. Partition state machine: then move the partitions to NonExistentPartition
1.1.5. Delete the ZK node /brokers/topics/<TOPIC_NAME>
1.1.6. Delete the ZK node /config/topics/<TOPIC_NAME>
1.1.7. Delete the ZK node /admin/delete_topics/<TOPIC_NAME>
1.1.8. Remove the topic's data from the Controller context
1.2. If any replica failed to delete (ReplicaDeletionIneligible), move it back to OfflineReplica so deletion can be retried later
- Delete all replicas of the topic
2.1. Replica state machine: move dead replicas to ReplicaDeletionIneligible (not deletable)
2.2. Replica state machine: move live replicas to OfflineReplica (deletable)
2.3. Replica state machine: move the deletable live replicas to ReplicaDeletionStarted
2.4. Send StopReplica requests to the brokers to delete the replicas
2.5. A callback handles the results
2.5.1. If a replica was deleted, set its state to ReplicaDeletionSuccessful
2.5.2. If the deletion failed, set the replica's state to ReplicaDeletionIneligible
2.5.3. The callback re-runs step 1
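The replica-state transitions involved, summarized from the steps above:
OnlineReplica -> OfflineReplica -> ReplicaDeletionStarted -> ReplicaDeletionSuccessful -> NonExistentReplica
                                            |
                                            +-> ReplicaDeletionIneligible -> (retry) -> OfflineReplica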
5.2.2.2. Source code
The TopicDeletionListener's handling logic:
case class TopicDeletion(var topicsToBeDeleted: Set[String]) extends ControllerEvent {
def state = ControllerState.TopicDeletion
override def process(): Unit = {
if (!isActive) return
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
// Topics that need to be deleted
topicsToBeDeleted --= nonExistentTopics
// Is topic deletion enabled? (delete.topic.enable, default false)
if (config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
topicsToBeDeleted.foreach { topic =>
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
if (partitionReassignmentInProgress)
// If the topic's partitions are being reassigned, mark it ineligible for deletion
topicDeletionManager.markTopicIneligibleForDeletion(Set(topic))
}
// Add the topics to the pending-deletion set (topicsToBeDeleted)
topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
} else {
// If topic deletion is disabled, just remove the nodes under /admin/delete_topics
for (topic <- topicsToBeDeleted) {
zkUtils.zkClient.delete(getDeleteTopicPath(topic))
}
}
}
}
TopicDeletionManager then enqueues the topics and their partitions for deletion:
def enqueueTopicsForDeletion(topics: Set[String]) {
if(isDeleteTopicEnabled) {
// Add the topics to the pending-deletion set (topicsToBeDeleted)
topicsToBeDeleted ++= topics
// Add their partitions to the pending-deletion set (partitionsToBeDeleted)
partitionsToBeDeleted ++= topics.flatMap(controllerContext.partitionsForTopic)
// Kick off the deletion
resumeDeletions()
}
}
private def resumeDeletions(): Unit = {
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
// Topics queued for deletion
topicsQueuedForDeletion.foreach { topic =>
// 1. Check whether all replicas of the topic have already been deleted (ReplicaDeletionSuccessful)
if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// 1.1. If so, remove the listeners and delete the topic from ZK and the Controller cache
completeDeleteTopic(topic)
} else {
...
// 1.2. If any replica failed to delete (ReplicaDeletionIneligible), move it back to OfflineReplica for a later retry
if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
markTopicForDeletionRetry(topic)
}
...
}
if(isTopicEligibleForDeletion(topic)) {
// 2. Delete all replicas of the topic
onTopicDeletion(Set(topic))
} else if(isTopicIneligibleForDeletion(topic)) {
info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
}
}
}
// Topic deletion
private def onTopicDeletion(topics: Set[String]) {
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
topics.foreach { topic =>
// Partition deletion
onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
}
}
// Partition deletion
private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
// Replica deletion
startReplicaDeletion(replicasPerPartition)
}
// Replica deletion
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
// 2.1. Replica state machine: move dead replicas to ReplicaDeletionIneligible (not deletable)
replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
// 2.2. Replica state machine: move live replicas to OfflineReplica (deletable)
replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
// 2.3. Replica state machine: move the deletable replicas to ReplicaDeletionStarted
// 2.4. Send StopReplica requests to the brokers to delete the replicas
// 2.5. Register a callback to handle the results
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback((stopReplicaResponseObj, replicaId) =>
eventManager.put(controller.TopicDeletionStopReplicaResult(stopReplicaResponseObj, replicaId))).build)
if (deadReplicasForTopic.nonEmpty) {
markTopicIneligibleForDeletion(Set(topic))
}
}
}
// Handle the results of replica deletion (StopReplica responses)
case class TopicDeletionStopReplicaResult(stopReplicaResponseObj: AbstractResponse, replicaId: Int) extends ControllerEvent {
def state = ControllerState.TopicDeletion
override def process(): Unit = {
import JavaConverters._
if (!isActive) return
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
val responseMap = stopReplicaResponse.responses.asScala
val partitionsInError =
if (stopReplicaResponse.error != Errors.NONE) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE }.keySet
val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
// Replica deletion failed
topicDeletionManager.failReplicaDeletion(replicasInError)
if (replicasInError.size != responseMap.size) {
val deletedReplicas = responseMap.keySet -- partitionsInError
// Replica deletion succeeded
topicDeletionManager.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
}
}
}
// Replica deletion succeeded
def completeReplicaDeletion(replicas: Set[PartitionAndReplica]) {
val successfullyDeletedReplicas = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
// 2.5.1. The replica was deleted: set its state to ReplicaDeletionSuccessful
controller.replicaStateMachine.handleStateChanges(successfullyDeletedReplicas, ReplicaDeletionSuccessful)
// Re-run resumeDeletions
resumeDeletions()
}
// Replica deletion failed
def failReplicaDeletion(replicas: Set[PartitionAndReplica]) {
if(isDeleteTopicEnabled) {
val replicasThatFailedToDelete = replicas.filter(r => isTopicQueuedUpForDeletion(r.topic))
if(replicasThatFailedToDelete.nonEmpty) {
val topics = replicasThatFailedToDelete.map(_.topic)
// 2.5.2. The deletion failed: set the replica's state to ReplicaDeletionIneligible
controller.replicaStateMachine.handleStateChanges(replicasThatFailedToDelete, ReplicaDeletionIneligible)
markTopicIneligibleForDeletion(topics)
// Re-run resumeDeletions
resumeDeletions()
}
}
}
// All replicas deleted: remove the topic's listeners and delete its data from ZK and the Controller cache
private def completeDeleteTopic(topic: String) {
// 1.1.1. Deregister the partition-change listener
controller.deregisterPartitionModificationsListener(topic)
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// 1.1.2. Replica state machine: move the replicas to NonExistentReplica
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
// 1.1.3. Partition state machine: move the partitions to OfflinePartition
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
// 1.1.4. Partition state machine: then move the partitions to NonExistentPartition
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)
val zkUtils = controllerContext.zkUtils
// 1.1.5. Delete the ZK node /brokers/topics/<TOPIC_NAME>
zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
// 1.1.6. Delete the ZK node /config/topics/<TOPIC_NAME>
zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
// 1.1.7. Delete the ZK node /admin/delete_topics/<TOPIC_NAME>
zkUtils.zkClient.delete(getDeleteTopicPath(topic))
// 1.1.8. Remove the topic's data from the Controller context
controllerContext.removeTopic(topic)
}
private def markTopicForDeletionRetry(topic: String) {
val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
// Replica state machine: move the replicas from ReplicaDeletionIneligible back to OfflineReplica
controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
}
The Controller sends StopReplica requests to the brokers. When a broker receives one, it mainly:
- Renames the log directory (appending a .<uuid>-delete suffix)
- Adds the Log to the logsToBeDeleted queue for asynchronous deletion
def asyncDelete(topicPartition: TopicPartition): Log = {
val removedLog: Log = logCreationOrDeletionLock synchronized {
logs.remove(topicPartition)
}
if (removedLog != null) {
if (cleaner != null) {
cleaner.abortCleaning(topicPartition)
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
// 1. Rename the log directory, appending .<uuid>-delete (e.g. topic_a-2 becomes topic_a-2.d0de6b09d9604f6d9ab89bf7bd413502-delete)
val dirName = Log.logDeleteDirName(removedLog.name)
removedLog.close()
val renamedDir = new File(removedLog.dir.getParent, dirName)
val renameSuccessful = removedLog.dir.renameTo(renamedDir)
if (renameSuccessful) {
checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
removedLog.dir = renamedDir
removedLog.logSegments.foreach(_.updateDir(renamedDir))
// 2. Add the Log to the logsToBeDeleted queue for asynchronous deletion
logsToBeDeleted.add(removedLog)
removedLog.removeLogMetrics()
} else {
throw new KafkaStorageException("Failed to rename log directory from " + removedLog.dir.getAbsolutePath + " to " + renamedDir.getAbsolutePath)
}
}
removedLog
}
def logDeleteDirName(logName: String): String = {
val uniqueId = java.util.UUID.randomUUID.toString.replaceAll("-", "")
s"$logName.$uniqueId$DeleteDirSuffix"
}
When a broker starts, its log manager schedules a periodic log-deletion task:
def startup() {
...
// Periodic log-deletion task (default period 60s)
scheduler.schedule("kafka-delete-logs",
deleteLogs _,
delay = InitialTaskDelayMs,
period = defaultConfig.fileDeleteDelayMs,
TimeUnit.MILLISECONDS)
...
}
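The period is the broker setting file.delete.delay.ms (60000 ms by default), e.g. in server.properties:
# how long a log directory renamed to *-delete waits before being physically removed
file.delete.delay.ms=60000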
The deletion flow is:
- Take a Log from the logsToBeDeleted queue
- Delete the Log
2.1. Delete each log segment
2.1.1. Delete the .log file
2.1.2. Delete the .index file
2.1.3. Delete the .timeindex file
2.1.4. Delete the .txnindex file
2.2. Clear the caches
The code:
private def deleteLogs(): Unit = {
try {
var failed = 0
while (!logsToBeDeleted.isEmpty && failed < logsToBeDeleted.size()) {
// 1. Take a Log from the logsToBeDeleted queue
val removedLog = logsToBeDeleted.take()
if (removedLog != null) {
try {
// 2. Delete the Log
removedLog.delete()
info(s"Deleted log for partition ${removedLog.topicPartition} in ${removedLog.dir.getAbsolutePath}.")
} catch {
case e: Throwable =>
error(s"Exception in deleting $removedLog. Moving it to the end of the queue.", e)
failed = failed + 1
logsToBeDeleted.put(removedLog)
}
}
}
} catch {
case e: Throwable =>
error(s"Exception in kafka-delete-logs thread.", e)
}
}
private[log] def delete() {
lock synchronized {
// 2.1. Delete each log segment
logSegments.foreach(_.delete())
// 2.2. Clear the caches
segments.clear()
leaderEpochCache.clear()
Utils.delete(dir)
}
}
def delete() {
// 2.1.1. Delete the .log file
val deletedLog = log.delete()
// 2.1.2. Delete the .index file
val deletedIndex = index.delete
// 2.1.3. Delete the .timeindex file
val deletedTimeIndex = timeIndex.delete()
// 2.1.4. Delete the .txnindex file
val deletedTxnIndex = txnIndex.delete()
...
}