当前位置: 首页 > ops >正文

RabbitMQ ③-Spring使用RabbitMQ

在这里插入图片描述

Spring使用RabbitMQ

创建 Spring 项目后,引入依赖:

<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-amqp -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件 application.yml

spring:application:name: spring-rabbitmq-demorabbitmq:
#    host: 47.94.9.33
#    port: 5672
#    username: admin
#    password: admin
#    virtual-host: /addresses: amqp://admin:admin@47.94.9.33:5672/

Work-Queue(工作队列模式)

声明队列

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 工作队列模式@Beanpublic Queue workQueue() {return QueueBuilder.durable(Constants.WORK_QUEUE).build();}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/work")public String work() {for (int i = 0; i < 10; i++) {String msg = "hello work queue mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend("", Constants.WORK_QUEUE, msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class WorkListener {private static final Logger log = LoggerFactory.getLogger(WorkListener.class);@RabbitListener(queues = Constants.WORK_QUEUE)public void process1(Message message, Channel channel) {log.info("[process1]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);}@RabbitListener(queues = Constants.WORK_QUEUE)public void process2(String message) {log.info("[process2]:成功接收到消息:[{}]:{}", Constants.WORK_QUEUE, message);}
}

Publish/Subscribe(发布/订阅模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 发布订阅模式@Bean("fanoutExchange")public FanoutExchange fanoutExchange() {return ExchangeBuilder.fanoutExchange(Constants.FANOUT_EXCHANGE).durable(true).build();}@Bean("fanoutQueue1")public Queue fanoutQueue1 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE1).build();}@Bean("fanoutQueue2")public Queue fanoutQueue2 () {return QueueBuilder.durable(Constants.FANOUT_QUEUE2).build();}@Bean("bindingFanout1")public Binding bindingFanout1(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}@Bean("bindingFanout2")public Binding bindingFanout2(@Qualifier("fanoutExchange") FanoutExchange exchange, @Qualifier("fanoutQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange);}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;
package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/fanout")public String fanout() {for (int i = 0; i < 10; i++) {String msg = "hello publish fanout mode~ " + i;// ? 当使用默认交换机时,routingKey 和队列名称保持一致rabbitTemplate.convertAndSend(Constants.FANOUT_EXCHANGE, "", msg);}log.info("消息发送成功");return "消息发送成功";}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutListener {private static final Logger log = LoggerFactory.getLogger(FanoutListener.class);@RabbitListener(queues = Constants.FANOUT_QUEUE1)public void process1(String message) {log.info("[process1]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE1, message);}@RabbitListener(queues = Constants.FANOUT_QUEUE2)public void process2(String message) {log.info("[process2]:成功接收到消息:[{}]:{}", Constants.FANOUT_QUEUE2, message);}
}

Routing(路由模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 路由模式@Bean("directExchange")public DirectExchange directExchange() {return ExchangeBuilder.directExchange(Constants.DIRECT_EXCHANGE).durable(true).build();}@Bean("directQueue1")public Queue directQueue1() {return QueueBuilder.durable(Constants.DIRECT_QUEUE1).build();}@Bean("directQueue2")public Queue directQueue2() {return QueueBuilder.durable(Constants.DIRECT_QUEUE2).build();}@Bean("bindingDirect1")public Binding bindingDirect1(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect2")public Binding bindingDirect2(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("orange");}@Bean("bindingDirect3")public Binding bindingDirect3(@Qualifier("directExchange") DirectExchange exchange, @Qualifier("directQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("black");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/direct/{routingKey}")public String direct(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.DIRECT_EXCHANGE, routingKey, "hello routing mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectListener {private static final Logger log = LoggerFactory.getLogger(DirectListener.class);@RabbitListener(queues = Constants.DIRECT_QUEUE1)public void process1(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE1, message);}@RabbitListener(queues = Constants.DIRECT_QUEUE2)public void process2(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.DIRECT_QUEUE2, message);}
}

Topics(通配符模式)

声明队列和交换机

package com.ljh.mq.springrabbitmqdemo.config;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {// * 通配符模式@Bean("topicExchange")public TopicExchange topicExchange() {return ExchangeBuilder.topicExchange(Constants.TOPIC_EXCHANGE).durable(true).build();}@Bean("topicQueue1")public Queue topicQueue1() {return QueueBuilder.durable(Constants.TOPIC_QUEUE1).build();}@Bean("topicQueue2")public Queue topicQueue2() {return QueueBuilder.durable(Constants.TOPIC_QUEUE2).build();}@Bean("bindingTopic1")public Binding bindingTopic1(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue1") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.orange.*");}@Bean("bindingTopic2")public Binding bindingTopic2(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("*.*.rabbit");}@Bean("bindingTopic3")public Binding bindingTopic3(@Qualifier("topicExchange") TopicExchange exchange, @Qualifier("topicQueue2") Queue queue) {return BindingBuilder.bind(queue).to(exchange).with("lazy.#");}
}

生产者

package com.ljh.mq.springrabbitmqdemo.controller;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RequestMapping("/producer")
@RestController
public class ProducerController {private static final Logger log = LoggerFactory.getLogger(ProducerController.class);@AutowiredRabbitTemplate rabbitTemplate;@RequestMapping("/topic/{routingKey}")public String topic(@PathVariable("routingKey") String routingKey) {rabbitTemplate.convertAndSend(Constants.TOPIC_EXCHANGE, routingKey, "hello topic mode~;routingKey is " + routingKey);log.info("消息发送成功:{}", routingKey);return "消息发送成功:" + routingKey;}
}

消费者

package com.ljh.mq.springrabbitmqdemo.listener;import com.ljh.mq.springrabbitmqdemo.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class TopicListener {private static final Logger log = LoggerFactory.getLogger(TopicListener.class);@RabbitListener(queues = Constants.TOPIC_QUEUE1)public void process1(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE1, message);}@RabbitListener(queues = Constants.TOPIC_QUEUE2)public void process2(String message) {log.info("队列[{}]成功接收到消息:{}", Constants.TOPIC_QUEUE2, message);}
}
http://www.xdnf.cn/news/5560.html

