当前位置: 首页 > ai >正文

记录一下学习kafka的使用以及思路

下面这是kafka的依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId></dependency>

我在学习的时候直接导入是没有导入成功的,我猜测大概的原因是我本地的仓库没有kafka的依赖,其实也就是没有下载过,所以没有指定版本号的话是在本地是找不到的所以报错,指定版本号下载了以后就可以了,后面就可以不用指定版本号了。

我们主要使用kafka里面的注解进行使用,第一个就是配置kafka,第二步就是使用kafkaTemplate进行sent方法进行发数据放到队列里面即可。

下面是配置

spring:kafka:bootstrap-servers: 156.65.20.76:9092,156.65.20.77:9092,156.65.20.78:9092 #指定Kafka集群的地址,这里有三个地址,用逗号分隔。listener:ack-mode: manual_immediate #设置消费者的确认模式为manual_immediate,表示消费者在接收到消息后立即手动确认。concurrency: 3  #设置消费者的并发数为5missing-topics-fatal: false  #设置为false,表示如果消费者订阅的主题不存在,不会抛出异常。producer:key-serializer: org.apache.kafka.common.serialization.StringSerializer  # 设置消息键的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer #设置消息值的序列化器acks: 1  #一般就是选择1,兼顾可靠性和吞吐量 ,如果想要更高的吞吐量设置为0,如果要求更高的可靠性就设置为-1consumer:auto-offset-reset: earliest #设置为"earliest"表示将从最早的可用消息开始消费,即从分区的起始位置开始读取消息。enable-auto-commit: false #禁用了自动提交偏移量的功能,为了避免出现重复数据和数据丢失,一般都是手动提交key-deserializer: org.apache.kafka.common.serialization.StringDeserializer  # 设置消息键的反序列化器value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #设置消息值的反序列化器

然后是生产者使用sent方法

executorService.submit(() -> {try {//获取内网流出带宽,并将结果封装到消息集合中dealMetricDataToMessage(ALiYunConstant.ECS_INTRANET_OUT_RATE, ALiYunConstant.INTRANET_OUT_RATE_NAME, ALiYunConstant.LW_INTRANET_OUT_RATE_CODE,startTime, endTime, instance, messages);} catch (Exception e) {log.error("获取ECS的指标数据-多线程处理任务异常!", e);} finally {latch.countDown();}});}//等待任务执行完毕latch.await();//将最终的消息集合发送到kafkaif (CollectionUtils.isNotEmpty(messages)) {for (int i = 0; i < messages.size(); i++) {if (StringUtils.isNotBlank(messages.get(i).getValue())&& "noSuchInstance".equals(messages.get(i).getValue())) {continue;}kafkaTemplate.send(topicName,  messages.get(i));}}

这里是一个线程池的发送信息的方法,但是我们只需要记住一个即可,就是kafkaTmeplate.sent(topicname,message);

然后消费者里面是这样写的

// 消费监听@KafkaListener(topics = "zhuoye",groupId ="zhuoye-aliyunmetric")public void consumeExtractorChangeMessage(ConsumerRecord<String, String> record, Acknowledgment ack){try {String value = record.value();//处理数据,存入openTsDb.................................ack.acknowledge();//手动提交}catch (Exception e){log.error("kafa-topic【zhuoye】消费阿里云指标源消息【失败】");log.error(e.getMessage());}}

主要就是@kafkalistener注解,然后就是使用数据就可以了。

然后其他的底层原理我们后面再去学习

http://www.xdnf.cn/news/4851.html

相关文章:

  • Windows远程访问Ubuntu的方法
  • zst-2001 历年真题 设计模式
  • 多视图密集对应学习:细粒度3D分割的自监督革命
  • 使用PyTorch训练马里奥强化学习代理的完整指南
  • 系统思考:短期困境与长期收益
  • Webpack基本用法学习总结
  • Vue3 + Typescript 基础进阶与实战完全指南
  • SQL进阶:如何把字段中的键值对转为JSON格式?
  • python调用国税乐企直连接口开数电票之额度管理
  • transformer 笔记 tokenizer moe
  • 科技创业园共享会议室线上预约及智能密码锁系统搭建指南
  • FPGA实战项目2———多协议通信控制器
  • 学习黑客认识数字取证与事件响应(DFIR)
  • 安科瑞ADL3000-E-A/KC三相交流电能表CE认证导轨表
  • Spring AI 系列——使用大模型对文本内容分类归纳并标签化输出
  • React 中 useMemo 和 useEffect 的区别(计算与监听方面)
  • 传统销售VS智能销售:AI如何重构商业变现逻辑
  • Microsoft 365 Copilot:为Teams在线会议带来多语言语音交流新体验
  • 【计算机网络-传输层】传输层协议-TCP核心机制与可靠性保障
  • Ubuntu通过源码编译方式单独安装python3.12
  • 分享一款开源的图片去重软件 ImageContrastTools,基于Electron和hash算法
  • 二叉树的深度
  • 《被讨厌的勇气》书摘
  • JVM——即时编译
  • RabbitMQ-运维
  • 【C++设计模式之Observer观察者模式】
  • 5G让媒体传播更快更智能——技术赋能内容新时代
  • 深入详解人工智能数学基础——微积分中的微分方程在神经常微分方程(Neural ODE)中的应用
  • Vue3+ts复制图片到剪贴板
  • git相关