Doris “_stream_load“ 方式批量导入数据
一、Stream Load简介
Doris 2.x 的 Stream Load 是一种高性能、实时的数据导入方式,适用于将 大量数据快速加载到 Doris 中
。它通过 HTTP 协议接收数据,并直接写入 Doris 的存储层,支持事务语义和高并发导入。
核心特点
- 实时性:数据导入后立即可见,延迟通常在秒级。
- 高吞吐量:支持批量数据导入,单节点导入速度可达 GB/分钟级别。
- 事务支持:导入操作是原子性的,要么全部成功,要么全部失败。
- 灵活的数据格式:支持 CSV、JSON 等格式。
- 部分更新:在 Unique Key 模型下支持按主键更新或插入数据。
基本语法
curl --location-trusted \-u username:password \-H "Expect: 100-continue" \-H "label:your_load_label" \-H "column_separator:," \-H "columns:col1,col2,col3" \-T /path/to/data.file \http://fe_host:fe_http_port/api/db_name/table_name/_stream_load
关键参数
参数 | 描述 | 示例 |
---|---|---|
Expect: 100-continue | 必须参数,告诉服务器在接收数据前先验证请求头。 | -H "Expect: 100-continue" |
label | 导入任务的唯一标识,用于幂等性控制。如果两次导入使用相同 label,第二次会被拒绝。 | -H "label:load_task_001" |
column_separator | 列分隔符,用于 CSV 格式。 | -H "column_separator:," |
columns | 指定导入数据的列映射关系,可重命名或计算列。 | -H "columns:id,name,age" |
partial_columns | 是否允许部分列导入(仅 Unique Key 模型支持),而非全部列。 | -H "partial_columns:true" |
strict_mode | 是否开启严格模式,关闭后允许数据中缺少列(使用默认值填充)。 | -H "strict_mode:false" |
max_filter_ratio | 最大过滤比例,超过此比例导入会失败。 | -H "max_filter_ratio:0.1" |
数据格式支持
1. CSV 格式
curl ... -H "column_separator:," -T data.csv ...
2. JSON 格式
curl ... -H "format:json" -H "jsonpaths:[$.id, $.name, $.age]" -T data.json ...
3. 带计算列
curl ... -H "columns:id, name, age, birthday=from_unixtime(ts)" -T data.csv ...
导入状态码
状态码 | 含义 |
---|---|
200 | 导入成功 |
307 | 需要重定向到其他 FE 节点 |
400 | 请求格式错误 |
500 | 服务器内部错误 |
高级特性
1. Unique Key 模型下的 UPSERT
在 Unique Key 表中,使用 ON DUPLICATE KEY UPDATE
语义:
curl ... -H "partial_columns:true" -H "strict_mode:false" ...
2. 分区表导入
指定分区导入,提高性能:
curl ... -H "partition: p202501,p202502" ...
3. 并发导入
使用不同的 label 并行执行多个 Stream Load 任务,提高吞吐量。
最佳实践
- 数据批量处理:将大文件拆分为 100MB-1GB 的小文件,并行导入。
- 避免重复导入:确保每次导入使用唯一的 label。
- 监控导入状态:通过 Doris 自带的
SHOW LOAD
命令查看导入进度。 - 错误处理:设置合理的
max_filter_ratio
处理脏数据。
示例:导入 CSV 数据
假设 users
表结构为:
CREATE TABLE users (id BIGINT,name VARCHAR(32),age INT
) ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 8;
导入命令:
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "label:users_load_$(date +%s)" \-H "column_separator:," \-H "columns:id, name, age" \-T users.csv \http://fe_host:8030/api/test_db/users/_stream_load
常见问题及解决
-
导入失败:100-continue 错误
- 原因:缺少
Expect: 100-continue
头。 - 解决:添加
-H "Expect: 100-continue"
。
- 原因:缺少
-
部分列更新失败
- 原因:表不是 Unique Key 模型或未启用 Merge-On-Write。
- 解决:修改表属性
enable_unique_key_merge_on_write=true
。
-
性能瓶颈
- 原因:单个导入任务过大或并发不足。
- 解决:拆分文件并增加并发导入任务。
监控与诊断
-- 查看导入历史
SHOW LOAD WHERE LABEL LIKE 'users_load_%';
通过 Stream Load,我们可以高效地将实时数据导入 Doris,满足分析场景的低延迟需求。
二、实战案例
2.1 创建Doris表
假设在 Doris 中创建一张订单表 order_tbl
:
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID(Key列)',order_status VARCHAR(32) COMMENT '订单状态(Value列)',order_amount DECIMAL(15, 2) COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`) -- 定义订单ID为主键
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16 -- 哈希分桶
PROPERTIES ("replication_num" = "3", -- 副本数"storage_format" = "V2" -- 存储格式
);
关键说明:
-
表模型选择:
- 使用
UNIQUE KEY
模型确保相同order_id
的记录会自动去重,后写入的数据覆盖旧数据。 - 适合需要按主键进行更新的业务场景。
- 使用
-
字段设计:
order_id
:使用BIGINT
类型存储订单ID,并定义为主键(Key列)。order_status
:使用VARCHAR
存储订单状态(如 “PAID”、“CANCELLED” 等)。order_amount
:使用DECIMAL(15,2)
精确存储金额,避免浮点数精度问题。
-
数据分布:
DISTRIBUTED BY HASH(order_id)
:按订单ID哈希分桶,确保数据均匀分布。BUCKETS 16
:设置16个分桶,可根据集群规模调整。
-
存储属性:
replication_num = "3"
:每个数据分片保留3个副本,保证高可用。storage_format = "V2"
:使用最新存储格式,提升查询性能。
可选优化(根据业务需求):
-- 带分区和二级索引的增强版本
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_date DATE NOT NULL COMMENT '订单日期',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) COMMENT '订单金额',INDEX status_idx (`order_status`) USING BITMAP -- 为状态字段添加位图索引
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
PARTITION BY RANGE(`order_date`) ( -- 按日期分区PARTITION p_202501 VALUES LESS THAN ("2025-02-01"),PARTITION p_202502 VALUES LESS THAN ("2025-03-01"),...
)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","storage_format" = "V2"
);
使用建议:
-
数据写入:
- 使用
INSERT INTO order_tbl VALUES (...)
写入数据,重复的order_id
会自动更新。 - 支持批量写入(如
INSERT INTO ... VALUES (...) , (...) , (...)
)提升性能。
- 使用
-
查询优化:
- 高频过滤字段(如
order_status
)可添加二级索引。 - 按时间范围查询时,建议结合分区使用(如
WHERE order_date >= '2025-01-01'
)。
- 高频过滤字段(如
-
性能调优:
- 根据数据量调整
BUCKETS
数量(通常为 BE 节点数的整数倍)。 - 定期执行
ALTER TABLE order_tbl REFRESH INDEX ALL;
更新索引统计信息。
- 根据数据量调整
2.2 通过Stream Load方式导入数据
正常情况下,通过如下Stream Load命令,即可完成数据的正确导入。
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \ # 允许部分列-H "strict_mode:false" \ # 关闭严格模式(重要!)-H "column_separator:," \-H "columns:order_id,order_status" \ # 只提供部分列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
导入正确后,打印的统计日志如下:
{"TxnId": 2890991,"Label": "43a5d440-a5b0-4444-9a83-66610462dbf9","Comment": "","TwoPhaseCommit": "false","Status": "Success","Message": "OK","NumberTotalRows": 1,"NumberLoadedRows": 1,"NumberFilteredRows": 0,"NumberUnselectedRows": 0,"LoadBytes": 11,"LoadTimeMs": 267,"BeginTxnTimeMs": 3,"StreamLoadPutTimeMs": 4,"ReadDataTimeMs": 0,"WriteDataTimeMs": 234,"CommitAndPublishTimeMs": 22
}
2.3 错误的配置案例
但实际配置中,可能存在以下问题:
问题 1、 Doris FE端口误配成BE端口
zhangsan@DESKTOP-AAAA MINGW64 /d/Docs/测试/01-Doris/A1-Upsert
$ curl --location-trusted \
-u root: \
-H "partial_columns:true" \
-H "column_separator:," \
-H "columns:order_id,order_status" \
-T ./update.csv http://192.168.1.1:9030/api/example_db/order_tbl/_stream_loadcurl: (1) Received HTTP/0.9 when not allowed
如上,如果将Doris FE端口误配成BE端口,则可能报上述错误。
这个错误 Received HTTP/0.9 when not allowed
通常表示客户端(curl)期望接收 HTTP 1.0 或更高版本的响应,但服务器返回了 HTTP 0.9 格式的响应,这是一个过时的 HTTP 版本,现代客户端默认不支持。
可能的原因及解决方案
1. Doris 服务端口配置错误
如果使用的端口是 9030
,这通常是 Doris 的 BE 节点数据导入端口,而 Stream Load 需要通过 FE 节点的 HTTP 端口(默认 8030) 进行。
修正方法:
将 URL 中的端口改为 FE 的 HTTP 端口(通常是 8030):
curl --location-trusted -u root: -H "partial_columns:true" -H "column_separator:," -H "columns:order_id,order_status" -T ./update.csv http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
2. 网络代理或防火墙干扰
中间代理或防火墙可能会修改 HTTP 协议版本。
修正方法:
- 检查网络环境,确保没有代理拦截请求。
- 临时关闭防火墙测试:
# CentOS/RHEL systemctl stop firewalld# Ubuntu/Debian ufw disable
3. Doris FE 服务异常
FE 服务可能未正常启动或配置错误。
修正方法:
- 检查 FE 服务状态:
# 在 FE 节点上执行 sh /path/to/doris/fe/bin/fe.sh status
- 查看 FE 日志(
/path/to/doris/fe/log/fe.log
)确认是否有异常。
4. curl 版本兼容性问题
某些旧版本的 curl 可能对 HTTP 协议处理存在问题。
修正方法:
- 更新 curl 到最新版本:
# Ubuntu/Debian sudo apt-get update && sudo apt-get install curl# CentOS/RHEL sudo yum update curl
- 或使用
--http1.1
强制使用 HTTP 1.1:curl --http1.1 --location-trusted -u root: ...
5. Doris 配置检查
确认 FE 配置文件(fe.conf
)中 HTTP 端口是否正确:
# fe.conf
http_port = 8030 # 确保端口未被修改
修正后的命令示例
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \-H "column_separator:," \-H "columns:order_id,order_status" \-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
关键修改:
- 将端口从
9030
改为8030
。 - 添加
Expect: 100-continue
头(可选但推荐)。
验证 Doris FE 端口
# 检查 FE HTTP 端口是否正常监听
telnet 192.168.1.1 8030# 如果无法连接,检查 FE 服务是否启动
sh /path/to/doris/fe/bin/start_fe.sh --daemon
通过以上步骤,可以解决 HTTP 版本不兼容的问题。
问题 2、 发送HTTP请求未经服务器确认问题
如下所示,请求报错:“There is no 100-continue header”
$ curl --location-trusted \
-u root: \
-H "partial_columns:true" \
-H "column_separator:," \
-H "columns:order_id,order_status" \
-T ./update.csv http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load{"status":"FAILED","msg":"There is no 100-continue header"}
错误为:
{"status":"FAILED","msg":"There is no 100-continue header"}
原因:
Doris 在 Stream Load 过程中要求客户端发送 Expect: 100-continue
头部,但请求中缺少这个头部。
解决方案
在 curl 命令中添加 Expect: 100-continue
头部以满足 Doris 的要求:
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \-H "column_separator:," \-H "columns:order_id,order_status" \-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
解释
Expect: 100-continue
:这是 HTTP 协议中的一个特性,客户端通过发送这个头部告诉服务器:在发送实际请求体之前,先等待服务器确认(状态码 100)。这样可以避免在服务器可能拒绝请求的情况下发送大量数据。- Doris 强制要求这个头部,以确保在接收大量数据前进行必要的验证。
其他注意事项
-
端口确认: Doris FE 节点的 HTTP 端口(默认是 8030)。
-
数据文件格式:确保
update.csv
文件格式正确,例如:1001,PAID 1002,PENDING
-
错误处理:如果仍然失败,可以检查 Doris FE 日志(
fe/log/fe.log
)获取更详细的错误信息。
替代方案:使用 Doris 官方工具
如果我们经常进行 Stream Load,可以考虑使用 Doris 提供的 Python 客户端工具 curl_load.py
:
python /path/to/doris/tools/curl_load.py \--host 192.168.1.1 \--port 8030 \--user root \--password '' \--db example_db \--table order_tbl \--file ./update.csv \--columns "order_id,order_status" \--column_separator "," \--partial_columns
这个工具会自动处理 Expect: 100-continue
等细节。
问题-3:Only unique key merge on write support partial update
执行Stream Load命令,报了如上错误。
通过反查建表语句
根据错误信息 Only unique key merge on write support partial update
,Doris 在处理 Stream Load 请求时拒绝了部分列更新,因为当前表模型不支持。这个错误与 Doris 的表模型和更新策略有关。
问题原因
Doris 中只有 Unique Key 模型支持真正的 UPSERT 语义(即主键冲突时自动更新),而你的表可能使用了其他模型(如 Aggregate Key 或 Duplicate Key)。
查看建表语句
SHOW CREATE TABLE `order_tbl`;-- 提取到建表语句
CREATE TABLE `order_tbl` (`order_id` bigint(20) NOT NULL COMMENT '订单ID(Key列)',`order_status` varchar(32) NULL COMMENT '订单状态(Value列)',`order_amount` DECIMAL(15, 2) NULL COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
不同表模型的限制:
- Unique Key 模型:支持部分列更新,未提供的列保持原值。
- Aggregate Key 模型:只支持预定义的聚合操作(SUM、REPLACE 等),不支持部分列更新。
- Duplicate Key 模型:不支持更新,只支持插入新行。
解决方案
1. 确认并修改表模型为 Unique Key
如果表不是 Unique Key 模型,需要重建表:
-- 创建 Unique Key 模型表
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) DEFAULT 0.0 COMMENT '订单金额'
) ENGINE=OLAP
UNIQUE KEY(`order_id`) -- 必须定义 Unique Key
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16;
2. 使用完整列导入(不推荐,但可临时解决)
如果无法修改表模型,可以在导入时提供所有列的值:
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "column_separator:," \-H "columns:order_id,order_status,order_amount" \ # 包含所有列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
3. 使用 REPLACE 语义(适用于 Aggregate Key 模型)
如果表是 Aggregate Key 模型,且需要覆盖已有值,可以将字段定义为 REPLACE
类型:
-- 对于 Aggregate Key 模型
CREATE TABLE order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) REPLACE DEFAULT '' COMMENT '订单状态', -- 使用 REPLACE 聚合类型order_amount DECIMAL(15, 2) REPLACE DEFAULT 0.0 COMMENT '订单金额'
) ENGINE=OLAP
AGGREGATE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16;
最佳实践
- 优先使用 Unique Key 模型:如果需要频繁更新部分列,Unique Key 是最适合的选择。
- 避免混用表模型:在设计表时明确业务需求,选择合适的表模型。
- 验证表结构:导入前通过
DESC order_tbl;
确认表模型和列定义。
验证方法
-- 查看表结构和模型
DESC order_tbl;-- 结果示例(Unique Key 模型)
+--------------+---------------+------+-------+---------+-------+
| Field | Type | Null | Key | Default | Extra |
+--------------+---------------+------+-------+---------+-------+
| order_id | bigint | No | true | NULL | |
| order_status | varchar(32) | Yes | false | NULL | |
| order_amount | decimal(15,2) | Yes | false | 0.0 | |
+--------------+---------------+------+-------+---------+-------+
如果 Key
列包含 true
,则表示该列是 Unique Key 的一部分,支持部分更新。
三、Doris “Unique Key” 与 “Stream Load”实现数据批量新增OR更新
如下建表语句,使用了order_id作为Unique Key:
CREATE TABLE `order_tbl` (`order_id` bigint(20) NOT NULL COMMENT '订单ID(Key列)',`order_status` varchar(32) NULL COMMENT '订单状态(Value列)',`order_amount` DECIMAL(15, 2) NULL COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
如果想实现数据批量新增OR更新,则需要考虑使用Doris 2.x 的 ** Unique Key 模型配置** 或 Stream Load 参数组合 上。
虽然在上述SQL里已正确使用 UNIQUE KEY
模型,但还需要确保以下几点:
配置要点
1. 启用 Merge-On-Write 存储格式
Doris 2.x 的 Unique Key 模型需要显式启用 Merge-On-Write (MOW) 存储格式才能支持部分列更新。
修改建表语句:
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) COMMENT '订单金额'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","storage_format" = "V2","enable_unique_key_merge_on_write" = "true" -- 启用 Merge-On-Write
);
2. 修改 Stream Load 参数
确保同时设置 partial_columns
和 strict_mode
:
curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \ # 允许部分列-H "strict_mode:false" \ # 关闭严格模式(重要!)-H "column_separator:," \-H "columns:order_id,order_status" \ # 只提供部分列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
3. 验证表属性
确认表已启用 Merge-On-Write:
SHOW CREATE TABLE order_tbl;-- 检查输出中是否包含:
-- "enable_unique_key_merge_on_write" = "true"
不支持部分列更新原因分析
-
Merge-On-Write 未启用:
- Doris 2.x 的 Unique Key 模型默认使用 Write-Once (WOF) 存储格式,不支持部分列更新。
- 必须通过
enable_unique_key_merge_on_write=true
显式启用 MOW。
-
strict_mode 冲突:
- 当
strict_mode=true
(默认值)时,Doris 要求所有非 NULL 列必须在导入数据中出现。 - 关闭
strict_mode
允许部分列缺失,使用默认值填充。
- 当
验证测试
假设表中已有数据:
order_id | order_status | order_amount |
---|---|---|
1001 | PENDING | 100.00 |
执行以下操作:
-
创建包含
order_id
和order_status
的 CSV 文件:1001,PAID
-
使用上述修正后的 Stream Load 命令导入。
-
查询结果应变为:
order_id order_status order_amount 1001 PAID 100.00
其他注意事项
- 升级影响:如果是从旧版本升级到 2.x,需要重建表以应用新的存储格式。
- 性能考虑:MOW 格式在写入时会增加一些开销,但支持更灵活的更新操作。
- 批量更新:对于大量数据,建议使用
BATCH
模式减少事务开销。