当前位置: 首页 > web >正文

怎么理解使用MQ解决分布式事务 -- 以kafka为例

利用 Apache Kafka 实现分布式事务的完整指南

本文聚焦 Kafka 原生能力,从「事务语义 → 代码 → 运维 → 故障场景」逐层展开,给出可在生产环境直接落地的全套方案。


一、Kafka 分布式事务的 3 个核心语义

语义实现机制配置/代码标志
幂等性Broker 端去重 + Sequence Numberenable.idempotence=true
事务两阶段提交 + Transaction Coordinatortransactional.id
读已提交消费者过滤未提交事务消息isolation.level=read_committed

二、架构全景图

┌─────────────────────────────────────────────────────────────┐
│  Producer (订单服务)                                         │
│  1. beginTransaction()                                       │
│  2. insert into order_tbl …                                  │
│  3. send("stock-deduct", orderId)                            │
│  4. commitTransaction()   ─┐                                 │
└────────────────────────────┼─────────────────────────────┐   ││ 两阶段提交                   │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Broker                                                    │   │
│  • Transaction Coordinator (TC)                            │   │
│  • __transaction_state 日志 (3 副本)                       │   │
│  • 写入分区队列                                           │   │
└────────────────────────────┼─────────────────────────────┐   ││ 仅投递 committed 消息        │   │
┌────────────────────────────┼─────────────────────────────┘   │
│  Consumer (库存服务)                                       │
│  5. poll() → read_committed                               │
│  6. update stock_tbl set qty = qty - ? where id = ?        │
│  7. ack()                                                  │
└─────────────────────────────────────────────────────────────┘

三、Producer 端完整配置与代码

1. 通用 Producer 参数

bootstrap.servers=kafka:9092
enable.idempotence=true               # 幂等发送
transactional.id=order-service-tx-1   # 全局唯一
acks=all
max.in.flight.requests.per.connection=5
transaction.timeout.ms=30000          # 小于 broker 的 max.transaction.timeout.ms

2. Spring Boot 双事务(Kafka + JDBC)

@Configuration
public class KafkaChainedTxConfig {@Beanpublic ProducerFactory<String, String> producerFactory() {Map<String, Object> props = new HashMap<>();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "order-service-tx");DefaultKafkaProducerFactory<String, String> pf =new DefaultKafkaProducerFactory<>(props);pf.setTransactionIdPrefix("order-tx-");          // 支持并发事务return pf;}@Beanpublic KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> pf) {return new KafkaTransactionManager<>(pf);}@Bean("chainedTxManager")public ChainedTransactionManager chainedTxManager(KafkaTransactionManager<?, ?> ktm,DataSourceTransactionManager dstm) {return new ChainedTransactionManager(ktm, dstm);}
}

3. Service 层

@Service
public class OrderService {private final OrderRepository repo;private final KafkaTemplate<String, OrderEvent> kafka;@Transactional("chainedTxManager")public void createOrder(CreateOrderCommand cmd) {// 1. 本地事务Order order = repo.save(new Order(cmd));// 2. 发送事务消息OrderEvent event = new OrderEvent(order.getId(), cmd.getSkuId(), cmd.getQty());kafka.send("stock-deduct", order.getId().toString(), event);// 3. 若 DB 回滚,Kafka 事务也回滚;反之亦然}
}

四、Consumer 端:幂等 + 重试 + 死信队列

1. 消费者配置

bootstrap.servers=kafka:9092
group.id=stock-service
isolation.level=read_committed
enable.auto.commit=false
max.poll.records=100

2. 监听器(批量 + 幂等)

@Component
public class StockConsumer {private final StockRepository stockRepo;@KafkaListener(topics = "stock-deduct",containerFactory = "batchFactory")public void listen(List<ConsumerRecord<String, OrderEvent>> records,Acknowledgment ack) {for (var r : records) {try {consumeOne(r.value());} catch (DuplicateKeyException ex) {// 幂等冲突,跳过} catch (DataIntegrityViolationException ex) {// 库存不足,记录告警并手动 ack,不再重试} catch (Exception ex) {// 其他异常:抛出让 SeekToCurrentErrorHandler 重试throw ex;}}ack.acknowledge();}@Transactionalpublic void consumeOne(OrderEvent e) {int affected = stockRepo.deductQty(e.getSkuId(), e.getQty(), e.getOrderId());if (affected == 0) {throw new IllegalStateException("库存扣减失败");}}
}

3. 重试与死信队列(Spring Kafka)

@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> batchFactory(ConsumerFactory<String, OrderEvent> cf) {ConcurrentKafkaListenerContainerFactory<String, OrderEvent> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(cf);factory.setBatchListener(true);// 最多重试 3 次后发送到 DLQDefaultErrorHandler handler =new DefaultErrorHandler(new DeadLetterPublishingRecoverer(kafkaTemplate(), (r, e) -> new TopicPartition("stock-deduct.DLT", r.partition())),new FixedBackOff(1000L, 2));factory.setCommonErrorHandler(handler);return factory;
}

五、事务超时 & 死锁排查

指标触发场景解决
transaction.timeout.ms 超期Broker 未收到 commit/abort调大或优化业务耗时
producer.send 阻塞网络抖动、ISR < min.insync.replicas监控 kafka.server:RequestQueueTimeMs
消费者 lag 持续增大下游消费慢 / 重试风暴扩容消费者、减少 batch size

六、完整监控体系

