Seata源码—6.Seata AT模式的数据源代理三
大纲
1.Seata的Resource资源接口源码
2.Seata数据源连接池代理的实现源码
3.Client向Server发起注册RM的源码
4.Client向Server注册RM时的交互源码
5.数据源连接代理与SQL句柄代理的初始化源码
6.Seata基于SQL句柄代理执行SQL的源码
7.执行SQL语句前取消自动提交事务的源码
8.执行SQL语句前后构建数据镜像的源码
9.构建全局锁的key和UndoLog数据的源码
10.Seata Client发起分支事务注册的源码
11.Seata Server处理分支事务注册请求的源码
12.将UndoLog写入到数据库与提交事务的源码
13.通过全局锁重试策略组件执行事务的提交
14.注册分支事务时获取全局锁的入口源码
15.Seata Server获取全局锁的具体逻辑源码
16.全局锁和分支事务及本地事务总结
17.提交全局事务以及提交各分支事务的源码
18.全局事务回滚的过程源码
16.全局锁和分支事务及本地事务总结
只有获取到全局锁,分支事务才能注册成功,否则由LockRetryPolicy进行重试。同样,只有获取到全局锁,本地事务才能提交成功,否则由LockRetryPolicy进行重试。
只有当全局锁没有被其他全局事务(xid)持有时,当前全局事务(xid)才能成功获取全局锁。获取全局锁时,会将当前分支事务申请全局锁的记录写入到数据库中。
17.提交全局事务以及提交各分支事务的源码
(1)Seata Client发起提交全局事务的请求
(2)Server向Client发送提交分支事务的请求
(3)Seata Client处理提交分支事务的请求
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
(1)Seata Client发起提交全局事务的请求
-> TransactionalTemplate.execute()发起全局事务的提交
-> TransactionalTemplate.commitTransaction()
-> DefaultGlobalTransaction.commit()
-> DefaultTransactionManager.commit()
-> DefaultTransactionManager.syncCall()
-> TmNettyRemotingClient.sendSyncRequest()
把全局事务提交请求GlobalCommitRequest发送给Seata Server进行处理
//Template of executing business logic with a global transaction.
public class TransactionalTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionalTemplate.class);

    //Runs the business callback inside a global transaction: begin -> execute -> commit,
    //with completeTransactionAfterThrowing() invoked when the business code throws.
    public Object execute(TransactionalExecutor business) throws Throwable {
        //1.Get transactionInfo
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'.
        //NOTE(review): getCurrent() is not shown in this excerpt; presumably it checks whether an xid is
        //already bound to the current thread and returns a transaction for it, or null when there is none.
        //When a global transaction is being started for the first time, no transaction exists yet.
        GlobalTransaction tx = GlobalTransactionContext.getCurrent();
        //1.2 Handle the transaction propagation.
        //The propagation level is read from the transaction info.
        //NOTE(review): the default is presumably REQUIRED (join the existing global transaction if there is
        //one, otherwise start a new one) - the default value is not visible here.
        Propagation propagation = txInfo.getPropagation();
        //Each propagation level is handled differently (the switch body is elided in this excerpt), e.g.
        //suspend the current transaction and start a new one, or run the business without a transaction.
        //Per the original author's note, suspending means unbinding the xid from the current thread, and the
        //propagation level can be customised per business method through @GlobalTransactional.
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
            switch (propagation) {
                ...
            }
            //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'.
            if (tx == null) {
                tx = GlobalTransactionContext.createNew();
            }
            //set current tx config to holder
            GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
            try {
                //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC,
                //else do nothing. Of course, the hooks will still be triggered.
                //Begin the global transaction.
                beginTransaction(txInfo, tx);
                Object rs;
                try {
                    //Do Your Business
                    //Run the business method. Per the original author's note, the xid is propagated downstream
                    //(e.g. over Dubbo RPC) and each participant opens and commits its own branch transaction.
                    rs = business.execute();
                } catch (Throwable ex) {
                    //3. The needed business exception to rollback.
                    //Complete the transaction according to the thrown exception, then rethrow it.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                //4. everything is fine, commit.
                //The global transaction is committed here when the business method returned normally.
                commitTransaction(tx);
                return rs;
            } finally {
                //5. clear
                //Post-completion callbacks: restore the previous global lock config, fire hooks, clean up.
                resumeGlobalLockConfig(previousConfig);
                triggerAfterCompletion();
                cleanUp();
            }
        } finally {
            //If the transaction is suspended, resume it.
            if (suspendedResourcesHolder != null) {
                //A global transaction was suspended earlier; resume it now.
                tx.resume(suspendedResourcesHolder);
            }
        }
    }

    //Commits the transaction, firing the before/after commit hooks around tx.commit().
    //A TransactionException is wrapped into an ExecutionException with Code.CommitFailure.
    private void commitTransaction(GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
        try {
            triggerBeforeCommit();
            tx.commit();
            triggerAfterCommit();
        } catch (TransactionException txe) {
            // 4.1 Failed to commit
            throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.CommitFailure);
        }
    }
    ...
}

