当前位置: 首页 > news >正文

消费 Kafka 一个TOPIC数据,插入到另一个KAFKA的TOPIC

从 Kafka 消费 CDC 数据(变更捕获,需 Upsert 语义)
用 kafka 连接器 + 主键 + 处理函数 模拟 Upsert,示例:
CREATE TABLE `KAFKA_TEST_0002` (
`LGL_PERN_CODE` VARCHAR COMMENT 'LGL_PERN_CODE',
`LBLTY_ACCT_NUM` VARCHAR COMMENT 'LBLTY_ACCT_NUM',
`ACCT_NM` VARCHAR COMMENT 'ACCT_NM',
`CUST_NUM` VARCHAR COMMENT 'CUST_NUM',
`NAT_CODE` VARCHAR COMMENT 'NAT_CODE',
-- 声明主键(用于 Upsert 去重)
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'kafka',  -- 恢复为 kafka 连接器
'topic' = 'KAFKA_TEST_0002',
'properties.bootstrap.servers' = '10.57.48.38:21007,10.57.48.37:21007,10.57.48.36:21007',
'properties.group.id' = '7a074dd07bfb4d4da39eb0f5773b952b',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json',  -- 适配 CDC 格式
'debezium-json.ignore-parse-errors' = 'true',
'debezium-json.schema-include' = 'true',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.kerberos.domain.name' = 'hadoop.124dba82_3b54_0125_81e4_110652049a41.com',
'properties.sasl.kerberos.service.name' = 'kafka'
);

-- 如需 Upsert 输出,再通过 Sink 写入 upsert-kafka
CREATE TABLE KafkaUpsertSink (
`LBLTY_ACCT_NUM` VARCHAR,
`LGL_PERN_CODE` VARCHAR,
`ACCT_NM` VARCHAR,
PRIMARY KEY (`LBLTY_ACCT_NUM`) NOT ENFORCED 
) WITH (
'connector' = 'upsert-kafka',  -- Sink 侧使用 upsert-kafka
'topic' = 'sink_topic',
'properties.bootstrap.servers' = '...',
'key.format' = 'json',
'value.format' = 'json'
);

-- 业务逻辑:从 Kafka 读 CDC 数据,处理后 Upsert 写入
INSERT INTO KafkaUpsertSink
SELECT LBLTY_ACCT_NUM, LGL_PERN_CODE, ACCT_NM
FROM `KAFKA_TEST_0002`;

http://www.xdnf.cn/news/1120537.html

相关文章:

  • python学习2
  • ubuntu(22.04)系统上安装 MuJoCo
  • FRP Ubuntu 服务端 + MacOS 客户端配置
  • 微前端架构详解
  • 《C++初阶之STL》【泛型编程 + STL简介】
  • Nacos 技术研究文档(基于 Nacos 3)
  • 基于R语言的极值统计学及其在相关领域中的实践技术应用
  • 迅为八核高算力RK3576开发板摄像头实时推理测试 ppyoloe目标检测
  • 《亿级流量系统架构设计与实战》通用高并发架构设计 读场景
  • 文心4.5开源之路:引领技术开放新时代!
  • Go从入门到精通(22) - 一个简单web项目-统一日志输出
  • 如何单独安装设置包域名
  • LeetCode--45.跳跃游戏 II
  • 雷卯针对灵眸科技RV1106G3开发板防雷防静电方案
  • AI数字人正成为医药行业“全场景智能角色”,魔珐科技出席第24届全国医药工业信息年会
  • 2024年中国公交网络数据集(Shp/分城市)
  • 【DOCKER】-6 docker的资源限制与监控
  • 【机器学习深度学习】Ollama vs vLLM vs LMDeploy:三大本地部署框架深度对比解析
  • ElasticSearch重置密码
  • LabVIEW浏览器ActiveX事件交互
  • JavaScript 性能优化实战:深入性能瓶颈,精炼优化技巧与最佳实践
  • aspnetcore Mvc配置选项中的ModelBindingMessageProvider
  • 多任务——协程
  • VictoriaMetrics 架构
  • VR样板间:房产营销新变革
  • 纯数学专业VS应用数学专业:这两个哪个就业面更广?
  • Cannot add property 0, object is not extensible
  • 【橘子分布式】Thrift RPC(理论篇)
  • iOS APP 上架流程:跨平台上架方案的协作实践记录
  • [Nagios Core] 通知系统 | 事件代理 | NEB模块,事件,回调