当前位置: 首页 > backend >正文

RabbitMQ 核心概念与消息模型深度解析(二)

四、代码实战

了解了 RabbitMQ 的核心概念和消息模型后,接下来我们通过代码实战来进一步加深对它们的理解和掌握。下面将以 Java 和 Spring AMQP 为例,展示如何使用 RabbitMQ 进行消息的发送和接收。

4.1 环境准备

在开始编写代码之前,需要确保已经安装和配置好了 RabbitMQ 服务器 ,并且在项目中引入了相关的依赖。如果使用 Maven 项目,可以在pom.xml文件中添加以下依赖:

 

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring-boot-starter-amqp</artifactId>

</dependency>

4.2 简单队列模型代码示例

4.2.1 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class Producer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String queueName, String message) {

rabbitTemplate.convertAndSend(queueName, message);

System.out.println("发送消息到队列:" + queueName + ",消息内容:" + message);

}

}

4.2.2 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class Consumer {

@RabbitListener(queues = "simple.queue")

public void receive(String message) {

System.out.println("接收到队列 simple.queue 的消息:" + message);

}

}

4.3 工作队列模型代码示例

4.3.1 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class WorkProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String queueName, String message) {

for (int i = 0; i < 10; i++) {

String msg = message + " " + i;

rabbitTemplate.convertAndSend(queueName, msg);

System.out.println("发送消息到队列:" + queueName + ",消息内容:" + msg);

}

}

}

4.3.2 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class WorkConsumer {

@RabbitListener(queues = "work.queue")

public void receive1(String message) {

System.out.println("消费者1接收到队列 work.queue 的消息:" + message);

}

@RabbitListener(queues = "work.queue")

public void receive2(String message) {

System.out.println("消费者2接收到队列 work.queue 的消息:" + message);

}

}

4.4 发布 / 订阅模型代码示例

4.4.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.FanoutExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class FanoutConfig {

@Bean

public Queue fanoutQueue1() {

return new Queue("fanout.queue1");

}

@Bean

public Queue fanoutQueue2() {

return new Queue("fanout.queue2");

}

@Bean

public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanout.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());

}

}

4.4.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class FanoutProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String message) {

rabbitTemplate.convertAndSend(exchangeName, "", message);

System.out.println("发送消息到交换机:" + exchangeName + ",消息内容:" + message);

}

}

4.4.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class FanoutConsumer {

@RabbitListener(queues = "fanout.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 fanout.queue1 的消息:" + message);

}

@RabbitListener(queues = "fanout.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 fanout.queue2 的消息:" + message);

}

}

4.5 路由模型代码示例

4.5.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.DirectExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class DirectConfig {

@Bean

public Queue directQueue1() {

return new Queue("direct.queue1");

}

@Bean

public Queue directQueue2() {

return new Queue("direct.queue2");

}

@Bean

public DirectExchange directExchange() {

return new DirectExchange("direct.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(directQueue1()).to(directExchange()).with("routing.key1");

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(directQueue2()).to(directExchange()).with("routing.key2");

}

}

4.5.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class DirectProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

System.out.println("发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message);

}

}

4.5.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class DirectConsumer {

@RabbitListener(queues = "direct.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 direct.queue1 的消息:" + message);

}

@RabbitListener(queues = "direct.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 direct.queue2 的消息:" + message);

}

}

4.6 主题模型代码示例

4.6.1 配置交换机和队列
 

import org.springframework.amqp.core.Binding;

import org.springframework.amqp.core.BindingBuilder;

import org.springframework.amqp.core.TopicExchange;

import org.springframework.amqp.core.Queue;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

@Configuration

public class TopicConfig {

@Bean

public Queue topicQueue1() {

return new Queue("topic.queue1");

}

@Bean

public Queue topicQueue2() {

return new Queue("topic.queue2");

}

@Bean

public TopicExchange topicExchange() {

return new TopicExchange("topic.exchange");

}

@Bean

public Binding binding1() {

return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");

}

@Bean

public Binding binding2() {

return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("topic.*.test");

}

}

4.6.2 生产者代码
 

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

@Service

public class TopicProducer {

@Autowired

private RabbitTemplate rabbitTemplate;

public void send(String exchangeName, String routingKey, String message) {

rabbitTemplate.convertAndSend(exchangeName, routingKey, message);

System.out.println("发送消息到交换机:" + exchangeName + ",路由键:" + routingKey + ",消息内容:" + message);

}

}

4.6.3 消费者代码
 

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class TopicConsumer {

@RabbitListener(queues = "topic.queue1")

public void receive1(String message) {

System.out.println("消费者1接收到队列 topic.queue1 的消息:" + message);

}

@RabbitListener(queues = "topic.queue2")

public void receive2(String message) {

System.out.println("消费者2接收到队列 topic.queue2 的消息:" + message);

}

}

