当前位置: 首页 > news >正文

基于 ABP vNext 框架实现高可用高性能的 Modbus 通信网关

🚀基于 ABP vNext 框架实现高可用高性能的 Modbus 通信网关⚡


🧩 一、项目背景与目标

在工业自动化和物联网系统中,Modbus 是使用最广泛的通信协议之一。而 ABP vNext 作为一套现代化、模块化、高扩展性的 .NET 框架,天然具备构建中大型服务端系统的能力。

本篇文章将展示如何基于 ABP vNext 实现一个具备高性能、高可用性的 Modbus TCP 通信网关。该网关不仅支持标准 Modbus 读写操作,还具备连接池管理、异常处理、实时日志监控、统一接口、配置集中、缓存、链路跟踪、版本控制与安全接入能力。


🏗️ 二、系统架构设计

API 层
Device 层
TCP Request
HTTP Response
Modbus Gateway API
异步连接池
缓存层 (Redis)
策略引擎 (Polly)
链路追踪 (OTel)
指标导出 (Prometheus)
Modbus Device
Web 前端 / API Client

💡建议:配合 UML 模块图或组件图,快速理解系统模块协作与调用链。


🔧 三、核心配置与服务注册(Program.cs)

using System;
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.AspNetCore.RateLimiting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using OpenTelemetry.Trace;
using Prometheus;var builder = WebApplication.CreateBuilder(args);// 配置与校验
builder.Services.AddOptions<ModbusOptions>().Bind(builder.Configuration.GetSection("Modbus")).ValidateDataAnnotations().Validate(o => o.ConnectTimeout > 0 && o.ReadTimeout > 0, "超时配置必须大于 0");// 异步连接池与后台清理
builder.Services.AddSingleton<ModbusTcpClientFactory>();
builder.Services.AddSingleton<IModbusClientFactory>(sp => sp.GetRequiredService<ModbusTcpClientFactory>());
builder.Services.AddHostedService(sp => sp.GetRequiredService<ModbusTcpClientFactory>());// 应用服务
builder.Services.AddTransient<IModbusAppService, ModbusAppService>();// Redis 缓存
builder.Services.AddStackExchangeRedisCache(opts =>opts.Configuration = builder.Configuration["Modbus:RedisConnection"]);// 健康检查
builder.Services.AddHealthChecks().AddCheck<ModbusHealthCheck>("modbus", tags: new[] { "ready" }).AddRedis(builder.Configuration["Modbus:RedisConnection"], name: "redis", tags: new[] { "ready" });// OpenTelemetry 跟踪
builder.Services.AddOpenTelemetryTracing(tracer =>
{tracer.AddAspNetCoreInstrumentation();tracer.AddHttpClientInstrumentation();tracer.AddOtlpExporter(opt =>{opt.Endpoint = new Uri(builder.Configuration["Modbus:OtlpEndpoint"]);});
});// Prometheus 指标
builder.Services.AddPrometheusMetrics();
builder.Services.AddHttpMetrics();// API 版本化
builder.Services.AddApiVersioning(options =>
{options.AssumeDefaultVersionWhenUnspecified = true;options.DefaultApiVersion = new ApiVersion(1, 0);
});
builder.Services.AddVersionedApiExplorer(options =>
{options.GroupNameFormat = "'v'VVV";
});// 鉴权
builder.Services.AddAuthentication("Bearer").AddJwtBearer("Bearer", options =>{options.Authority = builder.Configuration["Auth:Authority"];options.Audience = "modbus-api";});// 速率限制
builder.Services.AddRateLimiter(options =>
{options.AddFixedWindowLimiter("fixed", opts =>{opts.PermitLimit = 100;opts.Window = TimeSpan.FromSeconds(1);});
});// CORS
builder.Services.AddCors(options =>
{options.AddPolicy("AllowAll", policy =>{policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader();});
});// Swagger
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();// Controllers
builder.Services.AddControllers();var app = builder.Build();// 中间件管道
app.UseCors("AllowAll");
app.UseHttpMetrics();
app.UseMetricServer("/metrics");app.UseMiddleware<CustomExceptionMiddleware>();app.UseSwagger();
app.UseSwaggerUI(options =>
{foreach (var desc in app.DescribeApiVersions()){options.SwaggerEndpoint($"/swagger/{desc.GroupName}/swagger.json", desc.GroupName.ToUpperInvariant());}
});app.UseAuthentication();
app.UseAuthorization();app.MapControllers();
app.MapHealthChecks("/health/live", new HealthCheckOptions { Predicate = _ => false });
app.MapHealthChecks("/health/ready", new HealthCheckOptions { Predicate = check => check.Tags.Contains("ready") });app.Run();

💾 四、连接池、缓存与服务实现

ModbusTcpClientFactory(异步连接池 & 自动清理)

