当前位置: 首页 > ds >正文

kafka整合flume与DStream转换

一、Kafka整合flume

cd /opt/software/flume/conf/

vi flume-kafka.conf

a1.sources=r1

a1.sinks=k1

a1.channels=c1

a1.sources.r1.type=spooldirt

a1.sources.r1.spoolDir=/root/flume-kafka

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic=testTopice

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

a1.sinks.k1.kafka.flumeBatchSize=20

a1.sinks.k1.kafka.producer.acks=1

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transactionCapacity=100

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

cd /root/

mkdir flume-kafka

ll

drwxr-xr-x   2 root root 4096 11月  8 22:02 agent3

-rw-------.  1 root root  955 9月   6 16:41 anaconda-ks.cfg

-rw-r--r--   1 root root    1 10月 25 18:30 exec-logger.conf

drwxr-xr-x   2 root root   27 11月 15 18:00 flume-hive

drwxr-xr-x   2 root root    6 12月  3 03:59 flume-kafka

-rw-r--r--   1 root root   63 11月  8 23:01 flume-position.json

drwxr-xr-x  22 root root 4096 12月  3 03:59 kafkadata

drwxr-xr-x   3 root root   21 10月 11 18:32 opt

drwxr-xr-x   3 root root 4096 11月  8 18:17 testDir

drwxr-xr-x   2 root root   38 11月  8 19:01 testdir2

-rw-r--r--   1 root root  108 11月 15 17:09 test.log

drwxr-xr-x   2 root root 4096 11月  8 18:49 testSink

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 2

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

cd /root/flume-kafka/

echo "hello" >>test3.txt

echo "hello flume" >>test2.txt

cd /opt/software/flume/conf/

vi kafka-flume.conf

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node:9092,node03:9092 --from-beginning

Hello

 hello kafka

hello flume

flume-ng agent -c conf/ -f conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

二、kafka架构深入

分区策略:轮询(RoundRobin)、按 Key 哈希(Hash)、自定义分区。

数据可靠性:通过 ACK 机制(0、1、-1)和 ISR(同步副本集合)保证,acks=-1时需等待 Leader 和 Follower 全部落盘。

事务与幂等性:0.11 版本引入幂等性(enable.idompotence=true),结合 At Least Once 实现 Exactly Once 语义。

三、Spark-Streaming核心编程

1.DStream 转换

DStream 是 Spark-Streaming 处理实时数据的基本单位,可以理解为 “实时数据流”。

转换操作就是对这个数据流进行加工处理,比如过滤、拆分、统计等,就像工厂流水线对原材料进行加工一样。

操作分为两类:

无状态转换:只处理当前批次的数据,不关心历史数据(比如统计当前 3 秒内的单词数)。

有状态转换:会记住历史数据(比如统计从程序启动到现在的总单词数),文档里没详细讲,重点在无状态部分。

2.无状态转换的常见操作

无状态转换就像 “即处理即丢弃”,每次只处理当前批次的数据,不保留之前的结果。

常见函数举例

3.Transform转换

Transform是一个 “万能转换” 函数,可以对每个批次的 RDD(DStream 内部由多个 RDD 组成)执行任意自定义操作,甚至可以使用 Spark 原生的 RDD 函数(即使 DStream 没有直接提供)

4.Join转换

join用于合并两个数据流中相同键的数据,就像拼拼图一样,只有键匹配的部分才能拼在一起。

适用于合并两个来源的单词数据

最后运行结果

http://www.xdnf.cn/news/2053.html

相关文章:

  • Linux软硬链接和动静态库(20)
  • mac brew 无法找到php7.2 如何安装php7.2
  • 【机器学习速记】面试重点/期末考试
  • 【音视频】⾳频处理基本概念及⾳频重采样
  • 企业级智能合同管理解决方案升级报告:道本科技携手DeepSeek打造智能合同管理新标杆
  • (六)机器学习---聚类与K-means
  • 基于AI应用创业IDEA:使用百度搜索开放平台的MCP广场智能推荐MCPServices服务
  • Java 安全:如何防止 DDoS 攻击?
  • 全栈国产化信创适配,构建安全可控的呼叫中心系统
  • uniapp-商城-37-shop 购物车 选好了 进行订单确认3 支付栏
  • 【vue】 实现浏览器自动播放音频的指南
  • MongoDB Shard Cluster
  • MySQL触法器
  • Cadence学习笔记之---原理图设计基本操作
  • 电子电子架构 --- 主机厂视角下ECU开发流程
  • 统计服务器CPU、内存、磁盘、网络IO、队列、数据库占用空间等等信息
  • 【XR手柄交互】Unity 中使用 InputActions 实现手柄控制详解(基于 OpenXR + Unity新输入系统(Input Actions))
  • MySQL表的操作 -- 表的增删改查
  • Linux 权限修改详解:chmod 命令与权限数字的秘密
  • 算法 | 基于SSA-CNN-LSTM(麻雀算法优化卷积长短期记忆神经网络)的股票价格预测(附完整matlab代码,公式,原理,可用于毕业论文设计)
  • 600W电源的EMC整改心得记录(PFC+LLC)
  • 【Chrony 时间同步双实验实操】从单节点校准到本地 NTP 服务器搭建详解
  • guvcview-源码记录
  • 项目质量管理
  • 风吸式杀虫灯环保优势
  • Coze高阶玩法 | 使用Coze制作思维认知提升视频,效率提升300%!(附保姆级教程)
  • Django之旅:第七节--模版继承
  • Git基本使用(很详细)
  • FWFT_FIFO和Standard_FIFO对比仿真
  • Shell脚本参数处理:位置变量/预定义变量