//The type Default global transaction.
public class DefaultGlobalTransaction implements GlobalTransaction {
    private TransactionManager transactionManager;

    DefaultGlobalTransaction(String xid, GlobalStatus status, GlobalTransactionRole role) {
        this.transactionManager = TransactionManagerHolder.get();//the global TransactionManager
        this.xid = xid;
        this.status = status;
        this.role = role;
    }
    ...

    //Reports the global commit through the TransactionManager, retrying on failure.
    //A Participant returns immediately without committing.
    @Override
    public void commit() throws TransactionException {
        if (role == GlobalTransactionRole.Participant) {
            //Participant has no responsibility of committing
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignore Commit(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNotNull();
        //Fall back to DEFAULT_TM_COMMIT_RETRY_COUNT when COMMIT_RETRY_COUNT is not positive.
        int retry = COMMIT_RETRY_COUNT <= 0 ? DEFAULT_TM_COMMIT_RETRY_COUNT : COMMIT_RETRY_COUNT;
        try {
            while (retry > 0) {
                try {
                    //Decremented before the attempt, so retry == 0 in the catch means the last attempt failed.
                    retry--;
                    status = transactionManager.commit(xid);
                    break;
                } catch (Throwable ex) {
                    LOGGER.error("Failed to report global commit [{}],Retry Countdown: {}, reason: {}", this.getXid(), retry, ex.getMessage());
                    if (retry == 0) {
                        throw new TransactionException("Failed to report global commit", ex);
                    }
                }
            }
        } finally {
            //Whether or not the commit succeeded: if RootContext still reports this xid, suspend().
            //NOTE(review): suspend() presumably unbinds the xid from the current thread - not shown here.
            if (xid.equals(RootContext.getXID())) {
                suspend();
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("[{}] commit status: {}", xid, status);
        }
    }
    ...
}

//Holds the TransactionManager instance loaded through EnhancedServiceLoader.
public class TransactionManagerHolder {
    ...
    private TransactionManagerHolder() {
    }

    private static class SingletonHolder {
        private static TransactionManager INSTANCE = null;
        static {
            try {
                INSTANCE = EnhancedServiceLoader.load(TransactionManager.class);
                LOGGER.info("TransactionManager Singleton {}", INSTANCE);
            } catch (Throwable anyEx) {
                //A load failure is only logged; INSTANCE stays null and get() will throw.
                LOGGER.error("Failed to load TransactionManager Singleton! ", anyEx);
            }
        }
    }

    //Get transaction manager.
    //Throws ShouldNeverHappenException when the instance could not be loaded.
    public static TransactionManager get() {
        if (SingletonHolder.INSTANCE == null) {
            throw new ShouldNeverHappenException("TransactionManager is NOT ready!");
        }
        return SingletonHolder.INSTANCE;
    }
    ...
}

public class DefaultTransactionManager implements TransactionManager {
    ...
    //Builds a GlobalCommitRequest for the xid, sends it synchronously and returns the status from the response.
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalCommitRequest globalCommit = new GlobalCommitRequest();
        globalCommit.setXid(xid);
        GlobalCommitResponse response = (GlobalCommitResponse) syncCall(globalCommit);
        return response.getGlobalStatus();
    }

    //Sends the request through TmNettyRemotingClient; a TimeoutException becomes a TmTransactionException (IO).
    private AbstractTransactionResponse syncCall(AbstractTransactionRequest request) throws TransactionException {
        try {
            //Per the original author's note, TmNettyRemotingClient keeps a long-lived Netty connection
            //to the Seata Server.
            return (AbstractTransactionResponse) TmNettyRemotingClient.getInstance().sendSyncRequest(request);
        } catch (TimeoutException toe) {
            throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
        }
    }
    ...
}
(2)Server向Client发送提交分支事务的请求
ServerHandler的channelRead()方法会将收到的请求进行层层传递:首先通过AbstractNettyRemoting的processMessage()方法分发给ServerOnRequestProcessor的process()方法,再交给DefaultCoordinator的onRequest()方法来进行处理,然后交给GlobalCommitRequest的handle()方法来进行处理,接着交给AbstractTCInboundHandler的handle()方法来进行处理,最后交给DefaultCoordinator的doGlobalCommit()方法来进行处理,也就是调用DefaultCore的commit()方法来提交全局事务。
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    //Inbound handler on the server side: forwards every RpcMessage to processMessage().
    @ChannelHandler.Sharable
    class ServerHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            //Anything that is not an RpcMessage is ignored.
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            //Hand the decoded RpcMessage to processMessage().
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Dispatches an RpcMessage to the processor registered for its message type code.
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up the Pair for this message type: first = the processor, second = its executor (may be null).
            //Per the original author's note, processorTable is filled by NettyRemotingServer during
            //initialisation via registerProcessor(), so on the server this request ends up in
            //ServerOnRequestProcessor.process().
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    //An executor is configured: run the processor on it.
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    //No executor: run the processor on the calling thread.
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

public class ServerOnRequestProcessor implements RemotingProcessor, Disposable {
    private final RemotingServer remotingServer;
    ...
    //Handles a request only when its channel is registered; otherwise disconnects and closes the channel.
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (ChannelManager.isRegistered(ctx.channel())) {
            onRequestMessage(ctx, rpcMessage);
        } else {
            try {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel());
                }
                ctx.disconnect();
                ctx.close();
            } catch (Exception exx) {
                LOGGER.error(exx.getMessage());
            }
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString()));
            }
        }
    }

    //Passes the message body to the transaction message handler and sends the result back to the client.
    private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
        Object message = rpcMessage.getBody();
        //The RpcContext associated with this channel, looked up through ChannelManager.
        RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup());
        } else {
            //Outside debug level the same information is put on the BatchLogHandler log queue instead.
            try {
                BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup());
            } catch (InterruptedException e) {
                LOGGER.error("put message to logQueue error: {}", e.getMessage(), e);
            }
        }
        if (!(message instanceof AbstractMessage)) {
            return;
        }
        //the batch send request message
        if (message instanceof MergedWarpMessage) {
            ...
        } else {
            //the single send request message
            final AbstractMessage msg = (AbstractMessage) message;
            //Per the original author's note, this ends up in DefaultCoordinator.onRequest().
            AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
            //Send the response back to the client.
            remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
        }
    }
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    ...
    //Entry point for requests addressed to the TC: registers this coordinator as the request's inbound
    //handler and lets the request dispatch itself via handle().
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToTC)) {
            throw new IllegalArgumentException();
        }
        //In the global-commit flow the request is the GlobalCommitRequest sent by the client.
        AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
        transactionRequest.setTCInboundHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

