当前位置: 首页 > backend >正文

基于Hadoop3.3.4+Flink1.17.0+FlinkCDC3.0.0+Iceberg1.5.0整合,实现数仓实时同步mysql数据

一、安装部署Hadoop 3.3.4

1. 基础环境准备(两台都做)

1. 服务器假设

主机名IP地址角色
master192.168.1.10NameNode, ResourceManager
slave192.168.1.11DataNode, NodeManager

2. 前置依赖安装(两台机器都执行)

① 安装 JDK(建议 JDK 8)
yum install -y java-1.8.0-openjdk-devel

验证:

java -version

② 配置主机名和 hosts(两台都做)
# 修改主机名(分别执行)
hostnamectl set-hostname master   # 在 master 执行
hostnamectl set-hostname slave    # 在 slave 执行# 编辑 hosts 文件(两台都加)
vim /etc/hosts

添加如下内容:

192.168.1.10 master
192.168.1.11 slave

③ 配置 SSH 免密(只在 master 上操作)
ssh-keygen -t rsa
ssh-copy-id master
ssh-copy-id slave

验证:

ssh master
ssh slave

3. 下载并解压 Hadoop

在两台机器都执行:

wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
tar -zxvf hadoop-3.3.4.tar.gz -C /opt/
ln -s /opt/hadoop-3.3.4 /opt/hadoop

4. 配置环境变量(两台都配置)

vim /etc/profile

添加:

export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin# 启动权限设置
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export HDFS_DATANODE_SECURE_USER=hdfs
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

执行使其生效:

source /etc/profile

2. Hadoop 配置(在 Master 上操作,配置完成后拷贝到 Slave)

假设 Hadoop 安装目录是 /opt/hadoop,配置文件在 $HADOOP_HOME/etc/hadoop/ 下。


1. core-site.xml

<configuration><property><name>fs.defaultFS</name><value>hdfs://master:9000</value></property>
</configuration>

2. hdfs-site.xml

<configuration><property><name>dfs.replication</name><value>1</value> <!-- 两台机器,只设一个副本 --></property><property><name>dfs.namenode.name.dir</name><value>file:/opt/hadoop/tmp/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/opt/hadoop/tmp/dfs/data</value></property>
</configuration>

3. mapred-site.xml

复制模板:

cp mapred-site.xml.template mapred-site.xml

内容如下:

<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property>
</configuration>

4. yarn-site.xml

<configuration><!-- ResourceManager 设置 --><property><name>yarn.resourcemanager.hostname</name><value>master</value></property><!-- NodeManager 使用 shuffle 服务 --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
</configuration>

5. 修改workers

编辑 $HADOOP_HOME/etc/hadoop/workers

vim workers

内容:

master
slave

3. 将 Hadoop 分发到 Slave

在 Master 上执行:

scp -r /opt/hadoop-3.3.4 slave:/opt/
scp -r /opt/hadoop slave:/opt/
scp /etc/profile slave:/etc/profile

然后在 slave 上:

source /etc/profile

4. 格式化 HDFS(仅第一次在 Master 上做)

hdfs namenode -format

5. 启动 Hadoop 集群(在 master 上)

1. 启动 HDFS

start-dfs.sh

后台会启动:

  • master:NameNode + DataNode

  • slave:DataNode

验证:

jps
  • master:NameNode, DataNode

  • slave:DataNode

浏览器访问:http://master:9870


2. 启动 YARN

start-yarn.sh

后台会启动:

  • master:ResourceManager

  • slave:NodeManager

验证:

jps
  • master:ResourceManager

  • slave:NodeManager

浏览器访问:http://master:8088


6. 验证集群是否成功运行

1. 查看 Web UI

  • HDFS UI: http://master:9870

  • YARN UI: http://master:8088


2. 测试上传文件

hdfs dfs -mkdir /test
echo "hello hadoop" > hello.txt
hdfs dfs -put hello.txt /test
hdfs dfs -ls /test

7. 总结启动/停止命令

功能命令
启动 HDFSstart-dfs.sh
停止 HDFSstop-dfs.sh
启动 YARNstart-yarn.sh
停止 YARNstop-yarn.sh

8. 注意事项

  1. NameNode 格式化只能执行一次,重复会导致元数据丢失。

  2. 所有路径(如 dfs.name.dir)需确保存在或能自动创建,权限正确。

  3. 如果端口冲突(9000、9870、8088),可以在配置中修改。

  4. 所有节点时间应同步(可配置 NTP)。

