当前位置: 首页 > java >正文

Flink-Yarn运行模式

 Yarn的部署过程

        Yarn上部署的过程是:客户端把Flink应用提交给Yarn的ResourceManager,Yarn的ResourceManager会向Yarn的NodeManager申请容器,在这些容器上,Flink会部署JobManager和TaskManager的实例,从而启动集群,Flink会根据运行在JobManager上的作业所需要的Slot数量动态分配TaskManager资源。

Flink抽象作业提交流程

  1. 一般情况下,由客户端(App)通过分发器提供的REST接口,将作业提交给JobManager;
  2. 由分发器启动JobMaster,并将作业(包含JobGraph)提交给JobMaster;
  3. JobMaster将JobMaster解析为可执行的ExecutionGraph,得到所需的资源数量,然后向资源管理器请求资源(slots)
  4. 资源管理器判断当前是否由足够的可用资源,如果没有,启动新的TaskManager;
  5. TaskManager启动后,向ResouceManager注册自己的可用任务槽(slots);
  6. TaskManager连接到对应的JobMaster,提供slots
  7. JobMaster将需要执行的任务分发给TaskManager
  8. TaskManager执行任务,互相之间交换数据

数据流图

        所有的Flink程序都可以归纳为三部分构成:Source、Transformation和Sink

        Source表示“源算子”,负责读取数据源

        Transformation表示“转换算子”,利用各种算子进行处理加工

        Sink表示“下沉算子”,负责数据的输出

Flink任务执行图

        按照生成数据可以分为四层:

        逻辑流图(StreamGraph)->作业图(JobGraph)->执行图(ExecutionGraph)->物理图(Physical Graph)

水位线

        用来衡量事件时间进展的标记,就被称为“水位线”

自定义水位线代码

周期性生成水位线

import com.shirun.bean.Event;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;// 自定义水位线的产生
public class CustomPeriodicWatermarkExample {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();env.execute();}public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {@Overridepublic TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {return new SerializableTimestampAssigner<Event>() {@Overridepublic long extractTimestamp(Event element, long recordTimestamp) {return element.timestamp; // 告诉程序数据源里的时间戳是哪一个字段}};}@Overridepublic WatermarkGenerator<Event> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {return new CustomBoundedOutOfOrdernessGenerator();}}public static class CustomBoundedOutOfOrdernessGenerator implements WatermarkGenerator<Event> {private Long delayTime = 5000L; // 延迟时间private Long maxTs = -Long.MAX_VALUE + delayTime + 1L; // 观察到的最大时间戳@Overridepublic void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {// 每来一条数据就调用一次maxTs = Math.max(event.timestamp, maxTs); // 更新最大时间戳}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 发射水位线,默认200ms调用一次output.emitWatermark(new Watermark(maxTs - delayTime - 1L));}}
}

断点式水位线生成器        

public class PunctuatedGenerator implements WatermarkGenerator<Event> {@Overridepublic void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {// 只有在遇到特定的itemId时,才发出水位线if (r.user.equals("Mary")) {output.emitWatermark(new Watermark(r.timestamp - 1));}}@Overridepublic void onPeriodicEmit(WatermarkOutput output) {// 不需要做任何事情,因为我们在onEvent方法中发射了水位线}
}

自定义数据源发送水位线

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;import java.sql.Timestamp;
import java.util.Calendar;
import java.util.Random;public class EmitWatermarkInSourceFunction {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(new ClickSource()).print();env.execute();}// 泛型是数据源中的类型public static class ClickSource implements SourceFunction<Event> {private boolean running = true;@Overridepublic void run(SourceContext<Event> sourceContext) throws Exception {Random random = new Random();String[] userArr = {"Mary", "Bob", "Alice"};String[] urlArr  = {"./home", "./cart", "./prod?id=1"};while (running) {long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒时间戳String username = userArr[random.nextInt(userArr.length)];String url      = urlArr[random.nextInt(urlArr.length)];Event event = new Event(username, url, currTs);// 使用collectWithTimestamp方法将数据发送出去,并指明数据中的时间戳的字段sourceContext.collectWithTimestamp(event, event.timestamp);// 发送水位线sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));Thread.sleep(1000L);}}@Overridepublic void cancel() {running = false;}}
}

http://www.xdnf.cn/news/7827.html

相关文章:

  • Java异常处理全解析:从基础到自定义
  • COMPUTEX 2025 | 广和通率先发布基于MediaTek T930 平台的5G模组FG390
  • 集星云推“碰一碰源码”开发思路解析
  • 【神经网络与深度学习】流模型的通俗易懂的原理
  • 鸿蒙开发——8.wrapBuilder 封装全局@Builder的高阶用法
  • 离线服务器Python环境配置指南
  • LangChain入门
  • 专业 YouTube SEO 方案:打造高排名视频的关键步骤
  • 【容易坑】mybatis中使用if标签比较两个字符串是否相等
  • SpringBoot微服务编写Dockerfile流程及问题汇总
  • Burp Suite返回中文乱码?
  • 使用 Spring AI Alibaba 集成阿里云百炼大模型应用
  • 计算机网络学习(一)—— OSI vs TCP/IP网络模型
  • 在局域网(LAN)中查看设备的 IP 地址
  • 网络:如何通过已知的电脑的机器名(计算机名),获取ip地址
  • C++线程池实现
  • jenkins数据备份
  • 程序代码篇---Python处理ESP32-S3-cam视频流
  • ROS2学习(8)------ROS2 服务说明
  • ubuntu 搭建FTP服务,接收部标机历史音视频上报服务器
  • 线性表数据结构-堆栈
  • 网络安全之APP渗透测试总结
  • Vue3 组件之间传值
  • React深度解析:Hooks体系与Redux Toolkit现代状态管理实践
  • linux 学习之位图(bitmap)数据结构
  • 宝塔安装的 MySQL 无法连接的情况及解决方案
  • StepX-Edit:一个通用图像编辑框架——论文阅读笔记
  • 计算机网络相关面试题
  • 安全可控的AI底座:灯塔大模型应用开发平台全面实现国产信创兼容适配认证
  • 数据被泄露了怎么办?