你了解kafka的,请用面试者的方式告诉我spring boot和它结合使用,获取消息有哪些模式? 以及处理消息失败的重试策略。

Spring Boot + Kafka 面试深度解析


一、Kafka 消息消费的 4 种核心模式


1. 手动提交模式(Manual Acknowledgement)


```java

@Component

public class ManualAckConsumer {

   

   // 核心配置:关闭自动提交,手动控制偏移量

   @KafkaListener(

       topics = "${kafka.topic.manual}",

       containerFactory = "manualAckContainerFactory"

   )

   public void onMessage(

           ConsumerRecord<String, String> record,

           Acknowledgment ack) {  // 关键:注入Acknowledgment

       

       try {

           // 1. 业务处理

           processBusiness(record.value());

           

           // 2. 成功后才手动提交

           ack.acknowledge();

           

           log.info("消息处理成功,已提交偏移量:{}", record.offset());

           

       } catch (Exception e) {

           log.error("消息处理失败,不提交偏移量:{}", record.offset(), e);

           // 不调用acknowledge(),消息会被重新消费

       }

   }

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           manualAckContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 关键配置

       factory.getContainerProperties().setAckMode(

           ContainerProperties.AckMode.MANUAL_IMMEDIATE  // 手动立即提交

       );

       

       // 设置并发消费者数量

       factory.setConcurrency(3);

       

       return factory;

   }

}

```


面试回答要点:


· 适用场景:金融交易、订单处理等需要精确一次语义的场景

· 优点:完全控制提交时机,避免消息丢失

· 缺点:需要手动管理,复杂度高

· 提交方式:

 · MANUAL:批量提交(推荐)

 · MANUAL_IMMEDIATE:单条立即提交(性能较差)


2. 批量消费模式(Batch Processing)


```java

@Component

public class BatchConsumer {

   

   // 核心:一次性消费一批消息

   @KafkaListener(

       topics = "${kafka.topic.batch}",

       containerFactory = "batchContainerFactory"

   )

   public void onMessages(

           List<ConsumerRecord<String, String>> records) {  // 关键:List接收

       

       log.info("收到一批消息,数量:{}", records.size());

       

       // 批量处理逻辑

       List<CompletableFuture<Void>> futures = records.stream()

           .map(record -> processAsync(record))

           .collect(Collectors.toList());

       

       // 等待所有处理完成

       CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))

           .join();

       

       log.info("批次处理完成");

   }

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           batchContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 关键配置:开启批量模式

       factory.setBatchListener(true);

       

       // 批量配置

       factory.getContainerProperties().setPollTimeout(3000);

       factory.getContainerProperties().setIdleBetweenPolls(1000);

       

       // 控制批量大小

       factory.getContainerProperties().setMaxPollRecords(100);

       

       return factory;

   }

}

```


面试回答要点:


· 适用场景:日志处理、数据同步、ETL等吞吐量大的场景

· 性能优势:减少网络IO,提高吞吐量

· 配置要点:

 · setBatchListener(true) 开启批量模式

 · setMaxPollRecords() 控制每批最大数量

 · setPollTimeout() 设置拉取超时时间


3. 并发消费模式(Concurrent Consumption)


```java

@Component

public class ConcurrentConsumer {

   

   @KafkaListener(

       topics = "${kafka.topic.concurrent}",

       containerFactory = "concurrentContainerFactory",

       concurrency = "3"  // 关键:并发消费者数量

   )

   public void onMessage(String message) {

       log.info("线程 {} 处理消息:{}",

           Thread.currentThread().getName(), message);

   }

   

   @Bean  

   public ConcurrentKafkaListenerContainerFactory<String, String>

           concurrentContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 设置并发:每个分区一个消费者

       int partitions = getTopicPartitions("concurrent-topic");

       factory.setConcurrency(partitions);  // 最佳实践:与分区数一致

       

       // 优化配置

       factory.getContainerProperties().setPollTimeout(1000);

       factory.getContainerProperties().setAckTime(1000);

       

       return factory;

   }

}

```


面试回答要点:


· 并发策略:

 · 一个分区只能被一个消费者消费

 · 消费者数量 ≤ 分区数量(否则有消费者闲置)

· 负载均衡:Kafka自动分配分区给消费者

· 动态扩缩容:消费者组支持动态增加/减少消费者


4. 消息过滤模式(Message Filtering)