//Global commit request message; handle() delegates to the handler's handle(GlobalCommitRequest, RpcContext).
public class GlobalCommitRequest extends AbstractGlobalEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_GLOBAL_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this, rpcContext);
    }
}

public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler {
    ...
    //Handles a global commit request: runs doGlobalCommit() inside exceptionHandleTemplate() and returns
    //the populated response.
    @Override
    public GlobalCommitResponse handle(GlobalCommitRequest request, final RpcContext rpcContext) {
        GlobalCommitResponse response = new GlobalCommitResponse();
        //Initial status; doGlobalCommit() may overwrite it.
        response.setGlobalStatus(GlobalStatus.Committing);
        exceptionHandleTemplate(new AbstractCallback<GlobalCommitRequest, GlobalCommitResponse>() {
            @Override
            public void execute(GlobalCommitRequest request, GlobalCommitResponse response) throws TransactionException {
                try {
                    doGlobalCommit(request, response, rpcContext);
                } catch (StoreException e) {
                    //A StoreException is rethrown as a TransactionException with code FailedStore.
                    throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("global commit request failed. xid=%s, msg=%s", request.getXid(), e.getMessage()), e);
                }
            }

            //After the default exception handling, checkTransactionStatus() is called with request and response.
            @Override
            public void onTransactionException(GlobalCommitRequest request, GlobalCommitResponse response, TransactionException tex) {
                super.onTransactionException(request, response, tex);
                checkTransactionStatus(request, response);
            }

            @Override
            public void onException(GlobalCommitRequest request, GlobalCommitResponse response, Exception rex) {
                super.onException(request, response, rex);
                checkTransactionStatus(request, response);
            }
        }, request, response);
        return response;
    }

    protected abstract void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException;
    ...
}

