一个基于数据库的分布式锁:乐观与悲观实现
stevensu1/EC0620
核心思想是使用一个字段同时维护乐观锁版本或标记,以及悲观锁持有状态。字段值设置为空表示:无锁状态,字段值设置为0:表示悲观锁持有状态,值段值设置为1,2,3.。。。表示:乐观锁版本或标记。
实现方式:定义里两个注解(@lockwithsync,@lockwithcas),用于分别标记需要拦截的悲观事务方法和乐观事务方法。
再定义两个切面,一个扫描@lockwithsync,执行切入悲观事务逻辑,一个扫描@lockwithcas切入乐观事务逻辑。
需要注意的是:乐观锁的逻辑需要动态修改sql,所以要扩展mybatis拦截器。并构造对应的mappedStatementt,执行修改后的乐观操作 update set vertion= vertion+1 where vertion =#{verstion},
下面是我的实现代码:
两个注解:
package com.example.lock;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 标记悲观事务方法*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LockWithSync {String value() default "";
}
package com.example.lock;import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;/*** 标记乐观 CAS 事务方法*/
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface LockWithCAS {String value() default "";
}
切面:LockWithSycnAspt基于 @LockWithSync 注解对特定资源加锁, 实现逻辑:扫描注解,获取方法参数得到实体类型,和实体记录的id,同时从spring上下文中获取mybatis相关service 的bean。然后使用它访问mysql数据库。为什么是service注解标记的bean而不是直接从上下获取mapper呢,应为它是一个接口,而spirngbean 在执行的时候是jdk动态代理方方式调用方法的
package com.example.lock;import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.lang.reflect.Method;/*** 切面:基于 @LockWithSync 注解对特定资源加锁*/
@Aspect
@Component
public class LockWithSyncAspt {@Autowiredprivate ApplicationContext applicationContext; // 新增:注入 ApplicationContext/*** 环绕通知:在方法执行前后加锁/解锁** @param joinPoint 切入点* @param lock 注解信息* @return 方法执行结果*/@Around("@annotation(lock)")public Object around(ProceedingJoinPoint joinPoint, LockWithSync lock) throws Throwable {//这里获取拦截的方法参数Object[] args = joinPoint.getArgs();if (args.length == 0) {throw new IllegalArgumentException("方法参数不能为空");}Object proEntity = args[0];String clazzName = proEntity.getClass().getName(); // 获取锁键值,如 entity.classNameField idField = proEntity.getClass().getDeclaredField("id");idField.setAccessible(true);Integer entityRecordId = Integer.valueOf(idField.get(proEntity).toString());String[] _clazzName = clazzName.split(":")[0].split("\\.");String mapperNamespace = "com.example.mapper." + _clazzName[3] + "MapperImpl"; // 如 "com.example.model.UserMapper"// 从 Spring 容器中获取 Mapper 实例Class<?> mapperClass = Class.forName(mapperNamespace);Object mapperInstance = applicationContext.getBean(mapperClass); // 修改:从 Spring 容器获取 Mapper// 获取 selectOne 方法并调用Method selectOneMethod = mapperInstance.getClass().getMethod("selectOne", Integer.class);Object entityDataBase = selectOneMethod.invoke(mapperInstance, entityRecordId); // 修改:使用容器中的 Mapper 实例调用方法//这里对entity.lockFlag开启private保护Field lockFlagField = entityDataBase.getClass().getDeclaredField("lockFlag");lockFlagField.setAccessible(true); // 允许访问私有字段Integer lockFlag = (Integer) lockFlagField.get(entityDataBase); // 读取值Method updateUserMethod = mapperInstance.getClass().getMethod("updateUser", entityDataBase.getClass());try {if (null == lockFlag || 0 < lockFlag) {lockFlagField.set(entityDataBase, 0); // 修改值updateUserMethod.invoke(mapperInstance, entityDataBase); // 修改:使用容器中的 Mapper 实例调用方法} else if (lockFlag == 0) {throw new RuntimeException("锁已存在");} else {new RuntimeException("系统异常");}return joinPoint.proceed(); // 执行目标方法} finally {//释放锁lockFlagField.set(entityDataBase, null); // 修改值updateUserMethod.invoke(mapperInstance, entityDataBase); // 修改:使用容器中的 Mapper 实例调用方法lockFlagField.setAccessible(false);}}
}
切面LockWithCASAspt:基于 @LockWithCAS 注解对特定资源进行CAS操作加强 在方法执行前获取数据库记录(lock_flag_value)保存到线程上下文中, 在方法执行后,扩展mybatis拦截器,修改sql ,附加 update set lock_flag+1 where lock_flag = lock_flag_value这样的CAS逻辑 完成后删除线程上下文数据
package com.example.lock;import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;/*** 切面:基于 @LockWithCAS 注解对特定资源进行CAS操作加强* 在方法执行前获取数据库记录(lock_flag_value)保存到线程上下文中,* 在方法执行后,扩展mybatis拦截器,修改sql ,附加 update set lock_flag+1 where lock_flag = lock_flag_value这样的CAS逻辑* 完成后删除线程上下文数据*/
@Aspect
@Component
public class LockWithCASAspt {@Autowiredprivate ApplicationContext applicationContext; // 新增:注入 ApplicationContext/*** 线程局部存储当前正在处理的 entityClazzName 和 entityId* 用于 MyBatis 拦截器识别是否需要追加 WHERE 条件*/private static final ThreadLocal<String> CURRENT_ENTITY_KEY = new ThreadLocal<>();private static final ThreadLocal<Integer> CURRENT_EXPECTED_LOCK_FLAG = new ThreadLocal<>();@Around("@annotation(com.example.lock.LockWithCAS)")public Object around(ProceedingJoinPoint joinPoint) throws Throwable {Object[] args = joinPoint.getArgs();if (args.length == 0) {throw new IllegalArgumentException("方法参数不能为空");}Object proEntity = args[0];String clazzName = proEntity.getClass().getName();Field idField = proEntity.getClass().getDeclaredField("id");idField.setAccessible(true);Integer entityRecordId = (Integer) idField.get(proEntity);String[] _clazzName = clazzName.split(":")[0].split("\\.");String mapperNamespace = "com.example.mapper." + "My" + _clazzName[3] + "Mapper"; // 如 "com.example.model.UserMapper"//从数据库获取数据库记录的 lockFlag 值Integer lockFlag = getLockFlagFromDatabase(_clazzName[3], entityRecordId);// 构造唯一 keyString entityKey = clazzName + ":" + entityRecordId;try {// 设置当前线程上下文中的 lockFlagCURRENT_ENTITY_KEY.set(entityKey);CURRENT_EXPECTED_LOCK_FLAG.set(lockFlag);// 执行业务方法(内部会触发 MyBatis 更新)return joinPoint.proceed();} finally {// 方法结束后清除线程局部变量CURRENT_ENTITY_KEY.remove();CURRENT_EXPECTED_LOCK_FLAG.remove();}}// 提供给 MyBatis 拦截器访问当前锁信息public static Integer getExpectedLockFlag() {return CURRENT_EXPECTED_LOCK_FLAG.get();}public static String getCurrentEntityKey() {return CURRENT_ENTITY_KEY.get();}private Integer getLockFlagFromDatabase(String _clazzName3, Integer entityRecordId) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException, NoSuchFieldException {String mapperNamespace = "com.example.mapper." + _clazzName3 + "MapperImpl"; // 如 "com.example.model.UserMapper"// 从 Spring 容器中获取 Mapper 实例Class<?> mapperClass = Class.forName(mapperNamespace);Object mapperInstance = applicationContext.getBean(mapperClass); // 修改:从 Spring 容器获取 Mapper// 获取 selectOne 方法并调用Method selectOneMethod = mapperInstance.getClass().getMethod("selectOne", Integer.class);Object entityDataBase = selectOneMethod.invoke(mapperInstance, entityRecordId); // 修改:使用容器中的 Mapper 实例调用方法//这里对entity.lockFlag开启private保护Field lockFlagField = entityDataBase.getClass().getDeclaredField("lockFlag");lockFlagField.setAccessible(true); // 允许访问私有字段return (Integer) lockFlagField.get(entityDataBase); // 读取值}}
mybatis 是拦截器LockInterceptor:对sql进行cas操作增强
package com.example.lock;import org.apache.ibatis.executor.Executor;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.mapping.ParameterMapping;
import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.plugin.*;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;/*** MyBatis 插件:在 UPDATE 语句中自动追加 WHERE lock_flag = ?*/
@Intercepts({@Signature(type = Executor.class, method = "update", args = {MappedStatement.class, Object.class})
})
public class LockInterceptor implements Interceptor {@Overridepublic Object intercept(Invocation invocation) throws Throwable {MappedStatement mappedStatement = (MappedStatement) invocation.getArgs()[0];Object parameter = invocation.getArgs()[1];if (SqlCommandType.UPDATE != mappedStatement.getSqlCommandType()) {return invocation.proceed();}// 获取当前线程上下文中设置的预期 lockFlagInteger expectedLockFlag = LockWithCASAspt.getExpectedLockFlag();String entityKey = LockWithCASAspt.getCurrentEntityKey();BoundSql boundSql = mappedStatement.getBoundSql(parameter);String originalSql = boundSql.getSql();// 追加 WHERE lock_flag = ?String modifiedSql = addLockFlagToSql(originalSql, expectedLockFlag);// 移除原有lockFlagList<ParameterMapping> parameterMappings = boundSql.getParameterMappings().stream().filter(mapping -> !"lockFlag".equals(mapping.getProperty())).collect(Collectors.toList());BoundSql newBoundSql = new BoundSql(mappedStatement.getConfiguration(),modifiedSql,parameterMappings,boundSql.getParameterObject());// 构建新的 MappedStatementMappedStatement newMs = copyFromMappedStatement(mappedStatement, newBoundSql);// 获取原始 Executor(来自 invocation)Executor executor = (Executor) invocation.getTarget();// 使用 Executor 执行新的 MappedStatementreturn executor.update(newMs, parameter);}/*** 创建新的 MappedStatement* @param ms* @param newBoundSql* @return*/private MappedStatement copyFromMappedStatement(MappedStatement ms, BoundSql newBoundSql) {MappedStatement.Builder builder = new MappedStatement.Builder(ms.getConfiguration(),ms.getId(),parameterObject -> newBoundSql,ms.getSqlCommandType()).cache(ms.getCache()).flushCacheRequired(ms.isFlushCacheRequired()).useCache(ms.isUseCache()).resultMaps(ms.getResultMaps()).keyGenerator(ms.getKeyGenerator()).timeout(ms.getTimeout()).statementType(ms.getStatementType());// 如果是自定义主键映射,可以手动添加if (ms.getKeyProperties() != null && ms.getKeyProperties().length > 0) {for (int i = 0; i < ms.getKeyProperties().length; i++) {String keyProperty = ms.getKeyProperties()[i];String keyColumn = ms.getKeyColumns().length > i ? ms.getKeyColumns()[i] : "";builder = builder.keyColumn(keyColumn).keyProperty(keyProperty);}}return builder.build();}/*** 向 UPDATE SQL 中插入 lock_flag 字段的更新和条件判断** @param sql 原始 SQL* @param expectedLockFlag 预期的 lock_flag 值* @return 新 SQL*/private String addLockFlagToSql(String sql, Integer expectedLockFlag) {if (!sql.toLowerCase().startsWith("update")) {return sql;}// 查找 SET 关键字位置int setIndex = sql.toLowerCase().indexOf("set");if (setIndex == -1) {throw new IllegalArgumentException("无效的 UPDATE SQL 语句: " + sql);}//去掉原来的 set lock_flag 语句sql = sql.replaceAll("(?i)lock_flag = .*?,", "");// 在 SET 后面插入 lock_flag = lock_flag + 1String prefix = sql.substring(0, setIndex + 4); // 包含 "SET"String suffix = sql.substring(setIndex + 4); // 剩余部分String newSetClause = prefix + " lock_flag = IF(lock_flag IS NULL, 1, lock_flag + 1)," + suffix;// 查找 WHERE 位置int whereIndex = newSetClause.toLowerCase().indexOf("where");if (whereIndex == -1) {// 没有 WHERE 条件,直接附加if (null == expectedLockFlag) {return newSetClause;} else {return newSetClause + " WHERE lock_flag = " + expectedLockFlag;}} else {// 已有 WHERE,追加 AND lock_flag = ?if (null == expectedLockFlag) {return newSetClause.substring(0, whereIndex) +" WHERE (" + newSetClause.substring(whereIndex + 6) + ") ";} else {return newSetClause.substring(0, whereIndex) +" WHERE (" + newSetClause.substring(whereIndex + 6) + ") AND lock_flag = " + expectedLockFlag;}}}@Overridepublic Object plugin(Object target) {return Plugin.wrap(target, this);}@Overridepublic void setProperties(Properties properties) {// 可配置参数}
}
使用方式:
1,确保数据表里有相关锁字段:
2.在需要加上锁的方法上添加对应注解。
@Service
public class UserServiceImpl {@Autowiredprivate UserMapper userMapper;@LockWithSyncpublic void updateUser(User user) {List<User> userList = userMapper.selectAll();}@LockWithCASpublic void updateUserCAS(User user) {userMapper.updateUser(user);}
}
3.一个小细节:因为是从参数列表获取实体类型,以及实体id,所以这里标记的方法参数要使用实体类作为参数。