当前位置: 首页 > news >正文

Spark实时流数据处理实例(SparkStreaming通话记录消息处理)

所用资源:

通过网盘分享的文件:spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar等4个文件
链接: https://pan.baidu.com/s/1zYHu29tLgDvS_L2Ud-22ZA?pwd=hnpg 提取码: hnpg

1.需求分析 :

假定有一个手机通信计费系统,用户通话时在基站交换机上临时保存了相关记录,由于交换机的容量 有限且分散在各地,因此需要及时将这些通话记录汇总到计费系统中进行长时间保存,以方便后续的 统计分析。

2.准备工作:

(1)确保Kafka服务已经启动,可在Linux终端窗体中使用jps命令查看具体的进程

spark@vm01:/usr/local/kafka$ bin/zookeeper-server-start.sh config/zookeeper.properties &

[1] 2770

spark@vm01:/usr/local/kafka$ bin/kafka-server-start.sh config/server.properties &

[2] 3128

spark@vm01:/usr/local/kafka$ jps

2770 QuorumPeerMain

3128 Kafka

2104 Main

3529 Jps

(2)创建从130到139的十个主题,为简单起见,通过kafka附带的脚本命令来完成

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic 130

Created topic 130.

查看:

spark@vm01:/usr/local/kafka$ bin/kafka-topics.sh --list --zookeeper localhost:2181

130

(3)启动HDFS服务,使用jps查看是否有相关进程在运行

spark@vm01:/usr/local/kafka$ start-dfs.sh

Starting namenodes on [localhost]

localhost: starting namenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-namenode-vm01.out

localhost: starting datanode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-datanode-vm01.out

Starting secondary namenodes [0.0.0.0]

0.0.0.0: starting secondarynamenode, logging to /usr/local/hadoop-2.6.5/logs/hadoop-spark-secondarynamenode-vm01.out

spark@vm01:/usr/local/kafka$ ^C

spark@vm01:/usr/local/kafka$ jps

4081 SecondaryNameNode

2770 QuorumPeerMain

4181 Jps

3128 Kafka

2104 Main

3710 NameNode

3887 DataNode

(4)在HDFS根目录中创建datas目录及日期的子目录,根据自己当前运行的程序时间进行创建即可

spark@vm01:/usr/local/kafka$ cd

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202505

spark@vm01:~$ hdfs dfs -mkdir -p /datas/202506

(5)在python3.6环境中安装一个kafka-python库,以便程序能够正常访问kafka,后面需要填写一个专门的python程序来模拟基站交换机随机产生通话记录

spark@vm01:~$ sudo pip install kafka-python

[sudo] spark 的密码:

(6)启动PyCharm集成开发环境,在其中创建一个名为SparkKafkaBilling的项目,对应的Python解释器使用python3.6即可

点击file,newproject

文件名(位置):/home/spark/PycharmProjects/SparkKafkaBilling

编译:python3.6

点击创建

3.通话记录生产者模拟:

(1)在新建的项目SparkKafkaBilling中创建CallMsgProducer.py文件,然后输入代码,负责按照要求的记录格式模拟产生通话消息,并将其发送到Kafka的topic中。

from kafka import KafkaProducer

import random, datetime, time

# 产生一个以13开头的手机号字符串,共11位

def gen_phone_num():

    phone = '13'

    for x in range(9):

        phone = phone + str(random.randint(0, 9))

    return phone

(2)为了持续不断地生成新的通话记录信息,可以使用一个循环创建符合格式要求的通话记录信息字符串,且每产生一条消息后休眠随机的时长,然后继续生成下一条通话记录

# Kafka的消息生产者对象准备

producer = KafkaProducer(bootstrap_servers="localhost:9092")

working = True

tformat = '%Y-%m-%d %H:%M:%S'     #设置时间日志格式

