Search After + PIT 解决 ES 深度分页问题
1. 深分页与 search_after 原理
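对比维度 | 深分页 (from/size) | search_after |
数据定位 | 每个分片取前 from+size 条,协调节点合并排序后丢弃前 from 条 | 以上一页最后一条的排序值作为游标,只取其后的 size 条 |
排序开销 | 随页深线性增长:每次需收集并合并 分片数×(from+size) 条 | 与页深无关:每次只需收集并合并 分片数×size 条 |
内存消耗 | 协调节点堆内存随 from+size 增长(深翻页有 OOM 风险) | 每次仅缓存 分片数×size 条,占用稳定 |
分页深度限制 | from + size ≤ 10000(由 index.max_result_window 控制,默认 10000) | 无深度限制 |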
2. 性能对比
分页深度 | 深分页响应时间 | search_after响应时间 |
1 | 100ms | 100ms |
100 | 300ms | 110ms |
1000 | 1500ms | 120ms |
10000 | 超时/报错 | 130ms |
注:以上为示意数据,实际耗时取决于数据量、分片数、页大小与硬件;默认配置下 from + size 超过 10000 会直接报错(index.max_result_window)。
3. 适用场景
对比维度 | 深分页 | search_after |
典型场景 | 小数据量随机跳页 | 大数据量连续翻页(如日志流、无限滚动) |
排序要求 | 任意排序字段 | 排序值组合必须唯一(如 时间戳 + 唯一 tiebreaker;配合 PIT 时推荐 _shard_doc,ES 8 默认禁止按 _id 排序) |
跳页能力 | 支持任意页跳转 | 仅支持顺序翻页 |
4. Maven依赖配置
<dependency>
    <groupId>co.elastic.clients</groupId>
    <artifactId>elasticsearch-java</artifactId>
    <version>8.12.0</version>
</dependency>
5. ES 分页服务类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.search.Hit;
import co.elastic.clients.json.JsonData;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;@Slf4j
@Service
public class EsSearchAfterService {private final ElasticsearchClient esClient;// 初始化ES客户端(通过构造函数注入)public EsSearchAfterService(ElasticsearchClient esClient) {this.esClient = esClient;}/*** 分页查询(支持深度分页)* @param indexName 索引名称* @param query 查询条件* @param sortField 主排序字段* @param pageSize 每页大小* @param pitId 时间点ID(首次查询传null)* @param searchAfter 分页游标(首次查询传null)* @return 分页结果(包含数据、下次分页游标、新的pitId)*/public PageResult<Object> searchWithPagination(String indexName,Query query,String sortField,int pageSize,String pitId,List<JsonData> searchAfter) throws IOException {// 1. 创建或延续PIT上下文String currentPitId = pitId;if (currentPitId == null) {CreatePitResponse pitResponse = esClient.createPit(c -> c.index(indexName).keepAlive(a -> a.time("30m")));currentPitId = pitResponse.id();log.info("Created new PIT: {}", currentPitId);}try {// 2. 构建SearchRequestSearchRequest.Builder searchBuilder = new SearchRequest.Builder().size(pageSize).query(query).pit(p -> p.id(currentPitId).keepAlive(a -> a.time("30m"))).sort(s -> s.field(f -> f.field(sortField).order(SortOrder.Desc))).sort(s -> s.field(f -> f.field("_id").order(SortOrder.Asc)));if (searchAfter != null && !searchAfter.isEmpty()) {searchBuilder.searchAfter(searchAfter);}// 3. 执行查询SearchResponse<Object> response = esClient.search(searchBuilder.build(), Object.class);// 4. 
解析结果List<Object> data = new ArrayList<>();List<JsonData> nextSearchAfter = Collections.emptyList();if (response.hits().hits() != null && !response.hits().hits().isEmpty()) {List<Hit<Object>> hits = response.hits().hits();for (Hit<Object> hit : hits) {data.add(hit.source());}// 获取最后一个排序值nextSearchAfter = hits.get(hits.size() - 1).sort();}return new PageResult<>(data, nextSearchAfter, currentPitId);} catch (Exception e) {// 清理无效PITif (currentPitId != null && !currentPitId.equals(pitId)) {esClient.deletePit(d -> d.id(currentPitId));}throw new RuntimeException("ES查询失败", e);}}/*** 关闭PIT上下文*/public void closePit(String pitId) throws IOException {if (pitId != null && !pitId.isEmpty()) {DeletePitResponse response = esClient.deletePit(d -> d.id(pitId));log.info("Closed PIT {}: {}", pitId, response.succeeded());}}// 分页结果封装类public record PageResult<T>(List<T> data,List<JsonData> nextSearchAfter,String pitId) {}
}
6. 业务层使用示例
@Service
@RequiredArgsConstructor
public class OrderQueryService {private final EsSearchAfterService esService;public PaginatedOrders queryOrders(int pageSize, String lastPitId, List<JsonData> lastSearchAfter) {try {// 1. 构建查询条件Query query = Query.of(q -> q.bool(b -> b.must(m -> m.term(t -> t.field("status").value("paid")))));// 2. 执行分页查询PageResult<Object> result = esService.searchWithPagination("orders",query,"order_time",pageSize,lastPitId,lastSearchAfter);// 3. 转换为业务DTOList<OrderDTO> orders = convertToDTO(result.data());return new PaginatedOrders(orders, result.nextSearchAfter(), result.pitId());} catch (IOException e) {throw new BusinessException("订单查询失败", e);}}// DTO转换逻辑(示例)private List<OrderDTO> convertToDTO(List<Object> esSources) {// 实现实际的转换逻辑return Collections.emptyList();}// 分页结果DTOpublic record PaginatedOrders(List<OrderDTO> orders,List<JsonData> nextSearchAfter,String pitId) {}
}
7. 控制器层实现
@RestController
@RequestMapping("/api/orders")
@RequiredArgsConstructor
public class OrderController {private final OrderQueryService orderService;@GetMappingpublic ResponseEntity<?> getOrders(@RequestParam(defaultValue = "20") int size,@RequestParam(required = false) String pitId,@RequestParam(required = false) List<String> searchAfter) {try {// 转换前端传来的searchAfter参数List<JsonData> searchAfterParams = Optional.ofNullable(searchAfter).orElse(Collections.emptyList()).stream().map(JsonData::of).toList();PaginatedOrders result = orderService.queryOrders(size, pitId, searchAfterParams);return ResponseEntity.ok().header("X-PIT-ID", result.pitId()).body(Map.of("data", result.orders(),"next_search_after", result.nextSearchAfter()));} catch (BusinessException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body(Map.of("error", e.getMessage()));}}@DeleteMapping("/pit/{pitId}")public ResponseEntity<?> closePitContext(@PathVariable String pitId) {try {orderService.closePit(pitId);return ResponseEntity.ok().build();} catch (Exception e) {return ResponseEntity.internalServerError().body(Map.of("error", "PIT关闭失败"));}}
}
8. 生产环境关键配置
elasticsearch:
  hosts: localhost:9200
  username: elastic
  password: your_password
  connection-timeout: 30s
  socket-timeout: 60s
9. ES 客户端配置类
@Configuration
public class EsConfig {@Value("${elasticsearch.hosts}")private String hosts;@Beanpublic RestClient restClient() {return RestClient.builder(HttpHost.create(hosts)).setRequestConfigCallback(builder ->builder.setConnectTimeout(30000).setSocketTimeout(60000)).build();}@Beanpublic ElasticsearchClient elasticsearchClient(RestClient restClient) {ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());return new ElasticsearchClient(transport);}
}
10. 前端交互示例
无限滚动实现(React)
import React, { useState, useEffect } from 'react';const OrderList = () => {const [orders, setOrders] = useState([]);const [pitId, setPitId] = useState(null);const [searchAfter, setSearchAfter] = useState(null);const [loading, setLoading] = useState(false);const loadMore = async () => {setLoading(true);try {const params = new URLSearchParams({size: 20,...(pitId && { pitId }),...(searchAfter && { searchAfter: JSON.stringify(searchAfter) })});const response = await fetch(`/api/orders?${params}`);const { data, next_search_after } = await response.json();setOrders(prev => [...prev, ...data]);setSearchAfter(next_search_after);setPitId(response.headers.get('X-PIT-ID'));} finally {setLoading(false);}};// 组件卸载时清理PITuseEffect(() => {return () => {if (pitId) {fetch(`/api/orders/pit/${pitId}`, { method: 'DELETE' });}};}, [pitId]);return (<div>{/* 订单列表渲染 */}<button onClick={loadMore} disabled={loading}>{loading ? '加载中...' : '加载更多'}</button></div>);
};