Kafka学习笔记
小轲 Lv3

Kafka基本的概念

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,**基于zookeeper协调的分布式日志系统(也可以当做MQ系统)**,常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
image

Kafka部分名词解释如下

  • Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
  • Topic:一类消息,例如page view日志、click、短信、日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。相当于分区
  • Segment:partition物理上由多个segment组成
  • offset:每个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每个消息都有一个连续的序列号叫做offset,用于partition唯一标识一条消息.
  • Producer:消息的生产者,负责向Broker中投递消息
  • Consumer:消息的消费者,负责从Broker中拉取消息
  • Consumer Group:消费者组,在同一个消费者组中是不能够消费同一个分区中的消息的。在多个不同的消费组中,多个不同的消费者可以消费同一条消息

image

Kafka消息队列模型

kafka的消息队列一般分为两种模式:点对点和订阅模式

点对点模式

Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式
image

发布订阅模式

如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列
image

Kafka有哪些特性致使它性能这么高?

  • 顺序读写
  • 零拷贝
  • 消息压缩
  • 分批发送

Kafka的设计亮点

  1. 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒;
  2. 高伸缩性:每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中;
  3. 持久性、可靠性:Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储;
  4. 容错性:允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作;
  5. 高并发:支持数千个客户端同时读写。

kafka的设计原理

kafka文件存储原理

  1. Kafka实际上就是日志消息存储系统, 根据offset获取对应的消息,消费者获取到消息之后

该消息不会立即从mq中移除。

  1. 将topic分成多个不同的分区、每个分区中拆分成多个不同的segment文件存储日志。
  2. 每个segment文件会有
    • .index 消息偏移量索引文件
    • .log文件 消息物理存放的位置

在默认的情况下,每个segment文件容量最大是为500mb,如果超过500mb的情况下依次内推,产生一个新的segment文件

topic中partition存储分布

假设一个kafka集群中只有一个broker,opt/keshao/message-data为数据文件存储目录,在Kafka broker中的server.properties文件配置(参数:log.dirs),例如创建2个topic名称分别为report_pushlaunch_info,partitions数量都为partitions=4

1
2
3
4
5
6
7
8
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3

在Kafka文件存储中,同一个topic下有多个不同partition,每个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。 如果是多broker分布情况,请参考kafka集群partition分布原理分析

partition中文件存储方式

  • 每个partiton中相当于一个巨型文件被平均分配到多个大小相等segment(段)数据中。但每个段segment file消息数量不一定相等,这种特性方便old segment被快速检索删除
  • 每个partiton只需要顺序读写就可以了,segment文件生命周期由服务端配置参数决定。
  • 这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

image

partiton中segment文件存储结构

  • segment file由2大部分组成,分别index file和data file,两个文件一一对应,成对出现。后缀.index.log分别表示为segment索引文件、数据文件;
  • segment文件命名规则:partiton全局的第一个segment0开始,后续每个setment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充;

文件的是Kafka broker中做的一个实验,创建一个topicXXX包含1 partiton,设置每个segment大小为500MB,并启动producer向Kafka broker写入大量数据
image
segment file中,index与file的物理关系如下:
image
查找顺序:
查找offset=6的消息的流程如下

  1. 二分查找算法:查找到该分区中所有的Segment文件 list排序 每个Segment文件都是有一个命名规范,offset=7在我们的Segment文件中,此处定位到index文件
  2. 先访问该index文件,根据offset值查询到物理存放位置,Offset=7>6<9 所以定位到offset=6 获取到物理存放位置1407
  3. 根据该物理存放位置9807 去对应的log文件查找消息,依次向下查找+1次 获取到offset=7的消息。

为什么kafka中的 索引文件没有对每个消息建立索引呢?

  1. 目的是为了节约我们空间的资源
  2. 稀疏索引算法+二分查找算法,定位到位置,在根据顺序遍历查找。
    如果该offset消息 没有对应的索引的情况下,时间复杂度是为多少:(ON)
    如果该offset消息 有对应的索引的情况下,时间复杂度是为多少:(O1)

message的存储方式

表格中列出了message的物理结构:

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它可以唯一确定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic” 表示本次发布Kafka服务程序协议版本号
1 byte “attributes” 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。

小结

Kafka运行时很少有大量读磁盘的操作,主要是定期批量写磁盘操作,因此操作磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有如下特点:
写message

  • 消息从java堆转入page cache(即物理内存)。
  • 由异步线程刷盘,消息从page cache刷入磁盘。

读message

  • 消息直接从page cache转入socket发送出去。
  • 当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁 盘Load消息到page cache,然后直接从socket发出去

