当前位置: 首页 > news >正文

Flink02-学习-套接字分词

flatmap()

AMapFunction仅适用于执行一对一转换的情况:对于每个进入的流元素,map()都会发出一个转换后的元素。否则,您需要使用 flatmap()

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());enrichedNYCRides.print();

连同FlatMapFunction:

DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(...));DataStream<EnrichedRide> enrichedNYCRides = rides.flatMap(new NYCEnrichment());enrichedNYCRides.print();

通过Collector此接口提供的功能,该flatmap()方法可以发出任意数量的流元素,包括不发出任何元素。

实践

Flink 的 DataStream API 允许你流式传输任何可以序列化的数据。Flink 自己的序列化器用于

基本类型,即 String、Long、Integer、Boolean、Array
复合类型:Tuples、POJO
对于其他类型,Flink 会回退到 Kryo。Flink 也可以使用其他序列化器。

pom内容如上个内容,此处不再赘述
定义本机变量
连接的 IP 地址为 0.0.0.0(监听所有网络接口)

// 本机String ip = "0.0.0.0";//开启的端口号int port = 8886;

获取flink环境

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

使用套接字-socket流

  DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlatMap-分词
数据转换 - 分词和计数。
使用 flatMap 操作对每行文本进行分词处理;
将每行文本按空白字符分割成单词数组;
为每个单词生成一个 (单词, 1) 的元组(Tuple2);
结果是一个包含 (单词, 计数) 对的流

SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});

分组和窗口计算
keyBy: 按照元组的第一个字段(单词)进行分组;
window: 定义滑动窗口:窗口大小:5秒,滑动间隔:1秒;
sum(1): 对每个窗口内相同单词的计数(元组的第二个字段)求和;

SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);

完整代码

package org.example.snow.demo2;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;public class startDemo {public static void main(String[] args) throws Exception {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();executionEnvironment.execute("stream!");}
}

启动服务

nc -lk 8886

运行效果
在这里插入图片描述
请添加图片描述

http://www.xdnf.cn/news/169417.html

相关文章:

  • Web前端开发:CSS Float(浮动)与 Positioning(定位)
  • 数据结构——二叉树和堆(万字,最详细)
  • 【AI论文】RefVNLI:迈向可扩展的主题驱动文本到图像生成评估
  • SLAM技术:从原理到应用的全面解析
  • 计算机网络 | 应用层(6) -- 套接字编程
  • Java自定义注解详解
  • 「Mac畅玩AIGC与多模态01」架构篇01 - 展示层到硬件层的架构总览
  • 深度学习常见框架:TensorFlow 与 PyTorch 简介与对比
  • 在 Ubuntu 24.04 系统上安装和管理 Nginx
  • body Param Query 三个 不同的入参 分别是什么意思 在前端 要怎么传 这三种不同的参数
  • DAY7-C++进阶学习
  • Python爬虫课程实验指导书
  • 麒麟系统搭建离线NTP服务器,让局域网内windows系统同步其时间,并付排错避坑思路
  • Android Studio中创建第一个Flutter项目
  • 前端性能优化面试回答技巧
  • django admin 去掉新增 删除
  • 【愚公系列】《Manus极简入门》005-DeepSeek与Manus的创新之处
  • PostSwigger Web 安全学习:CSRF漏洞3
  • C# 利用log4net 工作台打印和保存到文件
  • 央视两次采访报道爱藏评级,聚焦生肖钞市场升温,评级币成交易安全“定心丸”
  • C# 类的基本概念(类成员)
  • 16bit 高精度低延时霍尔角度编码器,KTH7824,替代MA730
  • 高自由度与多功能指尖设计:Allegro灵巧手V5(4F Plus)的技术亮点
  • 电商数据采集电商,行业数据分析,平台数据获取|稳定的API接口数据
  • 榕壹云国际版短剧系统:基于Spring Boot+MySQL+UniApp的全球短剧创作平台
  • Scala 函数柯里化及闭包
  • 用 Nodemon 解决 npm run serve 频繁重启服务
  • 2个小时1.5w字| React Golang 全栈微服务实战
  • 双目RealSense系统配置rs_camera.launch----实现D435i自制rosbag数据集到离线场景的slam建图
  • BP 算法探秘 :神经网络的幕后引擎