33.【.NET8 实战--孢子记账--从单体到微服务--转向微服务】--单体转微服务--财务服务--记账
这篇文章我们一起把记账模块从单体应用迁移到微服务架构中。记账模块的功能想必大家都已经了解了,主要是记录用户的收入和支出,以及对这些记录的删除修改和查询等操作。具体的功能可以参考单体应用专栏,在这里就不多讲了。我们现在一起开始迁移记账模块的代码吧。
一、小修改
和前面的功能一样,我们把记账模块的代码从单体应用中抽离出来,放到微服务项目中。但是需要做一些小修改,这些修改包括前面几篇文章中提到的将接口修改为标准的restful接口、将Controller继承自ControllerBase
而不是BaseController
,以及调整路由地址等,在记账模块中,还要对新增记账、修改记账、删除记账这三个功能进行调整。我们先来看一下单体应用中这三个功能在Service层中的代码:
// more code .../// <summary>
/// 收支记录实现类
/// </summary>
public class IncomeExpenditureRecordImp : IIncomeExpenditureRecordServer
{// more code .../// <summary>/// 新增收支记录/// </summary>/// <param name="incomeExpenditureRecord"></param>public void Add(IncomeExpenditureRecord incomeExpenditureRecord){//开启事务using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{// 查找记录范围内的预算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查询分类var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){budget.Remaining -= incomeExpenditureRecord.AfterAmount;// 获取包含支出记录记录日期的报表记录var reports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果没有就说明程序还未将其写入报表,那么就不做任何处理for (int i = 0; i < reports.Count(); i++){var report = reports.ElementAt(i);report.Amount += incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}_sporeAccountingDbContext.Budgets.Update(budget);}_sporeAccountingDbContext.IncomeExpenditureRecords.Add(incomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事务transaction.Commit();}catch (Exception e){//回滚事务transaction.Rollback();throw;}}}/// <summary>/// 删除收支记录/// </summary>/// <param name="incomeExpenditureRecordId"></param>/// <returns></returns>public void Delete(string incomeExpenditureRecordId){//开启事务using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{var incomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords.FirstOrDefault(x => x.Id == incomeExpenditureRecordId);if (incomeExpenditureRecord != null){// 查找记录范围内的预算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查询分类var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){budget.Remaining += incomeExpenditureRecord.AfterAmount;// 获取包含支出记录记录日期的报表记录var reports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果没有就说明程序还未将其写入报表,那么就不做任何处理for (int i = 0; i < reports.Count(); i++){var report = reports.ElementAt(i);report.Amount -= incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}_sporeAccountingDbContext.Budgets.Update(budget);}_sporeAccountingDbContext.IncomeExpenditureRecords.Remove(incomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事务transaction.Commit();}}catch (Exception e){//回滚事务transaction.Rollback();throw;}}}/// <summary>/// 修改收支记录/// </summary>/// <param name="incomeExpenditureRecord"></param>/// <returns></returns>public void Update(IncomeExpenditureRecord incomeExpenditureRecord){using (var transaction = _sporeAccountingDbContext.Database.BeginTransaction()){try{// 查询原记录var oldIncomeExpenditureRecord = _sporeAccountingDbContext.IncomeExpenditureRecords.FirstOrDefault(x => x.Id == incomeExpenditureRecord.Id);// 查找记录范围内的预算var budget = _sporeAccountingDbContext.Budgets.FirstOrDefault(x => x.UserId == incomeExpenditureRecord.UserId&& x.StartTime <= incomeExpenditureRecord.RecordDate &&x.EndTime >= incomeExpenditureRecord.RecordDate&& x.IncomeExpenditureClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);if (budget != null){// 查询分类var classification = _sporeAccountingDbContext.IncomeExpenditureClassifications.FirstOrDefault(x => x.Id == incomeExpenditureRecord.IncomeExpenditureClassificationId);if (classification.Type== IncomeExpenditureTypeEnmu.Income){//如果是支出,需要减去原来的金额budget.Remaining = (budget.Amount - incomeExpenditureRecord.AfterAmount);// 根据旧的支出记录判断是否修改了记录日期// 如果是修改了记录日期,那么就将原记录日期所在的报表对应的分类金额减去,将新记录日期所在报表对应的分类金额加上if (oldIncomeExpenditureRecord.RecordDate != incomeExpenditureRecord.RecordDate){// 获取包含支出记录记录日期的报表记录var oldReports = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= oldIncomeExpenditureRecord.RecordDate.Year &&x.Month >= oldIncomeExpenditureRecord.RecordDate.Month &&x.ClassificationId == oldIncomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果没有就说明程序还未将其写入报表,那么就不做任何处理for (int i = 0; i < oldReports.Count(); i++){var oldReport = oldReports.ElementAt(i);oldReport.Amount -= oldIncomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(oldReport);}// 获取包含支出记录记录日期的报表记录var newReport = _sporeAccountingDbContext.Reports.Where(x => x.UserId == incomeExpenditureRecord.UserId&& x.Year <= incomeExpenditureRecord.RecordDate.Year &&x.Month >= incomeExpenditureRecord.RecordDate.Month &&x.ClassificationId ==incomeExpenditureRecord.IncomeExpenditureClassificationId);// 如果没有就说明程序还未将其写入报表,那么就不做任何处理for (int i = 0; i < newReport.Count(); i++){var report = newReport.ElementAt(i);report.Amount += incomeExpenditureRecord.AfterAmount;_sporeAccountingDbContext.Reports.Update(report);}}}else{//如果是收入,需要加上原来的金额budget.Remaining = (budget.Amount + incomeExpenditureRecord.AfterAmount);}_sporeAccountingDbContext.Budgets.Update(budget);}oldIncomeExpenditureRecord.AfterAmount = incomeExpenditureRecord.AfterAmount;oldIncomeExpenditureRecord.BeforAmount = incomeExpenditureRecord.BeforAmount;oldIncomeExpenditureRecord.RecordDate = incomeExpenditureRecord.RecordDate;oldIncomeExpenditureRecord.Remark = incomeExpenditureRecord.Remark;oldIncomeExpenditureRecord.CurrencyId = incomeExpenditureRecord.CurrencyId;oldIncomeExpenditureRecord.IncomeExpenditureClassificationId =incomeExpenditureRecord.IncomeExpenditureClassificationId;_sporeAccountingDbContext.IncomeExpenditureRecords.Update(oldIncomeExpenditureRecord);_sporeAccountingDbContext.SaveChanges();//提交事务transaction.Commit();}catch (Exception e){//回滚事务transaction.Rollback();throw;}}}// more code ...
}
在上面的代码中,我们看到这三个功能,都有两个共同的操作:一个是对预算的操作,另一个是对报表的操作。在这个单体应用代码中,我们都是直接调用预算和报表的DbSet
进行操作,并且启用了事务来保证数据的一致性。这里看似合理,但是实际上并不符合微服务的设计原则,因为微服务应该是独立的,不能直接操作其他服务的数据,同时我们也要保证方法的单一原则,因此在这三个方法里操作其他数据是不合理的。并且,如果我们存储记账记录时,预算数据或者报表数据没有存储成功,就会触发回滚操作,这样就会导致记账记录没有存储成功,对于这种设计来不符合用户体验,因此我们需要对这三个方法进行修改。
我们要做的是将预算增删操作抽离出来,让新增记账、修改记账、删除记账这三个功能在每次存储数据成功后,发送MQ消息,然后预算订阅MQ消息,进行相应的增删改操作。这样就可以保证记账模块的独立性,同时也能保证数据的一致性,即使是预算服务出现问题,也不会影响到记账模块的正常运行。修改后的Service层代码如下:
// more code .../// <summary>
/// 记账服务实现类
/// </summary>
public class AccountingServerImpl : IAccountingServer
{/// <summary>/// RabbitMQ消息处理/// </summary>private readonly RabbitMqMessage _rabbitMqMessage;/// <summary>/// 新增记账/// </summary>/// <param name="accountBookId">账本ID</param>/// <param name="request">记账添加请求</param>/// <returns></returns>public long Add(long accountBookId, AccountingAddRequest request){// more code ...//通过MQ发送记账数据到消息队列,从预算中扣除金额MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetDeduct, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();// 返回新增的记账IDreturn accounting.Id;}/// <summary>/// 删除记账/// </summary>/// <param name="accountBookId">账本ID</param>/// <param name="id">记账ID</param>public void Delete(long accountBookId, long id){// more code ...//通过MQ发送删除记账数据到消息队列,把预算中的金额恢复MqPublisher mqPublisher = new MqPublisher(accounting.AfterAmount.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetAdd, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();}/// <summary>/// 修改记账/// </summary>/// <param name="accountBookId">账本ID</param>/// <param name="request">修改请求</param>public void Edit(long accountBookId, AccountingEditRequest request){// more code ...// 通过MQ发送修改记账数据到消息队列,更新预算中的金额MqPublisher mqPublisher = new MqPublisher(amountDifference.ToString("F2"),MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey,MqQueue.BudgetQueue, MessageType.BudgetUpdate, ExchangeType.Direct);_rabbitMqMessage.SendAsync(mqPublisher).Start();}// more code ...
}
在上面的代码中,我们对新增记账、删除记账、修改记账这三个方法进行了修改。新增记账时,我们将记账数据发送到MQ消息队列中,预算服务会订阅这个消息,并进行相应的扣款操作。删除记账时,我们同样发送消息到MQ消息队列中,预算服务会将金额恢复到预算中。修改记账时,我们计算出原金额与新金额的差额,并发送到MQ消息队列中,预算服务会根据差额进行相应的更新操作。下面是新增的预算订阅MQ消息处理类的代码:
using SP.Common.Message.Model;
using SP.Common.Message.Mq;
using SP.Common.Message.Mq.Model;
using SP.FinanceService.Models.Entity;
using SP.FinanceService.Models.Enumeration;
using SP.FinanceService.Service;namespace SP.FinanceService.Mq;/// <summary>
/// Budget 消息消费者服务
/// </summary>
public class BudgetConsumerService : BackgroundService
{/// <summary>/// RabbitMq 消息/// </summary>private readonly RabbitMqMessage _rabbitMqMessage;/// <summary>/// 日志记录器/// </summary>private readonly ILogger<BudgetConsumerService> _logger;/// <summary>/// 预算服务/// </summary>private readonly IBudgetServer _budgetService;/// <summary>/// Budget 消息消费者服务/// </summary>/// <param name="rabbitMqMessage"></param>/// <param name="logger"></param>/// <param name="budgetService"></param>public BudgetConsumerService(RabbitMqMessage rabbitMqMessage, ILogger<BudgetConsumerService> logger,IBudgetServer budgetService){_logger = logger;_rabbitMqMessage = rabbitMqMessage;_budgetService = budgetService;}/// <summary>/// RabbitMq 消息/// </summary>/// <param name="stoppingToken"></param>/// <returns></returns>protected override async Task ExecuteAsync(CancellationToken stoppingToken){MqSubscriber subscriber = new MqSubscriber(MqExchange.BudgetExchange,MqRoutingKey.BudgetRoutingKey, MqQueue.BudgetQueue);await _rabbitMqMessage.ReceiveAsync(subscriber, async message =>{// 验证消息MqMessage mqMessage = ValidateMessage(message);if (mqMessage == null) return;// 获取当前预算List<Budget> budgets = GetCurrentBudgets();if (budgets == null || budgets.Count == 0) return;decimal amount = decimal.Parse(message.Body);// 根据消息类型处理预算switch (mqMessage.Type){case MessageType.BudgetAdd:_logger.LogInformation("接收到预算增加处理消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, true);break;case MessageType.BudgetUpdate:_logger.LogInformation("接收到预算更新消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, true);break;case MessageType.BudgetDeduct:_logger.LogInformation("接收到预算扣除消息, {Message}", mqMessage);UpdateBudgetsByAmount(budgets, amount, false);break;default:_logger.LogWarning("未知的消息类型: {Type}", mqMessage.Type);break;}await Task.CompletedTask;});}/// <summary>/// 验证消息/// </summary>/// <param name="message">原始消息</param>/// <returns>验证后的 MqMessage,如果验证失败返回 null</returns>private MqMessage ValidateMessage(object message){MqMessage mqMessage = message as MqMessage;if (mqMessage == null){_logger.LogError("消息转换失败");return null;}return mqMessage;}/// <summary>/// 获取当前预算/// </summary>/// <returns>当前预算列表,如果没有可用预算返回空列表</returns>private List<Budget> GetCurrentBudgets(){List<Budget> budgets = _budgetService.QueryCurrentBudgets();if (budgets == null || budgets.Count == 0){_logger.LogInformation("当前没有可用的预算,不执行处理");return new List<Budget>();}return budgets;}/// <summary>/// 根据金额更新预算/// </summary>/// <param name="budgets">预算列表</param>/// <param name="amount">金额</param>/// <param name="isAdd">是否为增加操作,true为增加,false为扣除</param>private void UpdateBudgetsByAmount(List<Budget> budgets, decimal amount, bool isAdd){string operation = isAdd ? "增加" : "扣除";decimal operationAmount = isAdd ? amount : -amount;// 更新月度预算UpdateBudgetByPeriod(budgets, PeriodEnum.Month, operationAmount, operation);// 更新年度预算UpdateBudgetByPeriod(budgets, PeriodEnum.Year, operationAmount, operation);// 更新季度预算UpdateBudgetByPeriod(budgets, PeriodEnum.Quarter, operationAmount, operation);// 保存更改到数据库_budgetService.UpdateBudgets(budgets);}/// <summary>/// 根据周期更新预算/// </summary>/// <param name="budgets">预算列表</param>/// <param name="period">预算周期</param>/// <param name="amount">金额变化</param>/// <param name="operation">操作类型描述</param>private void UpdateBudgetByPeriod(List<Budget> budgets, PeriodEnum period, decimal amount, string operation){Budget budget = budgets.FirstOrDefault(b => b.Period == period);if (budget != null){budget.Amount += amount;string periodName = GetPeriodName(period);_logger.LogInformation("{PeriodName}预算{Operation}成功,{Operation}金额: {Amount}", periodName, operation, operation, budget.Amount);}}/// <summary>/// 获取周期名称/// </summary>/// <param name="period">预算周期</param>/// <returns>周期名称</returns>private string GetPeriodName(PeriodEnum period){return period switch{PeriodEnum.Month => "月度",PeriodEnum.Year => "年度",PeriodEnum.Quarter => "季度",_ => "未知"};}
}
在上面的代码中,我们创建了一个BudgetConsumerService
类,它继承自BackgroundService
,用于处理预算相关的MQ消息。我们在ExecuteAsync
方法中订阅了MQ消息,并根据消息类型进行相应的处理。我们还添加了一些日志记录,以便于调试和监控。
对于调用报表的部分,我们暂时先将这部分代码删除掉,因为这个设计其实是不符合业务逻辑的,我们的报表都是定时生成的,而不是实时生成的,因此对报表的修改其实是没必要的,因此我们在这里不对报表进行任何操作。等到后面我们实现了报表模块后,再来处理报表相关的逻辑。
二、总结
在这篇文章中,我们将记账模块从单体应用迁移到微服务架构中,并对新增记账、删除记账、修改记账这三个功能进行了调整。我们将预算的增删改操作抽离出来,让记账模块通过MQ消息与预算服务进行交互,从而实现了模块的独立性和数据的一致性。我们还创建了一个BudgetConsumerService
类,用于处理预算相关的MQ消息。
通过这些修改,我们使得记账模块能够更好地适应微服务架构的设计原则,同时也提高了系统的可维护性和可扩展性。接下来,我们将继续实现记账模块的其他功能,并将其与其他模块进行集成。