Docker安装Arroyo流处理引擎
介绍
Arroyo 是一个用 Rust 编写的分布式流处理引擎,旨在高效地对数据流执行有状态计算,有着与Apache Flink、Spanrk Streaming和Kafka Streams相同的实时流计算能力。与传统的批处理处理器不同,流式处理引擎可以运行 在有界和无界源上,一旦结果可用,就立即发出结果。
- Arroyo 可以在本机读取和写入 JSON、Avro、Parquet 以及原始文本和二进制文件。自定义格式可以使用 UDF 实现。
- 通过编写 Rust 用户定义的标量、聚合和异步函数来扩展内置 SQL,Python 即将推出。
- 通过强大的 Arroyo Web UI 管理连接、开发和测试 SQL 查询以及监控管道。
- 可以使用 REST API 创建、作和管理管道,从而大规模提供声明式编排。
- Arroyo 随附大量连接器,可轻松集成到您的数据堆栈中
简而言之:Arroyo 允许您以亚秒级结果对大量实时数据提出复杂的问题。

Arroyo 可以自托管,也可以通过 Arroyo Systems 管理的 Arroyo Cloud 服务使用。
特征
- 在 SQL 中定义的管道,支持复杂的分析查询
- 可扩展至每秒数百万个事件
- 有状态作,如 windows 和 joins
- 用于容错和管道恢复的状态检查点
- 支持水印的事件时间处理
Docker安装
docker run --rm \--name arroyo \-p 5115:5115 \ghcr.io/arroyosystems/arroyo:latest
默认安装相关配置在~/.config/arroyo目录下设置
Portainer平台安装
在Pull image》Image中输入: ghcr.io/arroyosystems/arroyo:latest,点击"Pull the image"拉取仓库镜像文件;稍等片刻拉取完毕后,Images列表中将会显示镜像信息;

确认镜像已拉取完毕,同样左侧Containers菜单中进入到容器管理面页,点击"Add container"进入添加容器配置界面;

设置docker容器映射端口,Port mapping设置如下:
- 5115 =》5115:默认访问web ui的http端口;
Arroyo支持TOML或YAML配置,同时也支持将选项以环境变量的方式进行配置,相关配置参考官方: https://doc.arroyo.dev
在Runtime&resources中分配docker容器运行所需要的cpu、内存、内存交接空间等,此处根据实际硬件条件分配即可;

完成上述配置后,点击Deploy the container按钮发布docker容器并启动Arroyo服务,稍等片刻没有错误消息提示,即容器运行正常服务启动成功,如有错误提示,可在Containers列表中点击容器日志进行排查;
Arroyo功能概念
Arroyo提供了可视化的Web UI, 通过REST API与系统交互的Web应用程序。 它允许用户配置系统、创建管道并监控其状态。
Arroyo 的窗口函数实现基于 Apache DataFusion,因此这些窗口函数执行计算派生自DataFusion函数引用。
Arroyo依靠配置数据库来存储已注册的表和管道,并为API和Web UI查询提供注册数据支持。官方支持Postgres和本地默认安装的Sqlite,根据不同的实施规模,具有不同的可伸缩性和可靠性权衡:
- Postgres 是大规模和分布式集群的推荐选择
- 建议在本地运行和管道集群时使用 Sqlite
Arroyo支持将流处理检查快照存储到本地文件目录和远程对象存储服务,以便在发生故障时,可以从存储的快照检查点恢复计算数据和流管道,以及在需要时对数据做重新处理;
通常本地文件目录用于Arroyo单机部署后的开发与测试使用,生产环境部署建议使用远程对象存储,如:s3(Amazon)、GCS (Google Cloud)、 和 ABS (Azure),以及 Minio、Cloudflare R2 和 Backblaze B2.
Arroyo会使用匿名的方式采集服务运行的遥测数据,来了解系统的使用情况并帮助确定未来的优先级发展。可以关闭该数据的采集,在运行Arroyo服务时设置DISABLE_TELEMETRY=true,或设置环境变量ARROYO__DISABLE_TELEMETRY=true;
Arroyo基于事件流执行窗口计算逻辑,目前 Arroyo 支持三种类型的窗口:翻滚、滑动(又名跳跃)、会话窗口。
- TUMBLE():滚动窗口是具有固定大小的连续、不重叠的窗口,每次开窗后在固定时长或大小内获取数据重新流处理,如:开窗10分钟后,重新开窗下一个10分钟。
- HOP():滑动窗口是滚动窗口的扩展,增加了 “滑动”概念,即按固定时间间隔向后滑动,如:开窗10分钟,每次向后滑动1分钟,重新对10分钟内流事件处理(包含上一个窗口时间重叠部份数据)。
- SESSION():会话窗口是非固定宽度的窗口,由用户的活动事件创建,如:会话窗口30分钟,获取用户流事件30分钟后,会话窗口关闭,并从下次会话流事件开始打开新的30分钟会话窗口。
访问WebUI
部署启动后通过浏览器访问: http://localhost:5115

