当前位置: 首页 > news >正文

Docker安装Arroyo流处理引擎

介绍

Arroyo 是一个用 Rust 编写的分布式流处理引擎,旨在高效地对数据流执行有状态计算,有着与Apache Flink、Spanrk Streaming和Kafka Streams相同的实时流计算能力。与传统的批处理处理器不同,流式处理引擎可以运行 在有界和无界源上,一旦结果可用,就立即发出结果。
  • Arroyo 可以在本机读取和写入 JSON、Avro、Parquet 以及原始文本和二进制文件。自定义格式可以使用 UDF 实现。
  • 通过编写 Rust 用户定义的标量、聚合和异步函数来扩展内置 SQL,Python 即将推出。
  • 通过强大的 Arroyo Web UI 管理连接、开发和测试 SQL 查询以及监控管道。
  • 可以使用 REST API 创建、作和管理管道,从而大规模提供声明式编排。
  • Arroyo 随附大量连接器,可轻松集成到您的数据堆栈中
简而言之:Arroyo 允许您以亚秒级结果对大量实时数据提出复杂的问题。
Arroyo 可以自托管,也可以通过 Arroyo Systems 管理的 Arroyo Cloud 服务使用。

特征

  • 在 SQL 中定义的管道,支持复杂的分析查询
  • 可扩展至每秒数百万个事件
  • 有状态作,如 windows 和 joins
  • 用于容错和管道恢复的状态检查点
  • 支持水印的事件时间处理

Docker安装

docker run --rm \--name arroyo \-p 5115:5115 \ghcr.io/arroyosystems/arroyo:latest

默认安装相关配置在~/.config/arroyo目录下设置

Portainer平台安装

在Pull image》Image中输入: ghcr.io/arroyosystems/arroyo:latest,点击"Pull the image"拉取仓库镜像文件;稍等片刻拉取完毕后,Images列表中将会显示镜像信息;
确认镜像已拉取完毕,同样左侧Containers菜单中进入到容器管理面页,点击"Add container"进入添加容器配置界面;
设置docker容器映射端口,Port mapping设置如下:
  • 5115 =》5115:默认访问web ui的http端口;
Arroyo支持TOML或YAML配置,同时也支持将选项以环境变量的方式进行配置,相关配置参考官方: https://doc.arroyo.dev
在Runtime&resources中分配docker容器运行所需要的cpu、内存、内存交接空间等,此处根据实际硬件条件分配即可;
完成上述配置后,点击Deploy the container按钮发布docker容器并启动Arroyo服务,稍等片刻没有错误消息提示,即容器运行正常服务启动成功,如有错误提示,可在Containers列表中点击容器日志进行排查;

Arroyo功能概念

Arroyo提供了可视化的Web UI, 通过REST API与系统交互的Web应用程序。 它允许用户配置系统、创建管道并监控其状态。
Arroyo 的窗口函数实现基于 Apache DataFusion,因此这些窗口函数执行计算派生自DataFusion函数引用。
Arroyo依靠配置数据库来存储已注册的表和管道,并为API和Web UI查询提供注册数据支持。官方支持Postgres和本地默认安装的Sqlite,根据不同的实施规模,具有不同的可伸缩性和可靠性权衡:
  • Postgres 是大规模和分布式集群的推荐选择
  • 建议在本地运行和管道集群时使用 Sqlite