Kafka高效文件存储设计特点

  1. Kafka把topic中一个parition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完文件,减少磁盘占用。
  2. 通过索引信息可以快速定位message和确定response的最大大小。
  3. 通过index元数据全部映射到memory,可以避免segment file的IO磁盘操作。
  4. 通过索引文件稀疏存储,可以大幅降低index文件元数据占用空间大小。

无论消息是否被消费,kafka都会保存所有的消息,旧消息删除策略
1、 基于时间,默认配置是168小时(7天)。
2、 基于大小,默认配置是1073741824。

Kafka如何保证生产消息可靠性

副本机制

副本机制(Replication):也可以称之为备份机制,通常是指分布式系统在多台互联网的机器上保存相同的数据拷贝,副本机制有什么好处?

  • 提供数据冗余:即使系统部分组件失效,系统依然能够继续运转,因而增加了整体可用性以及数据持久性
  • 提供高伸缩性:支持横向扩展,能够通过添加机器的方式来提升读的性能,进而提高读操作吞吐量
  • 改善数据局部性:允许将数据放入与用户地理位置相近的地方,从而降低系统延时

kafka是有主题概念的,而每一个主题又进一步划分成若干个分区。副本的概念实际上是在分区层级下定义的,每个分区配置有多若干个副本。
所谓的副本,本质上就是一个只能追加写消息的提交日志,根据kafka副本机制的定义,同一个分区下的所有副本保存有相同的消息序列,这些副本分散的保存在不同的Broker上,从而能够对抗部分Broker宕机带来的数据不可用。
image

kafka有了副本机制是否会发生数据丢失?

会。写入数据都是往某个Partition的Leader写入的,然后那个Partition的Follower会从Leader同步数据,但是这个同步过程是异步的。也就是说如果此时1条数据刚写入Leader Partition1,还没来得及同步给Follower,Leader Partiton1所在机器突然就宕机了的话,此时就会选举Partition1的Follower作为新的Leader对外提供服务,然后用户就读不到刚才写入的那条数据了。因为Partition0的Follower上是没有同步到最新的一条数据的,这个时候就会造成数据丢失的问题。

Kafka的ISR机制

此处的Leader是Partition的Leader,而不是Broker的Leader

这个机制简单来说,就是会自动给每个Partition维护一个ISR列表,这个列表里一定会有Leader,然后还会包含跟Leader保持同步的Follower。也就是说,只要Leader的某个Follower一直跟他保持数据同步,那么就会存在于ISR列表里。
但是如果Follower因为自身发生一些问题,导致不能及时的从Leader同步数据过去,那么这个Follower就会被认为是“out-of-sync”,从ISR列表里移除。

怎么保证Kafka写入的数据不丢失?

  1. 每个Partition都至少得有1个Follower在ISR列表里,跟上了Leader的数据同步
  2. 每次写入数据的时候,都要求至少写入Partition Leader成功,同时还有至少一个ISR里的Follower也写入成功,才算这个写入是成功了
  3. 如果不满足上述两个条件,那就一直写入失败,让生产系统不停的尝试重试,直到满足上述两个条件,然后才能认为写入成功
  4. 这个时候万一leader宕机,就可以切换到那个follower上去,那么Follower上是有刚写入的数据的,此时数据就不会丢失了。

关于第二点就需要去配置相应ack参数,才能保证写入Kafka的数据不会丢失。

Kafka的ack消息模式

acks参数,是在Kafka Producer,也就是生产者里设置的。
这个参数实际上有三种常见的值可以设置,分别是:0、1 和 all。

  • 0: Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
  • 1: Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。
  • -1 (all): Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。

副本选举实现原理

当Leader副本宕机之后,会从ISR同步副本列表中剔除,然后取出剩下的ISR列表中第一个为Lader副本,显然还有可能数据没有及时同步完成,当选择为Leader副本之后,数据还会可能存在丢失的情况。
image

副本故障处理机制

  • LEO:每个副本数据最后一个的offset或者最大的offset值。
  • HW 消费者能够看见到的最大offset值。

image

Follower节点发生故障原理

当我们follower2节点如果宕机之后,就会从ISR列表中剔除,有突然恢复了,则开始同步Leader 节点的数据。
如何同步:如果follower 的leo不等于Leader 节点的leo,则开始截取高于当前hw位置的log,从该hw位置开始同步Leader 节点数据,如果该follower的leo大于该分区的hw,则从新加入isr列表中。
Kafka实现集群,保证每个副本的数据一致性问题,但是不能保证消息不会丢失

发生该问题如何解决?
生产者投递消息采用日志形式记录下来,如果消费者消费成功之后,在可以将该消息给删除。

Leader节点发生故障原理

