Open Source, Open Future!
  menu
107 文章
ღゝ◡╹)ノ❤️

kafka配置参数整理

生产者

acks

控制着生产者发送消息后,要求 Kafka 服务器返回的确认(acknowledgment)级别。默认为1。

acks 参数可以设置为以下几个值:

  • acks=0:生产者不会等待来自服务器的任何确认。消息一旦发送出去,生产者就会立即认为消息已经被成功写入。这种模式下,生产者的吞吐量最高,但是消息丢失的风险也最大,因为如果服务器在接收消息后崩溃,那么这些消息将会丢失。
  • acks=1:生产者会等待来自 Leader 副本的确认,即当消息被写入 Leader 副本后,生产者就会收到确认。这种模式下,消息的持久性得到了保证,但是仍然存在一定的风险,因为如果在消息被写入 Leader 副本后,但还未被同步到 Follower 副本之前,Leader 副本崩溃,那么消息可能会丢失。
  • acks=all 或 acks=-1:生产者会等待来自所有副本(包括 Leader 和所有 Follower 副本)的确认,即消息需要被写入所有副本并同步后,生产者才会收到确认。这种模式下,消息的持久性和可靠性最高,但是生产者的吞吐量可能会受到影响,因为需要等待所有副本的确认。

调整 acks 参数的值需要根据具体的应用场景和性能需求来进行权衡。如果对消息的持久性和可靠性有很高的要求,可以将 acks 设置为 all 或 -1。如果更关注生产者的吞吐量,可以将 acks 设置为 0。但是请注意,将 acks 设置为 0 可能会增加消息丢失的风险。

batch.size

指定了生产者发送消息时批次(batch)的大小。默认值为16KB。

这个参数对于控制生产者的吞吐量和延迟有着直接的影响。

具体来说,batch.size 定义了生产者在一次发送操作中可以将多少条消息组合成一个批次发送给 Kafka 服务器。当生产者积累的消息总字节数达到这个批次大小时,就会触发一次发送操作。如果生产者的发送速率高于 Kafka 服务器的接收速率,那么增大 batch.size 可以帮助提高吞吐量,因为更多的消息会被组合在一起发送,减少了网络往返时间。

然而,batch.size 的设置也需要谨慎。如果设置得过大,可能会导致生产者内部的缓冲区溢出,进而引发异常。此外,过大的批次大小也可能增加消息丢失的风险,因为在一次发送操作中,如果只有部分消息被成功写入,那么整个批次的消息都可能需要重新发送。

另外,batch.size 的设置还需要与 linger.ms 参数进行权衡。linger.ms 参数用于控制生产者在发送批次之前等待更多消息加入批次的时间。如果 linger.ms 设置得较大,生产者可能会等待更多的消息加入批次,从而增大批次的大小。然而,这也可能增加消息的延迟。

因此,在配置 Kafka 生产者时,需要根据实际的应用场景和性能需求来合理设置 batch.size 参数。通常建议通过实验和监控来确定最佳的批次大小,以在吞吐量和延迟之间达到良好的平衡。同时,还需要注意监控生产者的内部缓冲区使用情况,以避免缓冲区溢出等问题的发生。

buffer.memory

用于指定消息缓冲区大小,单位为字节,默认值为:33554432(32MB)。

当生产者发送消息时,这些消息首先会被存储在内部缓冲区中,直到它们被批量发送到 Kafka 服务器。如果生产者发送消息的速度超过了服务器处理的速度,那么缓冲区可能会迅速填满。为了避免内存溢出错误,buffer.memory 参数限制了缓冲区可以使用的最大内存量。

当缓冲区内存使用量接近 buffer.memory 的限制时,生产者会根据一定的策略来阻塞或丢弃消息,直到缓冲区中有足够的空间来继续存储新的消息。这个策略可以通过配置 blocking.max.ms 参数来控制,它指定了生产者在达到内存限制时最多可以阻塞多长时间。

调整 buffer.memory 的大小可以影响生产者的性能和吞吐量。如果设置得过小,生产者可能会频繁地阻塞或丢弃消息,导致吞吐量下降。如果设置得过大,则可能会浪费内存资源,并增加生产者崩溃的风险。

compression.type

压缩类型,目前支持none(不压缩),gzip,snappy和lz4,默认none。

生产者可以指定使用哪种压缩算法对发送的消息进行压缩。压缩可以在生产者端进行,也可以在 Broker 端进行。压缩的好处是可以用较小的 CPU 开销获得更少的磁盘占用或网络 I/O 传输,从而提高 Kafka 的吞吐量和性能。

需要注意的是,不同的压缩算法有不同的压缩比和性能特点,因此需要根据实际的应用场景和性能需求来选择合适的压缩算法。同时,压缩也会对消息的延迟产生一定影响,因为压缩和解压缩操作需要消耗一定的时间。因此,在配置 compression.type 参数时,需要综合考虑压缩比、性能、延迟等因素。

linger.ms

消息在生产者缓冲区中的最长滞留时间,以毫秒为单位。默认是0,表示不做停留。

