当前位置: 首页 > news >正文

Flink Python API 提交 Socket 数据源的 WordCount 作业

下面详细讲解使用 Flink Python API 提交 Socket 数据源的 WordCount 作业的完整流程:

一、环境准备

  1. 安装 Flink

    • 下载并解压 Flink 安装包(建议 1.14 + 版本,对 Python 支持更完善)
    • 确保环境变量FLINK_HOME已配置
  2. 安装 PyFlink

    pip install apache-flink
    
  3. 准备工具

    • 需要netcat工具用于创建 Socket 数据源
    • 确保 Java 8 + 已安装(Flink 运行依赖)

二、编写 WordCount 代码

创建wordcount_socket.py文件,实现从 Socket 读取数据并进行词频统计:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, Socket, Formatdef word_count():# 1. 创建执行环境env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1)  # 设置并行度为1,方便测试t_env = StreamTableEnvironment.create(env)# 2. 定义Socket数据源t_env.connect(Socket().version("1.0").host("localhost").port(9999)).with_format(Format().type("csv").field_delimiter("\n")  # 按行分割.field("line", DataTypes.STRING())).with_schema(Schema().field("line", DataTypes.STRING())).create_temporary_table("socket_source")# 3. 注册结果表(打印到控制台)t_env.execute_sql("""CREATE TEMPORARY TABLE result_sink (word STRING,count BIGINT) WITH ('connector' = 'print')""")# 4. 执行WordCount逻辑t_env.execute_sql("""INSERT INTO result_sinkSELECT word, COUNT(1) as countFROM (SELECT explode(split(line, ' ')) as wordFROM socket_source)GROUP BY word""")if __name__ == '__main__':word_count()

三、启动必要服务

  1. 启动 Flink 集群

    $FLINK_HOME/bin/start-cluster.sh
    
     

    访问 http://localhost:8081 可查看 Flink Web UI

  2. 启动 Socket 服务器
    打开新终端,使用 netcat 启动 Socket 服务:

    nc -lk 9999
    
     

    (-l:监听模式,-k:保持连接,9999:端口号)

四、提交 Flink Python 作业

使用 Flink 的 Python 提交脚本提交作业:

$FLINK_HOME/bin/flink run -py wordcount_socket.py

提交成功后,会显示作业 ID,同时在 Flink Web UI 上能看到运行中的作业。

五、测试作业

  1. 在启动 netcat 的终端中输入文本,例如:

    hello flink
    hello python
    flink python
    hello world
    
  2. 查看 Flink 作业输出

    • 方式 1:在 Flink Web UI 中,进入对应作业的 Task Managers 页面,查看 Stdout
    • 方式 2:查看 Flink 的日志文件:$FLINK_HOME/log/flink-*-taskexecutor-*.out

    输出结果会类似:

    +I[hello, 1]
    +I[flink, 1]
    +I[hello, 2]
    +I[python, 1]
    +I[flink, 2]
    +I[python, 2]
    +I[hello, 3]
    +I[world, 1]
    

六、停止服务

  1. 停止作业:在 Flink Web UI 中点击作业,选择 "Cancel"
  2. 停止 Socket 服务器:按Ctrl+C
  3. 停止 Flink 集群:
    $FLINK_HOME/bin/stop-cluster.sh
    

关键说明

  1. Socket 数据源特性:Socket 是流式数据源,会持续监听指定端口接收数据
  2. 动态结果:WordCount 会对输入的单词进行持续计数,相同单词会更新计数
  3. 并行度设置:示例中设置并行度为 1,实际生产可根据集群资源调整
  4. 异常处理:如果 Socket 服务中断,作业会失败,需要重启 Socket 服务和作业

通过以上步骤,你可以完整体验 Flink Python 作业从编写、提交到运行的整个流程。

http://www.xdnf.cn/news/1288387.html

相关文章:

  • uni-app实战教程 从0到1开发 画图软件 (学会画图)
  • Flutter UI Kits by Olayemi Garuba:免费开源的高质量UI组件库
  • nvm install 14.21.3 时npm 无法下载和识别
  • -bash: ./restart.sh: /bin/bash^M: 坏的解释器: 没有那个文件或目录
  • 1.Ansible 自动化介绍
  • 串口通信“第二次总超时”的复盘
  • ETCD备份
  • aspose word for java 使用书签进行内容填充和更新
  • SM4对称加密算法的加密模式介绍
  • Python Day28 HTML 与 CSS 核心知识点 及例题分析
  • 自动驾驶 HIL 测试:构建 “以假乱真” 的实时数据注入系统
  • 《嵌入式Linux应用编程(四):Linux文件IO系统调用深度解析》
  • GraphQL 原理、应用与实践指南
  • 【Altium designer】快速建立原理图工程的步骤
  • Day05 店铺营业状态设置 Redis
  • MySQL-多表查询
  • 第23章,景深:技术综述
  • 下一代防火墙技术
  • 【KO】android 面试 算法
  • 数字气压传感器,筑牢汽车TPMS胎压监测系统的精准感知基石
  • 西门子S7-200与S7-1200通过PPI以太网模块通讯,赋能汽车制造行业发展
  • 如何在 Ubuntu 24.04 LTS Linux 中安装 JSON Server
  • WebAssembly的原理与使用
  • 前端最新Vue2+Vue3基础入门到实战项目全套教程,自学前端vue就选黑马程序员,一套全通关!笔记
  • Tauri Qt孰优孰劣
  • 计算机毕设不知道选什么题目?基于Spark的糖尿病数据分析系统【Hadoop+Spark+python】
  • 数据结构 二叉树(2)堆
  • 91、23种经典设计模式
  • AI大模型基础:BERT、GPT、Vision Transformer(ViT)的原理、实现与应用
  • 农业智慧大屏系统 - Flask + Vue实现