二、安装部署 Flink 1.17.0,并整合Flink CDC 3.0.0 + Iceberg 1.5.0

1. 组件依赖关系简图

Flink 1.17.0
├── 集成 Flink CDC 3.0.0(从 MySQL/Postgres 等采集变更数据)
└── 集成 Iceberg 1.5.0(用于将结果写入 HDFS 格式的表)└── 基于 Hadoop Catalog(不使用 Hive)

2. 已具备条件

软件/组件状态
Hadoop 3.3.4✔ 已部署(HDFS + YARN)
Java 8✔ 已安装
SSH 免密登录✔ 已配置
Master:192.168.1.10,Slave:192.168.1.11

3. 安装 Flink 1.17.0

1. 下载 Flink 并解压(两台都执行)

wget https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/
ln -s /opt/flink-1.17.0 /opt/flink

2. 配置环境变量(两台)

vim /etc/profile

添加:

export FLINK_HOME=/opt/flink
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile

3. 配置 Flink(仅在 master 上配置好后分发)

编辑 $FLINK_HOME/conf/flink-conf.yaml

jobmanager.rpc.address: mastertaskmanager.numberOfTaskSlots: 2# 添加Hadoop类路径
env.hadoop.conf.dir: /path/to/hadoop/etc/hadoop

4. 设置 masters 和 workers

编辑 $FLINK_HOME/conf/masters:

master:8081

编辑 $FLINK_HOME/conf/workers

master
slave

5. 分发 Flink 到 Slave

scp -r /opt/flink-1.17.0 slave:/opt/
scp -r /opt/flink slave:/opt/
scp /etc/profile slave:/etc/profile
ssh slave 'source /etc/profile'

4. 集成 Flink CDC 3.0.0

Flink CDC 3.0.0 为 Table Store 模式,支持动态表更新流,可连接 MySQL、PostgreSQL、Oracle 等。需下载相关jar包,以下是 ​​Flink CDC 3.0.0​​ 与 ​​Flink JDBC Connector 3.1.0-1.17​​ 的官方下载地址及关键信息整理。

1. flink-sql-connector-mysql-cdc-3.0.0.jar

  • ​作用​​:用于 Flink SQL 连接 MySQL CDC 数据源,支持实时捕获 MySQL 的增量数据变更。
  • ​下载地址​​:
    • ​Maven 中央仓库​​:
      https://repo.maven.apache.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/flink-sql-connector-mysql-cdc-3.0.0.jar
      (直接下载链接,无需登录)
    • ​GitHub Release​​(备用):
      https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-sql-connector-mysql-cdc-3.0.0.jar

2. flink-connector-jdbc-3.1.0-1.17.jar

  • ​作用​​:提供 Flink 写入 JDBC 数据库(如 MySQL、PostgreSQL)的能力,用于构建数据同步的 Sink 表。
  • ​下载地址​​:
    • ​Maven 中央仓库​​:
      https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar
    • ​官方文档推荐​​:
      Apache Flink 官网的 Connectors 文档 中也引用此链接。

3. 关键注意事项​

  1. ​版本兼容性​​:
    • ​Flink CDC 3.0.0​​:需搭配 ​​Flink 1.14+​​ 和 ​​JDK 8+​​,支持 MySQL 5.6/5.7/8.0。
    • ​JDBC Connector 3.1.0-1.17​​:专为 ​​Flink 1.17.x​​ 设计,需配合 ​​MySQL JDBC 驱动 8.0.28​​(推荐)。
  2. ​部署方式​​:
    • 将下载的 JAR 包放入 Flink 的 lib/ 目录。
    • ​重启 Flink 集群​​生效(执行 stop-cluster.shstart-cluster.sh)。
  3. ​MySQL 驱动依赖​​(JDBC Sink 必需):
    • mysql-connector-java-8.0.28.jar​ 下载地址:
      https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar

4. 依赖关系总结​

​组件​​版本​​下载地址​
Flink SQL MySQL CDC3.0.0Maven
Flink JDBC Connector3.1.0-1.17Maven
MySQL JDBC Driver8.0.28Maven

按需下载后放置于 $FLINK_HOME/lib/ 即可。

5. 集成 Iceberg 1.5.0

