当前位置: 首页 > news >正文

Elasticsearch Ruby 客户端 Bulk Scroll Helpers 实战指南

一、准备:安装与基础客户端

require 'elasticsearch'
require 'logger'client = Elasticsearch::Client.new(request_timeout: 30,retry_on_failure: 3,            # 传输层重试(网络抖动有用)transport_options: { headers: { 'Accept-Encoding' => 'gzip' } },logger: Logger.new($stdout)     # 调试期打开;生产可换成结构化日志
)

二、BulkHelper:高效写入的正确姿势

2.1 快速上手(数组 → 批量写入)

require 'elasticsearch/helpers/bulk_helper'bulk = Elasticsearch::Helpers::BulkHelper.new(client, 'my_index')docs = [{ name: 'document1', date: '2024-05-16' },{ name: 'document2', date: '2023-12-19' },{ name: 'document3', date: '2024-07-07' }
]# 最简单:一把全送
bulk.ingest(docs)# 切片发送(分成 2 份发送,降低单次体积)
bulk.ingest(docs, { slice: 2 })# 带回调:可记录每批次结果
bulk.ingest(docs) { |resp, sent| puts "Ingested #{sent.count} docs" }

小贴士(体量经验):单批 5–15MB 或 1k–5k 文档较稳妥;过大容易 413/网络重传,过小则握手开销高。

2.2 传参(Querystring / Body)与管道、路由

# 等价于 Bulk API 的 URL 查询参数与 body 参数
bulk.ingest(docs,{ refresh: false, pipeline: 'attach_meta', routing: 'user-42' }, # => querystring{} # => bulk body 级别的默认元数据(通常不需要)
)

2.3 更新与删除

# 删除一批 _id
ids = %w[a1 b2 c3 ...]
bulk.delete(ids)# 更新(文档需带 id)
updates = [{ id: 'a1', name: 'updated 1' },{ id: 'b2', name: 'updated 2' }
]
bulk.update(updates)

2.4 从 JSON 文件导入

file_path = './data.json'
bulk.ingest_json(file_path)# 若数据不在根,而是 data.items
bulk.ingest_json(file_path, { keys: %w[data items] })

大文件建议流式读取(分块 parse 后分批 ingest),避免一次性 JSON.parse(File.read) 占满内存。可以用 Oj::SajJSON::Stream 等流式解析库;若暂不引入三方库,至少每 N 行切一批送入 bulk.ingest(slice: ...)

2.5 生产化:幂等写入与部分失败解析

幂等 _id:用业务主键或内容哈希充当 _id,避免重跑导入产生重复。

require 'digest/sha1'
docs = raw_rows.map do |row|row.merge(id: Digest::SHA1.hexdigest(row.fetch('business_key')))
end
bulk.ingest(docs)

部分失败处理:Bulk 可能“部分成功、部分失败”,需要解析 items

def bulk_ingest_with_retry(bulk, docs, max_retries: 3, sleep_base: 0.5)retries = 0bulk.ingest(docs) do |resp, sent|errors = []if resp && resp['items'].is_a?(Array)resp['items'].each_with_index do |item, i|op  = item.keys.first         # "index"/"create"/"update"/"delete"res = item[op]if res['error']errors << { doc: sent[i], error: res['error'], status: res['status'] }endendendif errors.any?if retries < max_retriessleep(sleep_base * (2 ** retries))  # 指数退避retries += 1retry_docs = errors.map { |e| e[:doc] }warn "Bulk partial errors=#{errors.size}, retry=#{retries}..."bulk_ingest_with_retry(bulk, retry_docs, max_retries: max_retries, sleep_base: sleep_base)elseraise "Bulk failed after retries, sample_error=#{errors.first.inspect}"endendend
end

常见错误码对策

  • 409 version_conflict_engine_exception:幂等/外部版本控制(version + version_type: 'external_gte')或改为 update(带 doc_as_upsert: true)。
  • 413 Request Entity Too Large:减小批大小/关闭压缩重试;
  • 429 too_many_requests:退避重试,并调低并发;
  • 网络异常:传输层 retry_on_failure + 自己的批级重试。

2.6 并发切片(提高吞吐)

Ruby 的 IO 多路复用对 HTTP 写 ES 帮助很大(GIL 不是瓶颈)。示例:把大数组切分后并发发送:

require 'concurrent-ruby'
pool = Concurrent::FixedThreadPool.new(4)
docs.each_slice(2000) do |batch|pool.post dobulk.ingest(batch) { |_, sent| puts "batch=#{sent.size}" }end
end
pool.shutdown; pool.wait_for_termination

三、ScrollHelper:长结果集的顺序遍历

用途:导出/遍历大结果集的历史方案。注意:深分页/遍历的推荐方案是 PIT + search_after(见第 4 节),Scroll 更适合离线导出等一次性任务。

3.1 快速上手

require 'elasticsearch/helpers/scroll_helper'body = {scroll: '1m',  # 游标保活时间size:   1000,query:  { match_all: {} },sort:   [{ _doc: 'asc' }]  # Scroll 下常用稳定排序
}scroll = Elasticsearch::Helpers::ScrollHelper.new(client, 'my_index', body)# 方式一:each/map(Enumerable)
scroll.each do |hit|# hit 为单条命中(Hash),在这里处理或写出
end# 方式二:按页取
my_docs = []
while !(page = scroll.results).empty?my_docs.concat(page)
endscroll.clear # 用完务必清理