当生产者向 Kafka 发送消息时,它会将消息写入缓冲区,并等待一段时间以将多个消息批量发送给 Broker。linger.ms 参数就是用来控制这个等待时间的。如果 linger.ms 设置为 0,那么生产者会立即将单个消息发送给 Broker,不进行任何批量操作。如果 linger.ms 设置为大于 0 的值,那么生产者会定期检查缓冲区中是否已经达到了 batch.size(批量大小)或者 linger.ms 时间,如果满足其中一个条件,生产者就会批量发送所有消息并清空缓冲区。

在实际生产环境中,合理地设置 linger.ms 参数可以有效地提高系统的吞吐率和响应速度。如果 linger.ms 设置得较小,那么生产者会更频繁地发送消息,但每次发送的消息量可能较少,这可能会增加网络开销。如果 linger.ms 设置得较大,那么生产者会等待更多的消息加入批次后再发送,这可能会减少网络开销,但会增加消息的延迟。

max.block.ms

用于控制生产者在发送消息时被阻塞的最长时间。当生产者发送消息到 Kafka 时,如果 Kafka 服务器因为各种原因(如网络延迟、Broker 繁忙等)无法立即处理这些消息,生产者可能会被阻塞一段时间,直到服务器准备好接收消息为止。max.block.ms 就是用来控制这个阻塞时间的上限。

具体来说,当生产者尝试发送消息到 Kafka 服务器时,如果服务器无法立即处理这些消息,生产者会进入一个阻塞状态,等待服务器变得可用。如果在这个阻塞状态中等待的时间超过了 max.block.ms 指定的值,生产者会抛出一个异常,通常是 org.apache.kafka.clients.producer.ProducerFencedException。

这个参数的设置对于控制生产者的行为和性能很重要。如果 max.block.ms 设置得较小,生产者可能会在等待服务器响应时更快地抛出异常,这可能会导致生产者更快地重试发送消息,但也可能增加重试的次数和系统的负担。如果 max.block.ms 设置得较大,生产者可能会等待更长的时间来尝试发送消息,这可能会减少重试的次数,但也可能增加生产者的延迟。

在配置 Kafka 生产者时,需要根据实际的应用场景和性能需求来合理设置 max.block.ms 参数。如果生产者发送的消息量很大,或者 Kafka 服务器的处理能力有限,可能需要增加 max.block.ms 的值来减少生产者抛出异常的概率。然而,如果生产者需要更快速地响应或者对延迟要求较高,可能需要减少 max.block.ms 的值来降低生产者的延迟。

需要注意的是,max.block.ms 参数只控制生产者在发送消息时的阻塞时间,而不控制生产者在接收确认(acknowledgment)时的阻塞时间。生产者接收确认的阻塞时间由 request.timeout.ms 参数控制。

max.in.flight.requests.per.connection

用于控制每个连接到 Kafka Broker 的连接上可以同时处于飞行状态(即等待服务器响应)的请求的最大数量。默认值为5。

当生产者向 Kafka Broker 发送消息时,它会等待服务器对消息的确认(acknowledgment)。在发送一个消息后,生产者可以继续发送下一个消息,而不必等待前一个消息的确认。

max.in.flight.requests.per.connection 参数定义了每个连接到 Broker 的连接上可以同时处于这种等待确认状态的请求的最大数量。

这个参数的设置对于控制生产者的性能和可靠性很重要。如果 max.in.flight.requests.per.connection 设置得较小,生产者会更快地收到每个消息的确认,这有助于降低消息丢失的风险,但可能会降低整体的吞吐量,因为生产者需要等待每个消息的确认后才能继续发送下一个消息。如果 max.in.flight.requests.per.connection 设置得较大,生产者可以同时发送更多的消息而不必等待确认,这可能会提高吞吐量,但也会增加消息丢失的风险,因为在服务器崩溃或网络故障的情况下,未确认的消息可能会丢失。

在配置 Kafka 生产者时,需要根据实际的应用场景和性能需求来合理设置 max.in.flight.requests.per.connection 参数。通常,对于需要高可靠性的场景,可以将其设置为较小的值,以确保每个消息都得到了确认。而对于需要高吞吐量的场景,可以将其设置为较大的值,以允许生产者同时发送更多的消息。

需要注意的是,max.in.flight.requests.per.connection 参数仅适用于生产者端,与消费者端无关。此外,它只控制每个连接到 Broker 的连接上可以同时处于飞行状态的请求数量,而不影响生产者整体可以发送的请求数量。

max.message.bytes

max.message.bytes 是 Apache Kafka 生产者(Producer)和消费者(Consumer)配置中的一个重要参数,用于限制单个消息的最大字节大小。当生产者尝试发送超过此限制的消息时,或者当消费者尝试读取超过此限制的消息时,会抛出异常。

具体来说,max.message.bytes 参数定义了生产者可以发送的最大消息大小,以及消费者可以接收的最大消息大小。这个限制适用于消息的整个大小,包括消息头(如果有的话)和消息体。

