当前位置: 首页 > java >正文

【Kafka】Kafka从入门到实战:构建高吞吐量分布式消息系统

Kafka从入门到实战:构建高吞吐量分布式消息系统

一、Kafka概述

Apache Kafka是一个分布式流处理平台,最初由LinkedIn开发,后成为Apache顶级项目。它被设计用于高吞吐量、低延迟的消息处理,能够处理来自多个生产者的海量数据,并将这些数据实时传递给消费者。

Kafka核心特性

  1. 高吞吐量:即使是非常普通的硬件,Kafka也能支持每秒数百万条消息
  2. 可扩展性:集群可以无缝扩展,无需停机
  3. 持久性:消息持久化到磁盘,并支持数据备份防止数据丢失
  4. 分布式:天然支持分布式部署,具有容错能力
  5. 实时性:消息产生后立即对消费者可见

二、Kafka核心概念

1. 基本组件

  • Producer:消息生产者,向Kafka集群发送消息
  • Consumer:消息消费者,从Kafka集群读取消息
  • Broker:Kafka服务器节点,组成Kafka集群
  • Topic:消息类别,生产者发送消息到特定Topic,消费者订阅特定Topic
  • Partition:Topic物理上的分组,一个Topic可以分为多个Partition
  • Replica:Partition的副本,保证数据可靠性
  • Consumer Group:一组Consumer实例共同消费一个Topic

2. 消息存储机制

Kafka采用顺序写入磁盘的方式存储消息,这种设计使得Kafka即使使用普通磁盘也能实现很高的吞吐量。每个Partition是一个有序的、不可变的消息序列,新消息被追加到Partition末尾。

3. 消息传递语义

  • 至少一次(At least once):消息不会丢失,但可能被重复消费
  • 至多一次(At most once):消息可能丢失,但不会被重复消费
  • 精确一次(Exactly once):消息恰好被消费一次

三、Kafka环境搭建

1. 单机版安装

# 下载Kafka
wget https://downloads.apache.org/kafka/3.6.0/kafka_2.13-3.6.0.tgz
tar -xzf kafka_2.13-3.6.0.tgz
cd kafka_2.13-3.6.0# 启动Zookeeper(新版本Kafka已内置,无需单独安装)
bin/zookeeper-server-start.sh config/zookeeper.properties# 启动Kafka
bin/kafka-server-start.sh config/server.properties

2. 集群部署

修改config/server.properties文件:

# 每个broker必须有唯一的id
broker.id=1# 监听地址
listeners=PLAINTEXT://:9092# 日志目录
log.dirs=/tmp/kafka-logs# Zookeeper连接地址
zookeeper.connect=localhost:2181# 副本数量
default.replication.factor=3# 分区数量
num.partitions=3

在不同节点上启动多个Broker实例即组成集群。

四、Kafka基础操作

1. Topic管理

# 创建Topic
bin/kafka-topics.sh --create --topic test-topic \
--bootstrap-server localhost:9092 \
--partitions 3 --replication-factor 1# 查看Topic列表
bin/kafka-topics.sh --list --bootstrap-server localhost:9092# 查看Topic详情
bin/kafka-topics.sh --describe --topic test-topic \
--bootstrap-server localhost:9092# 删除Topic
bin/kafka-topics.sh --delete --topic test-topic \
--bootstrap-server localhost:9092

2. 生产消费消息

# 启动生产者
bin/kafka-console-producer.sh --topic test-topic \
--bootstrap-server localhost:9092# 启动消费者
bin/kafka-console-consumer.sh --topic test-topic \
--bootstrap-server localhost:9092 --from-beginning

五、Java客户端开发

1. 添加依赖

<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.6.0</version>
</dependency>

2. 生产者示例

import org.apache.kafka.clients.producer.*;import java.util.Properties;public class SimpleProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 可选配置props.put("acks", "all"); // 确保所有副本都收到消息props.put("retries", 3); // 发送失败重试次数props.put("linger.ms", 1); // 发送延迟Producer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++) {ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", "key-" + i, "value-" + i);producer.
http://www.xdnf.cn/news/12987.html

相关文章:

  • GeoDrive:基于三维几何信息有精确动作控制的驾驶世界模型
  • 快速使用 Flutter 的 Dialog 和 AlertDialog
  • Delivering Arbitrary-Modal Semantic Segmentation(CVPR2023)任意模态语义分割论文阅读
  • 基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
  • 如何把工业通信协议转换成http websocket
  • MongoDB 入门指南:安装、配置与 Navicat 连接教程
  • Vue3学习(接口,泛型,自定义类型,v-for,props)
  • 时间同步技术在电力系统中的应用
  • 水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关
  • 达梦数据库CASE WHEN条件
  • Spring Boot 启动流程详解
  • 打手机检测算法AI智能分析网关V4守护公共/工业/医疗等多场景安全应用
  • Android Native 之 lmkd进程分析
  • 嵌入式学习之系统编程(九)OSI模型、TCP/IP模型、UDP协议网络相关编程(6.3)
  • NLP-数据集
  • wifi改ip地址有什么用?wifi改ip地址怎么改
  • LeetCode - 3. 无重复字符的最长子串
  • 无源一阶低通电路噪声如何计算
  • 音乐“穿梭机”AudioRelay,让你的音频“无缝对接”
  • push [特殊字符] present
  • 深入解析 Qwen3 基础模型:架构设计与技术创新
  • 第2课 SiC MOSFET与 Si IGBT 静态特性对比
  • 从0开始学习R语言--Day20--Wilcoxon秩和检验
  • 组件库实战-基建思路
  • Docker拉取MySQL后数据库连接失败的解决方案
  • P3 QT项目----记事本(3.8)
  • Qt的学习(二)
  • 用神经网络读懂你的“心情”:揭秘情绪识别系统背后的AI魔法
  • HDMI 显示器热插拔对应显示应用启停测试
  • 高分辨率图像合成归一化流扩展