Doris 消费kafka消息
Doris 通过 Routine Load 功能来消费 Kafka 消息,这是一种自动化的、持续的数据导入方式。
核心概念:Routine Load
持续消费:Doris 会作为一个消费者,持续地从 Kafka Topic 的一个或多个 Partition 中拉取数据。
** exactly-once 语义**:Doris 能够保证数据不丢不重(在大多数正常场景下)。
自动管理:你只需要定义一个例行导入作业,Doris 会自动管理消费偏移量 (offset)、故障转移和负载均衡。
数据类型支持:支持导入 CSV、JSON 等格式的文本消息。
前提条件
Doris 集群:已安装并正常运行。
Kafka 集群:已安装并正常运行,并且有 Topic 和数据。
目标表:在 Doris 中创建好要导入数据的表。
步骤 1:在 Doris 中创建目标表
假设 Kafka 中的消息是用户行为日志,格式为 CSV:user_id, item_id, behavior, timestamp
。
sql
-- 在Doris中创建一个示例表 CREATE DATABASE IF NOT EXISTS example_db; USE example_db;CREATE TABLE IF NOT EXISTS user_behavior (user_id INT,item_id INT,behavior VARCHAR(20),-- 假设Kafka中的时间戳是秒级或毫秒级,这里用BIGINT接收,后续可转为Doris的DATETIMEevent_time BIGINT,-- 生成一个计算列,将event_time转换为易读的日期时间格式dt AS FROM_UNIXTIME(event_time, '%Y-%m-%d %H:%i:%s'),-- 根据计算列dt生成一个分区列,按天分区date_label AS DATE(FROM_UNIXTIME(event_time)) ) ENGINE=OLAP DUPLICATE KEY(user_id, item_id, event_time) -- 明细模型 PARTITION BY RANGE(date_label) (PARTITION p202310 VALUES [('2023-10-01'), ('2023-11-01'))-- 可以动态添加分区,或使用动态分区特性 ) DISTRIBUTED BY HASH(user_id) BUCKETS 10 PROPERTIES ("replication_num" = "3" );
步骤 2:创建 Routine Load 作业
这是最核心的一步。我们将创建一个作业,将 Kafka 的 user_behavior_topic
中的数据持续导入到刚创建的 user_behavior
表中。
示例 1:导入 CSV 格式数据
假设 Kafka 消息是简单的 CSV 格式:123,456,pv,1698321234
sql
CREATE ROUTINE LOAD example_db.kafka_csv_load ON user_behavior COLUMNS TERMINATED BY ",", -- 指定CSV列分隔符 COLUMNS (user_id, item_id, behavior, event_time) -- 定义Kafka消息中的列顺序,并映射到表字段 -- ROUTINE LOAD 高级属性 PROPERTIES ("desired_concurrent_number" = "3", -- 期望的并发任务数,通常小于等于Kafka分区数"max_error_number" = "1000", -- 在达到此错误行数之前,作业不会暂停"max_batch_interval" = "20", -- 每个子任务最大的执行间隔(秒)"max_batch_rows" = "200000", -- 每个子任务最多消费的行数"max_batch_size" = "104857600" -- 每个子任务最多消费的数据量(100MB) ) FROM KAFKA ("kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092", -- Kafka Broker列表"kafka_topic" = "user_behavior_topic", -- Topic名称"property.group.id" = "doris-routine-load-group", -- 消费组ID,用于偏移量管理"property.security.protocol" = "SASL_PLAINTEXT", -- 如果Kafka有安全认证,需要配置"property.sasl.mechanism" = "PLAIN","property.sasl.username" = "your_username","property.sasl.password" = "your_password" );
示例 2:导入 JSON 格式数据(更常见)
假设 Kafka 消息是 JSON 格式:
json
{"user_id": 123, "item_id": 456, "behavior_type": "pv", "timestamp": 1698321234}
sql
CREATE ROUTINE LOAD example_db.kafka_json_load ON user_behavior -- 使用jsonpaths来指定JSON字段的路径。'$'表示根目录 -- 你也可以使用json_root来指定根节点,如果JSON有外层嵌套的话 -- "json_root" = "$.data", COLUMNS(user_id = $.user_id,item_id = $.item_id,behavior = $.behavior_type, -- 将JSON中的behavior_type字段映射到表的behavior列event_time = $.timestamp ) PROPERTIES ("desired_concurrent_number" = "3","max_error_number" = "1000","strip_outer_array" = "true" -- 如果JSON消息是数组格式,如 [{"..."}, {"..."}],需要设置此属性为true以剥除外层数组 ) FROM KAFKA ("kafka_broker_list" = "kafka_host1:9092,kafka_host2:9092,kafka_host3:9092","kafka_topic" = "user_behavior_topic_json","property.group.id" = "doris-routine-load-group-json",-- 指定格式为json"format" = "json" );
关键参数说明:
COLUMNS
:定义了 Kafka 消息中的字段与 Doris 表字段的映射关系。你可以在这里进行简单的计算和转换(如字段重命名)。desired_concurrent_number
:建议设置成与 Kafka Topic 的分区数一致,以实现最大并行度。max_error_number
:允许的最大错误行数。超过此值,作业会自动暂停,需要手动检查原因并恢复。property.group.id
:Doris 会用这个消费组 ID 来存储消费偏移量。
步骤 3:监控和管理 Routine Load 作业
作业创建后,Doris 会自动开始消费数据。你需要监控其状态。
1. 查看所有 Routine Load 作业
sql
SHOW ROUTINE LOAD FOR example_db; SHOW ALL ROUTINE LOAD \G; -- \G 用于格式化输出,更易读
2. 查看特定作业的详细状态
sql
SHOW ROUTINE LOAD FOR example_db.kafka_json_load \G;
关注以下字段:
State
:RUNNING
(运行中)、PAUSED
(已暂停,需要查看ReasonOfStateChanged
)、NEED_SCHEDULE
(等待调度)。Progress
:committedOffset
: 已提交的 Kafka offset。visibleOffset
: 已导入并对查询可见的 Kafka offset。
ReasonOfStateChanged
: 如果状态是PAUSED
,这里会显示暂停原因(例如:Too many filtered rows
过滤行太多)。
3. 查看作业的错误数据
如果 max_error_number
设置得较大,作业不会暂停,但错误数据会被记录。
sql
SHOW ROUTINE LOAD ERROR WHERE JobName = 'kafka_json_load';
4. 控制作业
暂停作业:
sql
PAUSE ROUTINE LOAD FOR example_db.kafka_json_load;
恢复已暂停的作业:
sql
RESUME ROUTINE LOAD FOR example_db.kafka_json_load;
停止作业(不可恢复):
sql
STOP ROUTINE LOAD FOR example_db.kafka_json_load;
常见问题与调优
导入速度跟不上:
检查
SHOW BACKENDS\G
查看集群负载。适当增加
desired_concurrent_number
(但不要超过 Kafka 分区数)。调整
max_batch_interval
和max_batch_size
,让每个批次处理更多数据。
作业频繁 PAUSED:
ReasonOfStateChanged: There are 1000 error rows in ...
: 数据格式错误。检查 Kafka 消息格式是否与COLUMNS
中定义的匹配。可以先用max_error_number
容忍一些错误,或者修复数据源。ReasonOfStateChanged: ...
: 根据具体原因排查,如网络问题、表不存在等。
数据延迟:
使用
SHOW ROUTINE LOAD ...\G
查看Lag
字段,它显示了尚未消费的 Kafka 消息数。如果 Lag 持续增长,说明消费速度跟不上生产速度,需要参考第 1 点进行调优。
总结
使用 Doris Routine Load 消费 Kafka 数据的流程非常清晰:
准备:创建 Doris 目标表。
创建作业:使用
CREATE ROUTINE LOAD
语句,明确定义 Kafka 源信息、数据格式和字段映射。监控:使用
SHOW ROUTINE LOAD
和SHOW ROUTINE LOAD ERROR
命令监控作业状态和错误。管理:根据监控结果,使用
PAUSE
/RESUME
/STOP
对作业进行控制。
这种方式为构建实时数仓提供了稳定高效的数据管道。