在生产者端,如果尝试发送一个超过 max.message.bytes 大小的消息,Kafka 会抛出一个 RecordTooLargeException 异常。在消费者端,如果尝试读取一个超过 max.message.bytes 大小的消息,Kafka 也会抛出一个异常。

这个参数的设置对于控制 Kafka 集群的消息大小和性能很重要。如果 max.message.bytes 设置得较小,可以限制生产者发送的消息大小,从而避免产生过大的消息导致集群性能下降或资源不足。然而,这也可能限制了某些需要发送大消息的应用场景。如果 max.message.bytes 设置得较大,可以允许生产者发送更大的消息,但也可能增加集群的负载和资源消耗。

在配置 Kafka 生产者和消费者时,需要根据实际的应用场景和性能需求来合理设置 max.message.bytes 参数。通常,建议根据消息的实际内容和需求来设置这个参数,以确保消息的大小在合理范围内,同时避免对集群造成过大的压力。

需要注意的是,max.message.bytes 参数的设置应考虑到 Kafka Broker 端的配置,特别是 message.max.bytes 参数。message.max.bytes 是 Broker 端配置中限制消息大小的参数,它应该大于或等于生产者和消费者端的 max.message.bytes 设置,以确保消息能够在整个集群中正常传输和存储。

max.request.size

默认的 1048576 字节(1MB)。

该参数用于控制 producer 发送请求的大小 。 实际上该参数控制的是producer 端能够发送的最大消息大小 。 由于请求有一些头部数据结构,因此包含一条消息的请求的大小要比消息本身大 。 不过姑且把它当作请求的最大尺寸是安全的 。 如果 producer 要发送尺寸很大的消息 , 那么这个参数就是要被设置的 。默认的 1048576 字节太小 了 , 通常无法满足企业级消息的大小要求 。

metadata.fetch.timeout.ms

客户端在获取集群元数据(例如主题、分区和副本信息)时的超时时间。这个参数以毫秒为单位。

当 Kafka 客户端(无论是生产者还是消费者)启动时,或者在运行时需要刷新其关于 Kafka 集群的元数据时,它会向 Kafka Broker 发送请求以获取这些元数据。metadata.fetch.timeout.ms 参数指定了客户端等待 Broker 响应的最长时间。

如果在指定的超时时间内,客户端没有收到来自 Broker 的响应,那么它将抛出一个超时异常。这通常意味着客户端无法与 Kafka 集群通信,可能是因为网络问题、Broker 故障或集群配置错误。

合理地设置 metadata.fetch.timeout.ms 参数对于确保 Kafka 客户端的稳定性和性能至关重要。如果设置得过短,客户端可能会因为短暂的网络波动或 Broker 负载增加而频繁地遇到超时异常。这可能导致客户端不断地重试获取元数据的操作,从而增加集群的负载。

相反,如果设置得过长,客户端可能会等待过长时间才能获取到元数据,这可能导致应用程序的延迟增加。此外,如果 Broker 在超时时间内无法响应,客户端可能会认为整个集群都不可用,即使实际上只有部分 Broker 存在问题。

因此,在配置 Kafka 客户端时,应根据网络条件、Broker 性能和应用程序的需求来合理设置 metadata.fetch.timeout.ms 参数。通常,建议将其设置为一个足够长的时间,以允许客户端在网络波动或 Broker 负载增加的情况下仍然能够获取到元数据,同时也要避免设置得过于冗长,以免增加不必要的延迟。

metadata.max.age.ms

用于控制客户端缓存集群元数据的最大时间长度。

如果在这段时间内,集群的元数据(如分区领导者、副本状态等)发生了变化,客户端可能无法立即知道这些变化,因为它依赖于缓存的元数据。因此,metadata.max.age.ms 的值应该根据集群的动态变化频率和客户端对元数据新鲜度的需求来设置。

metadata.max.age.ms 的默认值取决于 Kafka 的版本和配置,但在许多版本中,它的默认值通常是 300000 毫秒(5 分钟)。

如果设置的值过小,客户端可能会频繁地更新元数据,这可能会增加网络开销和客户端的负载。如果设置的值过大,客户端可能会使用过时的元数据,这可能会导致在元数据发生变化时出现问题(例如,当分区领导者发生变化时,客户端可能仍然尝试向旧的领导者发送消息)。

receive.buffer.bytes

用于设置 TCP 接收缓冲区的大小。这个参数对于 Kafka 生产者和消费者都是可用的,并且它影响了客户端与 Kafka 服务器之间网络通信的性能。

当 Kafka 客户端从服务器接收数据时,数据首先会被存储在这个接收缓冲区中。如果接收到的数据量超过了缓冲区的大小,那么剩余的数据将会等待缓冲区有空闲空间后再继续接收。因此,增大 receive.buffer.bytes 的值可以提高客户端接收数据的能力,从而提高整体的吞吐量。

然而,需要注意的是,盲目地增大接收缓冲区的大小并不一定总是有利的。过大的接收缓冲区可能会导致操作系统消耗更多的内存资源,并可能增加网络拥塞的风险。因此,在调整这个参数时,应该根据实际的应用需求和系统性能来做出合理的选择。

request.timeout.ms