4.7 RPC 模型代码示例

4.7.1 生产者代码
 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

import java.util.UUID;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.LinkedBlockingQueue;

public class RPCClient {

private static final String RPC_QUEUE_NAME = "rpc_queue";

private Connection connection;

private Channel channel;

private String replyQueueName;

private BlockingQueue<String> responseQueue = new LinkedBlockingQueue<>();

private String corrId;

public RPCClient() throws IOException {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

connection = factory.newConnection();

channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

if (properties.getCorrelationId().equals(corrId)) {

responseQueue.offer(new String(body, "UTF-8"));

}

}

});

}

public String call(String message) throws IOException, InterruptedException {

corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties

.Builder()

.correlationId(corrId)

.replyTo(replyQueueName)

.build();

channel.basicPublish("", RPC_QUEUE_NAME, props, message.getBytes("UTF-8"));

return responseQueue.take();

}

public void close() throws IOException {

connection.close();

}

}

4.7.2 消费者代码
 

import com.rabbitmq.client.AMQP;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.DefaultConsumer;

import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "rpc_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body) throws IOException {

AMQP.BasicProperties replyProps = new AMQP.BasicProperties

.Builder()

.correlationId(properties.getCorrelationId())

.build();

String message = new String(body, "UTF-8");

int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");

String response = "" + fib(n);

channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));

channel.basicAck(envelope.getDeliveryTag(), false);

}

};

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

}

private static int fib(int n) {

if (n == 0) return 0;

if (n == 1) return 1;

return fib(n - 1) + fib(n - 2);

}

}

通过以上代码示例,我们展示了如何使用 Java 和 Spring AMQP 实现 RabbitMQ 的各种消息模型 。在实际应用中,可以根据具体的业务需求选择合适的消息模型,并对代码进行相应的优化和扩展。

五、总结

RabbitMQ 作为一款强大的消息队列中间件,其核心概念和消息模型是理解和使用它的关键 。通过深入了解生产者、消费者、交换机、队列、绑定、路由键、连接、信道以及虚拟主机等核心概念,我们明白了 RabbitMQ 是如何在分布式系统中实现高效、可靠的消息传递的 。不同类型的交换机和绑定规则,使得消息的路由更加灵活多样,能够满足各种复杂的业务需求 。

而 RabbitMQ 提供的多种消息模型,如简单队列模型、工作队列模型、发布 / 订阅模型、路由模型、主题模型和 RPC 模型,为我们解决不同场景下的消息通信问题提供了丰富的选择 。每种模型都有其独特的特点和适用场景,在实际应用中,我们需要根据具体的业务需求来选择合适的消息模型,以实现系统的最优性能和扩展性 。

通过本文的介绍和代码实战,希望大家对 RabbitMQ 的核心概念和消息模型有了更深入的理解和掌握 。在实际项目中,不断地实践和探索,充分发挥 RabbitMQ 的优势,为分布式系统的开发和优化提供有力的支持 。

http://www.xdnf.cn/news/5810.html

相关文章:

  • 开源模型应用落地-qwen模型小试-Qwen3-8B-融合VLLM、MCP与Agent(七)
  • 六、Hive 分桶
  • OpenHarmony平台驱动开发(十五),SDIO
  • tomcat与nginx之间实现多级代理
  • DeepSeek、B(不是百度)AT、科大讯飞靠什么坐上中国Ai牌桌?
  • css iconfont图标样式修改,js 点击后更改样式
  • 哈希表:数据世界的超级索引
  • 基于深度学习的工业OCR数字识别系统架构解析
  • 机器学习 --- 特征工程(一)
  • Spring Boot 使用 OSHI 实现系统运行状态监控接口
  • Conda在powershell终端中无法使用conda activate命令
  • docker及docker-compose安装及使用
  • mac 10.15.7 svn安装
  • 设计模式系列(02):设计原则(一):SRP、OCP、LSP
  • Visual Studio 2022 跨网络远程调试
  • 多线程(二)
  • 【2025年前端高频场景题系列】使用同一个链接,如何实现PC打开是web应用、手机打是-个H5 应用?
  • 免费Office图片音频高效提取利器
  • ik 分词器 设置自定义词典
  • @Component 注解:Spring 组件扫描与管理的基石
  • 如何使用 WebBrowserPassView 查看所有浏览器密码?
  • 【WordPress博客AI内容辅助生成/优化工具箱插件下载无标题】
  • 语义分割模型部署到嵌入式终端的通用操作流程
  • journalctl 日志查看工具介绍
  • istringstream的简化源码详解
  • 热部署与双亲委派
  • pclinuxos系统详解
  • 应急响应靶机——WhereIS?
  • CRM和SCRM有什么区别
  • python实现usb热插拔检测(windows)