如果Leader节点宕机之后,会从新在剩余的isr列表中,选举一个新的Leader节点。为了保证每个节点中副本一致性的问题,会将高与hw位置的log给截取掉。
所以我们kafka为了严格意义上,保证每个节点副本数据一致性问题,但是不能保证数据不丢失。
概率:—-非常低。
解决办法:
生产者投递消息的时候采用日志记录的方式,如果发生Leader变为follower 部分的消息被丢失的情况下,我们可以使用生产投递日志实现补偿。

Kafka选举原理控制器原理

控制器组件(Controller),是 Apache Kafka 的核心组件。它的主要作用是在 Apache ZooKeeper 的帮助下管理和协调整个 Kafka 集群。在分布式系统中,通常需要有一个协调者,该协调者会在分布式系统发生异常时发挥特殊的作用。在Kafka中该协调者称之为控制器(Controller),其实该控制器并没有什么特殊之处,它本身也是一个普通的Broker,只不过需要负责一些额外的工作(追踪集群中的其他Broker,并在合适的时候处理新加入的和失败的Broker节点、Rebalance分区、分配新的leader分区等)。值得注意的是:Kafka集群中始终只有一个Controller Broker。

Controller Broker是如何被选出来的

Broker 在启动时,会尝试去 ZooKeeper 中创建 /controller 节点。Kafka 当前选举控制器的规则是:第一个成功创建 /controller 节点的 Broker 会被指定为控制器
image
Kafka的Broker Controller通过注册一个controller临时节点节点进行竞选,如果未set成功,则watch该节点,等待成为controller

Controller Broker的具体作用是什么

Controller Broker的主要职责有很多,主要是一些管理行为,主要包括以下几个方面:

  • 创建、删除主题,增加分区并分配leader分区
  • 集群Broker管理(新增 Broker、Broker 主动关闭、Broker 故障)
  • preferred leader选举
  • 分区重分配

主题管理

这里的主题管理,就是指控制器帮助我们完成对 Kafka 主题的创建、删除以及分区增加的操作。换句话说,当我们执行kafka-topics 脚本时,大部分的后台工作都是控制器来完成的。

分区重分配

分区重分配主要是指,kafka-reassign-partitions 脚本提供的对已有主题分区进行细粒度的分配功能。这部分功能也是控制器实现的。

集群成员管理

自动检测新增 Broker、Broker 主动关闭及被动宕机。这种自动检测是依赖于前面提到的 Watch 功能和 ZooKeeper 临时节点组合实现的。比如,控制器组件会利用Watch 机制检查 ZooKeeper 的 /brokers/ids 节点下的子节点数量变更。目前,当有新 Broker 启动后,它会在 /brokers 下创建专属的 znode 节点。一旦创建完毕,ZooKeeper 会通过 Watch 机制将消息通知推送给控制器,这样,控制器就能自动地感知到这个变化,进而开启后续的新增 Broker 作业。
侦测 Broker 存活性则是依赖于刚刚提到的另一个机制:临时节点。每个 Broker 启动后,会在 /brokers/ids 下创建一个临时 znode。当 Broker 宕机或主动关闭后,该 Broker 与 ZooKeeper 的会话结束,这个 znode 会被自动删除。同理,ZooKeeper 的 Watch 机制将这一变更推送给控制器,这样控制器就能知道有 Broker 关闭或宕机了,从而进行“善后”。

数据服务

控制器的最后一大类工作,就是向其他 Broker 提供数据服务,控制器上保存了最全的集群元数据信息,其他所有 Broker 会定期接收控制器发来的元数据更新请求,从而更新其内存中的缓存数据。
当控制器发现一个 broker 离开集群(通过观察相关 ZooKeeper 路径),控制器会收到消息:这个 broker 所管理的那些分区需要一个新的 Leader。控制器会依次遍历每个分区,确定谁能够作为新的 Leader,然后向所有包含新 Leader 或现有 Follower 的分区发送消息,该请求消息包含谁是新的 Leader 以及谁是 Follower 的信息。随后,新的 Leader 开始处理来自生产者和消费者的请求,Follower 用于从新的 Leader 那里进行复制。

消费者Rebalance机制(再平衡)

rebalance就是说如果消费组里的消费者数量有变化或消费的分区数有变化,kafka会重新分配消费者消费分区的关系。比如consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如果他又重启了,那么又会把一些分区重新交还给他。
注意:rebalance只针对subscribe这种不指定分区消费的情况,如果通过assign这种消费方式指定了分区,kafka不会进行rebanlance。
如下情况可能会触发消费者rebalance:

  • 消费组里的consumer增加或减少了
  • 动态给topic增加了分区
  • 消费组订阅了更多的topic

