当前位置: 首页 > news >正文

Spark-streaming(一)

Spark-Streaming概述

Spark Streaming 用于流式数据的处理。

和 Spark 基于 RDD 的概念很相似,Spark Streaming 使用离散化流(discretized stream)作为抽象表示,叫作 DStream。

DStream 是随时间推移而收到的数据的序列。

Spark-Streaming的特点:易用、容错、易整合到spark体系。

Spark-Streaming架构

DStream实操

案例:词频统计

idea中运行

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}object wordcount {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 streamingval sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming")// 创建 StreamingContext 对象,设置批处理间隔为 3 秒val ssc = new StreamingContext(sparkConf, Seconds(3))// 从指定的主机和端口接收文本流数据val lineStreams = ssc.socketTextStream("node01", 9999)// 将每行文本拆分为单词val wordStreams = lineStreams.flatMap(_.split(" "))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStreams = wordStreams.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStreams = wordAndOneStreams.reduceByKey(_ + _)// 打印每个批次中每个单词的计数结果wordAndCountStreams.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()}
}

在虚拟机中输入: nc -lk 9999   并输入数据

结果:

解析:

对数据的操作也是按照 RDD 为单位来进行的

计算过程由 Spark Engine 来完成

DStream 创建

RDD队列

案例:

循环创建几个 RDD,将 RDD 放入队列。通过 SparkStream 创建 Dstream,计算 WordCount

代码

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.rdd.RDD
import scala.collection.mutableobject RDD {def main(args: Array[String]): Unit = {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 RDDStreamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("RDDStream")// 创建 StreamingContext 对象,设置批处理间隔为 4 秒val ssc = new StreamingContext(sparkConf, Seconds(4))// 创建一个可变队列,用于存储 RDDval rddQueue = new mutable.Queue[RDD[Int]]()// 从队列中创建输入流,oneAtATime 为 false 表示可以同时处理多个 RDDval inputStream = ssc.queueStream(rddQueue, oneAtATime = false)// 将输入流中的每个元素映射为 (元素, 1) 的键值对val mappedStream = inputStream.map((_, 1))// 按键对键值对进行聚合,统计每个键的出现次数val reducedStream = mappedStream.reduceByKey(_ + _)// 打印每个批次中每个键的计数结果reducedStream.print()// 启动流式计算ssc.start()// 循环 5 次,每次向队列中添加一个 RDD,并休眠 2 秒for (i <- 1 to 5) {rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)Thread.sleep(2000)}// 等待计算终止ssc.awaitTermination()}
}

运行结果:

自定义数据源

自定义数据源

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsetsimport org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiverimport scala.util.control.NonFatalclass CustomerReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {override def onStart(): Unit = {new Thread("Socket Receiver") {override def run(): Unit = {receive()}}.start()}def receive(): Unit = {var socket: Socket = nullvar reader: BufferedReader = nulltry {socket = new Socket(host, port)reader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))var input: String = reader.readLine()while (!isStopped() && input != null) {store(input)input = reader.readLine()}} catch {case NonFatal(e) =>restart("Error receiving data", e)} finally {if (reader != null) {try {reader.close()} catch {case NonFatal(e) =>println(s"Error closing reader: ${e.getMessage}")}}if (socket != null) {try {socket.close()} catch {case NonFatal(e) =>println(s"Error closing socket: ${e.getMessage}")}}}restart("Restarting receiver")}override def onStop(): Unit = {}
}    

使用自定义的数据源采集数据

object sparkConf {def main(args: Array[String]): Unit = {try {// 创建 SparkConf 对象,设置运行模式为本地多线程,应用名为 streamval sparkConf = new SparkConf().setMaster("local[*]").setAppName("stream")// 创建 StreamingContext 对象,设置批处理间隔为 5 秒val ssc = new StreamingContext(sparkConf, Seconds(5))// 使用自定义 Receiver 创建输入流val lineStream = ssc.receiverStream(new CustomerReceiver("node01", 9999))// 将每行文本拆分为单词val wordStream = lineStream.flatMap(_.split(" "))// 为每个单词映射为 (单词, 1) 的键值对val wordAndOneStream = wordStream.map((_, 1))// 按单词进行分组并对每个单词的计数进行累加val wordAndCountStream = wordAndOneStream.reduceByKey(_ + _)// 打印每个批次中每个单词的计数结果wordAndCountStream.print()// 启动流式计算ssc.start()// 等待计算终止ssc.awaitTermination()
}}}

http://www.xdnf.cn/news/86473.html

相关文章:

  • 第 1.4 节: G1 人形机器人足球项目定义与课程路线
  • LSTM如何解决梯度消失问题
  • uv包管理器如何安装依赖?
  • 火语言RPA--Ftp删除目录
  • 衡石ChatBI:依托开放架构构建技术驱动的差异化数据服务
  • 现有一整型数组,a[8] = { 4,8,7,0,3,5,9,1},现使用堆排序的方式原地对该数组进行升序排列。那么在进行第一轮排序结束之后,数组的顺序为?
  • 示例:spring xml+注解混合配置
  • FastAPI WebSocket 聊天应用详细教程
  • 搭建 Spark - Local 模式:开启数据处理之旅
  • 掌握 Altium Designer:轻松定制“交换器件”工具栏
  • 智能电网第1期 | 工业交换机在变电站自动化系统中的作用
  • Python 获取淘宝买家订单列表(buyer_order_list)接口的详细指南
  • [创业之路-377]:企业法务 - 有限责任公司与股份有限公司的优缺点对比
  • 如何在 Element UI 中优雅地使用 `this.$loading` 显示和隐藏加载动画
  • PyQt5、NumPy、Pandas 及 ModelArts 综合笔记
  • # 基于PyTorch的食品图像分类系统:从训练到部署全流程指南
  • 第 2.1 节: 机器人仿真环境选择与配置 (Gazebo, MuJoCo, PyBullet)
  • 【Dv3Admin】从零搭建Git项目安装·配置·初始化
  • iPaaS集成平台相比传统集成技术有哪些优势?
  • ECharts中的markPoint使用,最大值,最小值,展示label数值
  • JavaScript 渲染内容爬取实践:Puppeteer 进阶技巧
  • Qt之moveToThread
  • Spark-Streaming简介 核心编程
  • 【MySQL】索引失效场景大全
  • C++:继承
  • window上 elasticsearch v9.0 与 jmeter5.6.3版本 冲突,造成es 启动失败
  • 使用Autocannon.js进行HTTP压测
  • Vue3 + Vite + TS,使用 ExcelJS导出excel文档,生成水印,添加背景水印,dom转图片,插入图片,全部代码
  • 建造者模式详解及其在自动驾驶场景的应用举例(以C++代码实现)
  • 数据库对象与权限管理-Oracle数据字典详解