当 producer 发送请求给 broker 后 , broker 需要在规定 的时 间范围 内 将处理结果返还给produce r 。 这段时间便是由该参数控制的,默认是 30 秒 。

这就是说,如果 broker 在 30 秒内都没有给 producer 发送响应,那么 producer 就会认为该请求超时了,并在回调函数中显式地抛出TimeoutException 异常交由用户处理 。

默认的 30 秒对于一般的情况而言是足够的 , 但如果 producer 发送的负载很大 , 超时的情况就很容易碰到,此时就应该适当调整该参数值。

retries

客户端(包括生产者和消费者)在遇到可重试的异常时尝试重新发送请求的次数。

当 Kafka 客户端在执行操作时遇到可重试的异常(如网络故障、Broker 宕机等),它会根据 retries 参数的设置来决定是否重新尝试该操作。

对于生产者(Producer):

retries 参数定义了生产者在发送消息时遇到可重试异常时尝试重新发送消息的次数。例如,如果生产者发送消息到 Broker 时网络突然中断,它会尝试重新发送消息,直到达到 retries 参数指定的次数。如果所有重试都失败,生产者会抛出一个异常,通常是一个 KafkaException。

对于消费者(Consumer):

retries 参数同样适用于消费者,它定义了消费者在拉取消息或提交偏移量时遇到可重试异常时尝试重新操作的次数。例如,如果消费者在拉取消息时遇到网络问题,它会尝试重新拉取,直到达到 retries 参数指定的次数。

合理地设置 retries 参数对于确保消息的可靠性和系统的稳定性非常重要。如果设置得太低,客户端可能会在遇到短暂的网络波动或 Broker 宕机时过早地放弃,导致消息丢失或系统不稳定。如果设置得太高,可能会导致过多的重试,增加系统的延迟和负载。

在配置 retries 参数时,应该考虑到网络条件、Broker 的稳定性和性能需求。通常,建议将 retries 设置为一个相对较大的值(如几十次),以便在短暂的故障情况下能够成功发送或拉取消息。同时,也可以结合其他参数,如 retry.backoff.ms(重试之间的退避时间),来避免过于频繁的重试。

需要注意的是,retries 参数只是控制可重试异常的重试次数,对于一些不可重试的异常(如消息大小超过限制、无效的请求参数等),即使设置了 retries 也不会进行重试。

send.buffer.bytes

用于设置 TCP 发送缓冲区的大小。这个参数对于 Kafka 生产者(Producer)是特别重要的,因为它影响了生产者将数据发送到 Kafka 服务器时的网络性能。

当 Kafka 生产者向服务器发送数据时,数据首先会被写入这个发送缓冲区。然后,数据从缓冲区中取出并发送到网络上。如果生产者发送数据的速度超过了网络传输的速度,那么发送缓冲区可能会填满。在这种情况下,生产者会等待缓冲区中有足够的空间来继续发送数据,或者直到达到配置的 send.buffer.bytes 的大小限制。

增大 send.buffer.bytes 的值可以允许生产者一次性发送更多的数据,这可能会提高吞吐量,特别是在高带宽和低延迟的网络环境中。然而,过大的发送缓冲区可能会导致操作系统消耗更多的内存资源,并且如果生产者发送数据的速度持续超过网络传输的速度,那么增大缓冲区大小可能并不会带来太大的性能提升。

在配置 send.buffer.bytes 时,你应该考虑以下几个因素:

  • 网络带宽:较高的网络带宽允许更大的发送缓冲区,以便更有效地利用网络资源。
  • 生产者发送速率:如果生产者发送数据的速度非常快,那么可能需要增加发送缓冲区的大小来避免数据丢失。
  • 操作系统限制:操作系统通常会对单个套接字的缓冲区大小有上限。确保你设置的 send.buffer.bytes 值不会超过这些限制。
  • 内存使用:增大发送缓冲区大小会增加生产者的内存使用。确保你的应用程序有足够的内存来支持更大的缓冲区。

总之,调整 send.buffer.bytes 参数应该根据你的具体需求和系统性能来进行,并可能需要进行实验来确定最佳配置。

timeout.ms

在客户端(生产者和消费者)在执行某些操作时等待响应或完成的最大时间。

这个参数的具体名称和用途可能会根据上下文和 Kafka 版本的不同而有所变化。

对于生产者(Producer):

timeout.ms 可能用于指定生产者发送消息到 Broker 时等待确认(acknowledgment)的最长时间。如果在这个时间内没有收到确认,生产者可能会重新发送消息或抛出异常。

对于消费者(Consumer):

timeout.ms 可能用于指定消费者拉取数据或执行其他操作时等待响应的最长时间。如果在这个时间内没有收到响应,消费者可能会抛出异常或触发重连逻辑。

此外,Kafka 中还有其他与超时相关的参数,如 request.timeout.ms(请求超时时间)和 metadata.fetch.timeout.ms(元数据获取超时时间)等。这些参数的具体含义和用途可能因版本和上下文而异,因此在实际使用时需要参考 Kafka 的官方文档或相关资源来了解其准确含义和推荐设置。

