【RabbitMQ】路由模式和通配符模式的具体实现
文章目录
- 路由模式
- 创建队列和交换机
- 生产者代码
- 创建交换机
- 声明队列
- 绑定交换机和队列
- 发送消息
- 完整代码
- 消费者代码
- 运行程序
- 启动生产者
- 启动消费者
- 通配符模式
- 创建队列和交换机
- 生产者代码
- 创建交换机
- 声明队列
- 绑定交换机和队列
- 发送消息
- 完整代码
- 消费者代码
- 运行程序
- 启动生产者
- 启动消费者
路由模式
队列和交换机的绑定,不能是任意的绑定了,而是要指定一个 BindingKey
(RoutingKey
的一种) 消息的发送方在向 Exchange
发送消息时,也需要指定消息的 RoutingKey
Exchange
也不再把消息交给每一个绑定的 key
,而是根据消息的 RoutingKey
进行判断,只有队列绑定时的 BindingKey
和发送消息的 RoutingKey
完全一致,才会接收到消息
- 课程中所谓的
BindingKey
,是RoutingKey
的一种 - 早期也叫做
routingKey
,只是在最新的文档中被改成BindingKey
了
我们通常:
- 把消息发送称为:
Routingkey
- 把队列绑定称为:
BindingKey
创建队列和交换机
在 Constants
中添加:
// 路由模式
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_QUEUE2 = "direct.queue2";
生产者代码
和发布订阅模式的区别是:交换机类型不同,绑定队列的 BindingKey
不同
创建交换机
创建交换机,定义交换机类型为 BuiltinExchangeType.DIRECT
channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
BuiltinExchangeType
一共有四种DIRECT("direct")
FANOUT("fanout")
TOPIC("topic")
HEADERS("headers")
声明队列
channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
绑定交换机和队列
// 队列1绑定 a
channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
// 队列2 绑定 a, b, c
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
发送消息
String msg_a = "hello direct, my routingKey is a...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes()); String msg_b = "hello direct, my routingKey is b...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes()); String msg_c = "hello direct, my routingKey is c...";
channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes());
- 发送消息时,指定
RoutingKey
完整代码
package rabbitmq.routing; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException;
import java.util.function.BinaryOperator; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { // 1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); // 2. 开启信道 Channel channel = connection.createChannel(); // 3. 声明交换机 channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true); // 4. 声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null); // 5. 绑定交换机和队列 channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a"); channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a"); channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b"); channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c"); // 6. 发送消息 String msg_a = "hello direct, my routingKey is a..."; channel.basicPublish(Constants.DIRECT_EXCHANGE, "a", null, msg_a.getBytes()); String msg_b = "hello direct, my routingKey is b..."; channel.basicPublish(Constants.DIRECT_EXCHANGE, "b", null, msg_b.getBytes()); String msg_c = "hello direct, my routingKey is c..."; channel.basicPublish(Constants.DIRECT_EXCHANGE, "c", null, msg_c.getBytes()); System.out.println("消息发送成功!"); // 7. 释放资源 channel.close(); connection.close(); }
}
消费者代码
Routing
模式的消费者代码和 Publish/Subscribe
代码一样,同样复制出来两份
Consumer1
Consumer2
package rabbitmq.routing; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer1 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 建立信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null); //4. 消费信息 DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body)); } }; channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer); }
}
运行程序
启动生产者
- 消息路由
- 可以看到
direct.queue1
队列中,路由了一条消息 - 可以看到
direct.queue2
队列中,路由了两条消息
- 队列和交换机的绑定
启动消费者
Consumer1
:
接收到消息:hello direct, my routingKey is a...
Consumer2
:
接收到消息:hello direct, my routingKey is a...
接收到消息:hello direct, my routingKey is b...
接收到消息:hello direct, my routingKey is c...
通配符模式
Topics
和 Routing
模式的区别是:
Topics
模式使用的交换机类型是topic
(Routing
模式用的交换机类型为direct
)topic
类型的交换机在匹配规则上进行了扩展,Binding Key
支持通配符匹配(direct
类型的交换机路由规则是BindingKey
和RoutingKey
完全匹配)
在 Topic
类型的交换机在匹配规则上,有一些要求:
RoutingKey
是由一系列由点(.
) 分隔的单词,比如“stock.sd.nyse
”, “nyse.vmw
”, “quick.orange.rabbit
”BindingKey
和RoutingKey
一样,也是点(.
) 分割的字符串BindingKey
中可以存在两种特殊的字符串,用于模糊匹配*
表示一个单词#
表示多个单词(0-N
个)
比如:
Binding Key
为“d.a.b
”会同时路由到Q1
和Q2
Binding Key
为“d.a.f
”会路由到Q1
Binding Key
为“c.e.f
”会路由到Q2
Binding Key
为“d.b.f
”会被丢弃,或者返回给生产者(需要设置mandatory
)
创建队列和交换机
public static final String TOPIC_EXCHANGE = "topic.exchange";
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
生产者代码
和路由模式,发布订阅模式的区别是:交换机类型不同,绑定队列的 RoutingKey
不同
创建交换机
定义交换机类型为 BuiltinExchangeType.TOPIC
channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
声明队列
channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
绑定交换机和队列
channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
发送消息
String msg_a = "hello topic, my routingkey is ae.a.f...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes()); // 转发到 Q1 String msg_b = "hello topic, my routingkey is ef.a.b...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes()); // 转发到 Q1 和 Q2 String msg_c = "hello topic, my routingkey is c.ef.d...";
channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes()); // 转发到 Q2
完整代码
package rabbitmq.topic; import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3.声明交换机 channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true); //4. 声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null); channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null); //5. 绑定交换机和队列 channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b"); channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#"); //6. 发送消息 String msg_a = "hello topic, my routingkey is ae.a.f..."; channel.basicPublish(Constants.TOPIC_EXCHANGE, "ae.a.f", null, msg_a.getBytes()); // 转发到 Q1 String msg_b = "hello topic, my routingkey is ef.a.b..."; channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.b", null, msg_b.getBytes()); // 转发到 Q1 和 Q2 String msg_c = "hello topic, my routingkey is c.ef.d..."; channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.ef.d", null, msg_c.getBytes()); // 转发到 Q2 System.out.println("消息发送成功"); //7. 释放资源 channel.close(); connection.close(); }
}
消费者代码
消费者代码和 Routing
模式的一样,只要修改消费队列的名称即可
- 消费者 1
- 消费者 2
package rabbitmq.topic; import com.rabbitmq.client.*;
import rabbitmq.constant.Constants; import java.io.IOException;
import java.util.concurrent.TimeoutException; public class Consumer2 { public static void main(String[] args) throws IOException, TimeoutException { //1. 建立连接 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(Constants.HOST); connectionFactory.setPort(Constants.PORT); connectionFactory.setUsername(Constants.USER_NAME); connectionFactory.setPassword(Constants.PASSWORD); connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); Connection connection = connectionFactory.newConnection(); //2. 开启信道 Channel channel = connection.createChannel(); //3. 声明队列 channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null); //4. 消费消息 DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("接收到消息:" + new String(body)); } }; channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer); }
}
运行程序
启动生产者
- 可以看到队列的消息数
启动消费者
Consumer1
接收到消息:hello topic, my routingkey is ae.a.f...
接收到消息:hello topic, my routingkey is ef.a.b...
Consumer2
接收到消息:hello topic, my routingkey is ef.a.b...
接收到消息:hello topic, my routingkey is c.ef.d...