RabbitMQ的使用--Spring AMQP(更新中)
1.首先是创建项目
在一个父工程 mq_demo 的基础上建立两个子模块,生产者模块publisher,消费者模块 consumer
创建项目:
建立成功:
删除多余文件
创建子模块1:publisher(生产者模块)
右键-----new ----module
选中Java,填写publisher,选中maven,确认父模块
创建成功
同理:创建子模块2:consumer(消费者模式)
至此:项目创建完毕
2.进行基本配置(pom.xml、application.yml)
引入依赖:父模块引入依赖,子模块共享父模块依赖
pom.xml
<dependencies><!--AMQP依赖,包含 rabbitmq --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.13.2</version><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>
application.yml
logging:pattern:dateformat: yyyy-MM-dd HH:mm:ss.SSSlevel:mq.listener: debug
spring:rabbitmq:host: 127.0.0.1port: 5672username: guestpassword: guestvirtual-host: /
结构图:
建立具体的包结构,以及要用的一些类
消费者启动类:ConsumerApplication.class
@SpringBootApplication
public class ConsumerApplication {public static void main(String[] args) {SpringApplication.run(ConsumerApplication.class, args);}}
生产者启动类:PublisherApplication.class
@SpringBootApplication
public class PublisherApplication {public static void main(String[] args) {org.springframework.boot.SpringApplication.run(PublisherApplication.class, args);}
}
消费者监听类:SpringRabbitListerner.class
@Component
public class SpringRabbitListerner {@RabbitListener(queues = "queue.simple")public void listenSimpleQueueMessage(String msg){System.out.println("简单模式-消费者消费消息:"+msg);}
}
生产者启动类:SpringAmqpTest.class
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendSimpleMessage() {String simpleQueue = "queue.simple";String message = "hello world";rabbitTemplate.convertAndSend(simpleQueue,message);}
}
做到这里,就已经可以进行mq的消息进行发送和获取了。
3.Spring AMQP的五种工作模式
生产者启动类:SpringAmqpTest.class
@ExtendWith(SpringExtension.class)
@SpringBootTest
public class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendSimpleMessage() {String simpleQueue = "queue.simple";String message = "hello world";rabbitTemplate.convertAndSend(simpleQueue,message);}@Testpublic void testSendWorkQueue(){String workQueue = "queue.work";for(int i=1 ;i<=10;i++){String message = "hello world.."+i;rabbitTemplate.convertAndSend(workQueue,message);}}@Testpublic void testSendFanout(){String fanoutExchange = "amq.fanout";String message = "hello world fanout..";rabbitTemplate.convertAndSend(fanoutExchange,"",message);}@Testpublic void testSendDirect(){String directExchange = "amq.direct";String message = "hello world direct..";rabbitTemplate.convertAndSend(directExchange,"red",message);}}
消费者监听类:SpringRabbitListerner.class
@Component
public class SpringRabbitListerner {@RabbitListener(queues = "queue.simple")public void listenSimpleQueueMessage(String msg){System.out.println("简单模式-消费者消费消息:"+msg);}@RabbitListener(queues = "queue.work")public void listenWorkQueueMessage1(String msg){System.out.println("工作模式-消费者消费消息:"+msg);}@RabbitListener(queues = "queue.work")public void listenWorkQueueMessage2(String msg){System.out.println("工作模式-消费者消费消息2:"+msg);}@RabbitListener(queues = "queue.fanout1")public void listenFanoutQueueMessage1(String msg){System.out.println("发布订阅模式-消费者1消费消息:"+msg);}@RabbitListener(queues = "queue.fanout2")public void listenFanoutQueueMessage2(String msg){System.out.println("发布订阅模式-消费者2消费消息:"+msg);}@RabbitListener(queues = "queue.direct1")public void listenDirectQueueMessage(String msg){System.out.println("路由模式-消费者消费消息:"+msg);}@RabbitListener(queues = "queue.direct2")public void listenTopicQueueMessage1(String msg){System.out.println("路由模式-消费者1消费消息:"+msg);}}
所用到的配置类:
主要是建立交换机、建立队列、绑定交换机和队列关系的
订阅者模式
@Configuration
public class FunoutConfig {@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("amq.fanout");}@Beanpublic Queue queue1() {return new Queue("queue.fanout1");}@Beanpublic Queue queue2() {return new Queue("queue.fanout2");}@Beanpublic Binding binding1(Queue queue1, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue1).to(fanoutExchange);}@Beanpublic Binding binding2(Queue queue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(queue2).to(fanoutExchange);}}
路由模式
@Configuration
public class DirectConfig {@Beanpublic DirectExchange directExchange(){return new DirectExchange("amq.direct");}@Beanpublic Queue directQueue1(){return new Queue("queue.direct1");}@Beanpublic Queue directQueue2(){return new Queue("queue.direct2");}@Beanpublic Binding directBinding1(Queue queue1, DirectExchange directExchange){return BindingBuilder.bind(queue1).to(directExchange).with("yellow");}@Beanpublic Binding directBinding2(Queue queue2, DirectExchange directExchange){return BindingBuilder.bind(queue2).to(directExchange).with("red");}}