当前位置: 首页 > java >正文

基于Scala实现Flink的三种基本时间窗口操作

目录

代码结构

代码解析

(1) 主程序入口

(2) 窗口联结(Window Join)

(3) 间隔联结(Interval Join)

(4) 窗口同组联结(CoGroup)

(5) 执行任务

代码优化

(1) 时间戳分配

(2) 窗口大小

(3) 输出格式

(4) 并行度

优化后的代码


 

这段代码展示了 Apache Flink 中三种不同的流联结操作:窗口联结(Window Join)间隔联结(Interval Join) 和 窗口同组联结(CoGroup)。以下是对代码的详细解析和说明: 

代码结构

  • 包声明package transformplus
    定义了代码所在的包。

  • 导入依赖
    导入了 Flink 相关类库,包括流处理 API、窗口分配器、时间语义等。

  • WindowJoin 对象
    主程序入口,包含三种流联结操作的实现。

package transformplusimport java.langimport org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Event/**** @PROJECT_NAME: flink1.13* @PACKAGE_NAME: transformplus* @author: 赵嘉盟-HONOR* @data: 2023-12-05 12:05* @DESCRIPTION**/
object WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)//TODO 窗口联结(join)val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1,e2)=>e1+"->"+e2).print("Join")//TODO 间隔联结:用户行为事件联系(intervalJoin)// 订单事件流val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)// 点击事件流val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5),Time.seconds(10)).process(new ProcessJoinFunction[(String,String,Long),Event,String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1+"=>"+in2)}}).print("intervalJoin")//TODO 窗口同组联结: coGroup(iterable)stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String,Long),(String,Long),String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable+"=>"+iterable1)}}).print("coGroup")env.execute("windowJoin")}
}

代码解析

(1) 主程序入口
def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(1)
  • 创建 Flink 流处理环境 StreamExecutionEnvironment,并设置并行度为 1。
(2) 窗口联结(Window Join)
val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)
).assignAscendingTimestamps(_._2)val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)
).assignAscendingTimestamps(_._2)stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => e1 + "->" + e2).print("Join")
  • 数据流:定义了两个流 stream1 和 stream2,分别包含键值对 (String, Long)
  • 时间戳分配:使用 assignAscendingTimestamps 方法为事件分配时间戳。
  • 窗口联结
    • 使用 join 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件对拼接成字符串并输出。
(3) 间隔联结(Interval Join)
val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignAscendingTimestamps(_._3)val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignAscendingTimestamps(_.timestamp)orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(in1 + "=>" + in2)}}).print("intervalJoin")
  • 数据流:定义了两个流 orderStream(订单事件)和 pvStream(点击事件)。
  • 时间戳分配:为事件分配时间戳。
  • 间隔联结
    • 使用 intervalJoin 方法将两个流按键(_._1 和 user)联结。
    • 使用 between 方法定义时间间隔(前 5 秒到后 10 秒)。
    • 使用 process 方法将匹配的事件对拼接成字符串并输出。
(4) 窗口同组联结(CoGroup)
stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(iterable + "=>" + iterable1)}}).print("coGroup")
  • 窗口同组联结
    • 使用 coGroup 方法将两个流按键(_._1)联结。
    • 使用 TumblingEventTimeWindows 定义 5 秒的滚动窗口。
    • 使用 apply 方法将匹配的事件集合拼接成字符串并输出。
(5) 执行任务
env.execute("windowJoin")
  • 启动 Flink 流处理任务,任务名称为 windowJoin

代码优化

(1) 时间戳分配
  • assignAscendingTimestamps 方法假设事件时间戳是严格递增的。如果时间戳可能乱序,应使用 assignTimestampsAndWatermarks 方法:

    java

    stream1.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2)
    )
(2) 窗口大小
  • 窗口大小(5 秒)可能不适合所有场景。应根据实际需求调整窗口大小。
(3) 输出格式
  • 输出格式较为简单,可以优化为更易读的形式:

    java

    collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")
(4) 并行度
  • 并行度设置为 1,可能影响性能。可以根据集群资源调整并行度:

    java

    env.setParallelism(4)

优化后的代码

以下是优化后的完整代码:

package transformplusimport java.lang
import java.time.Durationimport org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector
import source.Eventobject WindowJoin {def main(args: Array[String]): Unit = {val env = StreamExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(4)// 窗口联结val stream1 = env.fromElements(("a", 1000L),("b", 1000L),("a", 2000L),("b", 6000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))val stream2 = env.fromElements(("a", 3000L),("b", 3000L),("a", 4000L),("b", 8000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, Long), timestamp: Long) => event._2))stream1.join(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply((e1, e2) => s"${e1._1} (${e1._2}) -> ${e2._1} (${e2._2})").print("Join")// 间隔联结val orderStream: DataStream[(String, String, Long)] = env.fromElements(("Mary", "order-1", 5000L),("Alice", "order-2", 5000L),("Bob", "order-3", 20000L),("Alice", "order-4", 20000L),("Cary", "order-5", 51000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: (String, String, Long), timestamp: Long) => event._3))val pvStream: DataStream[Event] = env.fromElements(Event("Bob", "./cart", 2000L),Event("Alice", "./prod?id=100", 3000L),Event("Alice", "./prod?id=200", 3500L),Event("Bob", "./prod?id=2", 2500L),Event("Alice", "./prod?id=300", 36000L),Event("Bob", "./home", 30000L),Event("Bob", "./prod?id=1", 23000L),Event("Bob", "./prod?id=3", 33000L)).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(5)).withTimestampAssigner((event: Event, timestamp: Long) => event.timestamp))orderStream.keyBy(_._1).intervalJoin(pvStream.keyBy(_.user)).between(Time.seconds(-5), Time.seconds(10)).process(new ProcessJoinFunction[(String, String, Long), Event, String] {override def processElement(in1: (String, String, Long), in2: Event, context: ProcessJoinFunction[(String, String, Long), Event, String]#Context, collector: Collector[String]): Unit = {collector.collect(s"Order: ${in1._2}, Click: ${in2.url}")}}).print("intervalJoin")// 窗口同组联结stream1.coGroup(stream2).where(_._1).equalTo(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).apply(new CoGroupFunction[(String, Long), (String, Long), String] {override def coGroup(iterable: lang.Iterable[(String, Long)], iterable1: lang.Iterable[(String, Long)], collector: Collector[String]): Unit = {collector.collect(s"Stream1: ${iterable.toString}, Stream2: ${iterable1.toString}")}}).print("coGroup")env.execute("windowJoin")}
}
http://www.xdnf.cn/news/12670.html

相关文章:

  • 跨平台资源下载工具:res-downloader 的使用体验
  • Vue3中computed和watch的区别
  • OpenLayers 导航之运动轨迹
  • 深入剖析 RocketMQ 中的 DefaultMQPushConsumerImpl:消息推送消费的核心实现
  • Docker基础(二)
  • TTL简述
  • Unity基础-欧拉角和四元数
  • 【Elasticsearch】映射:Join 类型、Flattened 类型、多表关联设计
  • 基于springboot的藏文古籍系统
  • Nature子刊:16S宏基因组+代谢组学联动,借助MicrobiomeGS2建模揭示IBD代谢治疗新靶点
  • Java高级 | 【实验六】Springboot文件上传和下载
  • Python 中的MVC与MVP 框架与示例
  • LVGL对显示接口的要求
  • 闲庭信步使用SV搭建图像测试平台:第一课——图片的读写
  • 【商城saas和商城源码的区别】
  • 【Zephyr 系列 13】BLE Mesh 入门实战:构建基础节点通信与中继组播系统
  • 类型别名与类型自动推导
  • Redis数据持久化之RDB快照
  • 【走好求职第一步】求职OMG——见面课测验4
  • SAP学习笔记 - 开发27 - 前端Fiori开发 Routing and Navigation(路由和导航)
  • 算术图片验证码(四则运算)+selenium
  • 【大模型】大模型RAG(Retrieval-Augmented Generation)面试题合集
  • 欢乐熊大话蓝牙知识16:蓝牙是怎么找设备的?扫描与广播的“对话内幕”
  • Shell编程精髓:表达式与数组实战指南
  • DbServer链接KingBase8(人大)数据库
  • Android座舱系统Agent改造方案
  • day 47
  • 微前端架构下的B端页面设计:模块化与跨团队协作的终极方案
  • Python爬虫-爬取各省份各年份高考分数线数据,进行数据分析
  • 国产pcie switch,支持PCIE 3.0/4.0/5.0,支持昇腾310/910 GPU,支持龙芯、海光、飞腾