Flink Python API 提交 Socket 数据源的 WordCount 作业
下面详细讲解使用 Flink Python API 提交 Socket 数据源的 WordCount 作业的完整流程:
一、环境准备
-
安装 Flink
- 下载并解压 Flink 安装包(建议 1.14 + 版本,对 Python 支持更完善)
- 确保环境变量
FLINK_HOME
已配置
-
安装 PyFlink
pip install apache-flink
-
准备工具
- 需要
netcat
工具用于创建 Socket 数据源 - 确保 Java 8 + 已安装(Flink 运行依赖)
- 需要
二、编写 WordCount 代码
创建wordcount_socket.py
文件,实现从 Socket 读取数据并进行词频统计:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.descriptors import Schema, Socket, Formatdef word_count():# 1. 创建执行环境env = StreamExecutionEnvironment.get_execution_environment()env.set_parallelism(1) # 设置并行度为1,方便测试t_env = StreamTableEnvironment.create(env)# 2. 定义Socket数据源t_env.connect(Socket().version("1.0").host("localhost").port(9999)).with_format(Format().type("csv").field_delimiter("\n") # 按行分割.field("line", DataTypes.STRING())).with_schema(Schema().field("line", DataTypes.STRING())).create_temporary_table("socket_source")# 3. 注册结果表(打印到控制台)t_env.execute_sql("""CREATE TEMPORARY TABLE result_sink (word STRING,count BIGINT) WITH ('connector' = 'print')""")# 4. 执行WordCount逻辑t_env.execute_sql("""INSERT INTO result_sinkSELECT word, COUNT(1) as countFROM (SELECT explode(split(line, ' ')) as wordFROM socket_source)GROUP BY word""")if __name__ == '__main__':word_count()
三、启动必要服务
-
启动 Flink 集群
$FLINK_HOME/bin/start-cluster.sh
访问 http://localhost:8081 可查看 Flink Web UI
-
启动 Socket 服务器
打开新终端,使用 netcat 启动 Socket 服务:nc -lk 9999
(-l:监听模式,-k:保持连接,9999:端口号)
四、提交 Flink Python 作业
使用 Flink 的 Python 提交脚本提交作业:
$FLINK_HOME/bin/flink run -py wordcount_socket.py
提交成功后,会显示作业 ID,同时在 Flink Web UI 上能看到运行中的作业。
五、测试作业
-
在启动 netcat 的终端中输入文本,例如:
hello flink hello python flink python hello world
-
查看 Flink 作业输出
- 方式 1:在 Flink Web UI 中,进入对应作业的 Task Managers 页面,查看 Stdout
- 方式 2:查看 Flink 的日志文件:
$FLINK_HOME/log/flink-*-taskexecutor-*.out
输出结果会类似:
+I[hello, 1] +I[flink, 1] +I[hello, 2] +I[python, 1] +I[flink, 2] +I[python, 2] +I[hello, 3] +I[world, 1]
六、停止服务
- 停止作业:在 Flink Web UI 中点击作业,选择 "Cancel"
- 停止 Socket 服务器:按
Ctrl+C
- 停止 Flink 集群:
$FLINK_HOME/bin/stop-cluster.sh
关键说明
- Socket 数据源特性:Socket 是流式数据源,会持续监听指定端口接收数据
- 动态结果:WordCount 会对输入的单词进行持续计数,相同单词会更新计数
- 并行度设置:示例中设置并行度为 1,实际生产可根据集群资源调整
- 异常处理:如果 Socket 服务中断,作业会失败,需要重启 Socket 服务和作业
通过以上步骤,你可以完整体验 Flink Python 作业从编写、提交到运行的整个流程。