主要有三种rebalance的策略:range()、round-robin(轮询)、sticky(粘性)。
Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情况为range分配策略。

  1. range策略就是按照分区序号排序(范围分配),假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。比如分区03给一个consumer,分区46给一个consumer,分区7~9给一个consumer。
  2. round-robin策略就是轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、8给一个consumer
  3. sticky策略初始时分配策略与round-robin类似,但是在rebalance的时候,需要保证如下两个原则。
    1. 分区的分配要尽可能均匀
    2. 分区的分配尽可能与上次分配的保持相同。
    3. 当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下:
      1. consumer1除了原有的0~3,会再分配一个7
      2. consumer2除了原有的4~6,会再分配8和9

同步副本(in-sync replica ,ISR)列表

ISR中的副本都是与Leader进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的. 那么,ISR中存在是什么副本呢?首先可以明确的是:Leader副本总是存在于ISR中。 而follower副本是否在ISR中,取决于该follower副本是否与Leader副本保持了“同步”。
始终保证拥有足够数量的同步副本是非常重要的。要将follower提升为Leader,它必须存在于同步副本列表中。每个分区都有一个同步副本列表,该列表由Leader分区和Controller进行更新。
选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election。注意,这里要与在非同步副本中选一个分区作为leader分区的过程区分开,在非同步副本中选一个分区作为leader的过程称之为unclean leader election。由于ISR是动态调整的,所以会存在ISR列表为空的情况,通常来说,非同步副本落后 Leader 太多,因此,如果选择这些副本作为新 Leader,就可能出现数据的丢失。毕竟,这些副本中保存的消息远远落后于老 Leader 中的消息。在 Kafka 中,选举这种副本的过程可以通过Broker 端参数 *_*unclean.leader.election.enable _控制是否允许 Unclean 领导者选举。开启 Unclean 领导者选举可能会造成数据丢失,但好处是,它使得分区 Leader 副本一直存在,不至于停止对外提供服务,因此提升了高可用性。反之,禁止 Unclean Leader 选举的好处在于维护了数据的一致性,避免了消息丢失,但牺牲了高可用性。分布式系统的CAP理论说的就是这种情况。
不幸的是,
unclean leader election
*的选举过程仍可能会造成数据的不一致,因为同步副本并不是
*完全同步的。由于复制是异步**完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100,这受到两个参数的影响:

  • replica.lag.time.max.ms:同步副本滞后与leader副本的时间
  • zookeeper.session.timeout.ms:与zookeeper会话超时时间

脑裂现象

如果controller Broker 挂掉了,Kafka集群必须找到可以替代的controller,集群将不能正常运转。这里面存在一个问题,很难确定Broker是挂掉了,还是仅仅只是短暂性的故障。但是,集群为了正常运转,必须选出新的controller。如果之前被取代的controller又正常了,他并不知道自己已经被取代了,那么此时集群中会出现两台controller。
其实这种情况是很容易发生。比如,某个controller由于GC而被认为已经挂掉,并选择了一个新的controller。在GC的情况下,在最初的controller眼中,并没有改变任何东西,该Broker甚至不知道它已经暂停了。因此,它将继续充当当前controller,这是分布式系统中的常见情况,称为脑裂。
假如,处于活跃状态的controller进入了长时间的GC暂停。它的ZooKeeper会话过期了,之前注册的/controller节点被删除。集群中其他Broker会收到zookeeper的这一通知。
image
由于集群中必须存在一个controller Broker,所以现在每个Broker都试图尝试成为新的controller。假设Broker 2速度比较快,成为了最新的controller Broker。此时,每个Broker会收到Broker2成为新的controller的通知,由于Broker3正在进行”stop the world”的GC,可能不会收到Broker2成为最新的controller的通知。
image
等到Broker3的GC完成之后,仍会认为自己是集群的controller,在Broker3的眼中好像什么都没有发生一样。
image
现在,集群中出现了两个controller,它们可能一起发出具有冲突的命令,就会出现脑裂的现象。如果对这种情况不加以处理,可能会导致严重的不一致。所以需要一种方法来区分谁是集群当前最新的Controller。
Kafka是通过使用epoch number(纪元编号,也称为隔离令牌)来完成的。epoch number只是单调递增的数字,第一次选出Controller时,epoch number值为1,如果再次选出新的Controller,则epoch number将为2,依次单调递增。
每个新选出的controller通过Zookeeper 的条件递增操作获得一个全新的、数值更大的epoch number 。其他Broker 在知道当前epoch number 后,如果收到由controller发出的包含较旧(较小)epoch number的消息,就会忽略它们,即Broker根据最大的epoch number来区分当前最新的controller。

