当前位置: 首页 > ops >正文

反欺诈业务 Elasticsearch 分页与导出问题分析及解决方案

我是如何在反欺诈系统中使用 Redis 缓存客户年龄信息,提升导出性能的(实战经验总结)


一、背景

我在开发一个反欺诈系统的开户流水导出功能时,需要导出约 7w 条开户流水数据,每条数据包含客户号、开户日期,并且要补充客户年龄字段。

年龄字段来源于客户基本信息表,数据存储在 Elasticsearch 中,字段是客户出生日期。

开发初期,我们采用一次性查询客户号的方式,但上线后发现:

大部分年龄字段为空,数据异常!

排查发现:一次性传入 7w 个客户号查询 ES,由于 index.max_result_window 默认限制为 10000,只返回了前 1w 条数据。

于是我们做了如下优化:

  • 分批查询客户信息(每批 2000 条)
  • 使用 filter 查询,不记分
  • 只查询 custNo 和 birthDate 字段
  • 使用 Java 8 并行流加快查询速度
  • 引入 Redis 缓存客户出生日期信息,设置过期时间

二、我遇到的问题

1. 客户信息查询效率低

  • 每次导出都要重新查询 ES,性能差
  • 客户出生日期是静态数据,重复查询浪费资源

2. 一次性查询客户号数据不完整

  • 一次性传入 7w 个客户号,ES 默认最多返回 1w 条数据
  • 导致年龄字段缺失,数据异常

3. 未使用缓存,重复查询浪费资源

  • 客户出生日期基本不变,每次导出都重新查 ES,浪费资源

三、我是怎么做的?

我最终采用了如下方案进行优化:

优化项说明
分批查询每次查 2000 个客户号,规避 ES 的 max_result_window 限制
使用 filter 查询不记分,提升查询效率
只查 custNo 和 birthDate 字段减少数据传输量
使用 Java 8 并行流提升分批查询效率
引入 Redis 缓存客户出生日期减少重复查询
设置缓存过期时间(如 1 天)保证数据新鲜度

四、具体实现(Java + Elasticsearch + Redis)

✅ Redis 缓存工具类(使用 Spring Data Redis):

@Component
public class RedisCache {@Autowiredprivate RedisTemplate<String, String> redisTemplate;// 设置缓存,带过期时间public void setWithExpire(String key, String value, long timeout, TimeUnit unit) {redisTemplate.opsForValue().set(key, value, timeout, unit);}// 获取缓存public String get(String key) {return redisTemplate.opsForValue().get(key);}// 批量获取缓存public List<String> multiGet(List<String> keys) {return redisTemplate.opsForValue().multiGet(keys);}
}

✅ 分批查询客户信息 + 并行流 + Redis 缓存:

@Service
public class CustomerInfoService {@Autowiredprivate RestHighLevelClient esClient;@Autowiredprivate RedisCache redisCache;private static final int BATCH_SIZE = 2000;private static final String CACHE_KEY_PREFIX = "cust_birthdate_";public Map<String, String> getCustomerBirthDates(List<String> customerNos) throws IOException {Map<String, String> result = new HashMap<>();// 去重客户号List<String> uniqueCustNos = customerNos.stream().distinct().collect(Collectors.toList());// 先查 Redis 缓存List<String> cacheKeys = uniqueCustNos.stream().map(custNo -> CACHE_KEY_PREFIX + custNo).collect(Collectors.toList());List<String> cachedValues = redisCache.multiGet(cacheKeys);Map<String, String> cachedMap = new HashMap<>();for (int i = 0; i < uniqueCustNos.size(); i++) {String custNo = uniqueCustNos.get(i);String cachedValue = cachedValues.get(i);if (cachedValue != null) {cachedMap.put(custNo, cachedValue);}}// 筛出未缓存的客户号List<String> notCached = uniqueCustNos.stream().filter(custNo -> !cachedMap.containsKey(custNo)).collect(Collectors.toList());// 分批查询 ESList<List<String>> batches = Lists.partition(notCached, BATCH_SIZE);batches.parallelStream().forEach(batch -> {try {SearchRequest searchRequest = new SearchRequest("customer_info_index");SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();sourceBuilder.query(QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("custNo", batch)));sourceBuilder.fetchSource(new String[]{"custNo", "birthDate"}, null);sourceBuilder.size(batch.size());searchRequest.source(sourceBuilder);SearchResponse response = esClient.search(searchRequest, RequestOptions.DEFAULT);for (SearchHit hit : response.getHits()) {Map<String, Object> source = hit.getSourceAsMap();String custNo = source.get("custNo").toString();String birthDate = source.get("birthDate").toString();String cacheKey = CACHE_KEY_PREFIX + custNo;// 放入结果 & 缓存result.put(custNo, birthDate);redisCache.setWithExpire(cacheKey, birthDate, 1, TimeUnit.DAYS);}} catch (IOException e) {e.printStackTrace();}});// 合并缓存和新查的数据result.putAll(cachedMap);return result;}
}

✅ 补充年龄字段逻辑:

public List<OpenAccountRecord> enrichWithAge(List<OpenAccountRecord> records) throws IOException {List<String> customerNos = records.stream().map(OpenAccountRecord::getCustNo).distinct().collect(Collectors.toList());// 查询客户出生日期(优先 Redis 缓存,未命中则查 ES)Map<String, String> birthDateMap = getCustomerBirthDates(customerNos);// 补充年龄字段for (OpenAccountRecord record : records) {String custNo = record.getCustNo();String birthDate = birthDateMap.get(custNo);if (birthDate != null) {int age = calculateAge(birthDate);record.setAge(age);}}return records;
}private int calculateAge(String birthDateStr) {LocalDate birthDate = LocalDate.parse(birthDateStr, DateTimeFormatter.ofPattern("yyyy-MM-dd"));LocalDate now = LocalDate.now();return now.getYear() - birthDate.getYear();
}

五、优化亮点总结

优化点说明
分批查询避免一次性查询超过 ES 限制
filter 查询不记分,提升性能
只查必要字段减少数据传输
并行流处理提升查询效率
Redis 缓存客户出生日期减少重复查询,支持分布式
设置缓存过期时间(1 天)保证数据新鲜度
构建映射表便于字段补充,代码结构清晰

六、遇到的难点与思考

🔍 难点一:分页没问题,导出才暴露问题

