RabbitMQ 核心概念与消息模型深度解析(二)
四、代码实战
了解了 RabbitMQ 的核心概念和消息模型后,接下来我们通过代码实战来进一步加深对它们的理解和掌握。下面将以 Java 和 Spring AMQP 为例,展示如何使用 RabbitMQ 进行消息的发送和接收。
4.1 环境准备
在开始编写代码之前,需要确保已经安装和配置好了 RabbitMQ 服务器,并且在项目中引入了相关的依赖。如果使用 Maven 项目,可以在 pom.xml 文件中添加以下依赖:
<!-- Spring Boot starter that brings in Spring AMQP for RabbitMQ access. -->
<!-- NOTE(review): no <version> is declared here; presumably it is managed by the Spring Boot parent POM/BOM - confirm. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2 简单队列模型代码示例
4.2.1 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Simple-queue producer: hands one text message to {@link RabbitTemplate} and logs it.
 */
@Service
public class Producer {

    /** Spring AMQP template, field-injected by the container. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends a single message and then echoes what was sent to standard output.
     *
     * @param queueName target queue name; passed as the first argument of
     *                  {@code convertAndSend(String, Object)}
     * @param message   text payload to send
     */
    public void send(String queueName, String message) {
        // Building the log line has no side effects, so preparing it first keeps
        // the observable order (send, then print) unchanged.
        final String logLine = "发送消息到队列:" + queueName + ",消息内容:" + message;
        rabbitTemplate.convertAndSend(queueName, message);
        System.out.println(logLine);
    }
}
4.2.2 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Simple-queue consumer: listens on {@code simple.queue} and prints every message it receives.
 */
@Component
public class Consumer {

    /**
     * Listener callback registered for {@code simple.queue}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "simple.queue")
    public void receive(String message) {
        final String line = "接收到队列 simple.queue 的消息:" + message;
        System.out.println(line);
    }
}
4.3 工作队列模型代码示例
4.3.1 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Work-queue producer: publishes a batch of numbered messages to one queue so that
 * several consumers listening on that queue can share the work.
 */
@Service
public class WorkProducer {

    /** Batch size used by {@link #send(String, String)}. */
    private static final int DEFAULT_MESSAGE_COUNT = 10;

    /** Spring AMQP template, field-injected by the container. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Sends {@value #DEFAULT_MESSAGE_COUNT} messages, each being {@code message}
     * followed by a space and its index (0-based).
     *
     * @param queueName target queue name
     * @param message   common message prefix
     */
    public void send(String queueName, String message) {
        send(queueName, message, DEFAULT_MESSAGE_COUNT);
    }

    /**
     * Sends {@code count} messages, each being {@code message} followed by a space
     * and its index (0-based), logging every message to standard output.
     *
     * @param queueName target queue name
     * @param message   common message prefix
     * @param count     number of messages to send; {@code 0} sends nothing
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public void send(String queueName, String message, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count must not be negative: " + count);
        }
        for (int i = 0; i < count; i++) {
            String msg = message + " " + i;
            rabbitTemplate.convertAndSend(queueName, msg);
            System.out.println("发送消息到队列:" + queueName + ",消息内容:" + msg);
        }
    }
}
4.3.2 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Work-queue consumers: two listener methods registered on the same queue,
 * {@code work.queue}. Each one prints the message it was given, tagged with its own number.
 */
@Component
public class WorkConsumer {

    /**
     * First listener on {@code work.queue}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "work.queue")
    public void receive1(String message) {
        final String line = "消费者1接收到队列 work.queue 的消息:" + message;
        System.out.println(line);
    }

    /**
     * Second listener on {@code work.queue}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "work.queue")
    public void receive2(String message) {
        final String line = "消费者2接收到队列 work.queue 的消息:" + message;
        System.out.println(line);
    }
}
4.4 发布 / 订阅模型代码示例
4.4.1 配置交换机和队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Publish/subscribe topology: one fanout exchange ({@code fanout.exchange}) with two
 * queues ({@code fanout.queue1}, {@code fanout.queue2}) bound to it.
 *
 * <p>The binding beans are given explicit, unique bean names. Without them the bean
 * name defaults to the method name ({@code binding1}/{@code binding2}), which clashes
 * with identically named {@code @Bean} methods in other configuration classes loaded
 * into the same application context.
 */
@Configuration
public class FanoutConfig {

    /** @return the queue named {@code fanout.queue1} */
    @Bean
    public Queue fanoutQueue1() {
        return new Queue("fanout.queue1");
    }

    /** @return the queue named {@code fanout.queue2} */
    @Bean
    public Queue fanoutQueue2() {
        return new Queue("fanout.queue2");
    }

