基于Hadoop3.3.4+Flink1.17.0+FlinkCDC3.0.0+Iceberg1.5.0整合,实现数仓实时同步mysql数据
一、安装部署Hadoop 3.3.4
1. 基础环境准备(两台都做)
1. 服务器假设
主机名 | IP地址 | 角色 |
---|---|---|
master | 192.168.1.10 | NameNode, ResourceManager |
slave | 192.168.1.11 | DataNode, NodeManager |
2. 前置依赖安装(两台机器都执行)
① 安装 JDK(建议 JDK 8)
yum install -y java-1.8.0-openjdk-devel
验证:
java -version
② 配置主机名和 hosts(两台都做)
# 修改主机名(分别执行)
hostnamectl set-hostname master # 在 master 执行
hostnamectl set-hostname slave # 在 slave 执行# 编辑 hosts 文件(两台都加)
vim /etc/hosts
添加如下内容:
192.168.1.10 master
192.168.1.11 slave
③ 配置 SSH 免密(只在 master 上操作)
ssh-keygen -t rsa
ssh-copy-id master
ssh-copy-id slave
验证:
ssh master
ssh slave
3. 下载并解压 Hadoop
在两台机器都执行:
wget https://dlcdn.apache.org/hadoop/common/hadoop-3.3.4/hadoop-3.3.4.tar.gz
tar -zxvf hadoop-3.3.4.tar.gz -C /opt/
ln -s /opt/hadoop-3.3.4 /opt/hadoop
4. 配置环境变量(两台都配置)
vim /etc/profile
添加:
export JAVA_HOME=/usr/lib/jvm/java-1.8.0
export HADOOP_HOME=/opt/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin# 启动权限设置
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export HDFS_DATANODE_SECURE_USER=hdfs
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
执行使其生效:
source /etc/profile
2. Hadoop 配置(在 Master 上操作,配置完成后拷贝到 Slave)
假设 Hadoop 安装目录是 /opt/hadoop
,配置文件在 $HADOOP_HOME/etc/hadoop/
下。
1. core-site.xml
<configuration><property><name>fs.defaultFS</name><value>hdfs://master:9000</value></property>
</configuration>
2. hdfs-site.xml
<configuration><property><name>dfs.replication</name><value>1</value> <!-- 两台机器,只设一个副本 --></property><property><name>dfs.namenode.name.dir</name><value>file:/opt/hadoop/tmp/dfs/name</value></property><property><name>dfs.datanode.data.dir</name><value>file:/opt/hadoop/tmp/dfs/data</value></property>
</configuration>
3. mapred-site.xml
复制模板:
cp mapred-site.xml.template mapred-site.xml
内容如下:
<configuration><property><name>mapreduce.framework.name</name><value>yarn</value></property>
</configuration>
4. yarn-site.xml
<configuration><!-- ResourceManager 设置 --><property><name>yarn.resourcemanager.hostname</name><value>master</value></property><!-- NodeManager 使用 shuffle 服务 --><property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property>
</configuration>
5. 修改workers
编辑 $HADOOP_HOME/etc/hadoop/workers
:
vim workers
内容:
master
slave
3. 将 Hadoop 分发到 Slave
在 Master 上执行:
scp -r /opt/hadoop-3.3.4 slave:/opt/
scp -r /opt/hadoop slave:/opt/
scp /etc/profile slave:/etc/profile
然后在 slave 上:
source /etc/profile
4. 格式化 HDFS(仅第一次在 Master 上做)
hdfs namenode -format
5. 启动 Hadoop 集群(在 master 上)
1. 启动 HDFS
start-dfs.sh
后台会启动:
-
master:NameNode + DataNode
-
slave:DataNode
验证:
jps
-
master:NameNode, DataNode
-
slave:DataNode
浏览器访问:http://master:9870
2. 启动 YARN
start-yarn.sh
后台会启动:
-
master:ResourceManager
-
slave:NodeManager
验证:
jps
-
master:ResourceManager
-
slave:NodeManager
浏览器访问:http://master:8088
6. 验证集群是否成功运行
1. 查看 Web UI
-
HDFS UI: http://master:9870
-
YARN UI: http://master:8088
2. 测试上传文件
hdfs dfs -mkdir /test
echo "hello hadoop" > hello.txt
hdfs dfs -put hello.txt /test
hdfs dfs -ls /test
7. 总结启动/停止命令
功能 | 命令 |
---|---|
启动 HDFS | start-dfs.sh |
停止 HDFS | stop-dfs.sh |
启动 YARN | start-yarn.sh |
停止 YARN | stop-yarn.sh |
8. 注意事项
-
NameNode 格式化只能执行一次,重复会导致元数据丢失。
-
所有路径(如
dfs.name.dir
)需确保存在或能自动创建,权限正确。 -
如果端口冲突(9000、9870、8088),可以在配置中修改。
-
所有节点时间应同步(可配置 NTP)。
二、安装部署 Flink 1.17.0,并整合Flink CDC 3.0.0 + Iceberg 1.5.0
1. 组件依赖关系简图
Flink 1.17.0
├── 集成 Flink CDC 3.0.0(从 MySQL/Postgres 等采集变更数据)
└── 集成 Iceberg 1.5.0(用于将结果写入 HDFS 格式的表)└── 基于 Hadoop Catalog(不使用 Hive)
2. 已具备条件
软件/组件 | 状态 |
---|---|
Hadoop 3.3.4 | ✔ 已部署(HDFS + YARN) |
Java 8 | ✔ 已安装 |
SSH 免密登录 | ✔ 已配置 |
Master:192.168.1.10,Slave:192.168.1.11 | ✔ |
3. 安装 Flink 1.17.0
1. 下载 Flink 并解压(两台都执行)
wget https://archive.apache.org/dist/flink/flink-1.17.0/flink-1.17.0-bin-scala_2.12.tgz
tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/
ln -s /opt/flink-1.17.0 /opt/flink
2. 配置环境变量(两台)
vim /etc/profile
添加:
export FLINK_HOME=/opt/flink
export PATH=$PATH:$FLINK_HOME/bin
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile
3. 配置 Flink(仅在 master 上配置好后分发)
编辑 $FLINK_HOME/conf/flink-conf.yaml
:
jobmanager.rpc.address: mastertaskmanager.numberOfTaskSlots: 2# 添加Hadoop类路径
env.hadoop.conf.dir: /path/to/hadoop/etc/hadoop
4. 设置 masters 和 workers
编辑 $FLINK_HOME/conf/
masters:
master:8081
编辑 $FLINK_HOME/conf/workers
:
master
slave
5. 分发 Flink 到 Slave
scp -r /opt/flink-1.17.0 slave:/opt/
scp -r /opt/flink slave:/opt/
scp /etc/profile slave:/etc/profile
ssh slave 'source /etc/profile'
4. 集成 Flink CDC 3.0.0
Flink CDC 3.0.0 为 Table Store 模式,支持动态表更新流,可连接 MySQL、PostgreSQL、Oracle 等。需下载相关jar包,以下是 Flink CDC 3.0.0 与 Flink JDBC Connector 3.1.0-1.17 的官方下载地址及关键信息整理。
1. flink-sql-connector-mysql-cdc-3.0.0.jar
- 作用:用于 Flink SQL 连接 MySQL CDC 数据源,支持实时捕获 MySQL 的增量数据变更。
- 下载地址:
- Maven 中央仓库:
https://repo.maven.apache.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/3.0.0/flink-sql-connector-mysql-cdc-3.0.0.jar
(直接下载链接,无需登录) - GitHub Release(备用):
https://github.com/ververica/flink-cdc-connectors/releases/download/release-3.0.0/flink-sql-connector-mysql-cdc-3.0.0.jar
- Maven 中央仓库:
2. flink-connector-jdbc-3.1.0-1.17.jar
- 作用:提供 Flink 写入 JDBC 数据库(如 MySQL、PostgreSQL)的能力,用于构建数据同步的 Sink 表。
- 下载地址:
- Maven 中央仓库:
https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar - 官方文档推荐:
Apache Flink 官网的 Connectors 文档 中也引用此链接。
- Maven 中央仓库:
3. 关键注意事项
- 版本兼容性:
- Flink CDC 3.0.0:需搭配 Flink 1.14+ 和 JDK 8+,支持 MySQL 5.6/5.7/8.0。
- JDBC Connector 3.1.0-1.17:专为 Flink 1.17.x 设计,需配合 MySQL JDBC 驱动 8.0.28(推荐)。
- 部署方式:
- 将下载的 JAR 包放入 Flink 的
lib/
目录。 - 重启 Flink 集群生效(执行
stop-cluster.sh
和start-cluster.sh
)。
- 将下载的 JAR 包放入 Flink 的
- MySQL 驱动依赖(JDBC Sink 必需):
-
mysql-connector-java-8.0.28.jar
下载地址:
https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.28/mysql-connector-java-8.0.28.jar
-
4. 依赖关系总结
组件 | 版本 | 下载地址 |
---|---|---|
Flink SQL MySQL CDC | 3.0.0 | Maven |
Flink JDBC Connector | 3.1.0-1.17 | Maven |
MySQL JDBC Driver | 8.0.28 | Maven |
按需下载后放置于 $FLINK_HOME/lib/
即可。
5. 集成 Iceberg 1.5.0
1. 下载 Flink-Iceberg 连接器(支持 Flink 1.17)
mkdir -p /opt/flink/lib/icebergcd /opt/flink/lib/icebergwget https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.17/1.5.0/iceberg-flink-runtime-1.17-1.5.0.jar
❗注意:该包自动包含 Flink Table 所需依赖,无需手动导入其他 flink-table jars。
2. 配置 Hadoop Catalog 模式(推荐方式)
确保 Hadoop 集群运行,然后 Flink 连接 Iceberg 时使用如下 SQL:
CREATE CATALOG hadoop_catalog WITH ('type' = 'iceberg','catalog-type' = 'hadoop','warehouse' = 'hdfs://master:9000/iceberg/warehouse','property-version' = '1'
);
6. 启动 Flink 集群
在 Master 上执行:
start-cluster.sh
默认会启动:
-
master:JobManager
-
slave:TaskManager
-
master:也作为一个 TaskManager(可选)
验证:
jps
-
master:JobManager(StandaloneSessionClusterEntrypoint) + TaskManager
-
slave:TaskManager
Web UI 访问:http://master:8081
7. 验证 CDC + Iceberg 流任务
以下为示例 SQL 流任务(使用 sql-client
):
1. 启动 SQL CLI
./bin/sql-client.sh embedded
2. 注册 catalog & 数据表
-- 注册 Iceberg catalog
CREATE CATALOG hadoop_catalog WITH ('type' = 'iceberg','catalog-type' = 'hadoop','warehouse' = 'hdfs://master:9000/iceberg/warehouse','property-version' = '1'
);USE CATALOG hadoop_catalog;
CREATE DATABASE demo;USE demo;-- 创建 CDC 源表(MySQL)
CREATE TABLE mysql_users (id INT,name STRING,age INT,update_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '192.168.1.100','port' = '3306','username' = 'root','password' = '123456','database-name' = 'test','table-name' = 'users'
);-- 创建 Iceberg 目标表
CREATE TABLE iceberg_users (id INT,name STRING,age INT,update_time TIMESTAMP(3),PRIMARY KEY (id) NOT ENFORCED
);-- 写入任务
INSERT INTO iceberg_users SELECT * FROM mysql_users;
8. 常用维护命令
操作 | 命令 |
---|---|
启动 Flink | start-cluster.sh |
停止 Flink | stop-cluster.sh |
查看 Web UI | http://master:8081 |
提交任务 | flink run xxx.jar 或 SQL CLI |
查看日志 | $FLINK_HOME/log |
9. 目录结构推荐
/opt/
├── hadoop/
├── flink/
│ ├── lib/
│ │ ├── cdc/
│ │ └── iceberg/
└── iceberg/└── warehouse/ (HDFS 路径)