Loading... **kafka目录结构** ```xml kafka |-- config |-- consumer |-- message |-- producer ``` ## 一、生产消费配置 ### 基于String的生产消费(推荐) **消息体** ```java @Data public class MessageDemo { public static final String TOPIC = "message_demo"; private String accountUuid; private String accountName; private Integer sexType; private Long createTime; public static String getKey(MessageDemo messageDemo){ return messageDemo.accountUuid; } public static String getValue(MessageDemo messageDemo){ return JSONUtil.toJsonStr(messageDemo); } } ``` **生产者配置** ```java @Configuration @EnableKafka public class KafkaProducerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean public KafkaTemplate<String, String> kafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 默认1,0-不应答、1-leader 应答、all-所有 leader 和 follower 应答。 props.put(ProducerConfig.ACKS_CONFIG, "1"); // 失败重试次数,默认无限大 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 批量发送消息的最大值,默认16KB props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 生产者内存缓冲区大小,默认32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 批处理最大延迟时间,默认0ms props.put(ProducerConfig.LINGER_MS_CONFIG, 5000); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(props); return new KafkaTemplate<>(factory); } } ``` **生产者代码** ```java /** * 消息演示生产商 * * 【建议使用】 * key值不强制,可以为空;value为jsonString */ @Component public class MessageDemoProducer { @Resource private KafkaTemplate<String, String> kafkaTemplate; public ListenableFuture<SendResult<String, String>> asyncSend(MessageDemo msg) { // 异步发送消息 return kafkaTemplate.send(MessageDemo.TOPIC, MessageDemo.getKey(msg), MessageDemo.getValue(msg)); } } ``` **生产者发送示例** ```java @Test public void testProducer() throws InterruptedException { for (int i = 0; i < 3; i++) { MessageDemo msg = new MessageDemo(); msg.setAccountUuid("uuid" + i); msg.setAccountName("林哲公主" + i + "号"); msg.setSexType(1); msg.setCreateTime(DateUtil.currentSeconds()); messageDemoProducer.asyncSend(msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable e) { logger.error("messageDemoProducer发送异常,msg={},e={}", msg, e); } @Override public void onSuccess(SendResult<String, String> result) { logger.info("messageDemoProducer发送成功,msg={},result={}", msg, result); } }); } // 阻塞等待,保证消费 new CountDownLatch(1).await(); } ``` **消费者配置** ```java @Slf4j @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean("kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(true); // 消费监听的主题不存在时不报错 factory.setMissingTopicsFatal(false); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // Kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 // props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否自动提交offset偏移量(默认true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率(ms) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // offset偏移量规则设置: // (1)、earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 每个批次最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 最小拉取字节数 props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100); // 阻塞拉取的最大等待时长 props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return props; } } ``` **消费者代码** ```java /** * 基于String序列化的消费者 */ @Component public class MessageDemoConsumer { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 单数据消费 * 也支持批量消费,如果配置开启批量需要拼接"[]" * * @param messageDemoString 消息演示 */ @KafkaListener(topics = MessageDemo.TOPIC, groupId = "message-demo-consumer-group-1", containerFactory = "kafkaListenerContainerFactory") public void onMessage(String messageDemoString) { String messageDemoArrayString = "[" + messageDemoString + "]"; List<MessageDemo> messageDemoList = JSON.parseArray(messageDemoArrayString, MessageDemo.class); messageDemoList.forEach(System.out::println); } /** * 批量消费 * * @param messageDemoStringList 消息列表演示 */ @KafkaListener(topics = MessageDemo.TOPIC, groupId = "message-demo-consumer-group-2", containerFactory = "kafkaListenerContainerFactory") public void onMessage2(List<String> messageDemoStringList) { messageDemoStringList.forEach(item -> System.out.println(JSON.parseObject(item, MessageDemo.class))); } /** * 批量消费 * ConsumerRecord可以获取更多更多信息,例如说消息的所属队列、创建时间等等属性 * 不过消息的内容(value)就需要自己去反序列化。一般情况下不建议使用。 * * @param records 记录 */ @KafkaListener(topics = MessageDemo.TOPIC, groupId = "message-demo-consumer-group-3", containerFactory = "kafkaListenerContainerFactory") public void onMessage3(List<ConsumerRecord<String, String>> records) { for (ConsumerRecord<String, String> record : records) { MessageDemo messageDemo = JSON.parseObject(record.value(), MessageDemo.class); System.out.println(messageDemo); } } } ``` 在学习中如果对于消费的时候是怎么把配置串联起来的有疑问,可以看下这篇文章: [Spring Kafka:@KafkaListener 单条或批量处理消息](https://mp.weixin.qq.com/s/ZG21a6OulOcI_8d_Fl2A-g) ### 基于Json的生产消费(不推荐) **消息体** ```java @Data public class MessageJsonDemo { public static final String TOPIC = "message_json_demo"; private String accountUuid; private String accountName; private Integer sexType; private Long createTime; public static String getKey(MessageJsonDemo messageDemo){ return messageDemo.accountUuid; } } ``` **生产者配置** ```java @Configuration @EnableKafka public class KafkaJsonProducerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean public KafkaTemplate<String, MessageJsonDemo> kafkaJsonTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // 默认1,0-不应答、1-leader 应答、all-所有 leader 和 follower 应答。 props.put(ProducerConfig.ACKS_CONFIG, "1"); // 失败重试次数,默认无限大 props.put(ProducerConfig.RETRIES_CONFIG, 3); // 批量发送消息的最大值,默认16KB props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 生产者内存缓冲区大小,默认32MB props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 批处理最大延迟时间,默认0ms props.put(ProducerConfig.LINGER_MS_CONFIG, 5000); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); DefaultKafkaProducerFactory<String, MessageJsonDemo> factory = new DefaultKafkaProducerFactory<>(props); return new KafkaTemplate<>(factory); } } ``` **生产者代码** ```java /** * json消息演示生产商 * * 【不建议使用】 * 需要生产者和消费者一起约定消息体,一旦不一致反序列化时就会报错,还需要专门为每个消息体写生产配置,复用性差 */ @Component public class MessageDemoJsonProducer { @Resource private KafkaTemplate<String, MessageJsonDemo> kafkaJsonTemplate; public ListenableFuture<SendResult<String, MessageJsonDemo>> asyncSend(MessageJsonDemo msg) { // 异步发送消息 return kafkaJsonTemplate.send(MessageJsonDemo.TOPIC, MessageJsonDemo.getKey(msg), msg); } } ``` **生产者发送示例** ```java @Test public void testJsonProducer() throws InterruptedException { for (int i = 0; i < 5; i++) { MessageJsonDemo msg = new MessageJsonDemo(); msg.setAccountUuid("uuid" + i); msg.setAccountName("林哲" + i + "号"); msg.setSexType(1); msg.setCreateTime(DateUtil.currentSeconds()); messageDemoJsonProducer.asyncSend(msg).addCallback(new ListenableFutureCallback<SendResult<String, MessageJsonDemo>>() { @Override public void onFailure(Throwable e) { logger.error("messageDemoJsonProducer发送异常,msg={},e={}", msg, e); } @Override public void onSuccess(SendResult<String, MessageJsonDemo> result) { logger.info("messageDemoJsonProducer发送成功,msg={},result={}", msg, result); } }); } // 阻塞等待,保证消费 new CountDownLatch(1).await(); } ``` **消费者配置** ```java @Slf4j @Configuration public class KafkaJsonConsumerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean("kafkaListenerJsonContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, MessageJsonDemo>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, MessageJsonDemo> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(true); // 消费监听的主题不存在时不报错 factory.setMissingTopicsFatal(false); return factory; } public ConsumerFactory<String, MessageJsonDemo> consumerFactory() { // 需要为反序列化添加信任消息类 JsonDeserializer<MessageJsonDemo> jsonDeserializer = new JsonDeserializer<>(); jsonDeserializer.addTrustedPackages("*"); return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // Kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 // props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否自动提交offset偏移量(默认true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率(ms) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // offset偏移量规则设置: // (1)、earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 每个批次最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 最小拉取字节数 props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100); // 阻塞拉取的最大等待时长 props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000); return props; } } ``` **消费者代码** ```java /** * 基于Json序列化的消费者 * * 【不建议使用】 * 需要生产者和消费者一起约定消息体,一旦不一致反序列化时就会报错,还需要专门为每个消息体写消费配置,复用性差 * * 如果消费的序列化格式不一致,会提示: * SerializationException: Error deserializing key/value for partition xxx at offset x. If needed, please seek past the record to continue consumption. * 需要在消费配置的反序列实例中添加信任包名 */ @Component public class MessageDemoJsonConsumer { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 单数据消费,支持批量 * * @param messageJsonDemo json消息演示 */ @KafkaListener(topics = MessageJsonDemo.TOPIC, groupId = "message-demo-json-consumer-group-1", containerFactory = "kafkaListenerJsonContainerFactory") public void onMessage(MessageJsonDemo messageJsonDemo) { System.out.println(messageJsonDemo); } /** * 批量消费 * * @param messageJsonDemoList json消息列表演示 */ @KafkaListener(topics = MessageJsonDemo.TOPIC, groupId = "message-demo-json-consumer-group-2", containerFactory = "kafkaListenerJsonContainerFactory") public void onMessage2(List<MessageJsonDemo> messageJsonDemoList) { messageJsonDemoList.forEach(System.out::println); } /** * 批量消费 * ConsumerRecord可以获取更多更多信息,例如说消息的所属队列、创建时间等等属性 * 不过消息的内容(value)就需要自己去反序列化。一般情况下不建议使用。 * * @param records 记录 */ @KafkaListener(topics = MessageJsonDemo.TOPIC, groupId = "message-demo-json-consumer-group-3", containerFactory = "kafkaListenerJsonContainerFactory") public void onMessage3(List<ConsumerRecord<String, MessageJsonDemo>> records) { for (ConsumerRecord<String, MessageJsonDemo> record : records) { System.out.println(record.value()); } } } ``` ### 基于Avro的生产消费 Avro 序列化相比JSON更快,序列化的数据也更小,并且支持实时编译,不用像JSON那样需要预先定义好数据格式,这样就可以跟String类型一样共用一套配置。 需要通过 Kafka Schema Registry 服务去管理这些 topic 的schema,它还对外提供了多个 restful 接口,用于存储和查找。 我们向 kafka 发送数据时,需要先向 Schema Registry 注册 schema,然后序列化发送到 kafka 里。当我们需要从 kafka 消费数据时,也需要先从 Schema Registry 获取 schema,然后才能解析数据。 一般只有大数据涉及到标签这种大数据量的才需要用到,正常业务String够用了。  **生产配置** ```java // 配置相关 Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxx"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // 指定Value的序列化类,KafkaAvroSerializer props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class); // 指定 registry 服务的地址 // 如果 Schema Registry 启动了高可用,那么这儿的配置值可以是多个服务地址,以逗号隔开 props.put("schema.registry.url", registryHost); KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props); // 定义消息体 String key = "Alyssa key"; Schema schema = new Schema.Parser().parse(new File(schameFilename)); GenericRecord avroRecord = new GenericData.Record(schema); avroRecord.put("name", "Alyssa"); avroRecord.put("favorite_number", 256); // 发送消息 ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, key, avroRecord); producer.send(record); producer.flush(); producer.close(); ``` **消费配置** ```java // 消费者组中线程数量可以适当拉大(根据系统) factory.setConcurrency(8); // 消费相关配置 public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // Kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否自动提交offset偏移量(默认true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); // 自动提交的频率(ms) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // offset偏移量规则设置: // (1)、earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 // props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //每个批次数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000); //avro props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class.getName()); props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl); props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, false); return props; } // 消费代码 @KafkaListener(topics = {"xxx"}, containerFactory="avroKafkaListenerContainerFactory") public void peipeiKafkaListener(List<GenericData.Record> records){ // record.toString()后转成相应的实体类 ... } ``` ## 二、生产消费基本流程  ## 三、消息重试 > **死信队列** > > 当一条消息初次消费失败,Spring-Kafka 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,Spring-Kafka 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。 > > 这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),将存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。后续可以通过对死信队列中的消息进行重发,来使得消费者实例再次进行消费。 > > 消费重试和死信队列,是由 Spring-Kafka 所封装提供的。 ```java /** * kafka配置 异常处理 */ @Configuration public class KafkaConfiguration { private final Logger logger = LoggerFactory.getLogger(getClass()); /** * 单条消费失败的消费重试处理 * 与kafkaJsonTemplate冲突,只处理KafkaTemplate<String, String> * * @param operations 操作 * @return {@link ErrorHandler} */ @Bean("kafkaErrorHandler") @Primary public ErrorHandler kafkaErrorHandler(@Qualifier("kafkaTemplate") KafkaOperations<?, ?> operations) { logger.info("kafkaErrorHandler begin to Handle"); // <1> 创建 DeadLetterPublishingRecoverer 对象,template方法已过时所以采用最新的父类operations // 若超过N次仍失败,则发送到死信队列:原TOPIC.DLT // 死信主题必须至少具有与原始主题一样多的分区 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(operations); // <2> 创建 FixedBackOff 对象,设置重试间隔 10秒 次数为 3次 // 另一个实现类ExponentialBackOff为指数递增的间隔时间 BackOff backOff = new FixedBackOff(10 * 1000L, 5L); // <3> 创建 SeekToCurrentErrorHandler 对象,处理异常 return new SeekToCurrentErrorHandler(recoverer, backOff); } /** * 批量消费失败的消费重试处理 * 并没有设置 DeadLetterPublishingRecoverer 对象,SeekToCurrentBatchErrorHandler 暂时不支持死信队列的机制 * * @return {@link BatchErrorHandler} */ @Bean @Primary public BatchErrorHandler kafkaBatchErrorHandler() { // 创建 SeekToCurrentBatchErrorHandler 对象 SeekToCurrentBatchErrorHandler batchErrorHandler = new SeekToCurrentBatchErrorHandler(); // 创建 FixedBackOff 对象 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); batchErrorHandler.setBackOff(backOff); // 返回 return batchErrorHandler; } } ``` 吐槽一下这套网上大多数套用的配置,和实际生产偏差太大。生产中项目消费者可能存在各种类型String、Json、Avro等等,一般都是通过 `@Configuration`在代码中去配置kafka。 这就相当于我们需要在对应的消费者配置的监听工厂中,获取异常处理的Bean,再放到factory的errorHandler中,有点多此一举。 (有点感叹,强如芋道源码、xxx架构师分享的都是基础的配置使用,很少人能讲清实际场景中的应用与通俗的原理,原本我只是打算找个最详细的教程学习而已) ```java @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean("kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(@Qualifier("kafkaErrorHandler") ErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); .... factory.setErrorHandler(errorHandler); } } ``` 还有一点要注意的是,当存在多个消费者配置时不指定Bean名称(尤其配置不到是上面的模板),会导致ErrorHandler没有被配置进去。此时消费报错时会默认重试 `10`次后,将消息丢弃。具体流程如下: `KafkaListenerContainerFactory`的实现类 `AbstractKafkaListenerContainerFactory`配置完后,会先执行 `afterPropertiesSet()`方法区分配置的为单次or批量消费的异常处理器,再到 `initializeContainer(C instance, KafkaListenerEndpoint endpoint)`中设置配置的异常处理器。但因为配置没有被成功路由到,所以 `errorHandler`为 `null` 再来到消费者这边,`KafkaMessgaeListenerContainer`的 `doStart()`中会执行下面逻辑,配置默认重试10。  ### 单次消费:重试+死信 ```java /** * 消费者通用配置 * */ @Slf4j @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean("kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(@Qualifier("kafkaTemplate") KafkaOperations<?, ?> operations) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(1); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(false); // 消费监听的主题不存在时不报错 factory.setMissingTopicsFatal(false); // <1> 创建 DeadLetterPublishingRecoverer 对象,template方法已过时所以采用最新的父类operations // 若超过N次仍失败,则发送到死信队列:原TOPIC.DLT // 死信主题必须至少具有与原始主题一样多的分区 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(operations); // <2> 创建 FixedBackOff 对象,设置重试间隔 10秒 次数为 3次 // 另一个实现类ExponentialBackOff为指数递增的间隔时间 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); // <3> 创建 SeekToCurrentErrorHandler 对象,处理异常 SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler(recoverer, backOff); factory.setErrorHandler(seekToCurrentErrorHandler); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // Kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 // props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否自动提交offset偏移量(默认true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率(ms) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // offset偏移量规则设置: // (1)、earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 阻塞拉取的最大等待时长 props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return props; } } ``` * 在消息消费失败时,SeekToCurrentErrorHandler 会将 调用 Kafka Consumer 的 `#seek(TopicPartition partition, long offset)` 方法,将 Consumer 对于该消息对应的 TopicPartition 分区的**本地** 进度设置成**该消息的位置** 。这样,Consumer 在下次从 Kafka Broker 拉取消息的时候,又能重新拉取到这条消费失败的消息,并且是第一条。 * 同时,Spring-Kafka 使用 `FailedRecordTracker` 对每个 Topic 的每个 TopicPartition 消费失败次数进行**计数**,当 `backOff`达到 `-1`时表示不再重试。又因为我们定义的是 `DeadLetterPublishingRecoverer`,所以会将消息加到TOPIC.DLT中。 **触发DEMO** ```java @KafkaListener(topics = MessageDemo.TOPIC, groupId = "message-demo-consumer-group-4", containerFactory = "kafkaListenerContainerFactory") public void failedMessage(String message) { logger.info("[failedMessage][线程编号:{} 消息内容:{}]", Thread.currentThread().getId(), message); throw new ServiceException("故意抛出异常,验证消息重试"); } ``` ```java @Test public void testProducer() throws InterruptedException { for (int i = 0; i < 1; i++) { MessageDemo msg = new MessageDemo(); msg.setAccountUuid("uuid" + i); msg.setAccountName("哲妹" + i + "号"); msg.setSexType(1); msg.setCreateTime(DateUtil.currentSeconds()); messageDemoProducer.asyncSend(msg).addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(Throwable e) { logger.error("messageDemoProducer发送异常,msg={},e={}", msg, e); } @Override public void onSuccess(SendResult<String, String> result) { logger.info("messageDemoProducer发送成功,msg={},result={}", msg, result); } }); } // 阻塞等待,保证消费 new CountDownLatch(1).await(); } ```  **注意:** FailedRecordTracker 提供的计数是**客户端** 级别的,重启 JVM 应用后,计数是会丢失的。所以,如果想要计数进行持久化,需要自己重新实现下 FailedRecordTracker 类,通过 ZooKeeper 存储计数。 RocketMQ 提供的消费重试的计数,目前是**服务端** 级别,已经进行持久化。 ### 批量消费:重试+死信 首先纠正一点,本次使用的kafka 2.6版本是支持批量消费的死信的,但需要用其它办法,缺点是没办法抛出异常,可以打error日志。 网上的大多数版本都是2.3、2.4的老版本,2.6的 `SeekToCurrentErrorHandler`已经没办法适配批量重置了,换句话说,目前还在公众号知名网站上流传的配置已经过时了,真搞不懂他们为什么还要发表出来,难道自己没试过吗,而且也都不标明版本,无语子。最后还是在kafka开源作者在项目的一篇issue回复上看到的,他建议2.6后使用 `RetryingBatchErrorHandler`替代 `SeekToCurrentErrorHandler`,但是没有给出后者不生效的原因,全网也没有解决的文章。 ```java /** * 消费者通用配置 */ @Slf4j @Configuration public class KafkaConsumerConfig { @Value(value = "${kafka.bootstrapServers}") private String bootstrapServers; @Bean("kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(@Qualifier("kafkaTemplate") KafkaOperations<?, ?> operations) { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // 设置消费者工厂 factory.setConsumerFactory(consumerFactory()); // 消费者组中线程数量 factory.setConcurrency(3); // 拉取超时时间 factory.getContainerProperties().setPollTimeout(3000); // 当使用批量监听器时需要设置为true factory.setBatchListener(true); // 消费监听的主题不存在时不报错 factory.setMissingTopicsFatal(false); // <1> 创建 DeadLetterPublishingRecoverer 对象,template方法已过时所以采用最新的父类operations // 若超过N次仍失败,则发送到死信队列:原TOPIC.DLT // 死信主题必须至少具有与原始主题一样多的分区 // 自定义 destinationResolver 对象,打印错误日志 ConsumerRecordRecoverer recoverer = new DeadLetterPublishingRecoverer(operations, (cr, e) -> { log.error(e.toString()); return new TopicPartition(cr.topic() + ".DLT", cr.partition()); }); // <2> 创建 FixedBackOff 对象,设置重试间隔 10秒 次数为 3次 // 另一个实现类ExponentialBackOff为指数递增的间隔时间 BackOff backOff = new FixedBackOff(10 * 1000L, 3L); // <3> 创建 retryingBatchErrorHandler 对象,处理异常 RetryingBatchErrorHandler retryingBatchErrorHandler = new RetryingBatchErrorHandler(backOff, recoverer); factory.setBatchErrorHandler(retryingBatchErrorHandler); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); // Kafka地址 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); //配置默认分组,这里没有配置+在监听的地方没有设置groupId,多个服务会出现收到相同消息情况 // props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); // 是否自动提交offset偏移量(默认true) props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交的频率(ms) props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); // Session超时设置 props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000"); // offset偏移量规则设置: // (1)、earliest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 // (2)、latest: 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 // (3)、none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // 每个批次最大数量 props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // 最小拉取字节数 props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100); // 阻塞拉取的最大等待时长 props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); return props; } } ``` 需要注意这种写法与单次消费不同,它并不会抛出异常,因为 `RetryingBatchErrorHandler`只是捕获了异常,调用 `BackOff`重试,所以就需要我们在上游 `DeadLetterPublishingRecoverer`中自定义 `destinationResolver`对象,打印错误日志。 ### 其它思路 这种写法也不错[# ConsumerAwareErrorHandler异常处理器](https://www.jianshu.com/p/cded9e69a13f) Last modification:August 22, 2022 © Allow specification reprint Like 2 喵ฅฅ
One comment
1