```java

@Component

public class FilteringConsumer {

   

   @Bean

   public RecordFilterStrategy<String, String> recordFilterStrategy() {

       return new RecordFilterStrategy<String, String>() {

           @Override

           public boolean filter(ConsumerRecord<String, String> record) {

               // 过滤掉特定的消息

               return record.value().contains("ignore") ||

                      record.key() == null;

           }

       };

   }

   

   @KafkaListener(

       topics = "${kafka.topic.filter}",

       containerFactory = "filteringContainerFactory"

   )

   public void onFilteredMessage(String message) {

       // 只会收到未被过滤的消息

       log.info("收到有效消息:{}", message);

   }

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           filteringContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 设置过滤器

       factory.setRecordFilterStrategy(recordFilterStrategy());

       

       // 被过滤的消息会被确认(不抛给业务层)

       factory.setAckDiscarded(true);

       

       return factory;

   }

}

```


二、消息消费失败的重试策略


1. Spring Retry 重试机制(推荐)


```java

@Component

public class RetryConsumer {

   

   @KafkaListener(

       topics = "${kafka.topic.retry}",

       containerFactory = "retryContainerFactory"

   )

   @Retryable(  // 方法级别重试

       value = {BusinessException.class, NetworkException.class},

       maxAttempts = 3,

       backoff = @Backoff(delay = 1000, multiplier = 2.0)

   )

   public void onMessageWithRetry(String message) {

       log.info("尝试处理消息:{}", message);

       processWithPotentialFailure(message);

   }

   

   @Recover  // 所有重试失败后的兜底方法

   public void recover(BusinessException e, String message) {

       log.error("消息处理失败,进入恢复逻辑:{}", message, e);

       sendToDeadLetterQueue(message);

   }

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           retryContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 配置重试模板

       RetryTemplate retryTemplate = new RetryTemplate();

       

       // 重试策略

       SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();

       retryPolicy.setMaxAttempts(3);

       

       // 退避策略(避免惊群)

       ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();

       backOffPolicy.setInitialInterval(1000);

       backOffPolicy.setMultiplier(2.0);

       backOffPolicy.setMaxInterval(10000);

       

       retryTemplate.setRetryPolicy(retryPolicy);

       retryTemplate.setBackOffPolicy(backOffPolicy);

       

       // 设置重试监听器

       retryTemplate.registerListener(new RetryListener() {

           @Override

           public <T, E extends Throwable> boolean open(RetryContext context,

                   RetryCallback<T, E> callback) {

               log.info("开始重试");

               return true;

           }

           

           @Override

           public <T, E extends Throwable> void close(RetryContext context,

                   RetryCallback<T, E> callback, Throwable throwable) {

               log.info("结束重试");

           }

           

           @Override

           public <T, E extends Throwable> void onError(RetryContext context,

                   RetryCallback<T, E> callback, Throwable throwable) {

               log.warn("第{}次重试失败", context.getRetryCount(), throwable);

           }

       });

       

       factory.setRetryTemplate(retryTemplate);

       

       // 关键:重试失败后的处理

       factory.setRecoveryCallback(context -> {

           ConsumerRecord<?, ?> record =

               (ConsumerRecord<?, ?>) context.getAttribute("record");

           log.error("重试耗尽,消息进入死信队列:{}", record.value());

           

           // 发送到死信队列

           kafkaTemplate.send("dlq-topic", record.value());

           

           return null;

       });

       

       return factory;

   }

}

```


2. SeekToCurrentErrorHandler(本地重试)


```java

@Configuration

public class SeekToCurrentConfig {

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           seekToCurrentContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 使用SeekToCurrentErrorHandler

       SeekToCurrentErrorHandler errorHandler =

           new SeekToCurrentErrorHandler(

               new DeadLetterPublishingRecoverer(kafkaTemplate()),  // 死信队列处理器

               new FixedBackOff(1000, 3)  // 重试3次,间隔1秒

           );

       

       // 排除某些异常不重试

       errorHandler.addNotRetryableExceptions(

           IllegalArgumentException.class,

           ValidationException.class

       );

       

       // 设置重试监听器

       errorHandler.setCommitRecovered(true);  // 重试成功后提交偏移量

       

       factory.setErrorHandler(errorHandler);

       

       return factory;

   }

}

```


3. 自定义错误处理器


