5.25 Check-in
Carefully review the neural network material covered so far; anyone who has fallen behind should use this session to catch up.
- Homework: train a neural network on the earlier credit-risk project, applying everything covered so far to make the code more standardized and readable.
- Exploratory homework (optional): step into `nn.Module` and inspect its methods (a short sketch follows the training script below).
Homework: the complete training script for the credit-risk project.

```python
import os
from typing import Tuple, Dict, List

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import (accuracy_score, precision_score, recall_score,
                             f1_score, roc_auc_score)

# --- 1. Configuration constants ---

# 1.1 Data path. Use a raw string r"" (or doubled backslashes \\) so the
# backslashes are not treated as escape sequences.
FULL_DATA_PATH = r"D:\系统默认\桌面\机器学习\作业\day 36作业\data.csv"

# 1.2 Model save path (a "models" folder is created under the working directory)
MODEL_SAVE_DIR = "./models"
MODEL_NAME = "credit_risk_nn_model.pth"
FULL_MODEL_SAVE_PATH = os.path.join(MODEL_SAVE_DIR, MODEL_NAME)

# 1.3 Training hyperparameters
RANDOM_SEED = 42
BATCH_SIZE = 64
LEARNING_RATE = 0.001
NUM_EPOCHS = 50
HIDDEN_LAYER_SIZES = [128, 64, 32]  # neurons per hidden layer

# 1.4 Device configuration
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# --- 2. Data loading and preprocessing ---
class CreditDataset(Dataset):
    """Custom PyTorch dataset for loading and serving the credit data."""

    def __init__(self, features: np.ndarray, labels: np.ndarray):
        self.features = torch.tensor(features, dtype=torch.float32)
        self.labels = torch.tensor(labels, dtype=torch.float32).unsqueeze(1)

    def __len__(self) -> int:
        return len(self.labels)

    def __getitem__(self, idx: int) -> Tuple[torch.Tensor, torch.Tensor]:
        return self.features[idx], self.labels[idx]


def load_and_preprocess_data(
    data_path: str, target_column: str = "default"
) -> Tuple[DataLoader, DataLoader, DataLoader, StandardScaler]:
    """Load the data, split it into train/val/test sets, and standardize features.

    Args:
        data_path (str): Path to the data file.
        target_column (str): Name of the target (label) column.

    Returns:
        Tuple[DataLoader, DataLoader, DataLoader, StandardScaler]:
            Train loader, validation loader, test loader, fitted feature scaler.
    """
    print(f"Loading data from: {data_path}")
    if not os.path.exists(data_path):
        raise FileNotFoundError(f"Data file not found: {data_path}\nPlease check the path.")

    df = pd.read_csv(data_path)
    print("Data loaded.")
    print("First 5 rows:")
    print(df.head())
    print("\nData info:")
    df.info()  # df.info() prints directly and returns None, so call it on its own

    # Check for and fill missing values
    print("\nChecking for missing values...")
    missing_counts = df.isnull().sum()
    if missing_counts.sum() > 0:
        print("Missing values found, filling...")
        numeric_cols = df.select_dtypes(include=np.number).columns
        for col in numeric_cols:
            if missing_counts[col] > 0:
                df[col] = df[col].fillna(df[col].mean())
        print("Missing values filled.")
    else:
        print("No missing values.")

    # Separate features and label
    if target_column not in df.columns:
        raise ValueError(f"Target column '{target_column}' is not in the data file. Please check the column name.")
    X = df.drop(target_column, axis=1)
    y = df[target_column]

    # Make sure the labels are 0/1; otherwise they must be encoded first
    if not all(val in [0, 1] for val in y.unique()):
        print(f"Warning: label column '{target_column}' contains values other than 0/1. Make sure it is encoded correctly.")
        # If your labels are e.g. 'Yes'/'No', convert them here:
        #   y = y.map({'Yes': 1, 'No': 0})
        # or use sklearn.preprocessing.LabelEncoder

    # Split into train / validation / test sets (roughly 70% / 15% / 15%)
    X_train_val, X_test, y_train_val, y_test = train_test_split(
        X.values, y.values, test_size=0.15, random_state=RANDOM_SEED, stratify=y
    )
    X_train, X_val, y_train, y_val = train_test_split(
        X_train_val, y_train_val,
        test_size=0.176,  # 0.176 * 0.85 ≈ 0.15 of the full data
        random_state=RANDOM_SEED, stratify=y_train_val
    )

    # Fit the scaler on the training set only, then apply it to the other
    # splits, so no statistics leak from the validation/test data
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val = scaler.transform(X_val)
    X_test = scaler.transform(X_test)
    print("Feature standardization done.")

    print("\nDataset split:")
    print(f"Train size: {len(X_train)} ({len(X_train) / len(df) * 100:.2f}%)")
    print(f"Validation size: {len(X_val)} ({len(X_val) / len(df) * 100:.2f}%)")
    print(f"Test size: {len(X_test)} ({len(X_test) / len(df) * 100:.2f}%)")
    print(f"Train label distribution:\n{pd.Series(y_train).value_counts(normalize=True)}")
    print(f"Validation label distribution:\n{pd.Series(y_val).value_counts(normalize=True)}")
    print(f"Test label distribution:\n{pd.Series(y_test).value_counts(normalize=True)}")

    train_dataset = CreditDataset(X_train, y_train)
    val_dataset = CreditDataset(X_val, y_val)
    test_dataset = CreditDataset(X_test, y_test)

    train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

    return train_loader, val_loader, test_loader, scaler


# --- 3. Model definition ---
class CreditRiskPredictor(nn.Module):
    """Neural network for credit-risk prediction.

    Uses ReLU activations; the output layer has no Sigmoid because training
    uses BCEWithLogitsLoss, which applies the sigmoid internally.
    """

    def __init__(self, input_size: int, hidden_sizes: List[int], output_size: int = 1):
        super().__init__()
        layers = []
        layers.append(nn.Linear(input_size, hidden_sizes[0]))
        layers.append(nn.ReLU())
        for i in range(len(hidden_sizes) - 1):
            layers.append(nn.Linear(hidden_sizes[i], hidden_sizes[i + 1]))
            layers.append(nn.ReLU())
        layers.append(nn.Linear(hidden_sizes[-1], output_size))
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.network(x)


# --- 4. Training ---
def train_model(
    model: nn.Module,
    train_loader: DataLoader,
    val_loader: DataLoader,
    criterion: nn.Module,
    optimizer: optim.Optimizer,
    num_epochs: int,
    device: torch.device,
    model_save_path: str,
) -> nn.Module:
    model.to(device)
    best_val_loss = float("inf")
    best_epoch = -1
    train_losses: List[float] = []
    val_losses: List[float] = []

    print(f"\n--- Training started ({'GPU' if device.type == 'cuda' else 'CPU'}) ---")
    for epoch in range(num_epochs):
        # Training phase
        model.train()
        running_train_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_train_loss += loss.item() * inputs.size(0)
        epoch_train_loss = running_train_loss / len(train_loader.dataset)
        train_losses.append(epoch_train_loss)

        # Validation phase
        model.eval()
        running_val_loss = 0.0
        with torch.no_grad():
            for inputs, labels in val_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                loss = criterion(outputs, labels)
                running_val_loss += loss.item() * inputs.size(0)
        epoch_val_loss = running_val_loss / len(val_loader.dataset)
        val_losses.append(epoch_val_loss)

        print(f"Epoch {epoch + 1}/{num_epochs}, Train Loss: {epoch_train_loss:.4f}, Val Loss: {epoch_val_loss:.4f}", end="")

        # Checkpoint the best model seen so far (lowest validation loss)
        if epoch_val_loss < best_val_loss:
            best_val_loss = epoch_val_loss
            best_epoch = epoch + 1
            os.makedirs(os.path.dirname(model_save_path), exist_ok=True)
            torch.save(model.state_dict(), model_save_path)
            print(f" -> saved best model (Val Loss: {best_val_loss:.4f})")
        else:
            print()

    print(f"--- Training finished! Best validation loss {best_val_loss:.4f} reached at epoch {best_epoch}. ---")

    # Plot the loss curves
    plt.figure(figsize=(10, 6))
    plt.plot(range(1, num_epochs + 1), train_losses, label="Train Loss")
    plt.plot(range(1, num_epochs + 1), val_losses, label="Validation Loss")
    plt.axvline(x=best_epoch, color="r", linestyle="--", label=f"Best Epoch ({best_epoch})")
    plt.title("Training and Validation Loss Over Epochs")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

    # Reload the best checkpoint and return it
    loaded_model = CreditRiskPredictor(
        model.network[0].in_features, HIDDEN_LAYER_SIZES, model.network[-1].out_features
    )
    loaded_model.load_state_dict(torch.load(model_save_path))
    loaded_model.to(device)
    return loaded_model


# --- 5. Evaluation ---
def evaluate_model(model: nn.Module, test_loader: DataLoader, device: torch.device) -> Dict[str, float]:
    model.eval()
    model.to(device)
    all_preds: List[float] = []
    all_labels: List[float] = []
    all_probs: List[float] = []

    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            probs = torch.sigmoid(outputs)  # logits -> probabilities
            preds = (probs > 0.5).float()   # threshold at 0.5
            all_preds.extend(preds.cpu().numpy().flatten())
            all_labels.extend(labels.cpu().numpy().flatten())
            all_probs.extend(probs.cpu().numpy().flatten())

    metrics = {
        "accuracy": accuracy_score(all_labels, all_preds),
        "precision": precision_score(all_labels, all_preds),
        "recall": recall_score(all_labels, all_preds),
        "f1_score": f1_score(all_labels, all_preds),
        "roc_auc": roc_auc_score(all_labels, all_probs),
    }

    print("\n--- Evaluation results (test set) ---")
    for metric_name, value in metrics.items():
        print(f"{metric_name.replace('_', ' ').capitalize()}: {value:.4f}")
    return metrics


# --- 6. Main ---
def main():
    # Fix the random seeds for reproducibility
    torch.manual_seed(RANDOM_SEED)
    np.random.seed(RANDOM_SEED)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(RANDOM_SEED)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

    try:
        # Uses FULL_DATA_PATH defined above; change target_column if your
        # label column is not named 'default'
        train_loader, val_loader, test_loader, scaler = load_and_preprocess_data(
            FULL_DATA_PATH, target_column="default"
        )
    except (FileNotFoundError, ValueError) as e:
        print(f"Data loading failed: {e}")
        print("Please check that:")
        print(f"  1. The data file '{FULL_DATA_PATH}' exists.")
        print("  2. The data file contains a label column named 'default'; if not, change the `target_column` argument.")
        return

    # Infer the input feature count from one batch
    sample_batch_features, _ = next(iter(train_loader))
    input_size = sample_batch_features.shape[1]
    print(f"\nNumber of model input features: {input_size}")

    model = CreditRiskPredictor(input_size=input_size, hidden_sizes=HIDDEN_LAYER_SIZES)
    criterion = nn.BCEWithLogitsLoss()
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)

    trained_model = train_model(
        model, train_loader, val_loader, criterion, optimizer,
        NUM_EPOCHS, DEVICE, FULL_MODEL_SAVE_PATH,
    )

    metrics = evaluate_model(trained_model, test_loader, DEVICE)

    # Example: single-sample prediction
    print("\n--- Example: predicting a single sample ---")
    sample_index = np.random.randint(0, len(test_loader.dataset))
    sample_features_tensor, sample_label_tensor = test_loader.dataset[sample_index]
    sample_features_tensor_batch = sample_features_tensor.unsqueeze(0).to(DEVICE)

    trained_model.eval()
    with torch.no_grad():
        model_output_logits = trained_model(sample_features_tensor_batch)
        predicted_prob = torch.sigmoid(model_output_logits).item()
        predicted_class = 1 if predicted_prob > 0.5 else 0

    print(f"Actual label: {int(sample_label_tensor.item())}")
    print(f"Predicted default probability: {predicted_prob:.4f}")
    print(f"Predicted class (0: no default, 1: default): {predicted_class}")


if __name__ == "__main__":
    main()
```
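For the exploratory homework, here is a minimal sketch of one way to poke around inside `nn.Module` using Python's standard `inspect` module. The choice to filter out underscore-prefixed names is mine, purely for readability; the assignment just says to look at the methods.

```python
import inspect
import torch.nn as nn

# List the public methods defined on nn.Module, skipping private/dunder names
public_methods = [
    name for name, member in inspect.getmembers(nn.Module, predicate=inspect.isfunction)
    if not name.startswith("_")
]
print(public_methods)  # e.g. ['add_module', 'apply', 'buffers', 'children', ...]

# Read the source of a single method, e.g. train():
print(inspect.getsource(nn.Module.train))

# Find the file where nn.Module itself is defined:
print(inspect.getfile(nn.Module))
```

`inspect.getfile(nn.Module)` points at `torch/nn/modules/module.py` inside your PyTorch installation, so you can also open that file directly in an editor and read methods like `train()`, `eval()`, `parameters()`, and `state_dict()` in context.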