当前位置: 首页 > java >正文

SQLMesh增量模型实战指南:时间范围分区

引言

在数据工程领域,处理大规模数据集和高频率数据更新是一项挑战。SQLMesh作为一款强大的数据编排工具,提供了增量模型功能,帮助数据工程师高效地管理和更新数据。本文将详细介绍如何使用SQLMesh创建和管理基于时间范围的增量模型,涵盖从开发到生产的完整工作流程。
在这里插入图片描述

需求背景

假设你是一名数据工程师,负责处理一家直接面向客户销售软件的公司的数据。你每天需要处理数百万笔销售交易数据,并且需要将这些数据与产品使用数据进行关联,以更好地理解销售趋势和产品使用情况。

你面临以下挑战:

  • 如何处理延迟到达的数据?
  • 如何处理UTC和PST时间戳的转换?
  • 应该在什么时间运行这些任务?
  • 如何测试这些数据?
  • 如何高效地运行增量更新?
  • 如何处理边缘情况下的历史数据错误?
  • 如何编写单元测试?
  • 如何确保生产环境的数据完整性?

本文将通过一个完整的示例,展示如何使用SQLMesh解决这些问题。

开发工作流程

在SQLMesh中,典型的开发工作流程如下:

  1. sqlmesh plan dev: 创建一个新的开发环境
  2. sqlmesh fetchdf: 在开发环境中预览数据
  3. sqlmesh create_external_models: 自动生成原始源表的列级血缘文档
  4. sqlmesh plan: 将模型从开发环境推广到生产环境
  5. sqlmesh plan dev --forward-only: 在开发环境中进行代码更改,并仅处理新数据
  6. sqlmesh fetchdf: 在开发环境中预览更改后的数据
  7. sqlmesh create_test: 自动生成单元测试
  8. sqlmesh test: 运行单元测试
  9. sqlmesh plan: 将更改推广到生产环境

环境设置

我们将从一个现有的SQLMesh项目开始,该项目已经包含一些生产模型。假设我们已经有以下原始数据表:

原始产品使用数据

product_idcustomer_idlast_usage_dateusage_countfeature_utilization_scoreuser_segment
PROD-101CUST-0012024-10-25 23:45:00+001200.85enterprise
PROD-103CUST-0012024-10-27 12:30:00+00950.75enterprise

原始销售数据

transaction_idproduct_idcustomer_idtransaction_amounttransaction_timestamppayment_methodcurrency
TX-001PROD-101CUST-00199.992024-10-25 08:30:00+00credit_cardUSD
TX-002PROD-102CUST-002149.992024-10-25 09:45:00+00paypalUSD

模型配置

我们将创建一个增量模型demo.incrementals_demo,该模型按天分区,并处理销售数据和产品使用数据的关联。

