基于 ABP vNext 框架实现高可用高性能的 Modbus 通信网关
🚀基于 ABP vNext 框架实现高可用高性能的 Modbus 通信网关⚡
🧩 一、项目背景与目标
在工业自动化和物联网系统中,Modbus 是使用最广泛的通信协议之一。而 ABP vNext 作为一套现代化、模块化、高扩展性的 .NET 框架,天然具备构建中大型服务端系统的能力。
本篇文章将展示如何基于 ABP vNext 实现一个具备高性能、高可用性的 Modbus TCP 通信网关。该网关不仅支持标准 Modbus 读写操作,还具备连接池管理、异常处理、实时日志监控、统一接口、配置集中、缓存、链路跟踪、版本控制与安全接入能力。
🏗️ 二、系统架构设计
💡建议:配合 UML 模块图或组件图,快速理解系统模块协作与调用链。
🔧 三、核心配置与服务注册(Program.cs)
using System;
using System.Text.Json;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Diagnostics.HealthChecks;
using Microsoft.AspNetCore.RateLimiting;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Caching.Distributed;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using OpenTelemetry.Trace;
using Prometheus;

// Composition root of the Modbus gateway: binds configuration, registers services,
// then builds the HTTP pipeline and endpoints.
var builder = WebApplication.CreateBuilder(args);

// Configuration binding and validation: both timeouts must be strictly positive.
builder.Services.AddOptions<ModbusOptions>()
    .Bind(builder.Configuration.GetSection("Modbus"))
    .ValidateDataAnnotations()
    .Validate(o => o.ConnectTimeout > 0 && o.ReadTimeout > 0, "超时配置必须大于 0");

// Connection pool: a single instance is exposed both as the client factory and as
// the hosted service that runs its periodic cleanup.
builder.Services.AddSingleton<ModbusTcpClientFactory>();
builder.Services.AddSingleton<IModbusClientFactory>(sp => sp.GetRequiredService<ModbusTcpClientFactory>());
builder.Services.AddHostedService(sp => sp.GetRequiredService<ModbusTcpClientFactory>());

// Application service.
builder.Services.AddTransient<IModbusAppService, ModbusAppService>();

// CustomExceptionMiddleware implements IMiddleware (factory-activated middleware).
// Such middleware is resolved from the container per request, so it MUST be
// registered; without this line UseMiddleware<CustomExceptionMiddleware>() fails
// at request time with "no service registered".
builder.Services.AddTransient<CustomExceptionMiddleware>();

// Redis-backed distributed cache.
builder.Services.AddStackExchangeRedisCache(opts =>
    opts.Configuration = builder.Configuration["Modbus:RedisConnection"]);

// Health checks; everything tagged "ready" is reported by /health/ready.
builder.Services.AddHealthChecks()
    .AddCheck<ModbusHealthCheck>("modbus", tags: new[] { "ready" })
    .AddRedis(builder.Configuration["Modbus:RedisConnection"], name: "redis", tags: new[] { "ready" });

// OpenTelemetry tracing exported over OTLP.
builder.Services.AddOpenTelemetryTracing(tracer =>
{
    tracer.AddAspNetCoreInstrumentation();
    tracer.AddHttpClientInstrumentation();
    tracer.AddOtlpExporter(opt =>
    {
        opt.Endpoint = new Uri(builder.Configuration["Modbus:OtlpEndpoint"]);
    });
});

// Prometheus metrics.
builder.Services.AddPrometheusMetrics();
builder.Services.AddHttpMetrics();

// API versioning: requests without an explicit version are treated as v1.0.
builder.Services.AddApiVersioning(options =>
{
    options.AssumeDefaultVersionWhenUnspecified = true;
    options.DefaultApiVersion = new ApiVersion(1, 0);
});
builder.Services.AddVersionedApiExplorer(options =>
{
    options.GroupNameFormat = "'v'VVV";
});

// JWT bearer authentication.
builder.Services.AddAuthentication("Bearer")
    .AddJwtBearer("Bearer", options =>
    {
        options.Authority = builder.Configuration["Auth:Authority"];
        options.Audience = "modbus-api";
    });

// Rate limiting: one fixed window of 100 permits per second under the policy name "fixed".
builder.Services.AddRateLimiter(options =>
{
    // Report throttling as 429 Too Many Requests instead of the middleware's default.
    options.RejectionStatusCode = 429;
    options.AddFixedWindowLimiter("fixed", opts =>
    {
        opts.PermitLimit = 100;
        opts.Window = TimeSpan.FromSeconds(1);
    });
});

// CORS.
// NOTE(review): "AllowAll" accepts any origin, method and header; restrict it
// before exposing the gateway outside a trusted network.
builder.Services.AddCors(options =>
{
    options.AddPolicy("AllowAll", policy =>
    {
        policy.AllowAnyOrigin().AllowAnyMethod().AllowAnyHeader();
    });
});

// Swagger.
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// Controllers.
builder.Services.AddControllers();

var app = builder.Build();

// Middleware pipeline.
app.UseCors("AllowAll");
app.UseHttpMetrics();
app.UseMetricServer("/metrics");

app.UseMiddleware<CustomExceptionMiddleware>();

app.UseSwagger();
app.UseSwaggerUI(options =>
{
    // One Swagger endpoint per discovered API version.
    foreach (var desc in app.DescribeApiVersions())
    {
        options.SwaggerEndpoint($"/swagger/{desc.GroupName}/swagger.json", desc.GroupName.ToUpperInvariant());
    }
});

app.UseAuthentication();
app.UseAuthorization();

// The limiter registered above only takes effect once the middleware is in the
// pipeline AND endpoints opt into a named policy; both were missing.
app.UseRateLimiter();

app.MapControllers().RequireRateLimiting("fixed");

// Liveness runs no checks (predicate rejects all); readiness runs the "ready"-tagged checks.
app.MapHealthChecks("/health/live", new HealthCheckOptions { Predicate = _ => false });
app.MapHealthChecks("/health/ready", new HealthCheckOptions { Predicate = check => check.Tags.Contains("ready") });

app.Run();
💾 四、连接池、缓存与服务实现
ModbusTcpClientFactory(异步连接池 & 自动清理)
/// <summary>
/// Pools one Modbus TCP master per "ip:port" endpoint and, while hosted, evicts
/// entries that have not been used for 60 minutes (checked every 5 minutes).
/// </summary>
public class ModbusTcpClientFactory : IModbusClientFactory, IHostedService
{
    private readonly ConcurrentDictionary<string, (TcpClient tcp, IModbusMaster master, DateTime lastUsed)> _pool = new();
    private readonly IOptions<ModbusOptions> _options;
    private Timer? _cleanupTimer;

    public ModbusTcpClientFactory(IOptions<ModbusOptions> options)
    {
        _options = options;
    }

    /// <summary>
    /// Returns the pooled master for the endpoint, connecting a new one when there
    /// is no usable pooled entry.
    /// </summary>
    /// <param name="ip">Host name or IP address of the Modbus TCP device.</param>
    /// <param name="port">TCP port of the device.</param>
    /// <param name="ct">Cancels the connection attempt.</param>
    /// <returns>A master bound to a TCP connection for the endpoint.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled or the configured ConnectTimeout elapsed.
    /// </exception>
    public async Task<IModbusMaster> GetOrCreateClientAsync(string ip, int port, CancellationToken ct)
    {
        var key = $"{ip}:{port}";

        // Loop because the entry can be touched or evicted concurrently between
        // reading it and updating/removing it.
        while (_pool.TryGetValue(key, out var entry))
        {
            if (!entry.tcp.Connected)
            {
                // TcpClient.Connected reflects the last I/O operation, so this only
                // catches sockets already known to be broken; evict and reconnect.
                if (TryRemoveExact(key, entry))
                {
                    DisposeEntry(entry);
                }

                continue;
            }

            var touched = (entry.tcp, entry.master, DateTime.UtcNow);

            // TryUpdate (instead of an indexer write) cannot resurrect an entry
            // that the cleanup timer removed and disposed in the meantime.
            if (_pool.TryUpdate(key, touched, entry))
            {
                return entry.master;
            }
        }

        var client = new TcpClient();
        IModbusMaster master;
        try
        {
            using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
            cts.CancelAfter(_options.Value.ConnectTimeout);
            await client.ConnectAsync(ip, port, cts.Token);

            master = new ModbusFactory().CreateMaster(client);
            master.Transport.ReadTimeout = _options.Value.ReadTimeout;
        }
        catch
        {
            // Do not leak the socket when connecting or wrapping it fails.
            client.Dispose();
            throw;
        }

        (TcpClient tcp, IModbusMaster master, DateTime lastUsed) created = (client, master, DateTime.UtcNow);

        // If another caller connected to the same endpoint first, keep its entry
        // and discard ours rather than overwriting (and leaking) the pooled one.
        var winner = _pool.GetOrAdd(key, created);
        if (!ReferenceEquals(winner.tcp, client))
        {
            DisposeEntry(created);
        }

        return winner.master;
    }

    /// <summary>Starts the cleanup timer: first run immediately, then every 5 minutes.</summary>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cleanupTimer = new Timer(Cleanup, null, TimeSpan.Zero, TimeSpan.FromMinutes(5));
        return Task.CompletedTask;
    }

    /// <summary>Timer callback: evicts and disposes entries idle for more than 60 minutes.</summary>
    private void Cleanup(object? state)
    {
        var threshold = DateTime.UtcNow.AddMinutes(-60);

        foreach (var kvp in _pool)
        {
            // Remove first, and only if the entry is still the stale snapshot we
            // inspected; an entry touched in the meantime stays pooled. Disposal
            // happens after removal so no caller can fetch a disposed master.
            if (kvp.Value.lastUsed < threshold && TryRemoveExact(kvp.Key, kvp.Value))
            {
                DisposeEntry(kvp.Value);
            }
        }
    }

    /// <summary>Stops the cleanup timer and closes every pooled connection.</summary>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cleanupTimer?.Dispose();
        _cleanupTimer = null;

        foreach (var key in _pool.Keys)
        {
            if (_pool.TryRemove(key, out var entry))
            {
                DisposeEntry(entry);
            }
        }

        return Task.CompletedTask;
    }

    /// <summary>Removes the entry only if the pool still holds exactly this key/value pair.</summary>
    private bool TryRemoveExact(string key, (TcpClient tcp, IModbusMaster master, DateTime lastUsed) entry)
        => _pool.TryRemove(System.Collections.Generic.KeyValuePair.Create(key, entry));

    /// <summary>Releases the socket and, when it is disposable, the master built on it.</summary>
    private static void DisposeEntry((TcpClient tcp, IModbusMaster master, DateTime lastUsed) entry)
    {
        entry.tcp.Dispose();
        (entry.master as IDisposable)?.Dispose();
    }
}
ModbusAppService(缓存、重试、熔断与超时)
/// <summary>
/// Reads Modbus holding registers through the pooled client factory, with a
/// distributed cache in front and retry / circuit-breaker / timeout policies
/// around the device call.
/// </summary>
public class ModbusAppService : ApplicationService, IModbusAppService
{
    // A circuit breaker is stateful: it has to outlive a single call to ever open.
    // The breakers are therefore kept statically, one per "ip:port", so that a
    // failing device does not block reads from healthy ones.
    // NOTE(review): entries are never evicted; assumes the set of endpoints is bounded.
    private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, IAsyncPolicy> s_breakers = new();

    private readonly IModbusClientFactory _clientFactory;
    private readonly IOptions<ModbusOptions> _options;
    private readonly IDistributedCache _cache;
    private readonly ILogger<ModbusAppService> _logger;

    public ModbusAppService(
        IModbusClientFactory f,
        IOptions<ModbusOptions> o,
        IDistributedCache cache,
        ILogger<ModbusAppService> logger)
    {
        _clientFactory = f;
        _options = o;
        _cache = cache;
        _logger = logger;
    }

    private static byte[] Serialize<T>(T obj) => JsonSerializer.SerializeToUtf8Bytes(obj);

    private static T Deserialize<T>(byte[] bytes) => JsonSerializer.Deserialize<T>(bytes)!;

    /// <summary>
    /// Reads <paramref name="count"/> holding registers starting at <paramref name="start"/>
    /// from slave address 1 of the device at <paramref name="ip"/>:<paramref name="port"/>.
    /// </summary>
    /// <param name="ip">Host name or IP address of the device.</param>
    /// <param name="port">TCP port of the device.</param>
    /// <param name="start">Address of the first register to read.</param>
    /// <param name="count">Number of registers to read.</param>
    /// <param name="ct">Cancels cache access and the connection attempt.</param>
    /// <returns>The register values, served from the cache when a cached copy exists.</returns>
    public async Task<ushort[]> ReadHoldingRegistersAsync(string ip, int port, ushort start, ushort count, CancellationToken ct)
    {
        var cacheKey = $"modbus:{ip}:{port}:{start}:{count}";
        var cached = await _cache.GetAsync(cacheKey, ct);
        if (cached is not null)
        {
            _logger.LogInformation("Cache hit for {CacheKey}", cacheKey);
            return Deserialize<ushort[]>(cached);
        }

        var options = _options.Value;

        var breaker = s_breakers.GetOrAdd(
            $"{ip}:{port}",
            _ => Policy.Handle<IOException>().CircuitBreakerAsync(2, TimeSpan.FromSeconds(30)));

        var retry = Policy.Handle<IOException>().WaitAndRetryAsync(
            options.MaxRetry,
            _ => TimeSpan.FromSeconds(1),
            (ex, delay, attempt, context) => _logger.LogWarning(ex, "Retry {Attempt} for {Ip}:{Port}", attempt, ip, port));

        // ReadTimeout is passed as milliseconds to the transport elsewhere in this
        // project, so it is converted explicitly here: the int overload of
        // TimeoutAsync interprets its argument as SECONDS.
        // NOTE(review): Optimistic timeout relies on the delegate honouring the
        // token; the register read below does not accept one, so in practice this
        // bounds the connection attempt only.
        var timeout = Policy.TimeoutAsync(
            TimeSpan.FromMilliseconds(options.ReadTimeout),
            Polly.Timeout.TimeoutStrategy.Optimistic);

        // All policies are non-generic so they can be combined in one WrapAsync call
        // (generic and non-generic policies cannot be mixed in it). Outermost first:
        // breaker -> retry -> timeout, i.e. the timeout applies to each attempt
        // rather than to the whole retry sequence.
        var policy = Policy.WrapAsync(breaker, retry, timeout);

        var result = await policy.ExecuteAsync<ushort[]>(async token =>
        {
            var client = await _clientFactory.GetOrCreateClientAsync(ip, port, token);
            return await client.ReadHoldingRegistersAsync(1, start, count);
        }, ct);

        // A non-positive relative expiration is rejected by DistributedCacheEntryOptions,
        // so treat CacheSeconds <= 0 as "caching disabled" instead of failing the read.
        if (options.CacheSeconds > 0)
        {
            var entryOptions = new DistributedCacheEntryOptions
            {
                AbsoluteExpirationRelativeToNow = TimeSpan.FromSeconds(options.CacheSeconds)
            };
            await _cache.SetAsync(cacheKey, Serialize(result), entryOptions, ct);
        }

        return result;
    }
}
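🗄️ 缓存处理流程图
流程说明:读取请求先以 modbus:{ip}:{port}:{start}:{count} 为键查询分布式缓存;命中则直接返回缓存数据,未命中则通过连接池读取设备,并将结果按 CacheSeconds 设置的过期时间写入缓存后返回。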
🔍 五、监控与可观测
Prometheus 集成
// Excerpt of the Prometheus-related calls from Program.cs. The first line belongs
// to service registration (before builder.Build()); the other two belong to the
// middleware pipeline, with the scrape endpoint served at "/metrics".
// NOTE(review): AddPrometheusMetrics() is presumably provided by the metrics
// package in use — confirm it exists in the referenced version.
builder.Services.AddPrometheusMetrics();
app.UseMetricServer("/metrics");
app.UseHttpMetrics();
💡 Grafana Dashboard 示例(JSON 片段):
{"panels": [{"type": "graph","title": "Modbus Read Duration (p95)","targets": [{"expr": "histogram_quantile(0.95, sum(rate(modbus_read_duration_seconds_bucket[5m])) by (le))","legendFormat": "p95"}]}]
}
OpenTelemetry 链路追踪
// Tracing registration with ASP.NET Core and HttpClient instrumentation.
// This variant adds a Jaeger exporter, whereas the Program.cs listing earlier in
// the article configures AddOtlpExporter with the "Modbus:OtlpEndpoint" setting.
// NOTE(review): presumably only one of the two exporters is intended — confirm
// which one the deployment uses.
builder.Services.AddOpenTelemetryTracing(tracer =>
{tracer.AddAspNetCoreInstrumentation();tracer.AddHttpClientInstrumentation();tracer.AddJaegerExporter();
});
示例 trace header:
traceparent: 00-4bf92f3577b34da6a3ce929d0e0e4733-00f067aa0ba902b7-01
🔁 重试与熔断流程图
🔐 六、安全、版本控制与异常处理
鉴权与版本控制
// JWT bearer authentication under the "Bearer" scheme: the authority is read from
// the "Auth:Authority" configuration key and the audience is fixed to "modbus-api".
builder.Services.AddAuthentication("Bearer").AddJwtBearer("Bearer", options =>{options.Authority = builder.Configuration["Auth:Authority"];options.Audience = "modbus-api";});
/// <summary>
/// Versioned, authenticated HTTP entry point for Modbus register reads.
/// </summary>
[Authorize]
[ApiController]
[ApiVersion("1.0")]
[Route("api/v{version:apiVersion}/modbus")]
public class ModbusController : AbpControllerBase
{
    private readonly IModbusAppService _service;
    private readonly ILogger<ModbusController> _logger;

    public ModbusController(IModbusAppService service, ILogger<ModbusController> logger)
    {
        _service = service;
        _logger = logger;
    }

    /// <summary>
    /// Reads the holding registers described by <paramref name="input"/>.
    /// </summary>
    /// <param name="input">Target endpoint, start address and register count.</param>
    /// <param name="ct">Request cancellation token.</param>
    /// <returns>
    /// 200 with the register values; 400 when validation fails; 500 when the read
    /// fails. Open-circuit and cancellation exceptions are not handled here and
    /// propagate to the surrounding pipeline.
    /// </returns>
    [HttpPost("read")]
    public async Task<ActionResult<ApiResponse<ushort[]>>> Read([FromBody] ModbusReadDto input, CancellationToken ct)
    {
        if (!ModelState.IsValid)
        {
            return BadRequest(new ApiResponse<string> { Success = false, Message = "参数验证失败" });
        }

        _logger.LogInformation(
            "Read request: {Ip}:{Port}, Start={Start}, Length={Length}",
            input.Ip, input.Port, input.Start, input.Length);

        try
        {
            var data = await _service.ReadHoldingRegistersAsync(input.Ip, input.Port, input.Start, input.Length, ct);
            return Ok(new ApiResponse<ushort[]> { Success = true, Data = data });
        }
        // A catch-all here would also swallow open-circuit and cancellation
        // exceptions, making the global exception middleware's 503/504 mapping
        // unreachable for this action; the filter lets those two pass through.
        catch (Exception ex) when (ex is not Polly.CircuitBreaker.BrokenCircuitException and not OperationCanceledException)
        {
            _logger.LogError(ex, "Error reading registers from {Ip}:{Port}", input.Ip, input.Port);
            return StatusCode(500, new ApiResponse<string> { Success = false, Message = "读取寄存器失败" });
        }
    }
}
全局异常中间件示例
/// <summary>
/// Translates exceptions escaping the rest of the pipeline into JSON error
/// responses: 503 for an open circuit, 504 for timeouts/cancellation, 500 otherwise.
/// </summary>
/// <remarks>
/// The type implements <see cref="IMiddleware"/>, so it is activated from the
/// service container and must be registered there.
/// </remarks>
public class CustomExceptionMiddleware : IMiddleware
{
    private readonly ILogger<CustomExceptionMiddleware> _logger;

    public CustomExceptionMiddleware(ILogger<CustomExceptionMiddleware> logger) => _logger = logger;

    /// <summary>
    /// Runs the next middleware and maps any exception it throws to an error response.
    /// </summary>
    /// <param name="context">The current HTTP context.</param>
    /// <param name="next">The next delegate in the pipeline.</param>
    public async Task InvokeAsync(HttpContext context, RequestDelegate next)
    {
        // Every handler is guarded by !HasStarted: once response headers are sent the
        // status code can no longer be changed, so in that case the exception is left
        // to propagate instead of failing a second time inside the handler.
        try
        {
            await next(context);
        }
        catch (BrokenCircuitException ex) when (!context.Response.HasStarted)
        {
            _logger.LogError(ex, "Circuit open for {Path}", context.Request.Path);
            await WriteErrorAsync(context, 503, "Service unavailable, circuit open.");
        }
        // Polly's timeout policy reports expiry as TimeoutRejectedException, and
        // cancelled operations may throw OperationCanceledException rather than its
        // TaskCanceledException subclass; all of them are mapped to 504.
        catch (Exception ex) when ((ex is OperationCanceledException or Polly.Timeout.TimeoutRejectedException)
                                   && !context.Response.HasStarted)
        {
            _logger.LogWarning(ex, "Request timed out for {Path}", context.Request.Path);
            await WriteErrorAsync(context, 504, "Request timed out.");
        }
        catch (Exception ex) when (!context.Response.HasStarted)
        {
            _logger.LogError(ex, "Unhandled exception for {Path}", context.Request.Path);
            await WriteErrorAsync(context, 500, "Internal server error.");
        }
    }

    /// <summary>Writes the error payload with the given status code.</summary>
    private static Task WriteErrorAsync(HttpContext context, int statusCode, string message)
    {
        context.Response.StatusCode = statusCode;
        return context.Response.WriteAsJsonAsync(new { Success = false, Message = message });
    }
}
📚 七、总结
本文通过异步连接池、策略封装、统一响应、自动清理与链路监控等设计构建出一个可落地、可观测、可扩展的 Modbus 网关系统。
进一步通过缓存集成、服务注册补全、响应封装与 DTO 校验,全面提升了系统可用性与可维护性。
📚 参考资料
- ABP 官方文档
- NModbus Library
- Modbus 协议详解
- Polly 重试策略
- OpenTelemetry for .NET
- Prometheus .NET Exporter