当前位置: 首页 > backend >正文

net8.0一键创建支持(Kafka)

Necore项目生成器 - 在线创建Necore模板项目 | 一键下载

 KafkaController.cs

using Confluent.Kafka;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Threading.Tasks;
using UnT.Template.Application.Responses;
using UnT.Template.Domain;namespace UnT.Template.Controllers
{[Route("api/kafkas")][ApiController]public class KafkaController : ControllerBase{private readonly IConfiguration _configuration;public KafkaController(IConfiguration configuration){_configuration = configuration;}[HttpPost("publish")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Insert(){try{var producerConfig = new ProducerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),ClientId = "UnT.Template",Acks = Acks.All, MessageSendMaxRetries = 3,RetryBackoffMs = 1000,LingerMs = 5 };// 创建生产者using (var producer = new ProducerBuilder<Null, string>(producerConfig).Build()){var message = Newtonsoft.Json.JsonConvert.SerializeObject(new Pro_Product { Name = DateTime.Now.ToFileTime().ToString() });producer.Produce("unt_queue", new Message<Null, string> { Value = message },(deliveryReport) =>{if (deliveryReport.Error.Code != ErrorCode.NoError){Console.WriteLine($"消息发送失败: {deliveryReport.Error.Reason}");}else{Console.WriteLine($"消息发送到: {deliveryReport.TopicPartitionOffset}");}});producer.Flush(TimeSpan.FromSeconds(10));}return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}[HttpPost("consume")][Produces("application/json")][ProducesResponseType(typeof(ApiResponse<bool>), StatusCodes.Status200OK)]public async Task<IActionResult> Consume(){try{Task.Run(() =>{var consumerConfig = new ConsumerConfig{BootstrapServers = _configuration.GetValue<string>("KafkaConnectionString"),GroupId = "UnT.Template.Consumer.Group",EnableAutoCommit = false, AutoOffsetReset = AutoOffsetReset.Latest,EnablePartitionEof = true,StatisticsIntervalMs = 5000};using (var consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build()){//订阅主题consumer.Subscribe("unt_queue");//取消令牌,用于优雅退出var cts = new CancellationTokenSource();Console.CancelKeyPress += (_, e) => {e.Cancel = true;cts.Cancel();};try{while (true){try{//消费消息var cr = consumer.Consume(cts.Token);if (cr.IsPartitionEOF){Console.WriteLine($"分区 {cr.Partition} 已到达末尾,偏移量: {cr.Offset}");continue;}//检查空消息if (cr.Message == null){Console.WriteLine("收到空消息");continue;}//处理有效消息Console.WriteLine($"收到消息: {cr.Message.Value} [分区: {cr.Partition}, 偏移量: {cr.Offset}]");//手动提交偏移量(如果EnableAutoCommit=false)consumer.Commit(cr);}catch (ConsumeException e){Console.WriteLine($"消费错误: {e.Error.Reason}");}}}catch (OperationCanceledException){// 确保消费者正确关闭consumer.Close();}}});await Task.Delay(TimeSpan.FromSeconds(5));return Ok(new ApiResponse<bool> { Success = true, Data = true });}catch (Exception ex){return Ok(new ApiResponse<bool> { Success = false, Message = ex.Message, Data = false });}}}
}

http://www.xdnf.cn/news/16432.html

相关文章:

  • Redis6.0+安装教程(Linux)
  • CPA青少年编程能力等级测评试卷及答案 Python编程(三级)
  • 分表分库与分区表
  • 【第六节】方法与事件处理器
  • docker-desktop引擎启动失败报wsl --update
  • Day4.AndroidAudio初始化
  • 数独求解器与生成器(回溯算法实现)
  • 【ESP32】无法找到: “${env:IDF_PATH}/components/“的路径报错问题以及CMAKE构建不成功问题
  • JVM terminated. Exit code=1
  • 最优估计准则与方法(6)递推最小二乘估计(RLS)_学习笔记
  • BeautifulSoup 使用详解与实战示例
  • 单链表的冒泡排序实现:从原理到代码详解
  • Windows 11 Qt 5.15.x 源码编译,支持C++20
  • MySQL进阶学习与初阶复习第四天
  • Canvas实现微信小程序图片裁剪组件全攻略
  • 在docker中安装frp实现内网穿透
  • Ubuntu简述及部署系统
  • 负载均衡 LoadBalance
  • web刷题
  • c++11--static_assert
  • Linux->自定义shell
  • FPGA IP升级
  • 网络服务综合项目
  • Oracle 数据库报 ora-00257 错误并且执行alter system switch logfile 命令卡死的解决过程
  • XSS利用
  • 02人工智能中优雅草商业实战项目视频字幕翻译以及声音转译之以三方AI模型API制作方式预算-卓伊凡|莉莉
  • linux 板卡实现vxi11服务
  • 阿里 Qwen3 四模型齐发,字节 Coze 全面开源,GPT-5 8 月初发布!| AI Weekly 7.21-7.27
  • 初识 docker [上]
  • 《 接口日志与异常处理统一设计:AOP与全局异常捕获》