Arroyo支持将流处理检查快照存储到本地文件目录和远程对象存储服务,以便在发生故障时,可以从存储的快照检查点恢复计算数据和流管道,以及在需要时对数据做重新处理;
通常本地文件目录用于Arroyo单机部署后的开发与测试使用,生产环境部署建议使用远程对象存储,如:s3(Amazon)、GCS (Google Cloud)、 和 ABS (Azure),以及 Minio、Cloudflare R2 和 Backblaze B2.
Arroyo会使用匿名的方式采集服务运行的遥测数据,来了解系统的使用情况并帮助确定未来的优先级发展。可以关闭该数据的采集,在运行Arroyo服务时设置DISABLE_TELEMETRY=true,或设置环境变量ARROYO__DISABLE_TELEMETRY=true;
Arroyo基于事件流执行窗口计算逻辑,目前 Arroyo 支持三种类型的窗口:翻滚、滑动(又名跳跃)、会话窗口。
  • TUMBLE():滚动窗口是具有固定大小的连续、不重叠的窗口,每次开窗后在固定时长或大小内获取数据重新流处理,如:开窗10分钟后,重新开窗下一个10分钟。
  • HOP():滑动窗口是滚动窗口的扩展,增加了 “滑动”概念,即按固定时间间隔向后滑动,如:开窗10分钟,每次向后滑动1分钟,重新对10分钟内流事件处理(包含上一个窗口时间重叠部份数据)。
  • SESSION():会话窗口是非固定宽度的窗口,由用户的活动事件创建,如:会话窗口30分钟,获取用户流事件30分钟后,会话窗口关闭,并从下次会话流事件开始打开新的30分钟会话窗口。

访问WebUI

部署启动后通过浏览器访问: http://localhost:5115

流处理示例

通常在使用流处理的场景中,将源源不断的实时数据发送到消息队列中,进行解耦处理,后端应用服务通过读取队列数据进行业务计算与格式化后存储;因此Kafka做为消息队列服务中主流的开源中间件,在大量企业中实际应用,Kafka有着强大性能与稳定性,支持高并发、高吞吐,经过了多年的发展与壮大,已经成为互联网行业的必备消息队列产品,也得到了极大的赞誉;
本示例以向Kafka消息服务发送自定义实时测试数据,在Arroyo上通过SQL语法创建数据表和连接Kafka的管道,通过SQL语句将消息数据从Kafka拉取后,再发送到时序数据库QuestDB中进行存储与分析;
基本流程: 消息 =》 Kafka =》 Arroyo =》 QuestDB

1、创建Kafka数据

