当前位置: 首页 > news >正文

黑马商城(六)RabbitMQ

一、同步调用

二、异步调用

三、MQ技术选型

快速Docker部署:

docker run \-e RABBITMQ_DEFAULT_USER=itheima \-e RABBITMQ_DEFAULT_PASS=123321 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hmall\-d \rabbitmq:3.8-management

RabbitMQ:

数据隔离: 

案例:

 快速入门(实现基于MQ收发消息):

        <!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

spring:rabbitmq:host: 192.168.50.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

Work Queues:

案例:

构造了两个方法模拟两个消费者

package com.itheima.consumer.mq;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.time.LocalDateTime;@Slf4j
@Component
public class SpringRabbitListener {@RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message){System.out.println("消费者1接收到消息: "+message+ ","+ LocalDateTime.now());}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message){System.err.println("消费者2......接收到消息: "+message+ ","+ LocalDateTime.now());}}
package com.itheima.publisher;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest
class SpringAmqpTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testpublic void testWorkQueue(){//1.队列名String queueName="work.queue";//2.消息for (int i=1;i<=50;i++){String message="hello.spring amqp_"+i;//3.发送消息rabbitTemplate.convertAndSend(queueName,message);}}
}

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String message) throws InterruptedException {System.out.println("消费者1接收到消息: "+message+ ","+ LocalDateTime.now());Thread.sleep(25);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String message) throws InterruptedException {System.err.println("消费者2......接收到消息: "+message+ ","+ LocalDateTime.now());Thread.sleep(200);}

默认性能不影响分配规则

交换机:

Fanout交换机:

 案例:

    @Testpublic void testFanoutQueue(){//1.交换机名String exName="hmall.fanout";//2.消息String message="hello.everyone!";//3.发送消息rabbitTemplate.convertAndSend(exName,null,message);}
    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String message){log.info("消费者1监听到fanout.queue1的消息:{}",message);}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String message){log.info("消费者2监听到fanout.queue2的消息:{}",message);}

Direct交换机:

案例:

    @RabbitListener(queues = "direct.queue1")public void listenDirectQueue1(String message){log.info("消费者1监听到direct.queue1的消息:{}",message);}@RabbitListener(queues = "direct.queue2")public void listenDirectQueue2(String message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}
    @Testpublic void testDirectQueue(){//1.交换机名String exName="hmall.direct";//2.消息String message="hello.yellow!";//3.发送消息rabbitTemplate.convertAndSend(exName,"yellow",message);}

Topic交换机:

案例:
    @Testpublic void testTopicQueue(){//1.交换机名String exName="hmall.topic";//2.消息String message="天气不错";//3.发送消息rabbitTemplate.convertAndSend(exName,"china.weather",message);}

声明队列交换机: 

package com.itheima.consumer.Config;import com.rabbitmq.client.AMQP;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");/* return ExchangeBuilder.fanoutExchange("hamll.fanout").build();*/}@Beanpublic Queue fanoutQueue1(){return QueueBuilder.durable("fanout.queue1").build();/*return new Queue("fanout.queue1");*/}@Beanpublic Binding fanoutQueue1Binding(Queue fanoutQueue1,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2(){return QueueBuilder.durable("fanout.queue2").build();}@Beanpublic Binding fanoutQueue2Binding(Queue fanoutQueue2,FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}}
案例:

法一:
package com.itheima.consumer.Config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct");/* return ExchangeBuilder.fanoutExchange("hamll.direct").build();*/}@Beanpublic Queue directQueue1(){return QueueBuilder.durable("direct.queue1").build();/*return new Queue("direct.queue1");*/}//Direct交换机的Key每次只能绑定一个Key@Beanpublic Binding directQueue1BindingRed(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}@Beanpublic Queue directQueue2(){return QueueBuilder.durable("direct.queue2").build();/*return new Queue("direct.queue1");*/}//Direct交换机的Key每次只能绑定一个Key@Beanpublic Binding directQueue2BindingRed(Queue directQueue2,DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}@Beanpublic Binding directQueue2BindingYellow(Queue directQueue2,DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
法二(基于注解):

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))/*@RabbitListener(queues = "direct.queue1")*/public void listenDirectQueue1(String message){log.info("消费者1监听到direct.queue1的消息:{}",message);}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2",durable = "true"),exchange = @Exchange(name = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))/*@RabbitListener(queues = "direct.queue2")*/public void listenDirectQueue2(String message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}

