当前位置: 首页 > news >正文

使用Python和Pandas实现的Snowflake权限检查与SQL生成用于IT审计

import snowflake.connector
import pandas as pddef get_snowflake_permissions():# 连接Snowflake(需要替换实际凭证)conn = snowflake.connector.connect(user='<USER>',password='<PASSWORD>',account='<ACCOUNT>',warehouse='<WAREHOUSE>',role='SECURITYADMIN')# 结果容器results = {'role_grants': [],'table_privileges': [],'views': [],'masking_policies': [],'row_policies': []}# 1. 获取角色继承关系cur = conn.cursor()cur.execute("""SELECT granted_to_role, role_granted FROM SNOWFLAKE.ACCOUNT_USAGE.GRANTS_TO_ROLES WHERE PRIVILEGE = 'USAGE' AND GRANTED_ON = 'ROLE'""")results['role_grants'] = cur.fetchall()# 2. 获取表权限cur.execute("""SELECT grantee, table_catalog, table_schema, table_name, privilege_type FROM INFORMATION_SCHEMA.TABLE_PRIVILEGES""")results['table_privileges'] = cur.fetchall()# 3. 获取视图定义cur.execute("""SELECT table_catalog, table_schema, table_name, view_definition FROM INFORMATION_SCHEMA.VIEWS""")results['views'] = cur.fetchall()# 4. 获取数据掩码策略cur.execute("""SELECT policy_name, policy_body, column FROM TABLE(INFORMATION_SCHEMA.POLICY_REFERENCES())WHERE POLICY_KIND = 'MASKING_POLICY'""")results['masking_policies'] = cur.fetchall()# 5. 获取行访问策略cur.execute("""SELECT policy_name, policy_body, ref_column_name, ref_table_name FROM TABLE(INFORMATION_SCHEMA.POLICY_REFERENCES())WHERE POLICY_KIND = 'ROW_ACCESS_POLICY'""")results['row_policies'] = cur.fetchall()conn.close()return resultsdef generate_descriptions(data):reports = []# 角色继承描述for grant in data['role_grants']:reports.append(f"角色 {grant[1]} 被授予给 {grant[0]},实现权限继承")# 表权限描述for priv in data['table_privileges']:reports.append(f"角色 {priv[0]} 在表 {priv[1]}.{priv[2]}.{priv[3]} 拥有 {priv[4]} 权限")# 视图描述for view in data['views']:reports.append(f"存在限制访问视图 {view[0]}.{view[1]}.{view[2]},定义:{view[3]}")# 数据掩码描述for policy in data['masking_policies']:reports.append(f"列 {policy[2]} 应用数据掩码策略 {policy[0]},策略逻辑:{policy[1]}")# 行策略描述for policy in data['row_policies']:reports.append(f"表 {policy[3]}{policy[2]} 列应用行访问策略 {policy[0]},策略逻辑:{policy[1]}")return reportsdef generate_sql_statements(data):sqls = []# 生成角色继承SQLfor grant in data['role_grants']:sqls.append(f"GRANT ROLE {grant[1]} TO ROLE {grant[0]};")# 生成表权限SQLfor priv in data['table_privileges']:sqls.append(f"GRANT {priv[4]} ON {priv[1]}.{priv[2]}.{priv[3]} TO ROLE {priv[0]};")# 生成数据掩码SQLfor policy in data['masking_policies']:sqls.extend([f"CREATE MASKING POLICY {policy[0]} AS {policy[1]};",f"ALTER TABLE {policy[2].split('.')[:3]} MODIFY COLUMN {policy[2].split('.')[3]} SET MASKING POLICY {policy[0]};"])# 生成行策略SQLfor policy in data['row_policies']:sqls.extend([f"CREATE ROW ACCESS POLICY {policy[0]} AS {policy[1]};",f"ALTER TABLE {policy[3]} ADD ROW ACCESS POLICY {policy[0]} ON ({policy[2]});"])return sqlsif __name__ == "__main__":permission_data = get_snowflake_permissions()print("=== 权限配置描述 ===")for desc in generate_descriptions(permission_data):print(desc)print("\n=== 权限重建SQL ===")for sql in generate_sql_statements(permission_data):print(sql)# 可选:将结果保存为DataFramedf_role_grants = pd.DataFrame(permission_data['role_grants'], columns=['被授权角色', '授权角色'])df_table_priv = pd.DataFrame(permission_data['table_privileges'],columns=['角色', '数据库', '模式', '表', '权限'])