流处理示例
通常在使用流处理的场景中,将源源不断的实时数据发送到消息队列中,进行解耦处理,后端应用服务通过读取队列数据进行业务计算与格式化后存储;因此Kafka做为消息队列服务中主流的开源中间件,在大量企业中实际应用,Kafka有着强大性能与稳定性,支持高并发、高吞吐,经过了多年的发展与壮大,已经成为互联网行业的必备消息队列产品,也得到了极大的赞誉;
本示例以向Kafka消息服务发送自定义实时测试数据,在Arroyo上通过SQL语法创建数据表和连接Kafka的管道,通过SQL语句将消息数据从Kafka拉取后,再发送到时序数据库QuestDB中进行存储与分析;
基本流程: 消息 =》 Kafka =》 Arroyo =》 QuestDB
1、创建Kafka数据
构造用于测试的Kafka消息结构示例:
{"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}
实时推送测试数据到Kafka是由独立的运行程序执行生成,自已写个Demo代码不难,此处不做详述。
2、创建时序数据表
QuestDB是专为SQL构建的高性能时间序列数据库,支持做为Arroyo流处理数据的实时分析接收器;QuestDB入门参考:《 QuestDB时序数据库快速入门-CSDN博客》
在QuestDB时序数据库中创建以下表。
--删除表
drop table game_event_table;
--创建表
CREATE TABLE game_event_table ( uid LONG, game_id SYMBOL,role STRING,server_id STRING, event SYMBOL, create_time TIMESTAMP
) TIMESTAMP(create_time) PARTITION BY MONTH;
-- 查询
select * from game_event_table;
3、创建流处理程序
在Arroyo中创建流处理SQL,每5秒将滚动窗口数据推送到QuestDB。
QuestDB支持InfluxDB线路协议,公开了一个可接受ILP数据格式HTTP端点"http://localhost:9000/write";
通过Arroyo内置的webhook组件,将流处理后的数据重新转换成ILP格式,输出到QuestDB时序数据库中。
-- {"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}
-- WATERMARK为数据流事件的水印,水印用于事件时间固定偏移量,用来解决数据流到达窗口后的延迟问题
CREATE TABLE game_event_source (create_time TIMESTAMP NOT NULL, uid BIGINT,event VARCHAR,game_id VARCHAR,role VARCHAR,server_id VARCHAR,WATERMARK FOR create_time AS create_time - INTERVAL '5 seconds'
) WITH (connector = 'kafka',format = 'json',bootstrap_servers = '192.168.1.5:9092',topic = 'game_behavior_arroyo',type = 'source','source.offset' = 'earliest','source.read_mode' = 'read_committed'
);-- 创建输出表(通过webhook推送http请求到questDB时序数据库的/write端点)
CREATE TABLE game_event_sink (value TEXT
) WITH (connector = 'webhook',endpoint = 'http://192.168.1.3:9000/write',format = 'raw_string'
);-- 将Kafka输入源数据读取通过后输出到game_event_sink表(通过webhook服务发送http ILP格式数据到questDb时序数据库)
INSERT INTO game_event_sink
SELECTARRAY_TO_STRING(ARRAY_AGG(CONCAT('game_event_table,','game_id=', game_id, ',','event=', event, ' ','uid=', uid, 'i,','role="', role, '",','server_id="', server_id, '" ',CAST(to_timestamp_nanos(create_time) AS BIGINT))),CHR(10)) AS value
FROM game_event_source
GROUP BY TUMBLE(INTERVAL '5 SECONDS');
4、发布与执行流处理程序
WebUI界面操作效果如下:
Check:对Query查询窗口中的SQL进行预加载,并生成流执行步骤与管道节点,通过可视化节点,观察是否异常
Priview:对Query查询窗口中的SQL进行预览试运行,此时并未正式发布,通过短暂运行检验SQL语法或管道是否存在错误,以便做纠正;
Launch:在Query查询窗口执行Priview无误后,即可点击进行发布,并正式调度运行;

在Launch后,回到Pipelines中查看到前流处理任务的执行状态与运行时间,默认Launch后,无法通过界面重新编辑SQL脚本,如需修改只能进入到流处理任务详情页,通过Stop按钮停止任务后,在回到Pipelines中删除流处理任务,再重新创建;

5、QuestDB查看数据
在QuestDB中通过查询game_event_table表记录,显示已接收到Arroyo通过webhook推送的数据;

参考
https://doc.arroyo.dev
https://questdb.com/blog/arroyo-to-questdb/