当前位置: 首页 > ops >正文

RabbitMQ 工作模式(上)

前言

在 RabbitMQ 中,一共有七种工作模式,我们也可以打开官网了解:
在这里插入图片描述

本章我们先介绍前三种工作模式

(Simple)简单模式

在这里插入图片描述

P:producer 生产者,负责发送消息
C:consumer 消费者,接收消息
Queue: 消息队列

简单模式的特点:一个生产者P,一个消费者C,消息只能被消费一次,也被称为点对点【Point-to-Point】模式

案例:上一篇文章中的快速入门RabbitMQ 就是简单模式的演示,大家可以去看一下代码的实现

Work Queue(工作队列)

在这里插入图片描述
特点:多个消费者共同消费同一条队列的消息,消息不重复
假设队列有 10 条消息,C1 消费了 4 条,C2 消费了 6 条,这就是共同消费。

生产者:

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//进行绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//这里使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//发送消息for (int i = 0; i < 10; i++) {String msg = "hello work queue...."+i;channel.basicPublish("",Constants.WORK_QUEUE, null, msg.getBytes());}System.out.println("发送消息成功");//资源释放channel.close();connection.close();}
}

消费者:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//进行参数绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消息消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//进行参数绑定ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//使用默认的交换机//声明队列channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);//消息消费DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("接收到的消息:" + new String(body));}};channel.basicConsume(Constants.WORK_QUEUE, true, consumer);}
}

交换机概念

这里解释一下为什么上面两种工作模式流程图没有交换机的存在,因为交换机在上面两种工作模式中不起重要作用,为了简化,所以省略了交换机,实际在 RabbitMQ 中 生产者发送的消息是需要通过交换机将消息发送到对应的队列中。

在RabbitMQ 中,一共有四种类型的交换机,分别是 Fanout、Direct、Topic、Headers,不同类型的交换机有着不同的路由策略。
在 AMQP 协议中还有额外的两种类型:System 和 自定义,在RabbitMQ 中我们就不介绍这额外的两种

Fanout:广播模式,将小心发送给所有与该交换机绑定的队列中,对应下面即将讲解的 Publish / Subscribe (发布 / 订阅) 工作模式

Direct:定向,将消息发送给指定的 routing key 的队列中,对应 Routing 模式

Topic:通配符,将消息交给符合指定的 routing pattern (路由模式)的队列

Headers : 该交换机不依赖路由键的批匹配规则来路由消息,而是根据发送的消息内容中的headers 属性进行匹配,headers 类型的交换机性能很差,很少很在工作中遇到。


这里解释一下 routing key 和 binding key 的概念:

在这里插入图片描述

生产者和交换机的联系使用的是 Routing Key,交换机和队列的联系使用的是 Binding Key

在后续的代码中 我们也会将 Binding Key 当作 Routing Key

Publish / Subscribe (发布 / 订阅)

在这里插入图片描述

交换机将消息发送到不同的队列中,类似广播的作用,所有和该交换机绑定的队列都会收到一样的消息

public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();// 开启信道Channel channel = connection.createChannel();//声明交换机channel.exchangeDeclare("publish", BuiltinExchangeType.FANOUT, true, false, null);//声明队列channel.queueDeclare("fanout1", true, false, false, null);channel.queueDeclare("fanout2", true, false, false, null);//绑定队列和交换机channel.queueBind("fanout1","publish","");channel.queueBind("fanout2","publish","");//发送消息for (int i = 0; i < 10; i++) {channel.basicPublish("publish", "", null, ("hello" + i).getBytes());}//关闭资源channel.close();connection.close();}
}

方法参数介绍:

交换机声明:

Exchange.DeclareOk exchangeDeclare(String exchange,
BuiltinExchangeType type,
boolean durable, 
boolean autoDelete,
Map<String, Object> arguments) throws IOException;

exchange:交换机的名称

BuiltinExchangeType type: 交换机的类型,点击 BuiltinExchangeType 可以查看到 这是一个枚举类,一共有四种类型的交换机:DIRECT(“direct”), FANOUT(“fanout”), TOPIC(“topic”), HEADERS(“headers”);

public enum BuiltinExchangeType {DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");private final String type;BuiltinExchangeType(String type) {this.type = type;}public String getType() {return type;}
}

durable:是否进行持久化

autoDelete: 是否自动删除

arguments:高级特性的设置


交换机与队列的绑定:

Queue.BindOk queueBind(String queue, 
String exchange, 
String routingKey) throws IOException;

queue:队列的名称

exchange: 交换机的名称

routingKey : 交换机和队列绑定的联系词BindingKey,这里写的RoutingKey,本质上是一样的。


消费者代码:

public class Consumer1 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("fanout1", true, false, false, null);//消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("fanout1", true, consumer);}
}
public class Consumer2 {public static void main(String[] args) throws IOException, TimeoutException {//设置 MQ 参数ConnectionFactory factory = new ConnectionFactory();factory.setHost(Constants.HOST);factory.setPort(Constants.PORT);factory.setUsername(Constants.NAME);factory.setPassword(Constants.PASSWORD);factory.setVirtualHost(Constants.VIRTUAL_HOST);//建立连接Connection connection = factory.newConnection();//开启信道Channel channel = connection.createChannel();//声明队列channel.queueDeclare("fanout2", true, false, false, null);//进行消费Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println(new String(body));}};channel.basicConsume("fanout2", true, consumer);}
}
http://www.xdnf.cn/news/6957.html

相关文章:

  • MySQL事务的一些奇奇怪怪知识
  • linux本地部署ollama+deepseek过程
  • 大模型为什么学新忘旧(大模型为什么会有灾难性遗忘)?
  • EasyExcel动态表头
  • 【Java ee初阶】jvm(2)
  • 【Qt mainwindow 】窗口在启动时自动调整为适应屏幕大小
  • 正则表达式与文本处理的艺术
  • Selenium-Java版(css表达式)
  • go语法大赏
  • btc交易所关键需求区 XBIT反弹与上涨潜力分析​​
  • 深入理解Java中的Minor GC、Major GC和Full GC
  • 组态王|组态王中如何添加西门子1200设备
  • 2.2.4
  • 【数据结构】1-3 算法的时间复杂度
  • Zookeeper 入门(二)
  • Elasticsearch基础篇-java程序通过RestClient操作es
  • HarmonyOS 影视应用APP开发--配套的后台服务go-imovie项目介绍及使用
  • [创业之路-361]:企业战略管理案例分析-2-战略制定-使命、愿景、价值观的失败案例
  • VueUse/Core:提升Vue开发效率的实用工具库
  • 牛客网NC210769: 字母大小写转换问题解析
  • 灵光一现的问题和常见错误1
  • c++ 仿函数
  • [Android] 奇妙扫描 V1.0.7
  • Linux系统之----重定向
  • 基于OpenCV的SIFT特征和FLANN匹配器的指纹认证
  • 泛微对接金蝶云星空实战案例技术分享
  • C++:C++内存管理
  • DeerFlow试用
  • 一周学会Pandas2 Python数据处理与分析-Pandas2数据添加修改删除操作
  • 使用python进行人员轨迹跟踪