当前位置: 首页 > java >正文

灾情分析报告数据集制作

项目目标: 将本地多年灾情分析行业报告转换为高质量的 Prompt-Response 数据集,用于微调语言模型。

核心过程: 文本提取 -> 结构化解析 -> 信息抽取 -> Prompt-Response 格式化 -> 数据集划分与验证。

1. 前期准备

  • 原始数据: 60篇行业报告 + 24篇综合报告(主要为 PDF 和 Word 格式,包含扫描件)。
  • 工具与库:
    • Python 3.8+
    • pip install python-docx PyMuPDF pdfminer.six pandas tqdm jsonlines scikit-learn
    • OCR 工具 (如果包含扫描件): PaddleOCRTesseract-OCR (需要安装对应的软件和语言包,并通过 pip install paddleocrpip install pytesseract 调用)。
    • 表格提取工具 (如果报告中表格重要且复杂): camelottabula-py (需要 Java 环境)。
  • 领域知识: 报告结构、常用术语、关键信息类型(如灾害类型、时间、地点、影响指标、应对措施等)。

2. 数据集制作流程

阶段 A: 数据获取与管理

  • 步骤 1.1: 数据收集与整理
    • 将所有报告文件收集到一个统一的目录结构中。
    • 为每个报告文件建立一个唯一的标识(如文件名或分配一个ID)。
    • 记录文件的元数据(如文件名、原始格式、可能的年份/日期信息)。
  • 步骤 1.2: 文件格式分类
    • 区分 PDF、Word、扫描件 PDF 等不同格式的文件,以便使用不同的提取工具。

阶段 B: 文本提取与初步处理

  • 步骤 2.1: 从 Word 文件提取文本
    • 使用 python-docx 库读取 .docx 文件并提取文本内容。
    • 注意:python-docx 不支持旧版 .doc 文件,需要先转换为 .docx。复杂格式(如文本框、图表内的文本)可能无法提取。
  • 步骤 2.2: 从 PDF 文件提取文本
    • 使用 PyMuPDF (更快、更准确,尤其适合结构化 PDF) 或 pdfminer.six (提供更多底层结构信息)。
    • 注意:PDF 格式自由,提取的文本顺序可能混乱,特别是对于多栏布局或包含大量图表的页面。
  • 步骤 2.3: 从扫描件 PDF 提取文本 (OCR)
    • 对扫描件 PDF 或图片格式的报告进行 OCR。
    • 工具:PaddleOCR (对中文支持较好) 或 Tesseract-OCR
    • OCR 结果可能包含识别错误,需要后期清洗。
  • 步骤 2.4: 合并与初步清洗
    • 将不同来源提取的文本合并。
    • 进行初步清洗:去除常见的页眉页脚文本、页码、连续的空白行、特殊符号、乱码等。
    • 将每个报告的提取文本保存为独立的 .txt 文件。

阶段 C: 报告结构化解析与信息抽取

这是最需要根据您的实际数据进行定制的部分。目标是识别报告的逻辑结构并提取关键信息。

  • 步骤 3.1: 定义报告结构模式
    • 分析您的报告样本,确定常见的章节标题、子标题模式(如“一、”、“1.”、“(一)”)、关键信息段落前后的标志性词语。
    • 识别报告开头的固定信息(标题、日期、作者/部门)和结尾信息(附件列表、盖章)。
  • 步骤 3.2: 实现结构化解析器
    • 编写代码,读取 .txt 文件,使用正则表达式、关键词匹配、基于行的分析等方法,识别并标记报告的各个逻辑部分。
    • 例如:识别以“摘要”、“背景”、“灾害影响”、“应对措施”、“对策建议”等开头的行作为章节标题。
    • 将报告文本按章节分割,形成一个结构化的数据表示(例如一个字典,键为章节标题,值为章节内容)。
  • 步骤 3.3: 抽取关键信息
    • 在结构化解析的基础上,针对特定章节或整个报告,抽取关键信息。
    • 例如:
      • 从“灾情背景”或“事件概述”中抽取灾害的类型、发生时间、发生地点
      • 从“灾害影响”章节中抽取人员伤亡、经济损失、基础设施破坏、影响范围等数据和描述。
      • 从“应对措施”章节中抽取采取的行动、参与部门、投入资源等。
      • 从“对策建议”章节中抽取未来规划、改进措施等。
    • 抽取方法:关键词匹配、正则表达式提取、或者简单地提取整个段落/章节内容。
    • 注意: 这里的抽取精度直接影响后续 Prompt-Response 对的质量。对于数值(如伤亡人数、经济损失),如果格式不固定,提取会很困难,可能需要人工介入。
  • 步骤 3.4: 表格数据处理 (如果重要)
    • 如果报告中的表格包含关键数据(如详细的损失统计),尝试使用 camelottabula-py 从 PDF 中提取表格数据,并转换为结构化格式(如 Pandas DataFrame)。
    • 将表格数据整合到报告的结构化表示中,或者作为 Prompt/Response 的一部分。

