当前位置: 首页 > java >正文

ERA5的UV合并成矢量并按时间维度转为nc或tif

转为nc

# -*- coding: utf-8 -*-
import os
import xarray as xr
import pandas as pd
import multiprocessing as muldef era5_hourly_wind_compose_array(args):inpath, outpath, variable_name1, variable_name2, ini_num, end_num = argsfor i in range(ini_num, end_num):year = 1981 + iyear_dir = "y" + str(year)year_path = os.path.join(inpath, year_dir)# 读取文件并按名称排序nc_files = sorted(os.listdir(year_path))# 创建输出年份子文件夹output_year_path = os.path.join(outpath, year_dir)os.makedirs(output_year_path, exist_ok=True)for j in range(0, len(nc_files), 2):# 确保成对读取文件file_u = os.path.join(year_path, nc_files[j])file_v = os.path.join(year_path, nc_files[j+1])data1 = xr.open_dataset(file_u)data2 = xr.open_dataset(file_v)time = data1['valid_time']time_pd = time.to_pandas()data_var1 = data1[variable_name1]data_var2 = data2[variable_name2]for k in range(len(data_var1)):# 计算矢量速度data_arr1 = data_var1[k]data_arr2 = data_var2[k]data_arr = (data_arr1.data ** 2 + data_arr2.data ** 2) ** 0.5  # Added .data here# 创建新 Dataset 并命名变量为 'vector'ds = xr.Dataset({'vector': (['latitude', 'longitude'], data_arr)}, coords={'latitude': data_arr1.latitude.data,  # Added .data here'longitude': data_arr1.longitude.data,  # Added .data here'time': time[k].data  # Added .data here})# 构建输出文件名outname = os.path.join(output_year_path, f"wind_{str(time_pd.index[k])[:10]}_{str(time_pd.index[k])[11:13]}.nc")# 保存为 NetCDF 文件ds.to_netcdf(outname)print(f"{outname} has been converted!")if __name__ == "__main__":inpath = r"inpath/"outpath = r"outpath/"variable_name1 = "u10"variable_name2 = "v10"ini_num = 34 end_num = 35 num_processes = 50  #era5_hourly_wind_compose_array((inpath, outpath, variable_name1, variable_name2, ini_num, end_num))num_processes = min(50, os.cpu_count())  total_years = end_num - ini_numyears_per_process = total_years // num_processesargs_list = []for i in range(num_processes):start = ini_num + i * years_per_processend = start + years_per_process if i < num_processes - 1 else end_numargs_list.append((inpath, outpath, variable_name1, variable_name2, start, end))# 使用进程池并行处理with mul.Pool(processes=num_processes) as pool:pool.map(era5_hourly_wind_compose_array, args_list)

转tif

