当前位置: 首页 > news >正文

Flink SQL 将kafka topic的数据写到另外一个topic里面

-- 创建源表,使用 RAW 格式接收原始 JSON 数据
CREATE TABLE source_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'source_kafka-topic',
'properties.bootstrap.servers' = 'master01:9092',
'properties.group.id' = 'flink-kafka-group',
'scan.startup.mode' = 'latest-offset',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 创建目标表,同样使用 RAW 格式保持数据原样
CREATE TABLE sink_kafka (
id STRING,
`data` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink-kafka-topic',
'properties.bootstrap.servers' = 'master02:9092',
'key.format' = 'csv',
'key.fields' = 'id',
'value.format' = 'raw',
'value.fields-include' = 'EXCEPT_KEY'
);-- 执行复制操作
INSERT INTO sink_kafka
SELECT * FROM source_kafka;
  • value.fields-include

定义消息体(Value)格式如何处理消息键(Key)字段的策略。 默认情况下,表结构中 ‘ALL’ 即所有的字段都会包含在消息体格式中,即消息键字段在消息键和消息体格式中都会出现。

  • 参考文档:
    https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/kafka/#%E6%B6%88%E6%81%AF%E9%94%AEkey%E4%B8%8E%E6%B6%88%E6%81%AF%E4%BD%93value%E7%9A%84%E6%A0%BC%E5%BC%8F

https://nightlies.apache.org/flink/flink-docs-master/zh/docs/connectors/table/formats/overview/

http://www.xdnf.cn/news/436609.html

相关文章:

  • RPM 包制作备查 SRPM 包编译
  • 通过MCP让LLM调用系统接口
  • PDF Base64格式字符串转换为PDF文件临时文件
  • RabbitMQ 快速上手:安装配置与 HelloWorld 实践(一)
  • 【CUDA】Sgemm单精度矩阵乘法(下)
  • MQ消息队列的深入研究
  • STM32F103C8T6板子使用说明
  • 通讯录管理系统(IO_序列化和反序列化版)
  • 学习日志05 java
  • Cookie、 Local Storage、 Session Storage三种客户端存储方式
  • vshell渗透测试工具介绍
  • 从零实现一个高并发内存池 - 1
  • WHAT - 《成为技术领导者》思考题(第八章)
  • Yarn-Tool接口定义
  • python高级特性二
  • Java 反射
  • 久坐办公自动提醒休息的工具
  • QLineEdit增加点击回显功能
  • PH热榜 | 2025-05-13
  • arctanx 导数 泰勒展开式证明
  • 机器学习2
  • 鹅厂面试数学题
  • 典籍指数问答模块回答格式修改
  • java中的Optional
  • 如何优化 Linux 服务器的磁盘 I/O 性能
  • 【Nova UI】十五、打造组件库之滚动条组件(上):滚动条组件的起步与进阶
  • 【学习笔记】Shell编程---流程控制语句
  • PNG图片转icon图标Python脚本(简易版) - 随笔
  • 动态规划问题 -- 多状态模型(打家劫舍)
  • Java的进制转换