public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable {
    private final DefaultCore core;
    ...
    //Puts the xid into the MDC and stores the status returned by DefaultCore.commit() in the response.
    @Override
    protected void doGlobalCommit(GlobalCommitRequest request, GlobalCommitResponse response, RpcContext rpcContext) throws TransactionException {
        MDC.put(RootContext.MDC_KEY_XID, request.getXid());
        //Call DefaultCore.commit() to commit the global transaction.
        response.setGlobalStatus(core.commit(request.getXid()));
    }
    ...
}
DefaultCore的commit()方法会调用DefaultCore的doGlobalCommit()方法,而doGlobalCommit()方法会获取全局事务的所有分支事务并进行遍历,然后把提交分支事务的请求BranchCommitRequest发送到Seata Client中。
public class DefaultCore implements Core {
    ...
    //Commits the global transaction identified by xid and returns the resulting global status.
    //Returns GlobalStatus.Finished when no session exists for the xid.
    @Override
    public GlobalStatus commit(String xid) throws TransactionException {
        GlobalSession globalSession = SessionHolder.findGlobalSession(xid);
        if (globalSession == null) {
            return GlobalStatus.Finished;
        }
        globalSession.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        //just lock changeStatus
        boolean shouldCommit = SessionHolder.lockAndExecute(globalSession, () -> {
            //Only a session still in Begin status is processed; any other status yields false.
            if (globalSession.getStatus() == GlobalStatus.Begin) {
                //Highlight: Firstly, close the session, then no more branch can be registered.
                globalSession.closeAndClean();
                if (globalSession.canBeCommittedAsync()) {
                    //Async path: asyncCommit() is called and no synchronous commit follows.
                    globalSession.asyncCommit();
                    MetricsPublisher.postSessionDoneEvent(globalSession, GlobalStatus.Committed, false, false);
                    return false;
                } else {
                    globalSession.changeGlobalStatus(GlobalStatus.Committing);
                    return true;
                }
            }
            return false;
        });
        if (shouldCommit) {
            boolean success = doGlobalCommit(globalSession, false);
            //If successful and all remaining branches can be committed asynchronously, do async commit.
            if (success && globalSession.hasBranch() && globalSession.canBeCommittedAsync()) {
                globalSession.asyncCommit();
                return GlobalStatus.Committed;
            } else {
                return globalSession.getStatus();
            }
        } else {
            //AsyncCommitting is reported to the caller as Committed.
            return globalSession.getStatus() == GlobalStatus.AsyncCommitting ? GlobalStatus.Committed : globalSession.getStatus();
        }
    }

