当前位置: 首页 > news >正文

比较数据迁移后MySQL数据库和ClickHouse数据仓库中的表

设计一个MySQL数据库和Clickhouse数据仓库的表数据比较的详细程序流程,两张表是相同的结构,都有整型主键id字段,需要每次从数据库分批取得2000条数据,用于比较,比较操作的同时可以再取2000条数据,等上一次比较完成之后,开始比较,直到比较完所有的数据。比较操作需要比较两条两个数据库中,一个数据库有,另一个数据库没有的记录,以id是否存在作为标准,还需要比较两边都有记录的所有数据字段是否都相同,如果有不相同指出这条数据的id,以及不相同数据所在字段的名称和在两个数据库中的值,分批比较时需要考虑所有可能的情况。

以下是一个详细的设计方案,用于比较MySQL数据库和ClickHouse数据仓库中结构相同的表,支持并行数据获取和分批比较:

系统设计

  1. 并行处理架构

    • 主线程:负责数据比较
    • 工作线程:负责预取下一批数据
    • 使用队列实现线程间通信
  2. 关键组件

    • 数据获取器(DataFetcher)
    • 数据比较器(DataComparator)
    • 结果处理器(ResultHandler)
    • 批处理控制器(BatchController)

详细程序流程

import queue
import threading
from typing import Dict, List, Tuple, Optional# 假设已实现数据库连接池
from db_connectors import MySQLPool, ClickHousePoolclass BatchData:def __init__(self):self.mysql_data: Dict[int, dict] = {}  # {id: {field1:val1,...}}self.ch_data: Dict[int, dict] = {}self.last_id: int = 0  # 本批次最大IDclass DataComparator:def __init__(self, result_handler):self.result_handler = result_handlerdef compare_batch(self, batch: BatchData):# 1. 检查ID存在性差异mysql_ids = set(batch.mysql_data.keys())ch_ids = set(batch.ch_data.keys())# 1.1 MySQL有但ClickHouse缺失for mid in mysql_ids - ch_ids:self.result_handler.record_missing(mid, source="clickhouse")# 1.2 ClickHouse有但MySQL缺失for cid in ch_ids - mysql_ids:self.result_handler.record_missing(cid, source="mysql")# 2. 比较共有ID的字段值common_ids = mysql_ids & ch_idsfor cid in common_ids:mysql_row = batch.mysql_data[cid]ch_row = batch.ch_data[cid]self._compare_rows(cid, mysql_row, ch_row)def _compare_rows(self, id: int, mysql_row: dict, ch_row: dict):# 跳过主键字段fields = set(mysql_row.keys()) - {"id"}for field in fields:mysql_val = mysql_row[field]ch_val = ch_row[field]# 处理类型差异(如MySQL的Decimal转Float)if isinstance(mysql_val, Decimal):mysql_val = float(mysql_val)if isinstance(ch_val, Decimal):ch_val = float(ch_val)# 特殊处理浮点数精度if isinstance(mysql_val, float) and isinstance(ch_val, float):if abs(mysql_val - ch_val) > 1e-9:self.result_handler.record_mismatch(id, field, mysql_val, ch_val)elif mysql_val != ch_val:self.result_handler.record_mismatch(id, field, mysql_val, ch_val)class DataFetcher:BATCH_SIZE = 2000def __init__(self, mysql_pool, ch_pool):self.mysql_pool = mysql_poolself.ch_pool = ch_pooldef fetch_batch(self, last_id: int) -> Optional[BatchData]:batch = BatchData()# 从MySQL获取数据with self.mysql_pool.connection() as conn:cursor = conn.cursor(dictionary=True)cursor.execute("SELECT * FROM target_table ""WHERE id > %s ORDER BY id LIMIT %s",(last_id, self.BATCH_SIZE))for row in cursor:batch.mysql_data[row['id']] = rowbatch
http://www.xdnf.cn/news/916777.html

相关文章:

  • Faiss vs Milvus 深度对比:向量数据库技术选型指南
  • 在 Linux 中修改 Apache HTTP Server(httpd)默认端口的完整指南
  • 电路图识图基础知识-电动机制动控制电路(十八)
  • 【力扣】2434.使用机器人打印字典序最小的字符串
  • 计算机组成原理-总线
  • rabbit mq使用TTL和DLX实现延迟队列
  • ios苹果系统,js 滑动屏幕、锚定无效
  • Go 标准库 encoding/gob 快速上手
  • NLP学习路线图(三十一): 迁移学习在NLP中的应用
  • 在ROS中实现消息通信和服务通信Python
  • 【图像处理基石】如何构建一个简单好用的美颜算法?
  • 【win | docker开启远程配置】使用 SSH 隧道访问 Docker的前操作
  • 手拉手处理RuoYi脚手架常见文问题
  • win11系统 Docker Desktop 突然提示Docker Engine stopped解决情况之一
  • CentOS 7.9安装Nginx1.24.0时报 checking for LuaJIT 2.x ... not found
  • 【Linux】系统部分——进程控制
  • 使用 Python + SQLAlchemy 创建知识库数据库(SQLite)—— 构建本地知识库系统的基础《一》
  • Python Cookbook-7.11 在 PostgreSQL 中储存 BLOB
  • github中main与master,master无法合并到main
  • 《绩效管理》要点总结与分享
  • SpringBoot 配置加载顺序?
  • AI驱动的B端页面革命:智能布局、数据洞察的底层技术解析
  • vue前端字典映射
  • 【数据分析】探索婴儿年龄变化对微生物群落(呼吸道病毒和细菌病原体)结构的影响
  • opencv_stereoRectify源码解析
  • 辊式矫平机:金属板材的“整形大师”
  • 18-Oracle 23ai JSON二元性颠覆传统
  • Github 2025-06-07 Rust开源项目日报Top10
  • ThingsCloud事物云平台搭建-微信小程序
  • Python Cookbook-7.12 在 SQLite 中储存 BLOB