  1. JMX 指标

    • Producer:record-send-rate, transaction-duration-avg
    • Broker:transaction-coordinator-metricstransactional-id-count
    • Consumer:records-lag-max, commit-latency-avg
  2. Prometheus + Grafana

    - pattern: kafka.producer<type=producer-metrics, client-id=(.+)><>(transaction-duration-avg)name: kafka_producer_transaction_duration_avglabels:client_id: "$1"
    
  3. 告警规则示例

    - alert: KafkaTransactionStuckexpr: kafka_producer_transaction_duration_avg > 20for: 1mannotations:summary: "事务长时间未完成"
    

七、故障演练清单

场景操作预期行为
Broker 重启docker kill kafka-1事务协调器 failover,事务仍可完成
Producer 进程崩溃kill -9事务超时后 Broker 自动 abort
消费者消费异常业务抛异常重试 3 次 → DLQ → 人工处理

八、小结

维度结论
一致性本地事务 + Kafka 事务 API → 原子提交
可用性异步投递,高吞吐,支持水平扩容
复杂度仅需幂等消费与重试策略,2PC 网络阻塞消失
性能实测 TPS 下降 < 10%,远低于数据库 2PC

至此,从配置、代码到监控、故障演练 的 Kafka 分布式事务闭环已完整落地。

http://www.xdnf.cn/news/16702.html

相关文章:

  • 小白学OpenCV系列1-图像处理基本操作
  • 机器学习-十大算法之一线性回归算法
  • gTest测试框架的安装与配置
  • Qt 并行计算框架与应用
  • 项目优化中对象的隐式共享
  • 从单机架构到分布式:Redis为何成为架构升级的关键一环?
  • 【开源项目】轻量加速利器 HubProxy 自建 Docker、GitHub 下载加速服务
  • Less Less基础
  • Docker学习相关视频笔记(二)
  • 负载均衡、算法/策略
  • ROUGE-WE:词向量化革新的文本生成评估框架
  • Java 9 新特性解析
  • 考古学家 - 华为OD统一考试(JavaScript 题解)
  • 算法第29天|动态规划dp2:不同路径、不同路径Ⅱ、整数拆分、不同的二叉搜索树
  • uipath数据写入excel的坑
  • Python 程序设计讲义(25):循环结构——嵌套循环
  • 《Spring Cloud Gateway 深度剖析:从核心原理到企业级实战》
  • WAIC 2025观察:昇腾助力AI融入多元化生活场景
  • 理解Transformer解码器
  • Github 连接救星,完全合规合法,无风险!
  • 操作系统-lecture2(操作系统结构)
  • 微服务 01
  • Objective-c 初阶——异常处理(try-catch)
  • Typecho handsome新增评论区QQ,抖音,b站等表情包
  • 用FunASR轻松实现音频转SRT字幕:完整脚本与解析
  • iOS仿写 —— 计算器
  • Python 程序设计讲义(28):字符串的用法——格式化字符串
  • [leetcode] 组合总和
  • 冒泡排序算法
  • Java中什么是类加载?类加载的过程?