当前位置: 首页 > news >正文

微服务--消息队列mq

1. mq简介

        消息队列是分布式系统中的异步通信中间件,采用"生产者-消费者"模型实现服务间解耦通信

核心作用

  • 服务解耦
  • 异步处理
  • 流量削峰
  • 数据同步
  • 最终一致性

消息队列模式

  • 发布/订阅模式:一对多广播
  • 工作队列模式:竞争消费
  • 死信队列:处理失败消息
  • 延迟队列:定时任务处理
  • 消息回溯:Kafka按offset重新消费

2. mq入门

        使用SpringAMQP实现HelloWorld中的基础消息队列功能,一个生产者,一个队列,一个消费者

2.1 启动mq

        打开mq下载目录,输入命令(rabbitmq-server start)启动

网址localhost:15672访问,账号密码均为guest

2.2 导入依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.gaohe</groupId><artifactId>clouddemo</artifactId><packaging>pom</packaging><version>0.0.1-SNAPSHOT</version><modules><module>publisher</module><module>consumer</module></modules><name>clouddemo</name><description>clouddemo</description><properties><java.version>17</java.version><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-boot.version>3.3.3</spring-boot.version><maven.compiler.source>17</maven.compiler.source><maven.compiler.target>17</maven.compiler.target></properties><dependencies><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-dependencies</artifactId><version>${spring-boot.version}</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring-boot.version}</version><configuration><mainClass>com.gaohe.clouddemo.ClouddemoApplication</mainClass><skip>true</skip></configuration><executions><execution><id>repackage</id><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build></project>

2.3 在yml配置文件中配置连接信息

spring:rabbitmq:host: localhost # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: guest # 用户名password: guest # 密码

2.4 在publisher中利用RabbitTemplate发送信息到simple.queue队列

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;//    发送消息
@Test
public void test1(){
//        1.发送的队列String queueName1 ="hello.queue";
//        2.发送的消息String msg = "你好我哟一个帽衫";
//        3.发送rabbitTemplate.convertAndSend(queueName1,msg);
}}

2.5 在consumer服务中编写消费逻辑,绑定simple.queue这个队列

package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class HelloLisenner {@RabbitListener(queues = "hello.queue")public void helloQueueLisenner(String msg){System.out.println("helloQueueLisenner"+msg);}@RabbitListener(queues = "hello.queue")public void helloQueueLisenner2(String msg){System.out.println("helloQueueLisenner2"+msg);}}

3.交换机

        Exchange是消息队列系统中的消息路由中枢,负责接收生产者发送的消息并根据特定规则将消息路由到一个或多个队列中。

常见exchange类型包括:

  • Fanout:广播
  • Direct:路由
  • Topic:话题

3.1 路由交换机(FanoutExchange)

  • 在consumer服务创建一个类,添加注解,声明交换机,队列以及绑定关系对象
package com.gaohe.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class FanoutConfig {//    交换机@Beanpublic FanoutExchange fanout1(){return new FanoutExchange("itgaohe.fanout");}//    定义队列@Beanpublic Queue queue1(){return new Queue("fanout.queue1");}//    队列绑定交换机@Beanpublic Binding binding1(FanoutExchange fanout1){return BindingBuilder.bind(queue1()).to(fanout1);}//    定义队列@Beanpublic Queue queue2(){return new Queue("fanout.queue2");}//    队列绑定交换机@Beanpublic Binding binding2(FanoutExchange fanout1){return BindingBuilder.bind(queue2()).to(fanout1);}
}

 

  • 在consumer服务中的监听类中添加方法进行监听
package com.gaohe.consumer.lisenner;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class FanoutLisenner {@RabbitListener(queues = "fanout.queue1")public void fanoutQueueLisenner(String msg){System.out.println("fanoutQueueLisenner:"+msg);}@RabbitListener(queues = "fanout.queue2")public void fanoutQueueLisenner2(String msg){System.out.println("fanoutQueueLisenner2:"+msg);}
}

 

  • 在publisher服务创建测试类进行测试

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.fanout";
//        2.发送的消息String msg = "你好";
//        3.发送rabbitTemplate.convertAndSend(exName,"",msg);}}

