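RabbitMQ (2): Work Modes
RabbitMQ Work Modes
The official RabbitMQ tutorials describe seven work modes.
Simple (simple mode)
P: the producer, which publishes messages to the queue.
C: the consumer, which takes messages from the queue and processes them.
Queue: the message queue, which stores messages.
There is one producer and one consumer, and each message is consumed only once. This is also known as the point-to-point mode.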
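Work Queues (work queue mode)
P: the producer, which publishes messages to the queue.
C1, C2: the consumers, which take messages from the queue and process them.
Queue: the message queue, which stores messages.
When the queue holds several messages, they are distributed among the consumers, so each consumer receives a different subset of the messages and no message is delivered to both.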
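By default RabbitMQ hands out the messages of one queue to its consumers in turn (round-robin). The sketch below is a pure in-memory simulation of that behaviour, not client code: the class RoundRobinSketch and the method dispatch are hypothetical names chosen for illustration, and prefetch limits and acknowledgements are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class RoundRobinSketch {

    // Assigns message i to consumer (i mod number of consumers).
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("msg-" + i);
        }
        // C1 receives the even-numbered messages, C2 the odd-numbered ones.
        System.out.println(dispatch(messages, List.of("C1", "C2")));
    }
}
```

With two consumers and ten messages, each consumer ends up with five messages and none is delivered twice.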
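Publish/Subscribe (publish/subscribe mode)
P: the producer, which publishes messages to the exchange rather than directly to a queue.
C1, C2: the consumers, which take messages from their queues and process them.
Q1, Q2: the message queues, which store messages.
X: the exchange. An exchange routes the messages published by the producer to the appropriate queues according to its rules.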
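RabbitMQ has four exchange types, each with its own routing strategy:
Fanout: broadcast. The message is delivered to every queue bound to the exchange (Publish/Subscribe mode).
Direct: targeted. The message is routed to the queues whose binding key equals the message's routing key (Routing mode).
Topic: wildcard. The message is routed to the queues whose routing pattern matches the message's routing key (Topics mode).
Headers: a headers exchange does not use the routing key at all. It matches on the headers attribute of the published message instead. This exchange type performs poorly and is rarely used.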
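To make the fanout strategy concrete, here is a minimal in-memory sketch. It is only an illustration under simplifying assumptions: FanoutSketch, bind, publish and contents are hypothetical names, queues are plain lists, and no RabbitMQ client is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class FanoutSketch {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Bind a queue to the exchange; a fanout exchange ignores any binding key.
    void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    // Copy the message into every bound queue, whatever the routing key is.
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    List<String> contents(String queue) {
        return queues.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("Q1");
        exchange.bind("Q2");
        exchange.publish("", "hello");
        System.out.println(exchange.contents("Q1")); // prints [hello]
        System.out.println(exchange.contents("Q2")); // prints [hello]
    }
}
```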
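routing key and binding key:
routing key: a string the producer specifies when it sends a message to an exchange. It tells the exchange how the message should be routed.
binding key: a string specified when a queue is bound to an exchange. The exchange compares it with the routing key of each message to decide whether the queue should receive that message.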
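Routing (routing mode)
X: the exchange (type direct). It delivers each message to the queues whose binding key is equal to the message's routing key.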
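The exact-match rule of a direct exchange can be sketched as a lookup from binding key to queues. This is an in-memory illustration only: DirectRoutingSketch, bind and route are hypothetical names, and the queue names Q1 and Q2 with the keys a, b, c are assumptions picked for the demonstration.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class DirectRoutingSketch {
    // binding key -> queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues whose binding key equals the routing key exactly.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        exchange.bind("Q1", "a");
        exchange.bind("Q2", "a");
        exchange.bind("Q2", "b");
        exchange.bind("Q2", "c");
        System.out.println("a -> " + exchange.route("a")); // prints a -> [Q1, Q2]
        System.out.println("b -> " + exchange.route("b")); // prints b -> [Q2]
        System.out.println("d -> " + exchange.route("d")); // prints d -> []
    }
}
```

A routing key that matches no binding, such as d here, reaches no queue at all.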
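Topics (wildcard mode)
X: the exchange (type topic). It routes a message by matching its routing key against the routing pattern of each binding. A routing key is a sequence of words separated by dots.
*: matches exactly one word.
#: matches zero or more words.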
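The two wildcards can be illustrated with a small recursive matcher. This is a sketch under the rules stated above, not the broker's actual implementation. TopicMatchSketch and matches are hypothetical names.

```java
// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class TopicMatchSketch {

    // True if the dot-separated routing key matches the binding pattern.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible continuation.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return matches(p, i + 1, k, j + 1); // '*' or a literal consumes one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("*.a.*", "ef.a.c"));    // prints true
        System.out.println(matches("*.*.b", "ef.a.c"));    // prints false
        System.out.println(matches("c.#", "c.com.ljh"));   // prints true
        System.out.println(matches("c.#", "c"));           // prints true
    }
}
```

The last call shows why "zero or more" matters: the pattern c.# also accepts the single-word key c.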
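RPC (remote procedure call mode)
This mode can be understood as communication between a client and a server: the client sends a request, and the server processes it and returns the result.
When the client sends a request, it sets correlation_id and reply_to on the message and publishes the request to rpc_queue.
The server takes the request from rpc_queue, processes it, and publishes the result to the queue named by reply_to, carrying the same correlation_id.
The client uses correlation_id to pick out the response that belongs to its request.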
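The request/response flow above can be sketched with two in-memory queues and a server thread. This is an illustration only: RpcCorrelationSketch, Message and call are hypothetical names, the queues are plain BlockingQueue instances rather than broker queues, and a stale reply is injected on purpose to show why the correlation id is checked.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class RpcCorrelationSketch {

    // A message body plus the two properties the pattern relies on.
    static final class Message {
        final String correlationId;
        final String replyTo;
        final String body;

        Message(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    static String call(String requestBody) {
        Map<String, BlockingQueue<Message>> queues = new ConcurrentHashMap<>();
        queues.put("rpc_queue", new LinkedBlockingQueue<>());
        queues.put("reply_queue", new LinkedBlockingQueue<>());

        // Server: take one request and answer on the queue named by reply_to,
        // copying the correlation id into the response.
        Thread server = new Thread(() -> {
            try {
                Message request = queues.get("rpc_queue").take();
                queues.get(request.replyTo).put(
                        new Message(request.correlationId, null, "handled: " + request.body));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.start();

        try {
            // A stale reply left over from some other request.
            queues.get("reply_queue").put(new Message("other-id", null, "stale"));

            // Client: send the request with a fresh correlation id.
            String correlationId = UUID.randomUUID().toString();
            queues.get("rpc_queue").put(new Message(correlationId, "reply_queue", requestBody));

            // Client: skip replies whose correlation id does not match.
            while (true) {
                Message reply = queues.get("reply_queue").take();
                if (correlationId.equals(reply.correlationId)) {
                    server.join();
                    return reply.body;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(call("ping")); // prints handled: ping
    }
}
```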
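Publisher Confirms (publisher confirm mode)
In this mode the RabbitMQ server, that is, the broker, sends a confirmation back to the producer, and the producer treats a message as successfully published only after it has received that confirmation.
If the producer does not receive a confirmation for some reason, or receives a negative acknowledgement (nack), it has to decide, based on business requirements, whether to publish the message again.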
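On the producer side, confirms are usually tracked by remembering the sequence number of every published message and removing it once the broker confirms it. A confirmation may carry a "multiple" flag, meaning that all sequence numbers up to and including the given one are confirmed. The sketch below shows only this bookkeeping, in memory. ConfirmTrackerSketch and its methods are hypothetical names, and the calls that would come from the broker are made by hand.

```java
import java.util.concurrent.ConcurrentSkipListSet;

// In-memory illustration only; names are hypothetical and no RabbitMQ client is used.
class ConfirmTrackerSketch {
    // Sequence numbers that were published but not yet confirmed, kept sorted.
    private final ConcurrentSkipListSet<Long> outstanding = new ConcurrentSkipListSet<>();

    void published(long seqNo) {
        outstanding.add(seqNo);
    }

    // multiple == true confirms every sequence number up to and including seqNo.
    void confirmed(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headSet(seqNo, true).clear();
        } else {
            outstanding.remove(seqNo);
        }
    }

    int outstandingCount() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTrackerSketch tracker = new ConfirmTrackerSketch();
        for (long seqNo = 1; seqNo <= 5; seqNo++) {
            tracker.published(seqNo);
        }
        tracker.confirmed(3, true);   // confirms 1, 2 and 3 at once
        tracker.confirmed(5, false);  // confirms only 5
        System.out.println(tracker.outstandingCount()); // prints 1 (only 4 is left)
    }
}
```

Anything still in the set after a timeout is a candidate for republishing.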
Work Mode Examples
Create a plain Maven project and add the dependency:
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.20.0</version>
</dependency>
Define the Constants class:
package mq.Constants;

public class Constants {
    public static final String HOST = "47.94.9.33";
    public static final int PORT = 5672;
    public static final String USER_NAME = "admin";
    public static final String PASSWORD = "admin";
    public static final String VIRTUAL_HOST = "/";

    // Work queue mode
    public static final String WORK_QUEUE = "work.queue";

    // Publish/subscribe mode
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    // Routing mode
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    // Topics (wildcard) mode
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";

    // RPC mode
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

    // Publisher Confirms mode
    public static final String P_CONFIRMS_QUEUE1 = "p.confirms.queue1";
    public static final String P_CONFIRMS_QUEUE2 = "p.confirms.queue2";
    public static final String P_CONFIRMS_QUEUE3 = "p.confirms.queue3";
}
Simple (simple mode)
Producer
package mq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ProducerDemo {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);                // public IP of the broker
        connectionFactory.setPort(Constants.PORT);                // port
        connectionFactory.setUsername(Constants.USER_NAME);       // user name
        connectionFactory.setPassword(Constants.PASSWORD);        // password
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the exchange (skipped: the built-in default exchange is used)
        // 4. Declare the queue
        /*
         * Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
         *                              boolean autoDelete, Map<String, Object> arguments)
         * queue:      queue name
         * durable:    whether the queue survives a broker restart
         * exclusive:  whether the queue is exclusive to this connection
         * autoDelete: whether the queue is deleted automatically when no longer used
         * arguments:  additional arguments
         */
        channel.queueDeclare("hello", true, false, false, null);
        // 5. Send messages
        /*
         * void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:   exchange name ("" selects the default exchange)
         * routingKey: with the default exchange, the routing key is the queue name
         * props:      message properties
         * body:       message body
         */
        String msg = "hello rabbitMQ~";
        for (int i = 0; i < 1000; i++) {
            channel.basicPublish("", "hello", null, msg.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Messages sent: " + msg);
        // 6. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Consumer
package mq.simple;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ConsumerDemo {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);                // public IP of the broker
        connectionFactory.setPort(Constants.PORT);                // port
        connectionFactory.setUsername(Constants.USER_NAME);       // user name
        connectionFactory.setPassword(Constants.PASSWORD);        // password
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); // virtual host
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare("hello", true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            /**
             * Called whenever a message is delivered from the queue.
             * @param consumerTag the consumer tag associated with the consumer
             * @param envelope packaging data for the message (exchange, routing key, delivery tag, ...)
             * @param properties content header data for the message
             * @param body the message body (opaque, client-specific byte array)
             * @throws IOException IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume("hello", true, consumer);
        // Give the consumer a moment to receive messages before shutting down
        Thread.sleep(1000);
        // 5. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Work Queues (work queue mode)
Producer
package mq.workQueues;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the exchange (skipped: the built-in default exchange is used)
        // 4. Declare the queue
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 5. Send messages
        for (int i = 0; i < 10; i++) {
            String msg = "hello work queue mode~:" + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Messages sent");
        // 6. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Consumer 1
package mq.workQueues;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Consumer 2
package mq.workQueues;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Publish/Subscribe (publish/subscribe mode)
Producer
package mq.fanout;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the exchange
        /*
         * Full overload, for reference (the call below uses the shorter
         * exchangeDeclare(exchange, type, durable) overload):
         * Exchange.DeclareOk exchangeDeclare(
         *     String exchange,               the name of the exchange
         *     BuiltinExchangeType type,      the exchange type
         *     boolean durable,               true if the exchange should survive a server restart
         *     boolean autoDelete,            true if the server should delete the exchange when it is no longer in use
         *     boolean internal,              true if the exchange is internal (clients cannot publish to it directly)
         *     Map<String, Object> arguments) other properties (construction arguments) for the exchange
         */
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange
        /*
         * Queue.BindOk queueBind(String queue, String exchange, String routingKey)
         * queue:      the name of the queue
         * exchange:   the name of the exchange
         * routingKey: the routing key to use for the binding (ignored by a fanout exchange)
         */
        channel.queueBind(Constants.FANOUT_QUEUE1, Constants.FANOUT_EXCHANGE, "");
        channel.queueBind(Constants.FANOUT_QUEUE2, Constants.FANOUT_EXCHANGE, "");
        // 6. Send messages
        for (int i = 0; i < 10; i++) {
            String msg = "hello publish/subscribe mode~:" + i;
            channel.basicPublish(Constants.FANOUT_EXCHANGE, "", null, msg.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Messages sent");
        // 7. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Consumer 1
package mq.fanout;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.FANOUT_QUEUE1, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Consumer 2
package mq.fanout;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.FANOUT_QUEUE2, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE2, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Routing (routing mode)
Producer
package mq.direct;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the exchange
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");
        // 6. Send messages. "d" matches no binding, so messages published with it reach no queue.
        List<String> list = List.of("a", "b", "c", "d");
        Random random = new Random();
        for (int i = 0; i < 10; i++) {
            // Pick from the whole list (the original bound of 3 could never select "d")
            String routingKey = list.get(random.nextInt(list.size()));
            String msg = "hello routing mode~:" + routingKey;
            System.out.println(msg);
            channel.basicPublish(Constants.DIRECT_EXCHANGE, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("Messages sent");
        // 7. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Consumer 1
package mq.direct;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Consumer 2
package mq.direct;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE2, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Topics (wildcard mode)
Producer
package mq.topic;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the exchange
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);
        // 4. Declare the queues
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        // 5. Bind the queues to the exchange
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");
        // 6. Send messages
        String msg1 = "hello topic mode~:ef.a.c";
        // Routed to Q1 (matches *.a.*)
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "ef.a.c", null, msg1.getBytes(StandardCharsets.UTF_8));
        String msg2 = "hello topic mode~:rr.a.b";
        // Routed to Q1 (matches *.a.*) and Q2 (matches *.*.b)
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "rr.a.b", null, msg2.getBytes(StandardCharsets.UTF_8));
        String msg3 = "hello topic mode~:c.com.ljh";
        // Routed to Q2 (matches c.#)
        channel.basicPublish(Constants.TOPIC_EXCHANGE, "c.com.ljh", null, msg3.getBytes(StandardCharsets.UTF_8));
        System.out.println("Messages sent");
        // 7. Release resources
        channel.close(); // close the channel first
        connection.close();
    }
}
Consumer 1
package mq.topic;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
Consumer 2
package mq.topic;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 3. Declare the queue. This may be omitted if the producer has already declared it;
        //    if neither side has declared it, consuming fails because the queue does not exist.
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);
        // 4. Receive messages
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("Received message: " + new String(body, StandardCharsets.UTF_8));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE2, true, consumer);
        // 5. Release resources (left commented out so the consumer keeps listening)
        // channel.close(); // close the channel first
        // connection.close();
    }
}
RPC (remote procedure call mode)
Server
package mq.rpc;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class RPCServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 2.1 Declare the queues
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body, StandardCharsets.UTF_8);
                System.out.println("[RPC Server received request]: " + request);
                String response = "Response to request: " + request + " 🫡";
                // 4. Send the response to the queue named by reply_to, echoing the correlation id
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                channel.basicPublish("", properties.getReplyTo(), basicProperties,
                        response.getBytes(StandardCharsets.UTF_8));
                // 5. Acknowledge the request
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 3. Receive requests (manual acknowledgement)
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, defaultConsumer);
    }
}
Client
package mq.rpc;

import com.rabbitmq.client.*;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        // 2. Open a channel
        Channel channel = connection.createChannel();
        // 2.1 Declare the queues
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        // 3. Send the request
        String correlationId = UUID.randomUUID().toString();
        AMQP.BasicProperties requestProperties = new AMQP.BasicProperties.Builder()
                .correlationId(correlationId) // unique id used to recognise the matching response
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        String msg = "hello RPC mode~:" + correlationId;
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, requestProperties,
                msg.getBytes(StandardCharsets.UTF_8));
        // 4. Receive the response
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                String resp = new String(body, StandardCharsets.UTF_8);
                System.out.println("Received callback message: " + resp);
                // Null-safe comparison: a response may arrive without a correlation id
                if (correlationId.equals(properties.getCorrelationId())) {
                    queue.offer(resp);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        // Blocks here until a response with the matching correlation id arrives
        String res = queue.take();
        System.out.println("[RPC Client received the response matching its ID]: " + res);
        // 5. Release resources so the client can exit
        channel.close(); // close the channel first
        connection.close();
    }
}
Publisher Confirms (publisher confirm mode)
Publisher confirms
package mq.publisher.confirms;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import mq.Constants.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class PublisherConfirms {
    private static final int MAX_MESSAGE = 10000;

    static Connection createConnection() throws Exception {
        // 1. Establish the connection
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        return connectionFactory.newConnection();
    }

    public static void main(String[] args) throws Exception {
        // Strategy #1: Publishing Messages Individually
        // publishingMessagesIndividually();
        // Strategy #2: Publishing Messages in Batches
        publishingMessagesInBatches();
        // Strategy #3: Handling Publisher Confirms Asynchronously
        handlingPublisherConfirmsAsynchronously();
    }

    private static void handlingPublisherConfirmsAsynchronously() throws Exception {
        try (Connection connection = createConnection()) {
            // 2. Open a channel
            Channel channel = connection.createChannel();
            // 3. Enable publisher confirms on the channel
            channel.confirmSelect();
            // 4. Declare the queue
            channel.queueDeclare(Constants.P_CONFIRMS_QUEUE3, true, false, false, null);
            // Sequence numbers of messages that have not been confirmed yet
            SortedSet<Long> sortedSet = Collections.synchronizedSortedSet(new TreeSet<>());
            // 5. Listen for ack or nack from the broker
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        // Everything up to and including deliveryTag is confirmed
                        sortedSet.headSet(deliveryTag + 1).clear();
                    } else {
                        sortedSet.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple) {
                        sortedSet.headSet(deliveryTag + 1).clear();
                    } else {
                        sortedSet.remove(deliveryTag);
                    }
                    // 5.1 Resend the message here if the business logic requires it
                }
            });
            long start = System.currentTimeMillis();
            // 6. Send messages
            for (int i = 0; i < MAX_MESSAGE; i++) {
                String msg = "hello Publisher Confirms~:" + i;
                long ackSeq = channel.getNextPublishSeqNo();
                sortedSet.add(ackSeq);
                channel.basicPublish("", Constants.P_CONFIRMS_QUEUE3, null, msg.getBytes(StandardCharsets.UTF_8));
            }
            // Wait until every message has been confirmed; sleep briefly instead of busy-spinning
            while (!sortedSet.isEmpty()) {
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Asynchronous confirms, messages: %d, total time: %d ms%n", MAX_MESSAGE, end - start);
        }
    }

    private static void publishingMessagesInBatches() throws Exception {
        try (Connection connection = createConnection()) {
            // 2. Open a channel
            Channel channel = connection.createChannel();
            // 3. Enable publisher confirms on the channel
            channel.confirmSelect();
            // 4. Declare the queue
            channel.queueDeclare(Constants.P_CONFIRMS_QUEUE2, true, false, false, null);
            long start = System.currentTimeMillis();
            // 5. Send messages, waiting for confirms once per batch
            int batchSize = 100;
            int outstandingMessageCnt = 0;
            for (int i = 0; i < MAX_MESSAGE; i++) {
                String msg = "hello Publisher Confirms~:" + i;
                channel.basicPublish("", Constants.P_CONFIRMS_QUEUE2, null, msg.getBytes(StandardCharsets.UTF_8));
                outstandingMessageCnt++;
                if (outstandingMessageCnt >= batchSize) {
                    channel.waitForConfirms(5_000);
                    outstandingMessageCnt = 0;
                }
            }
            // Confirm the last, possibly incomplete, batch
            if (outstandingMessageCnt > 0) {
                channel.waitForConfirms(5_000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Batch confirms, messages: %d, total time: %d ms%n", MAX_MESSAGE, end - start);
        }
    }

    private static void publishingMessagesIndividually() throws Exception {
        try (Connection connection = createConnection()) {
            // 2. Open a channel
            Channel channel = connection.createChannel();
            // 3. Enable publisher confirms on the channel
            channel.confirmSelect();
            // 4. Declare the queue
            channel.queueDeclare(Constants.P_CONFIRMS_QUEUE1, true, false, false, null);
            long start = System.currentTimeMillis();
            // 5. Publish messages
            for (int i = 0; i < MAX_MESSAGE; i++) {
                String msg = "hello Publisher Confirms~:" + i;
                channel.basicPublish("", Constants.P_CONFIRMS_QUEUE1, null, msg.getBytes(StandardCharsets.UTF_8));
                // 5.1 Wait up to 5 s for the confirmation from the broker
                channel.waitForConfirms(5_000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("Individual confirms, messages: %d, total time: %d ms%n", MAX_MESSAGE, end - start);
        }
    }
}