MODEL(name="demo.incrementals_demo",kind=INCREMENTAL_BY_TIME_RANGE(time_column="transaction_date",lookback=2,  # 处理过去2天的延迟数据),start="2024-10-25",  # 不回填此日期之前的数据cron="@daily",  # 每天午夜UTC运行grain="transaction_id",  # 主键audits=[UNIQUE_VALUES(columns=("transaction_id",)),NOT_NULL(columns=("transaction_id",)),]
)WITH sales_data AS (SELECTtransaction_id,product_id,customer_id,transaction_amount,transaction_timestamp,payment_method,currencyFROM sqlmesh-public-demo.tcloud_raw_data.salesWHERE transaction_timestamp BETWEEN @start_dt AND @end_dt
),product_usage AS (SELECTproduct_id,customer_id,last_usage_date,usage_count,feature_utilization_score,user_segmentFROM sqlmesh-public-demo.tcloud_raw_data.product_usageWHERE last_usage_date BETWEEN DATE_SUB(@start_dt, INTERVAL 30 DAY) AND @end_dt
)SELECTs.transaction_id,s.product_id,s.customer_id,s.transaction_amount,DATE(s.transaction_timestamp) as transaction_date,DATETIME(s.transaction_timestamp, 'America/Los_Angeles') as transaction_timestamp_pst,s.payment_method,s.currency,p.last_usage_date,p.usage_count,p.feature_utilization_score,p.user_segment,CASEWHEN p.usage_count > 100 AND p.feature_utilization_score > 0.8 THEN 'Power User'WHEN p.usage_count > 50 THEN 'Regular User'WHEN p.usage_count IS NULL THEN 'New User'ELSE 'Light User'END as user_type,DATE_DIFF(s.transaction_timestamp, p.last_usage_date, DAY) as days_since_last_usage
FROM sales_data s
LEFT JOIN product_usage pON s.product_id = p.product_idAND s.customer_id = p.customer_id

创建模型

首次创建模型时,我们需要将其添加到开发环境中:

sqlmesh plan dev

按照提示输入回填的起始和结束日期,SQLMesh将自动创建物理表并执行初始数据加载。

跟踪列级血缘

SQLMesh可以自动生成外部模型文档,记录原始表的列信息和数据类型:

sqlmesh create_external_models

通过SQLMesh UI,可以直观地查看列级血缘关系。

进行更改

假设我们需要调整“Power User”的定义,将阈值从100次使用调整为50次使用。我们可以使用--forward-only标志,仅对新数据应用更改:

sqlmesh plan dev --forward-only

SQLMesh会生成一个预览表,允许我们在开发环境中测试更改,而不会影响历史数据。

添加单元测试

使用sqlmesh create_test命令可以自动生成单元测试配置文件:

sqlmesh create_test demo.incrementals_demo \--query sqlmesh-public-demo.tcloud_raw_data.product_usage "select * from sqlmesh-public-demo.tcloud_raw_data.product_usage where customer_id='CUST-001'" \--query sqlmesh-public-demo.tcloud_raw_data.sales "select * from sqlmesh-public-demo.tcloud_raw_data.sales where customer_id='CUST-001'" \--var start_dt '2024-10-25' \--var end_dt '2024-10-27'

运行单元测试:

sqlmesh test

推广到生产环境

确认开发环境中的更改无误后,可以将其推广到生产环境:

sqlmesh plan

SQLMesh会自动处理模式演进和数据回填,确保生产环境的数据完整性。

总结

通过本文的示例,我们展示了如何使用SQLMesh创建和管理基于时间范围的增量模型。SQLMesh的优势在于:

  • 自动处理数据分区,提高查询效率
  • 支持增量更新,减少资源消耗
  • 提供强大的测试和验证工具,确保数据质量
  • 简化开发到生产的流程,减少人为错误

希望这篇指南能帮助你更好地理解和使用SQLMesh,提升数据工程的效率和准确性。

注意:本文基于SQLMesh官方文档和示例编写,实际操作中请参考最新版本的SQLMesh文档。

http://www.xdnf.cn/news/2917.html

相关文章:

  • 对比测评:为什么AI编程工具需要 Rules 能力?
  • 借助云蝠智能大模型呼叫,打造企业招聘竞争力
  • 如何强制触发 OpenShift 节点的 MachineConfig 更新
  • GC的查看
  • MySQL事务隔离级别的实现原理MVCC
  • 今日行情明日机会——20250429
  • UEC++第15天|番茄插件、实现跳跃、实现背景运动
  • JavaScript高级进阶(五)
  • 豪越科技消防立库方案:实现应急物资高效管理
  • 链表的中间节点
  • 机器视觉算法与应用:企业级开发全流程详解
  • Kotlin -> lateinit 和 lazy 详解
  • 嵌入式音视频实时通话EasyRTC打造设备安装与调试的高效远程解决方案
  • Nginx反向代理的负载均衡配置
  • Python入门:流程控制练习
  • 数据编辑器中变量的精妙计算与灵动转换​
  • 汽车启动原理是什么?
  • 水库现代化建设指南-水库运管矩阵管理系统建设方案
  • Linux环境变量的作用以及进程的虚拟地址原理
  • 【Tips】高效文献管理:Zotero 导入参考文献的多种方式详解
  • 【PyTorch动态计算图实战解析】从原理到高效开发
  • CSS in JS:机遇与挑战的思考
  • Java | 韩顺平 循序渐进学Java自用笔记---OOP高级
  • EPSG的作用
  • C++23中的std::forward_like:完美转发的增强
  • 第十六届蓝桥杯 2025 C/C++B组第一轮省赛 全部题解(未完结)
  • 探索目标检测:边界框与锚框的奥秘
  • leetcode 21. 合并两个有序链表(c++解法+相关知识点复习)
  • 目标检测和目标跟踪的区别与联系
  • 大前端开发——前端知识渐变分层讲解 利用金字塔原理简化前端知识体系