消息转换器:

案例:

package com.itheima.publisher.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MessageConverterConfiguration {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

    @RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String,Object> message){log.info("消费者2监听到 direct.queue2的消息:{}",message);}

四、业务改造

配置依赖:

        <!--amqp--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

配置yaml文件:

spring:rabbitmq:host: 192.168.50.129 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

在Common中配置一个Json消息转换器:

package com.hmall.common.config;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class MqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}

交易微服务中置消费者 Consumer--Listener(监听):

package com.hmall.trade.listener;import com.hmall.trade.service.IOrderService;
import lombok.RequiredArgsConstructor;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
@RequiredArgsConstructor
public class PayStatusListener {private final IOrderService orderService;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "trade.pay.success.queue",durable = "true"),exchange = @Exchange(name = "pay.direct",type = ExchangeTypes.DIRECT),key = {"pay.success"}))public void listenPaySuccess(Long orderId){orderService.markOrderPaySuccess(orderId);}}

 支付微服务中设置Publisher--基于MQ发送消息:

注入 RabbitTemplate  来实现

    private final RabbitTemplate rabbitTemplate;   @Override@Transactionalpublic void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {// 1.查询支付单PayOrder po = getById(payOrderFormDTO.getId());// 2.判断状态if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){// 订单不是未支付,状态异常throw new BizIllegalException("交易已支付或关闭!");}// 3.尝试扣减余额/*userService.deductMoney(payOrderFormDTO.getPw(), po.getAmount());*/userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());// 4.修改支付单状态boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());if (!success) {throw new BizIllegalException("交易已支付或关闭!");}//5.修改订单状态//异步通知try {rabbitTemplate.convertAndSend("pay.direct","pay.success",po.getBizOrderNo());} catch (Exception e) {log.error("发送支付状态通知失败,订单id:{}",po.getBizOrderNo(),e);//兜底方案}}

http://www.xdnf.cn/news/85663.html

相关文章:

  • 使用达梦官方管理工具SQLark快速生成数据库ER图并导出
  • ProxySQL 在路由层的核心作用
  • 深入理解CSS中的`transform-origin`属性
  • day30 学习笔记
  • 【C到Java的深度跃迁:从指针到对象,从过程到生态】第三模块·面向对象深度进化 —— 第十章 继承:超越C结构体嵌套的维度
  • 实时监测+远程管控:ADW300解锁阳台光伏运维新维度
  • 音视频学习 - MP3格式
  • 如何选择正确的PCB材料
  • 为什么家电主板采用GND走线而不是整面铺GND铜
  • [特殊字符]fsutil命令用法详解
  • Kotlin 的 suspend 关键字
  • 【文献分享】Model-based evaluation提供了数据和代码
  • synchronized锁
  • 为啥低速MCU单板辐射测试会有200M-1Ghz的辐射信号
  • ZYNQ笔记(十二):SD卡读写txt
  • 【Git】Fork和并请求
  • 《MySQL 核心技能:SQL 查询与数据库概述》
  • CentOS笔记本合上盖子不休眠
  • WeakSet:JavaScript 中容易被忽视的“弱集合”
  • 2025年4月22日第一轮
  • 本地部署DeepSeek-R1模型接入PyCharm
  • Java常用正则表达式及使用方法
  • 【屠龙勇士】BIT睿信书院屠龙勇士心得分享
  • Buffer of Thoughts: Thought-Augmented Reasoningwith Large Language Models
  • 第八天 AI开发:NavMesh导航系统 对话系统:使用ScriptableObject存储对话数据 存档系统:JSON序列化保存数据
  • 在Windows上安装Git
  • UDP协议理解
  • Linux 系统中使用 OpenSSL 生成适用于 IIS 的证书
  • L2-2、示范教学与角色扮演:激发模型“模仿力“与“人格“
  • Selenium 在爬取过程中,网络响应被退出的解决方案