1、基本概念
主题
Topic,kafka中的消息以主题为基本单位来归类,各个主题逻辑上相互独立。
分区
Partition,每个主题又可以分为一个或多个分区,每条消息在发送时会根据分区规则确定分区号,然后追加到对应的分区中。
副本
Replica,为了实现高可用性采用的冗余机制,简单的说,就是备份多份日志。
分段
Segment,一个分区对应一个日志(Log),为了防止Log过大,将日志切分成多个Segment。
关系如下:
<br>
2、文件目录
比如创建一个主题:topic_a,分区数:2,副本数:1,则文件目录如下:
Log对应一个文件夹,命名形式为:<topic>
- <partition>
文件夹下有以下文件:
文件作用
.log文件用来存储消息,为了便于消息的检索,每个.log文件都有对应的两个索引文件:偏移量索引文件(.index后缀的文件)和时间戳索引文件(.timeindex后缀的文件)。
命名规则
每个Segment都有一个基准偏移量baseOffset,用来表示当前Segment中的第一条消息的偏移量。偏移量是一个64位的长整型数。日志文件和两个索引文件都是根据基准偏移量命名的,名称固定为20位数字。
3、底层原理
3.1、存储
Kafka将消息封装成一个个Record,并以自定义的格式序列化成紧凑的二进制字节数组,按照顺序保存到日志中。
如果每个分区只有一个日志文件,对于消息的检索和过期清除都是一个大难题,因此 Kafka 会将每个分区的日志文件继续切分成若干个日志文件,这些日志文件也称作日志段文件(log segment file)。
每个日志段文件都会伴随一个索引文件和时间戳索引文件,在 broker 所属节点打开对应分区日志的目录,可以看到相关文件:
每个segment包含了 .log | .index | .timeindex 3个文件,而且名字都是相同的。
- log 文件
.log 后缀文件保存了 Kafka 消息的记录,而且每个 log 文件都有对应的消息记录范围,名字的数字代表了消息记录的初始位移值,并且随着消息数量的增多而增大,因此,每个新创建的分区一定会包含 0 的 log 文件。Kafka 文件名字使用了 20 位来标识位移。
每个 log 文件的默认大小为 1 GB,可以通过 log.segment.bytes 参数进行控制,每当 log 文件被填满后,Kafka 会自动创建一组新的日志文件和索引文件,也就是说日志段文件一旦被填满后,就不会再继续写入新消息,而是写到新的日志段文件中,而当前可被写入消息的日志段文件也称作当前日志段文件,它是一种特殊的日志段文件,它不会受到 Kafka 任何后台任务的影响,比如日志过期清除、日志 compaction 等任务。
- 索引文件
每个 log 文件都会包含两个索引文件,分别是 .index 和 .timeindex,在 Kafka 中它们分别被称为位移索引文件和时间戳索引文件,位移索引文件可根据消息的位移值快速地从查询到消息的物理文件位置,时间戳索引文件可根据时间戳查找到对应的位移信息。
Kafka 的索引文件按照稀疏索引文件的思想进行设计的,每个索引文件包含若干条索引项。稀疏索引的核心是不会为每个记录都保存索引,而是写入一定的记录之后才会增加一个索引值,具体这个间隔有多大则通过 log.index.interval.bytes 参数进行控制,默认大小为 4 KB,意味着 Kafka 至少写入 4KB 消息数据之后,才会在索引文件中增加一个索引项。
需要注意的一点是,消息大小还会影响 Kakfa 索引的插入频率,假设每个消息大小均大于 4 KB,会导致每次追加消息的时候,都会伴随一次索引项的增加。因此 log.index.interval.bytes 也是 Kafka 调优一个重要参数值。
每个日志段的索引文件可通过 log.index.size.max.bytes 参数控制,默认大小为 10 MB。
3.2、索引
为了便于消息的检索,每个日志文件都有对应的两个索引文件,那么既然有了索引文件,Kafka 是如何根据索引文件进行快速检索的呢?由于索引文件也是按照消息的顺序性进行增加索引项的,位移索引文件按照位移顺序保存,而时间戳索引文件则按照时间顺序保存索引项,因此 Kafka 可以利用二分查找算法来搜索目标索引项,把时间复杂度降到了 O(lgN),大大减少了查找的时间。
3.2.1、位移索引
索引项结构
位移索引文件的索引项结构如下:
每个索引项的大小为 8 bytes,索引文件大小必须是索引项的整数倍(即8的整数倍),比如设置 log.index.size.max.bytes = 100,则 Kafka 会创建一个大小为 96 bytes 的索引文件。
相对位移
保存的是与索引文件起始位移的差值。比如一个索引文件为:00000000000000000050.index,那么起始位移值即 50,当存储位移为60的消息索引时,在索引文件中的相对位移则为 60 - 50 = 10,通过保存差值,只需要4字节而非保存整个位移的8字节,可以节省很多磁盘空间。
文件物理位置
消息在 log 文件中保存的位置,也就是说 Kafka 可根据消息位移,通过位移索引文件快速找到消息在 log 文件中的物理位置,有了该物理位置的值,我们就可以快速地从 log 文件中找到对应的消息了。
检索过程
假设要找出位移为 3670 的消息,那么 Kafka 首先会使用二分查找算法找到小于 3670 的最大索引项:<3026, 2070021>,得到索引项之后,Kafka 会根据该索引项的文件物理位置在 log 文件中从位置 2070021 开始顺序查找,直至找到位移为 3670 的消息记录。
如果想要增加索引项的密度,可以减少broker端的参数 log.index.interval.bytes的值。
3.2.2、时间戳索引
Kafka 在 0.10.0.0 以后的版本,消息中增加了时间戳信息,为了满足用户需要根据时间戳查询消息记录的需求,Kafka 增加了时间戳索引文件。
索引项结构
每个索引项的大小为 12 bytes,同样的,时间戳索引文件大小也必须为索引项的整数倍大小,计算方式与位移索引文件相同。
检索过程
假设要查询时间戳为 1459388086725 附近的消息,根据二分算法找到时间戳索引项 <1459388085342, 3587>,然后根据位移值从位移索引文件中找到小于 3587 位移的最大索引项<3026, 2070021>。从2070021字节处开始的消息就是满足该时间戳条件的消息。