当前位置: 首页 > web >正文

Doris 消费kafka消息

Doris 通过 Routine Load 功能来消费 Kafka 消息,这是一种自动化的、持续的数据导入方式。

核心概念:Routine Load

  • 持续消费:Doris 会作为一个消费者,持续地从 Kafka Topic 的一个或多个 Partition 中拉取数据。

  • ** exactly-once 语义**:Doris 能够保证数据不丢不重(在大多数正常场景下)。

  • 自动管理:你只需要定义一个例行导入作业,Doris 会自动管理消费偏移量 (offset)、故障转移和负载均衡。

  • 数据类型支持:支持导入 CSV、JSON 等格式的文本消息。


前提条件

  1. Doris 集群:已安装并正常运行。

  2. Kafka 集群:已安装并正常运行,并且有 Topic 和数据。

  3. 目标表:在 Doris 中创建好要导入数据的表。


步骤 1:在 Doris 中创建目标表

假设 Kafka 中的消息是用户行为日志,格式为 CSV:user_id, item_id, behavior, timestamp

sql

-- 在Doris中创建一个示例表
CREATE DATABASE IF NOT EXISTS example_db;
USE example_db;CREATE TABLE IF NOT EXISTS user_behavior (user_id INT,item_id INT,behavior VARCHAR(20),-- 假设Kafka中的时间戳是秒级或毫秒级,这里用BIGINT接收,后续可转为Doris的DATETIMEevent_time BIGINT,-- 生成一个计算列,将event_time转换为易读的日期时间格式dt AS FROM_UNIXTIME(event_time, '%Y-%m-%d %H:%i:%s'),-- 根据计算列dt生成一个分区列,按天分区date_label AS DATE(FROM_UNIXTIME(event_time))
) 
ENGINE=OLAP
DUPLICATE KEY(user_id, item_id, event_time) -- 明细模型
PARTITION BY RANGE(date_label) (PARTITION p202310 VALUES [('2023-10-01'), ('2023-11-01'))-- 可以动态添加分区,或使用动态分区特性
) 
DISTRIBUTED BY HASH(user_id) BUCKETS 10
PROPERTIES ("replication_num" = "3"
);

步骤 2:创建 Routine Load 作业

这是最核心的一步。我们将创建一个作业,将 Kafka 的 user_behavior_topic 中的数据持续导入到刚创建的 user_behavior 表中。

示例 1:导入 CSV 格式数据

假设 Kafka 消息是简单的 CSV 格式:123,456,pv,1698321234

sql

CREATE ROUTINE LOAD example_db.kafka_csv_load ON user_behavior
COLUMNS TERMINATED BY ",", -- 指定CSV列分隔符
COLUMNS (user_id, item_id, behavior, event_time) -- 定义Kafka消息中的列顺序,并映射到表字段
-- ROUTINE LOAD 高级属性
PROPERTIES
("desired_concurrent_number" = "3", -- 期望的并发任务数,通常小于等于Kafka分区数"max_error_number" = "1000",       -- 在达到此错误行数之前,作业不会暂停"max_batch_interval" = "20",       -- 每个子任务最大的执行间隔(秒)"max_batch_rows" = "200000",       -- 每个子任务最多消费的行数"max_batch_size" = "104857600"     -- 每个子任务最多消费的数据量(100MB)
)
FROM KAFKA
("kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092", -- Kafka Broker列表"kafka_topic" = "user_behavior_topic", -- Topic名称"property.group.id" = "doris-routine-load-group", -- 消费组ID,用于偏移量管理"property.security.protocol" = "SASL_PLAINTEXT", -- 如果Kafka有安全认证,需要配置"property.sasl.mechanism" = "PLAIN","property.sasl.username" = "your_username","property.sasl.password" = "your_password"
);
示例 2:导入 JSON 格式数据(更常见)

假设 Kafka 消息是 JSON 格式:

json

{"user_id": 123, "item_id": 456, "behavior_type": "pv", "timestamp": 1698321234}

sql

CREATE ROUTINE LOAD example_db.kafka_json_load ON user_behavior
-- 使用jsonpaths来指定JSON字段的路径。'$'表示根目录
-- 你也可以使用json_root来指定根节点,如果JSON有外层嵌套的话
-- "json_root" = "$.data",
COLUMNS(user_id = $.user_id,item_id = $.item_id,behavior = $.behavior_type, -- 将JSON中的behavior_type字段映射到表的behavior列event_time = $.timestamp
)
PROPERTIES
("desired_concurrent_number" = "3","max_error_number" = "1000","strip_outer_array" = "true" -- 如果JSON消息是数组格式,如 [{"..."}, {"..."}],需要设置此属性为true以剥除外层数组
)
FROM KAFKA
("kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092","kafka_topic" = "user_behavior_topic_json","property.group.id" = "doris-routine-load-group-json",-- 指定格式为json"format" = "json"
);

关键参数说明:

  • COLUMNS定义了 Kafka 消息中的字段与 Doris 表字段的映射关系。你可以在这里进行简单的计算和转换(如字段重命名)。

  • desired_concurrent_number建议设置成与 Kafka Topic 的分区数一致,以实现最大并行度。

  • max_error_number:允许的最大错误行数。超过此值,作业会自动暂停,需要手动检查原因并恢复。

  • property.group.id:Doris 会用这个消费组 ID 来存储消费偏移量。


步骤 3:监控和管理 Routine Load 作业

作业创建后,Doris 会自动开始消费数据。你需要监控其状态。