需要注意的是,timeout.ms 参数的设置应该根据实际的网络条件、Broker 性能以及客户端需求来进行调整。如果设置得太短,可能会导致频繁的超时和重试,增加系统负载和延迟;如果设置得太长,可能会导致客户端等待过长时间,影响系统的响应性能。因此,需要根据具体情况来合理设置这些参数的值。

Broker

auto.leader.rebalance.enable

是否自动Leader Partition平衡,默认是true。

leader.imbalance.per.broker.percentage

每个Broker允许的不平衡的Leader的比率。如果每个Broker超过了这个值,控制器会触发Leader的平衡。默认是10%

leader.imbalance.check.interval.seconds

检查Leader负载是否平衡的间隔时间,默认值300秒。

log.cleanup.policy

表示所有数据启用删除策略;如果设置值为compact,表示所有数据启用压缩策略,默认是delete。

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

log.index.interval.bytes

Kafka每当写入4KB大小的日志(.log),然后就往index文件里面记录一个索引。默认4KB。

log.retention.hours

Kafka中数据保存的时间,默认7天。

log.retention.minutes

Kafka中数据保存的时间,分钟级别,默认关闭。

log.retention.ms

Kafka中数据保存的时间,毫秒级别,默认关闭。

log.retention.check.interval.ms

检查数据是否保存超时的间隔,默认是5分钟。

log.retention.bytes

超过设置的所有日志总大小,删除最早的segment。默认等于-1,表示无穷大。

log.retention.check.interval.ms

检查数据是否保存超时的间隔,默认是5分钟。

log.segment.bytes

Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G。

message.max.bytes

一个批次的消息大小,一个批次当中会包含多条消息,指的是生产者可以进行消息批次发送,提高吞吐量。

num.io.threads

负责写磁盘的线程数。整个参数值要占总核数的50%,默认是8

num.network.threads

数据传输线程数,这个参数占总核数的50%的2/3 ,默认是3。

num.replica.fetchers

副本拉取线程数,这个参数占总核数的50%的1/3

replica.lag.time.max.ms

ISR中,如果一个 follower 副本落后 leader 的时间持续性地超过了这个参数值,那么该 follower 副本就是“不同步”的。则该Follower将被踢出ISR。该时间阈值,默认10s。

segment.bytes

定义了日志段(log segment)文件的大小。默认值通常是 1GB(即 1073741824 字节)。

在 Kafka 中,日志是以段(segment)为单位进行存储的,每个段是一个有序的、不可变的消息日志记录序列。这意味着当 Kafka 的日志大小达到这个配置值时,会创建一个新的日志段来存储后续的消息。

较小的 segment.bytes:

优点:较小的段大小意味着更频繁的段滚动(segment rollover),这有助于减少单个段的文件大小,从而可能减少在读取或删除旧消息时的延迟。

缺点:频繁的段滚动会增加日志管理的开销,因为需要创建新的文件和处理旧的文件。同时,更多的文件意味着更多的文件句柄和可能的文件系统缓存压力。

较大的 segment.bytes:

优点:较大的段大小可以减少段滚动的频率,从而减少日志管理的开销。同时,更大的段文件可能更适合于文件系统缓存,从而提高读取性能。

缺点:较大的段大小意味着在删除旧消息时可能需要更多的时间和磁盘空间。此外,如果单个段的大小过大,可能导致在消息读取或处理时出现性能瓶颈。

在配置 Kafka 时,需要根据实际的工作负载、磁盘性能和存储容量等因素来权衡 segment.bytes 的大小。如果你有一个高吞吐量的生产者,并且希望减少文件句柄的开销和磁盘空间的占用,可以考虑增加 segment.bytes 的值。如果你希望减少读取旧消息的延迟,并且磁盘空间充足,可以考虑减小 segment.bytes 的值。

消费者

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s。

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理?

  • earliest:自动重置偏移量到最早的偏移量。
  • latest:默认,自动重置偏移量为最新的偏移量。
  • none:如果消费组原来的偏移量不存在,则向消费者抛异常。

bootstrap.servers

向Kafka集群建立初始连接用到的host/port列表。

connections.max.idle.ms

有用户抱怨在生产环境下周期性地观测到请求平均处理时间在飘升,这很有可能是因为 Kafka 会定期地关闭空闲 Socket 连接导致下次 consumer 处理请求时需要重新创建连向 broker 的 Socket 连接 。 当前默认值是 9 分钟,如果用户实际环境中不在乎这些 Socket 资源开销,比较推荐设置该参数值为- 1 ,即不要关闭这些空闲连接 。

enable.auto.commit

消费者会自动周期性地向服务器提交偏移量,默认值为true。

fetch.max.bytes

默认值: 52428800字节,即50MB。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

fetch.max.wait.ms

如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。默认500ms。

fetch.min.bytes

消费者获取服务器端一批消息最小的字节数。默认1个字节。

group.id

标记消费者所属的消费者组。

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。

key.deserializer

指定接收消息的key的反序列化类型。一定要写全类名。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

在一个典型的 consumer 使用场景中,用户对于消息的处理可能需要花费很长时间。这个参数就是用于设置消息处理逻辑的最大时间的 。

