当前位置: 首页 > ops >正文

项目升级--kafka消息队列的应用

应用简介

我已经仔细通读了整个项目,并整理了项目中Kafka的应用。以下是关于该项目中Kafka集成的详细全面介绍:

  项目中Kafka的应用详解

  1. Kafka集成背景与目的

  根据项目文档UPGRADE_PLAN.md,该项目原本使用Redis的发布/订阅功能来实现集群间的消息传递。为了提升系统的可扩展性、可靠性和性能,项目计划并已实施了将消息队列从Redis替换为Kafka的升级。

  Kafka相比Redis的优势包括:
- 更高的吞吐量和更好的持久性
- 更好的水平扩展能力
- 强有序性保证
- 更好的容错能力

  2. Kafka组件架构

  项目中实现了完整的Kafka客户端组件,主要包括以下三个类:

  2.1 KafkaProducer(生产者)

  文件:include/server/kafka/kafka_producer.h 和 src/server/kafka/kafka_producer.cpp

  主要功能:
- 初始化Kafka生产者实例
- 发送消息到指定的Kafka主题
- 支持通过librdkafka库实现实际的消息发送

  关键方法:
- KafkaProducer(const std::string& brokers, const std::string& topic):构造函数,设置Kafka服务器地址和主题
- bool init():初始化生产者
- bool sendMessage(const std::string& message):发送消息
- bool sendMessage(const std::string& topic, const std::string& message):发送消息到指定主题

  2.2 KafkaConsumer(消费者)

  文件:include/server/kafka/kafka_consumer.h 和 src/server/kafka/kafka_consumer.cpp

  主要功能:
- 初始化Kafka消费者实例
- 订阅和取消订阅Kafka主题
- 设置消息回调函数
- 启动和停止消息消费

  关键方法:
- KafkaConsumer(const std::string& brokers, const std::string& topic):构造函数
- bool init():初始化消费者,设置消费者组ID和自动提交偏移量
- bool subscribe(const std::string& topic):订阅主题
- void setMessageCallback(std::function<void(const std::string& topic, const std::string& message)> callback):设置消息回调
- void startConsume():开始消费消息

  2.3 KafkaManager(管理器)

  文件:include/server/kafka/kafka_manager.h 和 src/server/kafka/kafka_manager.cpp

  主要功能:
- 作为单例模式管理所有Kafka生产者和消费者
- 提供统一的接口获取生产者和消费者实例
- 简化消息发送操作

  关键方法:
- static KafkaManager* instance():获取单例实例
- bool init(const std::string& brokers):初始化管理器
- KafkaProducer* getProducer(const std::string& topic):获取指定主题的生产者
- KafkaConsumer* getConsumer(const std::string& topic):获取指定主题的消费者
- bool sendMessage(const std::string& topic, const std::string& message):发送消息

  3. Kafka集成实现细节

  3.1 依赖管理

  在src/server/CMakeLists.txt中,项目通过find_library和find_path查找librdkafka库,并根据是否找到库来决定是否链接Kafka相关库。

  # 检查librdkafka库是否存在
find_library(RDKAFKA_LIBRARY rdkafka)
find_library(RDKAFKA++_LIBRARY rdkafka++)
find_path(RDKAFKA_INCLUDE_DIR librdkafka/rdkafka.h)

  if(RDKAFKA_LIBRARY AND RDKAFKA++_LIBRARY AND RDKAFKA_INCLUDE_DIR)
# 如果找到了librdkafka库,则添加相关配置
target_link_libraries(ChatServer
# ... 其他库
${RDKAFKA_LIBRARY}     # Kafka客户端库
${RDKAFKA++_LIBRARY}   # Kafka C++客户端库
)
target_include_directories(ChatServer PRIVATE ${RDKAFKA_INCLUDE_DIR})
target_compile_definitions(ChatServer PRIVATE HAS_LIBRDKAFKA)
endif()

  3.2 条件编译

  代码中使用了条件编译#ifdef HAS_LIBRDKAFKA来确保在没有安装librdkafka库的情况下也能编译项目,只是Kafka功能不可用。

  3.3 实际应用

  在chatservice.cpp中,Kafka被用于处理一对一聊天和群组聊天的跨服务器消息传递:

  // 一对一聊天业务
void ChatService::oneChat(const TcpConnectionPtr &conn, const string &data, Timestamp time)
{
// ... 解析消息 ...

      // 查询toid是否在线 
User user = _userModel.query(toid);
if (user.getState() == "online")
{
// 使用Kafka发送消息而不是Redis
// 直接转发protobuf消息
if (_kafkaManager) {
_kafkaManager->sendMessage("user_messages", chatMsg.SerializeAsString());
}
return;
}

      // ... 处理离线消息 ...
}

  // 群组聊天业务
