当前位置: 首页 > ai >正文

Flink从Kafka读取数据的完整指南

目录

代码解析

方法解析

代码解释

进阶配置

完整代码示例

运行步骤

常见问题


代码解析

  1. 导入依赖

    • org.apache.flink.streaming.api.scala._:Flink 的 Scala API。
    • org.apache.flink.connector.kafka.source.KafkaSource:Flink 提供的 Kafka Source 连接器。
    • org.apache.flink.api.common.serialization.SimpleStringSchema:用于将 Kafka 消息反序列化为字符串。
  2. 主程序

    • StreamExecutionEnvironment.getExecutionEnvironment:获取 Flink 的执行环境。
    • KafkaSource.builder():构建 Kafka Source。
      • setBootstrapServers(""):设置 Kafka 的 broker 地址(需要填写实际的 Kafka 地址,如 localhost:9092)。
      • setTopics(""):设置要消费的 Kafka topic(需要填写实际的 topic 名称)。
      • setValueOnlyDeserializer(new SimpleStringSchema()):设置消息的反序列化器,将 Kafka 消息反序列化为字符串。
    • env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source"):将 Kafka Source 转换为 Flink 的 DataStream。
    • kafkaStream.print("kafka"):将数据流打印到控制台。
    • env.execute("SourceKafka"):启动 Flink 作业。
package sourceimport java.util.Propertiesimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.streaming.api.scala._/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: source* @author: 赵嘉盟-HONOR* @data: 2023-11-19 1:54* @DESCRIPTION**/
object SourceKafka {def main(args: Array[String]): Unit = {val env=StreamExecutionEnvironment.getExecutionEnvironmentval kafkaSource =KafkaSource.builder().setBootstrapServers("").setTopics("").setValueOnlyDeserializer(new SimpleStringSchema()).build()val kafkaStream: DataStream[String] =env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source")kafkaStream.print("kafka")env.execute("SourceKafka")}
}

方法解析

  1. val env=StreamExecutionEnvironment.getExecutionEnvironment: 创建一个Flink流处理执行环境,并将其赋值给变量env
  2. val kafkaSource = KafkaSource.builder(): 创建一个Kafka源构建器对象。
  3. .setBootstrapServers(""): 设置Kafka集群的地址,这里为空字符串表示使用默认的Kafka集群地址。
  4. .setTopics(""): 设置要消费的Kafka主题,这里为空字符串表示消费所有主题。
  5. .setValueOnlyDeserializer(new SimpleStringSchema()): 设置值的反序列化器,这里使用SimpleStringSchema将消息的值解析为字符串类型。
  6. .build(): 构建Kafka源对象。
  7. val kafkaStream: DataStream[String] = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka Source"): 使用Flink流处理执行环境env从Kafka源kafkaSource中创建数据流,并指定没有水印策略和数据流名称为"kafka Source"。
  8. kafkaStream.print("kafka"): 打印数据流的内容,输出到控制台,并指定输出的前缀为"kafka"。

代码解释

  1. StreamExecutionEnvironment:

    • val env = StreamExecutionEnvironment.getExecutionEnvironment:创建 Flink 流处理环境。
  2. KafkaSource:

    • KafkaSource.builder():构建 Kafka 数据源。
    • .setBootstrapServers(""):设置 Kafka 集群的地址(例如:localhost:9092)。
    • .setTopics(""):设置要消费的 Kafka 主题(例如:my-topic)。
    • .setValueOnlyDeserializer(new SimpleStringSchema()):设置消息的反序列化器,这里使用 SimpleStringSchema 将消息反序列化为字符串。
  3. env.fromSource:

    • 将 Kafka 数据源添加到 Flink 流处理环境中。
    • WatermarkStrategy.noWatermarks():不生成水印(适用于无事件时间处理的场景)。
    • "kafka Source":为数据源指定一个名称。
  4. kafkaStream.print("kafka"):

    • 打印从 Kafka 消费的消息,每条消息前加上 "kafka" 前缀。
  5. env.execute("SourceKafka"):

