Seata Source Code (3): Initialization of the Global Transaction Annotation Scanner, Part 2
Outline
1. Parent Class Extended and Interfaces Implemented by the Global Transaction Annotation Scanner
2. Core Fields of the Global Transaction Annotation Scanner
3. Source Code for Initializing the Seata Client After the Spring Container Is Initialized
4. Source Code for Initializing the TM (Transaction Manager) Client
5. Source Code for Initializing the TM Component's Netty Network Client
6. Source Code of Seata's SPI Dynamic Extension Mechanism
7. Source Code for Registering Network Request Processors with the Seata Client
8. Source Code of the Seata Client's Scheduled Tasks
9. Source Code for Initializing the Seata Client's Netty Bootstrap
10. Source Code of the Seata Client's Addressing Mechanism and Connection to the Server
11. Source Code for Initializing the RM (Resource Manager) Client
12. How the Global Transaction Annotation Scanner Checks Beans for Seata Annotations
13. Creation and Initialization of the Seata Global Transaction Interceptor
14. Source Code for Creating the Global Transaction Dynamic Proxy via Spring AOP
15. Summary of the Global Transaction Annotation Scanner's Initialization
7. Source Code for Registering Network Request Processors with the Seata Client
(1) Registering Network Request Processors with the Seata Client
(2) Initializing the Seata Client's Netty Network Component
(1) Registering Network Request Processors with the Seata Client
The registered network request processors are mainly two kinds: the processor for responses returned by the Transaction Coordinator (TC) and the heartbeat message processor.
public class TMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        init(applicationId, transactionServiceGroup, null, null);
    }

    public static void init(String applicationId, String transactionServiceGroup, String accessKey, String secretKey) {
        TmNettyRemotingClient tmNettyRemotingClient = TmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup, accessKey, secretKey);
        tmNettyRemotingClient.init();
    }
}

public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
    ...
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    @Override
    public void init() {
        //registry processor: register the request processors
        //Since the Seata Server can actively send requests to the Seata Client,
        //Netty needs different processors to handle the different kinds of incoming messages
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            //initialize the Netty network component
            super.init();
            if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }

    private void registerProcessor() {
        //1.registry TC response processor: the processor for responses from the Transaction Coordinator
        ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_BEGIN_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_COMMIT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_ROLLBACK_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_STATUS_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_CLT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        //2.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
    ...
}

public class ClientOnResponseProcessor implements RemotingProcessor {
    //The Merge msg map from io.seata.core.rpc.netty.AbstractNettyRemotingClient#mergeMsgMap
    private Map<Integer, MergeMessage> mergeMsgMap;

    //The Futures from io.seata.core.rpc.netty.AbstractNettyRemoting#futures
    private final ConcurrentMap<Integer, MessageFuture> futures;

    //To handle the received RPC message on upper level
    private final TransactionMessageHandler transactionMessageHandler;

    public ClientOnResponseProcessor(Map<Integer, MergeMessage> mergeMsgMap, ConcurrentHashMap<Integer, MessageFuture> futures, TransactionMessageHandler transactionMessageHandler) {
        this.mergeMsgMap = mergeMsgMap;
        this.futures = futures;
        this.transactionMessageHandler = transactionMessageHandler;
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    ...
    @Override
    public void registerProcessor(int requestCode, RemotingProcessor processor, ExecutorService executor) {
        Pair<RemotingProcessor, ExecutorService> pair = new Pair<>(processor, executor);
        this.processorTable.put(requestCode, pair);
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    ...
    //This container holds all processors.
    protected final HashMap<Integer/*MessageType*/, Pair<RemotingProcessor, ExecutorService>> processorTable = new HashMap<>(32);
    ...
}
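To make the role of processorTable concrete, here is a minimal, self-contained sketch of the dispatch idea. It is not Seata source and all names in it are made up for illustration: an incoming message's type code is used to look up the registered processor, which then handles the message. In Seata this lookup happens when a response or a server-initiated request arrives on the Netty channel.
import java.util.HashMap;
import java.util.Map;

public class ProcessorTableDemo {
    //a stand-in for Seata's RemotingProcessor
    interface Processor {
        void process(Object message);
    }

    public static void main(String[] args) {
        //register processors by message-type code, just like registerProcessor() fills processorTable
        Map<Integer, Processor> processorTable = new HashMap<>();
        processorTable.put(1, msg -> System.out.println("heartbeat: " + msg));
        processorTable.put(2, msg -> System.out.println("global begin result: " + msg));

        //pretend a response of type 2 arrived from the TC: dispatch by looking up its type code
        int incomingType = 2;
        Processor processor = processorTable.get(incomingType);
        if (processor != null) {
            processor.process("xid=192.168.0.1:8091:123456"); //hypothetical payload
        }
    }
}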
(2) Initializing the Seata Client's Netty Network Component
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    private NettyClientChannelManager clientChannelManager;
    private ExecutorService mergeSendExecutorService;
    private final NettyClientBootstrap clientBootstrap;
    ...

    @Override
    public void init() {
        //start a scheduled task that reconnects to the transaction service group every 10 seconds
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

        //whether client-side batch sending of requests is enabled, false by default
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        //start the Seata client's Netty bootstrap
        clientBootstrap.start();
    }
    ...
}
8. Source Code of the Seata Client's Scheduled Tasks
During initialization, the Seata client starts two scheduled tasks:
First, every 10 seconds it reconnects to the Seata server.
Second, every 3 seconds it checks whether any sent requests have timed out waiting for a response.
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    private NettyClientChannelManager clientChannelManager;
    private ExecutorService mergeSendExecutorService;
    private final NettyClientBootstrap clientBootstrap;
    private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
    ...

    @Override
    public void init() {
        //start a scheduled task that reconnects to the Seata server every 10 seconds
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

        //whether client-side batch sending of requests is enabled, false by default
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        //start the Seata client's Netty bootstrap
        clientBootstrap.start();
    }
    ...
}

public abstract class AbstractNettyRemoting implements Disposable {
    //The Timer executor. A scheduled thread pool with a single scheduling thread
    protected final ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("timeoutChecker", 1, true));

    //Obtain the return result through MessageFuture blocking.
    protected final ConcurrentHashMap<Integer, MessageFuture> futures = new ConcurrentHashMap<>();

    protected volatile long nowMills = 0;
    private static final int TIMEOUT_CHECK_INTERVAL = 3000;
    ...

    public void init() {
        //start a scheduled task that checks every 3 seconds whether any sent requests have timed out
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                for (Map.Entry<Integer, MessageFuture> entry : futures.entrySet()) {
                    MessageFuture future = entry.getValue();
                    if (future.isTimeout()) {
                        futures.remove(entry.getKey());
                        RpcMessage rpcMessage = future.getRequestMessage();
                        future.setResultMessage(new TimeoutException(String.format("msgId: %s ,msgType: %s ,msg: %s ,request timeout", rpcMessage.getId(), String.valueOf(rpcMessage.getMessageType()), rpcMessage.getBody().toString())));
                        if (LOGGER.isDebugEnabled()) {
                            LOGGER.debug("timeout clear future: {}", entry.getValue().getRequestMessage().getBody());
                        }
                    }
                }
                nowMills = System.currentTimeMillis();
            }
        }, TIMEOUT_CHECK_INTERVAL, TIMEOUT_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
    }
}
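The futures map above is what ties a synchronous send to its asynchronous response: the sender registers a MessageFuture keyed by the request id and blocks on it, the response processor completes it when the reply arrives, and the 3-second task fails any entry that has waited too long. The following is a minimal, self-contained sketch of that pattern; it is not Seata source and uses CompletableFuture plus a hypothetical 2-second timeout in place of Seata's MessageFuture.
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureTimeoutDemo {
    //pending request id -> future, playing the role of AbstractNettyRemoting#futures
    static final ConcurrentHashMap<Integer, CompletableFuture<String>> futures = new ConcurrentHashMap<>();
    //pending request id -> send time, used by the checker to decide whether a request has timed out
    static final ConcurrentHashMap<Integer, Long> sendTime = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        //"send" request 1: register a future keyed by the request id and block on it
        CompletableFuture<String> future = new CompletableFuture<>();
        futures.put(1, future);
        sendTime.put(1, System.currentTimeMillis());

        //a checker similar to the 3-second timerExecutor task: fail and remove timed-out futures
        ScheduledExecutorService checker = Executors.newSingleThreadScheduledExecutor();
        checker.scheduleAtFixedRate(() -> {
            for (Map.Entry<Integer, CompletableFuture<String>> entry : futures.entrySet()) {
                long waited = System.currentTimeMillis() - sendTime.get(entry.getKey());
                if (waited > 2000) { //hypothetical 2-second request timeout
                    futures.remove(entry.getKey());
                    entry.getValue().completeExceptionally(
                            new TimeoutException("msgId: " + entry.getKey() + ", request timeout"));
                }
            }
        }, 3, 3, TimeUnit.SECONDS);

        try {
            future.get(); //no response ever arrives in this demo, so the checker eventually fails the future
        } catch (ExecutionException e) {
            System.out.println("caught: " + e.getCause());
        } finally {
            checker.shutdownNow();
        }
    }
}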
9. Source Code for Initializing the Seata Client's Netty Bootstrap
Building a Bootstrap with Netty's API:
public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
    ...
    private final AtomicBoolean initialized = new AtomicBoolean(false);

    @Override
    public void init() {
        //registry processor: register the request processors
        //Since the Seata Server can actively send requests to the Seata Client,
        //Netty needs different processors to handle the different kinds of incoming messages
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            //initialize the Netty network component
            super.init();
            if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
                //get the channel manager and connect to the transaction service group
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }
    ...
}

public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient {
    private NettyClientChannelManager clientChannelManager;
    private ExecutorService mergeSendExecutorService;
    private final NettyClientBootstrap clientBootstrap;
    private static final long SCHEDULE_INTERVAL_MILLS = 10 * 1000L;
    ...

    @Override
    public void init() {
        //start a scheduled task that reconnects to the Seata server every 10 seconds
        timerExecutor.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                clientChannelManager.reconnect(getTransactionServiceGroup());
            }
        }, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);

        //whether client-side batch sending of requests is enabled, false by default
        if (this.isEnableClientBatchSendRequest()) {
            mergeSendExecutorService = new ThreadPoolExecutor(MAX_MERGE_SEND_THREAD,
                MAX_MERGE_SEND_THREAD,
                KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                new NamedThreadFactory(getThreadPrefix(), MAX_MERGE_SEND_THREAD));
            mergeSendExecutorService.submit(new MergedSendRunnable());
        }
        super.init();
        //start the Seata client's Netty bootstrap
        clientBootstrap.start();
    }
    ...
}

public class NettyClientBootstrap implements RemotingBootstrap {
    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private EventExecutorGroup defaultEventExecutorGroup;
    private ChannelHandler[] channelHandlers;
    ...

    public NettyClientBootstrap(NettyClientConfig nettyClientConfig, final EventExecutorGroup eventExecutorGroup, NettyPoolKey.TransactionRole transactionRole) {
        this.nettyClientConfig = nettyClientConfig;
        int selectorThreadSizeThreadSize = this.nettyClientConfig.getClientSelectorThreadSize();
        this.transactionRole = transactionRole;
        this.eventLoopGroupWorker = new NioEventLoopGroup(selectorThreadSizeThreadSize, new NamedThreadFactory(getThreadPrefix(this.nettyClientConfig.getClientSelectorThreadPrefix()), selectorThreadSizeThreadSize));
        this.defaultEventExecutorGroup = eventExecutorGroup;
    }

    @Override
    public void start() {
        if (this.defaultEventExecutorGroup == null) {
            this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(nettyClientConfig.getClientWorkerThreads(), new NamedThreadFactory(getThreadPrefix(nettyClientConfig.getClientWorkerThreadPrefix()), nettyClientConfig.getClientWorkerThreads()));
        }
        //build a Bootstrap with Netty's API
        //set the NioEventLoopGroup; a single thread is enough by default
        this.bootstrap.group(this.eventLoopGroupWorker)
            .channel(nettyClientConfig.getClientChannelClazz())
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, true)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize());
        if (nettyClientConfig.enableNative()) {
            if (PlatformDependent.isOsx()) {
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("client run on macOS");
                }
            } else {
                bootstrap.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED)
                    .option(EpollChannelOption.TCP_QUICKACK, true);
            }
        }
        //initialize the pipeline that processes network data
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                ChannelPipeline pipeline = ch.pipeline();
                //IdleStateHandler: the idle-state checking handler.
                //It records the time whenever data passes through; if the channel stays idle for too long,
                //it fires a user-triggered event that ClientHandler will handle
                pipeline.addLast(new IdleStateHandler(nettyClientConfig.getChannelMaxReadIdleSeconds(),
                    nettyClientConfig.getChannelMaxWriteIdleSeconds(),
                    nettyClientConfig.getChannelMaxAllIdleSeconds()))
                    //decoder for the Seata wire protocol
                    .addLast(new ProtocolV1Decoder())
                    //encoder for the Seata wire protocol
                    .addLast(new ProtocolV1Encoder());
                if (channelHandlers != null) {
                    addChannelPipelineLast(ch, channelHandlers);
                }
            }
        });
        if (initialized.compareAndSet(false, true) && LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyClientBootstrap has started");
        }
    }
    ...
}
10. Source Code of the Seata Client's Addressing Mechanism and Connection to the Server
(1) The Addressing Mechanism for Obtaining Server Addresses
(2) How the Seata Client Establishes Connections to the Server
(1) The Addressing Mechanism for Obtaining Server Addresses
The Seata client obtains the Seata server addresses through the getAvailServerList() method of the Netty channel manager NettyClientChannelManager.
Inside getAvailServerList(), the registry service instance is first obtained via the SPI mechanism: the registry factory RegistryFactory uses SPI to build an instance of Seata's registry service RegistryService, and the server addresses are then obtained through that RegistryService instance's lookup() method.
For example, if the registry service instance loaded via SPI is FileRegistryServiceImpl, its lookup() method looks up the transaction service group name in file.conf, resolves it to a mapped cluster name such as default, and then uses default to find the list of Seata server addresses. A typical file.conf mapping would look something like service.vgroupMapping.my_tx_group = "default" together with service.default.grouplist = "127.0.0.1:8091" (the group name here is hypothetical).
public final class TmNettyRemotingClient extends AbstractNettyRemotingClient {
    ...
    @Override
    public void init() {
        //registry processor: register the request processors
        //Since the Seata Server can actively send requests to the Seata Client,
        //Netty needs different processors to handle the different kinds of incoming messages
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            //initialize the Netty network component
            super.init();
            if (io.seata.common.util.StringUtils.isNotBlank(transactionServiceGroup)) {
                //get the channel manager and connect to the transaction service group
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }
}

//Netty client pool manager. Seata's Netty connection manager
class NettyClientChannelManager {
    ...
    //Reconnect to remote server of current transaction service group.
    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            //get the list of Seata Server addresses for the transaction service group,
            //e.g. look up the transaction service group name in file.conf, resolve it to a mapped name such as default,
            //then use default to find the Seata Server address list
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        ...
    }
    ...

    private List<String> getAvailServerList(String transactionServiceGroup) throws Exception {
        List<InetSocketAddress> availInetSocketAddressList = RegistryFactory.getInstance().lookup(transactionServiceGroup);
        if (CollectionUtils.isEmpty(availInetSocketAddressList)) {
            return Collections.emptyList();
        }
        return availInetSocketAddressList.stream().map(NetUtil::toStringAddress).collect(Collectors.toList());
    }
}

public class RegistryFactory {
    public static RegistryService getInstance() {
        return RegistryFactoryHolder.INSTANCE;
    }

    private static class RegistryFactoryHolder {
        private static final RegistryService INSTANCE = buildRegistryService();
    }

    private static RegistryService buildRegistryService() {
        //build Seata's registry service RegistryService
        RegistryType registryType;
        String registryTypeName = ConfigurationFactory.CURRENT_FILE_INSTANCE.getConfig(ConfigurationKeys.FILE_ROOT_REGISTRY + ConfigurationKeys.FILE_CONFIG_SPLIT_CHAR + ConfigurationKeys.FILE_ROOT_TYPE);
        try {
            registryType = RegistryType.getType(registryTypeName);
        } catch (Exception exx) {
            throw new NotSupportYetException("not support registry type: " + registryTypeName);
        }
        //load via the SPI mechanism, e.g. the FileRegistryServiceImpl implementation
        return EnhancedServiceLoader.load(RegistryProvider.class, Objects.requireNonNull(registryType).name()).provide();
    }
}

public class FileRegistryServiceImpl implements RegistryService<ConfigChangeListener> {
    ...
    @Override
    public List<InetSocketAddress> lookup(String key) throws Exception {
        String clusterName = getServiceGroup(key);
        if (clusterName == null) {
            return null;
        }
        String endpointStr = CONFIG.getConfig(PREFIX_SERVICE_ROOT + CONFIG_SPLIT_CHAR + clusterName + POSTFIX_GROUPLIST);
        if (StringUtils.isNullOrEmpty(endpointStr)) {
            throw new IllegalArgumentException(clusterName + POSTFIX_GROUPLIST + " is required");
        }
        String[] endpoints = endpointStr.split(ENDPOINT_SPLIT_CHAR);
        List<InetSocketAddress> inetSocketAddresses = new ArrayList<>();
        for (String endpoint : endpoints) {
            String[] ipAndPort = endpoint.split(IP_PORT_SPLIT_CHAR);
            if (ipAndPort.length != 2) {
                throw new IllegalArgumentException("endpoint format should like ip:port");
            }
            inetSocketAddresses.add(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1])));
        }
        return inetSocketAddresses;
    }
    ...
}
(2) How the Seata Client Establishes Connections to the Server
The acquireChannel() method of the Netty channel manager NettyClientChannelManager first tries to reuse an existing connection. If no live connection exists, it acquires a lock and then establishes the connection via NettyClientChannelManager's doConnect() method. Note that Apache Commons Pool is used as a keyed object pool to manage the connections that are created.
//Netty client pool manager. Seata's Netty connection manager
class NettyClientChannelManager {
    ...
    //Reconnect to remote server of current transaction service group.
    void reconnect(String transactionServiceGroup) {
        List<String> availList = null;
        try {
            //get the list of Seata Server addresses for the transaction service group,
            //e.g. look up the transaction service group name in file.conf, resolve it to a mapped name such as default,
            //then use default to find the Seata Server address list
            availList = getAvailServerList(transactionServiceGroup);
        } catch (Exception e) {
            LOGGER.error("Failed to get available servers: {}", e.getMessage(), e);
            return;
        }
        //availList is usually not empty
        if (CollectionUtils.isEmpty(availList)) {
            RegistryService registryService = RegistryFactory.getInstance();
            String clusterName = registryService.getServiceGroup(transactionServiceGroup);
            if (StringUtils.isBlank(clusterName)) {
                LOGGER.error("can not get cluster name in registry config '{}{}', please make sure registry config correct", ConfigurationKeys.SERVICE_GROUP_MAPPING_PREFIX, transactionServiceGroup);
                return;
            }
            if (!(registryService instanceof FileRegistryServiceImpl)) {
                LOGGER.error("no available service found in cluster '{}', please make sure registry config correct and keep your seata server running", clusterName);
            }
            return;
        }
        Set<String> channelAddress = new HashSet<>(availList.size());
        try {
            //try to establish a long-lived connection to each Seata Server
            for (String serverAddress : availList) {
                try {
                    acquireChannel(serverAddress);
                    channelAddress.add(serverAddress);
                } catch (Exception e) {
                    LOGGER.error("{} can not connect to {} cause:{}", FrameworkErrorCode.NetConnect.getErrCode(), serverAddress, e.getMessage(), e);
                }
            }
        } finally {
            if (CollectionUtils.isNotEmpty(channelAddress)) {
                List<InetSocketAddress> aliveAddress = new ArrayList<>(channelAddress.size());
                for (String address : channelAddress) {
                    String[] array = address.split(":");
                    aliveAddress.add(new InetSocketAddress(array[0], Integer.parseInt(array[1])));
                }
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, aliveAddress);
            } else {
                RegistryFactory.getInstance().refreshAliveLookup(transactionServiceGroup, Collections.emptyList());
            }
        }
    }

    //Acquire netty client channel connected to remote server.
    Channel acquireChannel(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null) {
            channelToServer = getExistAliveChannel(channelToServer, serverAddress);
            if (channelToServer != null) {
                return channelToServer;
            }
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("will connect to {}", serverAddress);
        }
        Object lockObj = CollectionUtils.computeIfAbsent(channelLocks, serverAddress, key -> new Object());
        //establish the connection after acquiring the lock
        synchronized (lockObj) {
            return doConnect(serverAddress);
        }
    }
    ...

    private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
    private Function<String, NettyPoolKey> poolKeyFunction;
    private final GenericKeyedObjectPool<NettyPoolKey, Channel> nettyClientKeyPool;
    ...

    private Channel doConnect(String serverAddress) {
        Channel channelToServer = channels.get(serverAddress);
        if (channelToServer != null && channelToServer.isActive()) {
            return channelToServer;
        }
        Channel channelFromPool;
        try {
            NettyPoolKey currentPoolKey = poolKeyFunction.apply(serverAddress);
            if (currentPoolKey.getMessage() instanceof RegisterTMRequest) {
                poolKeyMap.put(serverAddress, currentPoolKey);
            } else {
                NettyPoolKey previousPoolKey = poolKeyMap.putIfAbsent(serverAddress, currentPoolKey);
                if (previousPoolKey != null && previousPoolKey.getMessage() instanceof RegisterRMRequest) {
                    RegisterRMRequest registerRMRequest = (RegisterRMRequest) currentPoolKey.getMessage();
                    ((RegisterRMRequest) previousPoolKey.getMessage()).setResourceIds(registerRMRequest.getResourceIds());
                }
            }
            //establish the connection; this eventually calls NettyPoolableFactory's makeObject() method
            channelFromPool = nettyClientKeyPool.borrowObject(poolKeyMap.get(serverAddress));
            channels.put(serverAddress, channelFromPool);
        } catch (Exception exx) {
            LOGGER.error("{} register RM failed.", FrameworkErrorCode.RegisterRM.getErrCode(), exx);
            throw new FrameworkException("can not register RM,err:" + exx.getMessage());
        }
        return channelFromPool;
    }
    ...
}

public class NettyPoolableFactory implements KeyedPoolableObjectFactory<NettyPoolKey, Channel> {
    private final AbstractNettyRemotingClient rpcRemotingClient;
    private final NettyClientBootstrap clientBootstrap;

    public NettyPoolableFactory(AbstractNettyRemotingClient rpcRemotingClient, NettyClientBootstrap clientBootstrap) {
        this.rpcRemotingClient = rpcRemotingClient;
        this.clientBootstrap = clientBootstrap;
    }

    @Override
    public Channel makeObject(NettyPoolKey key) {
        InetSocketAddress address = NetUtil.toInetSocketAddress(key.getAddress());
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("NettyPool create channel to " + key);
        }
        Channel tmpChannel = clientBootstrap.getNewChannel(address);
        long start = System.currentTimeMillis();
        Object response;
        Channel channelToServer = null;
        if (key.getMessage() == null) {
            throw new FrameworkException("register msg is null, role:" + key.getTransactionRole().name());
        }
        try {
            response = rpcRemotingClient.sendSyncRequest(tmpChannel, key.getMessage());
            if (!isRegisterSuccess(response, key.getTransactionRole())) {
                rpcRemotingClient.onRegisterMsgFail(key.getAddress(), tmpChannel, response, key.getMessage());
            } else {
                channelToServer = tmpChannel;
                rpcRemotingClient.onRegisterMsgSuccess(key.getAddress(), tmpChannel, response, key.getMessage());
            }
        } catch (Exception exx) {
            if (tmpChannel != null) {
                tmpChannel.close();
            }
            throw new FrameworkException("register " + key.getTransactionRole().name() + " error, errMsg:" + exx.getMessage());
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("register success, cost " + (System.currentTimeMillis() - start) + " ms, version:" + getVersion(response, key.getTransactionRole()) + ",role:" + key.getTransactionRole().name() + ",channel:" + channelToServer);
        }
        return channelToServer;
    }
    ...
}
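doConnect() does not create the Channel directly; it borrows it from an Apache Commons Pool GenericKeyedObjectPool, and the pool calls back into NettyPoolableFactory.makeObject() when no pooled object exists for the key. The sketch below shows this borrow/makeObject interaction in isolation. It is not Seata source: it assumes the generified Commons Pool 1.x (1.6) API, and the String key and value are stand-ins for NettyPoolKey and Channel.
import org.apache.commons.pool.BaseKeyedPoolableObjectFactory;
import org.apache.commons.pool.impl.GenericKeyedObjectPool;

public class KeyedPoolDemo {
    public static void main(String[] args) throws Exception {
        //the factory plays the role of NettyPoolableFactory: the pool calls makeObject() when needed
        BaseKeyedPoolableObjectFactory<String, String> factory = new BaseKeyedPoolableObjectFactory<String, String>() {
            @Override
            public String makeObject(String key) {
                //in Seata this is where the real channel is created and the TM/RM register request is sent
                return "channel-to-" + key;
            }
        };
        GenericKeyedObjectPool<String, String> pool = new GenericKeyedObjectPool<>(factory);

        //the first borrow for a key triggers makeObject(), just like nettyClientKeyPool.borrowObject(poolKey)
        String channel = pool.borrowObject("127.0.0.1:8091");
        System.out.println(channel);
        pool.returnObject("127.0.0.1:8091", channel);
    }
}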
11. Source Code for Initializing the RM (Resource Manager) Client
When RmNettyRemotingClient is initialized, a DefaultResourceManager instance is injected so that the resource managers loaded via the SPI mechanism can be obtained, and a DefaultRMHandler instance is injected so that the transaction message handlers loaded via the SPI mechanism can be obtained.
public class RMClient {
    public static void init(String applicationId, String transactionServiceGroup) {
        RmNettyRemotingClient rmNettyRemotingClient = RmNettyRemotingClient.getInstance(applicationId, transactionServiceGroup);
        rmNettyRemotingClient.setResourceManager(DefaultResourceManager.get());
        rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get());
        rmNettyRemotingClient.init();
    }
}

public class DefaultResourceManager implements ResourceManager {
    protected static Map<BranchType, ResourceManager> resourceManagers = new ConcurrentHashMap<>();

    private static class SingletonHolder {
        private static DefaultResourceManager INSTANCE = new DefaultResourceManager();
    }

    public static DefaultResourceManager get() {
        return SingletonHolder.INSTANCE;
    }

    private DefaultResourceManager() {
        initResourceManagers();
    }

    protected void initResourceManagers() {
        //load all ResourceManager implementations via SPI,
        //e.g. DataSourceManager, TCCResourceManager, SagaResourceManager, ResourceManagerXA
        List<ResourceManager> allResourceManagers = EnhancedServiceLoader.loadAll(ResourceManager.class);
        if (CollectionUtils.isNotEmpty(allResourceManagers)) {
            for (ResourceManager rm : allResourceManagers) {
                resourceManagers.put(rm.getBranchType(), rm);
            }
        }
    }
    ...
}

public class DefaultRMHandler extends AbstractRMHandler {
    protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();

    private static class SingletonHolder {
        private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
    }

    public static AbstractRMHandler get() {
        return DefaultRMHandler.SingletonHolder.INSTANCE;
    }

    protected DefaultRMHandler() {
        initRMHandlers();
    }

    protected void initRMHandlers() {
        //load all RMHandler transaction message handlers via SPI,
        //e.g. RMHandlerAT, RMHandlerTCC, RMHandlerSaga, RMHandlerXA
        List<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);
        if (CollectionUtils.isNotEmpty(allRMHandlers)) {
            for (AbstractRMHandler rmHandler : allRMHandlers) {
                allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);
            }
        }
    }
    ...
}

public final class RmNettyRemotingClient extends AbstractNettyRemotingClient {
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    private ResourceManager resourceManager;
    ...

    @Override
    public void init() {
        //registry processor: register the request processors
        registerProcessor();
        if (initialized.compareAndSet(false, true)) {
            //same as in TmNettyRemotingClient.init()
            super.init();
            if (resourceManager != null && !resourceManager.getManagedResources().isEmpty() && StringUtils.isNotBlank(transactionServiceGroup)) {
                //same as in TmNettyRemotingClient.init()
                getClientChannelManager().reconnect(transactionServiceGroup);
            }
        }
    }

    private void registerProcessor() {
        //1.registry rm client handle branch commit processor
        RmBranchCommitProcessor rmBranchCommitProcessor = new RmBranchCommitProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_COMMIT, rmBranchCommitProcessor, messageExecutor);
        //2.registry rm client handle branch rollback processor
        RmBranchRollbackProcessor rmBranchRollbackProcessor = new RmBranchRollbackProcessor(getTransactionMessageHandler(), this);
        super.registerProcessor(MessageType.TYPE_BRANCH_ROLLBACK, rmBranchRollbackProcessor, messageExecutor);
        //3.registry rm handler undo log processor
        RmUndoLogProcessor rmUndoLogProcessor = new RmUndoLogProcessor(getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_RM_DELETE_UNDOLOG, rmUndoLogProcessor, messageExecutor);
        //4.registry TC response processor
        ClientOnResponseProcessor onResponseProcessor = new ClientOnResponseProcessor(mergeMsgMap, super.getFutures(), getTransactionMessageHandler());
        super.registerProcessor(MessageType.TYPE_SEATA_MERGE_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_REGISTER_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BRANCH_STATUS_REPORT_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_GLOBAL_LOCK_QUERY_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_REG_RM_RESULT, onResponseProcessor, null);
        super.registerProcessor(MessageType.TYPE_BATCH_RESULT_MSG, onResponseProcessor, null);
        //5.registry heartbeat message processor
        ClientHeartbeatProcessor clientHeartbeatProcessor = new ClientHeartbeatProcessor();
        super.registerProcessor(MessageType.TYPE_HEARTBEAT_MSG, clientHeartbeatProcessor, null);
    }
    ...
}
12. How the Global Transaction Annotation Scanner Checks Beans for Seata Annotations
Because GlobalTransactionScanner extends Spring's AbstractAutoProxyCreator, Spring passes each Spring bean to GlobalTransactionScanner for inspection, namely to the wrapIfNecessary() method that GlobalTransactionScanner overrides.
The overridden wrapIfNecessary() method checks whether the bean's class or its methods carry Seata annotations, and on that basis decides whether a dynamic proxy needs to be created for the bean's class so that the annotated methods can be intercepted.
When a dynamic proxy does need to be created for the incoming bean, it is created by calling the wrapIfNecessary() method of the parent class, Spring's AbstractAutoProxyCreator.
The Seata annotations in question are: @GlobalTransactional, @GlobalLock, @TwoPhaseBusinessAction, and @LocalTCC.
//AbstractAutoProxyCreator: Spring's automatic proxy creator
//ConfigurationChangeListener: listener for configuration change events
//InitializingBean: Spring bean initialization callback
//ApplicationContextAware: used to obtain the Spring container
//DisposableBean: bean destruction callback
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...
    //the Spring AOP interceptor used to intercept method invocations
    private MethodInterceptor interceptor;

    //the AOP interceptor for methods annotated with @GlobalTransactional
    private MethodInterceptor globalTransactionalInterceptor;
    ...

    //The following will be scanned, and added corresponding interceptor:
    //methods carrying the following annotations will be scanned and intercepted by the corresponding interceptor
    //TM:
    //@see io.seata.spring.annotation.GlobalTransactional // TM annotation
    //Corresponding interceptor:
    //@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalTransaction(MethodInvocation, AspectTransactional) // TM handler
    //GlobalLock:
    //@see io.seata.spring.annotation.GlobalLock // GlobalLock annotation
    //Corresponding interceptor:
    //@see io.seata.spring.annotation.GlobalTransactionalInterceptor#handleGlobalLock(MethodInvocation, GlobalLock) // GlobalLock handler
    //TCC mode:
    //@see io.seata.rm.tcc.api.LocalTCC // TCC annotation on interface
    //@see io.seata.rm.tcc.api.TwoPhaseBusinessAction // TCC annotation on try method
    //@see io.seata.rm.tcc.remoting.RemotingParser // Remote TCC service parser
    //Corresponding interceptor:
    //@see io.seata.spring.tcc.TccActionInterceptor // the interceptor of TCC mode
    @Override
    //Since GlobalTransactionScanner extends Spring's AbstractAutoProxyCreator,
    //Spring passes each Spring bean to GlobalTransactionScanner.wrapIfNecessary() for inspection;
    //GlobalTransactionScanner decides, based on whether the bean's class or methods carry the annotations above,
    //whether a dynamic proxy should be created for the bean's class to intercept the annotated methods
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        //do checkers
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                //check whether the incoming bean needs a TCC dynamic proxy;
                //at startup every bean is passed into this wrapIfNecessary() for this check
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                } else {
                    //get the interfaces of the target class
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //existsAnnotation() checks whether the bean's class or methods carry annotations such as @GlobalTransactional
                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        //create a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    //if the bean is not yet an AOP proxy,
                    //Spring's AbstractAutoProxyCreator creates a dynamic proxy for the target bean's interfaces,
                    //so that subsequent calls to the bean's methods go through the GlobalTransactionalInterceptor
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        //Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    ...

    private boolean existsAnnotation(Class<?>[] classes) {
        if (CollectionUtils.isNotEmpty(classes)) {
            for (Class<?> clazz : classes) {
                if (clazz == null) {
                    continue;
                }
                //check whether the target class itself is annotated with @GlobalTransactional
                GlobalTransactional trxAnno = clazz.getAnnotation(GlobalTransactional.class);
                if (trxAnno != null) {
                    return true;
                }
                //inspect each method of the target Spring bean via reflection to find annotated methods
                Method[] methods = clazz.getMethods();
                for (Method method : methods) {
                    //return true if the method is annotated with @GlobalTransactional
                    trxAnno = method.getAnnotation(GlobalTransactional.class);
                    if (trxAnno != null) {
                        return true;
                    }
                    GlobalLock lockAnno = method.getAnnotation(GlobalLock.class);
                    if (lockAnno != null) {
                        return true;
                    }
                }
            }
        }
        return false;
    }
    ...
}
13. Creation and Initialization of the Seata Global Transaction Interceptor
If a bean passed into the wrapIfNecessary() method of the global transaction annotation scanner GlobalTransactionScanner carries a global transaction annotation such as @GlobalTransactional, wrapIfNecessary() creates a global transaction annotation interceptor, GlobalTransactionalInterceptor.
This interceptor is stored in two fields of the GlobalTransactionScanner instance: interceptor and globalTransactionalInterceptor.
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor {
    ...
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        this.order = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.TM_INTERCEPTOR_ORDER, TM_INTERCEPTOR_ORDER);
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);
        if (degradeCheck) {
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            EVENT_BUS.register(this);
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
        this.initDefaultGlobalTransactionTimeout();
    }
    ...
}
14. Source Code for Creating the Global Transaction Dynamic Proxy via Spring AOP
When the wrapIfNecessary() method of the global transaction annotation scanner GlobalTransactionScanner finds that an incoming bean carries Seata annotations and needs a dynamic proxy, it calls the wrapIfNecessary() method of its parent class, Spring's AbstractAutoProxyCreator, to create the proxy.
AbstractAutoProxyCreator's wrapIfNecessary() method obtains, through the subclass GlobalTransactionScanner's getAdvicesAndAdvisorsForBean() method, the interceptor that was built in GlobalTransactionScanner's wrapIfNecessary() (namely the global transaction annotation interceptor GlobalTransactionalInterceptor), and then creates the dynamic proxy for the incoming bean.
As a result, subsequent calls to the bean's methods first go through the GlobalTransactionalInterceptor.
//listener for configuration change events, Spring bean initialization callback, awareness of the Spring container, bean destruction callback
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
        implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...
    //the Spring AOP interceptor used to intercept method invocations
    private MethodInterceptor interceptor;

    @Override
    //Since GlobalTransactionScanner extends Spring's AbstractAutoProxyCreator,
    //Spring passes each Spring bean to GlobalTransactionScanner.wrapIfNecessary() for inspection;
    //GlobalTransactionScanner decides, based on whether the bean's class or methods carry the annotations above,
    //whether a dynamic proxy should be created for the bean's class to intercept the annotated methods
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (!doCheckers(bean, beanName)) {
            return bean;
        }
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                //check whether the incoming bean needs a TCC dynamic proxy;
                //at startup every bean is passed into this wrapIfNecessary() for this check
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //init tcc fence clean task if enable useTccFence
                    TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                } else {
                    //get the interfaces of the target class
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    //existsAnnotation() checks whether the bean's class or methods carry annotations such as @GlobalTransactional
                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        //build a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    //if the bean is not yet an AOP proxy,
                    //Spring's AbstractAutoProxyCreator creates a dynamic proxy for the target bean's interfaces,
                    //so that subsequent calls to the bean's methods go through the GlobalTransactionalInterceptor
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    int pos;
                    for (Advisor avr : advisor) {
                        // Find the position based on the advisor's order, and add to advisors by pos
                        pos = findAddSeataAdvisorPosition(advised, avr);
                        advised.addAdvisor(pos, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

    //return the specific interceptor
    @Override
    protected Object[] getAdvicesAndAdvisorsForBean(Class beanClass, String beanName, TargetSource customTargetSource) throws BeansException {
        return new Object[]{interceptor};
    }
    ...
}

public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
        implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware {
    ...
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        if (StringUtils.hasLength(beanName) && this.targetSourcedBeans.contains(beanName)) {
            return bean;
        }
        if (Boolean.FALSE.equals(this.advisedBeans.get(cacheKey))) {
            return bean;
        }
        if (isInfrastructureClass(bean.getClass()) || shouldSkip(bean.getClass(), beanName)) {
            this.advisedBeans.put(cacheKey, Boolean.FALSE);
            return bean;
        }

        // Create proxy if we have advice. Obtain the specific interceptors
        Object[] specificInterceptors = getAdvicesAndAdvisorsForBean(bean.getClass(), beanName, null);
        if (specificInterceptors != DO_NOT_PROXY) {
            this.advisedBeans.put(cacheKey, Boolean.TRUE);
            //create the dynamic proxy
            Object proxy = createProxy(bean.getClass(), beanName, specificInterceptors, new SingletonTargetSource(bean));
            this.proxyTypes.put(cacheKey, proxy.getClass());
            return proxy;
        }

        this.advisedBeans.put(cacheKey, Boolean.FALSE);
        return bean;
    }

    //obtain the specific interceptors
    protected abstract Object[] getAdvicesAndAdvisorsForBean(Class<?> beanClass, String beanName, @Nullable TargetSource customTargetSource) throws BeansException;
    ...
}
15. Summary of the Global Transaction Annotation Scanner's Initialization
The initialization of the global transaction annotation scanner GlobalTransactionScanner mainly does three things:
First, it initializes the TM (Transaction Manager) client.
Second, it initializes the RM (Resource Manager) client for branch transactions.
Third, it creates global transaction dynamic proxies for beans that carry Seata annotations (see the usage sketch below).
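To close the loop, here is a minimal usage sketch showing what all of this initialization is for. The OrderService class, its method, and the transaction name are hypothetical examples, not from the article, and the annotation attributes assume a recent Seata 1.x client. Once GlobalTransactionScanner has wrapped such a bean in a dynamic proxy, a call to the annotated method first enters GlobalTransactionalInterceptor, which asks the TM to begin a global transaction, propagates the XID to the branches, and commits or rolls back the global transaction depending on the outcome.
import io.seata.spring.annotation.GlobalTransactional;
import org.springframework.stereotype.Service;

@Service
public class OrderService {

    //the proxy created by GlobalTransactionScanner intercepts this method via GlobalTransactionalInterceptor
    @GlobalTransactional(name = "create-order", rollbackFor = Exception.class)
    public void createOrder(String userId, String productId, int count) {
        //local database writes and remote service calls made here all join the same global transaction,
        //each registering a branch transaction with the TC under the propagated XID
    }
}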