当前位置: 首页 > web >正文

202534 | KafKa简介+应用场景+集群搭建+快速入门

Apache Kafka 简介


一、什么是 Kafka?

Apache Kafka 是一个高吞吐量、分布式、可扩展的流处理平台,用于构建实时数据管道和流应用程序。它最初由 LinkedIn 开发,并于 2011 年开源,目前由 Apache 软件基金会进行维护。

Kafka 具备以下核心特性:

  • 发布-订阅消息系统:支持生产者向主题(Topic)发送消息,消费者从主题中读取消息。
  • 高吞吐量与低延迟:可处理百万级每秒消息,延迟低于几毫秒。
  • 持久化存储:消息以日志形式存储在磁盘上,可设定保留时间。
  • 可水平扩展:通过分区(Partition)机制轻松扩展读写能力。
  • 高容错性:副本机制保障在节点故障时依旧能够正常运行。

Kafka 不仅是一个消息队列,更是一个用于流数据处理的统一平台


二、Kafka 的应用场景

Kafka 在大数据和分布式系统领域具有广泛应用,主要包括:

1. 日志收集与传输

Kafka 可统一收集来自不同服务或服务器的日志,作为日志系统的核心组件,将数据传输至后端处理系统(如 Hadoop、Elasticsearch 等)。

2. 实时数据分析

结合 Apache Flink、Spark Streaming 等流处理框架,Kafka 可用于构建实时分析平台,实现实时用户行为分析、实时监控等。

3. 事件驱动架构(EDA)

Kafka 作为微服务架构中的事件总线,使服务之间通过事件解耦,从而提高系统灵活性与可维护性。

4. 数据管道(Data Pipeline)

Kafka 能将数据从数据库、日志系统等源系统传输到数据仓库或数据湖,是构建高效可靠数据管道的核心工具。

5. 替代传统消息队列

在对吞吐量、可扩展性有更高要求的系统中,Kafka 可替代传统消息中间件(如 RabbitMQ、ActiveMQ)作为消息传递通道。


三、Kafka 的诞生背景

Kafka 的诞生源于 LinkedIn 内部对于日志处理和数据传输系统的性能瓶颈

  • LinkedIn 的业务快速增长,系统需要处理海量的用户行为数据与日志。
  • 传统的消息队列系统难以满足高吞吐量与高可用性的要求。
  • 工程团队设计了一种新的架构,将“分布式日志”作为核心思想,构建出一个同时支持日志收集、传输与处理的统一平台。

Kafka 在设计上融合了以下理念:

  • 以持久化日志为核心数据结构:每条消息即为一条日志记录,可重复读取。
  • 分布式架构支持高可用性与高扩展性:通过集群部署和分区副本机制实现。
  • 支持批处理和流处理双模式:既可用于数据采集与离线分析,也适合实时流处理。

这一创新架构为 Kafka 后来的广泛应用打下了坚实基础,也推动了现代数据架构的演进。

好的!以下是 Kafka Java 快速入门指南,适合初学者快速了解如何在 Java 程序中使用 Kafka 实现消息的生产与消费。


明白了!以下是使用 Mermaid 格式图形 重新整理的 Kafka 集群搭建指南,清晰展示了 Kafka + ZooKeeper 的集群结构和搭建步骤。


Kafka 集群搭建指南(ZooKeeper 模式)


一、Kafka 集群架构图(Mermaid 格式)

Kafka集群
ZooKeeper集群
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092
ZooKeeper 节点1
192.168.1.101:2181
ZooKeeper 节点2
192.168.1.102:2181
ZooKeeper 节点3
192.168.1.103:2181
Kafka 客户端

Topic 分区布局
Leader
Replica
Replica
Leader
Replica
Replica
Leader
Replica
Replica
Partition 0
Leader: Kafka-1
Replica: Kafka-2, Kafka-3
Partition 1
Leader: Kafka-2
Replica: Kafka-1, Kafka-3
Partition 2
Leader: Kafka-3
Replica: Kafka-1, Kafka-2
Kafka Broker 1
192.168.1.101:9092
Kafka Broker 2
192.168.1.102:9092
Kafka Broker 3
192.168.1.103:9092

二、准备工作

1. 系统要求

  • Linux/CentOS/Ubuntu(或容器)
  • Java 8+(推荐 Java 11)
  • 至少 3 台机器或虚拟节点

2. 下载 Kafka 安装包(每台机器)

wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0

三、配置 ZooKeeper 集群

1. 修改 config/zookeeper.properties

dataDir=/tmp/zookeeper
clientPort=2181
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

2. 设置每个节点的 myid

echo 1 > /tmp/zookeeper/myid  # 节点1
echo 2 > /tmp/zookeeper/myid  # 节点2
echo 3 > /tmp/zookeeper/myid  # 节点3

3. 启动 ZooKeeper(每台执行)

bin/zookeeper-server-start.sh config/zookeeper.properties

四、配置 Kafka Broker

