当前位置: 首页 > ds >正文

kafka之操作示例

一、常用shell命令

#1、创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replications 1 --topic test#2、查看创建的topic
bin/kafka-topics.sh --list --zookeeper localhost:2181#3、生产者发布消息命令
(执行完此命令后在控制台输入要发送的消息,回车即可)
bin/kafka-console-producer.sh --broker-list 192.168.91.231:9092,192.168.91.231:9093,192.168.91.231:9094 --topic test#4、消费者接受消息命令
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning#5、kafka启动
首先启动zookeeper zkServer.sh start(相当于一个server,kafka会连接这个server)
bin/kafka-server-start.sh config/server.properties # 启动kafka#6、查看kafka节点数目
在zookeeper中查看,登录客户端bin/zkCli.sh 执行ls /brokers/ids 查看节点数目及节点ID,[0,1,2]#7、kafka中的概念
生产者 Producer、代理Broker、消费者Consumer、主题Topic、分区 Partition、消费者组 Consumer Group#8、查看主颗信息
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 [加其他选项]eg:
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --describe
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test#9、为主题创建分区
一共创建八个分区,编号分别为0~7
bin/kafka-topics.sh --zookeeper 192.168.91.231:2181 --alter -partitions 8 -topic test#10、查看kafka进程
ps -eflgrep server.properties
ps -eflgrep server-1.properties
ps -eflgrep server-2.properties#11、kafka宕机重启后,消息不会丢失#12、kafka其中一个broker宕机后,对消费者和生产者影响很小(命令行下测试)
消费者会尝试连接,连接不到,返回java.net.ConnectException:Connection refused异常 生产者可能会在发送消息的时候报异常,但会很快连接到其他broker,继续正常使用#13.查看kafka消息队列的积压情况
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --describe --group console-consumer-37289#14.kafka 中查看所有的group列表信息
bin/kafka-consumer-groups.sh --zookeeper 192.168.91.231:2181 --list

二、python操作kafka

本地安装与启动(基于Docker)

#1、下载zookeeper镜像与kafka镜像:
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6
docker pull registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

#2、本地启动zookeeper
docker run -d --name zookeeper -p 2181:2181 -t registry.cn-shanghai.aliyuncs.com/egon-k8s-test/kafka-zookeeper:3.4.6

#3、本地启动kafka(注意下述代码,将kafka启动在9092端口)
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--enV KAFKA ZO0KEEPER CONNECT=zookeeper:2181 \
--enV KAFKA ADVERTISED HOST NAME=192.168.71.113 \
--enV KAFKA ADVERTISED PORT=9092 \
registry.cn-shanghai.aliyuncs.com/egon-k8s-test/wurstmeister-kafka:2.13-2.8.1

上面写的localhost没有影响,查看端口如下
# netstat -tuanlp | grep 9092
tcp 0 0 0.0.0.0:9092 0.0.0.0:*LISTEN 102483/docker-proxy
tcp6 00:::9092 :::* LISTEN 102487/docker-proxy

#4、进入kafka bash
docker exec it kafka bash
cd /opt/kafka/bin

#5、创建Topic,分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo

kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic egon

数据存在哪里
[root@web02 ~]# docker exec -it kafka bash
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/
kafka-logs-f33383f9c414
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# 1s /kafka/kafka-logs-f33383f9c414/
kafka_demo-0 kafka_demo-1
egon-0 egon-1
.........
bash-5.1#
bash-5.1#
bash-5.1#
bash-5.1# ls /kafka/kafka-logs-f33383f9c414/egon-0
00000000000000000000.index0000000000000000000.timeindex
00000000800000080000.1og leader-epoch-checkpoint

#6、查看当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list

#7、命令行操作
$docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-producer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic
然后一行行输入,回车即发送一条消息
>111
>222
>333

另外一个终端
$ docker exec -ti kafka sh
/ # cd /opt/kafka/bin
/ # kafka-console-consumer.sh --bootstrap-server 192.168.71.113:9092 --topic test_topic --from-beginning可以收到消息
111
222
333

#8、安装kafka-python
pip install kafka-python

代码示例:

# pip3 install kafka-python  # 版本是2.0.2
from kafka import KafkaProducer, KafkaConsumer
import json
import threading
import time# Kafka broker address
bootstrap_servers = '192.168.71.113:9092'# Topic name
topic = 'test_topic'# Producer function
def kafka_producer():producer = KafkaProducer(bootstrap_servers=bootstrap_servers,value_serializer=lambda v: json.dumps(v).encode('utf-8'))try:for i in range(10):message = {'message': f'Hello Kafka! Message {i}'}producer.send(topic, value=message)print(f"Sent: {message}")time.sleep(1)else:print("发送完成")except Exception as ex:print(f"Exception occurred: {ex}")finally:producer.close()# Consumer function
def kafka_consumer():consumer = KafkaConsumer(topic,bootstrap_servers=bootstrap_servers,auto_offset_reset='earliest',consumer_timeout_ms=5000)  # 设置超时时间为1秒try:for message in consumer:print(f"Received: {message.value}")else:print("消费完毕,等5000毫秒超时即可结束,执行finally内的代码")except Exception as ex:print(f"Exception occurred: {ex}")finally:print("消费者结束")consumer.close()# Create threads for producer and consumer
producer_thread = threading.Thread(target=kafka_producer)
consumer_thread = threading.Thread(target=kafka_consumer)# Start both threads
producer_thread.start()
consumer_thread.start()# Wait for threads to complete
producer_thread.join()
consumer_thread.join()print("Kafka producer and consumer threads have finished.")

执行结果:

                  

http://www.xdnf.cn/news/8707.html

相关文章:

  • 大文件上传,对接阿里oss采用前端分片技术。完成对应需求!
  • 【MySQL】第7节|Mysql锁机制与优化实践以及MVCC底层原理剖析
  • ubuntu 安装latex
  • 清除 Ubuntu 磁盘空间
  • 安卓开发用到的设计模式(2)结构型模式
  • 开发者工具箱-鸿蒙金额转换开发笔记
  • R语言学习--Day08--bootstrap原理及误区
  • Ollama01-安装教程
  • 【MySQL】07.表内容的操作
  • Android 16系统源码_自由窗口(一)触发自由窗口模式
  • Gateway全局过滤器:接口耗时统计与黑白名单配置
  • R语言科研编程-柱状图
  • STM32 定时器输出比较深度解析:从原理到电机控制应用 (详解)
  • 黑马点评双拦截器和Threadlocal实现原理
  • 行列式的线性性质(仅限于单一行的加法拆分)
  • 电机控制储备知识学习(五) 三项直流无刷电机(BLDC)学习(四)
  • 思科硬件笔试面试题型解析
  • 7:OpenCV—图像形态学处理
  • 深度学习实战:从图像分类到文本生成的完整案例解析
  • DAY 35 模型可视化与推理
  • 力扣面试150题--求根节点到叶节点数字之和
  • 如何屏蔽mac电脑更新提醒,禁止系统更新(最新有效方法)
  • 5060显卡驱动PyCUDA开发环境搭建
  • 25. 日志装饰器的开发
  • 使用 Go 语言实现完整且轻量级高性能的 MQTT Broker
  • Vue3 Composition API: 企业级应用最佳实践方案
  • SDL2常用函数:SDL_Texture 数据结构及使用介绍
  • 微信小程序数据接收
  • 数据结构---二叉树
  • 基于python的机器学习(九)—— 评估算法(二)