
[C/C++] Notes on a Troublesome Kafka + JSON Experience

Contents

  • A Troublesome Kafka + JSON Experience
    • 1 Goal
    • 2 Project Setup
      • 2.1 Docker configuration
      • 2.2 Code
      • 2.3 Project archive
    • 3 Results

A Troublesome Kafka + JSON Experience

1 Goal

Original intent: combine Kafka + JSON + Docker to verify basic message production and consumption.

Pairing Kafka with a JSON library is mainly about serializing and deserializing data, and about making message content easy to format, parse, and process (readable, structured, standardized messages that are convenient to send, receive, and handle).

  1. Standardized data format
    Kafka itself is a message-queue system whose message content can be an arbitrary byte stream. Using JSON as the message format gives messages a clear, well-defined structure and lets producers and consumers agree on a common format.

  2. Cross-language, cross-platform compatibility
    JSON is a lightweight data-interchange format, and virtually every programming language can parse and generate it, so messages sent through Kafka can be handled easily by consumers written in different languages and running on different platforms.

  3. Easier debugging and monitoring
    JSON is a text format that is easy to read and print, which helps developers and operators quickly understand message content when debugging or reading logs.

  4. Flexible message structure
    JSON supports nested structures, arrays, and key-value pairs, making it a good fit for complex data models.

  5. Serializer/deserializer support
    Common Kafka client libraries ship JSON serializers (Serializer) and deserializers (Deserializer), so business objects can be converted to JSON strings for sending and converted back on receipt (a minimal sketch follows this list).

  6. Working with a schema registry
    JSON itself has no strong typing, but combined with JSON Schema and a schema registry (such as Confluent Schema Registry) it can still guarantee consistent, compatible data formats.
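As a concrete illustration of point 5, below is a minimal sketch of round-tripping a business object through a JSON string with nlohmann/json (the library this project uses); the Event struct and its fields are invented for the example:

#include <iostream>
#include <string>
#include <nlohmann/json.hpp>

// Hypothetical business object, made up for this example
struct Event {
    std::string type;
    std::string payload;
};

// nlohmann/json finds these two functions via argument-dependent lookup
void to_json(nlohmann::json& j, const Event& e) {
    j = nlohmann::json{{"type", e.type}, {"payload", e.payload}};
}
void from_json(const nlohmann::json& j, Event& e) {
    j.at("type").get_to(e.type);
    j.at("payload").get_to(e.payload);
}

int main() {
    Event e{"test", "hello"};
    std::string wire = nlohmann::json(e).dump();            // what a producer would send
    Event back = nlohmann::json::parse(wire).get<Event>();  // what a consumer would do
    std::cout << wire << " -> " << back.payload << "\n";
    return 0;
}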


2 Project Setup

2.1 Docker configuration

docker-compose.yml

version: "3.8"services:zookeeper:image: confluentinc/cp-zookeeper:7.5.0container_name: zookeeperports:- "2181:2181"environment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000kafka:image: confluentinc/cp-kafka:7.5.0container_name: kafkaports:- "9092:9092"environment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1depends_on:- zookeeperdev:build:context: .dockerfile: Dockerfile.dev  # ✅ 使用构建好的开发镜像container_name: cpp_devtty: truestdin_open: trueworking_dir: /home/dev/codevolumes:- ./cpp_kafka_code:/home/dev/codedepends_on:- kafka

Dockerfile.dev

FROM ubuntu:22.04

ENV DEBIAN_FRONTEND=noninteractive

RUN apt-get update && apt-get install -y \
    build-essential \
    cmake \
    git \
    curl \
    wget \
    pkg-config \
    vim \
    librdkafka-dev \
    libssl-dev \
    libzstd-dev \
    libjsoncpp-dev \
    ca-certificates \
    && rm -rf /var/lib/apt/lists/*

# Install nlohmann/json (if not preinstalled)
# RUN wget https://github.com/nlohmann/json/releases/download/v3.11.3/json.tar.xz && \
#    tar -xf json.tar.xz && \
#    cp -r single_include/nlohmann /usr/include/ && \
#    rm -rf json.tar.xz single_include
RUN apt-get update && apt-get install -y \
    nlohmann-json3-dev

# Create a development user
RUN useradd -ms /bin/bash dev
USER dev
WORKDIR /home/dev/code

2.2 Code

cpp_kafka_code/
├── CMakeLists.txt      
├── include/
│   ├── KafkaProducer.h
│   ├── KafkaConsumer.h
│   ├── ConfigManager.h
│   └── Message.h
├── src/
│   ├── KafkaProducer.cpp
│   ├── KafkaConsumer.cpp
│   ├── ConfigManager.cpp
│   └── Message.cpp
├── config/
│   └── kafka_config.json
├── test/
│   ├── producer_test.cpp
│   └── consumer_test.cpp

CMakeLists.txt

cmake_minimum_required(VERSION 3.14)
project(KafkaModule)

set(CMAKE_CXX_STANDARD 17)
set(ENV{PKG_CONFIG_PATH} "/usr/lib/x86_64-linux-gnu/pkgconfig")

# Find dependencies
find_package(PkgConfig REQUIRED)
pkg_check_modules(RDKAFKA REQUIRED rdkafka)

# Header search paths
include_directories(
    ${CMAKE_SOURCE_DIR}/include
    ${RDKAFKA_INCLUDE_DIRS}
    /usr/include/librdkafka
)

link_directories(${RDKAFKA_LIBRARY_DIRS})

# Source files
file(GLOB SOURCES src/*.cpp)

# Test executables
add_executable(producer_test test/producer_test.cpp ${SOURCES})
add_executable(consumer_test test/consumer_test.cpp ${SOURCES})

# Link dependencies
# target_link_libraries(producer_test PkgConfig::RDKAFKA)
# target_link_libraries(consumer_test PkgConfig::RDKAFKA)
target_link_libraries(producer_test ${RDKAFKA_LIBRARIES})
target_link_libraries(consumer_test ${RDKAFKA_LIBRARIES})
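Inside the dev container, a standard out-of-source build should work: mkdir build && cd build && cmake .. && make, which produces the producer_test and consumer_test binaries under build/.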

config/kafka_config.json

{"bootstrap.servers": "kafka:9092","group.id": "my-group","auto.offset.reset": "earliest","enable.auto.commit": "false"
}
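These keys are raw librdkafka property names: as the consumer code below shows, the whole JSON object is forwarded key-by-key to rd_kafka_conf_set, so every entry must be a valid librdkafka configuration property. The producer takes a different path and only uses bootstrap.servers (via ConfigManager).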

include/ConfigManager.h

#pragma once
#include <string>
#include <nlohmann/json.hpp>

struct KafkaConfig {
    std::string brokers;
    std::string groupId;
    bool enableIdempotence = true;
    int batchSize = 100;
    int lingerMs = 5;
};

class ConfigManager {
public:
    static KafkaConfig loadConfig(const std::string& filename);
};

include/KafkaConsumer.h

#pragma once

#include <string>
#include <librdkafka/rdkafka.h>

class KafkaConsumer {
public:
    KafkaConsumer(const std::string& configFile, const std::string& topic);
    ~KafkaConsumer();
    bool poll(std::string& outMessage);

private:
    rd_kafka_t* rk_;
    rd_kafka_conf_t* conf_;
    rd_kafka_topic_partition_list_t* topics_;
};

include/KafkaProducer.h

#pragma once
#include <string>
#include <librdkafka/rdkafka.h>

class KafkaProducer {
public:
    explicit KafkaProducer(const std::string& configFile);
    bool send(const std::string& topic, const std::string& message);
    ~KafkaProducer();

private:
    rd_kafka_t* producer_ = nullptr;
    rd_kafka_conf_t* conf_ = nullptr;
};

include/Message.h

#pragma once
#include <string>
#include <nlohmann/json.hpp>

class Message {
public:
    Message() = default;
    explicit Message(const std::string& jsonStr);
    std::string toJson() const;
    void setField(const std::string& key, const std::string& value);
    std::string getField(const std::string& key) const;

private:
    nlohmann::json data_;
};

src/ConfigManager.cpp

#include "ConfigManager.h"
#include <fstream>

KafkaConfig ConfigManager::loadConfig(const std::string& filename) {
    std::ifstream in(filename);
    nlohmann::json j;
    in >> j;

    KafkaConfig cfg;
    cfg.brokers = j["bootstrap.servers"];
    cfg.groupId = j["group.id"];
    cfg.enableIdempotence = j.value("enable_idempotence", true);
    cfg.batchSize = j.value("batch_size", 100);
    cfg.lingerMs = j.value("linger_ms", 5);
    return cfg;
}
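Worth flagging: loadConfig reads snake_case keys (enable_idempotence, batch_size, linger_ms) that do not exist in config/kafka_config.json, so the hard-coded defaults always win; batchSize and lingerMs are then never passed on to librdkafka at all.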

src/KafkaConsumer.cpp


#include "KafkaConsumer.h"
#include <fstream>
#include <iostream>
#include <nlohmann/json.hpp>

KafkaConsumer::KafkaConsumer(const std::string& configFile, const std::string& topic) {
    std::ifstream file(configFile);
    if (!file.is_open()) {
        std::cerr << "Failed to open config file: " << configFile << "\n";
        exit(1);
    }
    nlohmann::json configJson;
    file >> configJson;

    char errstr[512];
    conf_ = rd_kafka_conf_new();
    // Forward every key in the JSON config to librdkafka
    for (auto& el : configJson.items()) {
        std::string valStr;
        if (el.value().is_string()) {
            valStr = el.value().get<std::string>();
        } else {
            valStr = el.value().dump();  // numbers, booleans, etc. become strings
        }
        if (rd_kafka_conf_set(conf_, el.key().c_str(), valStr.c_str(),
                              errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK) {
            std::cerr << "Config error: " << errstr << "\n";
        }
    }

    rk_ = rd_kafka_new(RD_KAFKA_CONSUMER, conf_, errstr, sizeof(errstr));
    if (!rk_) {
        std::cerr << "Failed to create consumer: " << errstr << "\n";
        exit(1);
    }

    // Route all events to the consumer queue (standard for the high-level consumer)
    rd_kafka_poll_set_consumer(rk_);

    // Subscribe to topic
    topics_ = rd_kafka_topic_partition_list_new(1);
    rd_kafka_topic_partition_list_add(topics_, topic.c_str(), -1);
    rd_kafka_subscribe(rk_, topics_);
}

KafkaConsumer::~KafkaConsumer() {
    rd_kafka_consumer_close(rk_);
    rd_kafka_destroy(rk_);
    rd_kafka_topic_partition_list_destroy(topics_);
}

bool KafkaConsumer::poll(std::string& outMessage) {
    rd_kafka_message_t* msg = rd_kafka_consumer_poll(rk_, 1000);
    if (!msg) return false;
    if (msg->err) {
        std::cerr << "Consumer error: " << rd_kafka_message_errstr(msg) << "\n";
        rd_kafka_message_destroy(msg);
        return false;
    }

    // Debug aid: dump the raw payload bytes in hex
    std::cout << "Raw message hex: ";
    for (unsigned int i = 0; i < msg->len; ++i) {
        printf("%02X ", ((unsigned char*)msg->payload)[i]);
    }
    std::cout << std::endl;

    outMessage = std::string((char*)msg->payload, msg->len);
    rd_kafka_message_destroy(msg);
    return true;
}
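Because kafka_config.json sets enable.auto.commit to "false" and poll() never commits, offsets are never stored, so every consumer restart re-reads the topic from the earliest offset. A minimal sketch of the manual commit that would normally follow successful processing (not present in the code above):

// Inside KafkaConsumer::poll(), after the payload has been copied out
// and before rd_kafka_message_destroy(msg):
if (rd_kafka_commit_message(rk_, msg, 1 /* async */) != RD_KAFKA_RESP_ERR_NO_ERROR) {
    std::cerr << "Offset commit failed\n";
}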

src/KafkaProducer.cpp

#include "KafkaProducer.h"
#include "ConfigManager.h"
#include <iostream>

KafkaProducer::KafkaProducer(const std::string& configFile) {
    KafkaConfig cfg = ConfigManager::loadConfig(configFile);
    conf_ = rd_kafka_conf_new();
    rd_kafka_conf_set(conf_, "bootstrap.servers", cfg.brokers.c_str(), nullptr, 0);
    if (cfg.enableIdempotence) {
        rd_kafka_conf_set(conf_, "enable.idempotence", "true", nullptr, 0);
    }
    producer_ = rd_kafka_new(RD_KAFKA_PRODUCER, conf_, nullptr, 0);
}

bool KafkaProducer::send(const std::string& topic, const std::string& message) {
    // Debug aid: dump the outgoing payload bytes in hex
    for (unsigned char c : message) {
        printf("%02X ", c);
    }
    printf("\n");

    // RD_KAFKA_MSG_F_COPY makes librdkafka copy the payload immediately;
    // without it, the caller's buffer must stay valid until delivery,
    // which can yield garbled payloads if the string is freed first.
    rd_kafka_resp_err_t err = rd_kafka_producev(
        producer_,
        RD_KAFKA_V_TOPIC(topic.c_str()),
        RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY),
        RD_KAFKA_V_VALUE(const_cast<char*>(message.c_str()), message.size()),
        RD_KAFKA_V_END);
    return err == RD_KAFKA_RESP_ERR_NO_ERROR;
}

KafkaProducer::~KafkaProducer() {
    rd_kafka_flush(producer_, 3000);
    rd_kafka_destroy(producer_);
}
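Note that send() only confirms the message was queued locally; broker acknowledgement arrives asynchronously. A hedged sketch of wiring up librdkafka's delivery-report callback, which the code above does not set up:

// Delivery-report callback: librdkafka invokes it from rd_kafka_poll()
// or rd_kafka_flush() once each message is acknowledged or fails.
static void dr_msg_cb(rd_kafka_t* /*rk*/, const rd_kafka_message_t* m, void* /*opaque*/) {
    if (m->err)
        std::cerr << "Delivery failed: " << rd_kafka_err2str(m->err) << "\n";
}

// Registered on conf_ in the constructor, before rd_kafka_new():
//     rd_kafka_conf_set_dr_msg_cb(conf_, dr_msg_cb);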

src/Message.cpp

#include "Message.h"
#include <iostream>

Message::Message(const std::string& jsonStr) {
    data_ = nlohmann::json::parse(jsonStr);
}

std::string Message::toJson() const {
    if (data_.empty()) {
        std::cerr << "[Message::toJson] Warning: data_ is empty.\n";
    }
    return data_.dump();
}

void Message::setField(const std::string& key, const std::string& value) {
    data_[key] = value;
}

std::string Message::getField(const std::string& key) const {
    return data_.value(key, "");
}
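One caveat: getField assumes the stored value is a string. nlohmann::json's value(key, "") returns the default when the key is absent, but throws a type_error when the key exists with a non-string value such as a number.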

test/consumer_test.cpp

#include "KafkaConsumer.h"
#include <iostream>

int main() {
    KafkaConsumer consumer("../config/kafka_config.json", "test_topic");
    std::cout << "Consumer started. Waiting for messages...\n";
    while (true) {
        std::string msg;
        if (consumer.poll(msg)) {
            std::cout << "Received: " << msg << std::endl;
        }
    }
    return 0;
}

test/producer_test.cpp

#include "KafkaProducer.h"
#include "Message.h"
#include <iostream>

int main() {
    // First, round-trip a Message through JSON locally
    Message msg;
    msg.setField("type", "test");
    msg.setField("payload", "hello");
    std::string jsonStr = msg.toJson();
    std::cout << "Serialized: " << jsonStr << std::endl;
    Message parsed(jsonStr);
    std::cout << "Parsed payload: " << parsed.getField("payload") << std::endl;

    // Then produce a message to Kafka
    Message msg1;
    msg1.setField("type", "test");
    msg1.setField("payload", "hello from producer");
    std::cout << "Send message: " << msg1.toJson() << std::endl;

    KafkaProducer producer("../config/kafka_config.json");
    // if (producer.send("test_topic", msg1.toJson())) {
    //     std::cout << "Message sent successfully!\n";
    // } else {
    //     std::cout << "Message send failed.\n";
    // }
    std::string testStr = R"({"type":"test","payload":"hello from producer"})";
    producer.send("test_topic", testStr);
    return 0;
}
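Both test programs open ../config/kafka_config.json by relative path, so they must be run from one directory below the project root (e.g., from build/); started anywhere else, the consumer fails to open the config and exits, and the producer's config load throws a parse error on the empty stream.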

2.3 Project archive

3 Results

There is still a bug: when the consumer receives data sent by the producer, the payload is always prefixed with garbled bytes. It is presumably caused by serialization/deserialization, but the root cause has not been found yet.
Stay tuned for good news once I've debugged it!
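A likely culprit, for what it's worth: rd_kafka_producev() with RD_KAFKA_V_VALUE and no message flags neither copies nor frees the payload, so librdkafka keeps reading the caller's buffer until the message is actually delivered; if the backing std::string is destroyed before then, garbage goes out on the wire. Passing RD_KAFKA_V_MSGFLAGS(RD_KAFKA_MSG_F_COPY), as in the send() listing above, makes librdkafka take its own copy immediately.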