  • 分页只查 10 条客户号,ES 限制未触发
  • 导出时一次性查 7w 条客户号,才暴露数据截断问题

🔍 难点二:ES 默认限制不报错,只截断

  • ES 不会抛异常,只是返回前 1w 条数据
  • 容易造成数据丢失而不自知

🔍 难点三:缓存过期时间如何设置?

  • 客户出生日期不变,年龄每年更新一次
  • 设置缓存过期时间为 1 天,足够使用,且数据不会太旧

七、总结

这次优化让我深刻认识到:

缓存不是万能的,但合理使用缓存可以极大提升系统性能。

通过这次优化,我们解决了:

  • 年龄字段缺失问题
  • 数据完整性保障
  • 查询性能提升
  • 减少 ES 调用次数
  • 支持多节点部署下的缓存共享
  • 代码结构更清晰

如果你也在开发反欺诈、风控、数据报表等系统,遇到类似的 ES 分页、导出、数据关联问题,希望这篇文章能给你带来一些启发和参考。

http://www.xdnf.cn/news/15835.html

相关文章:

  • 基于单片机的智能家居安防系统设计
  • Linux文件系统三要素:块划分、分区管理与inode结构解析
  • Linux: rsync+inotify实时同步及rsync+sersync实时同步
  • Claude Code 逆向工程分析,探索最新Agent设计
  • 【机器学习深度学习】量化与选择小模型的区别:如何理解两者的优势与局限?
  • Day1||Vue指令学习
  • PyTorch的基础概念和复杂模型的基本使用
  • Facebook 开源多季节性时间序列数据预测工具:Prophet 快速入门 Quick Start
  • macOs上交叉编译ffmpeg及安装ffmpeg工具
  • 测试中的bug
  • 基于深度学习的自然语言处理:构建情感分析模型
  • urllib.parse.urlencode 的使用详解
  • AI+预测3D新模型百十个定位预测+胆码预测+去和尾2025年7月20日第144弹
  • Uniapp 纯前端台球计分器开发指南:能否上架微信小程序 打包成APP?
  • 安全信息与事件管理(SIEM)系统架构设计
  • 【前端】懒加载(组件/路由/图片等)+预加载 汇总
  • AI绘画生成东汉末年赵云全身像的精细提示词
  • 四、多频技术与复杂场景处理
  • 基于卷积傅里叶分析网络 (CFAN)的心电图分类的统一时频方法
  • SpringBoot3集成MapstructPlus
  • GaussDB select into和insert into的用法
  • 基于智慧经营系统的学校住宿登记报表分析与应用探究-毕业论文—仙盟创梦IDE
  • Qt--Widget类对象的构造函数分析
  • 上电复位断言的自动化
  • 网络安全初级(前端页面的编写分析)
  • Java 递归方法详解:从基础语法到实战应用,彻底掌握递归编程思想
  • C++STL系列之list
  • Spring Boot 第一天知识汇总
  • UE5多人MOBA+GAS 26、为角色添加每秒回血回蓝(番外:添加到UI上)
  • redis-plus-plus安装与使用