TDDL 在分布式下的SEQUENCE原理
TDDL大家应该很熟悉了,淘宝分布式数据层。很好的为我们实现了分库分表、Master/Salve、动态数据源配置等功能。
那么分布式之后,数据库自增序列肯定用不了了,如何方便快捷的解决这个问题呢?TDDL也提供了SEQUENCE的解决方案。
总述
在数据库中创建 sequence 表,用于记录,当前已被占用的id最大值。
每台客户端主机取一个id区间(比如 1000~2000)缓存在本地,并更新 sequence 表中的id最大值记录。
客户端主机之间取不同的id区间,用完再取,使用乐观锁机制控制并发。
第一步:创建一张sequence对应的表
CREATE TABLE `imp_sequence` (
`BIZ_NAME` varchar(45) NOT NULL COMMENT '业务名称',
`CURRENT_VALUE` int(11) NOT NULL COMMENT '当前最大值',
`GMT_CREATE` datetime DEFAULT NULL COMMENT '创建时间',
`GMT_MODIFIED` datetime DEFAULT NULL COMMENT '修改时间',
PRIMARY KEY (`BIZ_NAME`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='数据序列表';
表名和字段可以按各自规则定义,定义之后需要与第二步DAO中的定义相对应!
几张逻辑表需要声明几个sequence。
第二步:配置sequenceDao


<bean id="sequenceDao" class="com.taobao.tddl.client.sequence.impl.DefaultSequenceDao"><!-- 数据源 --><property name="dataSource" ref="dataSource" /><!-- 步长--><property name="step" value="1000" /><!-- 重试次数--><property name="retryTimes" value="1" /><!-- sequence 表名--><property name="tableName" value="gt_sequence" /><!-- sequence 名称--><property name="nameColumnName" value="BIZ_NAME" /><!-- sequence 当前值--><property name="valueColumnName" value="CURRENT_VALUE" /><!-- sequence 更新时间--><property name="gmtModifiedColumnName" value="gmt_modified" />
</bean>
第三步:配置sequence生成器


<bean id="businessSequence" class="com.taobao.tddl.client.sequence.impl.DefaultSequence"><property name="sequenceDao" ref="sequenceDao"/><property name="name" value="business_sequence" />
</bean>
第四步:调用


public class IbatisSmDAO extends SqlMapClientDaoSupport implements SmDAO {/**smSequence*/private DefaultSequence businessSequence;public int insert(SmDO sm) throws DataAccessException {if (sm == null) {throw new IllegalArgumentException("Can't insert a null data object into db.");}try {sm.setId((int)businessSequence.nextValue());} catch (SequenceException e) {throw new RuntimeException("Can't get primary key.");}getSqlMapClientTemplate().insert("MS-SM-INSERT", sm);return sm.getId();} }
从调用配置中,我们可以发现其中涉及到二个重要类DefaultSequenceDao和DefaultSequence,这二个都是TDDL的默认实现。DefaultSequenceDao:序列DAO默认实现,JDBC方式。DefaultSequence:序列默认实现。
先来看DefaultSequenceDao,TDDL中提供了默认的表名,列名和步长等,第一步的建表可以参照默认方式。


private static final int MIN_STEP = 1;private static final int MAX_STEP = 100000;private static final int DEFAULT_STEP = 1000;private static final int DEFAULT_RETRY_TIMES = 150;private static final String DEFAULT_TABLE_NAME = "sequence";private static final String DEFAULT_NAME_COLUMN_NAME = "name";private static final String DEFAULT_VALUE_COLUMN_NAME = "value";private static final String DEFAULT_GMT_MODIFIED_COLUMN_NAME = "gmt_modified";private static final long DELTA = 100000000L;private DataSource dataSource;/*** 重试次数*/private int retryTimes = DEFAULT_RETRY_TIMES;/*** 步长*/private int step = DEFAULT_STEP;/*** 序列所在的表名*/private String tableName = DEFAULT_TABLE_NAME;/*** 存储序列名称的列名*/private String nameColumnName = DEFAULT_NAME_COLUMN_NAME;/*** 存储序列值的列名*/private String valueColumnName = DEFAULT_VALUE_COLUMN_NAME;/*** 存储序列最后更新时间的列名*/private String gmtModifiedColumnName = DEFAULT_GMT_MODIFIED_COLUMN_NAME;
接下来看一下nextRange方法:取得下一个可用的序列区间:


public SequenceRange nextRange(String name) throws SequenceException {if (name == null) {throw new IllegalArgumentException("序列名称不能为空");}long oldValue;long newValue;Connection conn = null;PreparedStatement stmt = null;ResultSet rs = null;for (int i = 0; i < retryTimes + 1; ++i) {try {conn = dataSource.getConnection();stmt = conn.prepareStatement(getSelectSql());stmt.setString(1, name);rs = stmt.executeQuery();rs.next();oldValue = rs.getLong(1);if (oldValue < 0) {StringBuilder message = new StringBuilder();message.append("Sequence value cannot be less than zero, value = ").append(oldValue);message.append(", please check table ").append(getTableName());throw new SequenceException(message.toString());}if (oldValue > Long.MAX_VALUE - DELTA) {StringBuilder message = new StringBuilder();message.append("Sequence value overflow, value = ").append(oldValue);message.append(", please check table ").append(getTableName());throw new SequenceException(message.toString());}newValue = oldValue + getStep();} catch (SQLException e) {throw new SequenceException(e);} finally {closeResultSet(rs);rs = null;closeStatement(stmt);stmt = null;closeConnection(conn);conn = null;}try {conn = dataSource.getConnection();stmt = conn.prepareStatement(getUpdateSql());stmt.setLong(1, newValue);stmt.setTimestamp(2, new Timestamp(System.currentTimeMillis()));stmt.setString(3, name);stmt.setLong(4, oldValue);int affectedRows = stmt.executeUpdate();if (affectedRows == 0) {// retrycontinue;}return new SequenceRange(oldValue + 1, newValue);} catch (SQLException e) {throw new SequenceException(e);} finally {closeStatement(stmt);stmt = null;closeConnection(conn);conn = null;}}throw new SequenceException("Retried too many times, retryTimes = " + retryTimes);}
通过getSelectSql查询最新的value值,然后加上步点,通过getUpdateSql更新到数据库中


private String getSelectSql() {if (selectSql == null) {synchronized (this) {if (selectSql == null) {StringBuilder buffer = new StringBuilder();buffer.append("select ").append(getValueColumnName());buffer.append(" from ").append(getTableName());buffer.append(" where ").append(getNameColumnName()).append(" = ?");selectSql = buffer.toString();}}}return selectSql;}private String getUpdateSql() {if (updateSql == null) {synchronized (this) {if (updateSql == null) {StringBuilder buffer = new StringBuilder();buffer.append("update ").append(getTableName());buffer.append(" set ").append(getValueColumnName()).append(" = ?, ");buffer.append(getGmtModifiedColumnName()).append(" = ? where ");buffer.append(getNameColumnName()).append(" = ? and ");buffer.append(getValueColumnName()).append(" = ?");updateSql = buffer.toString();}}}return updateSql;}
有一个特殊需要说明的,在update语句中,where需要把之前的value当成条件传入。实现了类型version的乐观锁操作。如果同一个时间AB二台机器同时请求获取到相同的value,进行update操作只有可能一条成功。失败的会按retryTimes进行重试。
接下来看DefaultSequence,比较简单,就不说明了


public class DefaultSequence implements Sequence {private final Lock lock = new ReentrantLock();private SequenceDao sequenceDao;/*** 序列名称*/private String name;private volatile SequenceRange currentRange;public long nextValue() throws SequenceException {if (currentRange == null) {lock.lock();try {if (currentRange == null) {currentRange = sequenceDao.nextRange(name);}} finally {lock.unlock();}}long value = currentRange.getAndIncrement();if (value == -1) {lock.lock();try {for (;;) {if (currentRange.isOver()) {currentRange = sequenceDao.nextRange(name);}value = currentRange.getAndIncrement();if (value == -1) {continue;}break;}} finally {lock.unlock();}}if (value < 0) {throw new SequenceException("Sequence value overflow, value = " + value);}return value;}public SequenceDao getSequenceDao() {return sequenceDao;}public void setSequenceDao(SequenceDao sequenceDao) {this.sequenceDao = sequenceDao;}public String getName() {return name;}public void setName(String name) {this.name = name;} }