TransactionSynchronizationManager事务同步器的使用
方法一、使用 @TransactionalEventListener 监听需要处理的事件（注意：处理失败后的重试逻辑还需要完善）
@Component public class OrderEventListener {@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) // @Async // 异步执行public <T> void onApplicationEvent(BaseEvent<T> event) {MessageSender.sendMQMessage(event);} }
方法二、通过 TransactionSynchronizationManager 注册事务同步回调（MqMessageCommitHook），在事务提交后执行发送逻辑
import lombok.extern.slf4j.Slf4j; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter;@Slf4j public class MqMessageCommitHook extends TransactionSynchronizationAdapter {private final Runnable action;public MqMessageCommitHook(Runnable action) {this.action = action;}@Overridepublic void afterCommit() {try {//todo 这里可以修改为异步处理action.run();} catch (Exception e) {// 记录日志或进行补偿处理log.error("Failed to execute afterCommit logic", e);}}@Overridepublic void beforeCommit(boolean readOnly) {log.error("================beforeCommit============");}@Overridepublic void flush() {log.error("================flush============");}@Overridepublic void beforeCompletion() {log.error("================beforeCompletion============");}@Overridepublic void afterCompletion(int status) {if (TransactionSynchronization.STATUS_COMMITTED == status) {log.info("=========afterCompletion======事务提交==============");} else if (TransactionSynchronization.STATUS_ROLLED_BACK == status) {log.info("========afterCompletion=========事务回滚============");}}}
@Override@Transactional(rollbackFor = {Exception.class})public void saveBatch(Integer size) {ArrayList<Algorithm> objects = new ArrayList<>();for (int i = 0; i < size; i++) {Algorithm algorithm = new Algorithm();algorithm.setAlgoCode(i + "");algorithm.setAlgoName("name" + i);algorithm.setServiceNum(222);algorithm.setAlgoApiDoc("xxxxxxxxxxxxxxxxxxxxx");algorithm.setAlgoDescribe("xxxxxxxxxxxxxxxxxxxxx");algorithm.setAlgoDeclare("xxxxxxxxxxxxxxxxxxxxx");algorithm.setAlgoDescribeDetail("xxxxxxxxxxxxxxxxxxxxx");algorithm.setAlgoType("1");algorithm.setAlgoVersion("xxxxxxxxxxxxxxxxxxxxx");algorithm.setAlgoOwner("xxxxxxxxxxxxxxxxx");algorithm.setStatus(99);algorithm.setViewNum(9999);algorithm.setUpdateBy("ddddd");algorithm.setCreateBy("ddddd");objects.add(algorithm);}this.getBaseMapper().InsertBatchEntity(objects); // this.getBaseMapper().insertBatchSomeColumn(objects); // boolean b = saveBatch(objects, 5);// 方式一使用 发布事件(事务提交后触发)eventPublisher.publishEvent(new OrderCreatedEvent(objects.get(1), "msgType add", "uniqueKey"));//方法二使用TransactionSynchronizationManager.registerSynchronization(new MqMessageCommitHook(() -> MessageSender.sendMQMessage(new OrderCreatedEvent(objects.get(1), "msgType add 1", "uniqueKey"))));}
/**
 * Event whose payload is an {@link Algorithm}.
 *
 * <p>Adds no state of its own; every constructor argument is passed straight
 * to {@link BaseEvent}.
 */
@Slf4j
public class OrderCreatedEvent extends BaseEvent<Algorithm> {

    /**
     * @param data      the {@link Algorithm} carried by this event
     * @param msgType   message type label
     * @param uniqueKey key identifying this message
     */
    public OrderCreatedEvent(Algorithm data, String msgType, String uniqueKey) {
        super(data, msgType, uniqueKey);
    }
}
/**
 * Immutable base type for events: a typed payload plus a message type and a
 * unique key.
 *
 * @param <T> type of the payload
 */
@Getter
public abstract class BaseEvent<T> {

    /** Payload carried by the event. */
    private final T data;

    /** Message type label. */
    private final String msgType;

    /** Key identifying this message. */
    private final String uniqueKey;

    /**
     * @param data      payload carried by the event
     * @param msgType   message type label
     * @param uniqueKey key identifying this message
     */
    public BaseEvent(T data, String msgType, String uniqueKey) {
        this.data = data;
        this.msgType = msgType;
        this.uniqueKey = uniqueKey;
    }

    /**
     * Renders the concrete class name and all three fields, so that events
     * concatenated into log or console output are readable instead of showing
     * the default identity hash.
     */
    @Override
    public String toString() {
        return getClass().getSimpleName()
                + "{msgType='" + msgType + '\''
                + ", uniqueKey='" + uniqueKey + '\''
                + ", data=" + data
                + '}';
    }
}
具体的业务逻辑处理
public class MessageSender {public static <T> void sendMQMessage(BaseEvent<T> event) {// 发送消息到指定的交换机和路由键System.out.println("MQ Message sent: " + event);} }