image
上图,Broker3向Broker1发出命令:让Broker1上的某个分区副本成为leader,该消息的epoch number值为1。于此同时,Broker2也向Broker1发送了相同的命令,不同的是,该消息的epoch number值为2,此时Broker1只听从Broker2的命令(由于其epoch number较大),会忽略Broker3的命令,从而避免脑裂的发生。

Kafka优化策略

常见核心配置

内存缓冲的大小:buffer.memory

  1. 生产者投递消息先存放在本地缓冲区中,将消息组装成n多个不同的Batch,在通过send线程将缓冲区的数据批量的形式发送给kafka服务器端存放。
  2. 生产者本地内存缓冲区如果设置太小了,在高并发情况下有可能会发生内存溢出,导致生产者无法继续写入消息到缓冲区卡死。
  3. 实际生产环境中,根据压力测试情况下,合理设置内存缓冲区大小。

参数:buffer.memory

最大请求大小:“max.request.size”

该参数kafkamq服务器端限制接受的消息

重试策略“retries”和“retries.backoff.ms”

该参数设置定重试的次数、间隔时间

确认机制:acks

建议设置为1
ACK 参数配置:

  1. 0:Producer 不等待 Broker 的 ACK,这提供了最低延迟,Broker 一收到数据还没有写入磁盘就已经返回,当 Broker 故障时有可能丢失数据。
  2. 1:Producer 等待 Broker 的 ACK,Partition 的 Leader 落盘成功后返回 ACK,如果在 Follower 同步成功之前 Leader 故障,那么将会丢失数据。
  3. -1(all):Producer 等待 Broker 的 ACK,Partition 的 Leader 和 Follower 全部落盘成功后才返回 ACK。但是在 Broker 发送 ACK 时,Leader 发生故障,则会造成数据重复。

具体看业务要求:-1 all 延迟概率一定很低 0 延迟概率为适中1

消费者分区的个数

消费者怎么知道我应该从哪个位置开始消费呢?应该有一个记录分组对应消费分区offset位置。

  • 在老的版本kafka中是记录在zk上,记录在zk上频繁读写操作,性能不是很好。
  • 在新的版本kafka中,消费者消费分区中的消息是记录在topic主题日志文件中,默认的情况下分成50个文件记录。

为什么消费者需要使用50个文件记录消费者消费记录呢?

  • 如果消费者(分组)比较多的,都记录在同一个日志文件中,读写操作就非常麻烦。

消费者怎么知道我应该读取那个日志文件 知道从那个offset开始消费呢?

  1. 消费者消费消息的时候:key=group-id.topic.partition
  2. group-id.topic.partition=mayikt.mttopic.0
  3. (key=group-id.topic.partition)%consumer_offsets.size(50)=12
  4. Offset消费记录 记录在consumer_offsets-12文件夹
  5. 记录Offset消费记录 的是consumer分组对应消费记录 不是记录单个消费者消费记录。

–记录当前分组消费的记录—
offsets.topic.replication.factor 参数的约束,默认值为3(注意:该参数的使用限制在0.11.0.0版本发生变化),分区数可以通过 offsets.topic.num.partitions 参数设置,默认值为50。

优化策略

Broker优化

  1. replica复制配置

follow从leader拉取消息进行同步数据

1
2
3
4
num.replica.fetchers  拉取线程数 配置多可以提高follower的I/O并发度,单位时间内leader持有更多请求,相应负载会增大,需要根据机器硬件资源做权衡
replica.fetch.min.bytes=1 拉取最小字节数 默认配置为1字节,否则读取消息不及时
replica.fetch.max.bytes= 5 * 1024 * 1024 拉取最大字节数 默认为1MB,这个值太小,5MB为宜,根据业务情况调整
replica.fetch.wait.max.ms follow 最大等待时间
  1. 压缩速度 compression.type:压缩的速度上lz4=snappy<gzip。

Produer优化

幂等性:enable.idempotence
是否使用幂等性。如果设置为true,表示producer将确保每一条消息只会存放一份;如果设置为false,则表示producer因发送数据到broker失败重试使,可能往数据流中写入多分重试的消息。
注意:如果使用idempotence,即enable.idempotence为true,那么要求配置项max.in.flight.requests.per.connection的值必须小于或等于5;配置项retries的值必须大于0;acks配置项必须设置为all。如果这些值没有被用户明确地设置,那么系统将自动选择合适的值。如果设置  的值不合适,那么会抛出ConfigException异常。

网络和IO线程配置优化