void ChatService::groupChat(const TcpConnectionPtr &conn, const string &data, Timestamp time)
{
// ... 解析消息 ...

      for (int id : useridVec)
{
// ... 处理在线用户 ...

          // 查询用户是否在线 
User user = _userModel.query(id);
if (user.getState() == "online")
{
// 使用Kafka发送消息而不是Redis
// 直接转发protobuf消息
if (_kafkaManager) {
_kafkaManager->sendMessage("group_messages", groupChatMsg.SerializeAsString());
}
}
else
{
// 存储离线群消息
_offlineMsgModel.insert(id, groupChatMsg.SerializeAsString());
}
}
}

  同时,ChatService类中实现了处理Kafka消息的回调函数:

  // 从Kafka消息队列中获取订阅的消息
void ChatService::handleKafkaMessage(const string& topic, const string& message) {
LOG_INFO << "Received Kafka message on topic: " << topic;

      // 直接转发protobuf消息
lock_guard<mutex> lock(_connMutex);
// 解析基础消息以获取目标用户ID
chat::BaseMessage baseMsg;
if (baseMsg.ParseFromString(message)) {
int targetUserId = baseMsg.toid();
auto it = _userConnMap.find(targetUserId);
if (it != _userConnMap.end()) {
// 用户在线,直接转发消息
it->second->send(message);
} else {
// 用户不在线,存储离线消息
_offlineMsgModel.insert(targetUserId, message);
}
} else {
LOG_ERROR << "Failed to parse Kafka protobuf message";
}
}

  在ChatService构造函数中初始化了Kafka管理器并设置了消息回调:

  ChatService::ChatService() : _kafkaManager(nullptr), _kafkaConsumerThread(nullptr)
{
// ... 其他初始化 ...

      // 初始化Kafka管理器
_kafkaManager = KafkaManager::instance();
_kafkaManager->init("localhost:9092");

      // ... 其他初始化 ...

      // 设置Kafka消息回调
_kafkaManager->setMessageCallback(std::bind(&ChatService::handleKafkaMessage, this, _1, _2));
}

  4. 总结

  该项目中的Kafka集成实现了一个完整的生产者-消费者模型,通过KafkaManager单例管理所有Kafka连接,实现了跨服务器的消息传
递。该实现具有以下特点:

  1. 完整的Kafka客户端功能:实现了生产者和消费者的基本功能
2. 灵活的配置:支持通过构造函数设置Kafka服务器地址和主题
3. 条件编译:在没有Kafka库的情况下仍可编译项目
4. 与业务逻辑集成:在聊天服务中使用Kafka处理跨服务器消息
5. 回调机制:通过回调函数处理接收到的Kafka消息
6. Protobuf支持:直接发送和接收Protobuf序列化的消息

  这种实现方式提升了系统的可扩展性和可靠性,为构建高性能的集群聊天服务器奠定了基础。

项目中相关文件

经过仔细通读整个项目,以下是所有与Kafka有关的应用和配置的完整列表:

  1. Kafka核心实现代码

  Kafka组件类

  1. KafkaProducer - Kafka生产者类
- 文件:include/server/kafka/kafka_producer.h
- 实现:src/server/kafka/kafka_producer.cpp
- 功能:负责向Kafka主题发送消息
2. KafkaConsumer - Kafka消费者类
- 文件:include/server/kafka/kafka_consumer.h
- 实现:src/server/kafka/kafka_consumer.cpp
- 功能:负责从Kafka主题消费消息
3. KafkaManager - Kafka管理器类
- 文件:include/server/kafka/kafka_manager.h
- 实现:src/server/kafka/kafka_manager.cpp
- 功能:作为单例管理所有Kafka生产者和消费者实例

  核心功能实现

  1. 消息发送:
- 在ChatService::oneChat()方法中使用Kafka发送点对点消息
- 在ChatService::groupChat()方法中使用Kafka发送群组消息
- 使用主题:"user_messages"和"group_messages"
2. 消息接收:
- ChatService::handleKafkaMessage()方法处理从Kafka接收到的消息
- 通过_kafkaManager->setMessageCallback()设置回调函数
3. 初始化配置:
- 在ChatService构造函数中初始化Kafka管理器
- 默认连接地址:"localhost:9092"

  2. 依赖库配置

  CMakeLists.txt配置

  - 文件:src/server/CMakeLists.txt
