当前位置: 首页 > news >正文

kafka connect 大概了解

kafka connect

Introduction

Kafka Connect is the component of Kafka that provides data integration between databases, key-value stores, search indexes, file systems, and Kafka brokers.

kafka connect 是一个框架,用来帮助集成其他系统的数据到kafka,或者将kafka 的数据吐到其他系统,例如数据库,Elasticsearch 之类的外部系统。

在kafka connect 里,有 kafka woker, kafka connectorkafka connect plugin的概念。

一个kafka worker 就是一个instance,类似一个pod 这种独立的实例。

kafka connector 就是用来搬运数据的连接器。有 source connector 和 sink connector.

Source 就是数据来源的connector,sink 就是吐出数据的 connector.

例如

Connector TypeNamePurpose
SourceJdbcSourceConnectorPull data from relational DBs
SinkElasticsearchSinkConnectorPush Kafka data to Elasticsearch
SourceFileStreamSourceConnectorRead lines from file into Kafka

connector 简略配置

{"name": "my-jdbc-source","connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","tasks.max": "1","connection.url": "jdbc:mysql://localhost:3306/mydb","topic.prefix": "mysql-"
}

A Kafka Connect Plugin is the packaged code (JAR files) that implements one or more connectors.

It’s usually installed by placing the plugin into Kafka Connect’s plugin.path directory.

A plugin might include:

  • The connector logic

  • Converters (e.g., JSON, Avro)

  • Transformations (optional logic to modify data)

Think of a connector as a configuration, and a plugin as the actual implementation that makes it work.

实现结构

从部署的角度来看,kafka connect 是一个独立的service cluster。
下面的docker-compose.yml 配置可以看出 cp-kafka-connect 这个image 就可可以load 在 CONNECT_PLUGIN_PATH 配置目录下的 connect plugin 来实现不同的 connector 功能。

confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1 这个命令就是部署mysql 相关的source connector 到 CONNECT_PLUGIN_PATH 配置的目录usr/share/confluent-hub-components 下面。

kafka-connect:image: confluentinc/cp-kafka-connect:7.1.0-1-ubi8environment:CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-componentscommand:- bash- -c- |confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.7.1/etc/confluent/docker/run

然后再通过kafka connect 的REST API 就可以enable 这个connector 了。

http://www.xdnf.cn/news/439327.html

相关文章:

  • 新能源汽车三电质量护盾:蓝光三维扫描技术显身手
  • 力扣每日一题之移动零
  • HTTP 连接复用机制详解
  • egpo进行train_egpo训练时,keyvalueError:“replay_sequence_length“
  • GoogleTest:GMock2 EXPECT_CALL
  • 数据结构基础排序算法
  • 【MySQL 基础篇】深入解析MySQL逻辑架构与查询执行流程
  • 【Ansys 2023 R2 Icepak】热管模型
  • 武汉科技大学人工智能与演化计算实验室许志伟课题组参加2025中国膜计算论坛
  • 【PostgreSQL数据分析实战:从数据清洗到可视化全流程】附录-B. 错误代码与解决方案
  • 论文阅读笔记——双流网络
  • 从阿里SDK学习请求-响应模式
  • 【Python】抽象基类ABC
  • [论文阅读]Formalizing and Benchmarking Prompt Injection Attacks and Defenses
  • 构建现代化WPF应用:数据驱动开发与高级特性解析
  • LeetCode 热题 100 230. 二叉搜索树中第 K 小的元素
  • 多模态论文笔记——NaViT
  • 2005-2022年各省绿色信贷水平测算数据(含原始数据+计算过程+计算结果)
  • 《AI大模型应知应会100篇》第61篇:FastAPI搭建大模型API服务
  • Vue3 区分开发环境与生产环境
  • PostgreSQL常用DML操作的锁类型归纳
  • 搜索二维矩阵 II
  • 【达梦数据库】超出全局hash join空间问题处理
  • 生活实用小工具-手机号归属地查询
  • PaddleNLP框架训练模型:使用SwanLab教程
  • 养生:拥抱健康生活的实用之道
  • URP相机如何将场景渲染定帧模糊绘制
  • PyTorch中mean(dim=1)的深度解析
  • P2168 NOI2015 荷马史诗
  • Kubernetes排错(十七) :kubelet日志报device or resource busy