    /** @return the fanout exchange named {@code fanout.exchange} */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout.exchange");
    }

    /** @return binding of {@code fanout.queue1} to {@code fanout.exchange} */
    @Bean("fanoutBinding1")
    public Binding binding1() {
        return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
    }

    /** @return binding of {@code fanout.queue2} to {@code fanout.exchange} */
    @Bean("fanoutBinding2")
    public Binding binding2() {
        return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }
}
4.4.2 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Publish/subscribe producer: publishes a text message to an exchange with an empty routing key.
 */
@Service
public class FanoutProducer {

    /** Spring AMQP template, field-injected by the container. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} to {@code exchangeName} using {@code ""} as the routing key,
     * then echoes what was sent to standard output.
     *
     * @param exchangeName name of the exchange to publish to
     * @param message      text payload to send
     */
    public void send(String exchangeName, String message) {
        // Building the log line has no side effects, so preparing it first keeps
        // the observable order (send, then print) unchanged.
        final String logLine = "发送消息到交换机:" + exchangeName + ",消息内容:" + message;
        rabbitTemplate.convertAndSend(exchangeName, "", message);
        System.out.println(logLine);
    }
}
4.4.3 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Publish/subscribe consumers: one listener per queue ({@code fanout.queue1} and
 * {@code fanout.queue2}); each prints the message it receives.
 */
@Component
public class FanoutConsumer {

    /**
     * Listener for {@code fanout.queue1}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "fanout.queue1")
    public void receive1(String message) {
        final String line = "消费者1接收到队列 fanout.queue1 的消息:" + message;
        System.out.println(line);
    }

    /**
     * Listener for {@code fanout.queue2}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "fanout.queue2")
    public void receive2(String message) {
        final String line = "消费者2接收到队列 fanout.queue2 的消息:" + message;
        System.out.println(line);
    }
}
4.5 路由模型代码示例
4.5.1 配置交换机和队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Routing topology: one direct exchange ({@code direct.exchange}) with two queues,
 * each bound under its own routing key.
 *
 * <p>The binding beans are given explicit, unique bean names. Without them the bean
 * name defaults to the method name ({@code binding1}/{@code binding2}), which clashes
 * with identically named {@code @Bean} methods in other configuration classes loaded
 * into the same application context.
 */
@Configuration
public class DirectConfig {

    /** @return the queue named {@code direct.queue1} */
    @Bean
    public Queue directQueue1() {
        return new Queue("direct.queue1");
    }

    /** @return the queue named {@code direct.queue2} */
    @Bean
    public Queue directQueue2() {
        return new Queue("direct.queue2");
    }

    /** @return the direct exchange named {@code direct.exchange} */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("direct.exchange");
    }

    /** @return binding of {@code direct.queue1} to {@code direct.exchange} with key {@code routing.key1} */
    @Bean("directBinding1")
    public Binding binding1() {
        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("routing.key1");
    }

    /** @return binding of {@code direct.queue2} to {@code direct.exchange} with key {@code routing.key2} */
    @Bean("directBinding2")
    public Binding binding2() {
        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("routing.key2");
    }
}
4.5.2 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Routing-model producer: publishes a text message to an exchange with a caller-supplied routing key.
 */
@Service
public class DirectProducer {

    /** Spring AMQP template, field-injected by the container. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} and then echoes exchange, routing key and payload to standard output.
     *
     * @param exchangeName name of the exchange to publish to
     * @param routingKey   routing key attached to the message
     * @param message      text payload to send
     */
    public void send(String exchangeName, String routingKey, String message) {
        // Building the log line has no side effects, so preparing it first keeps
        // the observable order (send, then print) unchanged.
        final String logLine =
                "发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message;
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println(logLine);
    }
}
4.5.3 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Routing-model consumers: one listener per queue ({@code direct.queue1} and
 * {@code direct.queue2}); each prints the message it receives.
 */
@Component
public class DirectConsumer {

    /**
     * Listener for {@code direct.queue1}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "direct.queue1")
    public void receive1(String message) {
        final String line = "消费者1接收到队列 direct.queue1 的消息:" + message;
        System.out.println(line);
    }

    /**
     * Listener for {@code direct.queue2}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "direct.queue2")
    public void receive2(String message) {
        final String line = "消费者2接收到队列 direct.queue2 的消息:" + message;
        System.out.println(line);
    }
}
4.6 主题模型代码示例
4.6.1 配置交换机和队列
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Topic topology: one topic exchange ({@code topic.exchange}) with two queues bound
 * under the patterns {@code topic.#} and {@code topic.*.test}.
 *
 * <p>The binding beans are given explicit, unique bean names. Without them the bean
 * name defaults to the method name ({@code binding1}/{@code binding2}), which clashes
 * with identically named {@code @Bean} methods in other configuration classes loaded
 * into the same application context.
 */
@Configuration
public class TopicConfig {

