Flink Vitess CDC 环境配置与验证
一、Vitess 集群核心配置(重点:开启 CDC 支持)
1. 启用 VStream 服务(Vitess CDC 的核心)
Vitess 通过 VStream 服务提供增量数据捕获能力,需确保 VTGate 正确配置:
# 启动 VTGate 时显式启用 VStream(生产环境建议配置)
vtgate \--logtostderr \--port 15991 \ # MySQL 协议端口--grpc_port 15999 \ # gRPC 端口(Flink CDC 连接此端口)--service_map 'grpc-vtgateservice' \ # 启用 VStream 服务--vstream_heartbeat_interval 5s \ # 心跳间隔(可选)--cells zone1 \ # 指定 cell 区域--tablet_types_to_wait MASTER,REPLICA \--mysql_server_port 3306 \ # MySQL 兼容端口--mysql_auth_server_impl none # 禁用认证(测试环境)
2. 验证 VStream 可用性
# 使用 vtctlclient 验证 VStream 是否正常工作
vtctlclient -server localhost:15999 VStream -pos "" -tables "mydb.orders"
预期输出:
VStream started at position: <current VGTID>
...
二、Flink 环境集成配置
1. 添加 Maven 依赖(同上)
<dependency><groupId>com.ververica</groupId><artifactId>flink-sql-connector-vitess-cdc</artifactId><version>3.0.1</version><scope>provided</scope>
</dependency>
2. SQL Client 部署(同上)
- 下载 JAR 包:
flink-sql-connector-vitess-cdc-3.0.1.jar - 将 JAR 包放入
$FLINK_HOME/lib/
后重启 Flink 集群。
三、Flink SQL 表定义与参数详解(重点:CDC 参数)
1. 完整建表示例(含关键 CDC 参数)
-- 配置 checkpoint(每 3 秒)
SET 'execution.checkpointing.interval' = '3s';-- 创建 Vitess CDC 表(仅增量同步,无快照)
CREATE TABLE vitess_orders (order_id INT,order_date TIMESTAMP(0),customer_name STRING,price DECIMAL(10, 5),order_status BOOLEAN,-- 元数据列(Vitess 特有)keyspace_name STRING METADATA FROM 'keyspace' VIRTUAL,shard_name STRING METADATA FROM 'shard' VIRTUAL,vgtid STRING METADATA FROM 'vgtid' VIRTUAL,op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,PRIMARY KEY(order_id) NOT ENFORCED
) WITH ('connector' = 'vitess-cdc','hostname' = 'localhost', -- VTGate 主机'port' = '15999', -- VTGate gRPC 端口(非 MySQL 端口)'keyspace' = 'mydb', -- 目标 Keyspace'table-name' = 'orders', -- 表名'tablet.type' = 'REPLICA', -- 从 REPLICA 节点读取(减少主库压力)'stopOnReshard' = 'false', -- 分片变更时不停服务'tombstonesOnDelete' = 'true', -- 删除事件后发送墓碑标记'debezium.connector.vitess.include.schema.changes' = 'false', -- 忽略 schema 变更'debezium.connector.vitess.snapshot.mode' = 'never' -- 禁用快照(仅增量)
);
2. 核心 CDC 参数详解(新增/重点参数)
参数名 | 必选 | 默认值 | 类型 | 说明 |
---|---|---|---|---|
port | 是 | 15999 | Integer | VTGate gRPC 端口(非 MySQL 端口 3306),VStream 服务使用此端口 |
tablet.type | 否 | REPLICA | String | 从哪个类型的节点读取变更:MASTER (主库)、REPLICA (从库)、RDONLY (只读从库) |
debezium.connector.vitess.snapshot.mode | 否 | never | String | 快照模式:never (禁用快照,仅增量)、initial (初始快照+增量) |
debezium.connector.vitess.heartbeat.interval.ms | 否 | 5000 | Long | VStream 心跳间隔(毫秒),用于检测连接状态 |
debezium.connector.vitess.include.schema.changes | 否 | false | Boolean | 是否捕获 schema 变更(如 ALTER TABLE) |
四、环境验证与测试
1. 准备测试数据(Vitess)
-- 连接 VTGate(3306 端口)
mysql -h localhost -P 3306 -u root-- 插入测试数据
USE mydb;
INSERT INTO orders VALUES
(1, '2023-01-01 10:00:00', 'Alice', 100.50, true),
(2, '2023-01-02 11:00:00', 'Bob', 200.75, false);
COMMIT;
2. Flink SQL 验证(重点:监控 VGTID)
-- 查询 Vitess CDC 表(验证增量同步)
SELECT order_id, price,vgtid, -- Vitess 特有的全局事务 IDop_ts -- 变更时间戳
FROM vitess_orders;-- 在 Vitess 中执行变更
UPDATE mydb.orders SET price = 150.00 WHERE order_id = 1;
COMMIT;-- 观察 Flink 输出(vgtid 应变化,op_ts 为变更时间)
五、常见问题与解决方案(重点:CDC 故障排查)
-
VStream 连接失败
ERROR: Failed to connect to VStream service at localhost:15999
- 解决方案:
- 确认 VTGate 已启动且
--service_map
包含grpc-vtgateservice
- 检查防火墙是否开放 15999 端口
- 验证 VStream 服务:
vtctlclient -server localhost:15999 VStream -pos ""
- 确认 VTGate 已启动且
- 解决方案:
-
无法捕获变更
- 解决方案:
- 确认表结构符合预期(无隐藏列)
- 检查
table-name
参数是否正确(格式:keyspace.table
) - 手动触发变更并观察 VTGate 日志:
docker logs vtgate | grep "VStream"
- 解决方案:
-
VGTID 不更新
- 解决方案:
- 增加日志级别:
SET 'execution.log-level' = 'DEBUG';
- 检查 Vitess 集群状态:
vtctlclient -server localhost:15999 ListAllTablets zone1
- 增加日志级别:
- 解决方案:
六、生产环境优化建议(重点:CDC 性能)
-
高可用配置
- 配置多 VTGate 节点(负载均衡):
'hostname' = 'vtgate1:15999,vtgate2:15999,vtgate3:15999'
- 配置多 VTGate 节点(负载均衡):
-
性能调优
- 增大 VStream 缓冲区:
'debezium.connector.vitess.buffer.size' = '8192' -- 变更事件缓冲区大小
- 调整 Flink 并行度:
'parallelism' = '4' -- 根据分片数量调整
- 增大 VStream 缓冲区:
-
监控指标
- 监控 VStream 延迟:
# 查询 Vitess 内部指标 curl -s http://vtgate:15001/metrics | grep vstream
- 监控 VStream 延迟:
通过以上步骤,可完成 Flink Vitess CDC 的全流程配置与验证。关键点在于确保 VTGate 正确启用 VStream 服务,并通过 Flink 正确连接到 gRPC 端口(默认 15999)。生产环境中需特别关注 VStream 服务的高可用性、变更捕获的实时性及分片变更时的处理策略。