当前位置: 首页 > ds >正文

C语言操作Kafka

Kafka服务

Kafka的快速入门 文档很详细,基本上几步就可以搭建一个Kafka测试环境。

下载Kafka的二进制包,然后解压。

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/4.0.0/kafka_2.13-4.0.0.tgz
tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0

生成集群ID,使用集群ID格式化存储

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

启动Kafka Server。

$ bin/kafka-server-start.sh config/server.properties

启动之后,默认的Kafka Server会连续输出日志到控制台。后面的测试命令,需要在另外的终端执行。

创建主题

bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

测试生产

bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

测试消费

bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

C语言的Kafka库:librdkafka

在C语言中操作Kafka,有不少库可以选择,其中就有librdkafka。

根据librdkafka在gitthub上的信息,它主要是由C语言开发,但是也提供了C++的支持。

当我们在Linux平台上安装librdkafka之后,通过pkg-config命令加库名rdkafka,可以获取编译相关的信息。

如在Fedora 42上:

pkg-config --cflags --libs rdkafka  
-DWITH_GZFILEOP -lrdkafka

需要注意的是,rdkafka.pc中记录的头文件路径是/usr/include,但是它的头文件都位于/usr/include/librdkafka。在C语言的源代码中需要写成:

#include <librdkafka/rdkafka.h>

在C++中则需要写成:

#include <librdkafka/rdkafkacpp.h>

但是C++的API与C语言的类似,后续不再提及C++的相关API,仅以C语言举例。

rdkafka API

rd_kafka_t

API的核心是结构rd_kafka_t

它的创建和销毁是一对函数:

rd_kafka_t *rd_kafka_new(rd_kafka_type_t type,  rd_kafka_conf_t *conf,  char *errstr,  size_t errstr_size);  void rd_kafka_destroy(rd_kafka_t *rk);

其中,rd_kafka_type_t是一个enum,分别为RD_KAFKA_PRODUCERRD_KAFKA_CONSUMER,即生产者与消费者。

rd_kafka_conf_t

rd_kafka_conf_t是另一个结构,创建与销毁函数为:

rd_kafka_conf_t *rd_kafka_conf_new(void);void rd_kafka_conf_destroy(rd_kafka_conf_t *conf);

rd_kafka_conf_t创建之后,使用如下函数设置参数:

rd_kafka_conf_res_t rd_kafka_conf_set(rd_kafka_conf_t *conf,  const char *name,  const char *value,  char *errstr,  size_t errstr_size);

如:bootstrap.serverssasl.usernamesasl.password等。

rd_kafka_conf_t设置相关的函数返回值都是rd_kafka_conf_res_t。如果成功,值为RD_KAFKA_CONF_OK,即0。

如果要进行更详细的控制,还可以使用其它一些函数。

如:

void rd_kafka_conf_set_dr_msg_cb(  rd_kafka_conf_t *conf,  void (*dr_msg_cb)(rd_kafka_t *rk,  const rd_kafka_message_t *rkmessage,  void *opaque));

可以设置消息被处理之后的回调函数。其中,rkmessage是被处理的消息,opaque是使用rd_kafka_conf_set_opaque()设置的用户层指针。

证书设置

rd_kafka_conf_set_ssl_cert用于设置证书。

rd_kafka_conf_res_t  
rd_kafka_conf_set_ssl_cert(rd_kafka_conf_t *conf,  rd_kafka_cert_type_t cert_type,  rd_kafka_cert_enc_t cert_enc,  const void *buffer,  size_t size,  char *errstr,  size_t errstr_size);

其中,cert_type的值分别为:RD_KAFKA_CERT_PUBLIC_KEYRD_KAFKA_CERT_PRIVATE_KEYRD_KAFKA_CERT_CA,即公钥、私钥与CA。

cert_enc的值分别为:RD_KAFKA_CERT_ENC_PKCS12RD_KAFKA_CERT_ENC_DERRD_KAFKA_CERT_ENC_PEM,即分别是三种证书格式。

另外还可以设置证书验证的回调函数:

rd_kafka_conf_res_t rd_kafka_conf_set_ssl_cert_verify_cb(  rd_kafka_conf_t *conf,  int (*ssl_cert_verify_cb)(rd_kafka_t *rk,  const char *broker_name,  int32_t broker_id,  int *x509_error,  int depth,  const char *buf,  size_t size,  char *errstr,  size_t errstr_size,  void *opaque));

如果成功,回调函数必须返回1。否则需要设置errstr为错误原因,并且返回0。

rd_kafka_message_t