while working:

    # 主叫号码,被叫号码,呼叫时间(模拟当前时间的前一天),接通时间,挂断时间

    src_phone = gen_phone_num()

    dst_phone = gen_phone_num()

    dail_time = datetime.datetime.now() + datetime.timedelta(days=-1)

    call_time = dail_time + datetime.timedelta(seconds=random.randint(0, 10))

    hangup_time = call_time + datetime.timedelta(seconds=random.randint(5, 600))

    # 将时间格式化为所需的字符串格式,类似2025-05-27 09:30:25

    s_dail_time = dail_time.strftime(tformat)

    s_call_time = call_time.strftime(tformat)

    s_hangup_time = hangup_time.strftime(tformat)

    # 生成通话记录消息字符串

    record = '%s,%s,%s,%s,%s' % (src_phone, dst_phone, s_dail_time, s_call_time, s_hangup_time)

    print('send : ', record)

    # 通话记录的主叫号码前三位为topic主题

    topic = src_phone[0:3]

    # 将通话记录字符串转换为字节数组

    msg = bytes(record, encoding='utf-8')

    # 调用send()将通话记录消息发送给Kafka

    producer.send(topic=topic, key=b"call",value=msg)

    # 休眠一个随机的时长,为一个0-1秒之间的随机小数

    time.sleep( random.random() )

producer.close()

4.消息接收者测试:

(1)在SparkKafkaBilling项目中创建CallMsgBilling.py文件,将Kafka中130~139这10个topic(主题)的消息接收并在屏幕上打印显示出来

如果报错 先执行(2),再重新运行

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

sc = SparkContext('local[2]','KafkaStreamBilling')

sc.setLogLevel("OFF")

ssc = StreamingContext(sc, 5)

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"} )

streamRdd.pprint()

ssc.start()

ssc.awaitTermination()

(2)打开一个Linux终端窗体,在其中输入下面的命令,将消息接收者程序提交到Spark中运行,其中用到的spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar依赖库文件此前已下载放在~/streaming目录中,为避免每次提交应用程序时在命令行手动指定,可以将其复制到集群的各节点Spark安装目录中(位于/usr/local/spark/jars目录)

spark@vm01:~$ ls streaming

 FileStreamDemo.py

'IDLE (Python 3.7 64-bit).lnk'

 KafkaStreamDemo.py

 NetworkWordCountall.py

 NetworkWordCount.py

 NetworkWordCount.py.txt

 NetworkWordCountSave.py

 NetworkWordCountWindow.py

 spark-streaming-kafka-0-8-assembly_2.11-2.4.8.jar