public class ModbusTcpClientFactory : IModbusClientFactory, IHostedService
{private readonly ConcurrentDictionary<string, (TcpClient tcp, IModbusMaster master, DateTime lastUsed)> _pool = new();private readonly IOptions<ModbusOptions> _options;private Timer _cleanupTimer;public ModbusTcpClientFactory(IOptions<ModbusOptions> options){_options = options;}public async Task<IModbusMaster> GetOrCreateClientAsync(string ip, int port, CancellationToken ct){var key = $"{ip}:{port}";if (_pool.TryGetValue(key, out var entry)){entry.lastUsed = DateTime.UtcNow;_pool[key] = entry;return entry.master;}var client = new TcpClient();using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);cts.CancelAfter(_options.Value.ConnectTimeout);await client.ConnectAsync(ip, port, cts.Token);var master = new ModbusFactory().CreateMaster(client);master.Transport.ReadTimeout = _options.Value.ReadTimeout;_pool[key] = (client, master, DateTime.UtcNow);return master;}public Task StartAsync(CancellationToken cancellationToken){_cleanupTimer = new Timer(Cleanup, null, TimeSpan.Zero, TimeSpan.FromMinutes(5));return Task.CompletedTask;}private void Cleanup(object? state){var threshold = DateTime.UtcNow.AddMinutes(-60);var keysToRemove = _pool.Where(kvp => kvp.Value.lastUsed < threshold).Select(kvp => kvp.Key).ToList();foreach (var key in keysToRemove){var entry = _pool[key];entry.tcp.Close();entry.tcp.Dispose();(entry.master as IDisposable)?.Dispose();_pool.TryRemove(key, out _);}}public Task StopAsync(CancellationToken cancellationToken){_cleanupTimer?.Dispose();return Task.CompletedTask;}
}

ModbusAppService(缓存、重试、熔断与超时)

public class ModbusAppService : ApplicationService, IModbusAppService
{private readonly IModbusClientFactory _clientFactory;private readonly IOptions<ModbusOptions> _options;private readonly IDistributedCache _cache;private readonly ILogger<ModbusAppService> _logger;public ModbusAppService(IModbusClientFactory f,IOptions<ModbusOptions> o,IDistributedCache cache,ILogger<ModbusAppService> logger){_clientFactory = f;_options = o;_cache = cache;_logger = logger;}private static byte[] Serialize<T>(T obj) => JsonSerializer.SerializeToUtf8Bytes(obj);private static T Deserialize<T>(byte[] bytes) => JsonSerializer.Deserialize<T>(bytes)!;public async Task<ushort[]> ReadHoldingRegistersAsync(string ip, int port, ushort start, ushort count, CancellationToken ct){var cacheKey = $"modbus:{ip}:{port}:{start}:{count}";var cached = await _cache.GetAsync(cacheKey, ct);if (cached != null){_logger.LogInformation("Cache hit for {CacheKey}", cacheKey);return Deserialize<ushort[]>(cached);}var policy = Policy.WrapAsync(Policy.TimeoutAsync<ushort[]>(_options.Value.ReadTimeout, Polly.Timeout.TimeoutStrategy.Optimistic),Policy.Handle<IOException>().CircuitBreakerAsync(2, TimeSpan.FromSeconds(30)),Policy.Handle<IOException>().WaitAndRetryAsync(_options.Value.MaxRetry, retry => TimeSpan.FromSeconds(1),(ex, ts, attempt, ctx) => _logger.LogWarning(ex, "Retry {Attempt} for {Ip}:{Port}", attempt, ip, port)));var result = await policy.ExecuteAsync(async ct2 =>{var client = await _clientFactory.GetOrCreateClientAsync(ip, port, ct2);return await client.ReadHoldingRegistersAsync(1, start, count);}, ct);await _cache.SetAsync(cacheKey, Serialize(result), new DistributedCacheEntryOptions{AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(_options.Value.CacheSeconds)}, ct);return result;}
}

🗄️ 缓存处理流程图

检查
命中
未命中
读取请求 Request
Redis Cache?
返回缓存数据
调用 Modbus 设备
写入 Redis

🔍 五、监控与可观测

Prometheus 集成

builder.Services.AddPrometheusMetrics();
app.UseMetricServer("/metrics");
app.UseHttpMetrics();

💡 Grafana Dashboard 示例(JSON 片段):

{"panels": [{"type": "graph","title": "Modbus Read Duration (p95)","targets": [{"expr": "histogram_quantile(0.95, sum(rate(modbus_read_duration_seconds_bucket[5m])) by (le))","legendFormat": "p95"}]}]
}

OpenTelemetry 链路追踪

builder.Services.AddOpenTelemetryTracing(tracer =>
{tracer.AddAspNetCoreInstrumentation();tracer.AddHttpClientInstrumentation();tracer.AddJaegerExporter();
});

示例 trace header:

traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4733-00f067aa0ba902b7-01

🔁 重试与熔断流程图