1.num.network.threads:Broker处理消息的最大线程数
2.num.io.threads:Broker处理磁盘IO的线程数
一般num.network.threads主要处理网络io,读写缓冲区数据,配置线程数量为cpu核数加1
num.io.threads主要进行磁盘io操作,高峰期可能会发生IO等待,因此配置需要大些,配置线程数量为cpu核数2倍,最大不超过3倍.

日志保留策略配置

生产者投递消息到kafka的mq中,消费者获取到消息之后不会立即被删除,会有一个日志保留策略。

  1. 减少日志保留时间,建议三天或则更多时间。log.retention.hours=72
  2. 分段文件配置1GB, 默认是500mb 有利于快速回收磁盘空间,重启kafka加载也会加快(如果文件过小,则文件数量比较多,kafka启动时是单线程扫描目录(log.dir)下所有数据文件),文件较多时性能会稍微降低。log.segment.bytes=1073741824
    1
    2
    3
    4
    5
    6
    ##日志滚动的周期时间,到达指定周期时间时,强制生成一个新的segment
    log.roll.hours=72
    ##segment的索引文件最大尺寸限制,即时log.segment.bytes没达到,也会生成一个新的segment
    log.index.size.max.bytes=10*1024*1024
    ##控制日志segment文件的大小,超出该大小则追加到一个新的日志segment文件中(-1表示没有限制)
    log.segment.bytes=1014*1024*1024

log数据文件刷盘策略

当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。

  1. 每当producer写入10000条消息时,刷数据到磁盘 配置为:log.flush.interval.messages=10000
  2. 每间隔1秒钟时间,刷数据到磁盘。log.flush.interval.ms=1000

配置优化案例(重要)

Broker
num.replica.fetchers: 适量提高同步leader副本线程
Producer
inger.ms 0 或者1 定时将批量消息发送到Broker中
Consumer
auto.commit.enable —配置为手动提交offset

Docker环境下的Kafka环境搭建(单机)

https://www.lixueduan.com/posts/kafka/01-install/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
# 匿名登录--必须开启
- ALLOW_ANONYMOUS_LOGIN=yes
#volumes:
#- ./zookeeper:/bitnami/zookeeper
# 该镜像具体配置参考 https://github.com/bitnami/bitnami-docker-kafka/blob/master/README.md
kafka:
image: 'bitnami/kafka:2.8.0'
ports:
- '9092:9092'
- '9999:9999'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
# 客户端访问地址,更换成自己的
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
# 允许使用PLAINTEXT协议(镜像中默认为关闭,需要手动开启)
- ALLOW_PLAINTEXT_LISTENER=yes
# 关闭自动创建 topic 功能
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
# 全局消息过期时间 6 小时(测试时可以设置短一点)
- KAFKA_CFG_LOG_RETENTION_HOURS=6
# 开启JMX监控
- JMX_PORT=9999
#volumes:
#- ./kafka:/bitnami/kafka
depends_on:
- zookeeper
# Web 管理界面 另外也可以用exporter+prometheus+grafana的方式来监控 https://github.com/danielqsj/kafka_exporter
kafka_manager:
image: 'hlebalbau/kafka-manager:latest'
ports:
- "9000:9000"
environment:
ZK_HOSTS: "zookeeper:2181"
APPLICATION_SECRET: letmein
depends_on:
- zookeeper
- kafka

SpringBoot启动和配置kafka

SpringBoot中很多kafka的使用技巧,简单记录和探索

1. 引入依赖

引入基本的maven的pom文件,包含kafka的Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

2. 使用Spring Test启动一个kafka

只要maven项目中引入spring-kafka-test就可以直接使用springboot启动一个kafka的Server,非常方便在开发阶段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 启动kafka dev环境的实例
*/
@SpringBootTest(classes = ApplicationTests.class)
@EmbeddedKafka(count = 4, ports = {9092, 9093, 9094, 9095})
public class ApplicationTests {

/**
* 启动kafka
*
* dev test模式
*
* @throws IOException
*/
@Test
public void startServer() throws IOException {
System.in.read();
}
}

3. 初步测试

3.1 编写生产者和消费者的demo代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
* kafka的demo
*
* @author zhangshuaike
*/
@SpringBootApplication
@RestController
public class KafkaDemoApplication {

public static void main(String[] args) {
SpringApplication.run(KafkaDemoApplication.class, args);
}


@Autowired
private KafkaTemplate<Object, Object> template;

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}

@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
System.out.println("input value:" + input);
}
}

3.2 发送测试

1
curl http://127.0.0.1:8080/send/test

4. 带回调函数的生产者

kafkaTemplate提供了一个回调方法addCallback,我们可以在回调方法中监控消息是否发送成功 或 失败时做补偿处理,有两种写法。