输出示例:

=== 权限配置描述 ===
角色 priv_hr_rw 被授予给 dept_hr,实现权限继承
角色 priv_finance_ro 在表 finance_db.salary.reports 拥有 SELECT 权限
存在限制访问视图 finance_db.salary.v_restricted_salary,定义:SELECT employee_id, department, base_salary...
列 hr_db.employee.contacts.phone 应用数据掩码策略 phone_mask,策略逻辑:CASE WHEN CURRENT_ROLE() IN ('DEPT_SALES') THEN '***-***-' || RIGHT(val, 4)...=== 权限重建SQL ===
GRANT ROLE priv_hr_rw TO ROLE dept_hr;
GRANT SELECT ON finance_db.salary.reports TO ROLE priv_finance_ro;
CREATE MASKING POLICY phone_mask AS (val STRING)...
ALTER TABLE hr_db.employee.contacts MODIFY COLUMN phone SET MASKING POLICY phone_mask;

关键实现逻辑说明:

  1. 数据采集:通过Snowflake系统视图获取五类关键信息

    • 角色继承关系
    • 表级权限分配
    • 视图定义及访问控制
    • 动态数据掩码策略
    • 行级访问策略
  2. 自然语言转换:将原始数据转换为易于理解的描述

    • 使用GRANT ROLE语句解析角色继承
    • 通过视图定义识别列级访问控制
    • 解析策略定义描述安全逻辑
  3. SQL重建:生成可重复执行的权限配置语句

    • 保持原始权限配置的精确重建
    • 处理策略定义中的Lambda表达式
    • 自动生成ALTER语句应用策略

使用注意事项:

  1. 需要确保执行账号具有ACCOUNTADMIN权限
  2. 系统视图数据可能存在最长2小时的延迟
  3. 视图定义中的敏感信息需要进行脱敏处理
  4. 生成的SQL需在测试环境验证后上生产

建议结合Snowflake的ACCESS_HISTORY视图进行权限使用分析,并通过定期运行此脚本实现权限配置的版本化管理。

http://www.xdnf.cn/news/242821.html

相关文章:

  • 利用无事务方式插入数据库解决并发插入问题
  • windows系统搭建自己的ftp服务器,保姆级教程(用户验证+无验证)
  • OkHttp3.X 工具类封装:链式调用,支持HTTPS、重试、文件上传【内含常用设计模式设计示例】
  • 深度学习基础--目标检测入门简介
  • PHP之CURL通过header传参数及接收
  • day12:遗传算法及常见优化算法分享
  • 指针与算法的双人舞:蓝桥杯两道趣味题的降维打击
  • Windows 查看电脑是否插拔过U盘
  • 【业务领域】电脑主板芯片电路结构
  • 【音视频】ffplay数据结构分析
  • C++中常用的十大排序方法之1——冒泡排序
  • 内存安全的攻防战:工具链与语言特性的协同突围
  • SIEMENS PLC程序代码 赋值 + 判断
  • 数值求解Eikonal方程的方法及开源实现
  • 25.4.30数据结构|并查集 路径压缩
  • 《汉诺塔问题的C语言实现》
  • 第十一届蓝桥杯 2020 C/C++组 既约分数
  • RocketMQ常见面试题一
  • 25_04_30Linux架构篇、第1章_02源码编译安装Apache HTTP Server 最新稳定版本是 2.4.62
  • 若依 FastAPI + Vue3 项目 Docker 部署笔记( 启动器打包教程)
  • 华为云Astro大屏连接器创建操作实例:抽取物联网iotda影子设备数据的连接器创建
  • (B题|矿山数据处理问题)2025年第二十二届五一数学建模竞赛(五一杯/五一赛)解题思路|完整代码论文集合
  • 【音频】Qt6实现MP3播放器
  • 深入自制操作系统(一、Bootloader的实现)
  • 微软与Meta大幅增加人工智能基础设施投入
  • AI大模型基础设施:NVIDIA的用于AI大语言模型训练和推理的几款主流显卡
  • Arduino程序函数从入门到精通
  • 中国发布Web3计划:区块链列为核心基础技术,不排除发展加密资产应用!
  • 2025五一杯B题超详细解题思路
  • Qwen3 发布:优化编码与代理能力,强化 MCP 支持引领 AI 新潮流