1. 查看所有 Routine Load 作业

sql

SHOW ROUTINE LOAD FOR example_db;
SHOW ALL ROUTINE LOAD \G; -- \G 用于格式化输出,更易读
2. 查看特定作业的详细状态

sql

SHOW ROUTINE LOAD FOR example_db.kafka_json_load \G;

关注以下字段:

  • StateRUNNING(运行中)、PAUSED(已暂停,需要查看 ReasonOfStateChanged)、NEED_SCHEDULE(等待调度)。

  • Progress:

    • committedOffset: 已提交的 Kafka offset。

    • visibleOffset: 已导入并对查询可见的 Kafka offset。

  • ReasonOfStateChanged: 如果状态是 PAUSED,这里会显示暂停原因(例如:Too many filtered rows 过滤行太多)。

3. 查看作业的错误数据

如果 max_error_number 设置得较大,作业不会暂停,但错误数据会被记录。

sql

SHOW ROUTINE LOAD ERROR WHERE JobName = 'kafka_json_load';
4. 控制作业
  • 暂停作业

    sql

    PAUSE ROUTINE LOAD FOR example_db.kafka_json_load;
  • 恢复已暂停的作业

    sql

    RESUME ROUTINE LOAD FOR example_db.kafka_json_load;
  • 停止作业(不可恢复):

    sql

    STOP ROUTINE LOAD FOR example_db.kafka_json_load;

常见问题与调优

  1. 导入速度跟不上

    • 检查 SHOW BACKENDS\G 查看集群负载。

    • 适当增加 desired_concurrent_number(但不要超过 Kafka 分区数)。

    • 调整 max_batch_interval 和 max_batch_size,让每个批次处理更多数据。

  2. 作业频繁 PAUSED

    • ReasonOfStateChanged: There are 1000 error rows in ...: 数据格式错误。检查 Kafka 消息格式是否与 COLUMNS 中定义的匹配。可以先用 max_error_number 容忍一些错误,或者修复数据源。

    • ReasonOfStateChanged: ...: 根据具体原因排查,如网络问题、表不存在等。

  3. 数据延迟

    • 使用 SHOW ROUTINE LOAD ...\G 查看 Lag 字段,它显示了尚未消费的 Kafka 消息数。

    • 如果 Lag 持续增长,说明消费速度跟不上生产速度,需要参考第 1 点进行调优。

总结

使用 Doris Routine Load 消费 Kafka 数据的流程非常清晰:

  1. 准备:创建 Doris 目标表。

  2. 创建作业:使用 CREATE ROUTINE LOAD 语句,明确定义 Kafka 源信息、数据格式和字段映射。

  3. 监控:使用 SHOW ROUTINE LOAD 和 SHOW ROUTINE LOAD ERROR 命令监控作业状态和错误。

  4. 管理:根据监控结果,使用 PAUSE/RESUME/STOP 对作业进行控制。

这种方式为构建实时数仓提供了稳定高效的数据管道。

http://www.xdnf.cn/news/20329.html

相关文章:

  • 通过PXE的方式实现Ubuntu 24.04 自动安装
  • 版本管理系统与平台(权威资料核对、深入解析、行业选型与国产平台补充)
  • 50.4k Star!我用这个神器,在五分钟内搭建了一个私有 Git 服务器!
  • 小程序的project.private.config.json是无依赖文件,那可以删除吗?
  • Aspose.Words for .NET 25.7:支持自建大语言模型(LLM),实现更安全灵活的AI文档处理功能
  • 《LangChain从入门到精通》系统学习教材大纲
  • java基础学习(四):类 - 了解什么是类,类中都有什么?
  • 25年下载chromedriver.140
  • 项目必备流程图,类图,E-R图实例速通
  • 面试 TOP101 贪心专题题解汇总Java版(BM95 —— BM96)
  • 实力登榜!美创科技荣膺数说安全《2025中国网络安全企业100强》
  • IDEA中Transaction翻译插件无法使用,重新配置Transaction插件方法
  • 基于飞算JavaAI的在线图书借阅平台设计实现
  • Process Explorer 学习笔记(第三章 3.2.2):定制可显示的列与数据保存
  • Linux 入门到精通,真的不用背命令!零基础小白靠「场景化学习法」,3 个月拿下运维 offer,第二十七天
  • Bug排查日记:从崩溃到修复的实战记录
  • Nginx +Tomcat架构的必要性与应用示例
  • Kafka 消息队列:揭秘海量数据流动的技术心脏
  • 具身智能多模态感知与场景理解:融合语言模型的多模态大模型
  • 【关系型数据库SQL】MySql数据库基础学习(一)
  • 高级RAG策略学习(五)——llama_index实现上下文窗口增强检索RAG
  • 在本地使用Node.js和Express框架来连接和操作远程数据库
  • 从“找新家”到“走向全球”,布尔云携手涂鸦智能开启机器人新冒险
  • 突发奇想,还未实践,在Vben5的Antd模式下,将表单从「JS 配置化」改写成「模板可视化」形式(豆包版)
  • langchain 提示模版 PromptTemplate
  • Coze源码分析-资源库-编辑提示词-后端源码
  • 苹果TF签名全称TestFlight签名,需要怎么做才可以上架呢?
  • 如何选择靠谱的软文推广平台?这份行业TOP5清单请查收~
  • AGENTS.md: AI编码代理的开放标准
  • RL【3】:Bellman Optimality Equation