```java

@Component

public class CustomErrorHandler implements ConsumerAwareErrorHandler {

   

   @Autowired

   private KafkaTemplate<String, String> kafkaTemplate;

   

   @Override

   public void handle(Exception thrownException,

                      ConsumerRecord<?, ?> record,

                      Consumer<?, ?> consumer) {

       

       log.error("消费失败,记录:{}", record, thrownException);

       

       // 根据异常类型选择策略

       if (thrownException instanceof DeserializationException) {

           // 反序列化失败:跳过此消息

           log.warn("反序列化失败,跳过消息:{}", record.offset());

           

       } else if (thrownException instanceof BusinessException) {

           // 业务异常:重试

           handleRetry(record, consumer, thrownException);

           

       } else {

           // 其他异常:发送到死信队列

           sendToDeadLetterQueue(record);

       }

   }

   

   private void handleRetry(ConsumerRecord<?, ?> record,

                            Consumer<?, ?> consumer,

                            Exception exception) {

       

       // 获取当前分区和偏移量

       TopicPartition partition = new TopicPartition(

           record.topic(), record.partition());

       

       // 重试策略

       int maxRetries = 3;

       for (int i = 0; i < maxRetries; i++) {

           try {

               Thread.sleep(1000 * (i + 1));  // 递增间隔

               

               // 重新消费同一条消息

               consumer.seek(partition, record.offset());

               consumer.poll(Duration.ofMillis(100));

               

               log.info("第{}次重试成功", i + 1);

               return;

               

           } catch (Exception e) {

               log.warn("第{}次重试失败", i + 1, e);

           }

       }

       

       // 重试失败,发送到死信队列

       sendToDeadLetterQueue(record);

   }

}

```


4. 死信队列(DLQ)模式


```java

@Configuration

public class DeadLetterConfig {

   

   @Bean

   public DeadLetterPublishingRecoverer dlqRecoverer() {

       return new DeadLetterPublishingRecoverer(

           kafkaTemplate(),

           (record, ex) -> {

               // 决定发送到哪个死信队列

               String originalTopic = record.topic();

               

               if (ex instanceof DeserializationException) {

                   return new TopicPartition(originalTopic + ".DLQ.DESERIALIZE", 0);

               } else if (ex instanceof ValidationException) {

                   return new TopicPartition(originalTopic + ".DLQ.VALIDATE", 0);

               } else {

                   return new TopicPartition(originalTopic + ".DLQ.ERROR", 0);

               }

           }

       );

   }

   

   @Bean

   public ConcurrentKafkaListenerContainerFactory<String, String>

           dlqContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 组合策略:重试 + 死信队列

       ExponentialBackOff backOff = new ExponentialBackOff(1000, 2.0);

       backOff.setMaxInterval(10000);

       backOff.setMaxElapsedTime(30000);

       

       DefaultErrorHandler errorHandler = new DefaultErrorHandler(

           dlqRecoverer(),  // 死信队列处理器

           backOff          // 重试策略

       );

       

       // 配置

       errorHandler.setAckAfterHandle(false);

       errorHandler.addRetryableExceptions(BusinessException.class);

       errorHandler.addNotRetryableExceptions(

           IllegalArgumentException.class,

           ValidationException.class

       );

       

       factory.setCommonErrorHandler(errorHandler);

       

       return factory;

   }

}

```


三、完整的消费端配置示例


```yaml

# application.yml

spring:

 kafka:

   # 消费者通用配置

   consumer:

     bootstrap-servers: ${KAFKA_HOST:localhost:9092}

     group-id: ${APPLICATION_NAME:myapp}-consumer

     auto-offset-reset: earliest  # 没有偏移量时从最早开始

     enable-auto-commit: false    # 关闭自动提交

     key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

     value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

     

     # 重要性能配置

     max-poll-records: 500        # 每次拉取最大记录数

     fetch-min-size: 1            # 最小拉取字节数

     fetch-max-wait-ms: 500       # 拉取等待时间

     heartbeat-interval-ms: 3000  # 心跳间隔

     session-timeout-ms: 10000    # 会话超时

     

     # 反序列化容错

     properties:

       spring.json.trusted.packages: "*"

       spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2

       spring.deserializer.key.delegate.class: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer2

       

   listener:

     # 监听器类型

     type: batch  # single 或 batch

     

     # 并发配置

     concurrency: 3

     

     # 提交模式

     ack-mode: manual_immediate

     

     # 批量配置

     poll-timeout: 3000

     idle-between-polls: 1000

     

     # 错误处理

     missing-topics-fatal: false  # 主题不存在时不失败

```


四、生产环境最佳实践


1. 消费幂等性设计


