Spring Boot + Kafka: An Interview Deep Dive
I. The Four Core Kafka Consumption Patterns
1. Manual Commit (Manual Acknowledgement)
```java
@Slf4j // Lombok; provides the `log` field used below
@Component
public class ManualAckConsumer {

    // Core idea: disable auto-commit and control offsets by hand
    @KafkaListener(
        topics = "${kafka.topic.manual}",
        containerFactory = "manualAckContainerFactory"
    )
    public void onMessage(
            ConsumerRecord<String, String> record,
            Acknowledgment ack) { // key point: inject Acknowledgment
        try {
            // 1. Run the business logic
            processBusiness(record.value());
            // 2. Commit manually only after it succeeds
            ack.acknowledge();
            log.info("Message processed, offset committed: {}", record.offset());
        } catch (Exception e) {
            log.error("Processing failed, offset not committed: {}", record.offset(), e);
            // acknowledge() is not called, so the message will be redelivered
        }
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            manualAckContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        // consumerFactory(): see the sketch after the batch-mode notes below
        factory.setConsumerFactory(consumerFactory());
        // Key setting: commit each acknowledged offset immediately
        factory.getContainerProperties().setAckMode(
                ContainerProperties.AckMode.MANUAL_IMMEDIATE
        );
        // Number of concurrent consumers
        factory.setConcurrency(3);
        return factory;
    }
}
```
Interview talking points:
· Use cases: financial transactions, order processing, and other flows where losing a message is unacceptable
· Pros: full control over when offsets are committed, preventing message loss. Note that this gives at-least-once delivery; exactly-once additionally requires idempotent processing or Kafka transactions
· Cons: manual offset management adds complexity
· Commit variants (a sketch of the MANUAL variant follows below):
· MANUAL: acknowledgements are queued and committed together with the poll batch (recommended)
· MANUAL_IMMEDIATE: each acknowledgement is committed immediately (safer, but slower)
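The example above uses MANUAL_IMMEDIATE; here is a minimal sketch of the batched MANUAL variant, assuming the same consumerFactory() as before:
```java
// Sketch only: AckMode.MANUAL queues each acknowledge() and commits the
// offsets together once the whole poll batch has been processed.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> manualBatchAckFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
    return factory;
}
```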
2. Batch Consumption (Batch Processing)
```java
@Slf4j
@Component
public class BatchConsumer {

    // Core idea: consume a whole batch of messages per call
    @KafkaListener(
        topics = "${kafka.topic.batch}",
        containerFactory = "batchContainerFactory"
    )
    public void onMessages(
            List<ConsumerRecord<String, String>> records) { // key point: receive a List
        log.info("Received a batch of {} messages", records.size());
        // Process the batch concurrently; processAsync() is a business helper
        List<CompletableFuture<Void>> futures = records.stream()
                .map(record -> processAsync(record))
                .collect(Collectors.toList());
        // Wait for the whole batch to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                .join();
        log.info("Batch completed");
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            batchContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Key setting: enable batch mode
        factory.setBatchListener(true);
        // Polling behaviour
        factory.getContainerProperties().setPollTimeout(3000);
        factory.getContainerProperties().setIdleBetweenPolls(1000);
        // Note: the batch size cap (max.poll.records) is a consumer property,
        // not a ContainerProperties setting; see the sketch below
        return factory;
    }
}
```
Interview talking points:
· Use cases: log processing, data synchronization, ETL, and other high-throughput workloads
· Performance benefit: fewer network round trips, higher throughput
· Key settings (see the consumer-factory sketch below):
· setBatchListener(true) enables batch mode
· max.poll.records (a consumer property, not a container property) caps the records returned per poll
· setPollTimeout() sets the poll timeout
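Every factory in this article calls a consumerFactory() that the snippets leave undefined. A minimal sketch, with placeholder broker address and group id, showing where max.poll.records actually lives:
```java
// Sketch only: all values are illustrative.
@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-consumer");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);  // manual commits
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);      // batch size cap
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}
```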
3. Concurrent Consumption
```java
@Slf4j
@Component
public class ConcurrentConsumer {

    @KafkaListener(
        topics = "${kafka.topic.concurrent}",
        containerFactory = "concurrentContainerFactory",
        concurrency = "3" // key point: number of concurrent consumers
    )
    public void onMessage(String message) {
        log.info("Thread {} handling message: {}",
                Thread.currentThread().getName(), message);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            concurrentContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // One consumer per partition; getTopicPartitions() is a helper
        // (a sketch follows below)
        int partitions = getTopicPartitions("concurrent-topic");
        factory.setConcurrency(partitions); // best practice: match the partition count
        // Tuning
        factory.getContainerProperties().setPollTimeout(1000);
        // Only meaningful with AckMode.TIME / COUNT_TIME
        factory.getContainerProperties().setAckTime(1000);
        return factory;
    }
}
```
Interview talking points:
· Concurrency strategy:
· Within a consumer group, a partition is consumed by at most one consumer
· Consumer count ≤ partition count (extra consumers sit idle)
· Load balancing: Kafka assigns partitions to consumers automatically
· Elastic scaling: consumer groups rebalance as consumers are added or removed
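The getTopicPartitions() helper above is not a Spring Kafka API; here is a minimal sketch using the Kafka AdminClient (broker address is a placeholder):
```java
// Sketch only: look up the partition count of a topic via AdminClient.
private int getTopicPartitions(String topic) {
    Map<String, Object> conf = Map.of(
            AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    try (AdminClient admin = AdminClient.create(conf)) {
        TopicDescription description = admin.describeTopics(List.of(topic))
                .all().get().get(topic); // use allTopicNames() on Kafka clients 3.1+
        return description.partitions().size();
    } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Failed to describe topic " + topic, e);
    }
}
```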
4. Message Filtering
```java
@Slf4j
@Component
public class FilteringConsumer {

    @Bean
    public RecordFilterStrategy<String, String> recordFilterStrategy() {
        return new RecordFilterStrategy<String, String>() {
            @Override
            public boolean filter(ConsumerRecord<String, String> record) {
                // Returning true discards the record
                return record.value().contains("ignore") ||
                        record.key() == null;
            }
        };
    }

    @KafkaListener(
        topics = "${kafka.topic.filter}",
        containerFactory = "filteringContainerFactory"
    )
    public void onFilteredMessage(String message) {
        // Only records that pass the filter arrive here
        log.info("Received valid message: {}", message);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            filteringContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Install the filter
        factory.setRecordFilterStrategy(recordFilterStrategy());
        // Acknowledge discarded records so they are not redelivered to the listener
        factory.setAckDiscarded(true);
        return factory;
    }
}
```
II. Retry Strategies for Failed Consumption
1. Spring Retry (recommended)
```java
@Slf4j
@Component
public class RetryConsumer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @KafkaListener(
        topics = "${kafka.topic.retry}",
        containerFactory = "retryContainerFactory"
    )
    @Retryable( // method-level retry; requires @EnableRetry (see below)
        value = {BusinessException.class, NetworkException.class},
        maxAttempts = 3,
        backoff = @Backoff(delay = 1000, multiplier = 2.0)
    )
    public void onMessageWithRetry(String message) {
        log.info("Attempting to process message: {}", message);
        processWithPotentialFailure(message);
    }

    @Recover // fallback once all retries are exhausted
    public void recover(BusinessException e, String message) {
        log.error("Message processing failed, entering recovery: {}", message, e);
        sendToDeadLetterQueue(message);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            retryContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Container-level retry template. Careful: combined with @Retryable above,
        // attempts multiply (3 x 3 = up to 9 executions); pick one level in practice.
        RetryTemplate retryTemplate = new RetryTemplate();
        // Retry policy
        SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
        retryPolicy.setMaxAttempts(3);
        // Exponential backoff so retries don't hammer the downstream system
        ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
        backOffPolicy.setInitialInterval(1000);
        backOffPolicy.setMultiplier(2.0);
        backOffPolicy.setMaxInterval(10000);
        retryTemplate.setRetryPolicy(retryPolicy);
        retryTemplate.setBackOffPolicy(backOffPolicy);
        // Retry lifecycle listener
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext context,
                    RetryCallback<T, E> callback) {
                log.info("Retry cycle started");
                return true;
            }
            @Override
            public <T, E extends Throwable> void close(RetryContext context,
                    RetryCallback<T, E> callback, Throwable throwable) {
                log.info("Retry cycle finished");
            }
            @Override
            public <T, E extends Throwable> void onError(RetryContext context,
                    RetryCallback<T, E> callback, Throwable throwable) {
                log.warn("Retry attempt {} failed", context.getRetryCount(), throwable);
            }
        });
        // Note: setRetryTemplate/setRecoveryCallback were deprecated in Spring Kafka 2.8
        // and removed in 3.0; prefer DefaultErrorHandler (section 4) on current versions
        factory.setRetryTemplate(retryTemplate);
        // Key point: what to do once retries are exhausted
        factory.setRecoveryCallback(context -> {
            ConsumerRecord<?, ?> record =
                    (ConsumerRecord<?, ?>) context.getAttribute("record");
            log.error("Retries exhausted, sending to dead letter queue: {}", record.value());
            // Publish to the dead letter topic
            kafkaTemplate.send("dlq-topic", String.valueOf(record.value()));
            return null;
        });
        return factory;
    }
}
```
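@Retryable and @Recover only take effect when Spring Retry is enabled; a minimal sketch, assuming spring-retry and spring-aspects are on the classpath:
```java
@Configuration
@EnableRetry // activates @Retryable/@Recover processing
public class RetryConfig {
}
```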
2. SeekToCurrentErrorHandler (local retry)
```java
@Configuration
public class SeekToCurrentConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            seekToCurrentContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // SeekToCurrentErrorHandler re-seeks the failed record so the next poll
        // redelivers it. Deprecated since Spring Kafka 2.8; DefaultErrorHandler
        // (section 4) is the modern replacement.
        SeekToCurrentErrorHandler errorHandler =
                new SeekToCurrentErrorHandler(
                        new DeadLetterPublishingRecoverer(kafkaTemplate()), // DLQ recoverer
                        new FixedBackOff(1000, 3) // 3 retries at 1 s intervals
                );
        // Exceptions that should never be retried
        errorHandler.addNotRetryableExceptions(
                IllegalArgumentException.class,
                ValidationException.class
        );
        // Commit the offset of a record once the recoverer has handled it
        errorHandler.setCommitRecovered(true);
        factory.setErrorHandler(errorHandler);
        return factory;
    }
}
```
3. Custom Error Handler
```java
// Note: ConsumerAwareErrorHandler is deprecated in Spring Kafka 2.8+;
// CommonErrorHandler is the replacement. The original retry loop called
// consumer.poll() inside the handler, which fights the container's own poll
// loop; the fix is to seek back and let the container redeliver.
@Slf4j
@Component
public class CustomErrorHandler implements ConsumerAwareErrorHandler {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    // Per-record retry counter (partition@offset -> attempts)
    private final Map<String, Integer> attempts = new ConcurrentHashMap<>();

    @Override
    public void handle(Exception thrownException,
                       ConsumerRecord<?, ?> record,
                       Consumer<?, ?> consumer) {
        log.error("Consumption failed, record: {}", record, thrownException);
        // Pick a strategy per exception type
        if (thrownException instanceof DeserializationException) {
            // Deserialization failure: skip the record (do not seek back)
            log.warn("Deserialization failed, skipping offset {}", record.offset());
        } else if (thrownException instanceof BusinessException) {
            // Business exception: retry via re-seek
            handleRetry(record, consumer);
        } else {
            // Anything else: straight to the dead letter queue
            sendToDeadLetterQueue(record);
        }
    }

    private void handleRetry(ConsumerRecord<?, ?> record, Consumer<?, ?> consumer) {
        TopicPartition partition = new TopicPartition(record.topic(), record.partition());
        String key = partition + "@" + record.offset();
        int attempt = attempts.merge(key, 1, Integer::sum);
        int maxRetries = 3;
        if (attempt <= maxRetries) {
            // Seek back to the failed record; the container's next poll() redelivers it.
            log.warn("Scheduling redelivery, attempt {}/{}", attempt, maxRetries);
            consumer.seek(partition, record.offset());
        } else {
            // Retries exhausted: give up and route to the dead letter queue
            attempts.remove(key);
            sendToDeadLetterQueue(record);
        }
    }

    private void sendToDeadLetterQueue(ConsumerRecord<?, ?> record) {
        kafkaTemplate.send(record.topic() + ".DLQ", String.valueOf(record.value()));
    }
}
```
4. Dead Letter Queue (DLQ) Pattern
```java
@Configuration
public class DeadLetterConfig {

    @Bean
    public DeadLetterPublishingRecoverer dlqRecoverer() {
        // kafkaTemplate() bean assumed defined elsewhere
        return new DeadLetterPublishingRecoverer(
                kafkaTemplate(),
                (record, ex) -> {
                    // Route to a DLQ topic based on the failure type
                    String originalTopic = record.topic();
                    if (ex instanceof DeserializationException) {
                        return new TopicPartition(originalTopic + ".DLQ.DESERIALIZE", 0);
                    } else if (ex instanceof ValidationException) {
                        return new TopicPartition(originalTopic + ".DLQ.VALIDATE", 0);
                    } else {
                        return new TopicPartition(originalTopic + ".DLQ.ERROR", 0);
                    }
                }
        );
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            dlqContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Combined strategy: retries with backoff, then dead letter queue
        ExponentialBackOff backOff = new ExponentialBackOff(1000, 2.0);
        backOff.setMaxInterval(10000);
        backOff.setMaxElapsedTime(30000);
        DefaultErrorHandler errorHandler = new DefaultErrorHandler(
                dlqRecoverer(), // DLQ recoverer
                backOff         // retry/backoff policy
        );
        // Tuning
        errorHandler.setAckAfterHandle(false);
        errorHandler.addRetryableExceptions(BusinessException.class);
        errorHandler.addNotRetryableExceptions(
                IllegalArgumentException.class,
                ValidationException.class
        );
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```
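For completeness, a hedged sketch of what draining one of these DLQ topics might look like (topic and group names are illustrative, and the method is assumed to live in a @Slf4j component):
```java
// Sketch only: inspect dead letters for alerting or manual replay.
@KafkaListener(topics = "orders.DLQ.ERROR", groupId = "dlq-inspector")
public void onDeadLetter(ConsumerRecord<String, String> record) {
    // DeadLetterPublishingRecoverer copies failure details into headers
    // such as kafka_dlt-exception-message and kafka_dlt-original-offset.
    log.warn("Dead letter from {}: {}", record.topic(), record.value());
}
```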
III. A Complete Consumer Configuration Example
```yaml
# application.yml
spring:
  kafka:
    # Common consumer settings
    consumer:
      bootstrap-servers: ${KAFKA_HOST:localhost:9092}
      group-id: ${APPLICATION_NAME:myapp}-consumer
      auto-offset-reset: earliest # start from the earliest offset when the group has none
      enable-auto-commit: false # disable auto-commit
      # Deserialization fault tolerance: wrap the real deserializers in
      # ErrorHandlingDeserializer so a poison message surfaces as a
      # DeserializationException instead of an endless poll loop
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      # Important performance settings
      max-poll-records: 500 # max records per poll
      fetch-min-size: 1 # minimum fetch size in bytes
      fetch-max-wait: 500ms # max time the broker waits to fill fetch-min-size
      heartbeat-interval: 3000ms # heartbeat interval
      properties:
        session.timeout.ms: 10000 # session timeout
        spring.json.trusted.packages: "*"
        # The actual deserializers the error-handling wrappers delegate to
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # Listener type
      type: batch # single or batch
      # Concurrency
      concurrency: 3
      # Commit mode
      ack-mode: manual_immediate
      # Polling
      poll-timeout: 3000
      idle-between-polls: 1000
      # Error handling
      missing-topics-fatal: false # don't fail startup when a topic is missing
```
IV. Production Best Practices
1. Idempotent Consumption
```java
@Slf4j
@Component
public class IdempotentConsumer {

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @KafkaListener(topics = "order-topic")
    public void consumeOrder(ConsumerRecord<String, String> record) {
        String messageId = record.key();
        String messageBody = record.value();
        // Idempotency check
        if (isDuplicate(messageId)) {
            log.warn("Duplicate message, skipping: {}", messageId);
            return;
        }
        // Business processing
        processOrder(messageBody);
        // Mark as processed
        markAsProcessed(messageId);
    }

    private boolean isDuplicate(String messageId) {
        // Use Redis to detect duplicates.
        // Caveat: check-then-set is not atomic; see the setIfAbsent sketch below
        String key = "kafka:processed:" + messageId;
        return Boolean.TRUE.equals(redisTemplate.hasKey(key));
    }

    private void markAsProcessed(String messageId) {
        String key = "kafka:processed:" + messageId;
        redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS);
    }
}
```
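The check-then-set above can race when two consumers see the same message id concurrently; a hedged sketch of an atomic variant using Redis SETNX via setIfAbsent:
```java
// Sketch only: claim the message id first, and process only if the claim succeeded.
private boolean tryClaim(String messageId) {
    String key = "kafka:processed:" + messageId;
    // SETNX plus TTL in one atomic call; returns false if the key already exists
    return Boolean.TRUE.equals(
            redisTemplate.opsForValue().setIfAbsent(key, "1", Duration.ofHours(24)));
}
```
If processing fails after a successful claim, delete the key so a retry can run.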
2. Consumption Monitoring and Alerting
```java
@Slf4j
@Component
public class ConsumerMonitor {

    @EventListener
    public void listen(ListenerContainerIdleEvent event) {
        // Idle-consumer monitoring; these events fire only if
        // ContainerProperties.setIdleEventInterval(...) is configured
        log.warn("Consumer {} idle past threshold, possible problem",
                event.getListenerId());
        if (event.getIdleTime() > 30000) { // 30 seconds
            sendAlert("Kafka consumer idle for too long: " + event.getListenerId());
        }
    }

    @EventListener
    public void listen(ConsumerPausedEvent event) {
        // Paused-consumption monitoring (the class is ConsumerPausedEvent in Spring Kafka)
        log.error("Consumption paused: {}", event);
        sendAlert("Kafka consumption paused: " + event);
    }

    @EventListener
    public void listen(NonResponsiveConsumerEvent event) {
        // Unresponsive-consumer monitoring
        log.error("Consumer not responding: {}", event);
        sendAlert("Kafka consumer unresponsive: " + event);
    }

    // sendAlert() is a business helper (pager, IM webhook, etc.)
}
```
3. Dynamic Configuration Updates
```java
@Configuration
@RefreshScope // supports configuration hot-reload (requires Spring Cloud Context)
public class DynamicKafkaConfig {

    @Value("${kafka.consumer.concurrency:3}")
    private Integer concurrency;

    @Bean
    @RefreshScope
    public ConcurrentKafkaListenerContainerFactory<String, String>
            dynamicContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Concurrency picked up from (refreshable) configuration
        factory.setConcurrency(concurrency);
        // Note: the customizer runs once, when a container is created;
        // it is not a continuous load-based autoscaler
        factory.setContainerCustomizer(container -> {
            int currentLoad = getCurrentLoad(); // business helper: load in percent
            if (currentLoad > 80) {
                container.setConcurrency(concurrency + 2);
            } else if (currentLoad < 30) {
                container.setConcurrency(Math.max(1, concurrency - 1));
            }
        });
        return factory;
    }
}
```
V. Interview Takeaways
Choosing a consumption pattern

| Scenario | Recommended pattern | Key configuration |
| --- | --- | --- |
| Financial transactions | Manual commit | Disable auto-commit; ack manually after the business logic succeeds |
| Log processing | Batch consumption | Enable the batch listener; tune max-poll-records |
| High throughput | Concurrent consumption | Set concurrency sensibly (≤ partition count) |
| Message filtering | Filtering pattern | Configure a RecordFilterStrategy |
Choosing a retry strategy

| Failure type | Recommended strategy | Notes |
| --- | --- | --- |
| Network blips | Retry with exponential backoff | Increase the retry interval step by step |
| Business exceptions | Bounded retries, then DLQ | Avoid infinite retry loops |
| Bad data | Straight to the DLQ | No retries for unfixable input |
| Deserialization failures | Skip and log | Don't block the partition |
Common interview questions and answers
1. Q: How do you prevent message loss?
· A: The producer uses acks=all; the consumer disables auto-commit and commits manually after the business logic succeeds (see the producer sketch after this list)
2. Q: How do you handle a consumption backlog?
· A: Add consumers (up to the partition count), tune max.poll.records, and enable batch consumption
3. Q: How do you make consumption idempotent?
· A: Deduplicate using a message id plus a Redis key or a database unique constraint
4. Q: When do you use a dead letter queue?
· A: After the retry limit is reached, or immediately for certain exceptions (such as malformed data)
5. Q: How do you monitor consumption?
· A: Use Spring Kafka event listeners, and expose metrics; consumer lag above all
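A minimal sketch of the loss-resistant producer side mentioned in Q1 (all values are illustrative):
```java
// Sketch only: durability-first producer settings.
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas
    props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); // no duplicates on retry
    props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}
```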
Performance tuning tips
1. Size batches sensibly: tune to message size and network latency
2. Right-size consumer threads: fewer for CPU-bound work, more for IO-bound work
3. Tune fetch parameters: trade off fetch.min.bytes against fetch.max.wait.ms
4. Pick an efficient serialization format: Protobuf/Avro outperform JSON
5. Watch consumption latency: track the consumer-lag metric (sketch below)
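A hedged sketch of computing consumer lag with the Kafka AdminClient (group id and AdminClient wiring are assumed); per partition, lag is the log-end offset minus the committed offset:
```java
// Sketch only: compute per-partition lag for a consumer group.
public Map<TopicPartition, Long> consumerLag(AdminClient admin, String groupId)
        throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> committed = admin
            .listConsumerGroupOffsets(groupId)
            .partitionsToOffsetAndMetadata().get();
    // Ask the broker for the log-end offset of each partition the group owns
    Map<TopicPartition, OffsetSpec> latest = committed.keySet().stream()
            .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
    Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> ends =
            admin.listOffsets(latest).all().get();
    return committed.entrySet().stream().collect(Collectors.toMap(
            Map.Entry::getKey,
            e -> ends.get(e.getKey()).offset() - e.getValue().offset())); // lag
}
```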
Remember: there is no silver bullet. The best configuration depends on your specific business scenario, data volume, and performance requirements.