当前位置: 首页 > news >正文

RabbitMQ 声明队列和交换机详解

RabbitMQ 声明队列和交换机详解

一、为什么需要声明队列和交换机?

RabbitMQ先声明再使用的机制:

  • 队列负责存储消息
  • 交换机负责路由消息
  • 绑定关系决定消息从交换机到队列的路径

如果没有事先声明:

  • 队列/交换机不存在时,发送消息会失败
  • 队列没有绑定到交换机时,消息会丢失(除非设置了备用交换机)

二、声明交换机(Exchange)

2.1 交换机参数说明

RabbitMQ 提供四种类型(direct、fanout、topic、headers),声明时需要指定以下参数:

参数类型说明
nameString交换机名称(不为空字符串)
typeString类型:directfanouttopicheaders
durableboolean是否持久化(重启 RabbitMQ 后仍存在)
autoDeleteboolean是否自动删除(最后一个队列解绑后删除)
argumentsMap额外参数(如 TTL、死信交换机配置)

2.2 Java 原生声明交换机

channel.exchangeDeclare("my.direct.exchange", // 交换机名称BuiltinExchangeType.DIRECT, // 类型true,  // durablefalse, // autoDeletenull   // arguments
);

2.3 Spring AMQP 声明交换机

@Bean
public DirectExchange directExchange() {return new DirectExchange("my.direct.exchange", true, false);
}

Spring 会自动在应用启动时向 RabbitMQ 发送声明请求。

三、声明队列(Queue)

3.1 队列参数说明

参数类型说明
nameString队列名称(匿名队列可由 RabbitMQ 自动生成)
durableboolean是否持久化(消息是否持久化取决于发送时的 deliveryMode
exclusiveboolean是否排他队列(仅连接可见,断开即删除)
autoDeleteboolean是否自动删除(最后一个消费者断开时删除)
argumentsMap额外参数(TTL、死信队列、最大长度等)

3.2 Java 原生声明队列

channel.queueDeclare("my.queue", // 队列名称true,       // durablefalse,      // exclusivefalse,      // autoDeletenull        // arguments
);

3.3 Spring AMQP 声明队列

@Bean
public Queue myQueue() {return new Queue("my.queue", true, false, false);
}

四、绑定交换机和队列

4.1 Java 原生绑定

channel.queueBind("my.queue",           // 队列名称"my.direct.exchange", // 交换机名称"order.create"        // routingKey
);

4.2 Spring AMQP 绑定

@Bean
public Binding binding() {return BindingBuilder.bind(myQueue()).to(directExchange()).with("order.create");
}

五、实战示例

5.2 fanout示例

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 声明交换机* @return Fanout类型交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1个队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2个队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

5.3 direct示例

package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 声明交换机* @return Direct类型交换机*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1个队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2个队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列和交换机*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}

六、基于注解声明

  • Direct模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
  • Topic模式
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消费者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消费者2接收到topic.queue2的消息:【" + msg + "】");
}
http://www.xdnf.cn/news/1280863.html

相关文章:

  • 基于FPGA的热电偶测温数据采集系统,替代NI的产品(三)测试
  • 基于领域事件驱动的微服务架构设计与实践
  • 面试实战 问题二十三 如何判断索引是否生效,什么样的sql会导致索引失效
  • C++ 限制类对象数量的技巧与实践
  • CS钓鱼鱼饵制作的方式
  • RFID系统:物联网时代的数字化管理中枢
  • 网络性能优化:Go编程视角 - 从理论到实践的性能提升之路
  • PyTorch基础(使用Tensor及Antograd实现机器学习)
  • Unity大型场景性能优化全攻略:PC与安卓端深度实践 - 场景管理、渲染优化、资源调度 C#
  • 请求报文和响应报文(详细讲解)
  • Android16新特性速记
  • 查看 php 可用版本
  • Spring Boot文件上传功能实现详解
  • DNS(域名系统)
  • cesium/resium 修改子模型材质
  • 第5节 大模型分布式推理通信优化与硬件协同
  • typecho博客设置浏览器标签页图标icon
  • 标准io(1)
  • MySQL中GROUP_CONCAT函数的使用详解
  • 机器翻译:一文掌握序列到序列(Seq2Seq)模型(包括手写Seq2Seq模型)
  • ssh 远程连接加密算法报错
  • MyBatis执行器与ORM特性深度解析
  • 十二、Linux Shell脚本:正则表达式
  • 导入CSV文件到MySQL
  • 打破内网枷锁!TRAE SOLO + cpolar 让AI开发告别“孤岛困境”
  • 腾讯 iOA 测评 | 横向移动检测、病毒查杀、外设管控、部署性能
  • 浏览器CEFSharp+X86+win7 之 测试抖音小店订单抓取(八)
  • 运动规划实战案例 | 基于多源流场(Flow Field)的路径规划(附ROS C++/Python实现)
  • Nmap 渗透测试弹药库:精准扫描与隐蔽渗透技术手册
  • Qt串口通信设计指南:通信层架构与实践