    /** @return the queue named {@code topic.queue1} */
    @Bean
    public Queue topicQueue1() {
        return new Queue("topic.queue1");
    }

    /** @return the queue named {@code topic.queue2} */
    @Bean
    public Queue topicQueue2() {
        return new Queue("topic.queue2");
    }

    /** @return the topic exchange named {@code topic.exchange} */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("topic.exchange");
    }

    /** @return binding of {@code topic.queue1} to {@code topic.exchange} with pattern {@code topic.#} */
    @Bean("topicBinding1")
    public Binding binding1() {
        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");
    }

    /** @return binding of {@code topic.queue2} to {@code topic.exchange} with pattern {@code topic.*.test} */
    @Bean("topicBinding2")
    public Binding binding2() {
        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.*.test");
    }
}
4.6.2 生产者代码
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
 * Topic-model producer: publishes a text message to an exchange with a caller-supplied routing key.
 */
@Service
public class TopicProducer {

    /** Spring AMQP template, field-injected by the container. */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Publishes {@code message} and then echoes exchange, routing key and payload to standard output.
     *
     * @param exchangeName name of the exchange to publish to
     * @param routingKey   routing key attached to the message
     * @param message      text payload to send
     */
    public void send(String exchangeName, String routingKey, String message) {
        // Building the log line has no side effects, so preparing it first keeps
        // the observable order (send, then print) unchanged.
        final String logLine =
                "发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message;
        rabbitTemplate.convertAndSend(exchangeName, routingKey, message);
        System.out.println(logLine);
    }
}
4.6.3 消费者代码
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Topic-model consumers: one listener per queue ({@code topic.queue1} and
 * {@code topic.queue2}); each prints the message it receives.
 */
@Component
public class TopicConsumer {

    /**
     * Listener for {@code topic.queue1}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "topic.queue1")
    public void receive1(String message) {
        final String line = "消费者1接收到队列 topic.queue1 的消息:" + message;
        System.out.println(line);
    }

    /**
     * Listener for {@code topic.queue2}.
     *
     * @param message message body as text
     */
    @RabbitListener(queues = "topic.queue2")
    public void receive2(String message) {
        final String line = "消费者2接收到队列 topic.queue2 的消息:" + message;
        System.out.println(line);
    }
}
4.7 RPC 模型代码示例
4.7.1 客户端代码(RPC 请求方)
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Blocking RPC client built on the RabbitMQ Java client.
 *
 * <p>Requests are published to {@code rpc_queue} through the default exchange, carrying a
 * fresh correlation id and the name of a server-named reply queue. A consumer on that
 * reply queue hands matching replies to {@link #call(String)} through an in-memory queue.
 *
 * <p>An instance supports one in-flight {@link #call(String)} at a time: only a single
 * pending correlation id is tracked, so it must not be shared by concurrent callers.
 */
public class RPCClient implements AutoCloseable {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    private final Connection connection;
    private final Channel channel;
    private final String replyQueueName;
    private final BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

    // Written by the thread inside call(), read by the consumer callback thread.
    private volatile String corrId;

    /**
     * Connects to the broker on {@code localhost}, declares the reply queue and starts
     * consuming from it with automatic acknowledgement.
     *
     * @throws IOException if the connection or channel setup fails; a connection timeout
     *                     is reported as an {@code IOException} whose cause is the
     *                     {@code TimeoutException}
     */
    public RPCClient() throws IOException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try {
            connection = factory.newConnection();
        } catch (java.util.concurrent.TimeoutException e) {
            // newConnection() also declares the checked TimeoutException; wrap it so the
            // constructor keeps its IOException-only signature.
            throw new IOException("Timed out while connecting to RabbitMQ", e);
        }
        try {
            channel = connection.createChannel();
            replyQueueName = channel.queueDeclare().getQueue();
            channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                        AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String expected = corrId;
                    // Null-safe on both sides: no call may be pending yet, and a reply may
                    // arrive without a correlation id.
                    if (expected != null && expected.equals(properties.getCorrelationId())) {
                        responseQueue.offer(new String(body, "UTF-8"));
                    }
                }
            });
        } catch (IOException | RuntimeException e) {
            // Do not leak the open connection when the rest of the setup fails.
            try {
                connection.close();
            } catch (IOException | RuntimeException closeError) {
                e.addSuppressed(closeError);
            }
            throw e;
        }
    }

