当前位置: 首页 > news >正文

从Kafka读取数据

用Spark-Streaming从Kafka读取数据

  • 在大数据处理领域,Spark-Streaming和Kafka都是明星技术。今天咱们就来聊聊怎么用Spark-Streaming从Kafka读取数据并做处理,就算你是小白,也保证能看懂!
  • 先讲讲从Kafka获取数据的两种方式。早期有个ReceiverAPI,它需要专门的Executor接收数据,再发给其他Executor计算。想象一下,接收数据的人速度特别快,计算的人跟不上,数据就堆在计算的节点上,最后内存都被占满,这就是 ReceiverAPI 的问题,所以现在它不太常用了。
  • 后来出现了DirectAPI,它让计算的Executor自己主动去Kafka拿数据,速度自己能掌控,就像你自己去超市拿东西,想要多少、什么时候拿都自己决定,是不是方便多了?现在主流用的就是DirectAPI这种方式。
  • 下面进入实操环节。假设我们要通过SparkStreaming从Kafka读取数据,简单计算后打印到控制台。
  • 第一步,得在项目里导入依赖。就像搭积木,得先把要用的积木都准备好。在项目的配置文件里加上这段代码:
  • <dependency>
  •     <groupId>org.apache.spark</groupId>
  •     <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
  •     <version>3.0.0</version>
  • </dependency>
  • 这就把和Kafka交互的“工具”准备好了
  • 第二步,开始写代码。在代码里,先创建一个Spark配置,告诉程序要在哪里运行,叫什么名字,就像给它安排好“工作地点”和“名字牌”。然后创建一个StreamingContext,设置数据处理的时间间隔。接着定义Kafka的参数,比如Kafka集群的地址、消费者组ID,还有数据的反序列化方式,这就像是告诉程序去哪里找Kafka,属于哪个“消费小组”,以及怎么把数据“翻译”成能看懂的格式。
  • 准备好这些,就能通过KafkaUtils创建DStream来读取Kafka的数据啦。读取后,提取出数据里我们需要的部分,这里是value。再用熟悉的wordCount计算逻辑,把数据按空格切开、统计词频,最后打印出来。代码都写好后,启动程序,让它开始工作。
  • 接下来,还得启动Kafka集群,就像把工厂的机器都打开。再开启Kafka生产者,让它产生数据,这些数据就是我们要处理的“原材料”。一切准备就绪,运行程序,就能看到Spark-Streaming成功接收Kafka的数据并处理啦!
  • 最后,如果想看看消费进度,用kafka-consumer-groups.sh这个命令就行,它能清楚地告诉你数据处理到什么程度了。
http://www.xdnf.cn/news/111097.html

相关文章:

  • Cephalon端脑云:神经形态计算+边缘AI·重定义云端算力
  • Trae或者VsCode无法识别相对路径(不自动切换工作目录)
  • 高光谱相机在生物医学中的应用:病理分析、智慧中医与成分分析
  • React在什么情况下需要用useReducer
  • 前缀和-724.寻找数组的中心下标-力扣(LeetCode)
  • java—14 ZooKeeper
  • 【C++游戏引擎开发】第23篇:基础阴影映射(Shadow Mapping)
  • 2025/4/24
  • LeetCode 2799.统计完全子数组的数目:滑动窗口(哈希表)
  • 机器学习(9)——随机森林
  • 缓存与数据库数据一致性:旁路缓存、读写穿透和异步写入模式解析
  • “Daz to Unreal”将 G8 角色(包括表情)从 daz3d 导入到 UE5。在 UE5 中,我发现使用某个表情并与闭眼混合后,上眼睑出现了问题
  • 加密认证库openssl初始附带c/c++的使用源码
  • Nginx 中间件
  • 焊接机排错
  • 【C++指南】位运算知识详解
  • 直播预告 |【仓颉社区】第32期WORKSHOP
  • 蓝牙低功耗设备的漏洞与攻击——最新信息回顾
  • 图论算法体系:并查集、生成树、排序与路径搜索全解析
  • STM32F103系列单片机寄存器操作和标准库操作
  • CIFAR10图像分类学习笔记(三)---数据加载load_cifar10
  • 前端 Excel 工具组件实战:导入 → 可编辑表格 → 导出 + 样式同步 + 单元格合并
  • 《Llama.cpp:开启本地大模型部署新时代》
  • 动态规划问题 -- 斐波那契数列模型(使用最小花费爬楼梯)
  • 怎么实现RAG检索相似文档排序:similarities
  • 【hadoop】HBase shell 操作
  • 在 Ubuntu 环境为 Elasticsearch 引入 `icu_tokenizer
  • docker部署Jenkins工具
  • 宝塔里redis停止了自动启用脚本
  • Docker 部署 Redis:快速搭建高效缓存服务