当前位置: 首页 > ds >正文

rockerMQ实战 事务消息、延迟消息

引入依赖

<!-- RocketMQ -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.3.2</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client-java</artifactId><version>5.0.7</version>
</dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId>
</dependency>

事务消息

发送事务消息

package com.test.xulk.transactionmq;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;public class OrderService {  // 发消息public static void main(String[] args) throws MQClientException {  // 初始化事务消息生产者  TransactionMQProducer producer = new TransactionMQProducer("order_transaction_group");  producer.setNamesrvAddr("localhost:9876");  // 设置事务监听器  TransactionListener transactionListener = new OrderTransactionListener();  producer.setTransactionListener(transactionListener);  producer.start();  try {  // 构建减库存半消息  Message msg = new Message("OrderTopic", "TagA", "KEY1", ("OrderID:123456, SKU:123456, Quantity:1").getBytes());// 发送事务消息  TransactionSendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("发送事务消息: %s%n", sendResult);} catch (MQClientException e) {  e.printStackTrace();  }  Runtime.getRuntime().addShutdownHook(new Thread(producer::shutdown));  }  
}  

监听消息

package com.test.xulk.transactionmq;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;/*** @program: 监听消息* @author: xulk* @create: 2025-05-27 19:35*/
public class OrderTransactionListener implements TransactionListener {// 监听消息@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {try {// 执行本地事务:创建订单boolean isOrderCreated = createOrder(msg);// 模拟异常 进入  catch 方法  返回未知  return LocalTransactionState.UNKNOW;// int i =  1/0;// 订单插入成功 就提交事务,插入失败就回滚事务// 模拟超时 8 秒 就会进行回调检查Thread.sleep(7000);if (isOrderCreated) {return LocalTransactionState.COMMIT_MESSAGE; // 提交库存扣减消息} else {return LocalTransactionState.ROLLBACK_MESSAGE; // 回滚库存扣减消息}} catch (Exception e) {e.printStackTrace();return LocalTransactionState.UNKNOW; // 未知状态}}// 回查订单是否存在  只有 mq 超时未收到 COMMIT_MESSAGE / ROLLBACK_MESSAGE的时候或者 LocalTransactionState.UNKNOW 未知状态的时候才会触发此方法@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {// 回查本地事务状态boolean isOrderConfirmed = checkOrderStatus(msg.getKeys());return isOrderConfirmed ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE;}private boolean createOrder(Message msg) {// 模拟订单创建逻辑(通常为数据库操作)System.out.println("创建订单===============: " + new String(msg.getBody()));// 这里应该包括数据库的insert操作和相关的业务逻辑return true; // 返回订单创建成功状态}private boolean checkOrderStatus(String orderKey) {// 模拟查询订单状态System.out.println("查询订单是否存在: " + orderKey);// 这里通常涉及查询数据库以确认订单的最终状态return true; // 假设订单已确认}}   

模拟异常

模拟超时 20 秒

实现要点

