当前位置: 首页 > ai >正文

封装 RabbitMQ 消息代理交互的功能

封装了与 RabbitMQ 消息代理交互的功能,包括发送和接收消息,以及管理连接和通道。

主要组件

  1. 依赖项

    • 代码使用了多个命名空间,包括 Microsoft.Extensions.Configuration(用于配置管理)、RabbitMQ.Client(用于与 RabbitMQ 的交互)、Newtonsoft.Json(用于 JSON 序列化和反序列化)。
    • 还使用了一些自定义命名空间,可能是更大应用程序框架的一部分(例如 MSEBP.Kernel.Common)。
  2. 配置模型

    • RabbitConfigModel 用于保存连接 RabbitMQ 的配置设置,包括 IPPortUserNamePasswordVirtualHost 等属性。
  3. 连接管理

    • 类中维护了一个静态字段 _connection 用于 RabbitMQ 连接,并提供了 GetConnection 方法来建立或获取该连接。
    • 连接使用 IP、端口、用户名、密码和虚拟主机等参数进行建立。
  4. 接收消息

    • Receive<T> 方法允许从 RabbitMQ 队列异步消费消息。
    • 使用 EventingBasicConsumer 来处理传入的消息,并通过提供的委托(receiveMethod)进行处理。
    • 消息处理包括错误处理,根据处理是否成功来确认(或否定确认)消息。
  5. 发送消息

    • Send<T> 方法将对象序列化为 JSON,并将其作为消息发送到指定的 RabbitMQ 队列。
    • 此方法允许指定消息是否应为持久化,并为消息设置适当的属性(如头信息)。
  6. 通道管理

    • CreateConsumerChannel 方法创建一个新的通道以与 RabbitMQ 进行交互。
    • 通道用于发送和接收消息,必须从已建立的连接中创建。
  7. 连接工厂

    • GetConnection 方法是一个静态方法,使用 ConnectionFactory 创建新的 RabbitMQ 连接。它配置了心跳和其他参数。
  8. 资源管理

    • Dispose 方法确保在不再需要时正确关闭 RabbitMQ 连接。