    /**
     * Sends {@code message} as an RPC request and blocks until the matching reply arrives.
     *
     * @param message request payload, encoded as UTF-8
     * @return the reply body decoded as UTF-8
     * @throws IOException          if publishing the request fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public String call(String message) throws IOException, InterruptedException {
        corrId = UUID.randomUUID().toString();
        // Discard a reply left over from an earlier call that was interrupted before it
        // could take it; otherwise that stale reply would be returned here.
        responseQueue.clear();
        AMQP.BasicProperties props = new AMQP.BasicProperties
                .Builder()
                .correlationId(corrId)
                .replyTo(replyQueueName)
                .build();
        channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));
        return responseQueue.take();
    }

    /**
     * Closes the underlying connection.
     *
     * @throws IOException if closing the connection fails
     */
    @Override
    public void close() throws IOException {
        connection.close();
    }
}
4.7.2 服务端代码(RPC 响应方)
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
/**
 * RPC server built on the RabbitMQ Java client: consumes integer requests from
 * {@code rpc_queue}, computes the Fibonacci number and publishes the result to the
 * request's reply-to queue with the same correlation id.
 */
public class RPCServer {
    private static final String RPC_QUEUE_NAME = "rpc_queue";

    /**
     * Connects to the broker on {@code localhost}, declares {@code rpc_queue} and starts
     * a manually-acknowledging consumer with a prefetch count of 1.
     *
     * @param argv unused
     * @throws Exception if connecting to the broker or setting up the channel fails
     */
    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        final Channel channel = connection.createChannel();
        channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
        channel.basicQos(1);
        System.out.println(" [x] Awaiting RPC requests");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag,
                    Envelope envelope,
                    AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                        .Builder()
                        .correlationId(properties.getCorrelationId())
                        .build();
                // Sent back unchanged when the request cannot be processed.
                String response = "";
                try {
                    String message = new String(body, "UTF-8");
                    int n = Integer.parseInt(message);
                    System.out.println(" [.] fib(" + message + ")");
                    response = "" + fib(n);
                } catch (RuntimeException e) {
                    // A malformed or negative request must not escape the callback:
                    // the message would stay unacknowledged and the caller would wait forever.
                    System.out.println(" [.] " + e);
                } finally {
                    String replyTo = properties.getReplyTo();
                    if (replyTo != null) {
                        channel.basicPublish("", replyTo, replyProps, response.getBytes("UTF-8"));
                    }
                    // Always acknowledge so the prefetch window of 1 is released.
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            }
        };
        channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
    }

    /**
     * Computes the n-th Fibonacci number ({@code fib(0) = 0}, {@code fib(1) = 1}) iteratively.
     * Uses {@code int} arithmetic, so large results wrap around exactly as repeated
     * {@code int} addition does.
     *
     * @param n index in the sequence; must not be negative
     * @return the n-th Fibonacci number
     * @throws IllegalArgumentException if {@code n} is negative
     */
    private static int fib(int n) {
        if (n < 0) {
            throw new IllegalArgumentException("n must not be negative: " + n);
        }
        int current = 0;
        int next = 1;
        for (int i = 0; i < n; i++) {
            int sum = current + next;
            current = next;
            next = sum;
        }
        return current;
    }
}
通过以上代码示例,我们展示了如何使用 Java 和 Spring AMQP 实现 RabbitMQ 的各种消息模型(其中 RPC 模型的示例使用的是 RabbitMQ 原生 Java 客户端)。在实际应用中,可以根据具体的业务需求选择合适的消息模型,并对代码进行相应的优化和扩展。
五、总结
RabbitMQ 作为一款强大的消息队列中间件,其核心概念和消息模型是理解和使用它的关键。通过深入了解生产者、消费者、交换机、队列、绑定、路由键、连接、信道以及虚拟主机等核心概念,我们明白了 RabbitMQ 是如何在分布式系统中实现高效、可靠的消息传递的。不同类型的交换机和绑定规则,使得消息的路由更加灵活多样,能够满足各种复杂的业务需求。
而 RabbitMQ 提供的多种消息模型,如简单队列模型、工作队列模型、发布 / 订阅模型、路由模型、主题模型和 RPC 模型,为我们解决不同场景下的消息通信问题提供了丰富的选择。每种模型都有其独特的特点和适用场景,在实际应用中,我们需要根据具体的业务需求来选择合适的消息模型,以实现系统的最优性能和扩展性。
通过本文的介绍和代码实战,希望大家对 RabbitMQ 的核心概念和消息模型有了更深入的理解和掌握。在实际项目中,不断地实践和探索,充分发挥 RabbitMQ 的优势,为分布式系统的开发和优化提供有力的支持。