当前位置: 首页 > news >正文

完整的 .NET 6 分布式定时任务实现(Hangfire + Redis 分布式锁)

完整的 .NET 6 分布式定时任务实现(Hangfire + Redis 分布式锁)

以下是完整的解决方案,包含所有必要组件:

1. 基础设施层

1.1 分布式锁服务

// IDistributedLockService.cs
public interface IDistributedLockService
{ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime);
}// RedisDistributedLockService.cs
public class RedisDistributedLockService : IDistributedLockService
{private readonly IConnectionMultiplexer _redis;private readonly ILogger<RedisDistributedLockService> _logger;public RedisDistributedLockService(IConnectionMultiplexer redis,ILogger<RedisDistributedLockService> logger){_redis = redis;_logger = logger;}public async ValueTask<IAsyncDisposable?> AcquireLockAsync(string resourceKey, TimeSpan expiryTime){var db = _redis.GetDatabase();var lockToken = Guid.NewGuid().ToString();var lockKey = $"distributed-lock:{resourceKey}";try{var acquired = await db.LockTakeAsync(lockKey, lockToken, expiryTime);if (acquired){_logger.LogDebug("成功获取分布式锁 {LockKey}", lockKey);return new RedisLockHandle(db, lockKey, lockToken, _logger);}_logger.LogDebug("无法获取分布式锁 {LockKey}", lockKey);return null;}catch (Exception ex){_logger.LogError(ex, "获取分布式锁 {LockKey} 时发生错误", lockKey);throw;}}private sealed class RedisLockHandle : IAsyncDisposable{private readonly IDatabase _db;private readonly string _lockKey;private readonly string _lockToken;private readonly ILogger _logger;private bool _isDisposed;public RedisLockHandle(IDatabase db,string lockKey,string lockToken,ILogger logger){_db = db;_lockKey = lockKey;_lockToken = lockToken;_logger = logger;}public async ValueTask DisposeAsync(){if (_isDisposed) return;try{var released = await _db.LockReleaseAsync(_lockKey, _lockToken);if (!released){_logger.LogWarning("释放分布式锁 {LockKey} 失败", _lockKey);}else{_logger.LogDebug("成功释放分布式锁 {LockKey}", _lockKey);}}catch (Exception ex){_logger.LogError(ex, "释放分布式锁 {LockKey} 时发生错误", _lockKey);}finally{_isDisposed = true;}}}
}

2. 任务服务层

2.1 定时任务服务

// IPollingService.cs
public interface IPollingService
{Task ExecutePollingTasksAsync();Task ExecuteDailyTaskAsync(int hour);
}// PollingService.cs
public class PollingService : IPollingService
{private readonly IDistributedLockService _lockService;private readonly ILogger<PollingService> _logger;public PollingService(IDistributedLockService lockService,ILogger<PollingService> logger){_lockService = lockService;_logger = logger;}[DisableConcurrentExecution(timeoutInSeconds: 60 * 30)] // 30分钟防并发public async Task ExecutePollingTasksAsync(){await using var lockHandle = await _lockService.AcquireLockAsync("polling-tasks-lock",TimeSpan.FromMinutes(25)); // 锁有效期25分钟if (lockHandle is null){_logger.LogInformation("其他节点正在执行轮询任务,跳过本次执行");return;}try{_logger.LogInformation("开始执行轮询任务 - 节点: {NodeId}", Environment.MachineName);// 执行所有轮询任务await Task.WhenAll(PollingTaskAsync(),PollingExpireTaskAsync(),PollingExpireDelCharactTaskAsync());// 触发后台任务_ = BackgroundTask.Run(() => PollingDelCharactTaskAsync(), _logger);_ = BackgroundTask.Run(() => AutoCheckApiAsync(), _logger);_ = BackgroundTask.Run(() => DelLogsAsync(), _logger);}catch (Exception ex){_logger.LogError(ex, "执行轮询任务时发生错误");throw;}}[DisableConcurrentExecution(timeoutInSeconds: 60 * 60)] // 1小时防并发public async Task ExecuteDailyTaskAsync(int hour){var lockKey = $"daily-task-{hour}:{DateTime.UtcNow:yyyyMMdd}";await using var lockHandle = await _lockService.AcquireLockAsync(lockKey,TimeSpan.FromMinutes(55)); // 锁有效期55分钟if (lockHandle is null){_logger.LogInformation("其他节点已执行今日 {Hour} 点任务", hour);return;}try{_logger.LogInformation("开始执行 {Hour} 点任务 - 节点: {NodeId}", hour, Environment.MachineName);if (hour == 21){await ExecuteNightlyMaintenanceAsync();}else if (hour == 4){await ExecuteEarlyMorningTasksAsync();}}catch (Exception ex){_logger.LogError(ex, "执行 {Hour} 点任务时发生错误", hour);throw;}}// 具体任务实现方法private async Task PollingTaskAsync(){// 实现游戏角色启动/关闭逻辑}private async Task ExecuteNightlyMaintenanceAsync(){// 21点特殊任务逻辑}// 其他方法...
}// BackgroundTask.cs (安全运行后台任务)
public static class BackgroundTask
{public static Task Run(Func<Task> task, ILogger logger){return Task.Run(async () =>{try{await task();}catch (Exception ex){logger.LogError(ex, "后台任务执行失败");}});}
}

3. 任务调度配置层

3.1 任务初始化器