    1、事务消息的发送:sendMessageInTransaction用于发送半消息,该消息在事务未确定前不会被消费者接收。
    2、本地事务操作:在executeLocalTransaction方法中执行订单创建的本地事务逻辑。
    3、事务状态反馈:根据订单创建成功与否,决定是否提交或回滚事务消息。
    4、事务状态回查:checkLocalTransaction用于处理RocketMQ对事务状态的回查请求,以解决网络超时、进程崩溃等问题导致的事务状态不一致。
注意事项
    幂等性:事务操作(例如订单创建和库存扣减)应确保幂等性,以防止重复执行造成的数据不一致。
    异常处理:正确处理各种异常状况,确保事务的完善性和数据的一致性。
    回查频率:设置适当的事务回查策略,避免频繁的检查影响性能。
通过这些实现,RocketMQ可以确保在订单创建与覆盖在储存事务消息的可靠性之间的精准一致性处理,保障系统的最终一致性和事务可靠性。

事务消息的原理:

1. 半事务消息
    当生产者发送半事务消息时,RocketMQ会将其标记为"Prepared"准备状态,并存储在特定的队列中。这种消息对消费者不可见。

// RocketMQ内部实现(简化版)  
public class TransactionMessageService {  public PutMessageResult prepareMessage(MessageExtBrokerInner msgInner) {  // 将消息标记为Prepared状态  msgInner.setTransactionPrepared(true);  // 存储消息到特定队列  return commitLog.putMessage(msgInner);  }  
}


2. 提交或回滚事务
    根据本地事务的执行结果,生产者会向RocketMQ发送提交或回滚指令。

// RocketMQ内部实现(简化版)  
public class TransactionMessageService {  public void commitMessage(MessageExt msgExt) {  // 将消息从Prepared状态改为可消费状态  MessageExtBrokerInner msgInner = convertToMessageInner(msgExt);  msgInner.setTransactionPrepared(false);  // 将消息从特定队列转存到真正的发送队列里去commitLog.putMessage(msgInner);  }  public void rollbackMessage(MessageExt msgExt) {  // 删除Prepared状态的消息  removeMessage(msgExt);  }  
}

3. 事务状态回查
    如果RocketMQ在一定时间内没有收到提交或回滚指令,(定时扫描)会启动回查机制。大约7-8秒

// RocketMQ内部实现(简化版)  
public class TransactionCheckService {  public void check() {  for (MessageExt msgExt : getPrepareedMessages()) {  LocalTransactionState state = producer.checkLocalTransaction(msgExt);  if (state == LocalTransactionState.COMMIT_MESSAGE) {  commitMessage(msgExt);  } else if (state == LocalTransactionState.ROLLBACK_MESSAGE) {  rollbackMessage(msgExt);  } else {  // 继续等待  }  }  }  
}


4. 关键点说明
    原子性:半事务消息对消费者不可见,确保了事务的原子性。
    持久性:半事务消息会被持久化,即使Broker宕机也能恢复。
    隔离性:通过特殊的队列存储半事务消息,实现了隔离。
    一致性:通过二阶段提交和回查机制保证了最终一致性。
通过这套实现机制,RocketMQ能够在分布式系统中实现可靠的事务消息,确保消息发送与本地事务的一致性,即使在系统崩溃或网络故障的情况下也能保证数据的一致性。

延迟消息

发送延迟消息

4.X版本不支持任意时间的延迟,而是提供了18个固定的延迟级别,而是提供了18个固定的延迟级别,从1s,5s,10s,30s,1m,2m,3m到2h不等

5.X版本支持任意时间的延迟,只需要设置 message.setDeliverTimeMs(time); 即可

package com.test.xulk.delaymq;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;import java.text.SimpleDateFormat;
import java.util.Date;public class DelayMessageProducer {public static void main(String[] args) throws Exception {// 创建生产者实例  DefaultMQProducer producer = new DefaultMQProducer("DelayMessageProducerGroup");producer.setNamesrvAddr("localhost:9876");// 启动生产者  producer.start();SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime =  "  延迟消息   发送时间 延迟13秒 ==  " + sdf.format(new Date()) ;System.out.println(  currentTime );// 创建一条消息  Message message = new Message("DelayTopic", "Tag1", currentTime.getBytes());// 设置延迟级别,如3表示延迟10秒    RocketMQ不支持任意时间的延迟,而是提供了18个固定的延迟级别,从1s,5s,10s,30s,1m,2m,3m到2h不等// message.setDelayTimeLevel(3);Long time = System.currentTimeMillis() + 13 * 1000;message.setDeliverTimeMs(time);try {producer.send(message);} catch (Exception e) {System.out.println("发送异常----{}" + e.getLocalizedMessage());}// 关闭生产者producer.shutdown();}
}

接收延迟消息

package com.test.xulk.delaymq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;  
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;  
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;  
import org.apache.rocketmq.common.message.MessageExt;import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;public class DelayMessageConsumer {  public static void main(String[] args) throws Exception {  // 创建消费者实例  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DelayMessageConsumerGroup");  consumer.setNamesrvAddr("localhost:9876");// 订阅主题  consumer.subscribe("DelayTopic", "*");// 注册消息监听器  consumer.registerMessageListener(new MessageListenerConcurrently() {  @Override  public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");String currentTime = sdf.format(new Date());for (MessageExt msg : msgs) {  System.out.printf("------ 接收时间:" + currentTime  + "   ============ " +  new String(msg.getBody()) );}  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  }  });// 启动消费者  consumer.start();  }  
}

http://www.xdnf.cn/news/9465.html

相关文章:

  • 【Ruoyi-Vue】动态修改ruoyi-vue路由标签名称
  • MYSQL丢失pid处理方式
  • ZAB 和 RAFT分别是什么?它们的区别是什么?
  • STM32之FreeRTOS移植(重点)
  • 一次消谐器更换操作流程及注意事项
  • 7系fpga带microblaze做固件及固化
  • leetcode501.二叉搜索树中的众数:迭代中序遍历的众数追踪与数组动态更新
  • 重磅发布 | 复旦533页《大规模语言模型:从理论到实践(第2版)》(免费下载)
  • spring Data JPA详细介绍。
  • 3.20 工程计价数字化与智能化
  • PyTorch 2.1新特性:TorchDynamo如何实现30%训练加速(原理+自定义编译器开发)
  • Spring Ai | 从零带你一起走进AI项目(中英)
  • PXC集群
  • C++数据结构 : 二叉搜索树
  • Java大师成长计划之第32天:使用Kubernetes进行Java应用编排与管理
  • Python页面纸张大小设置
  • 为什么苹果签名会掉签
  • 语音合成之十七 语音合成(TTS)中文自然度:问题、成因、解决方案
  • C++ 初始化大全
  • JavaScript变量宣言三剑客:var、let、const的奇幻冒险
  • 覆盖索引详解:原理、优势与面试要点
  • 循环神经网络(RNN):原理、架构与实战
  • 第1章 计算机系统知识
  • 32. 自动化测试开发之建立mysql和oracle数据库连接池
  • 训练自己的yolo模型,并部署到rk3588上
  • 微元法求解弧长与侧面积
  • 哪些情况索引会失效?
  • ubuntu 24 下使用pip 时碰到Externally Managed Environment Error的解决办法
  • Oracle迁移到瀚高之后,空值问题处理(APP)
  • 数据库相关问题