ElasticSearch不同环境同步索引数据
目的:在生产环境把一个索引的数据同步到测试环境中
1、在生产环境导出json数据
curl -u "adims_user:xkR%cHwR5I9g" -X GET "http://172.18.251.132:9200/unify_info_mb_sp_aggregatetb_0004/_search?scroll=1m" -H 'Content-Type: application/json' -d'{"size": 100000,"query": {"bool": {"must": [{ "term": { "categoryId": 30 }},{ "term": { "factoryType": "煤炭电厂" }},{ "term": { "isDelete": 0 }},{ "term": { "countryName": "中国" }}]}}}' > initial_batch.json
2、利用python把导出的json数据转成bulk数据
python脚本convert_to_bulk_simple.py
import json
import sysdef convert_search_to_bulk(input_file, output_file, target_index):"""将ES查询结果转换为bulk格式"""with open(input_file, 'r', encoding='utf-8') as f:data = json.load(f)if 'hits' not in data or 'hits' not in data['hits']:print("错误: 不是有效的ES查询结果格式")return Falsehits = data['hits']['hits']print(f"找到 {len(hits)} 个文档")with open(output_file, 'w', encoding='utf-8') as f:for hit in hits:# action行action = {"index": {"_index": target_index, "_id": hit.get('_id')}}f.write(json.dumps(action) + '\n')# document行f.write(json.dumps(hit.get('_source', {})) + '\n')# 确保以换行符结尾f.write('\n')print(f"转换完成: {output_file}")return Trueif __name__ == "__main__":if len(sys.argv) != 4:print("使用方法: python convert_to_bulk_simple.py input.json output.json target_index")sys.exit(1)convert_search_to_bulk(sys.argv[1], sys.argv[2], sys.argv[3])
执行转换命令:
python convert_to_bulk_simple.py initial_batch2.json bulk_data.json unify_info_mb_sp_aggregatetb_0004
3、把转换的数据导入到测试环境
curl -u "adims_user:j0SMMmI+Rwfv" -X POST "http://192.168.168.243:9200/_bulk" -H "Content-Type: application/json" --data-binary @bulk_data.json
4、导入前后查询数据量大小,验证是否导入成功
curl -u "adims_user:j0SMMmI+Rwfv" -X GET "http://192.168.168.243:9200/unify_info_mb_sp_aggregatetb_0004/_count"