当前位置: 首页 > news >正文

go 使用rabbitMQ

为了简单,我们使用docker 容器开启rabbitmq作为服务

1 安装centos docker

1. 卸载旧版本(如有)

sudo yum remove -y docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine

2. 安装依赖包

sudo yum install -y yum-utils \device-mapper-persistent-data \lvm2

3. 添加 Docker 官方 YUM 仓库

sudo yum-config-manager \--add-repo \https://download.docker.com/linux/centos/docker-ce.repo

如果你访问国外镜像很慢,可以使用国内镜像,例如阿里

  1. 修改 Docker 仓库地址为腾讯镜像
sudo sed -i 's+https://download.docker.com+https://mirrors.tencent.com/docker-ce+' /etc/yum.repos.d/docker-ce.repo
  1. 验证是否替换成功
cat /etc/yum.repos.d/docker-ce.repo```
  1. 清理缓存并重新安装
# 清理旧缓存
sudo yum clean all# 生成新的元数据缓存(缓存所有的yum元数据,时间很长,很慢)
sudo yum makecache# 缓存指定的yum源,推荐
# --disablerepo=*:禁用所有仓库
# --enablerepo=docker-ce-stable:只启用 docker-ce-stable
# --skip-unavailable:跳过无法访问的仓库(避免报错)
sudo  yum makecache --enablerepo=docker-ce-stable --disablerepo=* --skip-unavailable# 安装 Docker
sudo yum install -y docker-ce docker-ce-cli containerd.io

4. 安装 Docker Engine

sudo yum install -y docker-ce docker-ce-cli containerd.io

5. 启动并启用 Docker 服务

# 启动 Docker
sudo systemctl start docker# 设置开机自启
sudo systemctl enable docker

6. 验证安装

sudo docker --version

运行docker rabbitmq

设置国内镜像

需要国内镜像
修改 Docker 配置文件(推荐)
很多镜像无法使用,的用心找下,不然也无法下载镜像

# 1. 创建或编辑配置文件
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://docker-0.unsee.tech"]
}
EOF
# 重新加载配置
sudo systemctl daemon-reload# 重启 Docker
sudo systemctl restart docker# 查看是否生效
docker info | grep -A 2 "Registry Mirrors"

运行rabbitMQ

如果执行不成功,无法下载,则可能你没有公共的DNS,添加
两种方式
方式1,vi打开 /etc/resolv.conf

search lan
nameserver 192.168.111.1
nameserver 8.8.8.8 
nameserver 114.114.114.114

方式2

# 编辑 resolv.conf,添加公共 DNS
sudo tee /etc/resolv.conf <<-'EOF'
nameserver 8.8.8.8
nameserver 114.114.114.114
options timeout:1
EOF
docker run -d \--name rabbitmq \--hostname rabbitmq \-p 5672:5672 \-p 5671:5671 \-p 15672:15672 \-p 15671:15671 \-p 15692:15692 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=SecurePass123! \-e RABBITMQ_DEFAULT_VHOST=my_vhost \--mount source=rabbitmq_data,target=/var/lib/rabbitmq \--mount source=rabbitmq_log,target=/var/log/rabbitmq \--restart=unless-stopped \rabbitmq:3.13-management

启动成功查看
在这里插入图片描述
访问

http://192.168.126.3:15672/#/
admin
SecurePass123!

go代码实现

五种消息模型

简单队列:点对点

‌特点‌:一个生产者发送消息到队列,仅有一个消费者接收并处理消息。 ‌
‌适用场景‌:一对一的简单通信场景。

下面的代码只有一个生产者,一个消费者

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1",  // 队列名true,  // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(1) // 启动 1 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("",         // 交换机:默认交换机queue.Name, // 路由键:队列名false,      // mandatory:如果没有匹配的队列,不返回消息false,      // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1",         // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

工作队列模式

‌特点‌:多个消费者共享同一队列,消息按轮询(默认)或公平分配(能者多劳)机制处理。 ‌
‌适用场景‌:任务分发与负载均衡,如分布式任务处理

仅仅是在简单队列基础上,增加了一个消费队列,两个队列轮流消费

	go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()

完整版

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()// 3. 声明队列(确保队列存在)queue, err := ch.QueueDeclare("Q1",  // 队列名true,  // 持久化:重启后队列仍存在false, // 自动删除:当最后一个消费者断开时,不自动删除false, // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Fatalf("Failed to declare a queue: %v", err)}// 4. 启动多个消费者(使用 WaitGroup 等待)var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()consume("consumer-1", conn) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()consume("consumer-2", conn)}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish("",         // 交换机:默认交换机queue.Name, // 路由键:队列名false,      // mandatory:如果没有匹配的队列,不返回消息false,      // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume("Q1",         // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

发布/订阅模式

‌特点‌:生产者通过交换机将消息广播至所有绑定该交换机的队列,每个队列对应一个消费者。 ‌
‌适用场景‌:消息多副本分发,如日志同步。

fanout 模式

package mainimport ("log""strconv""sync""time""github.com/streadway/amqp"
)const exchangeName = "go-exchange"func main() {// 1. 连接 RabbitMQconn, err := amqp.Dial("amqp://admin:SecurePass123!@192.168.126.3:5672/my_vhost")if err != nil {log.Fatalf("Failed to connect to RabbitMQ: %v", err)}defer conn.Close()// 2. 创建 channelch, err := conn.Channel()if err != nil {log.Fatalf("Failed to open a channel: %v", err)}defer ch.Close()err = ch.ExchangeDeclare(exchangeName, // 交换机名:默认交换机"fanout",     // 类型:直连 direct,  topic , headers, fanouttrue,         // 持久化:重启后交换机仍存在false,        // 自动删除:当最后一个绑定断开时,不自动删除false,        // 内部:不用于客户端应用false,        // 无等待:不等待服务器确认nil,          // 额外参数)if err != nil {log.Fatalf("Failed to declare an exchange: %v", err)}// 4.var wg sync.WaitGroupwg.Add(2) // 启动 2 个消费者go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()go func() {defer wg.Done()subscribe(conn, exchangeName) // 每个消费者使用自己的 channel}()// 5. 生产者:发送消息go func() {i := 0for {str := "Hello World" + strconv.Itoa(i)err := ch.Publish(exchangeName, // 交换机:默认交换机"",           //  routing key:路由键,fanout 类型不需要false,        // mandatory:如果没有匹配的队列,不返回消息false,        // immediate:不立即投递amqp.Publishing{ContentType: "text/plain",Body:        []byte(str),},)if err != nil {log.Printf("Failed to publish a message: %v", err)return}i++time.Sleep(200 * time.Millisecond)}}()// 6. 阻塞主函数,等待消费者完成(实际上消费者会一直运行)wg.Wait() // 这里不会退出,除非消费者退出
}func subscribe(conn *amqp.Connection, exchangeName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("Failed to open channel: %v", err)return}defer ch.Close()// 声明队列(确保队列存在)queue, err := ch.QueueDeclare("",    //  队列名false, // 持久化:重启后队列仍存在true,  // 自动删除:当最后一个消费者断开时,不自动删除true,  // 排他性:非排他,其他连接也可使用false, // 阻塞:不阻塞nil,   // 额外参数)if err != nil {log.Printf("Failed to declare a queue: %v", err)return}// 绑定队列到交换机err = ch.QueueBind(queue.Name,   // 队列名"",           // 路由键:fanout 类型不需要exchangeName, // 交换机名false,        // 不等待服务器确认nil,          // 额外参数)defer ch.QueueDelete(queue.Name, false, false, false)//调用消费者consume("consumer-haha", conn, queue.Name)}// consume 消费者函数
func consume(consumerName string, conn *amqp.Connection, queueName string) {// 每个消费者使用独立的 channelch, err := conn.Channel()if err != nil {log.Printf("[%s] Failed to open channel: %v", consumerName, err)return}defer ch.Close()// 消费消息msgs, err := ch.Consume(queueName,    // 队列名consumerName, // 消费者标签(唯一标识)true,         // autoAck:自动确认(消息处理完后自动从队列删除)false,        // exclusive:非排他false,        // noLocal:不接收本连接发送的消息(AMQP 未强制支持)false,        // noWait:不等待服务器确认nil,          // args)if err != nil {log.Printf("[%s] Failed to consume: %v", consumerName, err)return}// 持续接收消息for msg := range msgs {// 使用锁或 log 打印,避免并发输出乱序log.Printf("[%s] Received: %s", consumerName, msg.Body)// 模拟处理时间time.Sleep(100 * time.Millisecond)}log.Printf("[%s] Consumer stopped.", consumerName)
}

每次获取两条消息,这是因为有两个subscribe()携程在收消息
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

http://www.xdnf.cn/news/1389043.html

相关文章:

  • 【谷歌浏览器】浏览器实用自用版——谷歌浏览器(Google Chrome)离线纯净版安装 官方版无任何捆绑及广告 【离线安装谷歌浏览器】
  • 通过 KafkaMQ 接入Skywalking 数据最佳实践
  • R ggplot2学习Nature子刊一张图,换数据即可用!
  • leetcode 338 比特位计数
  • 04数据库约束实战:从入门到精通
  • Linux下的网络编程SQLITE3详解
  • 算法题打卡力扣第1004. 最大连续1的个数 III(mid)
  • 技术速递|新手指南:如何在 Foundry Local 中使用自定义模型
  • 百度后端岗位--面试真题分析
  • CCS的诡异报错合集1(以C2000为例)
  • MAC spotlight 搜不到应用程序和 tags 生效
  • ZooKeeper 安装配置
  • C++基础(②VS2022创建项目)
  • 球型摄像机实现360°无死角
  • CLion 中配置运行 Qt 项目指南
  • 三一重工AI预测性维护破局:非计划停机减少60%,技师转型与数字孪生技术搅动制造业
  • 预制菜餐厅:工业化与温度餐平衡术
  • 【Rust】 5. Trait 与运算符重载
  • Python Imaging Library (PIL) 全面指南:PIL高级图像处理-分割与颜色空间转换
  • [Mysql数据库] 知识点总结6
  • 人工智能-python-深度学习-批量标准化与模型保存加载详解
  • 嵌入式-定时器的从模式控制器、PWM参数测量实验-Day24
  • 快手发布SeamlessFlow框架:完全解耦Trainer与Agent,时空复用实现无空泡的工业级RL训练!
  • OpenTenBase实战:从MySQL迁移到分布式HTAP的那些坑与收获
  • MySQL數據庫開發教學(三) 子查詢、基礎SQL注入
  • java开发连接websocket接口
  • system论文阅读--HPCA25
  • 基于SpringBoot和百度人脸识别API开发的保安门禁系统
  • LubanCat-RK3568 UART串口通信,以及遇到bug笔记
  • 实时音视频延迟优化指南:从原理到实践