    //Commits the branches of a global session one by one.
    //retrying is false for the first, synchronous attempt made from commit().
    @Override
    public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) throws TransactionException {
        boolean success = true;
        //start committing event
        MetricsPublisher.postSessionDoingEvent(globalSession, retrying);
        if (globalSession.isSaga()) {
            success = getCore(BranchType.SAGA).doGlobalCommit(globalSession, retrying);
        } else {
            //Iterate over all branch sessions of the global transaction and commit each of them.
            Boolean result = SessionHelper.forEach(globalSession.getSortedBranches(), branchSession -> {
                //if not retrying, skip the canBeCommittedAsync branches
                if (!retrying && branchSession.canBeCommittedAsync()) {
                    return CONTINUE;
                }
                BranchStatus currentStatus = branchSession.getStatus();
                //A branch that failed in phase one is removed and skipped.
                if (currentStatus == BranchStatus.PhaseOne_Failed) {
                    SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                    return CONTINUE;
                }
                try {
                    //Send the branch commit request to the Seata Client.
                    BranchStatus branchStatus = getCore(branchSession.getBranchType()).branchCommit(globalSession, branchSession);
                    if (isXaerNotaTimeout(globalSession,branchStatus)) {
                        LOGGER.info("Commit branch XAER_NOTA retry timeout, xid = {} branchId = {}", globalSession.getXid(), branchSession.getBranchId());
                        branchStatus = BranchStatus.PhaseTwo_Committed;
                    }
                    switch (branchStatus) {
                        case PhaseTwo_Committed:
                            SessionHelper.removeBranch(globalSession, branchSession, !retrying);
                            return CONTINUE;
                        case PhaseTwo_CommitFailed_Unretryable:
                            //not at branch
                            SessionHelper.endCommitFailed(globalSession, retrying);
                            LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
                            return false;
                        default:
                            //Any other status: on the first attempt, queue the session for retry and stop.
                            if (!retrying) {
                                globalSession.queueToRetryCommit();
                                return false;
                            }
                            if (globalSession.canBeCommittedAsync()) {
                                LOGGER.error("Committing branch transaction[{}], status:{} and will retry later", branchSession.getBranchId(), branchStatus);
                                return CONTINUE;
                            } else {
                                LOGGER.error("Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession.getXid(), branchSession.getBranchId());
                                return false;
                            }
                    }
                } catch (Exception ex) {
                    StackTraceLogger.error(LOGGER, ex, "Committing branch transaction exception: {}", new String[] {branchSession.toString()});
                    //On the first attempt the session is queued for retry and the exception is propagated;
                    //while retrying it is only logged and the loop moves on to the next branch.
                    if (!retrying) {
                        globalSession.queueToRetryCommit();
                        throw new TransactionException(ex);
                    }
                }
                return CONTINUE;
            });
            //Return if the result is not null
            if (result != null) {
                return result;
            }
            //If has branch and not all remaining branches can be committed asynchronously,
            //do print log and return false
            if (globalSession.hasBranch() && !globalSession.canBeCommittedAsync()) {
                LOGGER.info("Committing global transaction is NOT done, xid = {}.", globalSession.getXid());
                return false;
            }
            if (!retrying) {
                //contains not AT branch
                globalSession.setStatus(GlobalStatus.Committed);
            }
        }
        //if it succeeds and there is no branch, retrying=true is the asynchronous state when retrying. EndCommitted is
        //executed to improve concurrency performance, and the global transaction ends..
        if (success && globalSession.getBranchSessions().isEmpty()) {
            SessionHelper.endCommitted(globalSession, retrying);
            LOGGER.info("Committing global transaction is successfully done, xid = {}.", globalSession.getXid());
        }
        return success;
    }
    ...
}

