当前位置: 首页 > news >正文

快速掌握 GO 之 RabbitMQ

更多个人笔记见:
github个人笔记仓库
gitee 个人笔记仓库
个人学习,学习过程中还会不断补充~ (后续会更新在github和 gitee上)

文章目录

    • 作用
    • 经典例子
        • 生产者(发送端)
        • 消费者(接收端)

作用

类似一个“中间过渡器”,应对突发流量导致数据库连接池耗尽或者请求导致服务崩溃

  • 流量洪峰​​:促销活动时,前置 Nginx 将请求写入 RabbitMQ,后端服务按能力消费
  • 容灾恢复​​:数据库故障期间,消息持久化在队列;恢复后继续消费 (消费指的是 Mysql 取出数据然后存起来)
  • 将任务分发到多个消费者实例,确保高负载下任务均匀分配。这就可以实现负载均衡 (比如多个 worker 处理帖子审核)

需要考虑如果用户的申请不是很多情况下,多引入一层 RabbitMQ 其实会导致实际的速度变慢(毕竟多加了一层)

经典例子

GO 语言相关库:go get -u github.com/streadway/amqp

docker 快速部署 rabbitMQ:docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

  • 5672:AMQP 端口
  • 15672:管理界面端口,访问 http://localhost:15672 ( 默认用户/密码:guest/guest)
生产者(发送端)

创建 producer文件夹下创建producer.go ,然后单独 go run(同时 go run 后面的消费者记得)

package mainimport ("fmt""log""time""github.com/streadway/amqp"
)// 统一错误输出
func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接 RabbitMQconn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close() //关闭连接ch, err := conn.Channel() //建立通道,通过 conn 建立的,可以调用 amqp 中的函数failOnError(err, "Failed to open a channel")defer ch.Close()// 声明队列q, err := ch.QueueDeclare("post_queue", // 指定创建或引用的队列名称false,        // 持久化  false 表示队列不会持久化到磁盘,重启 RabbitMQ 后会丢失。true 的话重启后就还在false,        // 自动删除   设置为 false 表示队列不会自动删除,如果 true,最后一个消费者断开后队列删除false,        // 独占   设置为 true 表示该队列只供一个消费者使用,当连接关闭后,队列会自动删除。false表示队列可以被多个连接使用false,        // 无等待  false 表示需要服务器确认队列创建,true表示客户端不会等待服务器的确认响应,如果操作失败也不会收到错误通知nil,          // 额外参数  额外参数可以用来设置队列的特殊属性,如消息TTL、队列最大长度、死信队列等)failOnError(err, "Failed to declare a queue")// 设置定时器,每5秒发送一次消息ticker := time.NewTicker(1 * time.Second)defer ticker.Stop()// 创建一个函数用于发送消息,这样循环调用函数就是发送多次消息sendMessage := func(msgContent string) {err = ch.Publish("",     // 交换机名称   这里是默认交换机,能够将消息直接路由到与路由键同名的队列q.Name, // 路由键   也就是队列名称,路由键应该与目标队列名称一致,消息才能被正确路由false,  // mandatory标志  false 表示消息无法路由到队列,则消息会被丢弃  如果是 true 就是当消息不能路由到队列时,RabbitMQ会返回一个Basic.Return命令给生产者false,  // immediate 标志   false 表示如果队列中没有消费者,消息会被存入队列等待消费, true表示当没有消费者能够立即消费该消息时,消息不会入队而是被丢弃amqp.Publishing{ //消息内容和性质ContentType: "text/plain",       //制定为 MIME 类型Body:        []byte(msgContent), //转换为字节类型})if err != nil {log.Printf("Failed to publish a message: %s", err)return}log.Printf(" [x] Sent %s", msgContent)}count := 1log.Println("Starting periodic message sending. Press Ctrl+C to exit.")// 等待定时器触发,定期发送消息for range ticker.C {sendMessage(fmt.Sprintf("Hello, RabbitMQ! Message #%d", count))count++}
}
  • 这里我将函数设置为每间隔 1s 就发送消息,同时记录数据
  • 如果运行后,隔一段时间再启动消费者,或者说运行中途关闭消费者,过一段时间再启动消费者,会发现中间发出的信号也会打印出来,这说明实际上是有存储在 RabbitMQ 中的(运行的时候,关闭后存储就需要看上面的设置了)
消费者(接收端)

consumer 文件夹下创建 consumer.go 然后单独一个终端 go run

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {//建立连接conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()//连接 channelch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()q, err := ch.QueueDeclare("post_queue", false, false, false, false, nil)failOnError(err, "Failed to declare a queue")msgs, err := ch.Consume(q.Name, // 队列"",     // 消费者标签true,   // 自动确认false,  // 独占false,  // 无本地false,  // 无等待nil,    // 额外参数)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received: %s", d.Body)}}()log.Printf(" [*] Waiting for messages. To exit press CTRL+C")<-forever
}
http://www.xdnf.cn/news/746425.html

相关文章:

  • 嵌入式编译工具链熟悉与游戏移植
  • Python训练第四十天
  • Jmeter requests
  • LLMs之Tool:Workflow Use的简介、特点、安装和使用方法、以及案例应用
  • c++ typeid运算符
  • 如何打包conda环境从一台电脑到另外一台电脑
  • 电力高空作业安全检测(3)RT-DETR模型
  • MySQL高级查询技巧:分组、聚合、子查询与分页【MySQL系列】
  • 深入理解CSS常规流布局
  • 【系统架构设计师】第一章 计算机硬件 1.1 计算机硬件 - CPU - 校验码
  • Unity 模拟高度尺系统开发详解——实现拖动、范围限制、碰撞吸附与本地坐标轴选择
  • Linux基本指令/下
  • 信息安全之为什么引入公钥密码
  • Linux系统下安装配置 Nginx
  • AUTOSAR图解==>AUTOSAR_EXP_AIADASAndVMC
  • 数组题解——最大子数组和​【LeetCode】
  • 机器学习算法04:SVC 算法(向量机分类)
  • Fastapi 学习使用
  • [GHCTF 2025]SQL???
  • 23种设计模式概览
  • ubuntu20.04.5--arm64版上使用node集成java
  • Ubuntu22.04通过命令行安装qt5
  • FPGA纯verilog实现MIPI-DSI视频编码输出,提供工程源码和技术支持
  • Redis7底层数据结构解析
  • [VMM]虚拟地址到物理地址的三级或四级页表查找过程详解
  • 4000万日订单背后,饿了么再掀即时零售的“效率革命”
  • win1011安装WinGet和Windows Terminal
  • CQF预备知识:一、微积分 -- 1.8 多变量函数:多元微积分详解
  • 离线安装 Python 包及其全部依赖
  • 谷歌Stitch:AI赋能UI设计,免费高效新利器