错误处理和重试机制

  • 消息消费逻辑中包含错误处理机制,以决定如何处理失败的消息处理:
    • 确认(BasicAck):在成功处理消息时调用。
    • 否定确认(BasicNack):允许根据情况(例如瞬态错误与永久错误)重试消息或拒绝消息
      using Microsoft.Extensions.Configuration;
      using Newtonsoft.Json;
      using RabbitMQ.Client;
      using RabbitMQ.Client.Events;
      using System;
      using System.Collections.Generic;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;namespace MSEBP.External.RabbitMQMsg
      {/// <summary>/// 接受消息的服务 /// </summary>public class RabbitMessageService : IDisposable{private readonly RabbitConfigModel _rabbitConfig;private static IConnection _connection;private readonly IConfiguration _configuration;private readonly ILogger _logger;private readonly object _connectionLock = new object();private const int ReconnectDelaySeconds = 5;public RabbitMessageService(IConfiguration configuration){_configuration = configuration;_logger = MSEApplication.Resolve<ILogger>("CorrelationId");_rabbitConfig = LoadConfig();ValidateConfig(_rabbitConfig);}private RabbitConfigModel LoadConfig(){return new RabbitConfigModel{IP = _configuration["RabbitMQ:IP"],Port = _configuration["RabbitMQ:Port"].AsInt(5672),UserName = _configuration["RabbitMQ:UserName"],Password = _configuration["RabbitMQ:Password"],VirtualHost = _configuration["RabbitMQ:VirtualHost"],DurableQueue = _configuration["RabbitMQ:DurableQueue"].AsBool(),QueueName = _configuration["RabbitMQ:QueueName"],Exchange = _configuration["RabbitMQ:Exchange"],ExchangeType = _configuration["RabbitMQ:ExchangeType"],DurableMessage = _configuration["RabbitMQ:DurableMessage"].AsBool(),RoutingKey = _configuration["RabbitMQ:RoutingKey"],};}private void ValidateConfig(RabbitConfigModel config){if (string.IsNullOrWhiteSpace(config.IP) || config.Port <= 0 || string.IsNullOrWhiteSpace(config.UserName) || string.IsNullOrWhiteSpace(config.Password)){throw new InvalidOperationException("RabbitMQ configuration is invalid.");}}public IConnection GetConnection(string clientProvidedName){if (_connection == null || !_connection.IsOpen){lock (_connectionLock){if (_connection == null || !_connection.IsOpen){_connection = CreateConnection(clientProvidedName);}}}return _connection;}private IConnection CreateConnection(string clientProvidedName){var factory = new ConnectionFactory{ClientProvidedName = clientProvidedName,Endpoint = new AmqpTcpEndpoint(new Uri($"amqp://{_rabbitConfig.IP}/")),Port = _rabbitConfig.Port,UserName = _rabbitConfig.UserName,Password = _rabbitConfig.Password,VirtualHost = _rabbitConfig.VirtualHost,RequestedHeartbeat = TimeSpan.FromSeconds(60)};while (true){try{var connection = factory.CreateConnection();connection.ConnectionShutdown += OnConnectionShutdown;_logger.Info("RabbitMQ connection established.");return connection;}catch (Exception ex){_logger.Error("Failed to create RabbitMQ connection, retrying...", ex);Thread.Sleep(TimeSpan.FromSeconds(ReconnectDelaySeconds)); // 等待5秒后重试}}}private void OnConnectionShutdown(object sender, ShutdownEventArgs e){_logger.Warn("RabbitMQ connection was shut down. Attempting to reconnect...");Reconnect();}private void Reconnect(){lock (_connectionLock){if (_connection != null && !_connection.IsOpen){_logger.Info("Disposing of RabbitMQ connection.");_connection.Dispose();_connection = null;}// 重新建立连接_connection = CreateConnection("Reconnect");}}public async Task Receive<T>(Func<T, Task> receiveMethod, CancellationToken cancellationToken){using (var channel = CreateConsumerChannel("Receive")){try{SetupQueue(channel);var consumer = new EventingBasicConsumer(channel);consumer.Received += async (model, ea) => await ProcessMessage(channel, ea, receiveMethod, cancellationToken);channel.BasicConsume(_rabbitConfig.QueueName, false, consumer);// Wait for cancellationawait Task.Delay(Timeout.Infinite, cancellationToken);}catch (OperationCanceledException){_logger.Info("Message consumption was canceled.");}catch (Exception ex){_logger.Error("Failed to set up message consumer", ex);throw;}}}private void SetupQueue(IModel channel){if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue);channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);channel.QueueBind(_rabbitConfig.QueueName, _rabbitConfig.Exchange, _rabbitConfig.RoutingKey);}else{channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);}channel.BasicQos(0, 1, false);}private async Task ProcessMessage<T>(IModel channel, BasicDeliverEventArgs ea, Func<T, Task> receiveMethod, CancellationToken cancellationToken){var deliveryTag = ea.DeliveryTag;var message = Encoding.UTF8.GetString(ea.Body.ToArray());try{_logger.Info($"Received message: {message}");var data = JsonConvert.DeserializeObject<T>(message);await receiveMethod(data);channel.BasicAck(deliveryTag, false); // 确认处理成功}catch (Exception ex){_logger.Error($"Error processing message: {message}", ex);channel.BasicNack(deliveryTag, false, true); // 允许重新处理}}public bool Send<T>(T info){var message = JsonConvert.SerializeObject(info);if (string.IsNullOrWhiteSpace(message)) return false;try{using (var channel = CreateConsumerChannel("Send")){SetupSendChannel(channel);var properties = CreateMessageProperties(channel);var bytes = Encoding.UTF8.GetBytes(message);if (string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.BasicPublish("", _rabbitConfig.QueueName, properties, bytes);}else{channel.BasicPublish(_rabbitConfig.Exchange, _rabbitConfig.RoutingKey, properties, bytes);}}return true;}catch (Exception ex){_logger.Error("Failed to send message to RabbitMQ", ex);return false;}}private void SetupSendChannel(IModel channel){if (!string.IsNullOrWhiteSpace(_rabbitConfig.Exchange)){channel.ExchangeDeclare(_rabbitConfig.Exchange, _rabbitConfig.ExchangeType.ToString(), _rabbitConfig.DurableQueue, false, null);}else{channel.QueueDeclare(_rabbitConfig.QueueName, _rabbitConfig.DurableQueue, false, false, null);}}private IBasicProperties CreateMessageProperties(IModel channel){var properties = channel.CreateBasicProperties();properties.DeliveryMode = Convert.ToByte(_rabbitConfig.DurableMessage ? 2 : 1);properties.Headers = new Dictionary<string, object>{{ "Content-Type", "application/json" },{ "__TypeId__", _rabbitConfig.QueueName }};return properties;}public IModel CreateConsumerChannel(string clientProvidedName){return GetConnection(clientProvidedName).CreateModel();}public void Dispose(){if (_connection != null && _connection.IsOpen){_connection.Close();_connection.Dispose();}}}
      }
      

http://www.xdnf.cn/news/5077.html

相关文章:

  • mac u盘重装mac10.15Catalina系统
  • 1.短信登录
  • 数据库故障排查全攻略:从实战案例到体系化解决方案
  • expo多网络请求设定。
  • Jmeter中的BeanShell如何使用?
  • MySQL 从入门到精通(三):日志管理详解 —— 从排错到恢复的核心利器
  • 01背包类问题
  • 基于大模型与异步技术的股票分析系统实现
  • 在 Flink + Kafka 实时数仓中,如何确保端到端的 Exactly-Once
  • Stable Diffusion进阶之Controlnet插件使用
  • python连接sqllite数据库工具类
  • 二维旋转矩阵:让图形动起来的数学魔法 ✨
  • 操作系统 第2章节 进程,线程和作业
  • 移动设备常用电子屏幕类型对比
  • 互联网大厂Java求职面试:基于RAG的智能问答系统设计与实现-1
  • 驱动-信号量
  • 【Day 23】HarmonyOS开发实战:从AR应用到元宇宙交互
  • 容联云孔淼:AI Agent应深耕垂直场景,从效率提效向价值挖掘升级
  • Godot4.3类星露谷游戏开发之【昼夜循环】
  • 【大模型】LLM概念相关问题(上)
  • C++面向对象特性之多态篇
  • 如何解决按钮重复点击
  • 第十七章,反病毒---防病毒网管
  • MOS关断时波形下降沿振荡怎么解决
  • C语言实现:打印素数、最大公约数
  • gradle3.5的安装以及配置环境变量
  • 进行性核上性麻痹饮食指南:科学膳食守护神经健康
  • OpenMagnetic的介绍与使用
  • Redis 存储原理与数据模型(三)
  • 基于RAG+MCP开发【企文小智】企业智能体