AppService Polly ModbusClient Device ExecuteAsync(ReadHoldingRegisters) 超时 & 重试 & 熔断检查 调用 GetOrCreateClient TCP Read 返回数据 数据 返回数据 若连续失败超限,则开启熔断 AppService Polly ModbusClient Device

🔐 六、安全、版本控制与异常处理

鉴权与版本控制

builder.Services.AddAuthentication("Bearer").AddJwtBearer("Bearer", options =>{options.Authority = builder.Configuration["Auth:Authority"];options.Audience = "modbus-api";});
[Authorize]
[ApiController]
[ApiVersion("1.0")]
[Route("api/v{version:apiVersion}/modbus")]
public class ModbusController : AbpControllerBase
{private readonly IModbusAppService _service;private readonly ILogger<ModbusController> _logger;public ModbusController(IModbusAppService service, ILogger<ModbusController> logger){_service = service;_logger = logger;}/// <summary>/// 读取指定寄存器数据/// </summary>[HttpPost("read")]public async Task<ActionResult<ApiResponse<ushort[]>>> Read([FromBody] ModbusReadDto input, CancellationToken ct){if (!ModelState.IsValid){return BadRequest(new ApiResponse<string> { Success = false, Message = "参数验证失败" });}_logger.LogInformation("Read request: {Ip}:{Port}, Start={Start}, Length={Length}", input.Ip, input.Port, input.Start, input.Length);try{var data = await _service.ReadHoldingRegistersAsync(input.Ip, input.Port, input.Start, input.Length, ct);return Ok(new ApiResponse<ushort[]> { Success = true, Data = data });}catch (Exception ex){_logger.LogError(ex, "Error reading registers from {Ip}:{Port}", input.Ip, input.Port);return StatusCode(500, new ApiResponse<string> { Success = false, Message = "读取寄存器失败" });}}
}

全局异常中间件示例

public class CustomExceptionMiddleware : IMiddleware
{private readonly ILogger<CustomExceptionMiddleware> _logger;public CustomExceptionMiddleware(ILogger<CustomExceptionMiddleware> logger) => _logger = logger;public async Task InvokeAsync(HttpContext context, RequestDelegate next){try{await next(context);}catch (BrokenCircuitException ex){_logger.LogError(ex, "Circuit open for {Path}", context.Request.Path);context.Response.StatusCode = 503;await context.Response.WriteAsJsonAsync(new { Success = false, Message = "Service unavailable, circuit open." });}catch (TaskCanceledException){context.Response.StatusCode = 504;await context.Response.WriteAsJsonAsync(new { Success = false, Message = "Request timed out." });}catch (Exception ex){_logger.LogError(ex, "Unhandled exception for {Path}", context.Request.Path);context.Response.StatusCode = 500;await context.Response.WriteAsJsonAsync(new { Success = false, Message = "Internal server error." });}}
}

📚 七、总结

本文通过异步连接池、策略封装、统一响应、自动清理与链路监控等设计构建出一个可落地、可观测、可扩展的 Modbus 网关系统。
进一步通过缓存集成、服务注册补全、响应封装与 DTO 校验,全面提升了系统可用性与可维护性。


📚 参考资料

  • ABP 官方文档
  • NModbus Library
  • Modbus 协议详解
  • Polly 重试策略
  • OpenTelemetry for .NET
  • Prometheus .NET Exporter
http://www.xdnf.cn/news/408619.html

相关文章:

  • 图像识别技术的定义与原理
  • 新手安装java所有工具(jdk、idea,Maven,数据库)
  • 26考研|数学分析:函数列与函数项级数
  • Java MVC架构在当今时代的技术解析
  • UART16550 IP core笔记二
  • 从0到1:Python机器学习实战全攻略(8/10)
  • 小白学习java第18天(下):mybatis
  • SHAP分析!Transformer-GRU组合模型SHAP分析,模型可解释不在发愁!
  • 5倍无损压缩+50 倍速转换HD Video 4K/8K 视频处理
  • 前端项目2-01:个人简介页面
  • 系统架构设计(五):构件
  • 服务器共享文件夹如何实现外网访问
  • [数据结构高阶]并查集初识、手撕、可以解决哪类问题?
  • hdfs-客户端操作-文件上传
  • 记一次redis未授权被种挖矿
  • Linux常见命令
  • GPL v2 许可证深度解析:条款逻辑与合规风险指南(下)
  • Tomcat服务部署
  • pth的模型格式怎么变成SafeTensors了?
  • Matlab 空调温度时延模型的模糊pid控制
  • YOLOv8网络结构
  • 1.10-数据传输格式
  • Java使用POI+反射灵活的控制字段导出Excel
  • MapReduce 的工作原理
  • 数据库分区与分表详解
  • java 中 pojo 的详细讲解
  • BGP练习
  • Java 内存模型(JMM)与内存屏障:原理、实践与性能权衡
  • Python基础:类的深拷贝与浅拷贝-->with语句的使用及三个库:matplotlib基本画图-->pandas之Series创建
  • 用户态到内核态:Linux信号传递的九重门(二)