当前位置: 首页 > ds >正文

Elasticsearch 如何实现跨数据中心的数据同步?

实战场景
双数据中心容灾,要求RPO<5分钟,RTO<30分钟

‌RPO(Recovery Point Objective)‌:
RPO指的是灾难发生后,系统能够恢复到的数据更新点的时间。简单来说,它衡量的是数据丢失的量。在你的例子中,RPO<5分钟意味着在灾难发生后,系统能够恢复到灾难发生前5分钟内的数据状态,从而确保数据丢失量控制在5分钟以内。

‌RTO(Recovery Time Objective)‌:
RTO则是指从灾难发生到业务系统完全恢复并可以重新提供服务所需的时间。它衡量的是业务中断的时间长度。在你的例子中,RTO<30分钟意味着在灾难发生后,系统能够在30分钟内完全恢复并重新提供服务,从而将业务中断的时间控制在30分钟以内。

1. CCR核心配置(北京->上海 单向同步)

# 建立集群间安全连接
PUT /_cluster/settings
{"persistent": {"cluster.remote.shanghai_cluster.seeds": "es-sh-node1:9300,es-sh-node2:9300","cluster.remote.shanghai_cluster.skip_unavailable": true}
}# 创建跟随策略(同步订单索引)
PUT /_ccr/auto_follow/jd_orders
{"remote_cluster" : "shanghai_cluster","leader_index_patterns" : ["jd_orders-*"],"follow_index_pattern" : "sh_{{leader_index}}","max_read_request_operation_count" : 5120,"max_outstanding_read_requests" : 24
}

2. Logstash增量备份(双向同步商品索引)

input {elasticsearch {hosts => ["http://es-bj-node1:9200"]index => "jd_goods"query => '{ "query": { "range": { "@timestamp": { "gte": "now-5m" }}}}'docinfo => truesize => 500scroll => "5m"}
}output {# 上海集群写入elasticsearch {hosts => ["http://es-sh-node1:9200"]index => "%{[@metadata][_index]}"document_id => "%{[@metadata][_id]}"pipeline => "timestamp_pipeline"}# 本地备份elasticsearch {hosts => ["http://localhost:9200"]index => "jd_goods_backup"}
}

3. 网络优化配置

# 跨数据中心专用线程池
thread_pool.search.size: 32
thread_pool.search.queue_size: 2000# 传输层参数优化
transport.tcp.compress: true
transport.profiles.default.tcp_keep_alive: true
transport.profiles.default.tcp_no_delay: true

监控方案(基于Kibana):

{"alert": {"name": "CCR延迟告警","conditions": {"script": "ctx.results[0].hits.hits[0]._source.follower_lag > 300000"},"actions": {"webhook": "http://alert.jd.com/ccr_warn"}}
}

实战经验

  1. 使用专用10Gbps通道,实测同步延迟120-180ms
  2. 索引分片数=数据中心数量×2(北京8节点集群使用24分片)
  3. 采用时间戳管道统一时区:
PUT _ingest/pipeline/timestamp_pipeline
{"processors": [{"date": {"field": "bj_timestamp","target_field": "@timestamp","timezone": "Asia/Shanghai","formats": ["ISO8601"]}}]
}
http://www.xdnf.cn/news/9282.html

相关文章:

  • word2016标题自动编号
  • Modbus通信中的延迟和时间间隔详解
  • 4.2.1、mysql进阶——存储过程基本语法,变量
  • 网络拓扑如何跨网段访问
  • ArcGIS Pro 3.4 二次开发 - 知识图谱
  • (自用)Java学习-5.15(模糊搜索,收藏,购物车)
  • 编程日志5.28
  • 了解一下C#的SortedSet
  • C++?继承!!!
  • Python的分布式网络爬虫系统实现
  • 代码随想录算法训练营 Day58 图论Ⅷ 拓扑排序 Dijkstra
  • Apache POI生成的pptx在office中打不开 兼容问题 wps中可以打卡问题 POI显示兼容问题
  • 多级体验体系构建:基于开源AI智能客服与AI智能名片的S2B2C商城小程序体验升级路径研究
  • 设计模式系列(06):抽象工厂模式(Abstract Factory)
  • 传统图像分割方法:阈值分割、Canny检测
  • AI测试用例生成系统设计与实现:融合多模态、OCR解析与知识库增强
  • EOFError: Unexpected EOF while reading bytes报错解决
  • 题目 3313: 蓝桥杯2025年第十六届省赛真题-电池分组
  • npm run build后将打包文件夹生成zip压缩包
  • Abstract Factory(抽象工厂)
  • FlagOS 新里程:开源面向多种硬件架构的统一AI 编译器 FlagTree
  • element-ui upload 组件源码分享
  • Android Cameara2 + MediaRecorder 完成录像功能
  • Prompt工程:解锁大语言模型的终极密钥
  • 解锁编程新境界:深入剖析现代编程技术与实践
  • spring4第2课-ioc控制反转
  • STM32CubeMX配置使用通用定时器产生PWM
  • 加密货币犯罪的涉案金额如何计算?
  • 编写一个算法frequency,统计在一个输入字符串中各个不同字符出现的频度。用适当的测试数据来验证这个算法
  • 打卡第29天:类的定义和方法