3.2 路由交换机(DirectExchange)

        交换机,队列不仅可以单独配置,也可以在监听类使用注解进行配置

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;@Component
public class DirectLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"blue","red"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "itgaohe.direct",type = ExchangeTypes.DIRECT),key = {"yellow","red"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}

         publisher测试类进行测试

package com.gaohe.publisher;import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test3(){
//        1.发送的队列String exName ="itgaohe.direct";
//        2.发送的消息String msg = "I LOVE YOU ";
//        3.发送rabbitTemplate.convertAndSend(exName,"yellow",msg);}}

3.3 广播交换机(TopicExchange)

  • 监听类
package com.gaohe.consumer.lisenner;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import java.util.Map;@Component
public class TopicLisenner {@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"china.#","#.weather"}))public void directQueueLisenner(String msg){System.out.println("directQueueLisenner"+msg);}@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "itgaohe.topic",type = ExchangeTypes.TOPIC),key = {"us.#","#.weather"}))public void directQueueLisenner2(String msg){System.out.println("directQueueLisenner2"+msg);}}
  • 测试类
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;@Slf4j
@SpringBootTest(classes = PublisherApplication.class)
public class PublisherTest {@Autowiredpublic RabbitTemplate rabbitTemplate;@Testpublic void test4(){
//        1.发送的String exName ="itgaohe.topic";
//        2.发送的消息String msg = "hello world6666";
//        3.发送rabbitTemplate.convertAndSend(exName,"aa.weather",msg);}
}

        用的最多的是路由交换机和广播交换机

4. mq消息转换器

        消息转换器是消息中间件中的数据格式转换层,负责在消息生产/消费过程中实现:

  • Java对象 ↔ 消息体序列化/反序列化

  • 消息属性(headers/properties)的自动处理

  • 不同数据格式间的相互转换

配置消息转换器

  • 父工程导入依赖
<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId>
</dependency>
  • 给提供者和消费者配置消息转换器Bean对象
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;@Component
public class mqConfig {@Beanpublic MessageConverter messageConverter(){return new Jackson2JsonMessageConverter();}
}
  • 定义消费者,监听队列并消费消息

  • 测试

 

http://www.xdnf.cn/news/1014319.html

相关文章:

  • 准确--CentOS 7.9在线安装docker
  • 微服务--nacos+feign
  • 开发指南121-微服务的弹性伸缩
  • 20.excel制作图表,图表跟随数据行数的变化而自动更新
  • 【prometheus+Grafana篇】基于Prometheus+Grafana实现postgreSQL数据库的监控与可视化
  • 产品推荐|一款具有单光子级探测能力的科学相机千眼狼Gloria 1605
  • RabbitMQ的使用--项目创建、五种工作模式、高级特性
  • VR 虚拟云展:科技浪潮下的新趋势​
  • 《第四章-筋骨淬炼》 C++修炼生涯笔记(基础篇)数组与函数
  • 砂石骨料数字孪生工厂应用案例:远眺科技三维可视化落地成效
  • 【解决方案】Kali 2022.3修复仓库密钥无交互一键安装docker,docker compose
  • 卷积神经网络(一)基础入门
  • VIC-3D应用指南系列之:DIC数字图像相关技术与热成像(VIC-3D IR System助力热载荷测试)
  • ue5的blender4.1groom毛发插件v012安装和使用方法(排除了冲突错误)
  • 小型化边缘计算设备 特点
  • ubuntu 系统 多条命令通过 bash 脚本执行
  • 深入解析 MySQL 架构:从基础到高级
  • 20250613在Ubuntu20.04.6下编译Rockchip的RK3576原厂Android14的SDK【整理编译】
  • 【Java学习笔记】集合介绍
  • C语言文件操作与预处理详解
  • 面向GPU、CPU及机器学习加速器的机器学习编译器
  • Blender基础知识-操作模式、基本操作、渲染、灯光、材质、粒子系统、动画
  • springboot项目中整合高德地图
  • leetcode题解538:把二叉搜索树转换为累加树
  • 微型导轨在实验室场景中的多元应用
  • 个人支出智能分析系统
  • 【HarmonyOS Next之旅】DevEco Studio使用指南(三十三) -> 构建任务
  • finereport普通报表根据用户权限限制数据查询
  • 动态规划算法的欢乐密码(二):路径问题
  • 【软件开发】什么是DSL