当前位置: 首页 > news >正文

Doris “_stream_load“ 方式批量导入数据

一、Stream Load简介

Doris 2.x 的 Stream Load 是一种高性能、实时的数据导入方式,适用于将 大量数据快速加载到 Doris 中 。它通过 HTTP 协议接收数据,并直接写入 Doris 的存储层,支持事务语义和高并发导入。

核心特点

  1. 实时性:数据导入后立即可见,延迟通常在秒级。
  2. 高吞吐量:支持批量数据导入,单节点导入速度可达 GB/分钟级别。
  3. 事务支持:导入操作是原子性的,要么全部成功,要么全部失败。
  4. 灵活的数据格式:支持 CSV、JSON 等格式。
  5. 部分更新:在 Unique Key 模型下支持按主键更新或插入数据。

基本语法

curl --location-trusted \-u username:password \-H "Expect: 100-continue" \-H "label:your_load_label" \-H "column_separator:," \-H "columns:col1,col2,col3" \-T /path/to/data.file \http://fe_host:fe_http_port/api/db_name/table_name/_stream_load

关键参数

参数描述示例
Expect: 100-continue必须参数,告诉服务器在接收数据前先验证请求头。-H "Expect: 100-continue"
label导入任务的唯一标识,用于幂等性控制。如果两次导入使用相同 label,第二次会被拒绝。-H "label:load_task_001"
column_separator列分隔符,用于 CSV 格式。-H "column_separator:,"
columns指定导入数据的列映射关系,可重命名或计算列。-H "columns:id,name,age"
partial_columns是否允许部分列导入(仅 Unique Key 模型支持),而非全部列。-H "partial_columns:true"
strict_mode是否开启严格模式,关闭后允许数据中缺少列(使用默认值填充)。-H "strict_mode:false"
max_filter_ratio最大过滤比例,超过此比例导入会失败。-H "max_filter_ratio:0.1"

数据格式支持

1. CSV 格式
curl ... -H "column_separator:," -T data.csv ...
2. JSON 格式
curl ... -H "format:json" -H "jsonpaths:[$.id, $.name, $.age]" -T data.json ...
3. 带计算列
curl ... -H "columns:id, name, age, birthday=from_unixtime(ts)" -T data.csv ...

导入状态码

状态码含义
200导入成功
307需要重定向到其他 FE 节点
400请求格式错误
500服务器内部错误

高级特性

1. Unique Key 模型下的 UPSERT

在 Unique Key 表中,使用 ON DUPLICATE KEY UPDATE 语义:

curl ... -H "partial_columns:true" -H "strict_mode:false" ...
2. 分区表导入

指定分区导入,提高性能:

curl ... -H "partition: p202501,p202502" ...
3. 并发导入

使用不同的 label 并行执行多个 Stream Load 任务,提高吞吐量。

最佳实践

  1. 数据批量处理:将大文件拆分为 100MB-1GB 的小文件,并行导入。
  2. 避免重复导入:确保每次导入使用唯一的 label。
  3. 监控导入状态:通过 Doris 自带的 SHOW LOAD 命令查看导入进度。
  4. 错误处理:设置合理的 max_filter_ratio 处理脏数据。

示例:导入 CSV 数据

假设 users 表结构为:

CREATE TABLE users (id BIGINT,name VARCHAR(32),age INT
) ENGINE=OLAP
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 8;

导入命令:

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "label:users_load_$(date +%s)" \-H "column_separator:," \-H "columns:id, name, age" \-T users.csv \http://fe_host:8030/api/test_db/users/_stream_load

常见问题及解决

  1. 导入失败:100-continue 错误

    • 原因:缺少 Expect: 100-continue 头。
    • 解决:添加 -H "Expect: 100-continue"
  2. 部分列更新失败

    • 原因:表不是 Unique Key 模型或未启用 Merge-On-Write。
    • 解决:修改表属性 enable_unique_key_merge_on_write=true
  3. 性能瓶颈

    • 原因:单个导入任务过大或并发不足。
    • 解决:拆分文件并增加并发导入任务。

监控与诊断

-- 查看导入历史
SHOW LOAD WHERE LABEL LIKE 'users_load_%';

通过 Stream Load,我们可以高效地将实时数据导入 Doris,满足分析场景的低延迟需求。

二、实战案例

2.1 创建Doris表

假设在 Doris 中创建一张订单表 order_tbl

CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID(Key列)',order_status VARCHAR(32) COMMENT '订单状态(Value列)',order_amount DECIMAL(15, 2) COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)  -- 定义订单ID为主键
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16  -- 哈希分桶
PROPERTIES ("replication_num" = "3",  -- 副本数"storage_format" = "V2"   -- 存储格式
);

关键说明:

  1. 表模型选择

    • 使用 UNIQUE KEY 模型确保相同 order_id 的记录会自动去重,后写入的数据覆盖旧数据。
    • 适合需要按主键进行更新的业务场景。
  2. 字段设计

    • order_id:使用 BIGINT 类型存储订单ID,并定义为主键(Key列)。
    • order_status:使用 VARCHAR 存储订单状态(如 “PAID”、“CANCELLED” 等)。
    • order_amount:使用 DECIMAL(15,2) 精确存储金额,避免浮点数精度问题。
  3. 数据分布

    • DISTRIBUTED BY HASH(order_id):按订单ID哈希分桶,确保数据均匀分布。
    • BUCKETS 16:设置16个分桶,可根据集群规模调整。
  4. 存储属性

    • replication_num = "3":每个数据分片保留3个副本,保证高可用。
    • storage_format = "V2":使用最新存储格式,提升查询性能。

可选优化(根据业务需求):

-- 带分区和二级索引的增强版本
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_date DATE NOT NULL COMMENT '订单日期',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) COMMENT '订单金额',INDEX status_idx (`order_status`) USING BITMAP  -- 为状态字段添加位图索引
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
PARTITION BY RANGE(`order_date`) (  -- 按日期分区PARTITION p_202501 VALUES LESS THAN ("2025-02-01"),PARTITION p_202502 VALUES LESS THAN ("2025-03-01"),...
)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","storage_format" = "V2"
);

使用建议:

  1. 数据写入

    • 使用 INSERT INTO order_tbl VALUES (...) 写入数据,重复的 order_id 会自动更新。
    • 支持批量写入(如 INSERT INTO ... VALUES (...) , (...) , (...))提升性能。
  2. 查询优化

    • 高频过滤字段(如 order_status)可添加二级索引。
    • 按时间范围查询时,建议结合分区使用(如 WHERE order_date >= '2025-01-01')。
  3. 性能调优

    • 根据数据量调整 BUCKETS 数量(通常为 BE 节点数的整数倍)。
    • 定期执行 ALTER TABLE order_tbl REFRESH INDEX ALL; 更新索引统计信息。

2.2 通过Stream Load方式导入数据

正常情况下,通过如下Stream Load命令,即可完成数据的正确导入。

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \  # 允许部分列-H "strict_mode:false" \    # 关闭严格模式(重要!)-H "column_separator:," \-H "columns:order_id,order_status" \  # 只提供部分列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load

导入正确后,打印的统计日志如下:

{"TxnId": 2890991,"Label": "43a5d440-a5b0-4444-9a83-66610462dbf9","Comment": "","TwoPhaseCommit": "false","Status": "Success","Message": "OK","NumberTotalRows": 1,"NumberLoadedRows": 1,"NumberFilteredRows": 0,"NumberUnselectedRows": 0,"LoadBytes": 11,"LoadTimeMs": 267,"BeginTxnTimeMs": 3,"StreamLoadPutTimeMs": 4,"ReadDataTimeMs": 0,"WriteDataTimeMs": 234,"CommitAndPublishTimeMs": 22
}

2.3 错误的配置案例

但实际配置中,可能存在以下问题:

问题 1、 Doris FE端口误配成BE端口

zhangsan@DESKTOP-AAAA MINGW64 /d/Docs/测试/01-Doris/A1-Upsert
$ curl  --location-trusted \ 
-u root: \
-H "partial_columns:true" \
-H "column_separator:," \
-H "columns:order_id,order_status" \
-T ./update.csv http://192.168.1.1:9030/api/example_db/order_tbl/_stream_loadcurl: (1) Received HTTP/0.9 when not allowed

如上,如果将Doris FE端口误配成BE端口,则可能报上述错误。

这个错误 Received HTTP/0.9 when not allowed 通常表示客户端(curl)期望接收 HTTP 1.0 或更高版本的响应,但服务器返回了 HTTP 0.9 格式的响应,这是一个过时的 HTTP 版本,现代客户端默认不支持。

