当前位置: 首页 > ds >正文

Flink中Kafka连接器的基本应用

文章目录

    • 前言
    • Kafka连接器基础案例演示
      • 前置说明和环境准备步骤
      • Kafka连接器基本配置
      • 关联数据源
      • 映射转换
      • 案例效果演示
    • 基于Kafka连接器同步数据到MySQL
      • 案例说明
      • 前置准备
      • Kafka连接器消费位点调整
      • 映射转换与数据投递
      • MysqlSlink持久化收集器数据
      • 最终效果演示
    • 小结
    • 参考

前言

本文将基于内置kafka连接器演示如何使用kafka内置流收集器的api完成Kafka数据的采集,同时我们也会给出一个收集Kafka数据流数据保存到MySQL的示例,希望对你有帮助。

Kafka连接器基础案例演示

前置说明和环境准备步骤

本案例将基于Kafka投递的单词(用逗号分隔),通过flink完成抽取,切割为独立单词,并完成词频统计,例如我们输入hello,world,最终控制台就会输出hello,1world,1

在正式演示之前,笔者介绍一些flink的使用版本:

<flink.version>1.16.0</flink.version>

对应还有下面这些依赖分别用于:

  1. 使用Kafka连接器
  2. 使用hutool的jdbc连接器
  3. MySQL驱动包
 <!-- CSV Format for Kafka (因为你的配置中用了 'format' = 'csv') --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-csv</artifactId><version>${flink.version}</version></dependency><!-- JDBC Connector (用于你的 spend_report 表写入 MySQL) --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc</artifactId><version>${flink.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version> <!-- 推荐使用 8.0.x 版本 --></dependency><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.20</version></dependency>

完成这些后我们将Kafka等相关环境准备好就可以着手编码工作了。

Kafka连接器基本配置

首先我们基于StreamExecutionEnvironment 初始化环境构建配置:

 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

然后我们就可以基于内置的KafkaSource的建造者模式完成如Kafka连接器的构建:

  1. setBootstrapServers设置Kafka地址为broker字符串配置的ip和端口号
  2. setTopics设置消费的主题为input-topic
  3. setGroupId当前kafka消费者组为my-group
  4. setStartingOffsets设置为从最早偏移量开始消费
  5. setValueOnlyDeserializer设置收到Kafka数据时直接反序列化为字符串

对应的代码如下所示:

	//基于建造者模式完成Kafka连接器的配置KafkaSource<String> source = KafkaSource.<String>builder().setBootstrapServers(brokers)//设置Kafka server端地址.setTopics("input-topic") //指定消费的Topic为input-topic.setGroupId("my-group")//设置消费组ID为my-group.setStartingOffsets(OffsetsInitializer.earliest())//设置从Kafka的最开始位置开始消费.setValueOnlyDeserializer(new SimpleStringSchema())// 设置数据直接反序列化为字符串.build();

这里需要补充一下关于Kafka消费位点的设置,flink已经内置了如下几种消费位点的设置,对应的代码配置示例如下,读者可参阅并进行配置:

KafkaSource.builder()
http://www.xdnf.cn/news/8154.html

相关文章:

  • 曾经在知乎上看到一个回答:“入职做FPGA,后续是否还可以转数字IC设计?”
  • Triton 动态链接库(DLL)初始化例程失败。
  • redis基本操作和基础命令,另外附上如何使用命令导出redis数据库及反馈的正确报文,rdb
  • 飞翔的小燕子-第16届蓝桥第6次STEMA测评Scratch真题第1题
  • TCP原理解析
  • 2025年高防IP与SCDN全面对比:如何选择最佳防护方案?
  • 智慧交通的核心引擎-车牌识别接口-车牌识别技术-新能源车牌识别
  • Postgresql14+Repmgr部署
  • 【漫话机器学习系列】272.K近邻中K的大小(K-NN Neighborhood Size)
  • 通过现代数学语言重构《道德经》核心概念体系,形成一个兼具形式化与启发性的理论框架
  • C# Unity容器详解
  • Google Prompt Tuning:文本嵌入优化揭秘
  • 小米15周年战略新品发布会:多领域创新突破,构建科技生态新蓝图
  • HUAWEI华为MateBook D 14 2021款i5,i7集显非触屏(NBD-WXX9,NbD-WFH9)原装出厂Win10系统
  • JMeter 教程:响应断言
  • 【笔试强训day39】
  • 自制操作系统day7(获取按键编码、FIFO缓冲区、鼠标、键盘控制器(Keyboard Controller, KBC)、PS/2协议)
  • brepgen 源码 笔记2
  • 巧用 FFmpeg 命令行合并多个视频为一个视频文件教程
  • CaDDN- Categorical Depth Distribution Network for Monocular 3D Object Detection
  • 比斯特自动化|移动电源全自动点焊机:高效点焊助力移动电源制造
  • 【ffmpeg】硬软编码
  • 第十周作业
  • 从单链表 list 中删除第 i 个元素--Python
  • GaussDB(PostgreSQL)查询执行计划参数解析技术文档
  • 代码随想录算法训练营第四十六四十七天
  • Ubuntu/Linux 服务器上调整系统时间(日期和时间)
  • 零基础入门:MinerU 和 PyTorch、CUDA的关系
  • Facebook广告如何投放保健品类别?
  • Python爬虫(33)Python爬虫高阶:动态页面破解与验证码OCR识别全流程实战