阶段 D: 数据清洗与标准化

  • 步骤 4.1: 清洗提取的文本和数据
    • 去除结构化解析和抽取过程中可能产生的错误文本片段。
    • 纠正 OCR 错误(如果使用 OCR)。
    • 处理缺失值或异常值。
  • 步骤 4.2: 标准化
    • 命名实体标准化: 将同一地点、同一部门、同一灾害类型的不同表述统一(如“XX市”、“XX市人民政府” -> “XX市”;“洪涝”、“洪水灾害” -> “洪涝”)。可以使用查找替换或维护一个标准化词典。
    • 日期/时间标准化: 统一日期和时间的格式(如“2023年8月15日”、“2023/08/15” -> “2023-08-15”)。
    • 单位标准化: 统一数值单位(如“万元”、“万” -> “万元”)。
    • 术语标准化: 确保使用的专业术语前后一致。
  • 步骤 4.3: 去重
    • 识别并处理报告之间或报告内部的高度重复内容。

阶段 E: Prompt-Response 对构建

根据您希望模型完成的任务(生成完整报告、生成特定章节等),将处理好的报告数据格式化。

  • 步骤 5.1: 设计 Prompt-Response 策略
    • 回顾您希望模型如何被使用。是给模型一些核心信息让它生成整篇报告?还是提供一个章节标题和一些要点让它扩写该章节?
    • 选择并组合适合您需求的 Prompt-Response 策略(参考上一个回复中的策略示例)。
  • 步骤 5.2: 实现格式化逻辑
    • 编写代码,读取结构化后的报告数据,按照选定的策略构建 Prompt-Response 对。
    • 策略 1 (完整报告): Input (Prompt) = 提取的关键信息摘要 (如灾害类型、时间、地点、主要影响简述)。 Output (Response) = 完整的报告文本。
    • 策略 2 (章节生成): Input (Prompt) = 灾害核心信息 + 指定章节标题 + 该章节相关的关键要点或数据。 Output (Response) = 指定章节的完整文本。
    • 使用模板: 采用一个固定的 Prompt 模板(如 Alpaca/ShareGPT 风格的 Instruction/Input/Response)来包裹 Prompt 和 Response,有助于模型区分不同部分。这在 config.py 中定义了 PROMPT_TEMPLATE
    • 将每个 Prompt-Response 对保存为一个数据样本(如一个字典)。

阶段 F: 数据集输出与划分

  • 步骤 6.1: 保存为 JSONL 格式
    • 将所有构建好的数据样本保存为一个 JSONL 文件。每行是一个 JSON 对象,包含 Prompt 和 Response(或者按照 Prompt 模板格式化后的单条文本)。
    • 推荐使用 jsonlines 库或 Python 的 json 库结合文件写入操作。
  • 步骤 6.2: 数据集划分
    • 使用 sklearn.model_selection.train_test_split 或 Hugging Face datasets 库,将 JSONL 文件中的数据按比例划分为训练集、验证集和测试集。
    • 将划分结果保存为单独的 JSONL 文件或记录样本索引。

阶段 G: 质量控制与人工审核

  • 步骤 7.1: 自动化检查
    • 检查 JSONL 文件格式是否正确。
    • 检查每个样本是否包含 Prompt 和 Response(或 text 字段)。
    • 检查文本长度是否在合理范围内(避免过长或过短的样本)。
  • 步骤 7.2: 人工抽样审核
    • 随机抽取一定比例(例如 5%-10%)的样本。
    • 人工检查:
      • Prompt 是否清晰、合理,能否从原始报告中推断出来。
      • Response 是否与原始报告内容一致(特别是关键事实数据)。
      • Response 的格式、语言风格是否符合报告要求。
      • Prompt 和 Response 之间的对应关系是否正确。
    • 根据审核结果,找出数据处理流程中的问题并进行修正。
  • 步骤 7.3: 迭代优化
    • 数据准备是一个迭代过程。根据模型微调后的表现和人工评估结果,您可能需要回过头来改进数据提取、解析、格式化等步骤,以提升数据质量。

数据集制作代码框架 (Python)

这个代码框架将提供一个结构,您需要根据您的报告特点填充具体的实现逻辑。

文件结构:

disaster_dataset_prep/
├── data_config.py          # 数据路径和处理配置
├── text_extractor.py       # 文本提取模块
├── report_parser.py        # 报告结构解析和信息抽取模块 (需要大量定制)
├── data_formatter.py       # Prompt-Response 对格式化模块 (需要定制)
├── prepare_dataset.py      # 主流程控制脚本
├── validate_dataset.py     # 数据集验证脚本
├── raw_reports/            # 存放您的原始报告文件
│   ├── report_a.pdf
│   ├── report_b.docx
│   └── ...
└── processed_data/         # 存放中间和最终生成的数据├── texts/              # 提取出的纯文本├── structured_data/    # 结构化解析后的数据 (自定义格式)└── final_dataset.jsonl # 最终的 JSONL 数据集

data_config.py