可能的原因及解决方案

1. Doris 服务端口配置错误

如果使用的端口是 9030,这通常是 Doris 的 BE 节点数据导入端口,而 Stream Load 需要通过 FE 节点的 HTTP 端口(默认 8030) 进行。

修正方法
将 URL 中的端口改为 FE 的 HTTP 端口(通常是 8030):

curl --location-trusted -u root: -H "partial_columns:true" -H "column_separator:," -H "columns:order_id,order_status" -T ./update.csv http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
2. 网络代理或防火墙干扰

中间代理或防火墙可能会修改 HTTP 协议版本。

修正方法

  • 检查网络环境,确保没有代理拦截请求。
  • 临时关闭防火墙测试:
    # CentOS/RHEL
    systemctl stop firewalld# Ubuntu/Debian
    ufw disable
    
3. Doris FE 服务异常

FE 服务可能未正常启动或配置错误。

修正方法

  • 检查 FE 服务状态:
    # 在 FE 节点上执行
    sh /path/to/doris/fe/bin/fe.sh status
    
  • 查看 FE 日志(/path/to/doris/fe/log/fe.log)确认是否有异常。
4. curl 版本兼容性问题

某些旧版本的 curl 可能对 HTTP 协议处理存在问题。

修正方法

  • 更新 curl 到最新版本:
    # Ubuntu/Debian
    sudo apt-get update && sudo apt-get install curl# CentOS/RHEL
    sudo yum update curl
    
  • 或使用 --http1.1 强制使用 HTTP 1.1:
    curl --http1.1 --location-trusted -u root: ...
    
5. Doris 配置检查

确认 FE 配置文件(fe.conf)中 HTTP 端口是否正确:

# fe.conf
http_port = 8030  # 确保端口未被修改

修正后的命令示例

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \-H "column_separator:," \-H "columns:order_id,order_status" \-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load

关键修改

  • 将端口从 9030 改为 8030
  • 添加 Expect: 100-continue 头(可选但推荐)。

验证 Doris FE 端口

# 检查 FE HTTP 端口是否正常监听
telnet 192.168.1.1 8030# 如果无法连接,检查 FE 服务是否启动
sh /path/to/doris/fe/bin/start_fe.sh --daemon

通过以上步骤,可以解决 HTTP 版本不兼容的问题。

问题 2、 发送HTTP请求未经服务器确认问题

如下所示,请求报错:“There is no 100-continue header”

$ curl  --location-trusted \ 
-u root: \
-H "partial_columns:true" \
-H "column_separator:," \
-H "columns:order_id,order_status" \
-T ./update.csv http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load{"status":"FAILED","msg":"There is no 100-continue header"}

错误为:
{"status":"FAILED","msg":"There is no 100-continue header"}

原因:
Doris 在 Stream Load 过程中要求客户端发送 Expect: 100-continue 头部,但请求中缺少这个头部。

解决方案

在 curl 命令中添加 Expect: 100-continue 头部以满足 Doris 的要求:

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \-H "column_separator:," \-H "columns:order_id,order_status" \-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load

解释

  • Expect: 100-continue:这是 HTTP 协议中的一个特性,客户端通过发送这个头部告诉服务器:在发送实际请求体之前,先等待服务器确认(状态码 100)。这样可以避免在服务器可能拒绝请求的情况下发送大量数据。
  • Doris 强制要求这个头部,以确保在接收大量数据前进行必要的验证。

其他注意事项

  1. 端口确认: Doris FE 节点的 HTTP 端口(默认是 8030)。

  2. 数据文件格式:确保 update.csv 文件格式正确,例如:

    1001,PAID
    1002,PENDING
    
  3. 错误处理:如果仍然失败,可以检查 Doris FE 日志(fe/log/fe.log)获取更详细的错误信息。

替代方案:使用 Doris 官方工具

如果我们经常进行 Stream Load,可以考虑使用 Doris 提供的 Python 客户端工具 curl_load.py

python /path/to/doris/tools/curl_load.py \--host 192.168.1.1 \--port 8030 \--user root \--password '' \--db example_db \--table order_tbl \--file ./update.csv \--columns "order_id,order_status" \--column_separator "," \--partial_columns

这个工具会自动处理 Expect: 100-continue 等细节。

问题-3:Only unique key merge on write support partial update

执行Stream Load命令,报了如上错误。

通过反查建表语句