假设用户的业务场景中消息处理逻辑是把消息、“落地”到远程数据库中,且这个过程平均处理时间是 2 分钟,那么用户仅需要将 max.poll.interval.ms 设置为稍稍大于 2 分钟的值即可,而不必为 session.time.ms 也设置这么大的值。

通过将该参数设置成实际的逻辑处理时间再结合较低的 session.timeout.ms 参数值,consumer group 既实现了快速的 consumer 崩溃检测,也保证了复杂的事件处理逻辑不会造成不必要的 rebalance 。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认500条。

offsets.topic.num.partitions

__consumer_offsets的分区数,默认是50个分区。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认 10 秒。超过该值,该消费者被移除,消费者组执行再平衡。

coordinator 检测失败的时间,因此在实际使用中,用户可以为该参数设置一个比较小的值让 coordinator 能够更快地检测 consumer 崩溃的情况,从而更快地开启 rebalance,避免造成更大的消费滞后( consumer lag ) 。

value.deserializer

指定接收消息的value的反序列化类型。一定要写全类名。

broker配置文件

broker 端参数需要在 Kafka 目录下的 config/server.properties 文件中进行设置 。

当前对于绝大多数的 broker 端参数而言, Kafka 尚不支持动态修改一一这就是说,如果要新增、修改,抑或是删除某些 broker 参数的话,需要重启对应的 broker 服务器。

主要的参数配置如下:

1、broker.id

Kafka 使用唯一 的一个整数来标识每个 broker,该参数默认是一1 。

如果不指定, Kafka 会自动生成一个唯一值 。

在实际使用中,推荐使用从 0 开始的数字序列,如 0、 l 、 2……

2、log.dirs

该参数指定了 Kafka 持久化消息的目录 。

若不设置该参数, Kafka 默认使用/tmp/kafka-logs 作为消息保存的目录。把消息保存在 /tmp 目 录下,在实际的生产环境中是极其不可取的。

若待保存的消息数量非常多,那么最好确保该文件夹下有充足的磁盘空间。该参数可以设置多个目录,以逗号分隔,比如/home/kafkal ,/home/kafka2 。在实际使用过程中,指定多个目录的做法通常是被推荐的,因为这样 Kafka 可以把负载均匀地分配到多个目录下。若用户机器上有 N 块物理硬盘(并且假设这台机器完全给 Kafka 使用),那么设置 N个目录(须挂载在不同磁盘上的目录)是一个很好的选择。 N 个磁头可以同时执行写操作,极大地提升了吞吐量。注意,这里的“均匀”是根据目录下的分区数进行比较的,而不是根据实际的磁盘空间。

3、zookeeper.connect

该参数也可以是一个 CSV (comma-separated values )列表,比如在前面的例子中设置的那样: zkl :2181,zk2:2181,zk3:2181 。如果要使用 一套 ZooKeeper环境管理多套 Kafka 集群,那么设置该参数的时候就必须指定 ZooKeeper 的 chroot,比如 zkl :218 l ,zk2:2181,zk3:2181/kafka_clusterl o 结尾的/kafka_cluster 1 就是 chroot,它是可选的配置,如果不指定则默认使用 ZooKeeper 的根路径 。 在实际使用过程中,配置 chroot 可以起到很好的隔离效果。这样管理 Kafka 集群将变得更加容易。

4、listeners

broker 监听器的 csv 列表,格式是[协议]://[主机名 ]:[端口],[[协议]]://[主机

名]:[端口]]。

该参数主要用于客户端连接 broker 使用,可以认为是 broker 端开放给 clients的监听端口 。 如果不指定主机名,则表示绑定默认网卡:如果主机名是 0.0.0.0,则表示绑定所有网卡。

Kafka 当前支持的协议类型包括 PLAINTEXT、 SSL 及 SASL SSL 等。对于新版本的 Kafka,笔者推荐只设置 listeners 一个参数就够了,对于已经过时的两个参数host.name 和 p。此,就不用再配置了 。 对于未启用安全的 Kafka 集群,使用 PLAil汀EXT协议足矣。如果启用了安全认证,可以考虑使用 SSL 或 SASL_SSL 协议。

5、delete.topic.enable

是否允许 Kafka 删除 topic。

默认情况下, Kafka 集群允许用户删除 topic 及其数据。这样当用户发起删除 topic 操作时, broker 端会执行 topic 删除逻辑。

在实际生产环境中我们发现允许 Kafka 删除 topic 其实是一个很方便的功能,再加上自Kafka 0 .9 .0.0 新增的 ACL 权限特性,以往对于误操作和恶意操作的担心完全消失了,因此设置该参数为 true 是推荐的做法。

6、log.retention. {hourslminuteslms}

这组参数控制了消息数据的留存时间。