public abstract class AbstractCore implements Core {
    protected RemotingServer remotingServer;
    ...
    //Builds a BranchCommitRequest from the branch session and sends it to the client owning the branch.
    //IOException/TimeoutException are wrapped into a BranchTransactionException.
    @Override
    public BranchStatus branchCommit(GlobalSession globalSession, BranchSession branchSession) throws TransactionException {
        try {
            BranchCommitRequest request = new BranchCommitRequest();
            request.setXid(branchSession.getXid());
            request.setBranchId(branchSession.getBranchId());
            request.setResourceId(branchSession.getResourceId());
            request.setApplicationData(branchSession.getApplicationData());
            request.setBranchType(branchSession.getBranchType());
            return branchCommitSend(request, globalSession, branchSession);
        } catch (IOException | TimeoutException e) {
            throw new BranchTransactionException(FailedToSendBranchCommitRequest, String.format("Send branch commit failed, xid = %s branchId = %s", branchSession.getXid(), branchSession.getBranchId()), e);
        }
    }

    //Sends the request synchronously, addressed by resourceId and clientId, and returns the reported branch status.
    protected BranchStatus branchCommitSend(BranchCommitRequest request, GlobalSession globalSession, BranchSession branchSession) throws IOException, TimeoutException {
        BranchCommitResponse response = (BranchCommitResponse) remotingServer.sendSyncRequest(branchSession.getResourceId(), branchSession.getClientId(), request);
        return response.getBranchStatus();
    }
    ...
}

public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer {
    ...
    //Wraps msg into a sync request RpcMessage and sends it over the given channel.
    //NOTE(review): the three-argument overload called by AbstractCore is not shown in this excerpt;
    //presumably it resolves the Channel from resourceId/clientId and delegates here - confirm.
    @Override
    public Object sendSyncRequest(Channel channel, Object msg) throws TimeoutException {
        if (channel == null) {
            throw new RuntimeException("client is not connected");
        }
        RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC);
        return super.sendSync(channel, rpcMessage, NettyServerConfig.getRpcRequestTimeout());
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Writes the message to the channel and blocks up to timeoutMillis for the matching response.
    //Returns null when channel is null; throws FrameworkException when timeoutMillis is not positive.
    protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException {
        if (timeoutMillis <= 0) {
            throw new FrameworkException("timeout should more than 0ms");
        }
        if (channel == null) {
            LOGGER.warn("sendSync nothing, caused by null channel.");
            return null;
        }
        //Wrap the outgoing request in a MessageFuture and store it in the futures map under the message id.
        MessageFuture messageFuture = new MessageFuture();
        messageFuture.setRequestMessage(rpcMessage);
        messageFuture.setTimeout(timeoutMillis);
        futures.put(rpcMessage.getId(), messageFuture);
        channelWritableCheck(channel, rpcMessage.getBody());
        //Resolve the remote address.
        String remoteAddr = ChannelUtil.getAddressFromChannel(channel);
        doBeforeRpcHooks(remoteAddr, rpcMessage);
        //Write asynchronously and attach a listener to the write result.
        //When the write fails, the pending future is removed and completed with the failure cause,
        //and the channel is destroyed.
        channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> {
            if (!future.isSuccess()) {
                MessageFuture messageFuture1 = futures.remove(rpcMessage.getId());
                if (messageFuture1 != null) {
                    messageFuture1.setResultMessage(future.cause());
                }
                destroyChannel(future.channel());
            }
        });
        try {
            //Block on the MessageFuture until the peer's response arrives or the timeout elapses.
            Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
            doAfterRpcHooks(remoteAddr, rpcMessage, result);
            return result;
        } catch (Exception exx) {
            LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody());
            //A TimeoutException is rethrown as is; anything else is wrapped in a RuntimeException.
            if (exx instanceof TimeoutException) {
                throw (TimeoutException) exx;
            } else {
                throw new RuntimeException(exx);
            }
        }
    }
    ...
}
(3)Seata Client处理提交分支事务的请求
ClientHandler的channelRead()方法收到提交分支事务的请求后,会通过processMessage()方法分发给RmBranchCommitProcessor的process()方法,再由其handleBranchCommit()方法进行处理,后续的调用链如下:
-> AbstractRMHandler.onRequest()
-> BranchCommitRequest.handle()
-> AbstractRMHandler.handle()
-> AbstractRMHandler.doBranchCommit()
-> DataSourceManager.branchCommit()
-> AsyncWorker.branchCommit()异步化提交分支事务
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    //Inbound handler on the client side: forwards every RpcMessage to processMessage().
    @Sharable
    class ClientHandler extends ChannelDuplexHandler {
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            //Anything that is not an RpcMessage is ignored.
            if (!(msg instanceof RpcMessage)) {
                return;
            }
            processMessage(ctx, (RpcMessage) msg);
        }
        ...
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //Dispatches an RpcMessage to the processor registered for its message type code.
    protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody()));
        }
        Object body = rpcMessage.getBody();
        if (body instanceof MessageTypeAware) {
            MessageTypeAware messageTypeAware = (MessageTypeAware) body;
            //Look up the Pair for this message type: first = the processor, second = its executor (may be null).
            final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode());
            if (pair != null) {
                if (pair.getSecond() != null) {
                    //An executor is configured: run the processor on it.
                    try {
                        pair.getSecond().execute(() -> {
                            try {
                                pair.getFirst().process(ctx, rpcMessage);
                            } catch (Throwable th) {
                                LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                            } finally {
                                MDC.clear();
                            }
                        });
                    } catch (RejectedExecutionException e) {
                        ...
                    }
                } else {
                    //No executor: run the processor on the calling thread.
                    try {
                        pair.getFirst().process(ctx, rpcMessage);
                    } catch (Throwable th) {
                        LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th);
                    }
                }
            } else {
                LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode());
            }
        } else {
            LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body);
        }
    }
    ...
}

