当前位置: 首页 > java >正文

Windows系统下使用Kafka和Zookeeper,Python运行kafka(二)

1.配置 Zookeeper

进入解压后的 Zookeeper 目录(例如 F:\zookeeper\conf),复制 zoo_sample.cfg 文件并命名为 zoo.cfg(如果 zoo.cfg 已经存在,则直接编辑该文件)。
打开 zoo.cfg 文件,配置相关参数:

# 数据存储目录
dataDir=F:/zookeeper/data
# 日志存储目录
dataLogDir=F:/zookeeper/logs
# 客户端连接端口,默认 2181
clientPort=2181
# 初始化领导者选举和同步的配置,一般保持默认即可
initLimit=5
syncLimit=2
# 配置 Zookeeper 集群(单机模式下无需配置)
# server.1=localhost:2888:3888

确保 dataDirdataLogDir 目录存在,如果不存在则手动创建。
在这里插入图片描述
可以看到有dataDir
在这里插入图片描述
只改了这个地方
在这里插入图片描述
与此同时,创建文件夹

2.启动 Zookeeper

进入 Zookeeper 的 bin 目录(例如 F:\zookeeper\bin),打开命令提示符,执行以下命令启动 Zookeeper:

.\zkServer.cmd

如果启动成功,会看到类似以下的日志信息:
在这里插入图片描述

3.配置 Kafka 使用外部 Zookeeper

打开 Kafka 的配置文件 config/server.properties,修改 zookeeper.connect 参数,指定自己的 Zookeeper 地址和端口:

zookeeper.connect=localhost:2181

这里的 localhost:2181 是 Zookeeper 的默认地址和端口,如果你的 Zookeeper 配置了其他地址或端口,则需要相应修改。

4.启动 Kafka

进入 Kafka 的目录,执行以下命令启动 Kafka

.\bin\windows\kafka-server-start.bat .\config\server.properties

5.安装Kafka插件

在这里插入图片描述
安装完成点击这个kafka插件
在这里插入图片描述
在这里插入图片描述
如果没有topic主题是连接不了的
使用以下代码创建一个名为testtopic主题,3个分区,2个副本,前提是先启动 ZookeeperKafka

from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError# 配置 Kafka 服务器地址
bootstrap_servers = ['localhost:9092']# 创建 Kafka 管理客户端实例
admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)# 要创建的主题信息
topic_name = 'test'
num_partitions = 3  # 分区数量
replication_factor = 1  # 副本因子,调整为不超过可用 Broker 数量try:# 创建新主题topic = NewTopic(name=topic_name, num_partitions=num_partitions, replication_factor=replication_factor)admin_client.create_topics([topic])print(f"主题 {topic_name} 创建成功,分区数: {num_partitions},副本因子: {replication_factor}")
except TopicAlreadyExistsError:print(f"主题 {topic_name} 已存在,无需再次创建")
except Exception as e:print(f"创建主题 {topic_name} 时出错: {e}")
finally:# 关闭管理客户端admin_client.close()

在这里插入图片描述

有了一个test主题,就可以连接了
在这里插入图片描述

接下来发送1000条消息

for i in range(1000):message = f"message{i+1}"# 发送消息future = producer.send(topic, message)

在这里插入图片描述

在这里插入图片描述

http://www.xdnf.cn/news/4689.html

相关文章:

  • 量子密码的轻量级通信协议笔记
  • viewDesign里的table内嵌套select动态添加表格行绑定内容丢失
  • DeFi开发系统软件开发:技术架构与生态重构
  • MariaDB 与 MySQL 的关系:从同源到分道扬镳
  • 单体架构实现延时任务
  • WPF实时调试的一种实现方法
  • 聊一聊接口的压力测试如何进行的?
  • 多商户进销存一体化管理,Java+Vue,含源码与文档,高效统筹库存、销售与采购,适配多元商业场景
  • 2.4 点云数据存储格式——轻量文本型存储格式
  • 在一台服务器上通过 Nginx 配置实现不同子域名访问静态文件和后端服务
  • CTF - PWN之ORW记录
  • 全球森林数据如何分析?基于R语言森林生态系统结构、功能与稳定性分析与可视化
  • 一键设置动态域名+ipv6内网直通访问ssh服务-家庭云计算专家
  • 关于 wordpress 统计访问量初始数值错误的解决方法
  • 【Pandas】pandas DataFrame abs
  • 2025年小程序DDoS与CC攻击防御全指南:构建智能安全生态
  • typecho中的Widget设计文档
  • Vscode (Windows端)免密登录linux集群服务器
  • 搭建和优化CI/CD流水线
  • VTK|.obj文件数据处理+Jet/Viridis/CoolToWarm/Grayscale/Rainbow/风格颜色渲染
  • [逆向工程]什么是DLL注入(二十二)
  • 两种方法求解最长公共子序列问题并输出所有解
  • Ubuntu 22.04 出现 ‘Temporary failure resolving‘ 解决方案
  • 单圈精微,多圈无界——绝对值编码器如何重构工业定位的底层逻辑
  • React -> AI组件 -> 调用Ollama模型, qwen3:1.7B非常聪明
  • C++从入门到实战(十四)初识STL与STL简介
  • [ linux-系统 ] 权限管理
  • Android平台FFmpeg视频解码全流程指南
  • Hadoop MapReduce 图文代码讲解
  • C++ 复习(一)