Day 8: Predicting Diabetes with an LSTM Model (Optimized)
- 🍨 This post is a study log for the 🔗365天深度学习训练营 (365-Day Deep Learning Training Camp)
- 🍖 Original author: K同学啊
Implementation
(1) Environment
Language: Python 3.10
IDE: PyCharm
Framework: PyTorch
(2) Steps
1. The code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diabetes prediction with an LSTM neural network.
"""
import warnings
import os
import logging
from datetime import datetime
from typing import Tuple, List, Dict, Optional
from pathlib import Path

# Deep learning framework imports
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Data science toolkit
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import joblib

# Global configuration
warnings.filterwarnings('ignore')
plt.rcParams['savefig.dpi'] = 300
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.sans-serif'] = ['SimHei']  # SimHei so the Chinese column names render in plots
plt.rcParams['axes.unicode_minus'] = False

# Logging setup
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('diabetes_model.log'),
        logging.StreamHandler()
    ]
)


class Config:
    """Configuration class: one place for all hyperparameters (the manually set knobs) and settings."""
    # Data
    DATA_PATH = "./data/dia.xls"
    FEATURES_TO_DROP = ['卡号', '是否糖尿病']  # keep 高密度脂蛋白胆固醇 (HDL cholesterol) as a feature
    TARGET_COLUMN = '是否糖尿病'

    # Model - INPUT_SIZE is set dynamically from the data in prepare_data()
    HIDDEN_SIZE = 128   # smaller hidden size to curb overfitting
    NUM_LAYERS = 2      # number of stacked LSTM layers
    DROPOUT_RATE = 0.3
    NUM_CLASSES = 2

    # Training
    BATCH_SIZE = 32     # smaller batches for more stable training
    LEARNING_RATE = 1e-3
    EPOCHS = 100
    EARLY_STOPPING_PATIENCE = 15
    TEST_SIZE = 0.2
    VAL_SIZE = 0.2
    RANDOM_STATE = 42

    # Cross-validation
    K_FOLDS = 5

    # Device
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Save paths
    MODEL_SAVE_PATH = "./models"
    SCALER_SAVE_PATH = "./scalers"


class DataProcessor:
    """Handles data loading, preprocessing and visualization."""

    def __init__(self, config: Config):
        self.config = config
        self.scaler = StandardScaler()
        self.logger = logging.getLogger(__name__)
        # Create output directories
        os.makedirs(config.MODEL_SAVE_PATH, exist_ok=True)
        os.makedirs(config.SCALER_SAVE_PATH, exist_ok=True)

    def load_data(self) -> pd.DataFrame:
        """Load the dataset, with error handling."""
        try:
            self.logger.info("Loading data...")
            if not os.path.exists(self.config.DATA_PATH):
                raise FileNotFoundError(f"Data file not found: {self.config.DATA_PATH}")
            # Note: pd.read_excel has no encoding argument; the Excel engine
            # handles character encoding itself.
            dataframe = pd.read_excel(self.config.DATA_PATH)
            self.logger.info(f"Data loaded, shape: {dataframe.shape}")
            return dataframe
        except Exception as e:
            self.logger.error(f"Failed to load data: {str(e)}")
            raise

    def explore_data(self, dataframe: pd.DataFrame) -> Dict:
        """Exploratory data analysis; returns summary statistics."""
        self.logger.info("Starting exploratory data analysis...")
        stats = {
            'shape': dataframe.shape,
            'columns': list(dataframe.columns),
            'dtypes': dataframe.dtypes.to_dict(),
            'missing_values': dataframe.isnull().sum().to_dict(),
            'duplicates': dataframe.duplicated().sum(),
            'target_distribution': dataframe[self.config.TARGET_COLUMN].value_counts().to_dict()
        }
        self.logger.info(f"Shape: {stats['shape']}")
        self.logger.info(f"Missing values: {sum(stats['missing_values'].values())}")
        self.logger.info(f"Duplicates: {stats['duplicates']}")
        self.logger.info(f"Target distribution: {stats['target_distribution']}")
        return stats

    def clean_data(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Data cleaning."""
        self.logger.info("Starting data cleaning...")

        # Remove duplicate rows
        initial_shape = dataframe.shape
        dataframe = dataframe.drop_duplicates()
        self.logger.info(f"Removed duplicates: {initial_shape[0] - dataframe.shape[0]} rows")

        # Handle outliers with the IQR rule
        numeric_columns = dataframe.select_dtypes(include=[np.number]).columns
        numeric_columns = [col for col in numeric_columns if col not in self.config.FEATURES_TO_DROP]
        for col in numeric_columns:
            Q1 = dataframe[col].quantile(0.25)
            Q3 = dataframe[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = dataframe[(dataframe[col] < lower_bound) | (dataframe[col] > upper_bound)]
            if len(outliers) > 0:
                self.logger.info(f"Column {col}: found {len(outliers)} outliers")
                # Replace outliers with the column median
                dataframe.loc[dataframe[col] < lower_bound, col] = dataframe[col].median()
                dataframe.loc[dataframe[col] > upper_bound, col] = dataframe[col].median()
        return dataframe

    def visualize_features(self, dataframe: pd.DataFrame) -> None:
        """Visualize feature distributions."""
        self.logger.info("Plotting feature distributions...")
        plt.figure(figsize=(15, 12))

        # Target distribution
        plt.subplot(3, 3, 1)
        target_counts = dataframe[self.config.TARGET_COLUMN].value_counts()
        plt.pie(target_counts.values, labels=['Normal', 'Diabetes'], autopct='%1.1f%%')
        plt.title('Target distribution')

        # Box plots of the numeric features
        numeric_features = dataframe.select_dtypes(include=[np.number]).columns
        numeric_features = [col for col in numeric_features if col not in self.config.FEATURES_TO_DROP]
        for i, col in enumerate(numeric_features[:8], 2):
            plt.subplot(3, 3, i)
            sns.boxplot(x=dataframe[self.config.TARGET_COLUMN], y=dataframe[col])
            plt.title(f'{col} distribution')
            plt.xticks([0, 1], ['Normal', 'Diabetes'])
        plt.tight_layout()
        plt.savefig('feature_distribution.png')
        plt.show()

        # Correlation heatmap
        plt.figure(figsize=(12, 10))
        correlation_matrix = dataframe[numeric_features].corr()
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
        plt.title('Feature correlation heatmap')
        plt.tight_layout()
        plt.savefig('correlation_heatmap.png')
        plt.show()

    def prepare_data(self, dataframe: pd.DataFrame) -> Tuple[
            torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Preprocess and split the data."""
        self.logger.info("Starting data preprocessing...")

        # Separate features and labels
        X = dataframe.drop(self.config.FEATURES_TO_DROP, axis=1)
        y = dataframe[self.config.TARGET_COLUMN]

        # Set the input size dynamically
        self.config.INPUT_SIZE = X.shape[1]
        self.logger.info(f"Input feature dimension: {self.config.INPUT_SIZE}")

        # Standardize features
        X_scaled = self.scaler.fit_transform(X)

        # Save the scaler for later inference
        joblib.dump(self.scaler, os.path.join(self.config.SCALER_SAVE_PATH, 'scaler.pkl'))

        # Split into train / validation / test sets
        X_temp, X_test, y_temp, y_test = train_test_split(
            X_scaled, y,
            test_size=self.config.TEST_SIZE,
            random_state=self.config.RANDOM_STATE,
            stratify=y
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_temp, y_temp,
            test_size=self.config.VAL_SIZE,
            random_state=self.config.RANDOM_STATE,
            stratify=y_temp
        )

        # Convert to tensors; unsqueeze(1) adds the sequence dimension
        X_train = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
        X_val = torch.tensor(X_val, dtype=torch.float32).unsqueeze(1)
        X_test = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
        y_train = torch.tensor(y_train.values, dtype=torch.long)
        y_val = torch.tensor(y_val.values, dtype=torch.long)
        y_test = torch.tensor(y_test.values, dtype=torch.long)

        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
        return X_train, X_val, X_test, y_train, y_val, y_test

    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test) -> Tuple[
            DataLoader, DataLoader, DataLoader]:
        """Create the data loaders."""
        train_dl = DataLoader(
            TensorDataset(X_train, y_train),
            batch_size=self.config.BATCH_SIZE,
            shuffle=True
        )
        val_dl = DataLoader(
            TensorDataset(X_val, y_val),
            batch_size=self.config.BATCH_SIZE,
            shuffle=False
        )
        test_dl = DataLoader(
            TensorDataset(X_test, y_test),
            batch_size=self.config.BATCH_SIZE,
            shuffle=False
        )
        return train_dl, val_dl, test_dl


class DiabetesLSTM_V2(nn.Module):
    """LSTM model."""

    def __init__(self, config: Config):
        super(DiabetesLSTM_V2, self).__init__()
        self.lstm = nn.LSTM(
            input_size=config.INPUT_SIZE,
            hidden_size=config.HIDDEN_SIZE,
            num_layers=config.NUM_LAYERS,
            dropout=config.DROPOUT_RATE if config.NUM_LAYERS > 1 else 0,
            batch_first=True,
            bidirectional=True  # bidirectional LSTM
        )
        # Batch norm over the concatenated forward/backward hidden states
        self.batch_norm = nn.BatchNorm1d(config.HIDDEN_SIZE * 2)
        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(config.HIDDEN_SIZE * 2, config.HIDDEN_SIZE),
            nn.ReLU(),
            nn.Dropout(config.DROPOUT_RATE),
            nn.Linear(config.HIDDEN_SIZE, config.NUM_CLASSES)
        )
        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Weight initialization."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.LSTM):
                for name, param in module.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)
                    elif 'bias' in name:
                        nn.init.constant_(param, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass."""
        lstm_out, (hidden, cell) = self.lstm(x)
        # Use the output of the last time step
        output = lstm_out[:, -1, :]
        output = self.batch_norm(output)
        output = self.classifier(output)
        return output


class EarlyStopping:
    """Early stopping."""

    def __init__(self, patience=15, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
        elif val_loss < self.best_score - self.min_delta:
            self.best_score = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


class ModelTrainer:
    """Model trainer."""

    def __init__(self, config: Config):
        self.config = config
        self.device = config.DEVICE
        self.logger = logging.getLogger(__name__)

        # Model
        self.model = DiabetesLSTM_V2(config).to(self.device)
        # Loss (class weights could be added here for class imbalance)
        self.loss_fn = nn.CrossEntropyLoss()
        # Optimizer
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=config.LEARNING_RATE,
            weight_decay=1e-5
        )
        # Learning-rate scheduler
        self.scheduler = ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=10
        )
        # Early stopping
        self.early_stopping = EarlyStopping(patience=config.EARLY_STOPPING_PATIENCE)
        # Training history
        self.train_history = {
            'train_loss': [], 'val_loss': [],
            'train_acc': [], 'val_acc': []
        }

    def train_epoch(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Train for one epoch."""
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0
        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.loss_fn(output, target)
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            total_loss += loss.item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)
        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total
        return avg_loss, accuracy

    def validate_epoch(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Validate for one epoch."""
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in dataloader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = self.loss_fn(output, target)
                total_loss += loss.item()
                pred = output.argmax(dim=1)
                correct += pred.eq(target).sum().item()
                total += target.size(0)
        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total
        return avg_loss, accuracy

    def train(self, train_dl: DataLoader, val_dl: DataLoader) -> None:
        """Full training loop."""
        self.logger.info("Starting model training...")
        self.logger.info(f"Device: {self.device}")
        self.logger.info(f"Parameter count: {sum(p.numel() for p in self.model.parameters())}")

        best_val_loss = float('inf')
        for epoch in range(self.config.EPOCHS):
            # Train
            train_loss, train_acc = self.train_epoch(train_dl)
            # Validate
            val_loss, val_acc = self.validate_epoch(val_dl)

            # Update the learning rate
            self.scheduler.step(val_loss)

            # Record history
            self.train_history['train_loss'].append(train_loss)
            self.train_history['val_loss'].append(val_loss)
            self.train_history['train_acc'].append(train_acc)
            self.train_history['val_acc'].append(val_acc)

            # Save the best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                self.save_model('best_model.pth')

            # Early-stopping check
            self.early_stopping(val_loss)

            self.logger.info(
                f'Epoch {epoch + 1}/{self.config.EPOCHS} - '
                f'Train Loss: {train_loss:.4f} - Train Acc: {train_acc:.4f} - '
                f'Val Loss: {val_loss:.4f} - Val Acc: {val_acc:.4f} - '
                f'LR: {self.optimizer.param_groups[0]["lr"]:.6f}'
            )
            if self.early_stopping.early_stop:
                self.logger.info(f"Early stopping at epoch {epoch + 1}")
                break
        self.logger.info("Training finished")

    def evaluate(self, test_dl: DataLoader) -> Dict:
        """Evaluate the model."""
        self.model.eval()
        all_preds = []
        all_targets = []
        all_probs = []
        with torch.no_grad():
            for data, target in test_dl:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                probs = F.softmax(output, dim=1)
                preds = output.argmax(dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_targets.extend(target.cpu().numpy())
                all_probs.extend(probs.cpu().numpy())

        # Metrics
        accuracy = (np.array(all_preds) == np.array(all_targets)).mean()
        auc = roc_auc_score(all_targets, np.array(all_probs)[:, 1])
        report = classification_report(all_targets, all_preds, output_dict=True)

        results = {
            'accuracy': accuracy,
            'auc': auc,
            'classification_report': report,
            'predictions': all_preds,
            'true_labels': all_targets,
            'probabilities': all_probs
        }
        self.logger.info(f"Test accuracy: {accuracy:.4f}")
        self.logger.info(f"AUC: {auc:.4f}")
        return results

    def save_model(self, filename: str):
        """Save the model."""
        filepath = os.path.join(self.config.MODEL_SAVE_PATH, filename)
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'config': self.config,
            'train_history': self.train_history
        }, filepath)
        self.logger.info(f"Model saved to: {filepath}")

    def plot_training_history(self) -> None:
        """Plot the training history."""
        epochs_range = range(1, len(self.train_history['train_loss']) + 1)
        plt.figure(figsize=(15, 5))

        # Loss
        plt.subplot(1, 3, 1)
        plt.plot(epochs_range, self.train_history['train_loss'], label='Train loss')
        plt.plot(epochs_range, self.train_history['val_loss'], label='Val loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title('Model loss')
        plt.legend()
        plt.grid(True)

        # Accuracy
        plt.subplot(1, 3, 2)
        plt.plot(epochs_range, self.train_history['train_acc'], label='Train accuracy')
        plt.plot(epochs_range, self.train_history['val_acc'], label='Val accuracy')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.title('Model accuracy')
        plt.legend()
        plt.grid(True)

        # ROC placeholder (the real curve is drawn after evaluate())
        plt.subplot(1, 3, 3)
        plt.text(0.5, 0.5, 'ROC curve\n(shown after evaluate)',
                 ha='center', va='center', transform=plt.gca().transAxes)
        plt.title('ROC curve')

        plt.tight_layout()
        plt.savefig('training_history.png')
        plt.show()

    def plot_roc_curve(self, results: Dict):
        """Plot the ROC curve."""
        fpr, tpr, _ = roc_curve(results['true_labels'], np.array(results['probabilities'])[:, 1])
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {results["auc"]:.4f})')
        plt.plot([0, 1], [0, 1], 'k--', label='Random')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.legend()
        plt.grid(True)
        plt.savefig('roc_curve.png')
        plt.show()


def main():
    """Main entry point."""
    logger = logging.getLogger(__name__)
    logger.info("Starting diabetes prediction model training")

    # Configuration
    config = Config()
    # Data processing
    data_processor = DataProcessor(config)

    try:
        # Load and preprocess the data
        dataframe = data_processor.load_data()
        stats = data_processor.explore_data(dataframe)
        dataframe = data_processor.clean_data(dataframe)
        data_processor.visualize_features(dataframe)

        # Split the data
        X_train, X_val, X_test, y_train, y_val, y_test = data_processor.prepare_data(dataframe)
        train_dl, val_dl, test_dl = data_processor.create_dataloaders(
            X_train, X_val, X_test, y_train, y_val, y_test)

        # Train
        trainer = ModelTrainer(config)
        trainer.train(train_dl, val_dl)

        # Evaluate
        results = trainer.evaluate(test_dl)

        # Visualize results
        trainer.plot_training_history()
        trainer.plot_roc_curve(results)

        logger.info("Pipeline finished!")
    except Exception as e:
        logger.error(f"Error during training: {str(e)}")
        raise


if __name__ == "__main__":
    main()
Run output:
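As an aside, the script saves both the scaler and the best checkpoint, so they can be reloaded for inference later. Below is a minimal sketch under stated assumptions: the Config and DiabetesLSTM_V2 classes above are in scope, INPUT_SIZE=13 is a placeholder for the actual training feature count, and `sample` is a made-up one-row feature array, not real patient data.

```python
import joblib
import numpy as np
import torch

# Sketch: reload the artifacts saved by DataProcessor.prepare_data and
# ModelTrainer.save_model. Assumes Config and DiabetesLSTM_V2 are importable.
config = Config()
config.INPUT_SIZE = 13  # placeholder; must equal the feature count used in training

scaler = joblib.load("./scalers/scaler.pkl")
# weights_only=False because the checkpoint pickles the Config object as well
checkpoint = torch.load("./models/best_model.pth", map_location="cpu", weights_only=False)

model = DiabetesLSTM_V2(config)
model.load_state_dict(checkpoint['model_state_dict'])
model.eval()

sample = np.zeros((1, config.INPUT_SIZE))           # placeholder patient features
x = torch.tensor(scaler.transform(sample), dtype=torch.float32).unsqueeze(1)
with torch.no_grad():
    prob = torch.softmax(model(x), dim=1)[0, 1].item()  # predicted P(diabetic)
print(f"P(diabetic) = {prob:.3f}")
```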
2. What are hyperparameters?
Hyperparameters are parameters that must be set manually before model training begins. They control the learning process itself, rather than being learned from the training data.
Hyperparameters vs. parameters

| Aspect | Hyperparameters | Parameters |
| --- | --- | --- |
| Definition | Settings that control the learning process | The model's internal weights and biases |
| When set | Manually, before training | Learned automatically during training |
| Examples | Learning rate, batch size, number of layers | A neural network's weight matrices |
| Analogy | The study method and environment | The knowledge actually learned |
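To make the distinction concrete, here is a minimal PyTorch sketch (the layer sizes are illustrative, not taken from the post): the hidden size and learning rate are hyperparameters chosen by hand, just like in the Config class above, while the weights the optimizer updates are parameters.

```python
import torch
import torch.nn as nn

# Hyperparameters: fixed by hand before training (cf. the Config class above)
HIDDEN_SIZE = 128
LEARNING_RATE = 1e-3

# Parameters: the weights and bias inside the model, learned from data
model = nn.Linear(in_features=10, out_features=HIDDEN_SIZE)
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

print(sum(p.numel() for p in model.parameters()))  # 10*128 + 128 = 1408 learned parameters
```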
3. What is the IQR?
IQR stands for Inter-Quartile Range. Think of lining a class of students up by exam score and looking at the score range of the middle 50% of them.
Quartiles explained
Imagine 100 students sorted by score:
Sorted students: 1  2  3 ... 25 26 ... 50 51 ... 75 76 ... 100
Positions: lowest ... Q1 (25% mark) ... Q2 (50% mark, median) ... Q3 (75% mark) ... highest
- Q1: the 25th student's score (25th percentile)
- Q2: the 50th student's score (50th percentile, the median)
- Q3: the 75th student's score (75th percentile)
- IQR = Q3 - Q1: the score gap between the 75th and 25th students
Application to the diabetes data
Blood glucose values: [85, 90, 95, 100, 105, 110, 250]  # the last value is an outlier
Q1 = 90, Q3 = 110
IQR = 110 - 90 = 20

Outlier detection bounds:
Lower bound = Q1 - 1.5 × IQR = 90 - 1.5 × 20 = 60
Upper bound = Q3 + 1.5 × IQR = 110 + 1.5 × 20 = 140

Conclusion: 250 exceeds 140, so it is an outlier!
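The same rule, written as a small pandas sketch that mirrors the logic in DataProcessor.clean_data above. Note that pandas' default linear quantile interpolation gives Q1 = 92.5 and Q3 = 107.5 for this series, slightly different from the hand-rounded values in the worked example, but 250 is flagged either way.

```python
import pandas as pd

# The illustrative blood-glucose values from the example above
glucose = pd.Series([85, 90, 95, 100, 105, 110, 250])

Q1 = glucose.quantile(0.25)
Q3 = glucose.quantile(0.75)
IQR = Q3 - Q1
lower, upper = Q1 - 1.5 * IQR, Q3 + 1.5 * IQR

outliers = glucose[(glucose < lower) | (glucose > upper)]
print(outliers.tolist())  # [250] -- the value clean_data would replace with the median
```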
4. What is ROC?
The ROC curve (Receiver Operating Characteristic curve) is a key tool for evaluating binary classification models. It plots the false positive rate (FPR) on the x-axis against the true positive rate (TPR) on the y-axis.
A diabetes screening scenario
Suppose there are 100 patients, 20 with diabetes and 80 healthy:
Model A: AUC = 0.95
- Correctly identifies 18 of the 20 diabetic patients (TPR = 0.9)
- Misclassifies only 8 healthy people (FPR = 0.1)

Model B: AUC = 0.75
- Correctly identifies 16 of the 20 diabetic patients (TPR = 0.8)
- Misclassifies 20 healthy people (FPR = 0.25)

Model A is clearly the better model.
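For reference, the script computes these quantities with scikit-learn's roc_curve and roc_auc_score (both already imported above). Here is a minimal sketch with made-up labels and probabilities, not real model output:

```python
import numpy as np
from sklearn.metrics import roc_auc_score, roc_curve

# Toy data: true labels plus the model's predicted probability for the
# positive (diabetic) class -- the same inputs trainer.evaluate uses.
y_true = np.array([0, 0, 0, 1, 1, 1, 0, 1])
y_prob = np.array([0.1, 0.3, 0.2, 0.8, 0.65, 0.9, 0.55, 0.4])

auc = roc_auc_score(y_true, y_prob)            # area under the FPR-TPR curve
fpr, tpr, thresholds = roc_curve(y_true, y_prob)
print(f"AUC = {auc:.3f}")                      # AUC = 0.938 for this toy data
```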
Note: going forward, third-party tools were consulted for code style and for checking model quality; a great deal of the material above was learned along the way.