前言
Kafka 的基础知识点还是蛮多的,本文针对 Kafka 的一些面试过程中常见的基础知识进行全面总结,方便大家进行查漏补缺。
正文
一. Broker概念
在 Kafka 中,一个 Kafka 服务端的实例,就叫做一个 Broker 。已知 Kafka 使用 Zookeeper 来维护 Kafka 集群信息,如下图所示。
Kafka-Zookeeper存储信息脑图
Broker 启动时,会在 Zookeeper 的 /brokers/ids 路径上创建临时节点,将自己的 id 注册到 Zookeeper 。
Kafka 组件会订阅 Zookeeper 的 /brokers/ids 路径,当有 Broker 加入或者退出集群时,这些 Kafka 组件就能够获得通知。
二. Topic概念
可以在 Broker 上创建 Topic ,生产者向 Topic 发送消息,消费者订阅 Topic 并从 Topic 拉取消息。可以用下图进行示意。
Kafka-Topic示意图
三. Partition概念
一个 Topic 可以有多个分区,这里的分区就叫做 Partition ,分区的作用是提高 Kafka 的吞吐量。
分区可以用下图进行示意。
Kafka-Partition示意图
可以用下面的指令在创建 Topic 的时候指定分区,指令如下所示。
./kafka-topics.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --replication-factor 3 --partitions 3 --create --topic mytopic-0
上述指令会创建一个分区数为 3 ,副本数为 3 的 Topic 。
一个分区可以有多个副本,且一个分区的多个副本不能在同一个 Broker 上,例如只有 3 个 Broker ,但是创建 Topic 的时候,指定副本数为 4 ,此时创建 Topic 会失败。
分区的多个副本可以分为 leader 节点和 follower 节点,且 leader 节点为客户端提供 读写 功能, follower 节点会从 leader 节点同步数据但不提供读写功能,这样能避免出现读写不一致的问题。
创建 Topic 时,会在 Broker 上为这个 Topic 的分区创建目录,如下所示。
Topic分区目录创建图
分区目录的内容如下所示。
Topic分区目录内容图
每个文件含义如下。
- index 文件是索引文件 。记录消息的编号;
- timeindex 文件是时间戳索引 。记录生产者发送消息的时间和消息记录到日志文件的时间;
- log 是日志文件 。记录消息。
因为 log 文件会越来越大,此时根据 index 文件进行检索会影响效率,所以还需要对上述文件进行切分,切分出来的单位叫做 Segment ( 段 ),可以通过 log.segment.bytes 来控制一段文件的大小。 Segment 的示意如下。
00000000000000000000.index00000000000000000000.log00000000000000000000.timeindex00000000000000050000.index00000000000000050000.log00000000000000050000.timeindex
四.消费者组
消费者组是 Kafka 提供的可扩展且具有容错性的消费者机制。一个消费者组内存在多个消费者,这些消费者共享一个 Group ID 。消费者组示意如下。
Kafka-消费者组示意图
关于消费者组,有如下说明。
- 消费者组的不同消费者不能同时消费同一个 Partition ;
- 如果消费者组里消费者数量和 Partition 数量一样则一个消费者消费一个 Partition ;
- 如果消费者组里消费者数量大于 Partition 数量则部分消费者会无法消费到 Partition ;
- 如果消费者组里消费者数量小于 Partition 数量则部分消费者会消费多个 Partition 。
五.偏移量
偏移量,即 Consumer Offset ,用于存储消费者对 Partition 消费的位移量。
偏移量存储在 Kafka 的内部 Topic 中,这个内部 Topic 叫做_ consumer_offsets ,该 Topic 默认有 50 个分区,将消费者的 Group ID 进行 hash 后再对 50 取模,得到的结果对应的分区就会用于存储这个消费者的偏移量。
_ consumer_offsets 的每条消息格式示意图如下所示。
Kafka-消费者偏移量消息结构图
注意_ consumer_offsets 是存储在 Broker 上的。
六.生产者发送消息完整流程
生产者发送消息完整流程图如下所示。
Kafka-生产者发送消息流程图
结合上述流程图,对消息发送流程说明如下。
- 生产者生成消息 Record ;
- Record 经过拦截器链;
- key 和 value 进行序列化;
- 使用自定义或者默认的分区器获取 Record 所属分区 Partition ;
- Record 放入消息累加器 RecordAccumulator 。根据 Topic 和 Partition ,可以确定一个双端队列 Deque ,该队列每个节点为多条 Record 的合集即 ProducerBatch ,新 Record 会被添加到队列最后一个节点上;
- Sender 将相同 Broker 节点的可发送 ProducerBatch 合并到一个 Request 中并发送。 Sender 会持续扫描 RecordAccumulator 中的 ProducerBatch ,只要满足大小为 batch.size ( 默认 16K )或者最早 Record 等待已经超过 linger.ms ,该 ProducerBatch 就会被 Sender 收集,然后 Sender 会合并收集的相同 Broker 的 ProducerBatch 到一个 Request 中并发送;
- 缓存请求 Request 到 inFlightRequest 缓冲区中。 inFlightRequest 中为每个 Broker 分配了一个队列,新 Request 会添加到队列头,每个队列最多容纳的 Request 个数由 max.in.flight.requests.per.connection ( 默认为 5 )控制,队列满后不会生成新 Request ;
- Selector 发送请求到 Broker ;
- Broker 收到并处理 Request 后,对 Request 进行 ACK ;
- 客户端收到 Request 的 ACK 后,将 Request 从 inFlightRequest 中移除。
七.分区策略
Kafka 中消息的分区计算策略小结如下。
- 消息中指定了分区 。此时使用指定的分区;
- 消息中未指定分区但有自定义分区器 。此时使用自定义分区器计算分区;
- 消息中未指定分区也没有自定义分区器但消息键不为空 。此时对键求哈希值,并用求得的哈希值对 Topic 的分区数取模得到分区;
- 如果前面都不满足 。此时根据 Topic 取一个递增整数并对 Topic 分区数求模得到分区。
八. ISR机制
当生产者向服务端发送消息后,通常需要等待服务端的 ACK ,这一过程可以用下图进行示意。
Kafka-服务端响应Producer示意图
即 Producer 会将消息发送给 Topic 对应分区的 leader 节点,然后 leader 与 follower 进行同步,如果全部正常的 follower 同步成功( follower 完成消息落盘 ),那么服务端就可以向 Producer 发送 ACK 。
上面描述中的正常 follower 的集合,叫做 ISR ( In-Sync Replica Set ),只有与 leader 节点正常通信的 follower 才会被放入 ISR 中,换言之,只有 ISR 中的 follower 才有资格让 leader 等待同步结果。
如果 leader 挂掉,那么会在 ISR 中选择新的 leader 。
九. ACK机制
Producer 发送消息的时候,可以通过 acks 配置项来决定服务端返回 ACK 的策略,如下所示。
- acks 设置为 0 , Producer 不需要等待服务端返回 ACK ,即 Producer 不关心服务端是否成功将消息落盘;
- acks 设置为 1 , leader 成功将消息落盘便返回 ACK ,这是默认策略;
- acks 设置为 -1 , leader 和 ISR 中全部 follower 落盘成功才返回 ACK 。
十. Segment生成策略
分区在磁盘上由多个 Segment 组成,如下所示。
Topic分区目录内容图
有如下参数控制 Segment 的生成策略。
- log.segment.bytes 。用于设置单个 Segment 大小,当某个 Segment 的大小超过这个值后,就需要生成新的 Segment ;
- log.roll.hours 。用于设置每隔多少小时就生成新的 Segment ;
- log.index.size.max.bytes 。当 Segment 的 index 文件达到这个大小时,也需要生成新的 Segment 。
十一. index文件
使用 Kafka 提供的 kafka-dump-log.sh 工具,可以打开 index 文件,打开后的 index 文件可以表示如下。
offset: 613 position: 5252offset: 1284 position: 10986offset: 1803 position: 17491offset: 2398 position: 25792offset: 3422 position: 35309offset: 4446 position: 51690offset: 5470 position: 68071offset: 6494 position: 84452offset: 7518 position: 100833
上述示例中, offset 是偏移量, position 表示这个 offset 对应的消息在 log 文件里的位置。
index 文件建立的索引是稀疏索引,示意图如下。
Kafka-稀疏索引示例图
十二. timeindex文件
每一条被发送的消息都会记录时间戳,这里的时间戳可以是发送消息时间戳,或者是消息落盘时间戳。可以配置如下。
- log.message.timestamp.type 设置为 createtime 。表示发送消息时间戳;
- log.message.timestamp.type 设置为 logappendtime 。表示消息落盘时间戳。
十三.索引检索过程
- 根据 offset 找到在哪个 Segment 中;
- 从 Segment 的 index 文件根据 offset 找到消息的 position ;
- 根据 position 从 Segment 的 log 文件中最终找到消息。
十四. Partition存储总结
Partition 存储示意图如下。
Kafka-Partition存储示意图
十五. Kafka中的Controller选举
Controller 在 Kafka 集群中负责对整个集群进行协调管理,比如完成 分区分配 , Leader 选举 和 副本管理 等。
关于 Controller 的选举,有如下注意点。
- 启动时选举 。集群中 Broker 启动时会去 Zookeeper 创建临时节点 /controller ,最先创建成功的 Broker 会成为 Controller ;
- Controller 异常时选举 。如果 Controller 挂掉,此时其它 Broker 会通过 Watch 对象收到 Controller 变更的消息,然后就会尝试去 Zookeeper 创建临时节点 /controller ,只会有一个 Broker 创建成功,创建失败的 Broker 会再次创建 Watch 对象来监视新 Controller ;
- Borker 异常 。如果集群中某个非 Controller 的 Broker 挂掉,此时 Controller 会检查挂掉的 Broker 上是否有某个分区的 leader 副本,如果有,则需要为这个分区选举新的 leader 副本,并更新分区的 ISR 集合;
- Broker 加入 。如果有一个 Broker 加入集群,则 Controller 会去判断新加入的 Broker 中是否含有当前已有分区的副本,如果有,那么需要去从 leader 副本中同步数据。
十六.分区leader副本选举
一个分区有三种集合,如下所示。
- AR ( Assigned Replicas )。分区中的所有副本;
- ISR ( In-Sync Replicas )。与 leader 副本保持一定同步程度的副本, ISR 包括 leader 副本自身;
- OSR ( Out-of-Sync Replicas )。与 leader 副本同步程度滞后过多的副本。
上述三种集合的关系是 AR = ISR + OSR 。
分区 leader 副本选举有如下注意点。
- leader 副本会维护和跟踪 ISR 中所有副本与 leader 副本的同步程度,如果某个副本的同步程度滞后过多,则 leader 副本会将这个副本从 ISR 中移到 OSR 中;
- 当 OSR 中有副本重新与 leader 副本保持一定同步程度,则 leader 副本会将其从 OSR 中移到 ISR ;
- 当 leader 副本发生故障时, Controller 会负责为这个分区从 ISR 中选举新的 leader 副本。
十七.主从同步
分区的 leader 和 follower 之间的主从同步示意图如下。
Kafka-主从同步示意图
有两个重要概念如下所示。
- LEO ( Log End Offset )。下一条待写入消息的 Offset ;
- HW ( High Watermark )。 ISR 集合中的最小 LEO 。
那么对于上图而言, HW 为 6 ,那么消费者最多只能消费到 HW 之前的消息,也就是 Offset 为 5 的消息。
主从同步规则如下。
- follower 会向 leader 发送 fetch 请求,然后 leader 向 follower 发送数据;
- follower 接收到数据后,依次写入消息并且更新 LEO ;
- leader 最后会更新 HW 。
当 leader 或 follower 发生故障时,处理策略如下。
- 如果 follower 挂掉,那么当 follower 恢复后,需要先将 HW 和 HW 之后的数据丢弃,然后再向 leader 发起同步;
- 如果 leader 挂掉,则会先从 follower 中选择一个成为 leader ,然后其它 follower 把 HW 和 HW 之后的数据丢弃,然后再向 leader 发起同步。
十八. Kafka为什么快
- 顺序读写 ;
- 索引 ;
- 批量读写和文件压缩 ;
- 零拷贝 。
十九.零拷贝
如果要将磁盘中的文件内容,发送到远程服务器,则整个数据流转如下所示。
Kafka-传统IO示意图
步骤说明如下。
- 从磁盘文件读取文件内容,并拷贝到内核缓冲区;
- CPU 控制器将内核缓冲区的数据拷贝到用户缓冲区;
- 应用程序中调用 write() 方法,将用户缓冲区的数据拷贝到 Socket 缓冲区;
- 将 Socket 缓冲区的数据拷贝到网卡。
一共经历了四次拷贝( 和四次 CPU 上下文切换 ),其中如下两次拷贝是多余的。
- 内核缓冲区拷贝到用户缓冲区;
- 用户缓冲区拷贝到 Socket 缓冲区。
而 Kafka 中的零拷贝,就是将上述两次多余的拷贝省掉,示意图如下。
Kafka-零拷贝示意图
零拷贝步骤如下所示。
- 从磁盘文件读取文件内容,并拷贝到内核缓冲区;
- 将文件描述符和数据长度加载到 Socket 缓冲区;
- 将数据直接从内核缓冲区拷贝到网卡。
一共只会经历两次拷贝( 和两次 CPU 上下文切换 )。
总结不易,如果本文对你有帮助,烦请点赞,收藏加关注,谢谢帅气漂亮的你。
你的名字-002
作者:半夏之沫
原文:https://juejin.cn/post/7391034124374261775