拦截指定注解(FeignClient),补偿重试
拦截带有指定注解(FeignClient)的接口调用,对失败的请求进行补偿重试;对业务代码无侵入。
通过重试标记区分正常调用和重试逻辑发起的调用,避免失败记录被重复插入。
请根据自己的业务需求,决定插入新数据时是否需要先删除之前的旧数据,防止数据覆盖。
import cn.hutool.core.util.ObjectUtil; import cn.hutool.json.JSONUtil; import com.ev.edge.computility.domain.dto.BaseResult; import com.ev.edge.retryFail.domain.RetryFailLog; import com.ev.edge.retryFail.service.RetryFailLogService; import lombok.extern.slf4j.Slf4j; import org.aspectj.lang.ProceedingJoinPoint; import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.reflect.MethodSignature; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import java.util.Arrays; import java.util.Date;import static com.ev.edge.constants.Constants.CONSTANT_ZERO;@Aspect @Component @Slf4j public class RetryFailureAop {@Autowiredprivate RetryFailLogService retryFailLogService;@Around("@within(org.springframework.cloud.openfeign.FeignClient)")public Object handleFeignClientResponse(ProceedingJoinPoint joinPoint) throws Throwable {// 执行目标方法Object result = joinPoint.proceed();// 如果是重试调用,则跳过日志插入逻辑if (RetryFailureAopScheduled.isRetryCall()) {return result;}if (result instanceof BaseResult<?>) {BaseResult<?> sesResult = (BaseResult<?>) result;// 只对特定状态码做处理if (!ObjectUtil.equal(sesResult.getCode(), CONSTANT_ZERO)) {logFailedRequest(joinPoint, sesResult);}}return result;}/*** 记录失败请求到数据库*/private void logFailedRequest(ProceedingJoinPoint joinPoint, BaseResult<?> sesResult) {try {MethodSignature signature = (MethodSignature) joinPoint.getSignature();String targetClass = signature.getDeclaringTypeName(); // 获取目标类名String targetMethod = signature.getName(); // 获取方法名Object[] args = joinPoint.getArgs();RetryFailLog log = new RetryFailLog();log.setTargetClass(targetClass);log.setTargetMethod(targetMethod);// 序列化请求参数为 JSON 字符串log.setRequestArgs(JSONUtil.toJsonStr(args));// 在 logFailedRequest 方法中Class<?>[] parameterTypes = signature.getParameterTypes();// 将 Class 数组转换为包含全限定类名的字符串数组String[] parameterTypeNames = 
Arrays.stream(parameterTypes).map(Class::getName).toArray(String[]::new);log.setParameterTypeList(Arrays.asList(parameterTypeNames));log.setErrorMessage("Feign 调用失败: " + sesResult.getMsg());log.setStatus("pending");log.setRetryCount(0);log.setMaxRetryTimes(5); // 可以从配置文件读取log.setNextRetryTime(calculateNextRetryTime(0));log.setCreateTime(new Date());log.setUpdateTime(new Date());// 保存到数据库retryFailLogService.save(log);} catch (Exception e) {// 防止日志记录失败导致业务异常log.error("记录失败日志时发生异常", e);}}private Date calculateNextRetryTime(int retryCount) {long delay = (long) Math.pow(2, retryCount) * 1000 * 60; // 指数退避,分钟级return new Date(System.currentTimeMillis() + delay);} }
定时任务 补偿处理
@Configuration @Slf4j @AllArgsConstructor public class RetryFailureAopScheduled {// 缓存 <methodKey, Method> 提升反射效率private static final Map<String, Method> METHOD_CACHE = new ConcurrentHashMap<>();private final RetryFailLogService retryFailLogService;/*** 定时任务:每分钟执行一次,检查并重试失败的日志记录*/@Scheduled(fixedRate = 60_000)public void retryFailedRequests() {List<RetryFailLog> logs = retryFailLogService.findPendingAndDue();for (RetryFailLog logEntry : logs) {try {boolean success = invoke(logEntry);if (success) {logEntry.setStatus("success");log.info("重试成功: {}, method={}", logEntry.getTargetClass(), logEntry.getTargetMethod());} else {updateRetryInfo(logEntry);log.warn("重试未完成: {}, method={}, 当前重试次数: {}",logEntry.getTargetClass(), logEntry.getTargetMethod(), logEntry.getRetryCount());}} catch (Exception e) {log.error("重试失败: {}, method={}, error={}",logEntry.getTargetClass(), logEntry.getTargetMethod(), e.getMessage(), e);updateRetryInfo(logEntry);} finally {retryFailLogService.updateById(logEntry);}}}public boolean invoke(RetryFailLog logEntry) {try {markAsRetryCall();String targetClass = logEntry.getTargetClass();String targetMethod = logEntry.getTargetMethod();String requestArgsJson = logEntry.getRequestArgs();List<String> parameterTypeNames = logEntry.getParameterTypeList();Class<?>[] paramTypes = loadParameterTypes(parameterTypeNames);// 获取目标类和 Bean 实例Class<?> clazz = Class.forName(targetClass);Object bean = SpringUtils.getBean(clazz);// 构建唯一方法 KeyString methodKey = buildMethodKey(clazz, targetMethod, paramTypes);// 已经从缓存或首次加载中获取了 Method 对象Method method = METHOD_CACHE.computeIfAbsent(methodKey, k -> {try {return clazz.getDeclaredMethod(targetMethod, paramTypes);} catch (NoSuchMethodException e) {log.error("找不到目标方法: {}.{}", targetClass, targetMethod, e);return null;}});if (method == null) {throw new NoSuchMethodException("未找到方法: " + targetMethod + " 在类: " + targetClass);}// 反序列化请求参数Object[] args = deserializeArgs(requestArgsJson, paramTypes);// 获取目标对象(绕过 AOP)Object target = 
AopUtils.getTargetObject(bean);// 设置可访问性为 true(以防方法不是 public)method.setAccessible(true);// 直接使用已有的 method 对象调用目标对象上的方法Object result = method.invoke(target, args);// 调用方法 // Object result1 = ReflectUtil.invoke(bean, method, args);// 判断是否调用成功(注意类型安全)return isSuccessResult(result);} catch (InvocationTargetException e) {Throwable cause = e.getCause();log.error("调用目标方法失败", cause);return false;} catch (Exception e) {log.error("invoke 方法出现异常", e);return false;} finally {clear(); // 清除标记,确保不会影响其他调用}}/*** 动态调用 Feign Client 方法*/ // public boolean invoke(RetryFailLog logEntry) throws Exception { // String targetClass = logEntry.getTargetClass(); // String targetMethod = logEntry.getTargetMethod(); // String requestArgsJson = logEntry.getRequestArgs(); // // List<String> parameterTypeNames = logEntry.getParameterTypeList(); // Class<?>[] paramTypes = loadParameterTypes(parameterTypeNames); // // // 获取目标类和 Bean 实例 // Class<?> clazz = Class.forName(targetClass); // Object bean = SpringUtils.getBean(clazz); // // // 构建唯一方法 Key // String methodKey = buildMethodKey(clazz, targetMethod, paramTypes); // // // 从缓存中获取 Method 或首次加载 // Method method = METHOD_CACHE.computeIfAbsent(methodKey, k -> { // try { // return clazz.getDeclaredMethod(targetMethod, paramTypes); // } catch (NoSuchMethodException e) { // log.error("找不到目标方法: {}.{}", targetClass, targetMethod, e); // return null; // } // }); // // if (method == null) { // throw new NoSuchMethodException("未找到方法: " + targetMethod + " 在类: " + targetClass); // } // // // 反序列化请求参数 // Object[] args = deserializeArgs(requestArgsJson, paramTypes); // // 调用方法 // Object result = ReflectUtil.invoke(bean, method, args); // // // 判断是否调用成功(根据业务返回值判断) // return isSuccessResult(result); // }/*** 将参数类型字符串转换为 Class[]*/private Class<?>[] loadParameterTypes(List<String> typeNames) throws ClassNotFoundException {Class<?>[] types = new Class<?>[typeNames.size()];for (int i = 0; i < typeNames.size(); i++) {types[i] = getClassForName(typeNames.get(i));}return 
types;}/*** 根据类名字符串获取 Class 对象(支持泛型截断)*/private Class<?> getClassForName(String className) throws ClassNotFoundException {if (className.contains("<")) {className = className.substring(0, className.indexOf('<'));}return Class.forName(className);}/*** 构建方法唯一标识 Key*/private String buildMethodKey(Class<?> clazz, String methodName, Class<?>... paramTypes) {StringBuilder sb = new StringBuilder(clazz.getName()).append("#").append(methodName);for (Class<?> t : paramTypes) {sb.append("$").append(t.getName());}return sb.toString();}/*** 反序列化 JSON 字符串为参数数组*/private Object[] deserializeArgs(String json, Class<?>... paramTypes) {Object[] array = JSONUtil.parseArray(json).toArray();Object[] args = new Object[paramTypes.length];for (int i = 0; i < paramTypes.length; i++) {args[i] = Convert.convert(paramTypes[i], array[i]);}// // 解析 JSON 字符串为数组 // Object[] jsonArray = JSONUtil.parseArray(json).toArray(); // Object[] args = new Object[paramTypes.length]; // for (int i = 0; i < paramTypes.length; i++) { // Class<?> paramType = paramTypes[i]; // Object jsonElement = jsonArray[i]; // // if (jsonElement == null) { // args[i] = null; // continue; // } // // // 如果目标类型是 String,则直接使用 JSONUtil.toStr() // if (paramType == String.class) { // args[i] = JSONUtil.toJsonStr(jsonElement); // } // // 如果目标类型是 Long 或 long,则尝试将 JSON 元素转换为 Long // else if (paramType == Long.class || paramType == long.class) { // if (jsonElement instanceof Number) { // args[i] = ((Number) jsonElement).longValue(); // } else { // try { // args[i] = Long.parseLong(JSONUtil.toJsonStr(jsonElement)); // } catch (NumberFormatException e) { // throw new IllegalArgumentException("无法将参数 '" + JSONUtil.toJsonStr(jsonElement) + "' 转换为 Long 类型", e); // } // } // } // // 如果目标类型是 Integer 或 int,则尝试将 JSON 元素转换为 Integer // else if (paramType == Integer.class || paramType == int.class) { // if (jsonElement instanceof Number) { // args[i] = ((Number) jsonElement).intValue(); // } else { // try { // args[i] = 
Integer.parseInt(JSONUtil.toJsonStr(jsonElement)); // } catch (NumberFormatException e) { // throw new IllegalArgumentException("无法将参数 '" + JSONUtil.toJsonStr(jsonElement) + "' 转换为 Integer 类型", e); // } // } // } // // 对于其他复杂对象类型,使用 JSONUtil.toBean() // else { // args[i] = JSONUtil.toBean(JSONUtil.toJsonStr(jsonElement), paramType); // } // }return args;}/*** 更新重试次数、下次重试时间等信息*/private void updateRetryInfo(RetryFailLog logEntry) {int maxRetryTimes = logEntry.getMaxRetryTimes();int retryCount = logEntry.getRetryCount() + 1;if (retryCount >= maxRetryTimes) {logEntry.setStatus("failed");log.warn("已达最大重试次数({}), 请求失败: {}.{}", maxRetryTimes,logEntry.getTargetClass(), logEntry.getTargetMethod());} else {logEntry.setRetryCount(retryCount);logEntry.setNextRetryTime(calculateNextRetryTime(retryCount));}}/*** 计算下一次重试时间(指数退避策略)*/private Date calculateNextRetryTime(int retryCount) {long baseDelay = 30_000L; // 基础延迟 30slong delay = baseDelay * (1L << retryCount); // 指数增长return new Date(System.currentTimeMillis() + delay);}/*** 简单判断调用结果是否成功(可按实际业务逻辑扩展)*/private boolean isSuccessResult(Object result) {if (result instanceof BaseResult) {BaseResult ses = (BaseResult) result;String msg = ses.getMsg();return ObjectUtil.equal(ses.getCode(), SUCCESS.getCode());}return false;}private static final ThreadLocal<Boolean> isRetryCall = new ThreadLocal<>();public static void markAsRetryCall() {isRetryCall.set(true);}public static boolean isRetryCall() {return Boolean.TRUE.equals(isRetryCall.get());}public static void clear() {isRetryCall.remove();} }
@TableName(value = "retry_fail_log") @Data public class RetryFailLog {private Long id;private String targetClass;private String targetMethod;private String errorMessage;private Integer retryCount;private Integer maxRetryTimes;private Date nextRetryTime;private String status;private Date createTime;private Date updateTime;private String requestArgs; // 原始请求参数的 JSON 字符串private String parameterTypes; // 参数类型的字符串数组public List<String> getParameterTypeList() {return JSONUtil.toList(parameterTypes, String.class);}public void setParameterTypeList(List<String> parameterTypeList) {this.parameterTypes = JSONUtil.toJsonStr(parameterTypeList);} }
/**
 * Generic response envelope carrying a result code, a message and a typed payload.
 *
 * @param <T> type of the payload held in {@code data}
 */
@Data
public abstract class BaseResult<T> {

    /** Result code of the call. */
    private String code;

    /** Message accompanying the result code. */
    private String msg;

    /** Payload of the response. */
    private T data;
}
-- inference_platform.retry_fail_log definition
-- Stores one row per recorded call that is to be retried by the scheduled compensation job.
CREATE TABLE `retry_fail_log` (
-- Auto-increment primary key.
`id` bigint NOT NULL AUTO_INCREMENT,
-- Fully qualified name of the type declaring the recorded method.
`target_class` varchar(255) COLLATE utf8mb4_bin NOT NULL,
-- Name of the recorded method.
`target_method` varchar(255) COLLATE utf8mb4_bin NOT NULL,
-- Recorded call arguments, serialized as a JSON array.
`request_args` text COLLATE utf8mb4_bin NOT NULL,
-- Parameter type names, serialized as a JSON array of strings.
`parameter_types` text COLLATE utf8mb4_bin NOT NULL,
-- Message describing why the call was recorded.
`error_message` text COLLATE utf8mb4_bin,
-- Number of retries performed so far.
`retry_count` int DEFAULT '0',
-- Maximum number of retries allowed.
`max_retry_times` int DEFAULT '5',
-- Earliest time at which the next retry may run.
`next_retry_time` datetime DEFAULT NULL,
-- Processing status; the application code writes 'pending', 'success' and 'failed'.
`status` varchar(50) COLLATE utf8mb4_bin DEFAULT 'pending',
`create_time` datetime DEFAULT CURRENT_TIMESTAMP,
-- Refreshed by the database on every row update.
`update_time` datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
-- Both indexed columns appear in the WHERE clause of the pending-and-due lookup.
KEY `idx_status` (`status`),
KEY `idx_next_retry_time` (`next_retry_time`)
) ENGINE=InnoDB AUTO_INCREMENT=70 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
/**
 * Selects all rows of {@code retry_fail_log} whose status is {@code 'pending'} and whose
 * {@code next_retry_time} is not later than the database's {@code NOW()}.
 *
 * @return the pending retry records that are due
 */
@Select("SELECT * FROM retry_fail_log WHERE status = 'pending' AND next_retry_time <= NOW()") List<RetryFailLog> findPendingAndDue();
<dependency><groupId>org.aspectj</groupId><artifactId>aspectjrt</artifactId><version>1.9.19</version> <!-- Make sure to use the latest version; 1.9.19 is used here as an example --> </dependency> <dependency><groupId>org.aspectj</groupId><artifactId>aspectjweaver</artifactId><version>1.9.19</version> <!-- Make sure to use the latest version; 1.9.19 is used here as an example --> </dependency>