4.1 第一种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@GetMapping("/one/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}

4.2 第二种

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@GetMapping("/two/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
System.out.println("发送消息失败:"+ex.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}

5. 自定义分区器

kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体要追加到哪个分区?这就是分区策略,Kafka 为我们提供了默认的分区策略,同时它也支持自定义分区策略。其路由机制为:

  1. 若发送消息时指定了分区(即自定义分区策略),则直接将消息append到指定分区;
  2. 若发送消息时未指定 patition,但指定了 key(kafka允许为每条消息设置一个key),则对key值进行hash计算,根据计算结果路由到指定分区,这种情况下可以保证同一个 Key 的所有消息都进入到相同的分区;
  3. patition 和 key 都未指定,则使用kafka默认的分区策略,轮询选出一个 patition;

我们自定义一个分区策略,将消息发送到我们指定的partition,首先新建一个分区器类实现Partitioner接口,重写方法,其中partition方法的返回值就表示将消息发送到几号分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class CustomizePartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
//自定义分区规则(这里假设全部发到0号分区)

return 0;
}

@Override
public void close() {

}

@Override
public void configure(Map<String, ?> map) {

}
}

在application.propertise中配置自定义分区器,配置的值就是分区器类的全路径名

1
2
# 自定义分区器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

6.kafka事务提交

如果在发送消息时需要创建事务,可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
/**
* 发送事务消息,需要处理以下事项:
* 1. 设置spring
* kafka:
* producer:
* retries: 3 #重试次数
* acks: all
* # 加事务前缀,自动给producer开启事务,所有加
* transaction-id-prefix: tx_
* 2.方法上增加 @Transactional
*/
@GetMapping("/send")
@SuppressWarnings("all")
public void sendMessageTransaction(){
//生命事务,后面报错消息不会发出去
kafkaTemplate.executeInTransaction(operations ->{
operations.send("transaction","慢慢沉淀");
throw new RuntimeException("fail");
});
//不声明事务,后面保存但前端消息已经发送成功了
kafkaTemplate.send("transaction","慢慢沉淀,但是我不带事务");
throw new RuntimeException("fail");
}

7. 消费者指定参数

指定topic、partition、offset消费
前面我们在监听消费topic1的时候,监听的是topic1上所有的消息,如果我们想指定topic、指定partition、指定offset来消费呢?也很简单,@KafkaListener注解已全部为我们提供。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @Title 指定topic、partition、offset消费
* @Description 同时监听topic1和topic2,监听topic1的0号分区、
* topic2的 "0号和1号" 分区,指向1号分区的offset初始值为8
* @param record
*/
@KafkaListener(id="consumer1",groupId = "felix-group",topicPartitions = {
@TopicPartition(topic = "topic1",partitions = {"0"}),
@TopicPartition(topic = "topic2",partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?,?> record){
System.out.println("topic:"+record.topic()+"partition:"+record.partition()+"offset:"+record.offset()+"value:"+record.value());
}

属性解释:

  1. id:消费者ID;
  2. groupId:消费组ID;
  3. topics:监听的topic,可监听多个;
  4. topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

上面onMessage2监听的含义:监听topic1的0号分区,同时监听topic2的0号分区和topic2的1号分区里面offset从8开始的消息。
注意:topics和topicPartitions不能同时使用;

8.消费者批量消费

8.1设置application.properties开启批量消费即可

1
2
3
4
# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50

8.2 接收消息时用List来接收,监听代码如下:

1
2
3
4
5
6
7
@KafkaListener(id="consumer2",groupId = "felix-group",topics = "topic1" )
public void onMesssage(List<ConsumerRecord<?,?>> records){
System.out.println(">>>批量消费一次,records.size()="+records.size());
for(ConsumerRecord<?,?> record:records){
System.out.println(record.value());
}
}

9.ConsumerAwareListenerErrorHandler异常处理器

通过异常处理器,我们可以处理consumer在消费时发生的异常。
新建一个 ConsumerAwareListenerErrorHandler 类型的异常处理方法,用@Bean注入,BeanName默认就是方法名,然后我们将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面,当监听抛出异常的时候,则会自动调用异常处理器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//异常处理
// 新建一个异常处理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler(){
return (message,exception,consumer)->{
System.out.println("消费异常:"+message.getPayload());
return null;
};
}

//将这个异常处理器的BeanName放到@KafkaListener注解的errorHandler属性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?,?> record) throws Exception{
throw new Exception("简单消费-模拟异常");
}

// 批量消费也一样,异常处理器的message.getPayload()也可以拿到各条消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?,?>> records) throws Exception{
System.out.println("批量消费一次...");
throw new Exception("批量消费-模拟异常");
}

10.消费过滤器

