当前位置: 首页 > java >正文

Kafka和flume整合

需求1:利用flume监控某目录中新生成的文件,将监控到的变更数据发送给kafka,kafka将收到的数据打印到控制台:

在flume/conf下添加.conf文件,

vi flume-kafka.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 配置 Source(监控目录)

a1.sources.r1.type=spooldir

a1.sources.r1.spoolDir=/root/flume-kafka/

a1.sources.r1.inputCharset=utf-8

# 配置 Sink(写入 Kafka)

a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink

#指定写入数据到哪一个topic

a1.sinks.k1.kafka.topic=testTopic

#指定写入数据到哪一个集群

a1.sinks.k1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定写入批次

a1.sinks.k1.kafka.flumeBatchSize=20

#指定acks机制

a1.sinks.k1.kafka.producer.acks=1

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

# 最大存储 1000 个 Event

a1.channels.c1.transactionCapacity=100

# 每次事务处理 100 个 Event

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

 

在指定目录之下创建文件夹:

kafka中创建topic:

kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic testTopic --partitions 3 --replication-factor 3

启动flume:

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/flume-kafka.conf -n a1 -Dflume.root.logger=INFO,console

启动kafka消费者,验证数据写入成功

kafka-console-consumer.sh --topic testTopic --bootstrap-server node01:9092,node02:9029,node03:9092 --from-beginning

新增测试数据:

echo "hello flume,hello kafka" >> /root/flume-kafka/1.txt

flume:

Kafka消费者

 

需求2:Kafka生产者生成的数据利用Flume进行采集,将采集到的数据打印到Flume的控制台上。

vi kafka-flume.conf

# 定义 Agent 组件

a1.sources=r1

a1.sinks=k1

a1.channels=c1

# 将 Flume Source 设置为 Kafka 消费者,从指定 Kafka 主题拉取数据。

a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource

#指定zookeeper集群地址

a1.sources.r1.zookeepers=node01:2181,node02:2181,node03:2181

#指定kafka集群地址

a1.sources.r1.kafka.bootstrap.servers=node01:9092,node02:9092,node03:9092

#指定生成消息的topic

a1.sources.r1.kafka.topics=testTopic

# 将 Flume 传输的数据内容直接打印到日志中,

a1.sinks.k1.type=logger

# 配置 Channel(内存缓冲)

a1.channels.c1.type=memory

a1.channels.c1.capacity=1000

a1.channels.c1.transcationCapacity=100

 

a1.sources.r1.channels=c1

a1.sinks.k1.channel=c1

启动Kafka生产者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic testTopic

启动Flume

flume-ng agent -c /opt/software/flume/conf/ -f /opt/software/flume/conf/kafka-flume.conf -n a1 -Dflume.root.logger=INFO,console

在生产者中写入数据

Flume中采集到数据 

 

 

 

 

http://www.xdnf.cn/news/1977.html

相关文章:

  • cJSON中#define cJSON_IsReference 256 和 #define cJSON_StringIsConst 512这定义的大小是?
  • CSS常见布局
  • 逐行解析性能奥秘:借助 `line_profiler` 深入优化热点函数
  • MySQL 从入门到精通:第二篇 - 数据类型、约束与索引
  • 【华为HCIP | 华为数通工程师】821—多选解析—第十六页
  • 那些年踩过的坑之Arrays.asList
  • CC攻击的类型都有哪些?
  • eclipse怎么导入junit4
  • 解读《数据资产质量评估实施规则》:企业数据资产认证落地的关键指南
  • MCP(Model Context Protocol)
  • AlarmClock4.8.4(官方版)桌面时钟工具软件下载安装教程
  • Zephyr kernel Build System (CMake)介绍
  • MySQL引擎分类与选择、SQL更新底层实现、分库分表、读写分离、主从复制 - 面试实战
  • 数字浪潮下的算力担当:GPU 服务器的多元应用、核心价值
  • 技术探索之路:从自我认知到成长规划
  • 实现层归一化
  • 数据结构------C语言经典题目(7)
  • 【T-MRMSM】文本引导多层次交互多尺度空间记忆融合多模态情感分析
  • 基于cesium实现鼠标移动动态绘制矩形和圆
  • Rust 学习笔记:函数和控制流
  • React 中什么时候用事件总线
  • 微信小程序直传阿里云 OSS 实践指南(V4 签名 · 秒传支持 · 高性能封装)
  • ROS1、ROS2如何把预编译好的二进制文件封装成功能包?
  • 【Django】新增字段后兼容旧接口 This field is required
  • 代码随想录:数组
  • 如何实现Android屏幕和音频采集并启动RTSP服务?
  • 如何使用@KafkaListener实现从nacos中动态获取监听的topic
  • 【Hive入门】Hive数据导出完全指南:从HDFS到本地文件系统的专业实践
  • 利用JMeter代理服务器方式实现高效压测
  • Leetcode 2845 题解