spark@vm01:~$ cp streaming/*.jar /usr/local/spark/jars

spark@vm01:~$ cd PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit CallMsgBilling.py

(3)回到PyCharm集成开发环境中,运行CallMsgProducer.py程序,在底部会源源不断地显示模拟产生的通话记录消息

(4)再切换到运行消息接收者程序的Linux终端窗体,发现其不断地接收发送过来的消息

从输出结果可以清楚地看到,接收的Kafka消息是一系列(K,V)键值对形式的二元组,其中的K代表

CallMsgProducer.py程序中设定的"call"字符串,V代表消息内容。键(K)可以设置成任意字符串,当然

也可以不设置,实际使用的是二元组里面的值(V),即消息内容

5.Spark Streaming通话记录消息处理:将生成的通话记录消息进行简单的处理并保存在HDFS中

(1)在项目的main.py文件中将原有代码删除,并添加下面的代码

from pyspark.streaming.kafka import KafkaUtils

from pyspark import SparkContext

from pyspark.streaming import StreamingContext

from pyspark.sql import SparkSession

from datetime import datetime

# 初始化sc、ssc、spark等几个核心变量

spark = SparkSession.builder \

    .master('local[2]') \

    .appName('KafkaStreamingBilling') \

    .getOrCreate()

sc = spark.sparkContext

sc.setLogLevel("OFF")  #关闭日志

ssc = StreamingContext(sc, 5)

spark = SparkSession(sc)

(2)定义process()和saveYmCallData()函数

# 定义一个处理消息的函数,返回一个通话记录的元组

# (主叫号码,呼叫时间,接通时间,挂断时间,通话时长,被叫号码,年月)

def process(x):

    v = x[1].split(',')

    tformat = '%Y-%m-%d %H:%M:%S'

    d1 = datetime.strptime(v[3], tformat)

    d2 = datetime.strptime(v[4], tformat)

    ym = '%d%02d' % (d1.year, d1.month)

    sec = (d2-d1).seconds

    minutes = sec//60 if sec%60==0 else sec//60+1

    return (v[0],v[2],v[3],v[4],str(minutes),v[1],ym)

# 根据参数row中的年月信息,获取相应的通话消息记录,并保存到HDFS

def saveYmCallData(row):

    year_month = row.ym

    path = "hdfs://localhost:9000/datas/" + year_month + "/"

    ymdf = spark.sql("select * from phonecall where ym='" + year_month +"'")

    ymdf.drop('ym').write.save(path, format="csv", mode="append")

(3)再定义一个save()函数,以实现DStream的通话记录消息保存

# 保存DStream的消息记录

def save(rdd):

    if not rdd.isEmpty():

        rdd2 = rdd.map(lambda x: process(x))

        print(rdd2.count())

        df0 = rdd2.toDF(['src_phone', 'dail_time', 'call_time', 'hangup_time',

                         'call_minutes', 'dst_phone', 'ym'])

        df0.createOrReplaceTempView("phonecall")

        df1 = spark.sql('select distinct ym from phonecall')

        if df1.count() == 1:

            print('ooooooooooo')

            year_month = df1.first().ym

            path = "hdfs://localhost:9000/datas/" + year_month + "/"

            df0.drop("ym").write.save(path, format="csv", mode="append")

        else:

            df1.foreach(saveYmCallData)

(4)通过Kafka数据源创建一个DSteam对象,并开始Spark Streaming应用程序的循环

# 从Kafka的多个topic中接收消息

streamRdd = KafkaUtils.createDirectStream(ssc,

               topics = ['130','131','132','133','134',

                         '135','136','137','138','139'],

               kafkaParams = {"metadata.broker.list":"localhost:9092"})

streamRdd.pprint()

streamRdd.foreachRDD(save)

ssc.start()

ssc.awaitTermination()

(5)功能代码编写完毕,现在可以切换到Linux终端窗体,启动main.py程序

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ spark-submit main.py

(6)再打开一个新的Linux终端窗体,启动消息生产者程序CallMsgProducer.py

cd ~/PycharmProjects/SparkKafkaBilling

spark@vm01:~/PycharmProjects/SparkKafkaBilling$ python CallMsgProducer.py

然后可以查看main.py程序所在终端窗体显示的通话记录消息

(7)最后,在HDFS上可以验证收到的通话记录消息是否被成功保存,注意应将下面目录路径中的年月改为实际的时间,这是因为数据是按照当前机器时间在运行的

spark@vm01:~$ hdfs dfs -cat /datas/202505/part-*

至此,我们就完成了整个通话记录处理功能的实现

http://www.xdnf.cn/news/670699.html

相关文章:

  • 【md2html python 将 Markdown 文本转换为 HTML】
  • HTML Day02
  • pythonday30
  • Spark SQL进阶:解锁大数据处理的新姿势
  • AG32 DMAC实现内部MCU与FPGA通信【知识库】
  • 运维自动化工具 ansible 知识点总结
  • 域控账号密码抓取
  • C++数据结构 : 哈希表的实现
  • 2025上半年软考高级系统架构设计师经验分享
  • 第十一节:第一部分:正则表达式:应用案例、爬取信息、搜索替换
  • 牙科低对比度模体,衡量牙科影像设备的性能和诊断能力的工具
  • 8种使用克劳德4的方法,目前可用随时更新!
  • 人工智能与机器学习从理论、技术与实践的多维对比
  • 打造AI智能旅行规划器:基于LLM和Crew AI的Agent实践
  • Flash Attention:让Transformer飞起来的硬件优化技术
  • 宝塔安装easyswoole框架
  • Cherry Studio连接配置MCP服务器
  • wsl图形界面显示
  • 探讨Facebook的元宇宙愿景下的虚拟现实技术
  • 【2025最新】Cline自定义API配置完全指南:接入Claude 3.7/GPT-4o
  • 用C#完成最小二乘法拟合平面方程,再计算点到面的距离
  • OpenGL Chan视频学习-8 How I Deal with Shaders in OpenGL
  • 深入理解设计模式之状态模式
  • kubernetes网络详解(内部网络、Pod IP分配、CNI)
  • 操作系统期中考试
  • 如何彻底禁用WordPress中的评论
  • 三、web安全-信息收集
  • 网络:华为S5720-52X-SI交换机重置console密码
  • 从0开始学习R语言--Day11--主成分分析
  • opencv(C++) 变换图像与形态学操作