根据错误信息 Only unique key merge on write support partial update,Doris 在处理 Stream Load 请求时拒绝了部分列更新,因为当前表模型不支持。这个错误与 Doris 的表模型更新策略有关。

问题原因

Doris 中只有 Unique Key 模型支持真正的 UPSERT 语义(即主键冲突时自动更新),而你的表可能使用了其他模型(如 Aggregate Key 或 Duplicate Key)。

查看建表语句

SHOW CREATE TABLE `order_tbl`;-- 提取到建表语句
CREATE TABLE `order_tbl` (`order_id` bigint(20) NOT NULL COMMENT '订单ID(Key列)',`order_status` varchar(32) NULL COMMENT '订单状态(Value列)',`order_amount` DECIMAL(15, 2) NULL COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);
不同表模型的限制:
  1. Unique Key 模型:支持部分列更新,未提供的列保持原值。
  2. Aggregate Key 模型:只支持预定义的聚合操作(SUM、REPLACE 等),不支持部分列更新。
  3. Duplicate Key 模型:不支持更新,只支持插入新行。

解决方案

1. 确认并修改表模型为 Unique Key

如果表不是 Unique Key 模型,需要重建表:

-- 创建 Unique Key 模型表
CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) DEFAULT 0.0 COMMENT '订单金额'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)  -- 必须定义 Unique Key
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16;
2. 使用完整列导入(不推荐,但可临时解决)

如果无法修改表模型,可以在导入时提供所有列的值:

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "column_separator:," \-H "columns:order_id,order_status,order_amount" \  # 包含所有列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
3. 使用 REPLACE 语义(适用于 Aggregate Key 模型)

如果表是 Aggregate Key 模型,且需要覆盖已有值,可以将字段定义为 REPLACE 类型:

-- 对于 Aggregate Key 模型
CREATE TABLE order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) REPLACE DEFAULT '' COMMENT '订单状态',  -- 使用 REPLACE 聚合类型order_amount DECIMAL(15, 2) REPLACE DEFAULT 0.0 COMMENT '订单金额'
) ENGINE=OLAP
AGGREGATE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16;

最佳实践

  1. 优先使用 Unique Key 模型:如果需要频繁更新部分列,Unique Key 是最适合的选择。
  2. 避免混用表模型:在设计表时明确业务需求,选择合适的表模型。
  3. 验证表结构:导入前通过 DESC order_tbl; 确认表模型和列定义。

验证方法

-- 查看表结构和模型
DESC order_tbl;-- 结果示例(Unique Key 模型)
+--------------+---------------+------+-------+---------+-------+
| Field        | Type          | Null | Key   | Default | Extra |
+--------------+---------------+------+-------+---------+-------+
| order_id     | bigint        | No   | true  | NULL    |       |
| order_status | varchar(32)   | Yes  | false | NULL    |       |
| order_amount | decimal(15,2) | Yes  | false | 0.0     |       |
+--------------+---------------+------+-------+---------+-------+

如果 Key 列包含 true,则表示该列是 Unique Key 的一部分,支持部分更新。

三、Doris “Unique Key” 与 “Stream Load”实现数据批量新增OR更新

如下建表语句,使用了order_id作为Unique Key:

CREATE TABLE `order_tbl` (`order_id` bigint(20) NOT NULL COMMENT '订单ID(Key列)',`order_status` varchar(32) NULL COMMENT '订单状态(Value列)',`order_amount` DECIMAL(15, 2) NULL COMMENT '订单金额(Value列)'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES (
"replication_allocation" = "tag.location.default: 3",
"is_being_synced" = "false",
"storage_format" = "V2",
"light_schema_change" = "true",
"disable_auto_compaction" = "false",
"enable_single_replica_compaction" = "false"
);

如果想实现数据批量新增OR更新,则需要考虑使用Doris 2.x 的 ** Unique Key 模型配置** 或 Stream Load 参数组合 上。

虽然在上述SQL里已正确使用 UNIQUE KEY 模型,但还需要确保以下几点:

配置要点

1. 启用 Merge-On-Write 存储格式

Doris 2.x 的 Unique Key 模型需要显式启用 Merge-On-Write (MOW) 存储格式才能支持部分列更新。

修改建表语句:

CREATE TABLE IF NOT EXISTS order_tbl (order_id BIGINT NOT NULL COMMENT '订单ID',order_status VARCHAR(32) COMMENT '订单状态',order_amount DECIMAL(15, 2) COMMENT '订单金额'
) ENGINE=OLAP
UNIQUE KEY(`order_id`)
DISTRIBUTED BY HASH(`order_id`) BUCKETS 16
PROPERTIES ("replication_num" = "3","storage_format" = "V2","enable_unique_key_merge_on_write" = "true"  -- 启用 Merge-On-Write
);
2. 修改 Stream Load 参数

确保同时设置 partial_columnsstrict_mode

curl --location-trusted \-u root: \-H "Expect: 100-continue" \-H "partial_columns:true" \  # 允许部分列-H "strict_mode:false" \    # 关闭严格模式(重要!)-H "column_separator:," \-H "columns:order_id,order_status" \  # 只提供部分列-T ./update.csv \http://192.168.1.1:8030/api/example_db/order_tbl/_stream_load
3. 验证表属性

确认表已启用 Merge-On-Write:

SHOW CREATE TABLE order_tbl;-- 检查输出中是否包含:
-- "enable_unique_key_merge_on_write" = "true"

不支持部分列更新原因分析

  1. Merge-On-Write 未启用

    • Doris 2.x 的 Unique Key 模型默认使用 Write-Once (WOF) 存储格式,不支持部分列更新。
    • 必须通过 enable_unique_key_merge_on_write=true 显式启用 MOW。
  2. strict_mode 冲突

    • strict_mode=true(默认值)时,Doris 要求所有非 NULL 列必须在导入数据中出现。
    • 关闭 strict_mode 允许部分列缺失,使用默认值填充。

验证测试

假设表中已有数据:

order_idorder_statusorder_amount
1001PENDING100.00

执行以下操作:

  1. 创建包含 order_idorder_status 的 CSV 文件:

    1001,PAID
    
  2. 使用上述修正后的 Stream Load 命令导入。

  3. 查询结果应变为:

    order_idorder_statusorder_amount
    1001PAID100.00

其他注意事项

  • 升级影响:如果是从旧版本升级到 2.x,需要重建表以应用新的存储格式。
  • 性能考虑:MOW 格式在写入时会增加一些开销,但支持更灵活的更新操作。
  • 批量更新:对于大量数据,建议使用 BATCH 模式减少事务开销。
http://www.xdnf.cn/news/961273.html

相关文章:

  • Remmina远程访问如何开启本地音频?
  • (41)课60--61高级篇: MySQL体系结构(连接层、服务层、引擎层、存储层)。存储引擎是基于表的,可为不同表指定不同的存储引擎;查询表结构语句 show create table 表名
  • #Word“嵌入式”插图显示不全的解决教程
  • 在Word中使用 Microsoft Print to PDF和另存为PDF两种方式生成的 PDF文件
  • ubuntu24安装TensorRT
  • ubuntu24.04安装IDEA2025.1.2搭建java开发环境
  • 数据结构-链表OJ-回文链表,如何将时间复杂度控制为O(N),空间复杂度控制为O(1)?
  • POI设置Excel单元格背景色
  • DataFrame中.iloc 属性
  • HTAP 技术:融合事务与分析的数据处理新范式
  • 【数据篇】持久化核心:整合 JPA/MyBatis 实现优雅的数据库操作
  • pcie问答--0609
  • 激光隐形切割(Stealth Dicing)技术
  • Oracle数据库对IPv6的支持情况
  • 造成服务器重启的原因都有哪些?
  • Lang*生态系统多个专业框架及他们的作用
  • FTXUI::Dom 模块
  • 足球数据如何驱动 AI 模型进化:从数据采集到智能决策的技术解析
  • PH热榜 | 2025-06-09
  • 小红本批量改写 v1.2.0绿色版
  • 标注工具核心代码解析——def load_image【canvas.py]
  • BeckHoff -->电脑与PLC连接
  • 全微分证明 链式法则 乘法法则 除法法则
  • 基于正点原子阿波罗F429开发板的LWIP应用(6)——SNTP功能和lwiperf测速
  • 第一章 空间解析几何与向量代数 ~ 空间直角坐标系
  • 【Fifty Project - D35】
  • 在线学堂-第二章媒资管理模块上
  • 高效清理C盘
  • Quick BI 自定义组件开发 -- 第一篇 Lifecycle 接口的定义
  • esp_image: invalid segment length 0xffffffff