怎么理解使用MQ解决分布式事务 -- 以kafka为例
利用 Apache Kafka 实现分布式事务的完整指南
本文聚焦 Kafka 原生能力,从「事务语义 → 代码 → 运维 → 故障场景」逐层展开,给出可在生产环境直接落地的全套方案。
一、Kafka 分布式事务的 3 个核心语义
语义 | 实现机制 | 配置/代码标志 |
---|---|---|
幂等性 | Broker 端去重 + Sequence Number | enable.idempotence=true |
事务 | 两阶段提交 + Transaction Coordinator | transactional.id |
读已提交 | 消费者过滤未提交事务消息 | isolation.level=read_committed |
二、架构全景图
┌─────────────────────────────────────────────────────────────┐
│ Producer (订单服务) │
│ 1. beginTransaction() │
│ 2. insert into order_tbl … │
│ 3. send("stock-deduct", orderId) │
│ 4. commitTransaction() ─┐ │
└────────────────────────────┼─────────────────────────────┐ ││ 两阶段提交 │ │
┌────────────────────────────┼─────────────────────────────┘ │
│ Broker │ │
│ • Transaction Coordinator (TC) │ │
│ • __transaction_state 日志 (3 副本) │ │
│ • 写入分区队列 │ │
└────────────────────────────┼─────────────────────────────┐ ││ 仅投递 committed 消息 │ │
┌────────────────────────────┼─────────────────────────────┘ │
│ Consumer (库存服务) │
│ 5. poll() → read_committed │
│ 6. update stock_tbl set qty = qty - ? where id = ? │
│ 7. ack() │
└─────────────────────────────────────────────────────────────┘
三、Producer 端完整配置与代码
1. 通用 Producer 参数
bootstrap.servers=kafka:9092
enable.idempotence=true # 幂等发送
transactional.id=order-service-tx-1 # 全局唯一
acks=all
max.in.flight.requests.per.connection=5
transaction.timeout.ms=30000 # 小于 broker 的 max.transaction.timeout.ms
2. Spring Boot 双事务(Kafka + JDBC)
@Configuration
public class KafkaChainedTxConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx");DefaultKafkaProducerFactory<String, String> pf =new DefaultKafkaProducerFactory<>(props);pf.setTransactionIdPrefix("order-tx-"); // 支持并发事务return pf;}@Beanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> pf) {return new KafkaTransactionManager<>(pf);}@Bean("chainedTxManager")public ChainedTransactionManager chainedTxManager(KafkaTransactionManager<?, ?> ktm,DataSourceTransactionManager dstm) {return new ChainedTransactionManager(ktm, dstm);}
}
3. Service 层
@Service
public class OrderService {private final OrderRepository repo;private final KafkaTemplate<String, OrderEvent> kafka;@Transactional("chainedTxManager")public void createOrder(CreateOrderCommand cmd) {// 1. 本地事务Order order = repo.save(new Order(cmd));// 2. 发送事务消息OrderEvent event = new OrderEvent(order.getId(), cmd.getSkuId(), cmd.getQty());kafka.send("stock-deduct", order.getId().toString(), event);// 3. 若 DB 回滚,Kafka 事务也回滚;反之亦然}
}
四、Consumer 端:幂等 + 重试 + 死信队列
1. 消费者配置
bootstrap.servers=kafka:9092
group.id=stock-service
isolation.level=read_committed
enable.auto.commit=false
max.poll.records=100
2. 监听器(批量 + 幂等)
@Component
public class StockConsumer {private final StockRepository stockRepo;@KafkaListener(topics = "stock-deduct",containerFactory = "batchFactory")public void listen(List<ConsumerRecord<String, OrderEvent>> records,Acknowledgment ack) {for (var r : records) {try {consumeOne(r.value());} catch (DuplicateKeyException ex) {// 幂等冲突,跳过} catch (DataIntegrityViolationException ex) {// 库存不足,记录告警并手动 ack,不再重试} catch (Exception ex) {// 其他异常:抛出让 SeekToCurrentErrorHandler 重试throw ex;}}ack.acknowledge();}@Transactionalpublic void consumeOne(OrderEvent e) {int affected = stockRepo.deductQty(e.getSkuId(), e.getQty(), e.getOrderId());if (affected == 0) {throw new IllegalStateException("库存扣减失败");}}
}
3. 重试与死信队列(Spring Kafka)
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(ConsumerFactory<String, OrderEvent> cf) {ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf);factory.setBatchListener(true);// 最多重试 3 次后发送到 DLQDefaultErrorHandler handler =new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate(), (r, e) -> new TopicPartition("stock-deduct.DLT", r.partition())),new FixedBackOff(1000L, 2));factory.setCommonErrorHandler(handler);return factory;
}
五、事务超时 & 死锁排查
指标 | 触发场景 | 解决 |
---|---|---|
transaction.timeout.ms 超期 | Broker 未收到 commit/abort | 调大或优化业务耗时 |
producer.send 阻塞 | 网络抖动、ISR < min.insync.replicas | 监控 kafka.server:RequestQueueTimeMs |
消费者 lag 持续增大 | 下游消费慢 / 重试风暴 | 扩容消费者、减少 batch size |
六、完整监控体系
-
JMX 指标
- Producer:
record-send-rate
,transaction-duration-avg
- Broker:
transaction-coordinator-metrics
→transactional-id-count
- Consumer:
records-lag-max
,commit-latency-avg
- Producer:
-
Prometheus + Grafana
- pattern: kafka.producer<type=producer-metrics, client-id=(.+)><>(transaction-duration-avg)name: kafka_producer_transaction_duration_avglabels:client_id: "$1"
-
告警规则示例
- alert: KafkaTransactionStuckexpr: kafka_producer_transaction_duration_avg > 20for: 1mannotations:summary: "事务长时间未完成"
七、故障演练清单
场景 | 操作 | 预期行为 |
---|---|---|
Broker 重启 | docker kill kafka-1 | 事务协调器 failover,事务仍可完成 |
Producer 进程崩溃 | kill -9 | 事务超时后 Broker 自动 abort |
消费者消费异常 | 业务抛异常 | 重试 3 次 → DLQ → 人工处理 |
八、小结
维度 | 结论 |
---|---|
一致性 | 本地事务 + Kafka 事务 API → 原子提交 |
可用性 | 异步投递,高吞吐,支持水平扩容 |
复杂度 | 仅需幂等消费与重试策略,2PC 网络阻塞消失 |
性能 | 实测 TPS 下降 < 10%,远低于数据库 2PC |
至此,从配置、代码到监控、故障演练 的 Kafka 分布式事务闭环已完整落地。