
Day 8: Predicting Diabetes with an LSTM Model (Optimized)

  • 🍨 This article is a study-log post from the 365-day deep learning training camp (365天深度学习训练营)
  • 🍖 Original author: K同学啊

Implementation

(1) Environment

Language: Python 3.10
IDE: PyCharm
Framework: PyTorch

(2) Steps
1. The code
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Diabetes prediction with an LSTM neural network.
"""
import warnings
import os
import logging
from datetime import datetime
from typing import Tuple, List, Dict, Optional
from pathlib import Path

# Deep learning framework
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
from torch.optim.lr_scheduler import ReduceLROnPlateau

# Data science toolkit
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve
import joblib

# Global settings
warnings.filterwarnings('ignore')
plt.rcParams['savefig.dpi'] = 300
plt.rcParams['figure.dpi'] = 100
plt.rcParams['font.sans-serif'] = ['SimHei']  # needed to render the Chinese column names in plots
plt.rcParams['axes.unicode_minus'] = False

# Logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('diabetes_model.log'),
        logging.StreamHandler()
    ]
)


class Config:
    """Configuration class: one place for all hyperparameters (the manually chosen settings) and paths."""
    # Data (column names match the Excel file: 卡号 = card ID, 是否糖尿病 = has diabetes)
    DATA_PATH = "./data/dia.xls"
    FEATURES_TO_DROP = ['卡号', '是否糖尿病']  # HDL cholesterol is kept as a feature
    TARGET_COLUMN = '是否糖尿病'

    # Model configuration (INPUT_SIZE is computed dynamically from the data)
    HIDDEN_SIZE = 128      # smaller hidden size to reduce overfitting
    NUM_LAYERS = 2         # number of LSTM layers
    DROPOUT_RATE = 0.3
    NUM_CLASSES = 2

    # Training configuration
    BATCH_SIZE = 32        # smaller batch size for more stable training
    LEARNING_RATE = 1e-3
    EPOCHS = 100
    EARLY_STOPPING_PATIENCE = 15
    TEST_SIZE = 0.2
    VAL_SIZE = 0.2
    RANDOM_STATE = 42

    # Cross-validation configuration
    K_FOLDS = 5

    # Device configuration
    DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Save paths
    MODEL_SAVE_PATH = "./models"
    SCALER_SAVE_PATH = "./scalers"


class DataProcessor:
    """Handles data loading, preprocessing and visualization."""

    def __init__(self, config: Config):
        self.config = config
        self.scaler = StandardScaler()
        self.logger = logging.getLogger(__name__)
        # Create output directories
        os.makedirs(config.MODEL_SAVE_PATH, exist_ok=True)
        os.makedirs(config.SCALER_SAVE_PATH, exist_ok=True)

    def load_data(self) -> pd.DataFrame:
        """Load the dataset, with error handling."""
        try:
            self.logger.info("Loading data...")
            if not os.path.exists(self.config.DATA_PATH):
                raise FileNotFoundError(f"Data file not found: {self.config.DATA_PATH}")
            # pd.read_excel handles file decoding itself; it takes no `encoding` argument
            dataframe = pd.read_excel(self.config.DATA_PATH)
            self.logger.info(f"Data loaded, shape: {dataframe.shape}")
            return dataframe
        except Exception as e:
            self.logger.error(f"Failed to load data: {str(e)}")
            raise

    def explore_data(self, dataframe: pd.DataFrame) -> Dict:
        """Exploratory data analysis; returns summary statistics."""
        self.logger.info("Starting exploratory data analysis...")
        stats = {
            'shape': dataframe.shape,
            'columns': list(dataframe.columns),
            'dtypes': dataframe.dtypes.to_dict(),
            'missing_values': dataframe.isnull().sum().to_dict(),
            'duplicates': dataframe.duplicated().sum(),
            'target_distribution': dataframe[self.config.TARGET_COLUMN].value_counts().to_dict()
        }
        self.logger.info(f"Shape: {stats['shape']}")
        self.logger.info(f"Missing values: {sum(stats['missing_values'].values())}")
        self.logger.info(f"Duplicate rows: {stats['duplicates']}")
        self.logger.info(f"Target distribution: {stats['target_distribution']}")
        return stats

    def clean_data(self, dataframe: pd.DataFrame) -> pd.DataFrame:
        """Data cleaning."""
        self.logger.info("Starting data cleaning...")
        # Remove duplicate rows
        initial_shape = dataframe.shape
        dataframe = dataframe.drop_duplicates()
        self.logger.info(f"Removed {initial_shape[0] - dataframe.shape[0]} duplicate rows")
        # Handle outliers with the IQR method
        numeric_columns = dataframe.select_dtypes(include=[np.number]).columns
        numeric_columns = [col for col in numeric_columns if col not in self.config.FEATURES_TO_DROP]
        for col in numeric_columns:
            Q1 = dataframe[col].quantile(0.25)
            Q3 = dataframe[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            outliers = dataframe[(dataframe[col] < lower_bound) | (dataframe[col] > upper_bound)]
            if len(outliers) > 0:
                self.logger.info(f"Column {col}: {len(outliers)} outliers found")
                # Replace outliers with the median, computed once before any replacement
                median = dataframe[col].median()
                dataframe.loc[dataframe[col] < lower_bound, col] = median
                dataframe.loc[dataframe[col] > upper_bound, col] = median
        return dataframe

    def visualize_features(self, dataframe: pd.DataFrame) -> None:
        """Visualize feature distributions."""
        self.logger.info("Plotting feature distributions...")
        plt.figure(figsize=(15, 12))
        # Target distribution; derive labels from the index so they match value_counts order
        plt.subplot(3, 3, 1)
        target_counts = dataframe[self.config.TARGET_COLUMN].value_counts()
        labels = ['Normal' if v == 0 else 'Diabetic' for v in target_counts.index]
        plt.pie(target_counts.values, labels=labels, autopct='%1.1f%%')
        plt.title('Target distribution')
        # Box plots of the numeric features
        numeric_features = dataframe.select_dtypes(include=[np.number]).columns
        numeric_features = [col for col in numeric_features if col not in self.config.FEATURES_TO_DROP]
        for i, col in enumerate(numeric_features[:8], 2):
            plt.subplot(3, 3, i)
            sns.boxplot(x=dataframe[self.config.TARGET_COLUMN], y=dataframe[col])
            plt.title(f'{col} distribution')
            plt.xticks([0, 1], ['Normal', 'Diabetic'])
        plt.tight_layout()
        plt.savefig('feature_distribution.png')
        plt.show()
        # Correlation heatmap
        plt.figure(figsize=(12, 10))
        correlation_matrix = dataframe[numeric_features].corr()
        sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
        plt.title('Feature correlation heatmap')
        plt.tight_layout()
        plt.savefig('correlation_heatmap.png')
        plt.show()

    def prepare_data(self, dataframe: pd.DataFrame) -> Tuple[
            torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor, torch.Tensor]:
        """Preprocess and split the data."""
        self.logger.info("Starting preprocessing...")
        # Separate features and labels
        X = dataframe.drop(self.config.FEATURES_TO_DROP, axis=1)
        y = dataframe[self.config.TARGET_COLUMN]
        # Set the input dimension dynamically
        self.config.INPUT_SIZE = X.shape[1]
        self.logger.info(f"Input feature dimension: {self.config.INPUT_SIZE}")
        # Standardize the features
        X_scaled = self.scaler.fit_transform(X)
        # Persist the scaler for inference
        joblib.dump(self.scaler, os.path.join(self.config.SCALER_SAVE_PATH, 'scaler.pkl'))
        # Split into train / validation / test sets
        X_temp, X_test, y_temp, y_test = train_test_split(
            X_scaled, y,
            test_size=self.config.TEST_SIZE,
            random_state=self.config.RANDOM_STATE,
            stratify=y
        )
        X_train, X_val, y_train, y_val = train_test_split(
            X_temp, y_temp,
            test_size=self.config.VAL_SIZE,
            random_state=self.config.RANDOM_STATE,
            stratify=y_temp
        )
        # Convert to tensors; unsqueeze adds a sequence dimension of length 1
        X_train = torch.tensor(X_train, dtype=torch.float32).unsqueeze(1)
        X_val = torch.tensor(X_val, dtype=torch.float32).unsqueeze(1)
        X_test = torch.tensor(X_test, dtype=torch.float32).unsqueeze(1)
        y_train = torch.tensor(y_train.values, dtype=torch.long)
        y_val = torch.tensor(y_val.values, dtype=torch.long)
        y_test = torch.tensor(y_test.values, dtype=torch.long)
        self.logger.info(f"Train: {X_train.shape}, Val: {X_val.shape}, Test: {X_test.shape}")
        return X_train, X_val, X_test, y_train, y_val, y_test

    def create_dataloaders(self, X_train, X_val, X_test, y_train, y_val, y_test) -> Tuple[
            DataLoader, DataLoader, DataLoader]:
        """Create the data loaders."""
        train_dl = DataLoader(
            TensorDataset(X_train, y_train),
            batch_size=self.config.BATCH_SIZE,
            shuffle=True
        )
        val_dl = DataLoader(
            TensorDataset(X_val, y_val),
            batch_size=self.config.BATCH_SIZE,
            shuffle=False
        )
        test_dl = DataLoader(
            TensorDataset(X_test, y_test),
            batch_size=self.config.BATCH_SIZE,
            shuffle=False
        )
        return train_dl, val_dl, test_dl


class DiabetesLSTM_V2(nn.Module):
    """LSTM model."""

    def __init__(self, config: Config):
        super(DiabetesLSTM_V2, self).__init__()
        self.lstm = nn.LSTM(
            input_size=config.INPUT_SIZE,
            hidden_size=config.HIDDEN_SIZE,
            num_layers=config.NUM_LAYERS,
            dropout=config.DROPOUT_RATE if config.NUM_LAYERS > 1 else 0,
            batch_first=True,
            bidirectional=True  # bidirectional LSTM
        )
        # Batch normalization (x2 because the LSTM is bidirectional)
        self.batch_norm = nn.BatchNorm1d(config.HIDDEN_SIZE * 2)
        # Classifier head
        self.classifier = nn.Sequential(
            nn.Linear(config.HIDDEN_SIZE * 2, config.HIDDEN_SIZE),
            nn.ReLU(),
            nn.Dropout(config.DROPOUT_RATE),
            nn.Linear(config.HIDDEN_SIZE, config.NUM_CLASSES)
        )
        # Weight initialization
        self._init_weights()

    def _init_weights(self):
        """Initialize weights."""
        for module in self.modules():
            if isinstance(module, nn.Linear):
                nn.init.xavier_uniform_(module.weight)
                if module.bias is not None:
                    nn.init.constant_(module.bias, 0)
            elif isinstance(module, nn.LSTM):
                for name, param in module.named_parameters():
                    if 'weight' in name:
                        nn.init.orthogonal_(param)
                    elif 'bias' in name:
                        nn.init.constant_(param, 0)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Forward pass."""
        lstm_out, (hidden, cell) = self.lstm(x)
        # Use the output of the last time step
        output = lstm_out[:, -1, :]
        output = self.batch_norm(output)
        output = self.classifier(output)
        return output


class EarlyStopping:
    """Early stopping."""

    def __init__(self, patience=15, min_delta=0.001):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, val_loss):
        if self.best_score is None:
            self.best_score = val_loss
        elif val_loss < self.best_score - self.min_delta:
            self.best_score = val_loss
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True


class ModelTrainer:
    """Model trainer."""

    def __init__(self, config: Config):
        self.config = config
        self.device = config.DEVICE
        self.logger = logging.getLogger(__name__)
        # Model
        self.model = DiabetesLSTM_V2(config).to(self.device)
        # Loss function (class weights could be added here for class imbalance)
        self.loss_fn = nn.CrossEntropyLoss()
        # Optimizer
        self.optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=config.LEARNING_RATE,
            weight_decay=1e-5
        )
        # LR scheduler ('verbose' is deprecated in recent PyTorch, so it is omitted)
        self.scheduler = ReduceLROnPlateau(
            self.optimizer,
            mode='min',
            factor=0.5,
            patience=10
        )
        # Early stopping
        self.early_stopping = EarlyStopping(patience=config.EARLY_STOPPING_PATIENCE)
        # Training history
        self.train_history = {
            'train_loss': [],
            'val_loss': [],
            'train_acc': [],
            'val_acc': []
        }

    def train_epoch(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Train for one epoch."""
        self.model.train()
        total_loss = 0
        correct = 0
        total = 0
        for batch_idx, (data, target) in enumerate(dataloader):
            data, target = data.to(self.device), target.to(self.device)
            self.optimizer.zero_grad()
            output = self.model(data)
            loss = self.loss_fn(output, target)
            loss.backward()
            # Gradient clipping
            torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
            self.optimizer.step()
            total_loss += loss.item()
            pred = output.argmax(dim=1)
            correct += pred.eq(target).sum().item()
            total += target.size(0)
        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total
        return avg_loss, accuracy

    def validate_epoch(self, dataloader: DataLoader) -> Tuple[float, float]:
        """Validate for one epoch."""
        self.model.eval()
        total_loss = 0
        correct = 0
        total = 0
        with torch.no_grad():
            for data, target in dataloader:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                loss = self.loss_fn(output, target)
                total_loss += loss.item()
                pred = output.argmax(dim=1)
                correct += pred.eq(target).sum().item()
                total += target.size(0)
        avg_loss = total_loss / len(dataloader)
        accuracy = correct / total
        return avg_loss, accuracy

    def train(self, train_dl: DataLoader, val_dl: DataLoader) -> None:
        """Full training loop."""
        self.logger.info("Starting training...")
        self.logger.info(f"Device: {self.device}")
        self.logger.info(f"Number of model parameters: {sum(p.numel() for p in self.model.parameters())}")
        best_val_loss = float('inf')
        for epoch in range(self.config.EPOCHS):
            # Train
            train_loss, train_acc = self.train_epoch(train_dl)
            # Validate
            val_loss, val_acc = self.validate_epoch(val_dl)
            # Update the learning rate
            self.scheduler.step(val_loss)
            # Record history
            self.train_history['train_loss'].append(train_loss)
            self.train_history['val_loss'].append(val_loss)
            self.train_history['train_acc'].append(train_acc)
            self.train_history['val_acc'].append(val_acc)
            # Save the best model
            if val_loss < best_val_loss:
                best_val_loss = val_loss
                self.save_model('best_model.pth')
            # Early stopping check
            self.early_stopping(val_loss)
            # Log progress
            self.logger.info(
                f'Epoch {epoch + 1}/{self.config.EPOCHS} - '
                f'Train Loss: {train_loss:.4f} - Train Acc: {train_acc:.4f} - '
                f'Val Loss: {val_loss:.4f} - Val Acc: {val_acc:.4f} - '
                f'LR: {self.optimizer.param_groups[0]["lr"]:.6f}'
            )
            if self.early_stopping.early_stop:
                self.logger.info(f"Early stopping at epoch {epoch + 1}")
                break
        self.logger.info("Training finished")

    def evaluate(self, test_dl: DataLoader) -> Dict:
        """Evaluate the model."""
        self.model.eval()
        all_preds = []
        all_targets = []
        all_probs = []
        with torch.no_grad():
            for data, target in test_dl:
                data, target = data.to(self.device), target.to(self.device)
                output = self.model(data)
                probs = F.softmax(output, dim=1)
                preds = output.argmax(dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_targets.extend(target.cpu().numpy())
                all_probs.extend(probs.cpu().numpy())
        # Metrics
        accuracy = (np.array(all_preds) == np.array(all_targets)).mean()
        auc = roc_auc_score(all_targets, np.array(all_probs)[:, 1])
        # Classification report
        report = classification_report(all_targets, all_preds, output_dict=True)
        results = {
            'accuracy': accuracy,
            'auc': auc,
            'classification_report': report,
            'predictions': all_preds,
            'true_labels': all_targets,
            'probabilities': all_probs
        }
        self.logger.info(f"Test accuracy: {accuracy:.4f}")
        self.logger.info(f"AUC: {auc:.4f}")
        return results

    def save_model(self, filename: str):
        """Save the model."""
        filepath = os.path.join(self.config.MODEL_SAVE_PATH, filename)
        torch.save({
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'config': self.config,
            'train_history': self.train_history
        }, filepath)
        self.logger.info(f"Model saved to: {filepath}")

    def plot_training_history(self) -> None:
        """Plot the training history."""
        epochs_range = range(1, len(self.train_history['train_loss']) + 1)
        plt.figure(figsize=(15, 5))
        # Loss
        plt.subplot(1, 3, 1)
        plt.plot(epochs_range, self.train_history['train_loss'], label='Train loss')
        plt.plot(epochs_range, self.train_history['val_loss'], label='Val loss')
        plt.xlabel('Epochs')
        plt.ylabel('Loss')
        plt.title('Model loss')
        plt.legend()
        plt.grid(True)
        # Accuracy
        plt.subplot(1, 3, 2)
        plt.plot(epochs_range, self.train_history['train_acc'], label='Train accuracy')
        plt.plot(epochs_range, self.train_history['val_acc'], label='Val accuracy')
        plt.xlabel('Epochs')
        plt.ylabel('Accuracy')
        plt.title('Model accuracy')
        plt.legend()
        plt.grid(True)
        # ROC curve placeholder (the real curve requires evaluate() to run first)
        plt.subplot(1, 3, 3)
        plt.text(0.5, 0.5, 'ROC curve\n(plotted after evaluate())',
                 ha='center', va='center', transform=plt.gca().transAxes)
        plt.title('ROC curve')
        plt.tight_layout()
        plt.savefig('training_history.png')
        plt.show()

    def plot_roc_curve(self, results: Dict):
        """Plot the ROC curve."""
        fpr, tpr, _ = roc_curve(results['true_labels'],
                                np.array(results['probabilities'])[:, 1])
        plt.figure(figsize=(8, 6))
        plt.plot(fpr, tpr, label=f'ROC Curve (AUC = {results["auc"]:.4f})')
        plt.plot([0, 1], [0, 1], 'k--', label='Random')
        plt.xlabel('False Positive Rate')
        plt.ylabel('True Positive Rate')
        plt.title('ROC Curve')
        plt.legend()
        plt.grid(True)
        plt.savefig('roc_curve.png')
        plt.show()


def main():
    """Main entry point."""
    logger = logging.getLogger(__name__)
    logger.info("Starting diabetes prediction model training")
    # Configuration
    config = Config()
    # Data processing
    data_processor = DataProcessor(config)
    try:
        # Load and preprocess the data
        dataframe = data_processor.load_data()
        stats = data_processor.explore_data(dataframe)
        dataframe = data_processor.clean_data(dataframe)
        data_processor.visualize_features(dataframe)
        # Split the data
        X_train, X_val, X_test, y_train, y_val, y_test = data_processor.prepare_data(dataframe)
        train_dl, val_dl, test_dl = data_processor.create_dataloaders(
            X_train, X_val, X_test, y_train, y_val, y_test)
        # Train the model
        trainer = ModelTrainer(config)
        trainer.train(train_dl, val_dl)
        # Evaluate the model
        results = trainer.evaluate(test_dl)
        # Visualize the results
        trainer.plot_training_history()
        trainer.plot_roc_curve(results)
        logger.info("Training pipeline complete!")
    except Exception as e:
        logger.error(f"Error during training: {str(e)}")
        raise


if __name__ == "__main__":
    main()

Results:

(Four result figures, presumably the feature-distribution plot, correlation heatmap, training-history curves, and ROC curve that the script saves.)

2. What is a hyperparameter?

Hyperparameters are settings chosen by hand before model training begins. They control the learning process itself, rather than being learned from the training data.

Hyperparameters vs. parameters

Aspect     | Hyperparameter                                    | Parameter
Definition | Configuration that controls the learning process  | The model's internal weights and biases
When set   | Set manually before training                     | Learned automatically during training
Examples   | Learning rate, batch size, number of layers      | The network's weight matrices
Analogy    | The study method and environment                  | The knowledge that is actually learned
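
To make the distinction concrete, here is a minimal PyTorch sketch (the input size of 13 is an invented illustration, not a value from the dataset above): hyperparameters are constants you pick; parameters are the tensors the optimizer updates.

import torch.nn as nn

# Hyperparameters: fixed by hand before training (cf. the Config class above)
HIDDEN_SIZE = 128
LEARNING_RATE = 1e-3
BATCH_SIZE = 32

# Parameters: every tensor returned by model.parameters() is updated by the optimizer
model = nn.Linear(13, 2)  # hypothetical 13 input features, 2 classes
n_params = sum(p.numel() for p in model.parameters())
print(n_params)  # 13 * 2 weights + 2 biases = 28 learned values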
3. What is the IQR?

IQR = Interquartile Range. Think of lining up a class of students by score and finding the score range covered by the middle 50% of them.

Quartiles explained

Imagine 100 students sorted by score:

Student rank: 1  2  3 ... 25  26 ... 50  51 ... 75  76 ... 100
              ↑           ↑           ↑           ↑        ↑
           lowest    Q1 (25%)  Q2 (50%, median)  Q3 (75%)  highest
  • Q1: the 25th student's score (25th percentile)
  • Q2: the 50th student's score (50th percentile, the median)
  • Q3: the 75th student's score (75th percentile)
  • IQR = Q3 - Q1: the score gap between the 75th and the 25th student
Application to the diabetes data
Blood glucose data: [85, 90, 95, 100, 105, 110, 250]   # the last value is an outlier
Q1 = 90, Q3 = 110
IQR = 110 - 90 = 20

Outlier-detection bounds:
Lower bound = Q1 - 1.5 × IQR = 90 - 1.5 × 20 = 60
Upper bound = Q3 + 1.5 × IQR = 110 + 1.5 × 20 = 140

Conclusion: 250 is above 140, so it is an outlier!
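
As a sanity check, here is a minimal pandas sketch of the same computation. Note that pandas' quantile() interpolates between values, so on this tiny series it gives Q1 = 92.5 and Q3 = 107.5 rather than the hand-picked 90 and 110; the outlier at 250 is flagged either way.

import pandas as pd

glucose = pd.Series([85, 90, 95, 100, 105, 110, 250])
q1, q3 = glucose.quantile(0.25), glucose.quantile(0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
outliers = glucose[(glucose < lower) | (glucose > upper)]
print(f"Q1={q1}, Q3={q3}, IQR={iqr}, bounds=({lower}, {upper})")
print("Outliers:", outliers.tolist())  # [250]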
4. What is ROC?

The ROC curve (Receiver Operating Characteristic curve) is a standard tool for evaluating binary classifiers. It plots the false positive rate (FPR) on the x-axis against the true positive rate (TPR) on the y-axis.

A diabetes-screening scenario

Suppose there are 100 patients, of whom 20 have diabetes and 80 are healthy:

Model A: AUC = 0.95

  • Correctly identifies 18 of the 20 diabetic patients (TPR = 0.9)
  • Misdiagnoses only 8 of the 80 healthy people (FPR = 0.1)

Model B: AUC = 0.75

  • Correctly identifies 16 of the 20 diabetic patients (TPR = 0.8)
  • Misdiagnoses 20 of the 80 healthy people (FPR = 0.25)

Model A is clearly the better model.
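
The ROC curve and AUC can be computed directly with scikit-learn. Below is a minimal, self-contained sketch on synthetic scores; the numbers are invented for illustration and are not outputs of the model above.

import numpy as np
from sklearn.metrics import roc_curve, roc_auc_score

rng = np.random.default_rng(42)
# 20 diabetic (label 1) and 80 healthy (label 0) patients, as in the scenario above
y_true = np.array([1] * 20 + [0] * 80)
# Hypothetical predicted scores: diabetics tend to score higher than healthy people
y_score = np.concatenate([rng.normal(0.7, 0.15, 20), rng.normal(0.3, 0.15, 80)])
fpr, tpr, thresholds = roc_curve(y_true, y_score)
print(f"AUC = {roc_auc_score(y_true, y_score):.3f}")  # closer to 1.0 means better separation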

Note: the code style and model-quality checks here drew on third-party tools for reference, and a large share of the concepts covered were learned along the way.