```java

@Component

public class IdempotentConsumer {

   

   @Autowired

   private RedisTemplate<String, String> redisTemplate;

   

   @KafkaListener(topics = "order-topic")

   public void consumeOrder(ConsumerRecord<String, String> record) {

       

       String messageId = record.key();

       String messageBody = record.value();

       

       // 幂等性检查

       if (isDuplicate(messageId)) {

           log.warn("重复消息,跳过处理:{}", messageId);

           return;

       }

       

       // 处理业务

       processOrder(messageBody);

       

       // 记录已处理

       markAsProcessed(messageId);

   }

   

   private boolean isDuplicate(String messageId) {

       // 使用Redis判断是否重复

       String key = "kafka:processed:" + messageId;

       return Boolean.TRUE.equals(redisTemplate.hasKey(key));

   }

   

   private void markAsProcessed(String messageId) {

       String key = "kafka:processed:" + messageId;

       redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);

   }

}

```


2. 消费监控和告警


```java

@Component

public class ConsumerMonitor {

   

   @EventListener

   public void listen(ListenerContainerIdleEvent event) {

       // 消费者空闲监控

       log.warn("消费者 {} 空闲超过阈值,可能有问题",

           event.getListenerId());

       

       if (event.getIdleTime() > 30000) {  // 30秒

           sendAlert("Kafka消费者长时间空闲: " + event.getListenerId());

       }

   }

   

   @EventListener  

   public void listen(ConsumptionPausedEvent event) {

       // 消费暂停监控

       log.error("消费暂停:{}", event.getContainer().getListenerId());

       sendAlert("Kafka消费暂停: " + event.getContainer().getListenerId());

   }

   

   @EventListener

   public void listen(NonResponsiveConsumerEvent event) {

       // 消费者无响应监控

       log.error("消费者无响应:{}", event.getContainer().getListenerId());

       sendAlert("Kafka消费者无响应: " + event.getContainer().getListenerId());

   }

}

```


3. 动态配置更新


```java

@Configuration

@RefreshScope  // 支持配置热更新

public class DynamicKafkaConfig {

   

   @Value("${kafka.consumer.concurrency:3}")

   private Integer concurrency;

   

   @Bean

   @RefreshScope

   public ConcurrentKafkaListenerContainerFactory<String, String>

           dynamicContainerFactory() {

       

       ConcurrentKafkaListenerContainerFactory<String, String> factory =

           new ConcurrentKafkaListenerContainerFactory<>();

       factory.setConsumerFactory(consumerFactory());

       

       // 动态并发数

       factory.setConcurrency(concurrency);

       

       // 根据负载动态调整

       factory.setContainerCustomizer(container -> {

           int currentLoad = getCurrentLoad();

           if (currentLoad > 80) {

               container.setConcurrency(concurrency + 2);

           } else if (currentLoad < 30) {

               container.setConcurrency(Math.max(1, concurrency - 1));

           }

       });

       

       return factory;

   }

}

```


五、面试要点总结


消费模式选择策略


场景 推荐模式 配置要点

金融交易 手动提交 关闭自动提交,业务成功后手动ack

日志处理 批量消费 开启批量监听,调整max-poll-records

高吞吐 并发消费 设置合适并发数(≤分区数)

消息过滤 过滤模式 设置RecordFilterStrategy


重试策略选择


异常类型 推荐策略 说明

网络抖动 指数退避重试 逐步增加重试间隔

业务异常 有限次重试+死信队列 避免无限重试

数据错误 立即死信队列 不重试,直接进入DLQ

反序列化失败 跳过并记录 避免阻塞消费


常见面试问题及回答


1. Q:如何保证消息不丢失?

  · A:生产者使用acks=all,消费者关闭自动提交,业务成功后手动提交

2. Q:如何处理消息积压?

  · A:增加消费者数量(不超过分区数),调整max.poll.records,开启批量消费

3. Q:如何实现消费幂等性?

  · A:使用消息ID+Redis/数据库唯一约束,实现去重逻辑

4. Q:什么时候使用死信队列?

  · A:重试达到上限后,或者某些特定异常(如数据格式错误)直接进入DLQ

5. Q:如何监控消费状态?

  · A:使用Spring Kafka事件监听器,结合Metrics暴露消费指标


性能优化建议


1. 合理设置批量大小:根据消息大小和网络延迟调整

2. 优化消费者线程数:CPU密集型任务减少并发,IO密集型增加并发

3. 调整拉取参数:fetch.min.bytes和fetch.max.wait.ms的权衡

4. 使用合适的序列化:Protobuf/Avro比JSON性能更好

5. 监控消费延迟:关注consumer-lag指标


记住:没有银弹,最佳配置取决于具体业务场景、数据量和性能要求。