1. 下载 Flink-Iceberg 连接器(支持 Flink 1.17)

mkdir -p /opt/flink/lib/icebergcd /opt/flink/lib/icebergwget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.5.0/iceberg-flink-runtime-1.17-1.5.0.jar

❗注意:该包自动包含 Flink Table 所需依赖,无需手动导入其他 flink-table jars。


2. 配置 Hadoop Catalog 模式(推荐方式)

确保 Hadoop 集群运行,然后 Flink 连接 Iceberg 时使用如下 SQL:

CREATE CATALOG hadoop_catalog WITH ('type' = 'iceberg','catalog-type' = 'hadoop','warehouse' = 'hdfs://master:9000/iceberg/warehouse','property-version' = '1'
);

6. 启动 Flink 集群

在 Master 上执行:

start-cluster.sh

默认会启动:

  • master:JobManager

  • slave:TaskManager

  • master:也作为一个 TaskManager(可选)

验证:

jps
  • master:JobManager(StandaloneSessionClusterEntrypoint) + TaskManager

  • slave:TaskManager

Web UI 访问:http://master:8081


7. 验证 CDC + Iceberg 流任务

以下为示例 SQL 流任务(使用 sql-client):

1. 启动 SQL CLI

./bin/sql-client.sh embedded

2. 注册 catalog & 数据表

-- 注册 Iceberg catalog
CREATE CATALOG hadoop_catalog WITH ('type' = 'iceberg','catalog-type' = 'hadoop','warehouse' = 'hdfs://master:9000/iceberg/warehouse','property-version' = '1'
);USE CATALOG hadoop_catalog;
CREATE DATABASE demo;USE demo;-- 创建 CDC 源表(MySQL)
CREATE TABLE mysql_users (id INT,name STRING,age INT,update_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'root','password' = '123456','database-name' = 'test','table-name' = 'users'
);-- 创建 Iceberg 目标表
CREATE TABLE iceberg_users (id INT,name STRING,age INT,update_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
);-- 写入任务
INSERT INTO iceberg_users SELECT * FROM mysql_users;

8. 常用维护命令

操作命令
启动 Flinkstart-cluster.sh
停止 Flinkstop-cluster.sh
查看 Web UIhttp://master:8081
提交任务flink run xxx.jar 或 SQL CLI
查看日志$FLINK_HOME/log

9. 目录结构推荐

/opt/
├── hadoop/
├── flink/
│   ├── lib/
│   │   ├── cdc/
│   │   └── iceberg/
└── iceberg/└── warehouse/ (HDFS 路径)
http://www.xdnf.cn/news/16452.html

相关文章:

  • 基于springboot的在线购票系统/在线售票系统
  • C++ 中实现 `Task::WhenAll` 和 `Task::WhenAny` 的两种方案
  • Gradle#Plugin
  • Triton编译
  • JavaScript - 实现套索工具的demo
  • MySQL 8.0.42创建MGR集群
  • 深度学习(鱼书)day04--手写数字识别项目实战
  • OpenCL study - code03 rgb2gray
  • 通过硬编码函数地址并转换为函数指针来调用函数
  • 6.Pinia快速入门
  • Mitk教程案例项目编译
  • ROS2总结(二)
  • Flutter中 Provider 的基础用法超详细讲解(二)之ChangeNotifierProvider
  • ES6模块详解:核心语法与最佳实践
  • c++加载qml文件
  • 小架构step系列27:Hibernate提供的validator
  • Oracle EBS 库存期间关闭状态“已关闭未汇总”处理
  • [ The Missing Semester of Your CS Education ] 学习笔记 shell篇
  • net8.0一键创建支持(Kafka)
  • Redis6.0+安装教程(Linux)
  • CPA青少年编程能力等级测评试卷及答案 Python编程(三级)
  • 分表分库与分区表
  • 【第六节】方法与事件处理器
  • docker-desktop引擎启动失败报wsl --update
  • Day4.AndroidAudio初始化
  • 数独求解器与生成器(回溯算法实现)
  • 【ESP32】无法找到: “${env:IDF_PATH}/components/“的路径报错问题以及CMAKE构建不成功问题
  • JVM terminated. Exit code=1
  • 最优估计准则与方法(6)递推最小二乘估计(RLS)_学习笔记
  • BeautifulSoup 使用详解与实战示例