它们是“三兄弟参数,用户可以很方便地在 3 个时间维度上设置日志的留存时间。默认的留存时间是7 天,即 Kafka 只会保存最近 7 天的数据,井自动删除 7 天前的数据。当前较新版本的Kafka 会根据消息的时间戳信息进行留存与否的判断 。 对于没有时间戳的老版本消息格式, Kafka 会根据日志文件的最近修改时间( last mo~ified time )进行判断。可以这样说,这组参数定义的是时间维度上的留存策略。实际线上环境中,需要根据用户的业务需求进行设置。保存消息很长时间的业务通常都需要设置一个较大的值。

7、log.retention.bytes

如果说上面那组参数定义了时间维度上的留存策略,那么这个参数便定义了空间维度上的留存策略。

即它控制着 Kafka 集群需要为每个消息日志保存多大的数据 。对于大小超过该参数的分区日志而言 , Kafka 会自动清理该分区的过期日志段文件。

该参数默认值是 -1,表示 Kafka 永远不会根据消息日志文件总大小来删除日志。和上面的参数一样,生产环境中需要根据实际业务场景设置该参数的值 。

8、num.network.threads

它控制了 一个 broker 在后台用于处理网络请求的线程数,默认是 3 。

通常情况下, broker 启动时会创建多个线程处理来自其他broker 和 clients 发送过来的各种请求。注意,这里的“处理”其实只是负责转发请求,它会将接收到的请求转发到后面的处理线程中 。 在真实的环境中,用户需要不断地监控 NetworkProcessorAvgldlePercent 几仅指标 。 如果该指标持续低于 0 .3 ,笔者建议适当增加该参数值。

9、num.io.threads

这个参数就是控制 broker 端实际处理网络请求的线程数,默认值是8 ,即 Kafka broker 默认创建 8 个线程以轮询方式不停地监昕转发过来的网络请求井进行实时处理。 Kafka 同样也为请求处理提供了 一 个 JMX 监控指标 RequestHandlerAvgldlePercent 。如果发现该指标持续低于 0.3 ,则可以考虑适当增加该参数的值。

10、message.max.byes

Kafka broker 能够接收的最大消息大小,默认是 977KB,还不到lMB ,可见是非常小的。在实际使用场景中,突破 lMB 大小的消息十分常见,因此用户有必要综合考虑 Kafka 集群可能处理的最大消息尺寸井设置该参数值 。

JVM 参数

kafka 推荐用户使用最新版本的 JDK,另外鉴于 Kafka broker 主要使用的是堆外内存,即大量使用操作系统的页缓存,因此其实并不需要为JVM分配太多的内存。在实际使用中,通常为 broker 设置不超过 6GB 的堆空间 。 以下就是一份典型的生产环境中的JVM参数列表:

-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseGlGC -XX:MaxGCPauseMillis=20

-XX: InitiatingHeapOccupancyPercent=35 - XX : G1HeapRegionSize=16M -XX: MinMetaspaceFreeRatio=50 -XX: MaxMetaspaceFreeRatio=80

OS 参数

通常情况下, Kafka 并不需要太多的 OS 级别的参数调优,但依然有一些 OS 参数是必须要调整的 。

1、文件描述符限制