消息过滤器可以在消息抵达consumer之前被拦截,在实际应用中,我们可以根据自己的业务逻辑,筛选出需要的信息再交由KafkaListener处理,不需要的消息则过滤掉。
配置消息过滤只需要为 监听器工厂 配置一个RecordFilterStrategy(消息过滤策略),返回true的时候消息将会被抛弃,返回false时,消息能正常抵达监听容器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.example.kafka.consumer;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
public class KafkaConsumer {

@Autowired
ConsumerFactory consumerFactory;

//消息过滤器
@Bean
public ConcurrentKafkaListenerContainerFactory filterContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory=new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory);
//被过滤器的消息将被丢弃
factory.setAckDiscarded(true);
//消息过滤策略
factory.setRecordFilterStrategy(consumerRecord -> {
if(Integer.parseInt(consumerRecord.value().toString())%2==0){
return false;
}
//返回true消息则被过滤
return true;
});
return factory;
}

//消息过滤监听
@KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
public void onMessage6(ConsumerRecord<?,?> record){
System.out.println(record.value());
}
}

上面实现了一个”过滤奇数、接收偶数”的过滤策略,我们向topic1发送0-99总共100条消息,看一下监听器的消费情况,可以看到监听器只消费了偶数,

11. 消息转发

在实际开发中,我们可能有这样的需求,应用A从TopicA获取到消息,经过处理后转发到TopicB,再由应用B监听处理消息,即一个应用处理完成后将该消息转发至其他应用,完成消息的转发。
在SpringBoot集成Kafka实现消息的转发也很简单,只需要通过一个@SendTo注解,被注解方法的return值即转发的消息内容,如下

1
2
3
4
5
6
7
8
9
10
11
/**
* @Title 消息转发
* @Description 从topic1接收到的消息经过处理后转发到topic2
* @param record
* @return
*/
@KafkaListener(topics = {"topic"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?,?> record){
return record.value()+"-forward message";
}

12. 定时启动,停止监听器

默认情况下,当消费者项目启动的时候,监听器就开始工作,监听消费发送到指定topic的消息,那如果我们不想让监听器立即工作,想让它在我们指定的时间点开始工作,或者在我们指定的时间点停止工作,该怎么处理呢——使用KafkaListenerEndpointRegistry,下面我们就来实现:
① 禁止监听器自启动;
② 创建两个定时任务,一个用来在指定时间点启动定时器,另一个在指定时间点停止定时器;
新建一个定时任务类,用注解@EnableScheduling声明,KafkaListenerEndpointRegistry 在SpringIO中已经被注册为Bean,直接注入,设置禁止KafkaListener自启动,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@EnableScheduling
@Component
public class CronTimer {
/**
* @KafkaListener注解所标注的方法并不会在IOC容器中被注册为Bean,
* 而是会被注册在KafkaListenerEndpointRegistry中,
* 而KafkaListenerEndpointRegistry在SpringIOC中已经被注册为Bean
**/
@Autowired
private KafkaListenerEndpointRegistry registry;

@Autowired
private ConsumerFactory consumerFactory;

// 监听器容器工厂(设置禁止KafkaListener自启动)
@Bean
public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
container.setConsumerFactory(consumerFactory);
//禁止KafkaListener自启动
container.setAutoStartup(false);
return container;
}

// 监听器
@KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
public void onMessage1(ConsumerRecord<?, ?> record){
System.out.println("消费成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
}

// 定时启动监听器
@Scheduled(cron = "0 42 11 * * ? ")
public void startListener() {
System.out.println("启动监听器...");
// "timingConsumer"是@KafkaListener注解后面设置的监听器ID,标识这个监听器
if (!registry.getListenerContainer("timingConsumer").isRunning()) {
registry.getListenerContainer("timingConsumer").start();
}
//registry.getListenerContainer("timingConsumer").resume();
}

// 定时停止监听器
@Scheduled(cron = "0 45 11 * * ? ")
public void shutDownListener() {
System.out.println("关闭监听器...");
registry.getListenerContainer("timingConsumer").pause();
}
}

启动项目,触发生产者向topic1发送消息,可以看到consumer没有消费,因为这时监听器还没有开始工作

参考文章

Kafka文件存储机制那些事

Kafka如何保证消息的可靠性_我是你亲爱的航哥的博客-CSDN博客_kafka保证消息可靠性

谈谈你对Kafka副本Leader选举原理的理解?-51CTO.COM

kafka的实现原理_kafka_八两_InfoQ写作社区

SpringBoot整合kafka - 掘金

Kafka的Controller Broker是什么

spring boot集成kafka之spring-kafka深入探秘 - 凯京科技的个人空间 - OSCHINA - 中文开源技术交流社区

 评论