import os# 原始报告目录
RAW_REPORTS_DIR = "./raw_reports"
# 处理后的数据输出目录
PROCESSED_DATA_DIR = "./processed_data"
TEXT_OUTPUT_DIR = os.path.join(PROCESSED_DATA_DIR, "texts")
STRUCTURED_OUTPUT_DIR = os.path.join(PROCESSED_DATA_DIR, "structured_data") # 示例中间格式输出目录# 最终数据集文件路径
FINAL_DATASET_PATH = os.path.join(PROCESSED_DATA_DIR, "final_dataset.jsonl")# 数据集划分比例
TRAIN_SPLIT_RATIO = 0.8
TEST_SPLIT_RATIO = 0.1 # 剩余的用于验证集 (0.1)# Prompt 模板 (与微调训练脚本中的一致)
PROMPT_TEMPLATE = "### Instruction:\n{}\n### Input:\n{}\n### Response:\n{}"
# 这里的 {0}, {1}, {2} 对应 instruction, input, response# --- 以下配置需要根据您的报告特点进行定制 ---# 文件类型识别 (可以扩展支持更多类型)
SUPPORTED_FILE_TYPES = ['.pdf', '.docx']# 报告结构识别的关键词或正则表达式 (示例,需要根据实际报告调整)
# 用于识别章节标题等
REPORT_SECTION_PATTERNS = [(1, r"^一、(.+)"), # 一级标题(1, r"^1\.(.+)"), # 一级标题(2, r"^(一)(.+)"), # 二级标题(2, r"^\d+\.\d+\s+(.+)"), # 二级标题(3, r"^\d+\.\d+\.\d+\s+(.+)"), # 三级标题(0, r"^摘要"), # 特殊章节(0, r"^背景"),(0, r"^灾害影响分析"),(0, r"^应急响应情况"),(0, r"^对策建议"),# 添加更多您报告中实际使用的标题模式
]# 关键信息抽取的关键词或正则表达式 (示例,需要根据实际报告调整)
# 用于从文本中提取结构化信息
KEY_INFO_PATTERNS = {"灾害类型": r"灾害类型[::]?\s*(.+)","发生时间": r"(发生时间|时间)[::]?\s*(\d{4}年\d{1,2}月\d{1,2}日)","发生地点": r"(发生地点|地点)[::]?\s*(.+?)[\s,;。]","死亡人数": r"(死亡|遇难)[\s人]*(\d+)","受伤人数": r"(受伤|受伤人数)[\s人]*(\d+)","直接经济损失": r"(直接经济损失)[::]?\s*(\d+(\.\d+)?)\s*万", # 示例,假设单位是万# 添加更多您关心的关键信息及其提取模式
}# Prompt-Response 构建策略配置 (选择和组合您需要的策略)
# 例如,True 表示启用该策略,False 表示禁用
PROMPT_RESPONSE_STRATEGIES = {"full_report_from_summary": True, # 从关键信息生成完整报告"section_from_points": True, # 从要点生成特定章节# 添加更多自定义策略
}# full_report_from_summary 策略相关的配置
FULL_REPORT_SUMMARY_INSTRUCTION = "请根据以下灾情概述生成一份详细的本地灾情分析报告:"
FULL_REPORT_SUMMARY_FIELDS = ["灾害类型", "发生时间", "发生地点", "初步影响"] # 从哪里提取信息作为 Input# section_from_points 策略相关的配置 (示例:生成“灾害影响分析”章节)
SECTION_GENERATION_INSTRUCTION_TEMPLATE = "请根据以下信息,撰写一份本地灾情分析报告的'{}'章节:" # 格式化为具体的章节名
SECTION_GENERATION_TARGET_SECTIONS = ["灾害影响分析", "应急响应情况", "对策建议"] # 要生成哪些章节
# 如何从报告中为每个目标章节构建 Input 和 Response
# 这是一个示例:Input 可能包含该章节的原始要点、关键数据;Response 是该章节的完整文本
SECTION_GENERATION_DATA_MAP = {"灾害影响分析": {"input_keys": ["灾害类型", "发生时间", "发生地点", "死亡人数", "受伤人数", "直接经济损失"], # 作为 Input 的关键信息"response_section": "灾害影响分析", # 作为 Response 的原始章节标题"input_template": "灾害:{灾害类型}\n时间:{发生时间}\n地点:{发生地点}\n关键数据:死亡{死亡人数}人,受伤{受伤人数}人,直接经济损失{直接经济损失}万元。\n其他影响要点:[从原始报告中提取的该章节要点简述或关键句]" # 构建 Input 的模板},# 添加其他章节的配置
}

text_extractor.py

import os
from docx import Document
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import io
import logginglogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def extract_text_from_word(docx_path):"""从 docx 文件提取文本"""try:document = Document(docx_path)text = []for paragraph in document.paragraphs:text.append(paragraph.text)# 可能需要处理表格中的文本# for table in document.tables:#     for row in table.rows:#         for cell in row.cells:#             text.append(cell.text) # 表格文本提取通常更复杂return "\n".join(text)except Exception as e:logging.error(f"Error extracting text from {docx_path}: {e}")return Nonedef extract_text_from_pdf(pdf_path):"""从 PDF 文件提取文本"""try:doc = fitz.open(pdf_path)text = ""for page_num in range(doc.page_count):page = doc.load_page(page_num)text += page.get_text()doc.close()# PyMuPDF 也支持提取图片,用于OCR# for page_num in range(doc.page_count):#     page = doc.load_page(page_num)#     pix = page.get_pixmap()#     img_bytes = pix.tobytes("png")#     img = Image.open(io.BytesIO(img_bytes))#     # ... apply OCR here ...return textexcept Exception as e:logging.error(f"Error extracting text from {pdf_path}: {e}")return Nonedef extract_text_from_scanned_pdf(pdf_path):"""从扫描件 PDF 提取文本 (OCR)"""# 需要安装 Tesseract 并配置环境变量,或使用 PaddleOCRtry:# 示例使用 PyTesseract,需要系统安装 Tesseract# 请确保已安装中文语言包 (chi_sim)# pytesseract.pytesseract.tesseract_cmd = r'<full_path_to_your_tesseract_executable>' # 如果未添加到环境变量doc = fitz.open(pdf_path)text = ""for page_num in range(doc.page_count):page = doc.load_page(page_num)pix = page.get_pixmap()img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)# 尝试使用 Tesseract 进行 OCR (假设已安装且配置好中文)# 如果使用 PaddleOCR,调用方式不同try:# text += pytesseract.image_to_string(img, lang='chi_sim')# PaddleOCR 示例 (更推荐中文)# from paddleocr import PaddleOCR# ocr = PaddleOCR(use_angle_cls=True, lang="ch") # 初始化一次即可# result = ocr.ocr(img, cls=True)# for line in result:#      text += line[0][0] + "\n" # 提取文本内容# 这里用一个占位符表示OCR成功但实际需要实现text += f"\n---OCR_PAGE_{page_num}---\n[OCR文本 from page {page_num}]\n" # 替换为实际 OCR 结果except Exception as ocr_e:logging.warning(f"OCR failed for page {page_num} in {pdf_path}: {ocr_e}")text += f"\n---OCR_FAILED_PAGE_{page_num}---\n[OCR failed for page {page_num}]\n"doc.close()return textexcept Exception as e:logging.error(f"Error processing scanned PDF {pdf_path}: {e}")return Nonedef extract_text(file_path):"""根据文件扩展名调用相应的提取函数"""_, file_extension = os.path.splitext(file_path)file_extension = file_extension.lower()if file_extension == '.docx':return extract_text_from_word(file_path)elif file_extension == '.pdf':# 简单判断是否可能是扫描件,实际判断可能更复杂# 例如,检查PDF是否包含可搜索的文本层# 对于演示,假设所有PDF都需要尝试文本提取return extract_text_from_pdf(file_path)# elif file_extension == '.pdf' and is_scanned(file_path): # 需要实现 is_scanned 函数#     return extract_text_from_scanned_pdf(file_path)else:logging.warning(f"Unsupported file type: {file_extension} for {file_path}")return None# 简单的判断PDF是否是扫描件(不完全准确)
# 更准确的方法是检查PDF的文本层,或者尝试提取一定量文本,如果少于阈值则认为是扫描件
# def is_scanned(pdf_path, text_threshold=100):
#      try:
#          doc = fitz.open(pdf_path)
#          text = ""
#          for page in doc:
#              text += page.get_text()
#          doc.close()
#          return len(text.strip()) < text_threshold
#      except Exception as e:
#          logging.warning(f"Error checking if PDF is scanned {pdf_path}: {e}")
#          return False # Assume not scanned on error

report_parser.py

(这个模块是您需要根据实际报告格式进行大量定制和实现的地方。下面的代码只是一个非常基础的示例框架。)

import re
import logging
from data_config import REPORT_SECTION_PATTERNS, KEY_INFO_PATTERNSlogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def parse_report_structure(text):"""尝试解析报告的结构,按章节分割文本。这是最需要定制的函数,因为报告结构差异很大。返回一个字典,键为章节标题,值为章节内容。或者返回一个结构化的列表,包含章节信息和文本。"""structured_data = {"_full_text": text, "_sections": []}current_section_title = "Unknown/Header" # 默认章节current_section_text = []lines = text.split('\n')for line in lines:line = line.strip()if not line:continueis_section_title = Falsefor level, pattern in REPORT_SECTION_PATTERNS:match = re.match(pattern, line)if match:# 保存前一个章节if current_section_text:structured_data["_sections"].append({"title": current_section_title,"level": level if level > 0 else 1, # 假设特殊章节也是一级"text": "\n".join(current_section_text).strip()})# 开始新的章节current_section_title = line # 使用匹配到的标题作为新章节标题current_section_text = []is_section_title = Truebreak # 找到匹配的标题模式,不再检查其他模式if not is_section_title:current_section_text.append(line)# 添加最后一个章节if current_section_text:structured_data["_sections"].append({"title": current_section_title,"level": 1, # 默认级别"text": "\n".join(current_section_text).strip()})# 简单的后处理:将 Unknown/Header 章节作为报告的 مقدمة 或 摘要/背景# 实际需要更智能的判断if structured_data["_sections"] and structured_data["_sections"][0]["title"].startswith("Unknown/Header"):structured_data["_sections"][0]["title"] = "报告引言/摘要" # 示例重命名# 打印解析结果示例logging.info(f"Parsed report with {len(structured_data['_sections'])} sections.")# for i, sec in enumerate(structured_data['_sections'][:5]): # 打印前5个章节#     logging.info(f"  Section {i+1}: Title='{sec['title']}', Level={sec['level']}, Text starts with='{sec['text'][:100]}...'")return structured_datadef extract_key_info(structured_data):"""从结构化报告数据中抽取关键信息。これも要定制。返回一个字典,键为信息名称,值为抽取到的信息。"""key_info = {}full_text = structured_data.get("_full_text", "")# 遍历预定义的关键信息模式进行抽取for info_name, pattern in KEY_INFO_PATTERNS.items():match = re.search(pattern, full_text) # 在全文中搜索if match:# 根据模式捕获组获取信息if len(match.groups()) > 0:# 取最后一个捕获组作为值,通常是信息内容extracted_value = match.groups()[-1].strip()key_info[info_name] = extracted_valueelse:# 如果模式没有捕获组,可能是匹配到了整个模式作为标志,这里不存储值或存储整个匹配文本key_info[info_name] = match.group(0).strip() # 示例:存储整个匹配文本# logging.info(f"Extracted '{info_name}': {key_info[info_name]}")else:key_info[info_name] = None # 未找到信息# 可以添加更复杂的抽取逻辑,例如只在特定章节中查找信息return key_info# 您可能还需要其他辅助函数,例如 find_section_by_title(structured_data, title) 等

data_formatter.py

(这个模块是根据解析和抽取的结果,构建 Prompt-Response 对的地方。您需要根据 data_config.py 中启用的策略和格式需求进行定制。)

import logging
from data_config import PROMPT_TEMPLATE, PROMPT_RESPONSE_STRATEGIES
from data_config import FULL_REPORT_SUMMARY_INSTRUCTION, FULL_REPORT_SUMMARY_FIELDS
from data_config import SECTION_GENERATION_INSTRUCTION_TEMPLATE, SECTION_GENERATION_TARGET_SECTIONS, SECTION_GENERATION_DATA_MAPlogging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname=s - %(message)s')def format_dataset_entry(instruction, input_text, response_text):"""按照 Prompt 模板格式化单个数据集条目"""# 确保 instruction, input_text, response_text 都是字符串instruction = str(instruction).strip() if instruction is not None else ""input_text = str(input_text).strip() if input_text is not None else ""response_text = str(response_text).strip() if response_text is not None else ""# 使用 Prompt 模板# 这里假设 PROMPT_TEMPLATE 是 "### Instruction:\n{}\n### Input:\n{}\n### Response:\n{}"formatted_text = PROMPT_TEMPLATE.format(instruction, input_text, response_text)return {"text": formatted_text} # 返回一个字典,键为 'text',符合 Hugging Face datasets 的 JSONL 加载要求def build_prompt_response_pairs(structured_data, key_info):"""根据配置的策略,从结构化数据和抽取信息中构建 Prompt-Response 对。这也是需要大量定制的函数。"""dataset_entries = []# 策略 1: 从关键信息生成完整报告if PROMPT_RESPONSE_STRATEGIES.get("full_report_from_summary"):instruction = FULL_REPORT_SUMMARY_INSTRUCTIONinput_parts = []# 从 key_info 中提取需要作为 Input 的字段for field in FULL_REPORT_SUMMARY_FIELDS:value = key_info.get(field)if value is not None:input_parts.append(f"{field}{value}")else:input_parts.append(f"{field}:[未找到]") # 或者跳过,取决于需求input_text = "\n".join(input_parts)# Response 是报告的完整文本response_text = structured_data.get("_full_text", "") # 使用原始全文作为 Responseif input_text and response_text: # 确保 Input 和 Response 非空dataset_entries.append(format_dataset_entry(instruction, input_text, response_text))# logging.info("Added full_report_from_summary entry.")else:logging.warning(f"Skipping full_report_from_summary for a report due to missing data. Input empty: {not bool(input_text)}, Response empty: {not bool(response_text)}")# 策略 2: 从要点生成特定章节if PROMPT_RESPONSE_STRATEGIES.get("section_from_points"):sections = structured_data.get("_sections", [])for target_section_title in SECTION_GENERATION_TARGET_SECTIONS:# 查找原始报告中对应的章节original_section = next((sec for sec in sections if sec['title'] == target_section_title), None)if original_section:instruction = SECTION_GENERATION_INSTRUCTION_TEMPLATE.format(target_section_title)response_text = original_section['text'] # 目标章节的完整文本作为 Response# 根据 SECTION_GENERATION_DATA_MAP 构建 Inputsection_config = SECTION_GENERATION_DATA_MAP.get(target_section_title)if section_config:input_template = section_config.get("input_template", "")input_data = {}# 从 key_info 和 structured_data 中提取构建 Input 需要的数据for key in section_config.get("input_keys", []):if key in key_info and key_info[key] is not None:input_data[key] = key_info[key]# 您可能还需要从特定章节文本中提取要点作为 Input 的一部分# 例如:从 original_section['text'] 中提取前几句话或包含特定关键词的句子# input_data["其他影响要点"] = extract_key_points_from_text(original_section['text']) # 需要实现 extract_key_points_from_text# 使用模板格式化 Input# 注意:这里需要确保 input_template 中的占位符能被 input_data 中的键填充try:input_text = input_template.format(**input_data)# 填充其他要点示例if "[从原始报告中提取的该章节要点简述或关键句]" in input_text:key_points_summary = original_section['text'][:200].strip() + "..." # 简单示例:截取前200字input_text = input_text.replace("[从原始报告中提取的该章节要点简述或关键句]", key_points_summary)except KeyError as e:logging.warning(f"Missing key in input_data for template '{input_template}': {e}. Skipping section {target_section_title} for this report.")input_text = None # 构建 Input 失败if input_text and response_text:dataset_entries.append(format_dataset_entry(instruction, input_text, response_text))# logging.info(f"Added section_from_points entry for '{target_section_title}'.")else:logging.warning(f"Skipping section_from_points for '{target_section_title}' due to missing data. Input empty: {not bool(input_text)}, Response empty: {not bool(response_text)}")else:logging.warning(f"No section_config found for target section '{target_section_title}'. Skipping.")else:logging.warning(f"Target section '{target_section_title}' not found in the parsed report structure. Skipping.")# 添加其他自定义策略...return dataset_entries# 辅助函数 (需要实现)
# def extract_key_points_from_text(text):
#     """从给定文本中提取关键句子或要点"""
#     # 实现方法:基于句子的重要性评分、关键词密度、位置等
#     return text # 占位符实现

prepare_dataset.py

import os
import jsonlines
from tqdm import tqdm
from data_config import RAW_REPORTS_DIR, TEXT_OUTPUT_DIR, STRUCTURED_OUTPUT_DIR, FINAL_DATASET_PATH
from data_config import SUPPORTED_FILE_TYPES, TRAIN_SPLIT_RATIO, TEST_SPLIT_RATIO
from text_extractor import extract_text
from report_parser import parse_report_structure, extract_key_info # 您需要实现这里的逻辑
from data_formatter import build_prompt_response_pairs # 您需要实现这里的逻辑
import logging
from sklearn.model_selection import train_test_split
from datasets import Dataset, DatasetDict # 使用 Hugging Face datasets 库处理划分和保存logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def main():# 确保输出目录存在os.makedirs(TEXT_OUTPUT_DIR, exist_ok=True)os.makedirs(STRUCTURED_OUTPUT_DIR, exist_ok=True)os.makedirs(os.path.dirname(FINAL_DATASET_PATH), exist_ok=True)report_files = [os.path.join(RAW_REPORTS_DIR, f)for f in os.listdir(RAW_REPORTS_DIR)if os.path.splitext(f)[1].lower() in SUPPORTED_FILE_TYPES]if not report_files:logging.error(f"No supported report files found in {RAW_REPORTS_DIR}. Please place your reports there.")returnall_dataset_entries = []logging.info(f"Found {len(report_files)} supported report files. Starting processing...")# --- 阶段 B: 文本提取 ---logging.info("--- Starting Text Extraction ---")extracted_texts = {}for report_path in tqdm(report_files, desc="Extracting Text"):file_name = os.path.basename(report_path)text = extract_text(report_path)if text:extracted_texts[file_name] = text# 可选:将提取的文本保存到文件# text_output_path = os.path.join(TEXT_OUTPUT_DIR, os.path.splitext(file_name)[0] + ".txt")# with open(text_output_path, "w", encoding="utf-8") as f:#     f.write(text)else:logging.warning(f"Failed to extract text from {file_name}. Skipping.")logging.info(f"Finished Text Extraction. Successfully extracted {len(extracted_texts)} files.")# --- 阶段 C: 报告结构化解析与信息抽取 ---logging.info("--- Starting Report Parsing and Key Info Extraction ---")parsed_reports = {}for file_name, text in tqdm(extracted_texts.items(), desc="Parsing Reports"):# 这里调用您在 report_parser.py 中实现的解析逻辑structured_data = parse_report_structure(text)key_info = extract_key_info(structured_data)if structured_data and key_info: # 确保解析和抽取成功parsed_reports[file_name] = {"structured_data": structured_data,"key_info": key_info}# 可选:保存结构化数据# structured_output_path = os.path.join(STRUCTURED_OUTPUT_DIR, os.path.splitext(file_name)[0] + ".json")# with open(structured_output_path, "w", encoding="utf-8") as f:#      json.dump(parsed_reports[file_name], f, ensure_ascii=False, indent=4)else:logging.warning(f"Failed to parse structure or extract key info from {file_name}. Skipping.")logging.info(f"Finished Report Parsing and Key Info Extraction for {len(parsed_reports)} reports.")# --- 阶段 E: Prompt-Response 对构建 ---logging.info("--- Starting Prompt-Response Pair Building ---")for file_name, data in tqdm(parsed_reports.items(), desc="Building Dataset Entries"):structured_data = data["structured_data"]key_info = data["key_info"]# 这里调用您在 data_formatter.py 中实现的构建逻辑entries = build_prompt_response_pairs(structured_data, key_info)if entries:all_dataset_entries.extend(entries)# logging.info(f"Generated {len(entries)} entries from {file_name}.")else:logging.warning(f"No dataset entries generated from {file_name}.")logging.info(f"Finished Prompt-Response Pair Building. Total entries generated: {len(all_dataset_entries)}")if not all_dataset_entries:logging.error("No dataset entries were generated. Please check your parsing and formatting logic.")return# --- 阶段 F: 数据集输出与划分 ---logging.info("--- Saving and Splitting Dataset ---")# 使用 Hugging Face datasets 库进行划分和保存# 注意:由于数据量小,划分可能需要谨慎full_dataset = Dataset.from_list(all_dataset_entries)# 划分数据集# 先分成训练集和其他 (测试+验证)train_test_split_dataset = full_dataset.train_test_split(test_size=TEST_SPLIT_RATIO + (1.0 - TRAIN_SPLIT_RATIO - TEST_SPLIT_RATIO), seed=42)train_dataset = train_test_split_dataset['train']# 再将剩余的部分分成测试集和验证集(这里将剩余的全部作为测试集,因为总数据量太小)# 如果需要验证集,按比例再次划分 train_test_split_dataset['test']# 例如:val_test_split_dataset = train_test_split_dataset['test'].train_test_split(test_size=0.5, seed=42)# dataset_dict = DatasetDict({#     'train': train_dataset,#     'validation': val_test_split_dataset['train'],#     'test': val_test_split_dataset['test']# })dataset_dict = DatasetDict({'train': train_dataset,'test': train_test_split_dataset['test']})# 将划分后的数据集保存为 JSONL 文件dataset_dict['train'].to_json(FINAL_DATASET_PATH.replace(".jsonl", "_train.jsonl"), lines=True, force_ascii=False)dataset_dict['test'].to_json(FINAL_DATASET_PATH.replace(".jsonl", "_test.jsonl"), lines=True, force_ascii=False)# if 'validation' in dataset_dict:#     dataset_dict['validation'].to_json(FINAL_DATASET_PATH.replace(".jsonl", "_validation.jsonl"), lines=True, force_ascii=False)logging.info("Dataset splitting and saving complete.")logging.info(f"Train set saved to {FINAL_DATASET_PATH.replace('.jsonl', '_train.jsonl')}")logging.info(f"Test set saved to {FINAL_DATASET_PATH.replace('.jsonl', '_test.jsonl')}")logging.info(f"Train set size: {len(dataset_dict['train'])}")logging.info(f"Test set size: {len(dataset_dict['test'])}")# if 'validation' in dataset_dict: logging.info(f"Validation set size: {len(dataset_dict['validation'])}")logging.info("Dataset preparation finished.")logging.info("Next Steps: Run validate_dataset.py and perform artificial review.")if __name__ == "__main__":main()

validate_dataset.py

import jsonlines
import os
from data_config import FINAL_DATASET_PATH, PROMPT_TEMPLATE
import logging
from datasets import load_dataset # 使用 datasets 库加载验证logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')def validate_dataset():"""加载并对生成的数据集文件进行基本验证。包含自动化检查和打印样本以供人工审核。"""train_path = FINAL_DATASET_PATH.replace(".jsonl", "_train.jsonl")test_path = FINAL_DATASET_PATH.replace(".jsonl", "_test.jsonl")# val_path = FINAL_DATASET_PATH.replace(".jsonl", "_validation.jsonl") # 如果生成了验证集files_to_validate = {'train': train_path,'test': test_path,# 'validation': val_path # 如果有}print("--- Dataset Validation ---")for split_name, file_path in files_to_validate.items():if not os.path.exists(file_path):logging.error(f"Dataset file not found: {file_path}")continueprint(f"\nValidating {split_name} set ({file_path})...")try:# 使用 datasets 库加载dataset = load_dataset("json", data_files=file_path, split="train") # split="train" 是 load_dataset 的一个用法,这里不代表训练集logging.info(f"Successfully loaded {len(dataset)} samples.")# 自动化检查num_samples = len(dataset)if num_samples == 0:logging.warning(f"{split_name} dataset is empty.")continue# 检查样本格式(例如,是否有名为 'text' 的列)if 'text' not in dataset.column_names:logging.error(f"{split_name} dataset samples do not have a 'text' column.")# 尝试从 Prompt 模板解析字段try:sample_text = dataset[0]logging.info(f"First sample raw data: {sample_text}")# 这里需要更复杂的逻辑来检查是否符合 PROMPT_TEMPLATE 格式except Exception as e:logging.error(f"Could not inspect sample data format: {e}")continue# 打印样本,供人工审核print(f"\n--- First 5 samples from {split_name} set for manual review ---")for i in range(min(5, num_samples)):sample = dataset[i]print(f"\nSample {i+1}:")print(sample['text'])print("-" * 20)# 您可以添加更多自定义检查,例如检查文本长度范围,是否存在特定关键词等except Exception as e:logging.error(f"Error validating {file_path}: {e}")print("\n--- Validation Finished ---")print("\n**Important:** Automated checks are basic. Manual review of generated samples is crucial for quality.**")print(f"Review the printed samples and the content of the generated JSONL files ({FINAL_DATASET_PATH.replace('.jsonl', '_*.jsonl')}).")if __name__ == "__main__":validate_dataset()

3. 部署与使用步骤

  1. 安装依赖:
    pip install python-docx PyMuPDF pdfminer.six pandas tqdm jsonlines scikit-learn transformers datasets peft accelerate bitsandbytes trl evaluate rouge_score nltk
    
    如果需要 OCR 或表格提取,安装相应的库和软件(如 Tesseract、PaddleOCR、Java)。
  2. 准备原始报告: 将您的 84 篇报告文件放入 disaster_dataset_prep/raw_reports/ 目录下。
  3. 定制配置: 非常重要! 修改 data_config.py 中的以下部分:
    • RAW_REPORTS_DIR, PROCESSED_DATA_DIR, FINAL_DATASET_PATH:确认路径正确。
    • SUPPORTED_FILE_TYPES:如果您的报告有其他格式,添加进来。
    • REPORT_SECTION_PATTERNS核心定制点! 根据您的报告实际章节标题和层级模式,编写或调整正则表达式。您可能需要分析多份报告来找到通用的模式。
    • KEY_INFO_PATTERNS核心定制点! 根据您需要抽取的关键信息类型和它们在报告中的常见表述方式,编写或调整正则表达式。
    • PROMPT_RESPONSE_STRATEGIES:选择您希望生成哪种类型的 Prompt-Response 对。
    • FULL_REPORT_SUMMARY_FIELDS, SECTION_GENERATION_TARGET_SECTIONS, SECTION_GENERATION_DATA_MAP 等:核心定制点! 根据您选择的策略,定义如何从解析出的结构化数据和抽取信息中构建 Prompt 和 Response。
  4. 定制解析和格式化逻辑: 最核心的开发工作!
    • 打开 report_parser.py
      • 修改 parse_report_structure 函数,使其能够准确识别您报告的章节结构。可能需要更复杂的逻辑,比如基于缩进、字体大小、编号等来判断标题层级。
      • 修改 extract_key_info 函数,使用您在 data_config.py 中定义的模式或其他方法,从报告文本中准确抽取关键信息。
    • 打开 data_formatter.py
      • 修改 build_prompt_response_pairs 函数,确保它能够根据您在 data_config.py 中启用的策略,正确地组合 instruction, input_text, 和 response_text,并使用 PROMPT_TEMPLATE 进行格式化。如果需要从章节文本中提取要点作为 Input,您还需要实现相应的辅助函数。
  5. 运行数据准备脚本:
    python prepare_dataset.py
    
    脚本会按顺序执行文本提取、解析、格式化、合并和划分。这个过程可能会因为文件数量和解析复杂度而耗时。在运行过程中,注意控制台输出的日志和警告信息,它们会指示哪些文件处理失败或哪些信息未能提取。
  6. 验证生成的数据集:
    python validate_dataset.py
    
    脚本会加载生成的 JSONL 文件,进行基本检查,并打印前几条样本供您人工审核。
  7. 人工审核: 这是保证数据质量最重要的一步! 打开生成的 JSONL 文件(如 processed_data/final_dataset_train.jsonl),随机查看一些样本。检查 Prompt 是否清晰、能否合理引导模型?Response 是否准确反映了原始报告的内容和格式?Prompt 与 Response 之间的对应关系是否正确?根据人工审核结果,回到步骤 3 和 4 调整配置和代码,然后重新运行 prepare_dataset.py,直到数据集质量达到要求。

4. 挑战与优化

  • 报告格式不一致: 这是最大的挑战。正则表达式和规则难以覆盖所有变体。可能需要为不同年份或不同来源的报告编写不同的解析规则,并在 report_parser.py 中加入逻辑进行判断和分派。
  • 表格数据: 提取和结构化表格数据非常复杂,特别是格式不规则的表格。如果表格中的信息对生成报告至关重要,可能需要专门的表格提取工具,甚至大量人工录入或校对。
  • OCR 错误: 扫描件的 OCR 结果会有错误,特别是专业术语和数字。需要进行 OCR 后处理(如基于词典的校正)或人工校对。
  • 信息抽取精度: 基于规则的抽取容易漏报或误报。对于重要信息,可能需要更复杂的 NLP 方法(如命名实体识别、关系抽取),但这会增加项目复杂度。初期可以接受较低的自动化抽取精度,通过人工审核和补充来弥补。
  • Prompt-Response 对应关系: 确保 Input 和 Output 之间的逻辑关系清晰、合理。例如,Input 中的信息是否足够让模型生成期望的 Response?Response 是否完全由 Input 或原始报告推导出来?
  • 数据量小: 即使经过所有努力,84 份报告生成的数据集数量仍然可能非常有限。这再次强调了 RAG 的重要性,以及在微调阶段采取 QLoRA 等参数高效方法来减轻过拟合。

总结:

数据集制作是一个劳动密集型的过程,特别是从非结构化或半结构化数据中提取信息。目前搭建了基本的流水线,但核心的报告解析和 Prompt-Response 构建逻辑需要根据实际数据进行深入迭代开发和定制。工审核环节必不可少,是保证数据集质量和后续模型效果的基石。

http://www.xdnf.cn/news/2622.html

相关文章:

  • 跟着文档学Vuex(一):什么是Vuex
  • WP记录。
  • 单元测试总结
  • Linux0.11引导启动程序:简略过程
  • 相机-IMU联合标定:相机标定
  • K8S ConfigMap 快速开始
  • spring cloud 服务注册与发现(Service registration and discovery)
  • SAP S/4HANA迁移现状与展望(2025)
  • 解锁服务器迁移的未来:《2025 服务器迁移效率白皮书》(附下载)
  • (一)Linux的历史与环境搭建
  • Hadoop伪分布式模式搭建全攻略:从环境配置到实战测试
  • WebRTC SDK是什么?
  • 在matlab中使用UAV123官方toolkits测试自己的数据集
  • 小熊派BearPi-Pico H3863(二)环境配置 Ubuntu编译源码与VSCode远程开发指南
  • 制作一款打飞机游戏28:编辑器完善鲁棒性
  • 01 C++概述
  • MATLAB Coder代码生成(工业部署)——MATLAB技巧
  • 机器学习-入门-线性模型(2)
  • 线下零售数据采集:在精度与效率之间寻找平衡点
  • 在 Ubuntu 24.04 LTS 一台机子上同时部署Dify 1.3.1 和 RAGflow 0.18.0
  • 《数据结构之美--二叉树》
  • PCI/PXI 总线的可编程电阻卡
  • oracle 数据库查询指定用户下每个表占用空间的大小,倒序显示
  • Java垃圾收集器与内存分配策略深度解析
  • 再看 BBR 到 BBRv3 的公平性改进
  • Hadoop 单机模式(Standalone Mode)部署与 WordCount 测试
  • 深入解析 Babylon.js 中的 TransformNode.lookAt 方法
  • AI大模型应用之按照设计稿还原代码
  • 第36课 常用快捷操作——用“鼠标右键”退出当前命令
  • 计算机考研精炼 计网