Elasticsearch Ruby 客户端 Bulk Scroll Helpers 实战指南
一、准备:安装与基础客户端
require 'elasticsearch'
require 'logger'client = Elasticsearch::Client.new(request_timeout: 30,retry_on_failure: 3, # 传输层重试(网络抖动有用)transport_options: { headers: { 'Accept-Encoding' => 'gzip' } },logger: Logger.new($stdout) # 调试期打开;生产可换成结构化日志
)
二、BulkHelper:高效写入的正确姿势
2.1 快速上手(数组 → 批量写入)
require 'elasticsearch/helpers/bulk_helper'bulk = Elasticsearch::Helpers::BulkHelper.new(client, 'my_index')docs = [{ name: 'document1', date: '2024-05-16' },{ name: 'document2', date: '2023-12-19' },{ name: 'document3', date: '2024-07-07' }
]# 最简单:一把全送
bulk.ingest(docs)# 切片发送(分成 2 份发送,降低单次体积)
bulk.ingest(docs, { slice: 2 })# 带回调:可记录每批次结果
bulk.ingest(docs) { |resp, sent| puts "Ingested #{sent.count} docs" }
小贴士(体量经验):单批 5–15MB 或 1k–5k 文档较稳妥;过大容易 413/网络重传,过小则握手开销高。
2.2 传参(Querystring / Body)与管道、路由
# 等价于 Bulk API 的 URL 查询参数与 body 参数
bulk.ingest(docs,{ refresh: false, pipeline: 'attach_meta', routing: 'user-42' }, # => querystring{} # => bulk body 级别的默认元数据(通常不需要)
)
2.3 更新与删除
# 删除一批 _id
ids = %w[a1 b2 c3 ...]
bulk.delete(ids)# 更新(文档需带 id)
updates = [{ id: 'a1', name: 'updated 1' },{ id: 'b2', name: 'updated 2' }
]
bulk.update(updates)
2.4 从 JSON 文件导入
file_path = './data.json'
bulk.ingest_json(file_path)# 若数据不在根,而是 data.items
bulk.ingest_json(file_path, { keys: %w[data items] })
大文件建议流式读取(分块 parse 后分批 ingest),避免一次性
JSON.parse(File.read)
占满内存。可以用Oj::Saj
或JSON::Stream
等流式解析库;若暂不引入三方库,至少每 N 行切一批送入bulk.ingest(slice: ...)
。
2.5 生产化:幂等写入与部分失败解析
幂等 _id
:用业务主键或内容哈希充当 _id
,避免重跑导入产生重复。
require 'digest/sha1'
docs = raw_rows.map do |row|row.merge(id: Digest::SHA1.hexdigest(row.fetch('business_key')))
end
bulk.ingest(docs)
部分失败处理:Bulk 可能“部分成功、部分失败”,需要解析 items
。
def bulk_ingest_with_retry(bulk, docs, max_retries: 3, sleep_base: 0.5)retries = 0bulk.ingest(docs) do |resp, sent|errors = []if resp && resp['items'].is_a?(Array)resp['items'].each_with_index do |item, i|op = item.keys.first # "index"/"create"/"update"/"delete"res = item[op]if res['error']errors << { doc: sent[i], error: res['error'], status: res['status'] }endendendif errors.any?if retries < max_retriessleep(sleep_base * (2 ** retries)) # 指数退避retries += 1retry_docs = errors.map { |e| e[:doc] }warn "Bulk partial errors=#{errors.size}, retry=#{retries}..."bulk_ingest_with_retry(bulk, retry_docs, max_retries: max_retries, sleep_base: sleep_base)elseraise "Bulk failed after retries, sample_error=#{errors.first.inspect}"endendend
end
常见错误码对策
409 version_conflict_engine_exception
:幂等/外部版本控制(version
+version_type: 'external_gte'
)或改为update
(带doc_as_upsert: true
)。413 Request Entity Too Large
:减小批大小/关闭压缩重试;429 too_many_requests
:退避重试,并调低并发;- 网络异常:传输层
retry_on_failure
+ 自己的批级重试。
2.6 并发切片(提高吞吐)
Ruby 的 IO 多路复用对 HTTP 写 ES 帮助很大(GIL 不是瓶颈)。示例:把大数组切分后并发发送:
require 'concurrent-ruby'
pool = Concurrent::FixedThreadPool.new(4)
docs.each_slice(2000) do |batch|pool.post dobulk.ingest(batch) { |_, sent| puts "batch=#{sent.size}" }end
end
pool.shutdown; pool.wait_for_termination
三、ScrollHelper:长结果集的顺序遍历
用途:导出/遍历大结果集的历史方案。注意:深分页/遍历的推荐方案是 PIT +
search_after
(见第 4 节),Scroll 更适合离线导出等一次性任务。
3.1 快速上手
require 'elasticsearch/helpers/scroll_helper'body = {scroll: '1m', # 游标保活时间size: 1000,query: { match_all: {} },sort: [{ _doc: 'asc' }] # Scroll 下常用稳定排序
}scroll = Elasticsearch::Helpers::ScrollHelper.new(client, 'my_index', body)# 方式一:each/map(Enumerable)
scroll.each do |hit|# hit 为单条命中(Hash),在这里处理或写出
end# 方式二:按页取
my_docs = []
while !(page = scroll.results).empty?my_docs.concat(page)
endscroll.clear # 用完务必清理
3.2 Scroll 使用须知
scroll: '1m'
表示每次请求之间保持 1 分钟有效期,不是总耗时;- Scroll 会持有快照,长期占资源;不要把它当在线深分页接口;
- 一旦需要边查边写出(如导出 CSV),请边读边消费,不要把全量结果塞进内存。
四、PIT + search_after(深分页首选,优于 Scroll)
为什么:一致性好、资源占用小且更现代。Ruby 客户端直接调 Search API 即可(不依赖 ScrollHelper)。
# 1) 打开 PIT
pit_id = client.open_point_in_time(index: 'my_index', params: { keep_alive: '1m' })['id']# 2) 首次查询
body = {size: 1000,pit: { id: pit_id, keep_alive: '1m' },sort: [ { created_at: 'asc' }, { _shard_doc: 'asc' } ],query: { match_all: {} },_source: %w[id created_at ...]
}
resp = client.search(body: body)# 3) 迭代
loop dohits = resp.dig('hits', 'hits')break if hits.empty?# 处理 hits ...last_sort = hits.last['sort']resp = client.search(body: body.merge(search_after: last_sort))
end# 4) 关闭 PIT
client.close_point_in_time(body: { id: pit_id })
关键点:使用稳定且全局唯一的排序组合(常见
业务时间戳 + _shard_doc
);keep_alive
只需覆盖到“下一次请求”。
五、组合拳示例:从大 JSON 导入 → 写入 ES → 校验
require 'json'
require 'elasticsearch/helpers/bulk_helper'bulk = Elasticsearch::Helpers::BulkHelper.new(client, 'logs_2024')# 简单分块读取(示例:每 2_000 条一批;超大文件建议流式解析库)
File.open('data.json') do |f|buf = []JSON.load(f)['data']['items'].each do |row|buf << rowif buf.size >= 2000bulk_ingest_with_retry(bulk, buf) # 第 2.5 节里实现的函数buf.clearendendbulk_ingest_with_retry(bulk, buf) unless buf.empty?
end# 校验:用 PIT + search_after 统计或抽样检查
pit = client.open_point_in_time(index: 'logs_2024', params: { keep_alive: '1m' })['id']
resp = client.search(body: { size: 0, pit: { id: pit, keep_alive: '1m' }, aggs: { by_day: { date_histogram: { field: 'date', calendar_interval: 'day' } } } })
client.close_point_in_time(body: { id: pit })
puts resp['aggregations']['by_day']['buckets'].size
六、实战建议清单(供复制到 Readme/Runbook)
- 批大小:控制在 5–15MB / 1k–5k;
- 并发:按 CPU×2 或与 ES 节点数匹配开 2–8 线程,观察 429/延迟后微调;
- 幂等:用业务主键/内容哈希生成
_id
,必要时用version
+version_type: 'external_gte'
; - 失败重试:指数退避;仅重试失败子集;记录失败样本;
- 管道:在 Ingest Pipeline 做清洗/字段补全,Bulk 侧加
pipeline
; - 刷新策略:大批量导入时
refresh: false
,收尾再手动indices.refresh
; - 遍历策略:在线深分页/全量遍历优先 PIT + search_after;Scroll 用于一次性导出;
- 观测:记录每批
items
错误率、took
、throttle_time_ms
、429 计数; - 资源:留意 ES Hot 节点磁盘/CPU/线程池队列;Bulk 压太猛会拖慢集群。