import os
import xarray as xr
import pandas as pd
import multiprocessing as mul
import numpy as np
from rasterio.transform import from_origin
import rasteriodef save_as_geotiff(data_array, lons, lats, filename):"""将数据保存为 GeoTIFF 文件"""# 计算仿射变换参数transform = from_origin(lons.min(),  # 左边界经度lats.max(),  # 上边界纬度(lons.max() - lons.min()) / len(lons),  # 经度分辨率(lats.max() - lats.min()) / len(lats)   # 纬度分辨率)# 保存为 GeoTIFFwith rasterio.open(filename,'w',driver='GTiff',height=len(lats),width=len(lons),count=1,dtype=data_array.dtype,crs='EPSG:4326',  # WGS84 坐标系transform=transform,) as dst:dst.write(data_array, 1)  # 写入数据到第1波段def era5_hourly_wind_compose_array(args):inpath, outpath, variable_name1, variable_name2, start_year, end_year = argsfor year in range(start_year, end_year + 1):year_dir = "y" + str(year)year_path = os.path.join(inpath, year_dir)# 检查年份目录是否存在if not os.path.exists(year_path):print(f"Warning: Directory {year_path} does not exist, skipping...")continue# 读取文件并按名称排序nc_files = sorted([f for f in os.listdir(year_path) if f.endswith('.nc')])# 创建输出年份子文件夹output_year_path = os.path.join(outpath, year_dir)os.makedirs(output_year_path, exist_ok=True)for j in range(0, len(nc_files), 2):# 确保成对读取文件if j + 1 >= len(nc_files):print(f"Warning: Missing pair for file {nc_files[j]} in {year_path}")continuefile_u = os.path.join(year_path, nc_files[j])file_v = os.path.join(year_path, nc_files[j+1])try:data1 = xr.open_dataset(file_u)data2 = xr.open_dataset(file_v)time = data1['valid_time']time_pd = time.to_pandas()data_var1 = data1[variable_name1]data_var2 = data2[variable_name2]for k in range(len(data_var1)):# 计算矢量风速data_arr1 = data_var1[k].valuesdata_arr2 = data_var2[k].valueswind_speed = np.sqrt(data_arr1**2 + data_arr2**2)# 获取经纬度坐标lons = data_var1.longitude.valueslats = data_var1.latitude.values# 构建输出文件名outname = os.path.join(output_year_path, f"wind_{str(time_pd.index[k])[:10]}_{str(time_pd.index[k])[11:13]}.tif")# 保存为 GeoTIFFsave_as_geotiff(wind_speed, lons, lats, outname)print(f"{outname} has been converted!")except Exception as e:print(f"Error processing files {file_u} and {file_v}: {str(e)}")continueif __name__ == "__main__":inpath = r"inpath/"outpath = r"outpath/tif"variable_name1 = "u10"variable_name2 = "v10"start_year = 2015  end_year = 2016   num_processes = min(50, os.cpu_count())  # 使用CPU核心数或50中的较小值# 计算每个进程处理的年数total_years = end_year - start_year + 1years_per_process = max(1, total_years // num_processes)# 创建参数列表args_list = []for i in range(num_processes):current_start = start_year + i * years_per_processcurrent_end = min(current_start + years_per_process - 1, end_year)# 确保最后一个进程处理剩余的所有年份if i == num_processes - 1:current_end = end_yearif current_start > end_year:breakargs_list.append((inpath, outpath, variable_name1, variable_name2, current_start, current_end))# 使用进程池并行处理with mul.Pool(processes=num_processes) as pool:pool.map(era5_hourly_wind_compose_array, args_list)

 

http://www.xdnf.cn/news/15295.html

相关文章:

  • 【版本控制】Perforce Helix Core (P4V) 完全入门指南(含虚幻引擎实战)
  • Spring Boot 集成 Spring Security 完整示例
  • C++ 单例模式实现
  • 牛客周赛 Round 100
  • AB实验评估指标体系之【实验评估指标体系】
  • 【Linux | 网络】应用层(HTTP)
  • RabbitMQ 之仲裁队列
  • 决策树的相关理论学习
  • 慢慢理解this
  • Dify离线安装包-集成全部插件、模板和依赖组件,方便安可内网使用
  • Matlab批量转换1km降水数据为tiff格式
  • 业务访问控制-ACL与包过滤
  • Qt窗口:QToolBar、QStatusBar、QDockWidget、QDialog
  • vue3 ref vs reactive值的修改
  • es里为什么node和shard不是一对一的关系
  • Git 使用笔记
  • 使用Starrocks替换Clickhouse的理由
  • SPSSPRO:数据分析市场SaaS挑战者的战略分析
  • 香港服务器Python自动化巡检脚本开发与邮件告警集成
  • 【Linux】线程机制深度实践:创建、等待、互斥与同步
  • 网络协议学习思维导图
  • python爬取新浪财经网站上行业板块股票信息的代码
  • java进阶(二)+学习笔记
  • 【算法】递归、搜索与回溯
  • Datawhale AI 夏令营2025科大讯飞AI大赛<夏令营:用AI做带货视频评论分析>
  • [Nagios Core] CGI接口 | 状态数据管理.dat | 性能优化
  • jenkins部署前端vue项目使用Docker+Jenkinsfile方式
  • 【星闪】Hi2821 | SDK开发入门,应用启动流程,创建自己的应用
  • 大模型聊天模板
  • 在人工智能自动化编程时代:AI驱动开发和传统软件开发的分析对比