构造用于测试的Kafka消息结构示例:
{"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}

实时推送测试数据到Kafka是由独立的运行程序执行生成,自已写个Demo代码不难,此处不做详述。

2、创建时序数据表

QuestDB是专为SQL构建的高性能时间序列数据库,支持做为Arroyo流处理数据的实时分析接收器;QuestDB入门参考:《 QuestDB时序数据库快速入门-CSDN博客》
在QuestDB时序数据库中创建以下表。
--删除表
drop table game_event_table;
--创建表
CREATE TABLE game_event_table (  uid LONG,  game_id SYMBOL,role STRING,server_id STRING, event SYMBOL, create_time TIMESTAMP
) TIMESTAMP(create_time) PARTITION BY MONTH;
-- 查询
select * from game_event_table;

3、创建流处理程序

在Arroyo中创建流处理SQL,每5秒将滚动窗口数据推送到QuestDB。
QuestDB支持InfluxDB线路协议,公开了一个可接受ILP数据格式HTTP端点"http://localhost:9000/write";
通过Arroyo内置的webhook组件,将流处理后的数据重新转换成ILP格式,输出到QuestDB时序数据库中。
-- {"create_time":"2025-06-24 20:30:09.903","event":"login","game_id":"笑傲江湖","role":"无","server_id":"s45","uid":148952381}
-- WATERMARK为数据流事件的水印,水印用于事件时间固定偏移量,用来解决数据流到达窗口后的延迟问题
CREATE TABLE game_event_source (create_time TIMESTAMP NOT NULL,  uid BIGINT,event VARCHAR,game_id VARCHAR,role VARCHAR,server_id VARCHAR,WATERMARK FOR create_time AS create_time - INTERVAL '5 seconds'
) WITH (connector = 'kafka',format = 'json',bootstrap_servers = '192.168.1.5:9092',topic = 'game_behavior_arroyo',type = 'source','source.offset' = 'earliest','source.read_mode' = 'read_committed'
);-- 创建输出表(通过webhook推送http请求到questDB时序数据库的/write端点)
CREATE TABLE game_event_sink (value TEXT
) WITH (connector = 'webhook',endpoint = 'http://192.168.1.3:9000/write',format = 'raw_string'
);-- 将Kafka输入源数据读取通过后输出到game_event_sink表(通过webhook服务发送http ILP格式数据到questDb时序数据库)
INSERT INTO game_event_sink
SELECTARRAY_TO_STRING(ARRAY_AGG(CONCAT('game_event_table,','game_id=', game_id, ',','event=', event, ' ','uid=', uid, 'i,','role="', role, '",','server_id="', server_id, '" ',CAST(to_timestamp_nanos(create_time) AS BIGINT))),CHR(10)) AS value
FROM game_event_source
GROUP BY TUMBLE(INTERVAL '5 SECONDS');

4、发布与执行流处理程序

WebUI界面操作效果如下:
Check:对Query查询窗口中的SQL进行预加载,并生成流执行步骤与管道节点,通过可视化节点,观察是否异常
Priview:对Query查询窗口中的SQL进行预览试运行,此时并未正式发布,通过短暂运行检验SQL语法或管道是否存在错误,以便做纠正;
Launch:在Query查询窗口执行Priview无误后,即可点击进行发布,并正式调度运行;
在Launch后,回到Pipelines中查看到前流处理任务的执行状态与运行时间,默认Launch后,无法通过界面重新编辑SQL脚本,如需修改只能进入到流处理任务详情页,通过Stop按钮停止任务后,在回到Pipelines中删除流处理任务,再重新创建;

5、QuestDB查看数据

在QuestDB中通过查询game_event_table表记录,显示已接收到Arroyo通过webhook推送的数据;

参考

https://doc.arroyo.dev

https://questdb.com/blog/arroyo-to-questdb/

http://www.xdnf.cn/news/1069003.html

相关文章:

  • 【C++】std::function是什么
  • 基于STM32的个人健康助手的设计
  • ARM内核之CMSIS
  • 50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | BackgroundSlider(背景滑块)
  • Spring Boot中日志管理与异常处理
  • FPGA笔记——ZYNQ-7020运行PS端的USB 2.0端口作为硬盘
  • SpringBoot(九)--- HttpClient、Spring Cache、Spring Task、WebSocket
  • 鸿蒙OpenHarmony[Disassembler反汇编工具]ArkTS运编译工具链
  • Webpack 核心概念
  • ubuntu22.04可以执行sudo命令,但不在sudo组
  • 通俗易懂解读BPE分词算法实现
  • 【评估指标】IoU 交并比
  • 北斗导航 | 基于CNN-LSTM-PSO算法的接收机自主完好性监测算法
  • <六> k8s + promtail + loki + grafana初探
  • 14.Linux Docker
  • JavaScript逆向工程核心技术解密:反混淆、反调试与加密破解全景指南
  • 【cursor实战】分析python下并行、串行计算性能
  • 【网站内容安全检测】之1:获取网站所有链接sitemap数据
  • 鸿蒙与h5的交互
  • 机器学习01
  • IDEA高效开发指南:JRebel热部署
  • 每日AI资讯速递 | 2025-06-25
  • Django
  • (C++)vector数组相关基础用法(C++教程)(STL库基础教程)
  • 7.Spring框架
  • TensorFlow Lite (TFLite) 和 PyTorch Mobile模型介绍1
  • 什么是功能测试和非功能测试?
  • Azure 托管 Redis 已正式发布
  • 打造属于你的AI智能体,从数据开始 —— 使用 Bright Data MCP+Trae快速构建垂直智能体
  • 【Elasticsearch】es初识,在项目架构中的用途,与mysql和kafka的配合使用,