Loading... # Kafka ### 为什么要使用消息队列? 消息队列在特定的场景下具有**解耦**、**异步**、**削峰**三大特点,当然还有分布式(不像ActiveMQ、RabbitMQ只是通过主从实现高可用)、高吞吐高可靠、支持离线批处理等优点。 #### 解耦  A 系统发送数据到 BCD 三个系统,通过接口调用发送。如果 E 系统也要这个数据呢?那如果 D系统现在不需要了呢? 如:用户系统对高危用户进行封号,需要调用风控系统、以及各个业务方。 在这个场景中,A 系统跟其它系统严重耦合,A 系统产生一条比较关键的数据,很多系统都需要 A 系统将这个数据发送过来。A 系统要时时刻刻考虑 BCDE 四个系统如果挂了该咋办?要不要重发?若重发,接收方怎么确定是重发数据?要不要把消息存起来? 如果使用 MQ,A 系统产生一条数据,广播到 MQ 里面去,哪个系统需要数据自己去 MQ 里面订阅。如果新系统需要数据,直接从 MQ 里订阅即可;如果某个系统不需要这条数据,就取消对 MQ 消息的订阅即可。这样下来,A 系统就不需要去考虑要给谁发送数据,不需要维护这个代码,也不需要考虑人家是否调用成功、失败超时等情况(交由MQ负责:重试、死信队列)。 **通过一个 MQ,Pub/Sub 发布订阅消息这一模型,A 系统就跟其它系统彻底解耦了。**  #### 异步 A 系统接收一个请求,需要在自己本地写库,还需要在 BCD 三个系统写库,自己本地写库要 3ms,BCD 三个系统分别写库要 300ms、450ms、200ms。最终请求总延时是 3 + 300 + 450 + 200 = 953ms,接近 1s。从发起请求需等待个 1s甚至更久,严重影响用户体验。 如:用户注册,需要用户系统写库,风控系统写库,触发风控规则后还需要推送到业务方,业务系统写库。  为了让用户无感知,我们只需要让A 系统连续发送 3 条消息到 MQ 队列中,假如耗时 5ms,A 系统从接受一个请求到返回响应给用户,总时长是 3 + 5 = 8ms。  #### 削峰 平时A 系统风平浪静,QPS就50 。一到各种福利活动限时特价期间 ,QPS就会突然会暴增到 5k+ 。但是系统是直接基于 MySQL 的,大量的请求涌入 MySQL,每秒钟对 MySQL 执行约 5k 条 SQL。 一般的 MySQL,扛到每秒 2k 个请求就差不多了,如果每秒请求到 5k 的话,可能就直接把 MySQL 给打死了,导致系统崩溃,用户也就没法再使用系统了。 但是高峰期一过,活动结束后就成了低峰期,可能也就 1w 不到的用户同时在网站上操作,对整个系统几乎没有任何的压力。 如果使用 MQ,每秒 5k 个请求写入 MQ,A 系统每秒钟最多处理 2k 个请求,因为 MySQL 每秒钟最多处理 2k 个。A 系统从 MQ 中慢慢拉取请求,每秒钟就拉取 2k 个请求,不要超过自己每秒能处理的最大请求数量就 ok,这样下来,哪怕是高峰期的时候,A 系统也绝对不会挂掉。而 MQ 每秒钟 5k 个请求进来,就 2k 个请求出去,结果就导致在高峰期(短期),可能有几十万甚至几百万的请求积压在 MQ 中(如果是实时性要求较高如社交类型的,几万积压就报警,再往上就需要扩容了)。  这个短暂的高峰期积压是 ok 的,因为高峰期过了之后,每秒钟就 50 个请求进 MQ,但是 A 系统依然会按照每秒 2k 个请求的速度在处理。所以说,只要高峰期一过,A 系统就会快速将积压的消息给解决掉。 --- ### 消息队列有什么缺点?(以kafka为例) #### 一、系统可用性降低 系统引入的外部依赖越多,越容易挂掉。本来只是 A 系统调用 BCD 三个系统的接口,但引入MQ后还需要考虑万一 MQ 挂了咋整?因为MQ 一挂,整套系统也就跟着崩溃了。 **解决方案:** 在分布式数据系统中,通常使用**分区**来提高系统的处理能力,通过**副本**(副本不是备胎,而是所有轮胎的统称)来保证数据的高可用性。Kafka 0.8 以后,提供了 HA 机制:**replica副本机制**。同 一 个 partition 可 能 会 有 多 个 replication (**对 应 `server.properties`配 置 中 的 `default.replication.factor=N`**)。这多个副本中,只有一个是 `leader`副本,而其他的都是 `follower` 副本。仅有 leader 副本可以对外提供服务。多个 follower 副本通常存放在和 leader 副本不同的 broker 中,当某台机器挂掉后,其他 follower 副本也能迅速”转正“,开始对外提供服务,从而实现**高可用性**。 **在生产环境中,副本数必须大于1。**  ##### 为什么 follower 副本不提供读服务?又是如何选举出leader ? 先普及一下概念: * **LSO**:日志文件起始数(Log Start Offset),消费起始位移。 * **HW**:高水位值 (High Watermark)。这是控制消费者可读取消息范围的重要字段。一个普通消费者只能“看到”Leader 副本上介于 Log Start Offset 和 HW(不含)之间的 所有消息。水位以上的消息是对消费者不可见的。 * **LEO**:日志末端位移(Log End Offset),代表日志文件中下一条待写入消息的offset。当leader副本收到生产者的一条消息,LEO通常会自增1,而follower副本需要从leader副本fetch到数据后,才会增加它的LEO,最后leader副本会比较自己的LEO以及满足条件的follower副本上的LEO,选取两者中较小值作为新的HW,来更新自己的HW值。 * **AR**:Assigned Replicas。AR 是主题被创建后,分区创建时被分配的副本集合,副本个数由副本因子决定。 * **ISR**:In-Sync Replicas。Kafka 中特别重要的概念,指代的是 AR 中那些**与 Leader 保持同步的副本集合**。在 AR 中的副本可能不在 ISR 中,但 **Leader 副本天然就包含在 ISR 中**。 判断副本是否应该属于 ISR的依据是:Follower 副本的 LEO 落后 Leader LEO 的时间,是否超过了 Broker 端参数 `replica.lag.time.max.ms` 值。如果超过了,副本就会被从 ISR 中移除。 设想一下要是可以随意读写每个 follower,那么就要 care **数据一致性的问题**,系统复杂度太高,很容易出问题。 Kafka 通过 HW 和 LEO 的管理来决定 Consumer 可以消费哪些数据,已经当前写入的数据。  当 leader 挂掉时,kakfa 通过 zookeeper 感知到这一情况,在 ISR 副本集合中选取新的副本成为 leader,对外提供服务。但如果ISR中原本只有leader或设置 `unclean.leader.election.enable` 参数为 true,就会在ISR之外选取出副本成为 leader,这样就会照成数据丢失问题。 当然,Kafka为了保证高可用的优越性能,还做了诸如:**partition 并发、顺序读写磁盘、page cache 压缩、高性能序列化(二进制)、内存映射、无锁 offset 管理、Java NIO 模型**等设计,但这块学术有限就不做深入研究了~ #### 二、系统复杂度提高 ##### 怎么保证消息传递的顺序性? **Kafka 在 Topic 级别本身是无序的,只有 Partition 上才有序**,所以为了保证处理顺序,可以自定义分区器,将需顺序处理的数据发送到同一个 partition。具体做法:实现 `kafka.producer.Partitioner`,并修改 `partitioner.class`为该实现类。 另一个场景是,发送者通过配置 `retries`重试次数后,如果没有设置 `max.in.flight.requests.per.connection=1`将潜在地改变的记录的顺序。因为如果两批被送到同个分区,第一失败重试,但第二成功,则在第二批中的记录可以首先出现。 #### 三、一致性问题 ##### 消息丢失问题 Kafka可能会在三个阶段丢失消息: **(1)生产者丢失消息**  生产者丢失消息与acks配置息息相关: - 0:发生网络抖动消息丢了,生产者不校验ACK自然就不知道丢了。 - 1:保证leader不丢,但是如果leader挂了,恰好选了一个没有ACK的follower,那也丢了。 - all/-1:保证leader和follower不丢,但是如果网络拥塞,没有收到ACK,**会有重复下发的问题**。 **(2)Kafka Broker丢失消息**  Kafka消息发送有两种方式,同步(sync)和异步(async),默认是**同步方式**,如果Kafka写入到mmap之后就立即 flush 然后再返回 Producer 叫同步 (sync);写入mmap之后立即返回 Producer 不调用 flush 叫异步 (async)。可通过 `producer.type`属性进行配置。下文的常用配置,都是基于同步方式下。 可能出现丢失的场景: 1. 上文的宕机Leader挂掉,ISR没候选人只能从外面选举落后的Follower 2. 如果数据已经写入系统 cache 中但是还没来得及刷入磁盘,此时突然机器宕机或者掉电那就丢了 一般我们需要指定: 1. topic 的 `replication.factor` 参数必须大于1,保证至少两个副本; 2. Kafka 的 `min.insync.replicas` 参数必须大于 1,保证至少一个Follower没有掉队; 3. producer 的 `acks` 参数为 `-1/all`,保证全部写入成功; 4. producer 的 `retries` 参数为3,(查阅资料时是建议为MAX,但可能出现无限重试导致卡死) **(3)消费者丢失消息**  普及一下流程: 消费者通过 pull 模式主动的去 kafka 集群拉取消息,与 producer 相同的是,消费者在拉取消息的时候也是找 leader 分区去拉取。多个消费者可以组成一个消费者组(consumer group),每个消费者组都有一个组 id。**同一个消费组者的消费者可以消费同一 topic 下不同分区的数据,但是不会出现多个消费者消费同一分区的数据,多出来的消费者会出现空跑状态。** 消费消息的时候主要分为两个阶段: 1. 标识消息已被消费,commit offset 坐标; 2. 处理消息。 场景一:先commit再处理消息(手动提交offset)。如果在处理消息的时候异常了,但是offset 已经提交了,这条消息对于该消费者来说就是丢失了,再也不会消费到了。不建议此逻辑。 场景二:先处理消息再commit(手动提交offset)。如果在commit之前发生异常,下次还会消费到该消息,重复消费的问题可以通过业务保证消息幂等性来解决。(异常没有往上抛,springboot会自动帮你提交offset) 在生产环境中严格做到全流程 `exactly once`其实是难的,同时也会牺牲效率和吞吐量,最佳实践是业务侧做好补偿机制,万一出现消息丢失可以兜底。 ##### 重复消费问题 生产经常遇到的,就是你有时候重启系统,看你怎么重启了,如果碰到点着急的,直接 kill 进程了,再重启。这会导致 consumer 有些消息处理了,但是没来得及提交 offset,尴尬了。重启之后,少数消息会再次消费一次。 通过业务保证消息幂等性来解决。 --- ### 生产者配置  位置:org.apache.kafka.clients.producer.ProducerConfig | 名称 | 描述 | 建议设置 | | ------------------------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------------------------ | | bootstrap.servers | Kafka集群的初始连接的主机:端口列表,仅用于初始连接以发现完整的集群成员资格 | 配置两到三个即可:<br>host1:port1,host2:port2 | | key.serializer | key序列化接口 | 按需 | | value.serializer | value序列化接口 | 按需 | | acks | 应答确认数:<br>【"0"】不进行消息接收是否成功的确认;不能保证消息是否发送成功,生成环境基本不会用。<br>【“1”】默认,当Leader接收成功时确认;只要Leader存活就可以保证不丢失,保证了吞吐量。(在follower复制前挂掉将导致数据丢失)<br>【"all"/"-1"】Leader和Follower都接收成功时确认;可以最大限度保证消息不丢失,但是吞吐量低。 | "1" | | buffer.memory | 内存缓冲区大小,默认【33554432】32M | 应大致对应于生产者将使用的总内存 | | batch.size | 批量发送消息的最大值,默认【16384】16K | 根据实际业务量,可以和压缩配合使用,适合下调不适合上调 | | max.request.size | 请求的最大大小,默认【1048576】字节,1M,必须小于 `socket.request.max.bytes`10M。 | 消息大小需要上调时,配合 `message.max.bytes`使用 | | compression.type | 数据压缩类型,压缩完整批次,默认【none】,支持【gzip/snappy/lz4】 | 少量多批次 | | retries | 重试如果次数,默认【0】<br>如果没有设置 `max.in.flight.requests.per.connection=1`将潜在地改变的记录的顺序。因为如果两批被送到同个分区,第一失败重试,但第二成功,则在第二批中的记录可以首先出现。 | 3 | | max.in.flight.requests.per.connection | 客户端在单个连接上能够发送的未响应请求的个数。默认【5】,【1】表示kafka broker在响应请求之前client不能再向同一个broker发送请求。 | 看实际业务需要,1为了避免消息乱序,但严重影响吞吐量 | | connections.max.idle.ms | 指定的毫秒数后关闭空闲连接,默认【540000】,【-1】为假长链接一个是Socket的探活机制仍然生效,一个是会照成一半的僵尸链接。(productor初始会创建2*broker数的TCP链接) | 180000 | | metadata.max.age.ms | 强行刷新元数据时间,默认【300000】 | 240000 | | linger.ms | 最大延迟时间,默认【0】无延迟,高吞吐场景可以适当延迟以用于批处理 | 高吞吐下500 | | request.timeout.ms | 请求超时时间,默认【30000】,如果重试次数耗尽将请求失败。 | | ### 消费者配置 #### Kafka Rebalance rebalance 本质上是一种协议,规定了一个 consumer group 下的所有 consumer 如何达成一致来分配订阅 topic 的每个分区。比如某个 group 下有 20 个 consumer,它订阅了一个具有 100 个分区的 topic。正常情况下,Kafka 平均会为每个 consumer 分配 5 个分区。这个分配的过程就叫 rebalance。 如果同一 group 下 consumer 的数量大于 part 的数量,则多余的 consumer 将处于无用状态,不消费数据。 **什么时候 rebalance?** 这也是经常被提及的一个问题。rebalance 的触发条件有三种: - consumer发生变更(新 consumer 加入组、已有 consumer 主动离开组或已有 consumer 崩溃了) - 订阅Topic数发生变更 - 订阅Topic的Partition数发生变更 - Kafka Consumer 的常见配置?broker, 网络和拉取参数,心跳参数 - Consumer 什么时候会被踢出集群?奔溃,网络异常,处理时间过长提交位移超时 位置:org.apache.kafka.clients.consumer.ConsumerConfig | 名称 | 描述 | 建议设置 | | ------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------- | | bootstrap.servers | Kafka集群的初始连接的主机:端口列表,仅用于初始连接以发现完整的集群成员资格 | 配置两到三个即可:<br>host1:port1,host2:port2 | | key.deserializer | key反序列化接口 | 跟producter保持一致 | | value.deserializer | value反序列化接口 | 跟producter保持一致 | | fetch.min.bytes | 服务器应为获取请求返回的最小数据量。如果可用数据不足,则请求将在响应前等待累积数据。默认【1】,调大将以一些额外的延迟为代价提高服务器吞吐量。 | 按需 | | fetch.max.wait.ms | 默认【500】。如果没有足够的数据立即满足 `fetch.min.bytes`给出的要求,则等待该时间后重新拉取。 | 1000 | | max.partition.fetch.bytes | 每个分区能拉取的最大数据量,默认【1048576】。但如果分区的第一条消息就大于此限制,扔会拉取成功。可通过message.max.bytes(broker配置)或max.message.bytes(主题配置)指定最大消息大小。 | 默认 | | session.timeout.ms | 拉取session超时设置,默认【10000】 | 15000 | | enable.auto.commit | 是否自动提交偏移量,默认【true】 | 通用true | | auto.offset.reset | offset偏移量规则设置:<br/>(1)、【earliest】: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费<br/> (2)、默认【latest】: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据<br/>(3)、【none】:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 | latest | | auto.commit.interval | 自动提交频率,默认【5000】 | 5000 | | max.poll.records | 最大拉取数量,默认【500】 | 500,多批次下300 | | max.poll.interval.ms | 拉取的最大延迟,默认【300000】。控制轮询调用之前,消费者会主动离开组的最大时间。 | 小批量快速响应下30000 | | request.timeout.ms | 请求超时时间,默认【305000】,如果重试次数耗尽将请求失败。<br>代表一个JoinGroup请求可以在服务器上阻塞而消费是平衡的最大时间,因此必须始终大于 `max.poll.interval.ms`。 | 小批量快速响应下35000 | --- ### Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点? | 特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka | | ------------------------ | ------------------------------------- | -------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------- | | 单机吞吐量 | 万级,比 RocketMQ、Kafka 低一个数量级 | 同 ActiveMQ | 10 万级,支撑高吞吐 | 10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景 | | topic 数量对吞吐量的影响 | | | topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topic | topic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源 | | 时效性 | ms 级 | 微秒级,这是 RabbitMQ 的一大特点,延迟最低 | ms 级 | 延迟在 ms 级以内 | | 可用性 | 高,基于主从架构实现高可用 | 同 ActiveMQ | 非常高,分布式架构 | 非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用 | | 消息可靠性 | 有较低的概率丢失数据 | 基本不丢 | 经过参数优化配置,可以做到 0 丢失 | 同 RocketMQ | | 功能支持 | MQ 领域的功能极其完备 | 基于 erlang 开发,并发能力很强,性能极好,延时很低 | MQ 功能较为完善,还是分布式的,扩展性好 | 功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用 | ### 推荐文档 [阿里文档:发布者最佳实践](https://help.aliyun.com/document_detail/68165.html) [阿里文档:订阅者最佳实践](https://help.aliyun.com/document_detail/68166.html) Last modification:December 20, 2022 © Allow specification reprint Like 0 喵ฅฅ
One comment
很棒