3.2 Scroll 使用须知

  • scroll: '1m' 表示每次请求之间保持 1 分钟有效期,不是总耗时;
  • Scroll 会持有快照,长期占资源;不要把它当在线深分页接口;
  • 一旦需要边查边写出(如导出 CSV),请边读边消费,不要把全量结果塞进内存。

四、PIT + search_after(深分页首选,优于 Scroll)

为什么:一致性好、资源占用小且更现代。Ruby 客户端直接调 Search API 即可(不依赖 ScrollHelper)。

# 1) 打开 PIT
pit_id = client.open_point_in_time(index: 'my_index', params: { keep_alive: '1m' })['id']# 2) 首次查询
body = {size: 1000,pit: { id: pit_id, keep_alive: '1m' },sort: [ { created_at: 'asc' }, { _shard_doc: 'asc' } ],query: { match_all: {} },_source: %w[id created_at ...]
}
resp = client.search(body: body)# 3) 迭代
loop dohits = resp.dig('hits', 'hits')break if hits.empty?# 处理 hits ...last_sort = hits.last['sort']resp = client.search(body: body.merge(search_after: last_sort))
end# 4) 关闭 PIT
client.close_point_in_time(body: { id: pit_id })

关键点:使用稳定且全局唯一的排序组合(常见 业务时间戳 + _shard_doc);keep_alive 只需覆盖到“下一次请求”。

五、组合拳示例:从大 JSON 导入 → 写入 ES → 校验

require 'json'
require 'elasticsearch/helpers/bulk_helper'bulk = Elasticsearch::Helpers::BulkHelper.new(client, 'logs_2024')# 简单分块读取(示例:每 2_000 条一批;超大文件建议流式解析库)
File.open('data.json') do |f|buf = []JSON.load(f)['data']['items'].each do |row|buf << rowif buf.size >= 2000bulk_ingest_with_retry(bulk, buf)   # 第 2.5 节里实现的函数buf.clearendendbulk_ingest_with_retry(bulk, buf) unless buf.empty?
end# 校验:用 PIT + search_after 统计或抽样检查
pit = client.open_point_in_time(index: 'logs_2024', params: { keep_alive: '1m' })['id']
resp = client.search(body: { size: 0, pit: { id: pit, keep_alive: '1m' }, aggs: { by_day: { date_histogram: { field: 'date', calendar_interval: 'day' } } } })
client.close_point_in_time(body: { id: pit })
puts resp['aggregations']['by_day']['buckets'].size

六、实战建议清单(供复制到 Readme/Runbook)

  • 批大小:控制在 5–15MB / 1k–5k
  • 并发:按 CPU×2 或与 ES 节点数匹配开 2–8 线程,观察 429/延迟后微调;
  • 幂等:用业务主键/内容哈希生成 _id,必要时用 version + version_type: 'external_gte'
  • 失败重试:指数退避;仅重试失败子集;记录失败样本;
  • 管道:在 Ingest Pipeline 做清洗/字段补全,Bulk 侧加 pipeline
  • 刷新策略:大批量导入时 refresh: false,收尾再手动 indices.refresh
  • 遍历策略:在线深分页/全量遍历优先 PIT + search_after;Scroll 用于一次性导出;
  • 观测:记录每批 items 错误率、tookthrottle_time_ms、429 计数;
  • 资源:留意 ES Hot 节点磁盘/CPU/线程池队列;Bulk 压太猛会拖慢集群。
http://www.xdnf.cn/news/1349533.html

相关文章:

  • TopK问题(堆排序)-- go
  • MySQL存储过程入门
  • 中农具身导航赋能智慧农业!AgriVLN:农业机器人的视觉语言导航
  • PostgreSQL15——查询详解
  • Python 十进制转二进制
  • 【每天一个知识点】AIOps 与自动化管理
  • 使用隧道(Tunnel)连接PostgreSQL数据库(解决防火墙问题)(含Java实现代码)
  • AI实验管理神器:WandB全功能解析
  • 【文献阅读】Advances and Challenges in Large Model Compression: A Survey
  • `strncasecmp` 字符串比较函数
  • Unreal Engine IWYU Include What You Use
  • Vue 插槽(Slots)全解析2
  • ubuntu - 终端工具 KConsole安装
  • AI + 教育:个性化学习如何落地?教师角色转变与技术伦理的双重考验
  • SymPy 中抽象函数的推导与具体函数代入
  • Spring Ai 1.0.1中存在的问题:使用MessageChatMemoryAdvisor导致System未被正确的放在首位
  • c++最新进展
  • fdisk工具源码编译生成
  • DAY14-新世纪DL(DeepLearning/深度学习)战士:破(优化算法)2
  • 多线程下为什么用ConcurrentHashMap而不是HashMap
  • 【Android】 连接wifi时,强制应用使用流量
  • 【从零开始java学习|第九篇】方法的相关知识与练习
  • 【微服务的数据一致性分发问题】究极解决方案
  • 日志的配置
  • 一键部署openGauss6.0.2轻量版单节点
  • Spring原理
  • 最近 | 黄淮教务 | 小工具合集
  • 世界模型一种能够对现实世界环境进行仿真,并基于文本、图像、视频和运动等输入数据来生成视频、预测未来状态的生成式 AI 模型
  • Maxscript如何清理3dMax场景?
  • 打工人日报20250822