当前位置: 首页 > news >正文

【hadoop】实时计算词频案例

主要框架:Flume+Kafka+Spark

详细:

  1. VM Ware虚拟机、CentOS7、jdk-8u311、MySQL
  2. Hadoop-2.9.2、Flume-1.9.0、Zookeeper-3.4.6、Kafka_2.11-2.8.1、Spark-2.4.8

     3、IDEA、Scala-2.11.X

 一-技术选型与实现流程

本项目的流程,如下图所示,首先使用 Flume 采集本地文件中的单词内容,并把更新的数据实时推送到 Kafka 消息队列中间件中,然后使用 Spark 从 Kafka 中提取消息进行实时数据计算,并把计算结果存储到 MySQL 数据库中,后续使用 Davinci 等架构访问 MySQL 数据库获取实时计算的结果,并把结果动态地、可视化地呈现在网页上。

 

1.Flume的配置

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#
# # Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/hadoop/log.txt
a1.sources.r1.channels = c1
#
# # Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
a1.sinks.k1.brokerList = master:9092,slave1:9092,slave2:9092
a1.sinks.k1.topic = word_topic
a1.sinks.k1.producer.acks = 1
#
# # Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100
a1.channels.c1.transactionCapacity = 100

2. MySQL的前期准备

CREATE TABLE word_count (word VARCHAR(255) PRIMARY KEY,count INT
);

3. Spark Streaming

连接kafka,写入mysql,

scala代码,ai提示词:

写一个scala代码,功能是从 Kafka 实时消费数据、统计词频并同步到 MySQL。

首先通过 Log4j 配置屏蔽 Spark 和 Kafka 的 INFO 日志,确保控制台仅显示关键信息;接着创建 Spark Streaming 上下文,配置 Kafka 连接参数,从word_topic主题消费数据;对消息内容进行清洗(分割、过滤、小写处理)后,使用滑动窗口(30 秒窗口,5 秒滑动间隔)实时统计词频;然后将统计结果按分区批量写入 MySQL,每个分区复用一个数据库连接,通过事务管理和ON DUPLICATE KEY UPDATE语句确保数据一致性和高效更新;过程中通过println输出关键更新信息,便于监控

 4. Davinci 可视化

可设置自动刷新,写入文本文件的变化可对应图表的变化。

对比效果:

http://www.xdnf.cn/news/997615.html

相关文章:

  • 商业智能中的地图可视化模板:助力数据高效呈现
  • 55、错误处理-【源码流程】几种异常处理原理
  • 网络安全之RCE简单分析
  • 基于OpenCV实现视频运动目标检测与跟踪
  • hot100滑动窗口无重复字符串
  • 超简单部署离线语音合成TTS和语音识别
  • wpf 解决DataGridTemplateColumn中width绑定失效问题
  • 基于Django的购物系统
  • DevEco Studio 报错 “too many restarts of gpu-process (jcef)“
  • pyspark 初试
  • Spring 路由匹配机制详解:时间复杂度从 O(n) 降至 O(log n)
  • 【Zephyr 系列 20】BLE 模块产线测试系统设计:快速校验、参数写入、自动识别的完整方案
  • Package vs. Directory (包 vs. 目录)
  • HarmonyOS运动开发:打造便捷的静态快捷菜单
  • 以前在服务器启动了docker,现在不需要了,为了安全,去掉docker服务@Ubuntu
  • Linux 基本命令
  • GO后端开发内存管理及参考答案
  • 没有宝塔面板的服务器上的WordPress网站打包下载到本地?
  • 动态多目标进化算法:MOEA/D-SVR求解CEC2018(DF1-DF14),提供完整MATLAB代码
  • 数字图像处理与OpenCV初探
  • 机器学习 [白板推导](五)[支持向量机]
  • uni-app隐藏返回按钮
  • VAS5081电动工具专用3-8节串联电池监控芯片奇力科技
  • 深入理解常用依存关系标签
  • 常见的几种排序算法
  • ​​MPI + OpenMP 环境配置指南(Windows/Linux)​
  • 【C++】继承和派生
  • 【靶场】upload-labs-文件上传漏洞闯关
  • Java面试题020:一文深入了解微服务之负载均衡Feign
  • docker-Dockerfile 配置