当前位置: 首页 > backend >正文

消息中间件RabbitMQ03:结合WebAPI实现点对点(P2P)推送和发布-订阅推送的Demo

一、模式的区别

  • 点对点模式(P2P):开启多个消费者程序(绑定到同一个队列名称)连接RabbitMQ,这些消费者 不会消费到同样的消息 ,每个消息只会被其中一个消费者消费
  • 发布/订阅(Publish/Subscribe)模式:开启多个消费者程序(绑定到同一个交换机名称)连接RabbitMQ,这些消费者会消费到同样的消息,类似于消息广播
  • 路由模式(Route):和点对点差不多,点对点依靠队列名称入队,路由模式依靠交换机+路由键+队列名称入队(简单了解即可)
  • 简单比喻,P2P就是你打电话给某一人告诉它八卦,Pub/Sub就是你播新闻八卦(人人皆知)。

【P2P】

【Pub/Sub】

模式交换机类型路由键
点对点默认""默认""
发布订阅Fanout,需要指定默认""
路由模式Direct,需要指定需要指定 routingKey

二、点对点推送

1.文件夹结构

2.连接管理类

我的消息队列是本地的,默认端口5672,账密都是consumer

public static class RabbitMQConnectionManager
{private static readonly Lazy<Task<IConnection>> _connection = new Lazy<Task<IConnection>>(() =>{var factory = new ConnectionFactory(){HostName = "localhost",UserName = "consumer",Password = "consumer",};return factory.CreateConnectionAsync();});// 获取连接public static Task<IConnection> Connection =>  _connection.Value;// 获取通道public static async Task<IChannel> CreateChannelAsync(){var connection = await Connection;return await connection.CreateChannelAsync();}
}

3.生产者

public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string queueName = "hello";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明队列(不存在则创建,确保消费前队列已就绪)await channel.QueueDeclareAsync(queue: queueName,durable: false,exclusive: false,autoDelete: false,arguments: null);// 03 发送内容到消息队列var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: "",routingKey: queueName,body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}

4.消费者