//Client-side processor for branch commit requests.
public class RmBranchCommitProcessor implements RemotingProcessor {
    ...
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

    //Lets the handler process the request, then sends the response back to serverAddress asynchronously.
    //A failure while sending the response is only logged.
    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        BranchCommitResponse resultMessage;
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
    ...
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    ...
    //Entry point for requests addressed to the RM: registers this handler on the request and lets the
    //request dispatch itself via handle().
    @Override
    public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
        if (!(request instanceof AbstractTransactionRequestToRM)) {
            throw new IllegalArgumentException();
        }
        AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
        transactionRequest.setRMInboundMessageHandler(this);
        return transactionRequest.handle(context);
    }
    ...
}

//Branch commit request message; handle() delegates to the handler's handle(BranchCommitRequest).
public class BranchCommitRequest extends AbstractBranchEndRequest {
    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
        return handler.handle(this);
    }
}

public abstract class AbstractRMHandler extends AbstractExceptionHandler implements RMInboundHandler, TransactionMessageHandler {
    //Handles a branch commit request: runs doBranchCommit() inside exceptionHandleTemplate() and returns
    //the populated response.
    @Override
    public BranchCommitResponse handle(BranchCommitRequest request) {
        BranchCommitResponse response = new BranchCommitResponse();
        exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
            @Override
            public void execute(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
                doBranchCommit(request, response);
            }
        }, request, response);
        return response;
    }

    //Asks the resource manager to commit the branch and copies xid, branchId and the resulting status
    //into the response.
    protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response) throws TransactionException {
        String xid = request.getXid();
        long branchId = request.getBranchId();
        String resourceId = request.getResourceId();
        String applicationData = request.getApplicationData();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
        }
        BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId, applicationData);
        response.setXid(xid);
        response.setBranchId(branchId);
        response.setBranchStatus(status);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Branch commit result: " + status);
        }
    }
    ...
}