// RecurringJobInitializer.cs
public class RecurringJobInitializer : IHostedService
{private readonly IRecurringJobManager _jobManager;private readonly IServiceProvider _services;private readonly ILogger<RecurringJobInitializer> _logger;public RecurringJobInitializer(IRecurringJobManager jobManager,IServiceProvider services,ILogger<RecurringJobInitializer> logger){_jobManager = jobManager;_services = services;_logger = logger;}public Task StartAsync(CancellationToken cancellationToken){try{using var scope = _services.CreateScope();var pollingService = scope.ServiceProvider.GetRequiredService<IPollingService>();// 每30分钟执行的任务_jobManager.AddOrUpdate<IPollingService>("polling-tasks-30min",s => s.ExecutePollingTasksAsync(),"*/30 * * * *");// 每天21:00执行的任务_jobManager.AddOrUpdate<IPollingService>("daily-task-21:00",s => s.ExecuteDailyTaskAsync(21),"0 21 * * *");// 每天04:00执行的任务_jobManager.AddOrUpdate<IPollingService>("daily-task-04:00",s => s.ExecuteDailyTaskAsync(4),"0 4 * * *");_logger.LogInformation("周期性任务初始化完成");}catch (Exception ex){_logger.LogError(ex, "初始化周期性任务失败");throw;}return Task.CompletedTask;}public Task StopAsync(CancellationToken cancellationToken) => Task.CompletedTask;
}

4. 应用启动配置

4.1 Program.cs

var builder = WebApplication.CreateBuilder(args);// 添加Redis
builder.Services.AddSingleton<IConnectionMultiplexer>(sp => ConnectionMultiplexer.Connect(builder.Configuration.GetConnectionString("Redis")));// 配置Hangfire
builder.Services.AddHangfire(config =>
{config.UseRedisStorage(builder.Configuration.GetConnectionString("Redis"),new RedisStorageOptions{Prefix = "hangfire:",Db = 1 // 使用单独的Redis数据库});config.UseColouredConsoleLogProvider();
});builder.Services.AddHangfireServer(options =>
{options.ServerName = $"{Environment.MachineName}:{Guid.NewGuid():N}";options.WorkerCount = 1;options.Queues = new[] { "default", "critical" };
});// 注册服务
builder.Services.AddSingleton<IDistributedLockService, RedisDistributedLockService>();
builder.Services.AddScoped<IPollingService, PollingService>();
builder.Services.AddHostedService<RecurringJobInitializer>();var app = builder.Build();// 配置Hangfire仪表盘
app.UseHangfireDashboard("/jobs", new DashboardOptions
{DashboardTitle = "任务调度中心",Authorization = new[] { new HangfireDashboardAuthorizationFilter() },StatsPollingInterval = 60_000 // 60秒刷新一次
});app.Run();// Hangfire仪表盘授权过滤器
public class HangfireDashboardAuthorizationFilter : IDashboardAuthorizationFilter
{public bool Authorize(DashboardContext context){var httpContext = context.GetHttpContext();return httpContext.User.Identity?.IsAuthenticated == true;}
}

5. appsettings.json 配置

{"ConnectionStrings": {"Redis": "localhost:6379,allowAdmin=true","Hangfire": "Server=(localdb)\\mssqllocaldb;Database=Hangfire;Trusted_Connection=True;"},"Hangfire": {"WorkerCount": 1,"SchedulePollingInterval": 5000}
}

关键设计说明

  1. 分布式锁

    • 使用Redis RedLock算法实现
    • 自动处理锁的获取和释放
    • 包含完善的错误处理和日志记录
  2. 任务隔离

    • 使用Hangfire的[DisableConcurrentExecution]防止同一任务重复执行
    • 分布式锁确保跨节点唯一执行
  3. 错误处理

    • 所有关键操作都有try-catch和日志记录
    • 后台任务使用安全包装器执行
  4. 可观测性

    • 详细的日志记录
    • Hangfire仪表盘监控
  5. 扩展性

    • 可以轻松添加新任务
    • 支持动态调整调度策略

这个实现方案完全符合.NET 6的最佳实践,支持分布式部署,确保任务在集群环境中安全可靠地执行。

http://www.xdnf.cn/news/12709.html

相关文章:

  • 故障诊断常用算法
  • 2025妈妈杯数学建模D题完整分析论文
  • Kubernetes Pod 调度策略:从基础到进阶
  • java面向对象09:方法的重写
  • PyTorch入门------卷积神经网络
  • TCP/IP和UDP协议的发展历程
  • POSIX 信号量(Semaphore)
  • MacOS怎么显示隐藏文件
  • Vue3 实战:打造多功能旅游攻略选项卡页面
  • 记录学习的第二十九天
  • unity TEngine学习记录3
  • 精准计量+AI管控——安科瑞助力高校水电管理数字化转型
  • C#插件与可扩展性
  • 闲来无事,用HTML+CSS+JS打造一个84键机械键盘模拟器
  • 优化自旋锁的实现
  • pdfjs库使用3
  • Linux内核机制——内存管理
  • C++ 迭代器失效详解:如何避免 vector 操作中的陷阱
  • 数控铣床自动上下料机械手控制装置设计
  • IDEA 2025.1更新-AI助手试用和第三方模型集成方案
  • C++类和对象上
  • 00.IDEA 插件推荐清单(2025)
  • Jenkins 简易使用记录
  • 从零到一:管理系统设计新手如何快速上手?
  • MATLAB 控制系统设计与仿真 - 37
  • package.json 里面出现 workspace:*,关于工作区的解释
  • 极狐GitLab 账号限制有哪些?
  • 使用MetaGPT 创建智能体(2)多智能体
  • 抽象类和接口的区别
  • 基于X86/RK/全志+FPGA+AI工业一体机在电力接地系统中的应用方案