多模态数据解压-Parquet
解压多模态音频文本的数据到指定文件夹。
- ParquetViewer(Parquet 可视化查看工具):https://github.com/mukunku/ParquetViewer
- https://huggingface.co/datasets/openai-whisper-SLR
查看数据格式
import sys
import subprocess


def ensure_package(pkg):
    """Import *pkg*, auto-installing it via pip when it is missing.

    Args:
        pkg: Importable module name (also used as the pip package name).

    Raises:
        subprocess.CalledProcessError: if the pip install command fails.
        ImportError: if the package still cannot be imported after install.
    """
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] 正在自动安装 {pkg} ...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        # Fix: verify the freshly installed package is actually importable,
        # instead of assuming the pip call was sufficient.
        __import__(pkg)


ensure_package("pandas")
# pandas needs pyarrow as its parquet engine, so bootstrap it too.
ensure_package("pyarrow")

import pandas as pd

# Quick look at the dataset: column names, a preview, and one transcript.
parquet_file = r"f:\dataset\train-00001-of-00003.parquet"
df = pd.read_parquet(parquet_file)

print("字段名:", df.columns)
print("前5行:")
print(df.head())
# print(df['audio'].iloc[0])
print(df['sentence'].iloc[0])
输出内容
字段名: Index(['file_id', 'sentence', 'audio', '__index_level_0__'], dtype='object')
前5行:file_id ... __index_level_0__
0 bur_7712_8367089238 ... 465
1 bur_7447_9491098611 ... 1447
2 bur_4632_0098345295 ... 2501
3 bur_9762_5479387529 ... 1907
4 bur_5189_0472171856 ... 1104[5 rows x 4 columns]
တစ်ခါတည်း တိုင်လုံးကျော် ကို ဖြဲလိုက် ဟောက်လိုက် ဟိန်းလိုက်ကြတာမှ
指定格式数据解压
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
批量从 Parquet 中提取音频和对应文本,并按照原始文件路径结构存放。
支持指定输入文件和输出目录;如果不传参,将使用脚本中默认路径。
"""
import subprocess
import sys
import os
import io
import argparse

# Default input/output locations; override on the command line if needed.
DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT = r"F:\dataset"
def install_if_missing(pkg):
    """Import *pkg*, pip-installing it first when the import fails.

    Args:
        pkg: Importable module name (assumed to equal the pip package name).

    Raises:
        subprocess.CalledProcessError: if the pip install command fails.
        ImportError: if the package is still missing after installation.
    """
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] Installing {pkg}…")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])
        # Fix: confirm the install actually produced an importable package.
        __import__(pkg)
# Bootstrap third-party dependencies, then import the ones we use below.
for _pkg in ("pyarrow", "pandas", "soundfile", "requests"):
    install_if_missing(_pkg)

import pyarrow.parquet as pq
import pandas as pd
import soundfile as sf
import requests
# Command-line interface: which parquet file to read, where to write output.
parser = argparse.ArgumentParser(
    description="Extract audio and text from Parquet, preserving original structure."
)
parser.add_argument(
    "--parquet",
    default=DEFAULT_PARQUET,
    help=f"Parquet 文件路径,默认: {DEFAULT_PARQUET}",
)
parser.add_argument(
    "--output",
    default=DEFAULT_OUTPUT,
    help=f"输出根目录,默认: {DEFAULT_OUTPUT}",
)
args = parser.parse_args()

parquet_path = args.parquet
out_base_dir = args.output
print(f"📂 使用 Parquet: {parquet_path}")
print(f"📂 输出目录: {out_base_dir}")
def _resolve_target_dir(audio, base_dir):
    """Return the output directory mirroring the audio's original sub-path.

    Falls back to *base_dir* when the audio cell carries no usable path.
    """
    if isinstance(audio, dict) and audio.get("path"):
        # Normalize Windows separators so dirname works uniformly.
        sub_path = os.path.dirname(audio["path"].replace("\\", "/"))
        if sub_path:
            return os.path.join(base_dir, sub_path)
    return base_dir


def _export_audio(audio, wav_path, fid):
    """Write one audio cell to *wav_path*.

    Supports three dict layouts: raw "bytes", a decoded "array" (with
    "sampling_rate" assumed present alongside it), or a "path"/URL to fetch.
    Anything else is reported and skipped.
    """
    if isinstance(audio, dict) and audio.get("bytes") is not None:
        with open(wav_path, "wb") as f:
            f.write(audio["bytes"])
    elif isinstance(audio, dict) and audio.get("array") is not None:
        sf.write(wav_path, audio["array"], audio["sampling_rate"])
    elif isinstance(audio, dict) and audio.get("path"):
        path = audio["path"]
        if path.startswith("http"):
            # Fix: add a timeout and fail loudly on HTTP errors instead of
            # silently writing an error page to disk as a .wav file.
            resp = requests.get(path, timeout=60)
            resp.raise_for_status()
            data = resp.content
        else:
            with open(path, "rb") as rf:
                data = rf.read()
        with open(wav_path, "wb") as f:
            f.write(data)
    else:
        print(f"⚠️ Unsupported audio format for {fid}")


# Read only the three relevant columns from the Parquet file.
try:
    df = pq.read_table(
        parquet_path, columns=["file_id", "sentence", "audio"]
    ).to_pandas()
except Exception as e:
    print(f"❌ 无法读取 Parquet: {e}")
    sys.exit(1)

# Export one WAV plus one TXT per row, preserving the original layout.
for _, row in df.iterrows():
    fid = row["file_id"]
    sentence = row["sentence"]
    audio = row["audio"]

    target_dir = _resolve_target_dir(audio, out_base_dir)
    os.makedirs(target_dir, exist_ok=True)

    _export_audio(audio, os.path.join(target_dir, f"{fid}.wav"), fid)

    # The transcript is always written, even when the audio was skipped.
    with open(os.path.join(target_dir, f"{fid}.txt"), "w", encoding="utf-8") as f:
        f.write(sentence)

print("✅ 全部文件已按原始目录结构导出到:", out_base_dir)
通用数据集解压
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Fix: "import sys" had been fused onto the coding-cookie comment line,
# turning it into a comment; sys.executable below would raise NameError.
import sys
import subprocess
import os


def ensure_package(pkg):
    """Import *pkg*, pip-installing it first when it is missing.

    Args:
        pkg: Importable module name (also used as the pip package name).

    Raises:
        subprocess.CalledProcessError: if the pip install command fails.
    """
    try:
        __import__(pkg)
    except ImportError:
        print(f"[+] Installing {pkg}...")
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])


for pkg in ("pyarrow", "pandas"):
    ensure_package(pkg)

import pyarrow.parquet as pq
import pandas as pd

DEFAULT_PARQUET = r"f:\dataset\train-00001-of-00003.parquet"
DEFAULT_OUTPUT = r"F:\dataset"
ID_COL = "file_id"  # primary-key column of this dataset


def get_ext_from_path(path):
    """Return the lower-cased extension of *path* including the dot, or ""."""
    text = str(path)
    if "." in text:
        return "." + text.rsplit(".", 1)[-1].lower()
    return ""


def smart_text_ext(val):
    """Guess a file extension for a text value using cheap content heuristics.

    Non-strings always map to ".txt". The checks run in a fixed priority
    order, so the first matching heuristic wins.
    """
    if not isinstance(val, str):
        return ".txt"
    s = val.strip()
    # Fix: test for ASS subtitles before the generic JSON check — "[Script
    # Info]" starts with "[", so the old order made the .ass branch
    # unreachable and mislabeled ASS files as .json.
    if s.startswith("[Script Info]"):
        return ".ass"
    if s.startswith(("{", "[")):
        return ".json"
    if s.lower().startswith("<html"):
        return ".html"
    if s.lower().startswith("<?xml") or (s.startswith("<") and s.endswith(">")):
        return ".xml"
    if s.startswith("WEBVTT"):
        return ".vtt"
    # Heuristic: multi-line text whose lines carry timestamp-like prefixes.
    if s.count("\n") > 1 and all(marker in s for marker in ("0:", "1:", "2:")):
        return ".srt"
    if s.startswith("---") or "# " in s:
        return ".md"
    if "," in s and "\n" in s and not s.startswith("{"):
        return ".csv"
    if s.lower().startswith("%pdf"):
        return ".pdf"
    return ".txt"


parquet_path = DEFAULT_PARQUET
out_dir = DEFAULT_OUTPUT
os.makedirs(out_dir, exist_ok=True)

table = pq.read_table(parquet_path)
df = table.to_pandas()
print("==== 字段结构 ====")
print(table.schema)
print("==== 示例 ====")
print(df.head())


def _export_binary_fields(row, fields):
    """First pass: write every dict cell that carries non-empty "bytes" and
    "path" to out_dir/<path>, returning {field: forward-slash path} so text
    fields can later be paired with their binary counterparts."""
    refs = {}
    for fld in fields:
        val = row[fld]
        if not (isinstance(val, dict) and val.get("bytes") and val.get("path")):
            continue
        rel_path = val["path"].replace("\\", "/")
        refs[fld] = rel_path
        dest = os.path.join(out_dir, rel_path)
        os.makedirs(os.path.dirname(dest), exist_ok=True)
        with open(dest, "wb") as fh:
            fh.write(val["bytes"])
    return refs


def _paired_binary_path(fld, refs):
    """Return the binary path paired with text field *fld*, or None.

    Two fields pair when either name contains the other; this subsumes the
    original's extra startswith checks (startswith implies substring).
    """
    for key, rel_path in refs.items():
        if fld in key or key in fld:
            return rel_path
    return None


def _export_plain_field(fld, val, sid):
    """Second-pass fallback: store an unpaired scalar under out_dir/<fld>/.
    Unsupported cell types are silently skipped, as before."""
    fld_dir = os.path.join(out_dir, fld)
    if isinstance(val, str):
        name, is_text, payload = f"{sid}{smart_text_ext(val)}", True, val
    elif isinstance(val, (int, float, bool)):
        name, is_text, payload = f"{sid}.txt", True, str(val)
    elif isinstance(val, (bytes, bytearray)):
        name, is_text, payload = f"{sid}.bin", False, val
    else:
        return
    os.makedirs(fld_dir, exist_ok=True)
    dest = os.path.join(fld_dir, name)
    if is_text:
        with open(dest, "w", encoding="utf-8") as fh:
            fh.write(payload)
    else:
        with open(dest, "wb") as fh:
            fh.write(payload)


fields = df.columns.tolist()
for idx, row in df.iterrows():
    # Prefer the dataset primary key as the file stem; fall back to the row index.
    sid = str(row[ID_COL]) if ID_COL in row and not pd.isnull(row[ID_COL]) else str(idx)

    path_refs = _export_binary_fields(row, fields)

    # Second pass: text/structured fields, paired with binaries when possible.
    for fld in fields:
        val = row[fld]
        if isinstance(val, dict):
            continue
        rel_path = _paired_binary_path(fld, path_refs)
        if rel_path is not None:
            # Same directory and stem as the paired binary, new extension.
            base, _ = os.path.splitext(rel_path)
            dest = os.path.join(out_dir, base + smart_text_ext(val))
            os.makedirs(os.path.dirname(dest), exist_ok=True)
            with open(dest, "w", encoding="utf-8") as fh:
                fh.write(str(val))
        else:
            _export_plain_field(fld, val, sid)

    if idx % 100 == 0:
        print(f"已处理 {idx} 条")

print(f"✅ 所有内容已批量还原导出到:{out_dir}")