相关文章:

  • 基于 Spring Boot 瑞吉外卖系统开发(十二)
  • labview硬件驱动——测试软件的安装(基于win11系统)
  • 支持向量机算法
  • K8s进阶之一文搞懂PV,PVC及SC
  • 修改网页标签处文字
  • kubuntu系统详解
  • 【RabbitMQ】应用问题、仲裁队列(Raft算法)和HAProxy负载均衡
  • 类和对象(1)--《Hello C++ Wrold!》(3)--(C/C++)
  • 【Linux笔记】——进程信号的保存
  • 51单片机引脚功能概述
  • 十五、多态与虚函数
  • labview硬件采集
  • 数字人教学技术与产品方案的全面解析
  • 42、在.NET 中能够将⾮静态的⽅法覆写成静态⽅法吗?
  • 本地不安装oracle,还想连oracle
  • c++STL-STL简介和vector的使用
  • ngx_http_keyval_module动态键值管理
  • 基于STM32、HAL库的RN8209C电能计量芯片驱动程序设计
  • 系统架构-嵌入式系统架构
  • AI 搜索引擎 MindSearch
  • 香港维尔利健康科技集团亮相中国资本市场发展年会,被评为“最具投资价值医疗科技企业”
  • 面试题解析 | C++空类的默认成员函数(附生成条件与底层原理)
  • 高吞吐与低延迟的博弈:Kafka与RabbitMQ数据管道实战指南
  • 互联网大厂Java求职面试:优惠券服务架构设计与AI增强实践-1
  • 七、基于HAL库,实现串口+DMA+状态机通信实现
  • 国产化Excel处理控件Spire.XLS系列教程:如何通过 C# 删除 Excel 工作表中的筛选器
  • HTML简单语法标签(后续实操:云备份项目)
  • 《Spring Boot 4.0新特性深度解析》
  • 企业即时通讯软件,私有化安全防泄密
  • 图灵爬虫练习平台第十九题js逆向