当前位置: 首页 > news >正文

Search After+PIT 解决ES深度分页问题

1. 深分页(from/size)与 search_after 原理

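| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 数据定位 | 全局排序后跳过前 N 条 | 基于上一页最后一条的排序值定位 |
| 排序开销 | 每个分片都要取出并排序 from + size 条,开销随页深线性增长 (O(from + size)) | 每次请求仍需排序,但每个分片只需取出游标之后的 size 条,开销与页深基本无关 |
| 内存消耗 | 协调节点需在堆内存中合并 分片数 × (from + size) 条结果,深翻页时有 OOM 风险 | 每页只合并 分片数 × size 条,不随页深累积 |
| 分页深度限制 | from + size ≤ 10000 (默认值,由 index.max_result_window 控制) | 无深度限制 |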

2. 性能对比(示例数据,实际耗时取决于数据量、分片数与硬件)

| 分页深度 | 深分页响应时间 | search_after 响应时间 |
| --- | --- | --- |
| 1 | 100ms | 100ms |
| 100 | 300ms | 110ms |
| 1000 | 1500ms | 120ms |
| 10000 | 超时/报错 | 130ms |

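3. 适用场景

| 对比项 | 深分页 (from/size) | search_after |
| --- | --- | --- |
| 典型场景 | 小数据量、随机跳页 | 大数据量连续翻页(如日志流) |
| 排序要求 | 任意排序字段 | 排序必须能唯一确定文档顺序,需追加唯一的 tiebreaker 字段(如时间戳 + 唯一 ID;配合 PIT 时可使用 _shard_doc) |
| 跳页能力 | 支持任意页跳转 | 仅支持顺序翻页 |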

4. Maven依赖配置

<!-- Elasticsearch Java API client (8.x line).
     The client serializes through Jackson (JacksonJsonpMapper in EsConfig), so
     com.fasterxml.jackson.core:jackson-databind must also be on the classpath;
     Spring Boot web starters normally provide it already. -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>

5.ES分页服务类

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;@Slf4j
@Service
public class EsSearchAfterService {private final ElasticsearchClient esClient;// 初始化ES客户端(通过构造函数注入)public EsSearchAfterService(ElasticsearchClient esClient) {this.esClient = esClient;}/*** 分页查询(支持深度分页)* @param indexName 索引名称* @param query 查询条件* @param sortField 主排序字段* @param pageSize 每页大小* @param pitId 时间点ID(首次查询传null)* @param searchAfter 分页游标(首次查询传null)* @return 分页结果(包含数据、下次分页游标、新的pitId)*/public PageResult<Object> searchWithPagination(String indexName,Query query,String sortField,int pageSize,String pitId,List<JsonData> searchAfter) throws IOException {// 1. 创建或延续PIT上下文String currentPitId = pitId;if (currentPitId == null) {CreatePitResponse pitResponse = esClient.createPit(c -> c.index(indexName).keepAlive(a -> a.time("30m")));currentPitId = pitResponse.id();log.info("Created new PIT: {}", currentPitId);}try {// 2. 构建SearchRequestSearchRequest.Builder searchBuilder = new SearchRequest.Builder().size(pageSize).query(query).pit(p -> p.id(currentPitId).keepAlive(a -> a.time("30m"))).sort(s -> s.field(f -> f.field(sortField).order(SortOrder.Desc))).sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc)));if (searchAfter != null && !searchAfter.isEmpty()) {searchBuilder.searchAfter(searchAfter);}// 3. 执行查询SearchResponse<Object> response = esClient.search(searchBuilder.build(), Object.class);// 4. 
解析结果List<Object> data = new ArrayList<>();List<JsonData> nextSearchAfter = Collections.emptyList();if (response.hits().hits() != null && !response.hits().hits().isEmpty()) {List<Hit<Object>> hits = response.hits().hits();for (Hit<Object> hit : hits) {data.add(hit.source());}// 获取最后一个排序值nextSearchAfter = hits.get(hits.size() - 1).sort();}return new PageResult<>(data, nextSearchAfter, currentPitId);} catch (Exception e) {// 清理无效PITif (currentPitId != null && !currentPitId.equals(pitId)) {esClient.deletePit(d -> d.id(currentPitId));}throw new RuntimeException("ES查询失败", e);}}/*** 关闭PIT上下文*/public void closePit(String pitId) throws IOException {if (pitId != null && !pitId.isEmpty()) {DeletePitResponse response = esClient.deletePit(d -> d.id(pitId));log.info("Closed PIT {}: {}", pitId, response.succeeded());}}// 分页结果封装类public record PageResult<T>(List<T> data,List<JsonData> nextSearchAfter,String pitId) {}
}

6. 业务层使用示例

@Service
@RequiredArgsConstructor
public class OrderQueryService {private final EsSearchAfterService esService;public PaginatedOrders queryOrders(int pageSize, String lastPitId, List<JsonData> lastSearchAfter) {try {// 1. 构建查询条件Query query = Query.of(q -> q.bool(b -> b.must(m -> m.term(t -> t.field("status").value("paid")))));// 2. 执行分页查询PageResult<Object> result = esService.searchWithPagination("orders",query,"order_time",pageSize,lastPitId,lastSearchAfter);// 3. 转换为业务DTOList<OrderDTO> orders = convertToDTO(result.data());return new PaginatedOrders(orders, result.nextSearchAfter(), result.pitId());} catch (IOException e) {throw new BusinessException("订单查询失败", e);}}// DTO转换逻辑(示例)private List<OrderDTO> convertToDTO(List<Object> esSources) {// 实现实际的转换逻辑return Collections.emptyList();}// 分页结果DTOpublic record PaginatedOrders(List<OrderDTO> orders,List<JsonData> nextSearchAfter,String pitId) {}
}

7. 控制器层实现

@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {private final OrderQueryService orderService;@GetMappingpublic ResponseEntity<?> getOrders(@RequestParam(defaultValue = "20") int size,@RequestParam(required = false) String pitId,@RequestParam(required = false) List<String> searchAfter) {try {// 转换前端传来的searchAfter参数List<JsonData> searchAfterParams = Optional.ofNullable(searchAfter).orElse(Collections.emptyList()).stream().map(JsonData::of).toList();PaginatedOrders result = orderService.queryOrders(size, pitId, searchAfterParams);return ResponseEntity.ok().header("X-PIT-ID", result.pitId()).body(Map.of("data", result.orders(),"next_search_after", result.nextSearchAfter()));} catch (BusinessException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Map.of("error", e.getMessage()));}}@DeleteMapping("/pit/{pitId}")public ResponseEntity<?> closePitContext(@PathVariable String pitId) {try {orderService.closePit(pitId);return ResponseEntity.ok().build();} catch (Exception e) {return ResponseEntity.internalServerError().body(Map.of("error", "PIT关闭失败"));}}
}

8. 生产环境关键配置

elasticsearch:
  # Comma-separated list, e.g. "https://es1:9200,https://es2:9200"
  hosts: localhost:9200
  username: elastic
  # Inject the real secret via the ES_PASSWORD environment variable instead of committing it.
  password: ${ES_PASSWORD:your_password}
  connection-timeout: 30s
  socket-timeout: 60s

9.ES客户端配置类

@Configuration
public class EsConfig {@Value("${elasticsearch.hosts}")private String hosts;@Beanpublic RestClient restClient() {return RestClient.builder(HttpHost.create(hosts)).setRequestConfigCallback(builder ->builder.setConnectTimeout(30000).setSocketTimeout(60000)).build();}@Beanpublic ElasticsearchClient elasticsearchClient(RestClient restClient) {ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());return new ElasticsearchClient(transport);}
}

10. 前端交互示例

无限滚动实现(React)

import React, { useState, useEffect } from 'react';const OrderList = () => {const [orders, setOrders] = useState([]);const [pitId, setPitId] = useState(null);const [searchAfter, setSearchAfter] = useState(null);const [loading, setLoading] = useState(false);const loadMore = async () => {setLoading(true);try {const params = new URLSearchParams({size: 20,...(pitId && { pitId }),...(searchAfter && { searchAfter: JSON.stringify(searchAfter) })});const response = await fetch(`/api/orders?${params}`);const { data, next_search_after } = await response.json();setOrders(prev => [...prev, ...data]);setSearchAfter(next_search_after);setPitId(response.headers.get('X-PIT-ID'));} finally {setLoading(false);}};// 组件卸载时清理PITuseEffect(() => {return () => {if (pitId) {fetch(`/api/orders/pit/${pitId}`, { method: 'DELETE' });}};}, [pitId]);return (<div>{/* 订单列表渲染 */}<button onClick={loadMore} disabled={loading}>{loading ? '加载中...' : '加载更多'}</button></div>);
};

http://www.xdnf.cn/news/324613.html

相关文章:

  • react+ts中函数组件父子通信方式
  • C#——NET Core 中实现汉字转拼音
  • Spring MVC Controller 方法的返回类型有哪些?
  • 项目优先级频繁变动,如何应对?
  • C++入门之认识整型
  • 使用OpenCV 和 Dlib 实现人脸融合技术
  • shell(11)
  • 使用ffmpeg截取MP3等音频片段
  • MCP Client适配DeepSeek
  • SpringBoot 集成 Ehcache 实现本地缓存
  • Vue3 自定义指令的原理,以及应用
  • Ubuntu 单机多卡部署脚本: vLLM + DeepSeek 70B
  • ERP进销存系统源码,SaaS模式多租户ERP管理系统,SpringBoot、Vue、UniAPP技术框架
  • 基于nnom的多选择器
  • springboot国家化多语言实现
  • mybatis-plus分页查询count语句为什么没有left join
  • 正则表达式非捕获分组?:
  • CHAPTER 17 Iterators, Generators, and Classic Coroutines
  • 构建高质量数据湖:大数据治理在湖仓一体架构下的实践指南
  • mathtype转化
  • Vivo 手机官网交互效果实现解析
  • arXiv论文 MALOnt: An Ontology for Malware Threat Intelligence
  • ubuntu中解决matplotlib无法显示中文问题
  • 【MVCP】基于解纠缠表示学习和跨模态-上下文关联挖掘的多模态情感分析
  • 码蹄集——平方根X、整除幸运数
  • Rust 与 Golang 深度对决:从语法到应用场景的全方位解析
  • 平面坐标系中判断点P是否在线段上AB上的常用方法总结
  • 【渗透测试】命令执行漏洞的原理、利用方式、防范措施
  • 滚动条样式
  • 数据治理与数据资产管理研究方向展望