当前位置: 首页 > java >正文

kafka快速部署、集成、调优

kafka快速部署、集成、调优

1.部署

1.1 docker部署

参考:https://blog.csdn.net/taotao_guiwang/article/details/135508643
在这里插入图片描述

1.2 kafka部署

  • 资源见,百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac
    在这里插入图片描述

  • 修改配置文件,说明要在那个机器上部署:
    在这里插入图片描述

  • 执行kafka_zk_install.sh,开始部署:

# 赋权,在kafka_zk_install目录,执行:
chmod -R 777 .
# 执行
./kafka_zk_install.sh

在这里插入图片描述

2. spring boot集成

核心代码,KafkaController.java:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}

MyConsumer.java:

package com.kafka;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
public class KafkaController {private final static String TOPIC_NAME = "my-replicated-topic";@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public void send() {kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a msg");}}

application.yml:

server:port: 8080spring:kafka:bootstrap-servers: 10.86.97.210:9092,10.86.97.210:9093,10.86.97.210:9094producer: # 生产者retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送batch-size: 16384buffer-memory: 33554432acks: 1# 指定消息key和消息体的编解码方式key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:group-id: default-groupenable-auto-commit: falseauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交# RECORD# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交# BATCH# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交# TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交# COUNT# TIME | COUNT 有一个条件满足时提交# COUNT_TIME# 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交# MANUAL# 手动调用Acknowledgment.acknowledge()后立即提交# MANUAL_IMMEDIATEack-mode: MANUAL_IMMEDIATE

浏览器访问:
http://localhost:8080/send

控制台打印:
在这里插入图片描述

3.相关资源

百度网盘:https://pan.baidu.com/s/1qlabJ7m8BDm77GbDuHmbNQ?pwd=41ac

在这里插入图片描述

http://www.xdnf.cn/news/16862.html

相关文章:

  • 香港正式启动稳定币牌照制度!推动中国的人民币国际化?
  • 智能Agent场景实战指南 Day 29:Agent市场趋势与前沿技术
  • ALOcc: Adaptive Lifting-based 3D Semantic Occupancy and
  • 异步函数被调用多次,多次处理同一个文件导致占用,如何让异步函数按顺序执行?
  • 【Node.js安装注意事项】-安装路径不能有空格
  • RustFS:高性能文件存储与部署解决方案(MinIO替代方案)
  • 10.Linux 用户和组的管理
  • 【智能协同云图库】第七期:基于AI调用阿里云百炼大模型,实现AI图片编辑功能
  • Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
  • webpack面试题及详细答案80题(41-60)
  • 【科研绘图系列】R语言绘制环状分组显著性柱状堆积图
  • iOS 抓不到包怎么办?全流程排查思路与替代引导
  • 机械学习中的一些优化算法(以逻辑回归实现案例来讲解)
  • 带root权限_中国移动创维DT541_S905L3融合机器改机顶盒刷机教程 当贝纯净版安卓9.0系统线刷包 刷机包
  • Git 命令使用指南:从入门到进阶
  • 字节跳动招AI for Science算法研究员(AI分子动力学)
  • 图论-最短路Floyd算法
  • GXP6040K压力传感器可应用于医疗/汽车/家电
  • 【AI 加持下的 Python 编程实战 2_12】第九章:繁琐任务的自动化(上)——自动清理电子邮件文本
  • 如何将联系人从三星手机转移到 iPhone
  • 8.1.2 TiDB存储引擎的原理
  • Git 各场景使用方法总结
  • LT3045EDD#TRPBF ADI亚德诺半导体 线性稳压器 电源管理应用设计
  • Spark Shuffle性能优化实践指南:提升大数据处理效率
  • 通过CISSP考试,共答到第127题
  • PendingIntent的flag和原理解析
  • CMake set_source_files_properties使用解析
  • Day17--二叉树--654. 最大二叉树,617. 合并二叉树,700. 二叉搜索树中的搜索,98. 验证二叉搜索树
  • Mysql join语句
  • 最新docker国内镜像源地址大全