- 功能:检查并链接librdkafka库
- 条件编译:使用HAS_LIBRDKAFKA宏控制Kafka功能的启用

  第三方库依赖

  - librdkafka C++客户端库
- 在README.md和RUNNING_GUIDE.md中列为可选依赖

  3. Docker容器化配置

  docker-compose.yml(主配置文件)

  1. Zookeeper服务:
- 镜像:confluentinc/cp-zookeeper:latest
- 端口:2181
- 功能:Kafka集群管理
2. Kafka服务:
- 镜像:confluentinc/cp-kafka:latest
- 端口:9092
- 环境变量配置:
- KAFKA_BROKER_ID: 1
- KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
- KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  其他Docker配置文件

  1. docker-compose-cn.yml(国内镜像):
- 使用阿里云镜像仓库
- Kafka镜像:registry.cn-hangzhou.aliyuncs.com/dockerhub/kafka:3.4
2. docker-compose-domestic.yml(国内镜像):
- 使用腾讯云镜像仓库
- Kafka镜像:ccr.ccs.tencentyun.com/public/kafka:3.4
3. docker-compose-official.yml(官方镜像):
- 使用官方镜像仓库
- Kafka镜像:bitnami/kafka:3.4

  聊天服务器环境变量配置

  - 所有聊天服务器实例(chat_server_1, chat_server_2, chat_server_3)都配置了:
- KAFKA_HOST=kafka 环境变量

  4. 项目文档中的Kafka相关内容

  UPGRADE_PLAN.md

  1. 升级计划:
- 明确将Redis pub/sub替换为Kafka作为消息队列
- 集成librdkafka C++客户端库
- 实现KafkaProducer和KafkaConsumer类
2. 完成状态:
- 标记为"COMPLETE - Redis pub/sub has been fully replaced with Kafka"
- 已实现KafkaManager类进行连接管理

  README.md和RUNNING_GUIDE.md

  - 将Kafka客户端库列为可选依赖
- 在技术架构中提及Kafka作为可选的消息队列

  5. 架构设计特点

  1. 可选性:Kafka功能是可选的,项目可以在没有Kafka库的情况下编译和运行
2. 条件编译:通过HAS_LIBRDKAFKA宏实现条件编译
3. 单例模式:使用KafkaManager单例管理所有Kafka连接
4. Protobuf集成:直接发送和接收Protobuf序列化的消息
5. 容器化部署:完整的Docker配置支持Kafka集群部署

  这个Kafka集成方案提供了高可用、可扩展的消息传递机制,替代了原有的Redis
pub/sub方案,为构建高性能的集群聊天服务器提供了可靠的基础。

http://www.xdnf.cn/news/20239.html

相关文章:

  • 状压 dp --- 数据范围小
  • 雪球科技Java开发工程师笔试题
  • happen-before原则
  • WSL Ubuntu Docker 代理自动配置教程
  • LeetCode 139. 单词拆分 - 动态规划解法详解
  • 【软考架构】第二章 计算机系统基础知识:计算机网络
  • 主数据系统是否对于企业是必需的?
  • 最大似然估计:损失函数的底层数学原理
  • 基本数据类型和包装类的区别?
  • 2025年大数据专业人士认证发展路径分析
  • MySQL运维补充
  • 【目录-判断】鸿蒙HarmonyOS开发者基础
  • 敏捷scrum管理实战经验总结
  • 贪心算法应用:化工反应器调度问题详解
  • 【LLIE专题】SIED:看穿0.0001lux的极致黑暗
  • NPU边缘推理识物系统
  • 懒加载的概念
  • 新能源风口正劲,“充电第一股”能链智电为何掉队?
  • 操作系统启动过程详解
  • Coze源码分析-资源库-删除插件-前端源码-核心组件实现
  • 03-生产问题-慢SQL-20250926
  • 机器人控制器开发(导航算法——导航栈关联坐标系)
  • 创客匠人:什么是“好的创始人IP”
  • 2025年本体论:公理与规则的挑战与趋势
  • CentOS系统停服,系统迁移Ubuntu LTS
  • 【CSS,DaisyUI】自定义选取内容的颜色主题
  • Android开发——初步了解AndroidManifest.xml
  • 零基础入门深度学习:从理论到实战,GitHub+开源资源全指南(2025最新版)
  • C++ 条件变量 通知 cv.notify_all() 先释放锁再通知
  • [光学原理与应用-428]:非线性光学 - 为什么要改变光的波长/频率,获得特点波长/频率的光?