【ElasticSearch】客户端选择
【ElasticSearch】客户端选择
- 【一】springboot整合es的客户端工具
- 【1】可选工具和客户端
- (1)ElasticsearchRepository
- (2)ElasticsearchRestTemplate(或旧版的ElasticsearchTemplate):
- (3)High Level REST Client
- (4)Java API Client
- 【2】如何选择
- 【3】如何使用
- (1)ElasticsearchRepository 使用
- (2)ElasticsearchRestTemplate 使用
- (3)新的Java API Client使用(Elasticsearch 7.17+)
- 【二】ElasticsearchRepository使用案例
- 【1】如何使用
- (1)步骤1:定义实体类
- (2)步骤2:创建继承ElasticsearchRepository的接口
- (3)步骤3:使用该接口进行CRUD操作
- (4)配置类
- 【2】可用方法
- (1)基本 CRUD 方法
- (2)分页和排序
- (3)自定义查询方法
- 【三】ElasticsearchRestTemplate使用案例
- 【1】核心功能
- 【2】可用方法
- (1)文档操作方法
- (2)查询操作方法
- (3)聚合分析方法
- (4)索引管理方法
- 【3】使用案例
- (1)配置类
- (2)实体类
- (3)服务层实现
- 【4】实践优化
- (1)批量操作优化
- (2)高校分页查询
- 【四】Java API Client使用案例
- 【1】介绍
- 【2】核心功能
- (1)主要模块
- (2)主要类
- 【3】使用案例
- (1)maven配置
- (2)配置类
- (3)实体类
- (4)服务层实现
- 【4】高级用法
- (1)异步操作
- (2)复杂聚合
- (3)自定义json映射
【一】springboot整合es的客户端工具
【1】可选工具和客户端
(1)ElasticsearchRepository
这是Spring Data Elasticsearch提供的一个接口,类似于Spring Data JPA的Repository。它提供了基本的CRUD操作和简单的查询方法(通过方法名或注解定义查询)。适用于简单的CRUD操作和查询,能够快速开发。
(2)ElasticsearchRestTemplate(或旧版的ElasticsearchTemplate):
(1)ElasticsearchTemplate是Spring Data Elasticsearch早期版本中的主要类,基于TransportClient(已弃用)。
(2)ElasticsearchRestTemplate是Spring Data Elasticsearch 3.2.x及以上版本推荐的类,基于High Level REST Client。它提供了更底层的操作,可以执行复杂的查询和聚合,适用于需要高度自定义查询的场景。
(3)High Level REST Client
Elasticsearch官方提供的Java高级 REST 客户端,用于与Elasticsearch集群通信。它提供了所有Elasticsearch操作的方法,但使用起来相对繁琐,需要手动构建请求和解析响应。在Spring Data Elasticsearch中,通常不需要直接使用,因为ElasticsearchRestTemplate已经对其进行了封装。
(4)Java API Client
Elasticsearch 7.15及以上版本引入了新的Java API客户端,这是一个基于Jackson的强类型客户端,提供了更好的类型安全和性能。但是,Spring Data Elasticsearch目前(截至3.2.x)还没有完全整合这个新客户端。
【2】如何选择
(1)如果只需要基本的CRUD和简单查询,推荐使用ElasticsearchRepository,因为它使用简单,代码量少。
(2)如果需要执行复杂的查询、聚合、或者需要更灵活地控制查询过程,那么应该使用ElasticsearchRestTemplate。
(3)如果Spring Data Elasticsearch提供的功能无法满足需求(例如,使用一些非常新的Elasticsearch特性),可以考虑直接使用Elasticsearch的Java API Client,但这样会失去Spring Data的便利性。
【3】如何使用
(1)ElasticsearchRepository 使用
(1)创建Repository接口
public interface ProductRepository extends ElasticsearchRepository<Product, String> {// 自定义查询方法List<Product> findByName(String name);List<Product> findByPriceBetween(Double minPrice, Double maxPrice);Page<Product> findByCategory(String category, Pageable pageable);// 使用@Query注解自定义DSL@Query("{\"match\": {\"name\": \"?0\"}}")Page<Product> findByNameCustom(String name, Pageable pageable);
}
(2)使用示例
@Service
@RequiredArgsConstructor
public class ProductService {private final ProductRepository productRepository;public Page<Product> searchProducts(String keyword, int page, int size) {return productRepository.findByNameCustom(keyword, PageRequest.of(page, size, Sort.by("price").descending()));}public List<Product> findProductsByPriceRange(double min, double max) {return productRepository.findByPriceBetween(min, max);}
}
(2)ElasticsearchRestTemplate 使用
(1)配置类
@Configuration
public class ElasticsearchConfig {@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate(ElasticsearchRestClient elasticsearchRestClient) {return new ElasticsearchRestTemplate(elasticsearchRestClient);}
}
(2)复杂查询实现
@Service
@RequiredArgsConstructor
public class ProductSearchService {private final ElasticsearchRestTemplate elasticsearchRestTemplate;public SearchPage<Product> complexSearch(String keyword, String category, Double minPrice, Double maxPrice, int page, int size) {// 构建布尔查询BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();if (StringUtils.hasText(keyword)) {boolQuery.must(QueryBuilders.matchQuery("name", keyword).boost(2.0f));}if (StringUtils.hasText(category)) {boolQuery.must(QueryBuilders.termQuery("category", category));}if (minPrice != null || maxPrice != null) {RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("price");if (minPrice != null) rangeQuery.gte(minPrice);if (maxPrice != null) rangeQuery.lte(maxPrice);boolQuery.must(rangeQuery);}// 构建分页和排序NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().withQuery(boolQuery).withPageable(PageRequest.of(page, size)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).build();SearchHits<Product> searchHits = elasticsearchRestTemplate.search(searchQuery, Product.class);return SearchHitSupport.searchPageFor(searchHits, searchQuery.getPageable());}// 聚合查询示例public Map<String, Long> getCategoryStats() {TermsAggregationBuilder aggregation = AggregationBuilders.terms("category_agg").field("category").size(10);NativeSearchQuery searchQuery = new NativeSearchQueryBuilder().addAggregation(aggregation).build();SearchHits<Product> searchHits = elasticsearchRestTemplate.search(searchQuery, Product.class);Terms terms = searchHits.getAggregations().get("category_agg");return terms.getBuckets().stream().collect(Collectors.toMap(Terms.Bucket::getKeyAsString, Terms.Bucket::getDocCount));}
}
(3)新的Java API Client使用(Elasticsearch 7.17+)
(1)添加依赖
<dependency><groupId>org.springframework.data</groupId><artifactId>spring-data-elasticsearch</artifactId><version>5.1.0</version>
</dependency>
<dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.11.4</version>
</dependency>
(2)配置客户端
@Configuration
public class ElasticsearchClientConfig {@Value("${spring.elasticsearch.uris}")private String[] elasticsearchUris;@Beanpublic ElasticsearchClient elasticsearchClient() {// 创建低级客户端RestClient restClient = RestClient.builder(HttpHost.create(elasticsearchUris[0])).build();// 创建传输层ElasticsearchTransport transport = new RestClientTransport(restClient, new JacksonJsonpMapper());// 创建API客户端return new ElasticsearchClient(transport);}
}
(3)使用Java API Client
@Service
@RequiredArgsConstructor
public class ProductJavaClientService {private final ElasticsearchClient elasticsearchClient;public void createProduct(Product product) throws IOException {elasticsearchClient.index(i -> i.index("products").id(product.getId()).document(product));}public List<Product> searchProducts(String keyword) throws IOException {SearchResponse<Product> response = elasticsearchClient.search(s -> s.index("products").query(q -> q.match(m -> m.field("name").query(keyword))),Product.class);return response.hits().hits().stream().map(Hit::source).collect(Collectors.toList());}
}
【二】ElasticsearchRepository使用案例
【1】如何使用
(1)步骤1:定义实体类
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;@Document(indexName = "products")
@Data
public class Product {@Idprivate String id;@Field(type = FieldType.Text, analyzer = "ik_max_word")private String name;@Field(type = FieldType.Double)private Double price;@Field(type = FieldType.Keyword)private String category;@Field(type = FieldType.Integer)private Integer stock;// 构造方法public Product() {}public Product(String name, Double price, String category, Integer stock) {this.name = name;this.price = price;this.category = category;this.stock = stock;}}
(2)步骤2:创建继承ElasticsearchRepository的接口
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.annotations.Query;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;import java.util.List;public interface ProductRepository extends ElasticsearchRepository<Product, String> {// 1. 方法名派生查询List<Product> findByName(String name);List<Product> findByPriceBetween(Double minPrice, Double maxPrice);List<Product> findByCategoryAndStockGreaterThan(String category, Integer stock);Page<Product> findByCategory(String category, Pageable pageable);// 2. 使用@Query自定义查询@Query("{\"match\": {\"name\": \"?0\"}}")List<Product> findByNameCustom(String name);@Query("{\"range\": {\"price\": {\"gte\": ?0, \"lte\": ?1}}}")List<Product> findByPriceRange(Double minPrice, Double maxPrice);@Query("{\"bool\": {\"must\": [{\"match\": {\"category\": \"?0\"}}, {\"range\": {\"stock\": {\"gt\": ?1}}}]}}")List<Product> findByCategoryAndStockGreaterThanCustom(String category, Integer stock);// 3. 聚合查询@Query("{\"aggs\": {\"category_count\": {\"terms\": {\"field\": \"category.keyword\"}}}}")Map<String, Long> countByCategory();
}
(3)步骤3:使用该接口进行CRUD操作
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Sort;
import org.springframework.stereotype.Service;import java.util.List;
import java.util.Optional;@Service
public class ProductService {private final ProductRepository productRepository;@Autowiredpublic ProductService(ProductRepository productRepository) {this.productRepository = productRepository;}// 保存产品public Product saveProduct(Product product) {return productRepository.save(product);}// 批量保存public Iterable<Product> saveProducts(List<Product> products) {return productRepository.saveAll(products);}// 根据ID查询public Optional<Product> findById(String id) {return productRepository.findById(id);}// 检查是否存在public boolean existsById(String id) {return productRepository.existsById(id);}// 根据名称查询public List<Product> findByName(String name) {return productRepository.findByName(name);}// 价格范围查询public List<Product> findByPriceRange(Double minPrice, Double maxPrice) {return productRepository.findByPriceBetween(minPrice, maxPrice);}// 分类分页查询public Page<Product> findByCategory(String category, int page, int size) {return productRepository.findByCategory(category, PageRequest.of(page, size));}// 自定义查询public List<Product> findByNameCustom(String name) {return productRepository.findByNameCustom(name);}// 删除产品public void deleteProduct(String id) {productRepository.deleteById(id);}// 获取所有产品(排序)public List<Product> findAllSortedByPrice() {return (List<Product>) productRepository.findAll(Sort.by(Sort.Direction.DESC, "price"));}// 分页查询所有产品public Page<Product> findAllProducts(int page, int size) {return productRepository.findAll(PageRequest.of(page, size));}// 按分类统计产品数量public Map<String, Long> countProductsByCategory() {return productRepository.countByCategory();}
}
(4)配置类
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.config.AbstractElasticsearchConfiguration;
import org.springframework.data.elasticsearch.repository.config.EnableElasticsearchRepositories;@Configuration
@EnableElasticsearchRepositories(basePackages = "com.example.repository")
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {@Override@Beanpublic RestHighLevelClient elasticsearchClient() {ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo("localhost:9200").build();return RestClients.create(clientConfiguration).rest();}
}
【2】可用方法
(1)基本 CRUD 方法
save(S entity):保存单个实体
saveAll(Iterable entities):批量保存
findById(ID id):根据ID查询
existsById(ID id):检查是否存在
findAll():查询所有文档
findAllById(Iterable ids):根据ID列表查询
count():统计文档数量
deleteById(ID id):根据ID删除
delete(T entity):删除实体
deleteAll():删除所有文档
(2)分页和排序
findAll(Pageable pageable):分页查询
findAll(Sort sort):排序查询
(3)自定义查询方法
方法名派生查询:findBy[Field][Operation]
@Query注解自定义查询
【三】ElasticsearchRestTemplate使用案例
【1】核心功能
ElasticsearchRestTemplate是 Spring Data Elasticsearch 提供的高级操作类,比 ElasticsearchRepository更灵活,支持更复杂的操作:
文档操作:索引、更新、删除文档
查询操作:执行各种类型的查询(匹配、范围、布尔等)
聚合分析:执行统计、分组等聚合操作
索引管理:创建、删除索引,管理映射
批量操作:高效执行批量索引/删除
脚本支持:执行脚本更新
地理空间查询:执行地理位置相关查询
【2】可用方法
(1)文档操作方法
index(T entity):索引单个文档
save(T entity):保存/更新文档
bulkIndex(List queries):批量索引文档
delete(String id, Class clazz):删除文档
delete(Query query, Class clazz):按查询删除文档
deleteByQuery(Query query, Class clazz):按查询删除文档
get(String id, Class clazz):根据ID获取文档
(2)查询操作方法
search(Query query, Class clazz):执行查询
search(SearchQuery query, Class clazz):执行搜索查询
queryForList(Query query, Class clazz):查询文档列表
queryForPage(Query query, Class clazz):分页查询
suggest(SuggestBuilder suggestion, Class clazz):执行建议查询
(3)聚合分析方法
aggregate(Aggregation aggregation, Class clazz):执行聚合
query(Query query, ResultsExtractor resultsExtractor):自定义结果提取
(4)索引管理方法
indexOps(Class clazz):获取索引操作接口
createIndex(Class clazz):创建索引
deleteIndex(Class clazz):删除索引
putMapping(Class clazz):更新映射
refresh(Class clazz):刷新索引
【3】使用案例
(1)配置类
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.elasticsearch.client.ClientConfiguration;
import org.springframework.data.elasticsearch.client.RestClients;
import org.springframework.data.elasticsearch.core.ElasticsearchRestTemplate;@Configuration
public class ElasticsearchConfig {@Beanpublic RestHighLevelClient restHighLevelClient() {ClientConfiguration clientConfiguration = ClientConfiguration.builder().connectedTo("localhost:9200").withConnectTimeout(5000).withSocketTimeout(60000).build();return RestClients.create(clientConfiguration).rest();}@Beanpublic ElasticsearchRestTemplate elasticsearchRestTemplate() {return new ElasticsearchRestTemplate(restHighLevelClient());}
}
(2)实体类
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import org.springframework.data.elasticsearch.annotations.GeoPointField;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;@Document(indexName = "products")
public class Product {@Idprivate String id;@Field(type = FieldType.Text, analyzer = "ik_max_word")private String name;@Field(type = FieldType.Double)private Double price;@Field(type = FieldType.Keyword)private String category;@Field(type = FieldType.Integer)private Integer stock;@Field(type = FieldType.Date, format = DateFormat.date_hour_minute_second)private Date createdAt;@GeoPointFieldprivate GeoPoint location;// 构造方法、Getter和Setterpublic Product() {}public Product(String name, Double price, String category, Integer stock) {this.name = name;this.price = price;this.category = category;this.stock = stock;this.createdAt = new Date();}// 省略Getter和Setter...
}
(3)服务层实现
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageImpl;
import org.springframework.data.domain.PageRequest;
import org.springframework.data.domain.Pageable;
import org.springframework.data.elasticsearch.core.*;
import org.springframework.data.elasticsearch.core.geo.GeoPoint;
import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.stereotype.Service;import java.util.*;
import java.util.stream.Collectors;@Service
public class ProductService {private final ElasticsearchRestTemplate elasticsearchRestTemplate;@Autowiredpublic ProductService(ElasticsearchRestTemplate elasticsearchRestTemplate) {this.elasticsearchRestTemplate = elasticsearchRestTemplate;}// 1. 索引单个文档public String indexProduct(Product product) {IndexQuery indexQuery = new IndexQueryBuilder().withObject(product).build();return elasticsearchRestTemplate.index(indexQuery, IndexCoordinates.of("products"));}// 2. 批量索引文档public List<String> bulkIndexProducts(List<Product> products) {List<IndexQuery> queries = products.stream().map(product -> new IndexQueryBuilder().withObject(product).build()).collect(Collectors.toList());return elasticsearchRestTemplate.bulkIndex(queries, IndexCoordinates.of("products"));}// 3. 根据ID获取文档public Product getProductById(String id) {return elasticsearchRestTemplate.get(id, Product.class);}// 4. 根据ID删除文档public String deleteProductById(String id) {return elasticsearchRestTemplate.delete(id, Product.class);}// 5. 简单匹配查询public List<Product> searchByName(String name) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchQuery("name", name)).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 6. 分页查询public Page<Product> searchByCategory(String category, int page, int size) {Pageable pageable = PageRequest.of(page, size);Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withPageable(pageable).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);List<Product> products = hits.stream().map(SearchHit::getContent).collect(Collectors.toList());return new PageImpl<>(products, pageable, hits.getTotalHits());}// 7. 范围查询public List<Product> findByPriceRange(double minPrice, double maxPrice) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.rangeQuery("price").gte(minPrice).lte(maxPrice)).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 8. 布尔查询public List<Product> complexSearch(String keyword, String category, double minPrice) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.boolQuery().must(QueryBuilders.matchQuery("name", keyword)).must(QueryBuilders.termQuery("category", category)).must(QueryBuilders.rangeQuery("price").gte(minPrice))).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 9. 聚合分析 - 按类别统计数量public Map<String, Long> countByCategory() {Query query = new NativeSearchQueryBuilder().addAggregation(AggregationBuilders.terms("by_category").field("category.keyword")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);Terms terms = hits.getAggregations().get("by_category");return terms.getBuckets().stream().collect(Collectors.toMap(Terms.Bucket::getKeyAsString,Terms.Bucket::getDocCount));}// 10. 聚合分析 - 计算平均价格public double averagePrice() {Query query = new NativeSearchQueryBuilder().addAggregation(AggregationBuilders.avg("avg_price").field("price")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);Avg avg = hits.getAggregations().get("avg_price");return avg.getValue();}// 11. 更新文档 - 部分更新public void updateProductStock(String id, int newStock) {Map<String, Object> params = new HashMap<>();params.put("stock", newStock);UpdateQuery updateQuery = UpdateQuery.builder(id).withParams(params).build();elasticsearchRestTemplate.update(updateQuery, IndexCoordinates.of("products"));}// 12. 脚本更新public void increaseProductPrice(String id, double amount) {Map<String, Object> params = new HashMap<>();params.put("amount", amount);UpdateQuery updateQuery = UpdateQuery.builder(id).withScript("ctx._source.price += params.amount").withParams(params).build();elasticsearchRestTemplate.update(updateQuery, IndexCoordinates.of("products"));}// 13. 按查询更新public long increaseStockForCategory(String category, int amount) {UpdateQuery updateQuery = UpdateQuery.builder(QueryBuilders.termQuery("category", category)).withScript("ctx._source.stock += params.amount").withParams(Collections.singletonMap("amount", amount)).build();ByQueryResponse response = elasticsearchRestTemplate.updateByQuery(updateQuery, IndexCoordinates.of("products"));return response.getUpdated();}// 14. 按查询删除public long deleteByCategory(String category) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).build();return elasticsearchRestTemplate.delete(query, Product.class);}// 15. 地理位置查询 - 附近的产品public List<Product> findNearbyProducts(double lat, double lon, double distance) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.geoDistanceQuery("location").point(lat, lon).distance(distance + "km")).build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);return hits.stream().map(SearchHit::getContent).collect(Collectors.toList());}// 16. 高亮显示public List<SearchHit<Product>> searchWithHighlight(String keyword) {Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.matchQuery("name", keyword)).withHighlightFields(new HighlightBuilder.Field("name").preTags("<em>").postTags("</em>")).build();return elasticsearchRestTemplate.search(query, Product.class).getSearchHits();}// 17. 自动补全public List<String> suggestNames(String prefix) {SuggestBuilder suggestBuilder = new SuggestBuilder().addSuggestion("name-suggest", SuggestBuilders.completionSuggestion("suggest").prefix(prefix).skipDuplicates(true).size(10));SuggestResponse response = elasticsearchRestTemplate.suggest(suggestBuilder, Product.class);return response.getSuggest().getSuggestion("name-suggest").getEntries().get(0).getOptions().stream().map(Suggest.Suggestion.Entry.Option::getText).collect(Collectors.toList());}// 18. 索引管理 - 创建索引public boolean createProductIndex() {return elasticsearchRestTemplate.indexOps(Product.class).create();}// 19. 索引管理 - 更新映射public boolean updateProductMapping() {return elasticsearchRestTemplate.indexOps(Product.class).putMapping();}// 20. 索引管理 - 刷新索引public void refreshProductIndex() {elasticsearchRestTemplate.indexOps(Product.class).refresh();}
}
【4】实践优化
(1)批量操作优化
public void bulkInsert(List<Product> products) {List<IndexQuery> queries = new ArrayList<>();for (Product product : products) {IndexQuery query = new IndexQuery();query.setId(product.getId());query.setObject(product);queries.add(query);}// 分批处理,每批1000条int batchSize = 1000;for (int i = 0; i < queries.size(); i += batchSize) {int end = Math.min(i + batchSize, queries.size());List<IndexQuery> batch = queries.subList(i, end);elasticsearchRestTemplate.bulkIndex(batch, Product.class);}
}
(2)高校分页查询
public List<Product> efficientPagination(String category, int page, int size) {// 使用search_after实现高效分页Query query = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).withPageable(PageRequest.of(0, size)) // 第一页.build();SearchHits<Product> hits = elasticsearchRestTemplate.search(query, Product.class);List<Object> searchAfter = hits.getSearchHits().get(hits.getSearchHits().size() - 1).getSortValues();// 获取下一页Query nextPageQuery = new NativeSearchQueryBuilder().withQuery(QueryBuilders.termQuery("category", category)).withSort(SortBuilders.fieldSort("price").order(SortOrder.DESC)).withSearchAfter(searchAfter.toArray()).withPageable(PageRequest.of(0, size)).build();return elasticsearchRestTemplate.search(nextPageQuery, Product.class).stream().map(SearchHit::getContent).collect(Collectors.toList());
}
【四】Java API Client使用案例
【1】介绍
Elasticsearch Java API Client 是 Elasticsearch 官方推出的新一代 Java 客户端,具有以下特点:
强类型:所有请求和响应都是类型安全的
现代化设计:基于 Elasticsearch DSL 构建
异步支持:内置异步操作支持
模块化:按需引入不同功能模块
与 REST API 完全对应:1:1 映射 REST API
【2】核心功能
(1)主要模块
elasticsearch-java:核心模块
elasticsearch-java-api:API 模块
elasticsearch-transport:传输层
elasticsearch-x-content:JSON 处理
(2)主要类
ElasticsearchClient:主客户端类
*Request:各种操作请求(IndexRequest, SearchRequest 等)
*Response:各种操作响应
Query:查询构建器
Aggregation:聚合构建器
【3】使用案例
(1)maven配置
<dependencies><!-- 核心依赖 --><dependency><groupId>co.elastic.clients</groupId><artifactId>elasticsearch-java</artifactId><version>8.11.4</version></dependency><!-- JSON 处理 --><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId><version>2.15.2</version></dependency><!-- HTTP 客户端 --><dependency><groupId>org.apache.httpcomponents.client5</groupId><artifactId>httpclient5</artifactId><version>5.2.1</version></dependency>
</dependencies>
(2)配置类
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class ElasticsearchConfig {@Beanpublic ElasticsearchClient elasticsearchClient() {// 1. 创建低级客户端RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).setHttpClientConfigCallback(httpClientBuilder -> {// 认证配置CredentialsProvider credentialsProvider = new BasicCredentialsProvider();credentialsProvider.setCredentials(AuthScope.ANY,new UsernamePasswordCredentials("elastic", "your-password"));return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setSSLHostnameVerifier((hostname, session) -> true); // 开发环境禁用主机名验证}).build();// 2. 创建传输层ElasticsearchTransport transport = new RestClientTransport(restClient,new JacksonJsonpMapper());// 3. 创建API客户端return new ElasticsearchClient(transport);}
}
(3)实体类
import com.fasterxml.jackson.annotation.JsonFormat;
import com.fasterxml.jackson.annotation.JsonProperty;import java.time.LocalDateTime;
import java.util.List;@Data
public class Product {private String id;private String name;private String description;private double price;private String category;private int stock;private List<String> tags;@JsonProperty("created_at")@JsonFormat(pattern = "yyyy-MM-dd HH:mm:ss")private LocalDateTime createdAt;private Location location;// 嵌套对象public static class Location {private double lat;private double lon;// Getter和Setterpublic double getLat() { return lat; }public void setLat(double lat) { this.lat = lat; }public double getLon() { return lon; }public void setLon(double lon) { this.lon = lon; }}}
(4)服务层实现
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.*;
import co.elastic.clients.elasticsearch._types.aggregations.*;
import co.elastic.clients.elasticsearch._types.query_dsl.*;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.core.search.*;
import co.elastic.clients.json.JsonData;
import co.elastic.clients.util.ObjectBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.io.IOException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.stream.Collectors;@Service
public class ProductService {private static final String INDEX_NAME = "products";private final ElasticsearchClient client;@Autowiredpublic ProductService(ElasticsearchClient client) {this.client = client;}// 1. 创建索引public void createIndex() throws IOException {client.indices().create(c -> c.index(INDEX_NAME).settings(s -> s.numberOfShards("3").numberOfReplicas("1")).mappings(m -> m.properties("name", p -> p.text(t -> t.analyzer("ik_max_word"))).properties("description", p -> p.text(t -> t.analyzer("ik_smart"))).properties("price", p -> p.double_(d -> d)).properties("category", p -> p.keyword(k -> k)).properties("stock", p -> p.integer(i -> i)).properties("tags", p -> p.keyword(k -> k)).properties("created_at", p -> p.date(d -> d.format("yyyy-MM-dd HH:mm:ss"))).properties("location", p -> p.geoPoint(g -> g))));}// 2. 索引单个文档public String indexProduct(Product product) throws IOException {IndexResponse response = client.index(i -> i.index(INDEX_NAME).id(product.getId()).document(product));return response.id();}// 3. 批量索引文档public List<String> bulkIndexProducts(List<Product> products) throws IOException {List<BulkOperation> operations = products.stream().map(product -> BulkOperation.of(op -> op.index(idx -> idx.index(INDEX_NAME).id(product.getId()).document(product)))).collect(Collectors.toList());BulkResponse response = client.bulk(b -> b.index(INDEX_NAME).operations(operations));return response.items().stream().map(BulkResponseItem::id).collect(Collectors.toList());}// 4. 根据ID获取文档public Product getProductById(String id) throws IOException {GetResponse<Product> response = client.get(g -> g.index(INDEX_NAME).id(id),Product.class);if (response.found()) {return response.source();}return null;}// 5. 根据ID删除文档public boolean deleteProductById(String id) throws IOException {DeleteResponse response = client.delete(d -> d.index(INDEX_NAME).id(id));return response.result() == Result.Deleted;}// 6. 简单匹配查询public List<Product> searchByName(String name) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.match(m -> m.field("name").query(name))),Product.class);return extractProducts(response);}// 7. 多字段搜索public List<Product> multiMatchSearch(String query) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.multiMatch(m -> m.fields("name", "description", "tags").query(query))),Product.class);return extractProducts(response);}// 8. 布尔查询public List<Product> complexSearch(String keyword, String category, double minPrice) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.bool(b -> b.must(m -> m.match(t -> t.field("name").query(keyword))).must(m -> m.term(t -> t.field("category").value(category))).must(m -> m.range(r -> r.field("price").gte(JsonData.of(minPrice)))))),Product.class);return extractProducts(response);}// 9. 范围查询public List<Product> findByPriceRange(double min, double max) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.range(r -> r.field("price").gte(JsonData.of(min)).lte(JsonData.of(max)))),Product.class);return extractProducts(response);}// 10. 分页查询public List<Product> findByCategoryPaginated(String category, int page, int size) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))).from(page * size).size(size),Product.class);return extractProducts(response);}// 11. 聚合分析 - 按类别统计数量public Map<String, Long> countByCategory() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("by_category", a -> a.terms(t -> t.field("category.keyword").size(100))),Product.class);return response.aggregations().get("by_category").sterms().buckets().array().stream().collect(Collectors.toMap(StringTermsBucket::key,StringTermsBucket::docCount));}// 12. 聚合分析 - 计算平均价格public double averagePrice() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("avg_price", a -> a.avg(av -> av.field("price"))),Product.class);AvgAggregate avg = response.aggregations().get("avg_price").avg();return avg.value();}// 13. 更新文档 - 部分更新public void updateProductStock(String id, int newStock) throws IOException {client.update(u -> u.index(INDEX_NAME).id(id).doc(Map.of("stock", newStock)),Product.class);}// 14. 脚本更新public void increaseProductPrice(String id, double amount) throws IOException {client.update(u -> u.index(INDEX_NAME).id(id).script(s -> s.inline(i -> i.source("ctx._source.price += params.amount").params("amount", JsonData.of(amount)))),Product.class);}// 15. 按查询更新public long increaseStockForCategory(String category, int amount) throws IOException {UpdateByQueryResponse response = client.updateByQuery(u -> u.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))).script(s -> s.inline(i -> i.source("ctx._source.stock += params.amount").params("amount", JsonData.of(amount)))));return response.updated();}// 16. 按查询删除public long deleteByCategory(String category) throws IOException {DeleteByQueryResponse response = client.deleteByQuery(d -> d.index(INDEX_NAME).query(q -> q.term(t -> t.field("category").value(category))));return response.deleted();}// 17. 地理位置查询 - 附近的产品public List<Product> findNearbyProducts(double lat, double lon, double distance) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.geoDistance(g -> g.field("location").distance(distance + "km").location(l -> l.latlon(ll -> ll.lat(lat).lon(lon))))),Product.class);return extractProducts(response);}// 18. 高亮显示public List<Hit<Product>> searchWithHighlight(String keyword) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).query(q -> q.match(m -> m.field("name").query(keyword))).highlight(h -> h.fields("name", f -> f.preTags("<em>").postTags("</em>"))),Product.class);return response.hits().hits();}// 19. 自动补全public List<String> suggestNames(String prefix) throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).suggest(su -> su.suggesters("name-suggest", sug -> sug.completion(c -> c.field("suggest").size(10).skipDuplicates(true))).text(prefix)),Product.class);return response.suggest().get("name-suggest").get(0).completion().options().stream().map(CompletionSuggestOption::text).collect(Collectors.toList());}// 20. 批量操作public void bulkOperations(List<Product> toCreate, List<String> toDelete) throws IOException {List<BulkOperation> operations = new ArrayList<>();// 添加创建操作toCreate.forEach(product -> operations.add(BulkOperation.of(op -> op.index(idx -> idx.index(INDEX_NAME).id(product.getId()).document(product)))));// 添加删除操作toDelete.forEach(id -> operations.add(BulkOperation.of(op -> op.delete(d -> d.index(INDEX_NAME).id(id)))));client.bulk(b -> b.index(INDEX_NAME).operations(operations));}// 辅助方法:从响应中提取产品列表private List<Product> extractProducts(SearchResponse<Product> response) {return response.hits().hits().stream().map(Hit::source).filter(Objects::nonNull).collect(Collectors.toList());}
}
【4】高级用法
(1)异步操作
import co.elastic.clients.util.ApiTypeHelper;
import co.elastic.clients.util.BinaryData;public void asyncIndexProduct(Product product) {BinaryData data = BinaryData.of(ApiTypeHelper.jsonBuilder(product).toString(),client._transport().jsonpMapper());client.indexAsync(i -> i.index(INDEX_NAME).id(product.getId()).document(data)).whenComplete((response, exception) -> {if (exception != null) {// 处理异常System.err.println("索引失败: " + exception.getMessage());} else {// 处理成功System.out.println("文档索引成功,ID: " + response.id());}});
}
(2)复杂聚合
public Map<String, Double> avgPriceByCategory() throws IOException {SearchResponse<Product> response = client.search(s -> s.index(INDEX_NAME).size(0).aggregations("by_category", a -> a.terms(t -> t.field("category.keyword").size(100)).aggregations("avg_price", avg -> avg.avg(av -> av.field("price")))),Product.class);return response.aggregations().get("by_category").sterms().buckets().array().stream().collect(Collectors.toMap(StringTermsBucket::key,bucket -> bucket.aggregations().get("avg_price").avg().value()));
}
(3)自定义json映射
import co.elastic.clients.json.JsonpMapper;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;// 自定义 JSON 映射器
public JsonpMapper customJsonpMapper() {ObjectMapper objectMapper = new ObjectMapper();objectMapper.registerModule(new JavaTimeModule()); // 支持 Java 8 时间类型return new JacksonJsonpMapper(objectMapper);
}// 在配置中使用自定义映射器
@Bean
public ElasticsearchClient elasticsearchClient() {RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build();ElasticsearchTransport transport = new RestClientTransport(restClient,customJsonpMapper() // 使用自定义映射器);return new ElasticsearchClient(transport);
}