Kafka 会频繁地创建井修改文件系统中的文件,这包括消息的日志文件、索引文件及各种元数据管理文件等。因此如果一个 broker 上面有很多 topic 的分区,那么这个 broker 势必就需要打开很多个文件一一大致数量约等于分区数 × (分区总大小/日志段大小〉 × 3。举一个例子,假设 broker 上保存了 50 个分区,每个分区平均尺寸是 10GB,每个日志段大小是1GB,那么这个 broker 需要维护 1500 个左右的文件描述符。因此在实际使用场景中最好首先增大进程能够打开的最大文件描述符上限,比如设置一个很大的值,如 100000。

具体设置方法为 ulimit -n 100000 。

2、Socket 缓冲区大小

这里指的是 OS 级别的 Socket 缓冲区大小,而非 Kafka 自己提供的 Socket 缓冲区参数 。 事实上, Kafka 自己的参数将其设置为 64KB,这对于普通的内网环境而言通常是足够的,因为内网环境下往返时间( round-trip time, RTT ) 一般都很低,不会产生过多的数据堆积在 Socket 缓冲区中,但对于那些跨地区的数据传输而言,仅仅增加 Kafka 参数就不够了,因为前者也受限于 OS 级别的设置 。 因此如果是做远距离的数据传输,那么建议将 OS 级别的 Socket 缓冲区调大,比如增加到 128KB,甚至更大。

3、最好使用 Ext4 或×FS 文件系统

其实 Kafka 操作的都是普通文件,并没有依赖于特定的文件系统,但依然推荐使用 Ext4 或 XFS 文件系统,特别是 XFS 通常都有着更好的性能。这种性能的提升主要影响的是 Kafka 的写入性能。根据官网的测试报告,使

用 XFS 的写入时间大约是 160 毫秒,而使用 Ext4 大约是 250 毫秒。因此生产环境中最好使用 XFS 文件系统。

4、关闭 swap

其实这是很多使用磁盘的应用程序的常规调优手段,具体命令为 sysctlvm.swappiness = <一个较小的数〉,即大幅度降低对 swap 空间的使用,以免极大地拉低性能。后面的章节中会详细讨论为何不显式设置该值为 0 。

5、设置更长的 flush 时间

我们知道 Kafka 依赖 OS 页缓存的“刷盘”功能实现消息真正写入物理磁盘,默认的刷盘问隔是 5 秒。通常情况下,这个间隔太短了,适当增加该值可以在很大程度上提升 OS 物理写入操作的性能。 Linkedin 公司自己就将该值设置为2 分钟以增加整体的物理写入吞吐量 。

无消息丢失配置

Java 版本 producer 用户采用异步发送机制。 KafkaProducer.send 方法仅仅把消息放入缓冲区中,由一个专属 1/0 线程负责从缓冲区中提取消息井封装进消息 batch 中,然后发送出去。

显然,这个过程中存在着数据丢失的窗口:若 1/0 线程发送之前 producer 崩溃,则存储缓冲区中的消息全部丢失了。这是 producer 需要处理的很重要的问题。

producer 的另一个问题就是消息的乱序 。假设客户端依次执行下面的语句发送两条消息到相同的分区。

producer.send(recordl);

producer.send(record2);

若此时由于某些原因(比如瞬时的网络抖动〉导致 record I 未发送成功,同时 Kafka 又配置了重试机制以及 max.in.flight.requests.per.connection 大于 1 (默认值是 5 ),那么 producer 重试 record I 成功后, record I 在日志中的位置反而位于 record2 之后,这样造成了消息的乱序。

要知道很多实际使用场景中都有事件强顺序保证的要求。鉴于 producer 的这两个问题,应该如何规避呢?

首先,对于消息丢失的问题,很容易想到的一个方案就是:既然异步发送可能丢失数据,改成同步发送似乎是一个不错的主意。比如这样:

producer.send(record) . get();

采用同步发送当然是可以的,但是性能会很差,并不推荐在实际场景中使用。因此最好能有一份配置,既使用异步方式还能有效地避免数据丢失,即使出现 producer 崩溃的情况也不会有问题。

producer 端

• block.on.buffer.full =true

实际上这个参数在 Kafka 0.9.0.0 版本已经被标记为“ deprecated”,并使用 max.block.ms参数替代,但这里还是推荐用户显式地设置它为 true,使得内存缓冲区被填满时 producer 处于阻塞状态并停止接收新的消息而不是抛出异常;否则 producer 生产速度过快会耗尽缓冲区。新版本 Kafka ( 0.10.0.0 之后)可以不用理会这个参数,转而设置 max.block.ms 即可 。

• acks = all or -1

设置 acks 为 all 很容易理解,即必须要等到所有 follower 都响应了发送消息才能认为提交成功,这是 producer 端最强程度的持久化保证。

• retries = Integer.MAX_VALUE

设置成 MAX_VALUE 纵然有些极端,但其实想表达的是 producer 要开启无限重试 。 用户不必担心 producer 会重试那些肯定无法恢复的错误,当前 producer 只会重试那些可恢复的异常情况,所以放心地设置一个比较大的值通常能很好地保证消息不丢失。

• max.in.flight.requests.per.connection= 1

设置该参数为 1 主要是为了防止 topic 同分区下的消息乱序问题。这个参数的实际效果其实限制了 producer 在单个 broker 连接上能够发送的未响应请求的数量。因此,如果设置成 1 ,则 producer 在某个 broker 发送响应之前将无法再给该 broker 发送 PRODUCE 请求 。

• 使用带回调机制的 send 发送消息,即 KafkaProducer.send(record, callback)

不要使用 KafkaProducer 中单参数的 send 方法,因为该 send 调用仅仅是把消息发出而不会理会消息发送的结果。如果消息发送失败,该方法不会得到任何通知,故可能造成数据的丢失 。实际环境中一定要使用带回调机制的 send 版本,即 KafkaProducer.send(record, callback) 。

• Callback 逻辑中显式地立即关闭 producer,使用 close(0)

在 Callback 的失败处理逻辑中显式调用 KafkaProducer.close(0) 。 这样做的目的是为了处理消息的乱序问题 。 若不使用 close(0),默认情况下 producer 会被允许将未完成的消息发送出去,这样就有可能造成消息乱序 。

broker 端配置

• unclean.leader.election.enable= false

关闭 unclean leader 选举,即不允许非 ISR 中的副本被选举为 leader,从而避免 broker 端因日志水位截断而造成的消息丢失 。

• replication.factor = 3

设置成 3 主要是参考了 Hadoop 及业界通用的三备份原则,其实这里想强调的是一定要使用多个副本来保存分区的消息。

• min.insync.replicas = 2

用于控制某条消息至少被写入到 ISR 中的多少个副本才算成功,设置成大于 1 是为了提升producer 端发送语义的持久性。记住只有在 producer 端 acks 被设置成 all 或-1时,这个参数才有意义。在实际使用时,不要使用默认值 。

• replication.factor > min.insync.replicas

若两者相等,那么只要有一个副本挂掉,分区就无法正常工作,虽然有很高的持久性但可用性被极大地降低了 。 推荐配置成 replication.factor= min.insyn.replicas +1 。

• enable.auto.commit= false