    • 启动 Flink 作业,作业名称为 "SourceKafka"

进阶配置

  1. 添加 Kafka 配置

    • 如果需要额外的 Kafka 配置(如消费者组 ID),可以通过 setProperties 方法设置:
      val props = new Properties()
      props.setProperty("group.id", "flink-consumer-group")
      kafkaSource.setProperties(props)
  1. Watermark 策略

    • 如果你的数据流需要处理事件时间,可以自定义 WatermarkStrategy,而不是使用 WatermarkStrategy.noWatermarks()。例如:

      scala

      WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10))

完整代码示例

以下是改进后的代码:

package sourceimport java.util.Properties
import java.time.Durationimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.streaming.api.scala._object SourceKafka {def main(args: Array[String]): Unit = {// 获取 Flink 执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment// Kafka 配置val props = new Properties()props.setProperty("group.id", "flink-consumer-group")// 构建 Kafka Sourceval kafkaSource = KafkaSource.builder().setBootstrapServers("localhost:9092") // Kafka broker 地址.setTopics("test-topic") // Kafka topic.setValueOnlyDeserializer(new SimpleStringSchema()) // 反序列化器.setProperties(props) // Kafka 消费者配置.build()// 从 Kafka Source 创建 DataStreamval kafkaStream: DataStream[String] = env.fromSource(kafkaSource,WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(10)), // Watermark 策略"kafka Source")// 打印数据流kafkaStream.print("kafka")// 启动 Flink 作业env.execute("SourceKafka")}
}

运行步骤

  1. 启动 Kafka:确保 Kafka 服务已启动,并创建了指定的 topic(如 test-topic)。
  2. 发送测试数据:使用 Kafka 生产者向 test-topic 发送一些测试消息。
  3. 运行程序:运行 Flink 程序,观察控制台输出。

常见问题

  1. Kafka 连接失败
    • 检查 Kafka 服务是否正常运行,以及 bootstrap.servers 配置是否正确。
  2. No Watermark Strategy
    • 如果不需要事件时间处理,可以继续使用 WatermarkStrategy.noWatermarks()
  3. 依赖冲突
    • 确保 Flink 和 Kafka 的版本兼容。
http://www.xdnf.cn/news/16952.html

相关文章:

  • 段落注入(Passage Injection):让RAG系统在噪声中保持清醒的推理能力
  • 【动态规划 | 回文字串问题】动态规划解回文问题的核心套路
  • 基于落霞归雁思维框架的自动化测试实践与探索
  • 项目一:Python实现PDF增删改查编辑保存功能的全栈解决方案
  • 使用 SecureCRT 连接华为 eNSP 模拟器的方法
  • 浅谈 Python 中的 next() 函数 —— 迭代器的驱动引擎
  • 嵌入式开发学习———Linux环境下IO进程线程学习(三)
  • 【五大联赛】 2025-2026赛季基本信息
  • android TextView lineHeight 是什么 ?
  • Android GPU测试
  • 免费MCP: JSON 转 Excel MCP
  • kubernetes基础知识
  • 数据分析—numpy库
  • 【AI云原生】1、Function Calling:大模型幻觉破解与Agent底层架构全指南(附Go+Python实战代码)》
  • Spring Batch的2种STEP定义方式
  • 数组和指针的关系
  • 从0搭建YOLO目标检测系统:实战项目+完整流程+界面开发(附源码)
  • 疯狂星期四文案网第28天运营日记
  • zookeeper持久化和恢复原理
  • 锻造企业级数字基座 - 从生死线到增长引擎的全景蓝图
  • 【设计模式】5.代理模式
  • VUE2 学习笔记16 插槽、Vuex
  • Python特性工厂函数详解:优雅管理属性验证
  • 昇思学习营-开发版-模型开发与适配
  • 【鸿蒙高级】
  • AI Competitor Intelligence Agent Team
  • 36. 有一个高 100%的 div,里面有一个高 100px 的 div,剩下一个自动填满
  • HiveMQ核心架构思维导图2024.9(Community Edition)
  • VBA 64位API声明语句第012讲
  • 实现游戏排行榜