//The type Data source manager. Per the original author's note, this is the resource manager used in AT mode.
public class DataSourceManager extends AbstractResourceManager {
    //Worker that performs the branch commit asynchronously.
    private final AsyncWorker asyncWorker = new AsyncWorker(this);
    ...
    //Delegates to asyncWorker; branchType and applicationData are not used here.
    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId, String applicationData) throws TransactionException {
        //Commit the branch transaction asynchronously through asyncWorker.
        return asyncWorker.branchCommit(xid, branchId, resourceId);
    }
    ...
}
(4)全局事务的提交主要就是让各个分支事务把本地的UndoLog删除
//Queues phase-two commit contexts and deletes the corresponding undo logs in batches.
public class AsyncWorker {
    ...
    //Enqueues the branch for asynchronous processing and immediately reports PhaseTwo_Committed.
    public BranchStatus branchCommit(String xid, long branchId, String resourceId) {
        Phase2Context context = new Phase2Context(xid, branchId, resourceId);
        addToCommitQueue(context);
        return BranchStatus.PhaseTwo_Committed;
    }

    //Offers the context to the commit queue. When the offer is rejected, a doBranchCommitSafely() run is
    //scheduled on scheduledExecutor and the context is offered again afterwards.
    private void addToCommitQueue(Phase2Context context) {
        if (commitQueue.offer(context)) {
            return;
        }
        CompletableFuture.runAsync(this::doBranchCommitSafely, scheduledExecutor).thenRun(() -> addToCommitQueue(context));
    }

    //Runs doBranchCommit() and logs, rather than propagates, any Throwable.
    void doBranchCommitSafely() {
        try {
            doBranchCommit();
        } catch (Throwable e) {
            LOGGER.error("Exception occur when doing branch commit", e);
        }
    }

    //Drains the commit queue and processes the drained contexts grouped by resourceId.
    private void doBranchCommit() {
        if (commitQueue.isEmpty()) {
            return;
        }
        //transfer all context currently received to this list
        List<Phase2Context> allContexts = new LinkedList<>();
        commitQueue.drainTo(allContexts);
        //group context by their resourceId
        Map<String, List<Phase2Context>> groupedContexts = groupedByResourceId(allContexts);
        groupedContexts.forEach(this::dealWithGroupedContexts);
    }

    //Deletes the undo logs for all contexts of one resource; the contexts are requeued when the
    //DataSourceProxy cannot be found or a SQLException is thrown.
    private void dealWithGroupedContexts(String resourceId, List<Phase2Context> contexts) {
        DataSourceProxy dataSourceProxy = dataSourceManager.get(resourceId);
        if (dataSourceProxy == null) {
            LOGGER.warn("failed to find resource for {} and requeue", resourceId);
            addAllToCommitQueue(contexts);
            return;
        }
        Connection conn = null;
        try {
            conn = dataSourceProxy.getPlainConnection();
            UndoLogManager undoLogManager = UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType());
            //split contexts into several lists, with each list contain no more element than limit size
            List<List<Phase2Context>> splitByLimit = Lists.partition(contexts, UNDOLOG_DELETE_LIMIT_SIZE);
            //Committing the global transaction only requires each branch to delete its local undo logs.
            for (List<Phase2Context> partition : splitByLimit) {
                deleteUndoLog(conn, undoLogManager, partition);
            }
        } catch (SQLException sqlExx) {
            addAllToCommitQueue(contexts);
            LOGGER.error("failed to get connection for async committing on {} and requeue", resourceId, sqlExx);
        } finally {
            IOUtil.close(conn);
        }
    }
    ...
}
18.全局事务回滚的过程源码
全局事务的回滚流程和提交流程几乎一样:
一.Seata Client发起全局事务回滚请求
二.Server向Client发送分支事务回滚请求
三.Seata Client处理分支事务回滚的请求