当前位置: 首页 > news >正文

Spark–steaming

实验项目:

找出所有有效数据,要求电话号码为11位,但只要列中没有空值就算有效数据。 按地址分类,输出条数最多的前20个地址及其数据。

代码讲解: 导包和声明对象,设置Spark配置对象和SparkContext对象。 使用Spark SQL语言进行数据处理,包括创建数据库、数据表,导入数据文件,进行数据转换。 筛选有效数据并存储到新表中。 按地址分组并统计出现次数,排序并输出前20个地址。 代码如下 import org.apache.spark.SparkConf import org.apache.spark.sql.SparkSession object Demo { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Demo") val spark = SparkSession.builder().enableHiveSupport() .config("spark.sql.warehouse.dir", "hdfs://node01:9000/user/hive/warehouse").config(sparkConf).getOrCreate() spark.sql(sqlText = "create database spark_sql_2") spark.sql(sqlText = "use spark_sql_2") //创建存放原始数据的表 spark.sql( """ |create table user_login_info(data string |row format delimited |""".stripMargin) spark.sql(sqlText = "load data local inpath 'Spark-SQL/input/user_login_info.json' into table user_login_info") //利用get_json_object将数据做转换 spark.sql( """ |create table user_login_info_1 |as |select get_json_object(data,'$.uid') as uid, |get_json_object(data,'$.phone') as phone, |get_json_object(data,'$.addr') as addr from user_login_info |""".stripMargin) spark.sql(sqlText = "select count(*) count from user_login_info_1").show() //获取有效数据 spark.sql( """ |create table user_login_info_2 |as |select * from user_login_info_1 |where uid != ' ' and phone != ' ' and addr != ' ' |""".stripMargin) spark.sql(sqlText = "select count(*) count from user_login_info_2").show() //获取前20个地址 spark.sql( """ |create table hot_addr |as |select addr,count(addr) count from user_login_info_2 |group by addr order by count desc limit 20 |""".stripMargin) spark.sql(sqlText = "select * from hot_addr").show() spark.stop() } }

 

Spark Streaming介绍 Spark Streaming概述: 用于流式计算,处理实时数据流。 支持多种数据输入源(如Kafka、Flume、Twitter、TCP套接字等)和输出存储位置(如HDFS、数据库等)。

Spark Streaming特点: 易用性:支持Java、Python、Scala等编程语言,编写实时计算程序如同编写批处理程序。 容错性:无需额外代码和配置即可恢复丢失的数据,确保实时计算的可靠性。 整合性:可以在Spark上运行,允许重复使用相关代码进行批处理,实现交互式查询操作。

Spark Streaming架构: 驱动程序(StreamingContext)处理数据并传给SparkContext。 工作节点接收和处理数据,执行任务并备份数据到其他节点。 背压机制协调数据接收能力和资源处理能力,避免数据堆积和资源浪费。 Spark Streaming实操 词频统计案例: 使用ipad工具向999端口发送数据,Spark Streaming读取端口数据并统计单词出现次数。 代码配置包括设置关键对象、接收TCP套接字数据、扁平化处理、累加相同键值对、分组统计词频。 启动和运行: 启动netpad发送数据,Spark Streaming每隔三秒收集和处理数据。 代码中没有显式关闭状态,流式计算默认持续运行,确保数据处理不间断。 DStream创建 DStream创建方式: RDD队列:通过SSC创建RDD队列,将RDD推送到队列中作为DStream处理。 自定义数据源:下节课详细讲解。

RDD队列案例: 循环创建多个RDD并推送到队列中,使用Spark Streaming处理RDD队列进行词频统计。 代码包括配置对象、创建可变队列、转换RDD为DStream、累加和分组统计词频。 代码如下 import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("streaming") val ssc = new StreamingContext(sparkConf,Seconds(3)) val lineStreams = ssc.socketTextStream("node01",9999) val wordStreams = lineStreams.flatMap(_.split(" ")) val wordAndOneStreams = wordStreams.map((_,1)) val wordAndCountStreams = wordAndOneStreams.reduceByKey(_+_) wordAndCountStreams.print() ssc.start() ssc.awaitTermination() } }

 

结果展示: 展示了词频统计的结果,验证了Spark Streaming的正确性和有效性。 自定义数据源的实现 需要导入新的函数并继承现有的函数。 创建数据源时需选择class而不是object。 在class中定义on start和on stop方法,并在这些方法中实现具体的功能。 类的定义和初始化 类的定义包括数据类型的设定,如端口号和TCP名称。 使用extends关键字继承父类的方法。 数据存储类型设定为内存中保存。 数据接收和处理 在on start方法中创建新线程并调用接收数据的方法。 连接到指定的主机和端口号,创建输入流并转换为字符流。 逐行读取数据并写入到spark stream中,进行词频统计。 数据扁平化和词频统计 使用block map进行数据扁平化处理。 将原始数据转换为键值对形式,并根据相同键进行分组和累加。 输出词频统计结果。 程序终止条件 设定手动终止和程序异常时的终止条件。 在满足终止条件时输出结果并终止程序。

http://www.xdnf.cn/news/91621.html

相关文章:

  • 【LLM+Code】Claude Code Agent 0.2.9 版本最细致解读
  • Cursor Free VIP 重置进程错误,轻松恢复使用!
  • Element Plus消息通知体系深度解析:从基础到企业级实践
  • SwiftInfer —— 大模型无限流式输入推理打破多轮对话长度限制
  • 序列决策问题(Sequential Decision-Making Problem)
  • 测试开发 - Java 自动化测试核心函数详解
  • 【云馨AI-大模型】Dify 1.2.0:极速集成 SearXNG,畅享智能联网搜索新境界,一键脚本轻松部署SearXNG
  • LeetCode算法题(Go语言实现)_55
  • 麒麟系统使用-系统设置
  • 详解BUG(又名:BUG的生命周期)
  • 从0到1构建企业级消息系统服务体系(终):当消息系统学会「读心术」揭秘情感计算如何让触达转化率飙升 200%
  • Unity 导出Excel表格
  • 可变参数模板 和 折叠表达式 (C++)
  • 人工智能-模型评价与优化(过拟合与欠拟合,数据分离与混淆矩阵,模型优化,实战)
  • 《AI大模型应知应会100篇》第32篇:大模型与医疗健康:辅助诊断的可能性与风险
  • RAG进阶:Embedding Models嵌入式模型原理和选择
  • 【网络应用程序设计】实验一:本地机上的聊天室
  • 1.HTTP协议与RESTful设计
  • char32_t、char16_t、wchar_t 用于 c++ 语言里存储 unicode 编码的字符,给出它们的具体定义
  • 【武汉理工大学第四届ACM校赛】copy
  • 凡清亮相第十五届北京国际电影节电影嘉年华,用音乐致敬青春与梦想
  • 调和平均数通俗易懂的解释以及为什么这样定义,有什么用
  • 《 C++ 点滴漫谈: 三十四 》从重复到泛型,C++ 函数模板的诞生之路
  • 客户对质量不满意,如何快速响应?
  • ycsb性能测试的优缺点
  • GRS认证有什么要求?GRS认证要审核多久,GRS认证流程
  • 旅游行业路线预定定制旅游小程序开发
  • vivado XMP使用
  • 2023蓝帽杯初赛内存取证-1
  • ROS2 安装详细教程,Ubuntu 22.04.5 LTS 64 位 操作系统