当前位置: 首页 > news >正文

Flink03-学习-套接字分词流自动写入工具

上一节中通过如下命令启动服务摸来模拟Socket流。请添加图片描述
现在我们写一个ServerSocket来模拟让流自动写入不用手动操作。

pom.xml和上一节一致不需要修改

编写代码

同样适用Socket流

 // 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");

FlinkServer
继承Thread启动线程

package org.example.snow.demo3;import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;/*** @author snowsong*/
public class FlinkServer extends Thread{@Overridepublic void run() {String ip = "0.0.0.0";int port = 8886;StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();// 使用socket流创建一个从 socket 读取文本的数据流,以换行符 \n 作为分隔符DataStreamSource<String> stringDataStreamSource = executionEnvironment.socketTextStream(ip, port, "\n");SingleOutputStreamOperator<Tuple2<String, Long>> tuple2SingleOutputStreamOperator = stringDataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {@Overridepublic void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {String[] splits = s.split("\\s");for (String word : splits) {collector.collect(Tuple2.of(word, 1L));}}});SingleOutputStreamOperator<Tuple2<String, Long>> word = tuple2SingleOutputStreamOperator.keyBy(new KeySelector<Tuple2<String, Long>, Object>() {@Overridepublic Object getKey(Tuple2<String, Long> stringLongTuple2) throws Exception {return stringLongTuple2.f0;}}).window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))).sum(1);word.print();try {executionEnvironment.execute("stream!");} catch (Exception e) {throw new RuntimeException(e);}}}

NumRandom
使用ServerSocket实现一个持续的流输出

package org.example.snow.demo3;import java.io.OutputStream;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Random;/*** @author snowsong*/
public class RandomNumClient extends Thread {@Overridepublic void run() {// 随机生成数字String ip = "0.0.0.0";int port = 8886;try {ServerSocket serverSocket = new ServerSocket();InetSocketAddress address = new InetSocketAddress(ip, port);// 灵活绑定服务器地址serverSocket.bind(address);// 监听并接收客户端的连接请求,有阻塞特性,当调用该方法的时候,线程会暂停执行,直到有客户端连接上来Socket accept = serverSocket.accept();// 获取输入流,读取客户端发送的数据OutputStream outputStream = accept.getOutputStream();// 包装成打印流,方便写入数据 true 自动刷新缓冲区PrintWriter printWriter = new PrintWriter(outputStream, true);Random random = new Random();// 遍历for (int i = 0; i < 10; i++) {// 生成随机数int num = random.nextInt(10) + 1;printWriter.println("随机数:" + num);System.out.println("send to flink:" + num);Thread.sleep(100);}} catch (Exception e) {throw new RuntimeException(e);}super.run();}
}

启动类

package org.example.snow.demo3;/*** @author snowsong*/
public class StartApp {public static void main(String[] args) throws Exception {RandomNumClient randomNumClient = new RandomNumClient();FlinkServer flinkServer = new FlinkServer();flinkServer.start();randomNumClient.start();}
}

运行结果

请添加图片描述

http://www.xdnf.cn/news/776809.html

相关文章:

  • nginx+tomcat负载均衡群集
  • 设计模式-原型模式
  • 接口重试的7种常用方案!
  • 基于Python学习《Head First设计模式》第四章 工厂方法+抽象工厂
  • 【Zephyr 系列 4】串口通信进阶:打造自己的 AT 命令框架
  • 在树莓派3B上用Python编程完成流水灯实验
  • RAG理论基础总结
  • ps曲线调整
  • JavaSE:面向对象进阶之内部类(Inner Class)
  • 使用autoGen处理多agent
  • ps黑白调整
  • 碳中和新路径:铁电液晶屏如何破解高性能与节能矛盾?
  • 无线错误排查、排错命令
  • 【电赛培训课程】测量与信号类赛题知识点讲解与赛题解析
  • 2 Studying《Effective STL》
  • Day43 Python打卡训练营
  • 人工智能在智能制造业中的创新应用与未来趋势
  • 电磁场与电磁波公式汇总
  • PH热榜 | 2025-06-02
  • AIGC 基础篇 高等数学篇 01函数与极限
  • Arch安装botw-save-state
  • P1803 凌乱的yyy / 线段覆盖
  • Python中sys模块详解
  • 云服务器突发宕机或无响应怎么办
  • 【PCB设计】STM32开发板——电源设计
  • Java注释详解:单行、多行与文档注释的区别与应用
  • c++泛型编程入门与STL介绍
  • ps色阶调整
  • 样本量计算:两独立样本定量资料——平均值差的置信区间法
  • Dify 部署问题处理