Seata源码—5.全局事务的创建与返回处理一
大纲
1.Seata开启分布式事务的流程总结
2.Seata生成全局事务ID的雪花算法源码
3.生成xid以及对全局事务会话进行持久化的源码
4.全局事务会话数据持久化的实现源码
5.Seata Server创建全局事务与返回xid的源码
6.Client获取Server的响应与处理的源码
7.Seata与Dubbo整合的过滤器源码
1.Seata开启分布式事务的流程总结
(1)Seata分布式事务执行流程
(2)开启一个全局事务的流程
(1)Seata分布式事务执行流程
Seata Client在执行添加了全局事务注解@GlobalTransactional的方法时,实际执行的是根据全局事务拦截器创建该方法所在Bean的动态代理方法,于是会执行GlobalTransactionalInterceptor的invoke()方法。此时,添加了全局事务注解@GlobalTransactional的方法就会被全局事务拦截器拦截了。
GlobalTransactionalInterceptor全局事务拦截器拦截目标方法的调用后,会由事务执行模版TransactionalTemplate的excute()方法来执行目标方法。
在事务执行模版TransactionalTemplate的excute()方法中,首先会判断Propagation全局事务传播级别,然后开启一个全局事务(也就是打开一个全局事务),接着才执行具体的业务目标方法。
执行具体的业务目标方法时,会通过Dubbo的RPC调用来传递全局事务的xid给其他的Seata Client。其他的Seata Client通过Dubbo过滤器获取到RPC调用中的xid后,会将xid放入线程本地变量副本中。之后执行SQL时就会获取数据库连接代理来对SQL进行拦截,数据库连接代理就可以从线程本地变量副本中获取xid,然后开启分支事务。
各个分支事务都执行完毕后,开启全局事务的Seata Client就会提交事务、处理全局锁、资源清理。
(2)开启一个全局事务的流程
Seata Server收到Seata Client发送过来的RpcMessage对象消息后,RpcMessage对象消息首先会由ServerOnRequestProcessor的process()方法处理,然后会由DefaultCoordinator的onRequest()方法进行处理,接着会由GlobalBeginRequest的handle()方法进行处理,然后会由DefaultCoordinator的doGlobalBegin()方法来处理,最后给到DefaultCore的begin()方法来进行处理。
在DefaultCore的begin()方法中,首先就会创建一个全局事务会话,然后将全局事务会话的xid通过MDC放入线程本地变量副本中,接着对该全局事务会话添加一个全局事务会话的生命周期监听器,最后打开该全局事务会话、发布会话开启事件并返回全局事务会话的xid。
在创建一个全局事务会话GlobalSession时,首先会由uuid生成组件UUIDGenerator来生成全局事务id(transactionId),然后根据生成的全局事务id(transactionId)来继续生成xid。
2.Seata生成全局事务ID的雪花算法源码
(1)通过UUIDGenerator生成全局事务ID
(2)IdWorker实现的雪花算法生成的ID的组成
(3)IdWorker实现的雪花算法对时钟回拨的处理
(1)通过UUIDGenerator生成全局事务ID
Seata在创建全局事务会话时会通过UUIDGenerator来生成全局事务ID,UUIDGenerator在生成ID时是通过Seata自己实现的雪花算法来生成的。
public class GlobalSession implements SessionLifecycle, SessionStorable {...//创建全局事务会话public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) {GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false);return session;}public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) {//全局事务id是通过UUIDGenerator来生成的this.transactionId = UUIDGenerator.generateUUID();this.status = GlobalStatus.Begin;this.lazyLoadBranch = lazyLoadBranch;if (!lazyLoadBranch) {this.branchSessions = new ArrayList<>();}this.applicationId = applicationId;this.transactionServiceGroup = transactionServiceGroup;this.transactionName = transactionName;this.timeout = timeout;//根据UUIDGenerator生成的transactionId + XID工具生成最终的xidthis.xid = XID.generateXID(transactionId);}...
}public class UUIDGenerator {private static volatile IdWorker idWorker;//generate UUID using snowflake algorithmpublic static long generateUUID() {//Double Check + volatile,实现并发场景下只创建一次idWorker对象if (idWorker == null) {synchronized (UUIDGenerator.class) {if (idWorker == null) {init(null);}}}//正常情况下,每次都会通过idWorker生成一个idreturn idWorker.nextId();}//init IdWorkerpublic static void init(Long serverNode) {idWorker = new IdWorker(serverNode);}
}
(2)IdWorker实现的雪花算法生成的ID的组成
IdWorker就是Seata自己实现的基于雪花算法的ID生成器。IdWorker的nextId()方法通过雪花算法生成的transactionId一共是64位,用64个bit拼接出一个唯一的ID。
一.最高位始终是0,占1个bit
二.接着的10个bit是workerId
一台机器就是一个worker,每个worker都会有一个自己的workerId。生成workerId时,是基于本机网络地址里的Mac地址来生成的。
三.接着的41个bit是时间戳
表示可以为某台机器的每一毫秒,分配一个自增长的ID。毫秒时间戳有13位数,转换为2进制需要2的41次方。
四.最后的12个bit是序列号
如果一台机器在一毫秒内需要为很多线程生成ID,就可以通过自增长的12个bit的Sequence为每个线程分配ID。
(3)IdWorker实现的雪花算法对时钟回拨的处理
在执行IdWorker的nextId()方法时,会对包含序列号和时间戳的timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence序列号进行累加。
如果出现大量的线程并发获取ID,此时可能会导致timestampAndSequence中某一个毫秒内的Sequence序列号快速累加,并且将代表Sequence序列号的12个bit全部累加完毕,最后便会导致包含序列号和时间戳的timestampAndSequence中的毫秒时间戳也进行累加。
但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,出现时钟回拨问题,于是就需要调用waitIfNecessary()方法进行处理。
所以,在IdWorker的waitIfNecessary()方法中,如果获取ID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽,那么就需要阻塞当前线程5毫秒。
//IdWorker就是Seata自己实现的基于雪花算法的ID生成器
public class IdWorker {private final long twepoch = 1588435200000L;//Start time cut (2020-05-03)private final int workerIdBits = 10;//The number of bits occupied by workerIdprivate final int timestampBits = 41;//The number of bits occupied by timestampprivate final int sequenceBits = 12;//The number of bits occupied by sequenceprivate final int maxWorkerId = ~(-1 << workerIdBits);//Maximum supported machine id, the result is 1023//business meaning: machine ID (0 ~ 1023)//actual layout in memory://highest 1 bit: 0//middle 10 bit: workerId//lowest 53 bit: all 0private long workerId;//timestampAndSequence是64位的、支持CAS操作的Long型的、包含了Sequence序列号的时间戳//它的最高位是11个bit,没有使用//中间有41个bit,是时间戳//最低位有12个bit,是序列号//timestampAndSequence可以认为是把时间戳和序列号混合在了一个long型数字里//timestamp and sequence mix in one Long//highest 11 bit: not used//middle 41 bit: timestamp//lowest 12 bit: sequenceprivate AtomicLong timestampAndSequence;//mask that help to extract timestamp and sequence from a long//可以帮忙从一个long数字里提取出一个包含Sequence序列号的时间戳private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits));//instantiate an IdWorker using given workerIdpublic IdWorker(Long workerId) {//初始化timestampAndSequenceinitTimestampAndSequence();//初始化workerIdinitWorkerId(workerId);}//init first timestamp and sequence immediatelyprivate void initTimestampAndSequence() {//获取相对于twepoch的最新时间戳long timestamp = getNewestTimestamp();//将最新时间戳和sequenceBits进行位运算(左移),从而得到一个混合了sequence的时间戳long timestampWithSequence = timestamp << sequenceBits;//把混合了sequence的时间戳,赋值给timestampAndSequencethis.timestampAndSequence = new AtomicLong(timestampWithSequence);}//init workerIdprivate void initWorkerId(Long workerId) {if (workerId == null) {workerId = generateWorkerId();}if (workerId > maxWorkerId || workerId < 0) {String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId);throw new IllegalArgumentException(message);}//将workerId与timestampBits+sequenceBits的和进行位运算(左移),获取一个workerIdthis.workerId = workerId << (timestampBits + sequenceBits);}//通过snowflake雪花算法来生成transactionId//一共是64位,用64个bit拼接出一个唯一的ID,最高位始终是0,占1个bit//接着的10个bit是workerId,一台机器就是一个worker,每个worker都会有一个自己的workerId//接着的41个bit是时间戳,表示可以为某台机器的每一毫秒,分配一个自增长的id,毫秒时间戳有13位数,转换为2进制就需要2的41次方,2的20次方是一个7位数的数字//最后的12个bit是序列号,如果一台机器在一毫秒内需要为很多线程生成id,就可以通过自增长的12个bit的Sequence为每个线程分配id//get next UUID(base on snowflake algorithm), which look like://highest 1 bit: always 0//next 10 bit: workerId//next 41 bit: timestamp//lowest 12 bit: sequencepublic long nextId() {waitIfNecessary();//对包含Sequence序列号的时间戳timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence进行累加//如果出现大量的线程并发获取id,此时可能会导致timestampAndSequence的某一个毫秒内的Sequence快速累加,并且将12个bit全部累加完毕//最终导致timestampAndSequence的毫秒时间戳也进行累加了//但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,于是就需要waitIfNecessary()进行处理long next = timestampAndSequence.incrementAndGet();//把最新的包含Sequence序列号的时间戳next与timestampAndSequenceMask进行位运算,获取真正的包含Sequence序列号的时间戳timestampWithSequencelong timestampWithSequence = next & timestampAndSequenceMask;//对包含Sequence序列号的时间戳与workerId通过位运算拼接在一起return workerId | timestampWithSequence;}//block current thread if the QPS of acquiring UUID is too high that current sequence space is exhausted//如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒private void waitIfNecessary() {//先获取包含Sequence序列号的当前时间戳long currentWithSequence = timestampAndSequence.get();//将currentWithSequence与sequenceBits进行位运算(右移),获取到当前时间戳long current = currentWithSequence >>> sequenceBits;//获取相对于twepoch的最新时间戳long newest = getNewestTimestamp();//如果当前的时间戳大于最新的时间戳,说明获取UUID的QPS过高,导致timestampAndSequence增长太快了(出现时钟回拨问题)if (current >= newest) {try {//如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒Thread.sleep(5);} catch (InterruptedException ignore) {//don't care}}}//get newest timestamp relative to twepochprivate long getNewestTimestamp() {//通过当前毫秒单位的时间戳 减去 一个固定的时间twepoch,得到的就是相对于twepoch的最新时间戳return System.currentTimeMillis() - twepoch;}//auto generate workerId, try using mac first, if failed, then randomly generate oneprivate long generateWorkerId() {try {//生成一个workerId,默认是基于网络的Mac地址来生成的return generateWorkerIdBaseOnMac();} catch (Exception e) {return generateRandomWorkerId();}}//use lowest 10 bit of available MAC as workerIdprivate long generateWorkerIdBaseOnMac() throws Exception {//获取所有的网络接口Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces();//遍历每一个网络接口while (all.hasMoreElements()) {NetworkInterface networkInterface = all.nextElement();boolean isLoopback = networkInterface.isLoopback();boolean isVirtual = networkInterface.isVirtual();//如果是虚拟的、回环的地址,那么这个地址就跳过,不能使用if (isLoopback || isVirtual) {continue;}//获取本机网络地址里的Mac地址,基于Mac地址来生成一个workeridbyte[] mac = networkInterface.getHardwareAddress();return ((mac[4] & 0B11) << 8) | (mac[5] & 0xFF);}throw new RuntimeException("no available mac found");}//randomly generate one as workerIdprivate long generateRandomWorkerId() {return new Random().nextInt(maxWorkerId + 1);}
}
3.生成xid以及对全局事务会话进行持久化的源码
(1)根据全局事务ID生成xid
(2)全局事务会话的持久化
(1)根据全局事务ID生成xid
xid是通过ip:port:transactionId拼接出来的。
public class XID {private static int port;private static String ipAddress;...//Generate xid string.public static String generateXID(long tranId) {//首先获取当前机器的IP地址//然后拼接上一个冒号、接着拼接一个端口号、再拼接一个冒号//最后再拼接事务id,以此来生成xid//所以xid是通过ip:port:transactionId拼接出来的return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString();}...
}
(2)全局事务会话的持久化
public class DefaultCore implements Core {...@Overridepublic String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException {//创建一个全局事务会话GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);//通过slf4j的MDC把xid放入线程本地变量副本里去MDC.put(RootContext.MDC_KEY_XID, session.getXid());//添加一个全局事务会话的生命周期监听器session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());//打开Session,其中会对全局事务会话进行持久化session.begin();//transaction start event,发布会话开启事件MetricsPublisher.postSessionDoingEvent(session, false);//返回全局事务会话的xidreturn session.getXid();}...
}public class GlobalSession implements SessionLifecycle, SessionStorable {...@Overridepublic void begin() throws TransactionException {this.status = GlobalStatus.Begin;this.beginTime = System.currentTimeMillis();this.active = true;for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {lifecycleListener.onBegin(this);}}...
}public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {...@Overridepublic void onBegin(GlobalSession globalSession) throws TransactionException {addGlobalSession(globalSession);}@Overridepublic void addGlobalSession(GlobalSession session) throws TransactionException {if (LOGGER.isDebugEnabled()) {LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD);}writeSession(LogOperation.GLOBAL_ADD, session);}private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {//transactionStoreManager.writeSession()会对全局事务会话进行持久化if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session");} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session");} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session");} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session");} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session");} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session");} else {throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name());}}}...
}
4.全局事务会话数据持久化的实现源码
(1)全局事务会话数据的持久化流程
(2)将全局事务会话持久化到MySQL数据库的实现
(3)将全局事务会话持久化到File文件的实现
(4)将全局事务会话持久化到Redis存储的实现
(1)全局事务会话数据的持久化流程
创建全局事务会话时,会通过雪花算法生成全局事务ID即transactionId,然后通过transactionId按照"ip:port:transactionId"格式生成xid。
创建完全局事务会话之后,就会添加一个全局事务会话的生命周期监听器,然后就会调用GlobalSession的begin()方法开启会话。
在GlobalSession的begin()方法中,会调用全局事务会话生命周期监听器的onBegin()方法,也就是调用SessionLifecycleListener的onBegin()方法。
接着就会由AbstractSessionManager对全局事务会话进行管理,将GlobalSession添加到SessionManager会话管理器中,也就是调用transactionStoreManager的writeSession()方法,对全局事务会话进行持久化。
默认情况下,会通过数据库进行持久化,也就是调用DataBaseTransactionStoreManager数据库事务存储管理器的writeSession()方法,将全局事务会话存储到数据库中。
当然Seata提供了三种方式来对全局事务会话进行持久化,分别是数据库存储、文件存储和Redis存储。
(2)将全局事务会话持久化到MySQL数据库的实现
//The type Database transaction store manager.
public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {private static volatile DataBaseTransactionStoreManager instance;protected LogStore logStore;...//Get the instance.public static DataBaseTransactionStoreManager getInstance() {if (instance == null) {synchronized (DataBaseTransactionStoreManager.class) {if (instance == null) {instance = new DataBaseTransactionStoreManager();}}}return instance;}//Instantiates a new Database transaction store manager.private DataBaseTransactionStoreManager() {logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE);//init dataSource,通过SPI机制加载DataSourceProviderDataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide();logStore = new LogStoreDataBaseDAO(logStoreDataSource);}@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) {if (LogOperation.GLOBAL_ADD.equals(logOperation)) {return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));} else if (LogOperation.BRANCH_ADD.equals(logOperation)) {return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}}...
}public class LogStoreDataBaseDAO implements LogStore {protected DataSource logStoreDataSource = null;protected String globalTable;protected String branchTable;private String dbType;...public LogStoreDataBaseDAO(DataSource logStoreDataSource) {this.logStoreDataSource = logStoreDataSource;globalTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_GLOBAL_TABLE, DEFAULT_STORE_DB_GLOBAL_TABLE);branchTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_BRANCH_TABLE, DEFAULT_STORE_DB_BRANCH_TABLE);dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE);if (StringUtils.isBlank(dbType)) {throw new StoreException("there must be db type.");}if (logStoreDataSource == null) {throw new StoreException("there must be logStoreDataSource.");}//init transaction_name sizeinitTransactionNameSize();}@Overridepublic boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);Connection conn = null;PreparedStatement ps = null;try {int index = 1;conn = logStoreDataSource.getConnection();conn.setAutoCommit(true);ps = conn.prepareStatement(sql);ps.setString(index++, globalTransactionDO.getXid());ps.setLong(index++, globalTransactionDO.getTransactionId());ps.setInt(index++, globalTransactionDO.getStatus());ps.setString(index++, globalTransactionDO.getApplicationId());ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());String transactionName = globalTransactionDO.getTransactionName();transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName;ps.setString(index++, transactionName);ps.setInt(index++, globalTransactionDO.getTimeout());ps.setLong(index++, globalTransactionDO.getBeginTime());ps.setString(index++, globalTransactionDO.getApplicationData());return ps.executeUpdate() > 0;} catch (SQLException e) {throw new StoreException(e);} finally {IOUtil.close(ps, conn);}}...
}
(3)将全局事务会话持久化到File文件的实现
public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore {private ReentrantLock writeSessionLock = new ReentrantLock();...@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) {long curFileTrxNum;writeSessionLock.lock();try {if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) {return false;}lastModifiedTime = System.currentTimeMillis();curFileTrxNum = FILE_TRX_NUM.incrementAndGet();if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) {return saveHistory();}} catch (Exception exx) {LOGGER.error("writeSession error, {}", exx.getMessage(), exx);return false;} finally {writeSessionLock.unlock();}flushDisk(curFileTrxNum, currFileChannel);return true;}private boolean writeDataFile(byte[] bs) {if (bs == null || bs.length >= Integer.MAX_VALUE - 3) {return false;}if (!writeDataFrame(bs)) {return false;}return flushWriteBuffer(writeBuffer);}private boolean writeDataFrame(byte[] data) {if (data == null || data.length <= 0) {return true;}int dataLength = data.length;int bufferRemainingSize = writeBuffer.remaining();if (bufferRemainingSize <= INT_BYTE_SIZE) {if (!flushWriteBuffer(writeBuffer)) {return false;}}bufferRemainingSize = writeBuffer.remaining();if (bufferRemainingSize <= INT_BYTE_SIZE) {throw new IllegalStateException(String.format("Write buffer remaining size %d was too small", bufferRemainingSize));}writeBuffer.putInt(dataLength);bufferRemainingSize = writeBuffer.remaining();int dataPos = 0;while (dataPos < dataLength) {int dataLengthToWrite = dataLength - dataPos;dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize);writeBuffer.put(data, dataPos, dataLengthToWrite);bufferRemainingSize = writeBuffer.remaining();if (bufferRemainingSize == 0) {if (!flushWriteBuffer(writeBuffer)) {return false;}bufferRemainingSize = writeBuffer.remaining();}dataPos += dataLengthToWrite;}return true;}private boolean flushWriteBuffer(ByteBuffer writeBuffer) {writeBuffer.flip();if (!writeDataFileByBuffer(writeBuffer)) {return false;}writeBuffer.clear();return true;}private void flushDisk(long curFileNum, FileChannel currFileChannel) {if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) {SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel);writeDataFileRunnable.putRequest(syncFlushRequest);syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS);} else {writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel));}}...
}public class TransactionWriteStore implements SessionStorable {private SessionStorable sessionRequest;private LogOperation operate;public TransactionWriteStore(SessionStorable sessionRequest, LogOperation operate) {this.sessionRequest = sessionRequest;this.operate = operate;}@Overridepublic byte[] encode() {byte[] bySessionRequest = this.sessionRequest.encode();byte byOpCode = this.getOperate().getCode();int len = bySessionRequest.length + 1;byte[] byResult = new byte[len];ByteBuffer byteBuffer = ByteBuffer.wrap(byResult);byteBuffer.put(bySessionRequest);byteBuffer.put(byOpCode);return byResult;}...
}
(4)将全局事务会话持久化到Redis存储的实现
这里的实现比较优雅,十分值得借鉴。
public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager {private static volatile RedisTransactionStoreManager instance;//Map for LogOperation Global Operationpublic static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap;//Map for LogOperation Branch Operationpublic static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap;...public static RedisTransactionStoreManager getInstance() {if (instance == null) {synchronized (RedisTransactionStoreManager.class) {if (instance == null) {instance = new RedisTransactionStoreManager();}}}return instance;}public RedisTransactionStoreManager() {super();initGlobalMap();initBranchMap();logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT);if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) {logQueryLimit = DEFAULT_LOG_QUERY_LIMIT;}}public void initGlobalMap() {if (CollectionUtils.isEmpty(branchMap)) {globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder().put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO).put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO).put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO).build();}}public void initBranchMap() {if (CollectionUtils.isEmpty(branchMap)) {branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder().put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO).put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO).put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO).build();}}//Insert the global transaction.private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId());try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {Date now = new Date();globalTransactionDO.setGmtCreate(now);globalTransactionDO.setGmtModified(now);pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO));pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid());pipelined.sync();return true;} catch (Exception ex) {throw new RedisException(ex);}}//Insert branch transactionprivate boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) {String branchKey = buildBranchKey(branchTransactionDO.getBranchId());String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid());try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) {Date now = new Date();branchTransactionDO.setGmtCreate(now);branchTransactionDO.setGmtModified(now);pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO));pipelined.rpush(branchListKey, branchKey); pipelined.sync();return true;} catch (Exception ex) {throw new RedisException(ex);}}@Overridepublic boolean writeSession(LogOperation logOperation, SessionStorable session) {if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) {return globalMap.containsKey(logOperation) ?globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) :branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session));} else {throw new StoreException("Unknown LogOperation:" + logOperation.name());}}...
}public class SessionConverter {...public static GlobalTransactionDO convertGlobalTransactionDO(SessionStorable session) {if (session == null || !(session instanceof GlobalSession)) {throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));}GlobalSession globalSession = (GlobalSession)session;GlobalTransactionDO globalTransactionDO = new GlobalTransactionDO();globalTransactionDO.setXid(globalSession.getXid());globalTransactionDO.setStatus(globalSession.getStatus().getCode());globalTransactionDO.setApplicationId(globalSession.getApplicationId());globalTransactionDO.setBeginTime(globalSession.getBeginTime());globalTransactionDO.setTimeout(globalSession.getTimeout());globalTransactionDO.setTransactionId(globalSession.getTransactionId());globalTransactionDO.setTransactionName(globalSession.getTransactionName());globalTransactionDO.setTransactionServiceGroup(globalSession.getTransactionServiceGroup());globalTransactionDO.setApplicationData(globalSession.getApplicationData());return globalTransactionDO;}public static BranchTransactionDO convertBranchTransactionDO(SessionStorable session) {if (session == null || !(session instanceof BranchSession)) {throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session));}BranchSession branchSession = (BranchSession)session;BranchTransactionDO branchTransactionDO = new BranchTransactionDO();branchTransactionDO.setXid(branchSession.getXid());branchTransactionDO.setBranchId(branchSession.getBranchId());branchTransactionDO.setBranchType(branchSession.getBranchType().name());branchTransactionDO.setClientId(branchSession.getClientId());branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId());branchTransactionDO.setTransactionId(branchSession.getTransactionId());branchTransactionDO.setApplicationData(branchSession.getApplicationData());branchTransactionDO.setResourceId(branchSession.getResourceId());branchTransactionDO.setStatus(branchSession.getStatus().getCode());return branchTransactionDO;}...
}