每台机器修改 config/server.properties,示例:

broker.id=1                           # 每个节点唯一(如 1、2、3)
listeners=PLAINTEXT://192.168.1.101:9092
log.dirs=/tmp/kafka-logs-1
zookeeper.connect=192.168.1.101:2181,192.168.1.102:2181,192.168.1.103:2181

启动 Kafka:

bin/kafka-server-start.sh config/server.properties

五、验证集群

1. 创建主题

bin/kafka-topics.sh --create \
--bootstrap-server 192.168.1.101:9092 \
--replication-factor 3 --partitions 3 \
--topic test-topic

2. 查看主题分布

bin/kafka-topics.sh --describe --topic test-topic --bootstrap-server 192.168.1.101:9092

六、发送与消费消息

生产者:

bin/kafka-console-producer.sh --topic test-topic --bootstrap-server 192.168.1.101:9092

消费者:

bin/kafka-console-consumer.sh --topic test-topic --bootstrap-server 192.168.1.102:9092 --from-beginning

七、常见问题

问题原因
Kafka 启动失败broker.id 重复或端口冲突
消息无法消费ZooKeeper 未正常连接,主题未正确创建
节点日志混乱或冲突log.dirs 配置重复,broker.id 没有区分
ZooKeeper 单点故障节点不足,推荐部署奇数个节点(3/5)

Kafka Java 快速入门指南


一、准备工作

1. 添加 Maven 依赖

在你的项目的 pom.xml 中添加以下依赖:

<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version></dependency>
</dependencies>

二、Kafka Producer 示例(生产者)

1. 编写 KafkaProducerDemo.java

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;public class KafkaProducerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092"); // Kafka 地址props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 创建生产者KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 发送消息for (int i = 0; i < 5; i++) {String message = "Hello Kafka " + i;producer.send(new ProducerRecord<>("test-topic", message));System.out.println("Sent: " + message);}producer.close();}
}

三、Kafka Consumer 示例(消费者)

1. 编写 KafkaConsumerDemo.java

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class KafkaConsumerDemo {public static void main(String[] args) {// 配置Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("group.id", "test-group"); // 消费组props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("auto.offset.reset", "earliest"); // 从头开始消费// 创建消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 订阅主题consumer.subscribe(Collections.singletonList("test-topic"));// 消费消息while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.printf("Received message: key=%s, value=%s, offset=%d%n",record.key(), record.value(), record.offset());}}}
}

四、运行顺序建议

  1. 启动 Kafka 服务(本地或远程)
  2. 先运行消费者 KafkaConsumerDemo(等待监听)
  3. 再运行生产者 KafkaProducerDemo(发送消息)

五、调试小技巧

  • 确保 Kafka 服务正常运行,端口默认为 9092
  • 主题 test-topic 必须提前创建,或在 Kafka 开启 auto.create.topics.enable=true 的情况下自动创建。
  • 消费者默认是“只消费一次”,再次运行需更改 group.id 或开启重复读取逻辑。
http://www.xdnf.cn/news/5177.html

相关文章:

  • 大模型微调终极方案:LoRA、QLoRA原理详解与LLaMA-Factory、Xtuner实战对比
  • 绑定 SSH key(macos)
  • uniapp-商城-49-后台 分类数据的生成(方法加精)
  • 【计算机视觉】OpenCV实战项目:FunnyMirrors:基于OpenCV的实时哈哈镜效果实现技术解析
  • Checkmk实战指南:从零构建企业级监控系统
  • 字节:增强LLM角色区分能力
  • 第十八章,入侵检测/防御系统(IDS/IPS)
  • mysql-8.0.30-winx64 Install/Remove of the Service Denied!
  • 互联网大厂Java求职面试实战:Spring Boot微服务与数据库优化详解
  • Java云原生到底是啥,有哪些技术
  • DA14585墨水屏学习
  • 电子电器架构 --- 新能源高压上下电那点事一文通
  • 浅谈装饰模式
  • 旅游推荐数据分析可视化系统算法
  • 数据结构中的栈与队列:原理、实现与应用
  • C++学习-入门到精通-【6】指针
  • 【AI智能推荐系统】第七篇:跨领域推荐系统的技术突破与应用场景
  • [RoarCTF 2019]Easy Calc1
  • 【许可证】Open Source Licenses
  • 异地多活单元化架构下的微服务体系
  • 某某文KU下载工具,请低调再低调使用!
  • Hadoop 2.x设计理念解析
  • 【大模型】使用 LLaMA-Factory 进行大模型微调:从入门到精通
  • AI 驱动数据库交互技术路线详解:角色、提示词工程与输入输出分析
  • Linux——Mysql索引和事务
  • 【验证码】⭐️集成图形验证码实现安全校验
  • Linux进程管理
  • journalctl使用
  • 网络地址转换之SNAT和DNAT
  • 《自动驾驶封闭测试场地建设技术要求》 GB/T 43119-2023——解读