ublic class RabbitMQConsumerService : IRabbitMQConsumerService{private readonly string queueName = "hello";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 03 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 +  // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 +   // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000;     // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds);  // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}}

5.服务的注册

public class Startup
{public Startup(IConfiguration configuration){Configuration = configuration;}public IConfiguration Configuration { get; }public void ConfigureServices(IServiceCollection services){//省略其他代码// 注册 RabbitMQ 服务services.AddSingleton<IRabbitMQConsumerService, RabbitMQConsumerService>();services.AddSingleton<IRabbitMQProducerService, RabbitMQProducerService>();}public void Configure(IApplicationBuilder app, IWebHostEnvironment env){//省略其他代码// 启动消费者var consumerService = app.ApplicationServices.GetRequiredService<IRabbitMQConsumerService>();Task.Run(() => consumerService.StartConsuming());app.UseRouting();app.UseAuthorization();app.UseEndpoints(endpoints =>{endpoints.MapControllers();});}
}

6.接口调用生产者

public class TimeEntity
{public int Hour { get; set; }public int Minute { get; set; }public int Second { get; set; } = 10;
}
/// <summary>
/// 调用生产者发送消息
/// </summary>
[HttpPost]
[Route(nameof(RabbitMQSend))]
public async Task<string> RabbitMQSend(TimeEntity input)
{try{var r = await _rabbitMQProducerService.SendMessage(input);//依赖注入IRabbitMQProducerService调用即可return  r ? "发送成功" : "发送失败";}catch (Exception ex){return "发生错误";}
}

7.调试及结论

调试及结论:
1.开启一个WebAPI连接消息队列名为hello
2.开启一个控制台程序连接消息队列名为hello
3.四条信息入队,分别是ABCD,结果:WebAPI消费了A、C,控制台程序消费了B、D
4.结论:点对点模式(P2P) 场景下,消息会在消费者之间公平分配,每个消费者只处理自己获得的消息

三、发布订阅模式

 1.文件夹结构

同上

2.连接管理类

同上

3.生产者

public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string exchangeName = "myexchange";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(fanout类型,适用于发布-订阅模式)await channel.ExchangeDeclareAsync(exchange:exchangeName,type:ExchangeType.Fanout);// 03 发送内容到交换机(不指定routingKey,fanout交换机会将消息广播到所有绑定的队列)var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: "",body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}

4.消费者

public class RabbitMQConsumerService : IRabbitMQConsumerService
{private readonly string exchangeName = "myexchange";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(必须与生产者相同)await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Fanout);// 03 声明一个独立的临时队列,用于接收消息(断开链接后自动删除)var queueName = await channel.QueueDeclareAsync();// 04 绑定队列到交换机await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: "");// 05 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 06 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 +  // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 +   // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000;     // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds);  // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}
}

5.服务的注册

同上

6.接口调用生产者

同上

7.调试及结论

调试及结论:
1.开启一个WebAPI连接交换机名为exchangeName
2.开启一个控制台程序连接交换机名为exchangeName
3.两条信息入队,分别是AB,结果:WebAPI消费了A、B,控制台程序消费了A、B
4.结论:发布-订阅(Pub/Sub) 场景下,交换机会将所有消息广播到所有绑定的队列

四、路由模式

1.生产者

public class RabbitMQProducerService : IRabbitMQProducerService
{private readonly string exchangeName = "mydirect";private readonly string routingKey = "myroot";public async Task<bool> SendMessage(TimeEntity timeEntity){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(Direct类型,适用于路由器模式)await channel.ExchangeDeclareAsync(exchange:exchangeName,type:ExchangeType.Direct);// 03 发送内容到交换机(不指定routingKey,fanout交换机会将消息广播到所有绑定的队列)var message = JsonSerializer.Serialize(timeEntity);var body = Encoding.UTF8.GetBytes(message);await channel.BasicPublishAsync(exchange: exchangeName,routingKey: routingKey,body: body);Console.WriteLine($"生产者发送了: {message}");return true;}catch (Exception ex){return false;}}
}

2.消费者

public class RabbitMQConsumerService : IRabbitMQConsumerService
{private readonly string exchangeName = "mydirect";private readonly string routingKey = "myroot";private readonly string qName = "myqname";public async Task StartConsuming(){try{// 01 建立队列连接using var channel = await RabbitMQConnectionManager.CreateChannelAsync();// 02 声明交换机(Direct类型,适用于路由模式,必须与生产者相同)await channel.ExchangeDeclareAsync(exchange: exchangeName, type: ExchangeType.Direct);// 03 声明一个独立的队列,用于接收消息var queueName = await channel.QueueDeclareAsync(qName,false,false);// 04 绑定队列到交换机await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: routingKey);// 05 消费的逻辑方法var consumer = new AsyncEventingBasicConsumer(channel);ConsumeMethod(channel, consumer);// 06 启动消费(禁用自动确认,需在消费逻辑ConsumeMethod中手动确认消息)await channel.BasicConsumeAsync(queue: queueName,autoAck: false,//禁止使用消息自动确认consumer: consumer);Console.WriteLine("点击退出");Console.ReadLine();}catch (Exception ex){Console.WriteLine( ex.Message );}}/// <summary>/// 消费方法/// </summary>private static void ConsumeMethod(IChannel channel, AsyncEventingBasicConsumer consumer){consumer.ReceivedAsync += async (model, ea) =>{// 01 获取入参var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);// 反序列化消息var timeEntity = JsonSerializer.Deserialize<TimeEntity>(message);// 02 根据入参执行一定逻辑int delayMilliseconds =timeEntity.Hour * 3600000 +  // 小时 → 毫秒(1小时=3600×1000毫秒)timeEntity.Minute * 60000 +   // 分钟 → 毫秒(1分钟=60×1000毫秒)timeEntity.Second * 1000;     // 秒 → 毫秒var guid = Guid.NewGuid();Console.WriteLine($"【{guid}】开始时间:现在是{DateTime.Now},将在 {timeEntity.Hour}时{timeEntity.Minute}分{timeEntity.Second}秒后打印消息...");await Task.Delay(delayMilliseconds);  // 关键:异步延迟,不阻塞线程Console.WriteLine($"【{guid}】结束时间: {DateTime.Now}");Console.WriteLine($"===============================");// 03 手动确认消息await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);};}
}

http://www.xdnf.cn/news/18657.html

相关文章:

  • 软考中级网络工程师通关指南:从学习到实战
  • 04-Maven工具介绍
  • 从0开始学习Java+AI知识点总结-25.web实战(AOP)
  • KEPServerEX——工业数据采集与通信的标准化平台
  • 服务器(Linux)新账户搭建Pytorch深度学习环境
  • Devops之Jenkins:Jenkins服务器中的slave节点是什么?我们为什么要使用slave节点?如何添加一个windows slave节点?
  • 云计算之中间件与数据库
  • 机器学习:贝叶斯派
  • 2025年金九银十Java面试场景题大全:高频考点+深度解析+实战方案
  • 【C++详解】哈希表概念与实现 开放定址法和链地址法、处理哈希冲突、哈希函数介绍
  • Linux 进阶之性能调优,文件管理,网络安全
  • Java 22 新特性及具体应用
  • c++ 常用接口设计
  • CSS 进阶用法
  • Linux camera 驱动流程介绍(rgb: ov02k10)(chatgpt version)
  • Java 20 新特性及具体应用
  • 关于并查集
  • Text Blocks:告别字符串拼接地狱
  • 量子链(Qtum)分布式治理协议
  • 单词搜索+回溯法
  • Linux内核ELF文件签名验证机制的设计与实现(C/C++代码实现)
  • 源滚滚React消息通知框架v1.0.2使用教程
  • 《支付回调状态异常的溯源与架构级修复》
  • 【RAGFlow代码详解-3】核心服务
  • Linux驱动之DMA(三)
  • ubuntu中网卡的 IP 及网关配置设置为永久生效
  • Maxwell学习笔记
  • 8月精选!Windows 11 25H2 【版本号:26200.5733】
  • 从技术精英到“芯”途末路:一位工程师的沉沦与救赎
  • IC验证 APB 项目(二)——框架结构(总)