处理Kafka另外一个重要的结构是rd_kafka_message_t,即消息。它的定义为:

typedef struct rd_kafka_message_s {  rd_kafka_resp_err_t err;rd_kafka_topic_t *rkt; int32_t partition;void *payload;size_t len;                        void *key;size_t key_len;      int64_t offset;void *_private; 
} rd_kafka_message_t;

最开头的err非常重要。每次使用rd_kafka_consume*族函数取得一条消息,以及在生产一条消息的回调函数中时,都要检查这个值,确定是否成功。

生产者

使用rd_kafka_t生产一条消息的函数为:

int rd_kafka_produce(rd_kafka_topic_t *rkt,  int32_t partition,  int msgflags,  void *payload,  size_t len,  const void *key,  size_t keylen,  void *msg_opaque);

成功返回0。

生成一批消息的函数为:

int rd_kafka_produce_batch(rd_kafka_topic_t *rkt,  int32_t partition,  int msgflags,  rd_kafka_message_t *rkmessages,  int message_cnt);

返回值为成功的消息数目。

等待所有消息处理完成的函数为:

rd_kafka_resp_err_t rd_kafka_flush(rd_kafka_t *rk, int timeout_ms);

或者放弃没有发送的消息:

rd_kafka_resp_err_t rd_kafka_purge(rd_kafka_t *rk, int purge_flags);

消费者

控制Kafka消费的函数有一对:

int rd_kafka_consume_start(rd_kafka_topic_t *rkt,  int32_t partition,  int64_t offset);int rd_kafka_consume_stop(rd_kafka_topic_t *rkt, int32_t partition);

注意这两个函数的第一个参数是rd_kafka_topic_t

调用rd_kafka_consume_start()之后,kafka将开始把成批的消息放入本地的队列中,应用需要使用rd_kafka_consume()函数来消费。

rd_kafka_message_t *  
rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition, int timeout_ms);

这个函数在timeout_ms毫秒以内,返回一条消息,或者NULL。返回的消息,必须使用rd_kafka_message_destroy()释放。

类似生产者,消费者也有批量处理的函数:

ssize_t rd_kafka_consume_batch(rd_kafka_topic_t *rkt,  int32_t partition,  int timeout_ms,  rd_kafka_message_t **rkmessages,  size_t rkmessages_size);
http://www.xdnf.cn/news/10177.html

相关文章:

  • 3DMAX+Photoshop教程:将树木和人物添加到户外建筑场景中的方法
  • java对接bacnet ip协议(跨网段方式)
  • Kotlin-特殊类型
  • 安卓逆向篇JEB 反编译断点动态调试加密算法还原逻辑会员绕过
  • 亚马逊商品评论爬取与情感分析:Python+BeautifulSoup实战(含防封策略)
  • InnoDB引擎逻辑存储结构及架构
  • 【Netty系列】Reactor 模式 2
  • 奇异值分解(SVD):线性代数在AI大模型中的核心工具
  • 使用原生前端技术封装一个组件
  • 面试题 08.08. 有重复字符串的排列组合【 力扣(LeetCode) 】
  • Smith圆图知识学习笔记
  • Linux 文件 IO 性能监控与分析指南
  • QEMU/KVM课程大纲暨学习路线(1)
  • 榕壹云医疗服务系统:基于ThinkPHP+MySQL+UniApp的多门店医疗预约小程序解决方案
  • 算法打卡第11天
  • BKP(备份寄存器)和 RTC(实时时钟)
  • 飞牛fnNAS的Docker应用之迅雷篇
  • leetcode538.把二叉搜索树转换为累加树:反向中序遍历的数值累加之道
  • 半导体厂房设计建造流程、方案和技术要点-江苏泊苏系统集成有限公司
  • 跨平台浏览器集成库JxBrowser 支持 Chrome 扩展程序,高效赋能 Java 桌面应用
  • Apache SeaTunnel 引擎深度解析:原理、技术与高效实践
  • 【Linux 基础知识系列】第四篇-用户与权限管理
  • c/c++的opencv霍夫变换
  • 阻止H5页面中键盘收起的问题
  • CTFSHOW Pwn94 WP
  • [原创](Windows使用技巧): Windwos11如何设置局域网共享访问? (多图详解)
  • 在Linux上安装Docker并配置镜像加速器:从入门到实战
  • PostgreSQL 临时表空间
  • AWS API Gateway 配置WAF(中国区)
  • 《智慧医疗分级评价方法及标准(2025版)》征求意见函全面解读:人工智能医疗应用的评价体系与指南方向