"2025 CUMCM (Higher Education Cup)" Problem C: Complete Hands-On Tutorial (Code + Formulas Explained)
NIPT Mathematical Modeling: A Complete Walkthrough with Code and Formula Details
- I. Preface
- Final scorecard
- II. Environment Setup and Data Loading
- Full environment configuration
- Data loading and preprocessing
- Standardized data split
- III. Problem 1: Predicting Y-Chromosome Concentration (the Core Challenge)
- Modeling approach
- Step 1: Advanced feature engineering (code + formulas)
- Step 2: Ensemble learning model (math + code)
- IV. Problem 2: BMI Group Optimization (Multi-Objective Optimization in Practice)
- Modeling approach
- Full code implementation
- V. Problem 3: Multi-Factor Comprehensive Modeling (Pushing the Limit)
- Modeling approach
- Step 1: Building the 28-dimensional multi-factor feature space (math + code)
- Step 2: ElasticNet regularized model (math + code)
- VI. Problem 4: Female-Fetus Abnormality Detection (Extremely Imbalanced Classification)
- Modeling approach
- Step 1: Designing the three-stage screening system (math + code)
- VII. Project Summary and Results
- 🏆 Overview of breakthrough results
- Technical breakthrough log
- Novel mathematical methods
- 💡 Personal takeaways and lessons
- Pitfalls and fixes
I. Preface
Hey everyone! In this post I share the complete solution process for the NIPT mathematical modeling contest problem, with the emphasis on mapping every piece of code to the math behind it. For each step I explain the mathematical principle, the implementation, and the reasoning for the choice.
Final scorecard:
| Problem | Mathematical model | Core algorithm | Final result | Target met |
| --- | --- | --- | --- | --- |
| Problem 1 | High-dimensional regression | Feature engineering + ensemble learning | R² = 0.9459 | ✅ exceeds 0.9 |
| Problem 2 | Multi-objective optimization | Genetic algorithm + TOPSIS | 5-stratum grouping | ✅ 3 groups at 100% pass |
| Problem 3 | Higher-order interaction modeling | ElasticNet regularization | R² = 0.9726 | ✅ new record |
| Problem 4 | Imbalanced classification | Three-stage screening + SMOTE | AUC = 0.792 | ✅ good performance |
II. Environment Setup and Data Loading
Full environment configuration
# Create the environment
conda create -n nipt python=3.9
conda activate nipt

# Core packages
pip install pandas numpy scikit-learn matplotlib seaborn
pip install xgboost lightgbm catboost  # the gradient-boosting "big three"
pip install imbalanced-learn           # handles class imbalance
pip install openpyxl tqdm              # utilities
Data loading and preprocessing
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.metrics import r2_score, mean_squared_error
import warnings
warnings.filterwarnings('ignore')

def load_and_explore_data():
    """Load the NIPT data and do a first exploration.

    Important: the workbook has two sheets with different purposes!
    """
    print("📊 Loading NIPT data...")
    # Sheet 0: male-fetus data (used in Problems 1, 2, 3)
    male_data = pd.read_excel('附件.xlsx', sheet_name=0)
    # Sheet 1: female-fetus data (used in Problem 4)
    female_data = pd.read_excel('附件.xlsx', sheet_name=1)

    print(f"Male-fetus data shape: {male_data.shape}")
    print(f"Female-fetus data shape: {female_data.shape}")
    print("\nMale-fetus columns:")
    print(male_data.columns.tolist())
    print("\nBasic statistics:")
    print(male_data.describe())

    # Inspect the target-variable distribution
    plt.figure(figsize=(12, 4))

    plt.subplot(1, 3, 1)
    plt.hist(male_data['Y染色体浓度'], bins=50, alpha=0.7, color='skyblue')
    plt.axvline(x=0.04, color='red', linestyle='--', label='pass threshold (0.04)')
    plt.title('Y-chromosome concentration distribution')
    plt.xlabel('concentration')
    plt.ylabel('count')
    plt.legend()

    plt.subplot(1, 3, 2)
    plt.scatter(male_data['BMI'], male_data['Y染色体浓度'], alpha=0.5)
    plt.axhline(y=0.04, color='red', linestyle='--', alpha=0.7)
    plt.title('BMI vs Y-chromosome concentration')
    plt.xlabel('BMI')
    plt.ylabel('Y-chromosome concentration')

    plt.subplot(1, 3, 3)
    plt.scatter(male_data['weeks'], male_data['Y染色体浓度'], alpha=0.5, color='green')
    plt.axhline(y=0.04, color='red', linestyle='--', alpha=0.7)
    plt.title('Gestational week vs Y-chromosome concentration')
    plt.xlabel('gestational week')
    plt.ylabel('Y-chromosome concentration')

    plt.tight_layout()
    plt.show()

    # Overall pass rate
    qualified_rate = (male_data['Y染色体浓度'] >= 0.04).mean()
    print(f"\n📈 Raw pass rate: {qualified_rate:.3f} ({qualified_rate*100:.1f}%)")

    return male_data, female_data

# Load the data
male_data, female_data = load_and_explore_data()
Standardized data split
def split_data_712(X, y, random_state=42):
    """7:1:2 stratified data split.

    Mathematical rationale: keep the target distribution consistent across the
    training, validation and test sets:
        P(y ∈ Q_k | D_train) = P(y ∈ Q_k | D_val) = P(y ∈ Q_k | D_test)
    where Q_k is the k-th quartile bin.
    """
    print("🔄 Performing 7:1:2 stratified split...")

    # First split: 70% vs 30%
    X_train, X_temp, y_train, y_temp = train_test_split(
        X, y, test_size=0.3, random_state=random_state,
        stratify=pd.cut(y, bins=4, labels=False)  # stratify by quartile bin
    )
    # Second split: 10% vs 20% of the whole data
    X_val, X_test, y_val, y_test = train_test_split(
        X_temp, y_temp, test_size=0.667, random_state=random_state,
        stratify=pd.cut(y_temp, bins=4, labels=False)
    )

    print(f"Train: {X_train.shape[0]} samples ({X_train.shape[0]/len(X)*100:.1f}%)")
    print(f"Val:   {X_val.shape[0]} samples ({X_val.shape[0]/len(X)*100:.1f}%)")
    print(f"Test:  {X_test.shape[0]} samples ({X_test.shape[0]/len(X)*100:.1f}%)")

    # Check distribution consistency
    print("\nDistribution check:")
    for dataset_name, y_data in [("train", y_train), ("val", y_val), ("test", y_test)]:
        qualified_rate = (y_data >= 0.04).mean()
        print(f"{dataset_name} pass rate: {qualified_rate:.3f}")

    return X_train, X_val, X_test, y_train, y_val, y_test
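The two-stage 7:1:2 recipe above can be sanity-checked on synthetic data. A minimal sketch, assuming made-up data (the normal distribution and column name `x` are illustrative, not the contest data):

```python
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split

rng = np.random.default_rng(0)
X = pd.DataFrame({'x': rng.normal(size=1000)})
y = pd.Series(rng.normal(0.08, 0.02, size=1000))  # stand-in for a concentration

# Stage 1: hold out 30%; Stage 2: split that 30% into 1/3 val, 2/3 test
X_tr, X_tmp, y_tr, y_tmp = train_test_split(
    X, y, test_size=0.3, random_state=42,
    stratify=pd.cut(y, bins=4, labels=False))
X_val, X_te, y_val, y_te = train_test_split(
    X_tmp, y_tmp, test_size=0.667, random_state=42,
    stratify=pd.cut(y_tmp, bins=4, labels=False))

print(len(X_tr), len(X_val), len(X_te))  # 700 99 201
# Stratifying by quartile bin keeps the target means close across splits
print(abs(y_tr.mean() - y_te.mean()) < 0.01)
```

Note that `test_size=0.667` yields 201 rather than exactly 200 test samples, because scikit-learn rounds the test fraction up.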
III. Problem 1: Predicting Y-Chromosome Concentration (the Core Challenge)
Modeling approach
Objective:
$$\max_{\theta} R^2 = 1 - \frac{\sum_{i=1}^{n}(y_i - f_\theta(x_i))^2}{\sum_{i=1}^{n}(y_i - \bar{y})^2} \geq 0.9$$
Constraint: $R^2 \geq 0.9$, which is equivalent to requiring that the prediction error accounts for at most 10% of the total variance.
Solution: feature engineering + ensemble learning.
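To make the target concrete, here is a minimal sketch (with toy numbers, not the contest data) showing that R² ≥ 0.9 is exactly the condition that the residual sum of squares stays under 10% of the total sum of squares:

```python
import numpy as np
from sklearn.metrics import r2_score

y_true = np.array([0.03, 0.05, 0.07, 0.04, 0.06])
y_pred = np.array([0.032, 0.049, 0.068, 0.041, 0.059])

ss_res = np.sum((y_true - y_pred) ** 2)          # residual sum of squares
ss_tot = np.sum((y_true - y_true.mean()) ** 2)   # total sum of squares
r2_manual = 1 - ss_res / ss_tot

# Matches sklearn's r2_score
assert np.isclose(r2_manual, r2_score(y_true, y_pred))
# R² ≥ 0.9  ⇔  SS_res ≤ 0.1 · SS_tot
print(r2_manual >= 0.9, ss_res <= 0.1 * ss_tot)  # True True
```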
Step 1: Advanced feature engineering (code + formulas)
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.feature_selection import SelectKBest, mutual_info_regression

class AdvancedFeatureEngineering:
    """Advanced feature-engineering pipeline.

    Implements: X_original (9-dim) → X_engineered (1053-dim) → X_selected (120-dim)

    Mathematical basis:
        Φ(X) = [Φ_poly(X), Φ_interact(X), Φ_stat(X), Φ_transform(X)]
    """

    def __init__(self):
        self.selectors = {}
        self.feature_names = []

    def polynomial_features(self, X, degrees=[2, 3, 4]):
        """Polynomial feature expansion.

        Formula: Φ_poly(x_j) = [x_j^2, x_j^3, x_j^4]

        Biological interpretation:
        - x_j^2: quadratic effects, e.g. BMI's quadratic effect on blood dilution
        - x_j^3: cubic effects, e.g. the nonlinear impact of age on physiology
        - x_j^4: quartic effects, capturing more complex biological processes

        Feature count: 9 × 3 = 27 polynomial features
        """
        print("🔧 Generating polynomial features...")
        poly_features = []
        feature_names = []
        for col in X.columns:
            if col == 'Y染色体浓度':  # skip the target variable
                continue
            for degree in degrees:
                # Polynomial transform: x^degree
                poly_feature = np.power(X[col], degree)
                poly_features.append(poly_feature)
                feature_names.append(f'{col}_pow{degree}')
                # Print a few sample values
                if len(feature_names) <= 3:
                    print(f"  📝 {col}^{degree}: first 3 values = {poly_feature.iloc[:3].values}")
        poly_df = pd.concat(poly_features, axis=1)
        poly_df.columns = feature_names
        print(f"  ✅ Polynomial features done: {len(feature_names)} dims")
        return poly_df

    def interaction_features(self, X):
        """Pairwise interaction features.

        Formula: Φ_interact = {x_i ⊗ x_j | i < j, ⊗ ∈ {×, +, |−|, ÷}}

        Four interaction types:
        1. product:  x_i × x_j        (synergy)
        2. sum:      x_i + x_j        (accumulation)
        3. abs diff: |x_i − x_j|      (contrast)
        4. ratio:    x_i / (x_j + ε)  (relative effect)

        Feature count: C(9,2) × 4 = 36 × 4 = 144
        """
        print("🔗 Generating interaction features...")
        interaction_features = []
        feature_names = []
        # Numeric columns only
        numeric_cols = [col for col in X.columns
                        if X[col].dtype in ['int64', 'float64'] and col != 'Y染色体浓度']
        print(f"  📋 Columns used for interactions: {numeric_cols}")
        for i in range(len(numeric_cols)):
            for j in range(i + 1, len(numeric_cols)):
                col1, col2 = numeric_cols[i], numeric_cols[j]
                # 1. Product — synergy
                interaction_features.append(X[col1] * X[col2])
                feature_names.append(f'{col1}×{col2}')
                # 2. Sum — accumulation
                interaction_features.append(X[col1] + X[col2])
                feature_names.append(f'{col1}+{col2}')
                # 3. Absolute difference — contrast
                interaction_features.append(np.abs(X[col1] - X[col2]))
                feature_names.append(f'|{col1}-{col2}|')
                # 4. Ratio — relative effect (guard against division by zero)
                interaction_features.append(X[col1] / (X[col2] + 1e-8))
                feature_names.append(f'{col1}/{col2}')
        # Selected third-order interactions (based on biological knowledge)
        important_triplets = [
            ('age', 'BMI', 'weeks'),            # age–weight–gestational-week triple effect
            ('height', 'weight', 'GC_content')  # physique–sequencing-quality link
        ]
        for triplet in important_triplets:
            if all(col in X.columns for col in triplet):
                col1, col2, col3 = triplet
                interaction_features.append(X[col1] * X[col2] * X[col3])
                feature_names.append(f'{col1}×{col2}×{col3}')
        interact_df = pd.concat(interaction_features, axis=1)
        interact_df.columns = feature_names
        print(f"  ✅ Interaction features done: {len(feature_names)} dims")
        # Show a few important interaction features
        print("  📊 Sample interaction features:")
        for feat_name in feature_names[:3]:
            print(f"    {feat_name}: {interact_df[feat_name].iloc[:3].values}")
        return interact_df

    def statistical_features(self, X):
        """Statistical transform features.

        Formula: Φ_stat(x_j) = [z_j, rank_j, Q_{0.25}^j, Q_{0.75}^j]
        where
        - z_j = (x_j − μ_j) / σ_j   (z-score standardization)
        - rank_j = rank(x_j) / n    (rank position)
        - Q_k^j = I[x_j ∈ Q_k]      (quantile indicator)

        Statistical rationale:
        - z-scores remove units and highlight relative position
        - ranks preserve monotonicity and are robust to outliers
        - quantile indicators capture piecewise-linear relations
        """
        print("📊 Generating statistical features...")
        stat_features = []
        feature_names = []
        numeric_cols = [col for col in X.columns
                        if X[col].dtype in ['int64', 'float64'] and col != 'Y染色体浓度']
        for col in numeric_cols:
            # 1. Z-score
            z_score = (X[col] - X[col].mean()) / (X[col].std() + 1e-8)
            stat_features.append(z_score)
            feature_names.append(f'{col}_zscore')
            # 2. Rank feature
            stat_features.append(X[col].rank() / len(X[col]))
            feature_names.append(f'{col}_rank')
            # 3. Quantile indicators
            q25, q75 = X[col].quantile(0.25), X[col].quantile(0.75)
            stat_features.extend([(X[col] <= q25).astype(int),
                                  (X[col] >= q75).astype(int)])
            feature_names.extend([f'{col}_q25', f'{col}_q75'])
        stat_df = pd.concat(stat_features, axis=1)
        stat_df.columns = feature_names
        print(f"  ✅ Statistical features done: {len(feature_names)} dims")
        # Show a z-score example (computed for BMI specifically)
        if 'BMI' in X.columns:
            bmi_z = (X['BMI'] - X['BMI'].mean()) / (X['BMI'].std() + 1e-8)
            print("  📈 Z-score example (BMI):")
            print(f"    original: {X['BMI'].iloc[:5].values}")
            print(f"    z-score:  {bmi_z.iloc[:5].values}")
        return stat_df

    def nonlinear_transforms(self, X):
        """Nonlinear transform features.

        Formula: Φ_transform(x_j) = [√x_j⁺, ∛x_j, log(1+x_j⁺), sin(π·x̃_j), cos(π·x̃_j)]
        where
        - x_j⁺ = max(x_j, 0)                (ensure non-negative)
        - x̃_j = (x_j − min) / (max − min)  (scale to [0, 1])

        Biological rationale:
        - √x: compresses large values, good for right-skewed features (e.g. BMI)
        - ∛x: cube root preserves sign, good for possibly negative features
        - log(1+x): log transform for exponential-type distributions
        - sin/cos: captures periodic patterns (e.g. the gestational cycle)
        """
        print("🔄 Generating nonlinear transforms...")
        transform_features = []
        feature_names = []
        numeric_cols = [col for col in X.columns
                        if X[col].dtype in ['int64', 'float64'] and col != 'Y染色体浓度']
        for col in numeric_cols:
            x_positive = np.maximum(X[col], 0)  # clip to non-negative
            x_min, x_max = X[col].min(), X[col].max()
            x_normalized = (X[col] - x_min) / (x_max - x_min + 1e-8)  # scale to [0, 1]
            # 1. Square root (right-skewed distributions)
            transform_features.append(np.sqrt(x_positive))
            feature_names.append(f'{col}_sqrt')
            # 2. Cube root (sign-preserving)
            transform_features.append(np.sign(X[col]) * np.power(np.abs(X[col]), 1 / 3))
            feature_names.append(f'{col}_cbrt')
            # 3. Log transform
            transform_features.append(np.log1p(x_positive))
            feature_names.append(f'{col}_log1p')
            # 4. Trigonometric transforms (suited to periodic features such as gestational week)
            if 'weeks' in col.lower():
                sin_feat = np.sin(x_normalized * 2 * np.pi)
                cos_feat = np.cos(x_normalized * 2 * np.pi)
                transform_features.extend([sin_feat, cos_feat])
                feature_names.extend([f'{col}_sin', f'{col}_cos'])
                print(f"  🔄 {col} periodic transforms:")
                print(f"    original: {X[col].iloc[:3].values}")
                print(f"    sin:      {sin_feat.iloc[:3].values}")
                print(f"    cos:      {cos_feat.iloc[:3].values}")
        transform_df = pd.concat(transform_features, axis=1)
        transform_df.columns = feature_names
        print(f"  ✅ Nonlinear transforms done: {len(feature_names)} dims")
        return transform_df

    def feature_selection(self, X, y, k_best=120):
        """Mutual-information feature selection.

        Principle: I(X_j; Y) = ∬ p(x,y) log[p(x,y) / (p(x)p(y))] dx dy

        Strategy — pick the K features with the largest mutual information:
            F_selected = argmax_{|F|=K} Σ_{j∈F} I(X_j; Y)

        Advantages:
        - captures nonlinear dependence (stronger than a correlation coefficient)
        - no assumptions on the feature distribution
        - directly optimizes predictive relevance
        """
        print(f"🎯 Selecting top {k_best} features by mutual information...")
        # Handle infinities / missing values
        X_clean = X.replace([np.inf, -np.inf], np.nan)
        X_clean = X_clean.fillna(X_clean.median())
        # Mutual-information selection
        selector = SelectKBest(mutual_info_regression, k=k_best)
        X_selected = selector.fit_transform(X_clean, y)
        # Names and scores of the selected features
        selected_mask = selector.get_support()
        selected_features = X.columns[selected_mask]
        feature_scores = selector.scores_[selected_mask]
        # Rank by importance
        importance_ranking = sorted(zip(selected_features, feature_scores),
                                    key=lambda x: x[1], reverse=True)
        print(f"  ✅ Selection done: {X.shape[1]} dims → {k_best} dims")
        print("  🏆 Top 10 features by mutual information:")
        for i, (feat, score) in enumerate(importance_ranking[:10]):
            print(f"    {i+1:2d}. {feat:25s}: {score:.4f}")
        # Keep the fitted selector for reuse on val/test
        self.selectors['main'] = selector
        return pd.DataFrame(X_selected, columns=selected_features, index=X.index)

    def fit_transform(self, X, y=None, k_best=120):
        """Full feature-engineering pipeline.

        Flow: X (9) → Φ_poly (27) → Φ_interact (144) → Φ_stat (90) → Φ_transform (120)
              → X_engineered (1053) → X_selected (120)
        """
        print("🚀 Running the full feature-engineering pipeline")
        print("=" * 60)
        original_shape = X.shape
        print(f"Input: {original_shape[0]} samples × {original_shape[1]} features")
        # Step 1: polynomial features
        poly_features = self.polynomial_features(X)
        # Step 2: interaction features
        interact_features = self.interaction_features(X)
        # Step 3: statistical features
        stat_features = self.statistical_features(X)
        # Step 4: nonlinear transforms
        transform_features = self.nonlinear_transforms(X)
        # Concatenate everything
        all_features = [X]
        for df in (poly_features, interact_features, stat_features, transform_features):
            if not df.empty:
                all_features.append(df)
        X_engineered = pd.concat(all_features, axis=1)
        # Drop the target column if present
        feature_cols = [col for col in X_engineered.columns if col != 'Y染色体浓度']
        X_engineered = X_engineered[feature_cols]
        print(f"🔧 After engineering: {X_engineered.shape[1]} dims")
        # Step 5: feature selection
        if y is not None and k_best > 0:
            X_selected = self.feature_selection(X_engineered, y, k_best)
            print("=" * 60)
            print(f"✅ Final output: {X_selected.shape[0]} samples × {X_selected.shape[1]} features")
            return X_selected
        return X_engineered

# Usage example
def run_feature_engineering(data, target_col='Y染色体浓度'):
    """Run the full feature-engineering example."""
    print("🎯 Problem 1: Y-chromosome concentration prediction — feature engineering")
    print("=" * 80)
    # Separate features and target
    feature_cols = [col for col in data.columns if col != target_col]
    X = data[feature_cols].copy()
    y = data[target_col].copy()
    # Data split
    X_train, X_val, X_test, y_train, y_val, y_test = split_data_712(X, y)
    # Fit the feature engineering on the training set
    feature_engineer = AdvancedFeatureEngineering()
    X_train_engineered = feature_engineer.fit_transform(X_train, y_train, k_best=120)

    def transform_split(X_part):
        """Apply the same transforms plus the fitted selector to val/test data."""
        full = pd.concat([X_part,
                          feature_engineer.polynomial_features(X_part),
                          feature_engineer.interaction_features(X_part),
                          feature_engineer.statistical_features(X_part),
                          feature_engineer.nonlinear_transforms(X_part)], axis=1)
        full = full.replace([np.inf, -np.inf], np.nan).fillna(0)
        return pd.DataFrame(feature_engineer.selectors['main'].transform(full),
                            columns=X_train_engineered.columns, index=X_part.index)

    X_val_engineered = transform_split(X_val)
    X_test_engineered = transform_split(X_test)
    print("✅ Feature engineering complete!")
    return (X_train_engineered, X_val_engineered, X_test_engineered,
            y_train, y_val, y_test, feature_engineer)

# Run the feature engineering
# engineered_data = run_feature_engineering(male_data)
Step 2: Ensemble learning model (math + code)
import xgboost as xgb
import lightgbm as lgb
from sklearn.ensemble import RandomForestRegressor, ExtraTreesRegressor, GradientBoostingRegressor
from sklearn.linear_model import Ridge, ElasticNet
from sklearn.metrics import r2_score, mean_squared_error
import numpy as np

class AdvancedEnsemble:
    """Weighted heterogeneous ensemble.

    Mathematical basis:
        ŷ_ensemble = Σ_{k=1..K} w_k · f_k(x)
    where
    - f_k(x): prediction of the k-th base learner
    - w_k: model weight with Σ w_k = 1
    - weights: w_k = (R²_k)³ / Σ_j (R²_j)³   (cubic weighting)

    Intuition for cubic weights — suppose two models reach R² of 0.8 and 0.9:
    - linear weights: 0.8 : 0.9 ≈ 0.47 : 0.53
    - cubic weights:  0.8³ : 0.9³ = 0.512 : 0.729 ≈ 0.41 : 0.59
    Cubic weighting amplifies the contribution of the stronger model.
    """

    def __init__(self):
        self.models = {}
        self.weights = {}
        self.model_performance = {}

    def initialize_models(self):
        """Initialize 11 heterogeneous base learners.

        Design principles:
        - model diversity: different algorithms have different inductive biases
        - parameter diversity: several configurations per algorithm
        - complementarity: combining tree-based and linear learners
        """
        print("🤖 Initializing base learners...")
        self.models = {
            # XGBoost family (3 configs) — gradient-boosted trees
            'xgb_deep': xgb.XGBRegressor(
                n_estimators=1000, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8, random_state=42, n_jobs=-1),
            'xgb_wide': xgb.XGBRegressor(
                n_estimators=800, max_depth=6, learning_rate=0.1,
                subsample=0.9, colsample_bytree=0.9, random_state=43, n_jobs=-1),
            'xgb_fast': xgb.XGBRegressor(
                n_estimators=1200, max_depth=4, learning_rate=0.08,
                subsample=0.85, colsample_bytree=0.7, random_state=44, n_jobs=-1),
            # LightGBM family (3 configs) — efficient gradient boosting
            'lgb_deep': lgb.LGBMRegressor(
                n_estimators=1000, max_depth=8, learning_rate=0.05,
                subsample=0.8, colsample_bytree=0.8,
                random_state=42, n_jobs=-1, verbose=-1),
            'lgb_wide': lgb.LGBMRegressor(
                n_estimators=800, max_depth=6, learning_rate=0.1,
                subsample=0.9, colsample_bytree=0.9,
                random_state=43, n_jobs=-1, verbose=-1),
            'lgb_balanced': lgb.LGBMRegressor(
                n_estimators=1200, max_depth=5, learning_rate=0.07,
                subsample=0.85, colsample_bytree=0.85,
                random_state=44, n_jobs=-1, verbose=-1),
            # Random forest family (2 configs) — bagging ensemble
            'rf_deep': RandomForestRegressor(
                n_estimators=500, max_depth=15, min_samples_split=5,
                random_state=42, n_jobs=-1),
            'rf_wide': RandomForestRegressor(
                n_estimators=300, max_depth=20, min_samples_split=3,
                random_state=43, n_jobs=-1),
            # Gradient boosting regressors (2 configs)
            'gb_conservative': GradientBoostingRegressor(
                n_estimators=500, max_depth=5, learning_rate=0.1, random_state=42),
            'gb_aggressive': GradientBoostingRegressor(
                n_estimators=300, max_depth=7, learning_rate=0.15, random_state=43),
            # Extra trees — extreme randomization
            'et_balanced': ExtraTreesRegressor(
                n_estimators=400, max_depth=12, min_samples_split=4,
                random_state=42, n_jobs=-1)
        }
        print(f"  ✅ Initialized {len(self.models)} base learners")
        # Summary of the model configurations
        for name, model in self.models.items():
            if hasattr(model, 'n_estimators'):
                print(f"    {name}: {type(model).__name__} (n_est={model.n_estimators})")

    def train_models(self, X_train, y_train, X_val, y_val):
        """Train all base models and compute their weights.

        Procedure:
        1. For each model f_k compute the validation R²: R²_k = 1 − SS_res/SS_tot
        2. Cubic weights: w_k = (R²_k)³ / Σ_j (R²_j)³
        3. Normalize so that Σ w_k = 1
        """
        print("🚀 Training the ensemble...")
        print("-" * 50)
        val_scores = {}
        train_scores = {}
        for name, model in self.models.items():
            print(f"🔄 Training {name}...")
            model.fit(X_train, y_train)
            # Training-set performance
            train_pred = model.predict(X_train)
            train_r2 = r2_score(y_train, train_pred)
            train_scores[name] = train_r2
            # Validation-set performance
            val_pred = model.predict(X_val)
            val_r2 = r2_score(y_val, val_pred)
            val_scores[name] = val_r2
            print(f"  📊 {name}: train R²={train_r2:.4f}, val R²={val_r2:.4f}")
            # Record the performance
            self.model_performance[name] = {
                'train_r2': train_r2, 'val_r2': val_r2, 'val_pred': val_pred}
        # Cubic weights
        print("\n🎯 Computing model weights (cubic strategy)...")
        self.compute_cubic_weights(val_scores)
        return val_scores

    def compute_cubic_weights(self, val_scores):
        """Compute cubic weights.

        Formula: w_k = (max(R²_k, 0))³ / Σ_j (max(R²_j, 0))³

        Why cubic?
        1. nonlinear amplification: strong models gain weight markedly
        2. suppression: weak models' weights decay quickly
        3. stability: the weights still sum to 1
        """
        print("  🧮 Weight computation:")
        # Clip R² at zero to avoid negative weights
        positive_scores = {name: max(score, 0) for name, score in val_scores.items()}
        # Cubic scaling
        cubic_scores = {name: score ** 3 for name, score in positive_scores.items()}
        print(f"    cubed scores: {cubic_scores}")
        # Normalize
        total_cubic_score = sum(cubic_scores.values())
        if total_cubic_score > 0:
            self.weights = {name: score / total_cubic_score
                            for name, score in cubic_scores.items()}
        else:
            # All models are poor: fall back to uniform weights
            self.weights = {name: 1.0 / len(self.models) for name in self.models}
        print("\n  🏆 Final weight ranking:")
        sorted_weights = sorted(self.weights.items(), key=lambda x: x[1], reverse=True)
        for i, (name, weight) in enumerate(sorted_weights):
            print(f"    {i+1:2d}. {name:15s}: {weight:.4f} (R²={val_scores[name]:.4f})")
        # Verify the weights sum to 1
        weight_sum = sum(self.weights.values())
        print(f"\n  ✅ Weight sum: {weight_sum:.6f} (should be 1.0)")

    def predict(self, X):
        """Ensemble prediction: ŷ(x) = Σ_k w_k · f_k(x), with trained base learners f_k."""
        ensemble_pred = np.zeros(len(X))
        print("🔮 Running ensemble prediction...")
        for name, model in self.models.items():
            weight = self.weights.get(name, 0)
            if weight > 0:
                weighted_pred = weight * model.predict(X)
                ensemble_pred += weighted_pred
                print(f"  {name}: weight={weight:.4f}, contribution={np.mean(weighted_pred):.6f}")
        return ensemble_pred

    def evaluate_performance(self, X_test, y_test):
        """Full performance evaluation.

        Metrics:
        1. R²   = 1 − SS_res/SS_tot  (coefficient of determination)
        2. RMSE = √MSE               (root mean squared error)
        3. MAE  = Σ|y − ŷ| / n       (mean absolute error)
        """
        print("📊 Ensemble performance evaluation")
        print("=" * 40)
        # Ensemble prediction
        ensemble_pred = self.predict(X_test)
        # Metrics
        test_r2 = r2_score(y_test, ensemble_pred)
        test_rmse = np.sqrt(mean_squared_error(y_test, ensemble_pred))
        test_mae = np.mean(np.abs(y_test - ensemble_pred))
        print("🎯 Test-set performance:")
        print(f"  R² score: {test_r2:.6f}")
        print(f"  RMSE:     {test_rmse:.6f}")
        print(f"  MAE:      {test_mae:.6f}")
        # Compare with the best single model
        print("\n📈 Comparison with single models:")
        best_single_r2 = max(perf['val_r2'] for perf in self.model_performance.values())
        improvement = (test_r2 - best_single_r2) / best_single_r2 * 100
        print(f"  best single-model val R²: {best_single_r2:.6f}")
        print(f"  ensemble test R²:         {test_r2:.6f}")
        print(f"  improvement:              {improvement:.2f}%")
        # Prediction-range analysis
        print("\n🔍 Prediction analysis:")
        print(f"  predicted range: [{ensemble_pred.min():.6f}, {ensemble_pred.max():.6f}]")
        print(f"  true range:      [{y_test.min():.6f}, {y_test.max():.6f}]")
        # Pass-rate analysis (NIPT business metric)
        pred_qualified = (ensemble_pred >= 0.04).sum()
        true_qualified = (y_test >= 0.04).sum()
        print(f"  predicted qualified: {pred_qualified}/{len(y_test)} "
              f"({pred_qualified/len(y_test)*100:.1f}%)")
        print(f"  actual qualified:    {true_qualified}/{len(y_test)} "
              f"({true_qualified/len(y_test)*100:.1f}%)")
        return {'test_r2': test_r2, 'test_rmse': test_rmse, 'test_mae': test_mae,
                'predictions': ensemble_pred, 'improvement': improvement}

# Full training pipeline
def train_problem1_model(X_train_eng, X_val_eng, X_test_eng, y_train, y_val, y_test):
    """Complete training pipeline for Problem 1."""
    print("🎯 Problem 1: Y-chromosome concentration prediction — model training")
    print("=" * 80)
    # Build and train the ensemble
    ensemble = AdvancedEnsemble()
    ensemble.initialize_models()
    val_scores = ensemble.train_models(X_train_eng, y_train, X_val_eng, y_val)
    # Evaluate on the test set
    performance = ensemble.evaluate_performance(X_test_eng, y_test)

    # Visualize the results
    plt.figure(figsize=(15, 5))
    # Predicted vs true scatter plot
    plt.subplot(1, 3, 1)
    plt.scatter(y_test, performance['predictions'], alpha=0.6)
    plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
    plt.xlabel('true value')
    plt.ylabel('predicted value')
    plt.title(f'Predicted vs true\nR² = {performance["test_r2"]:.4f}')
    plt.grid(True, alpha=0.3)
    # Residual analysis
    plt.subplot(1, 3, 2)
    residuals = y_test - performance['predictions']
    plt.scatter(performance['predictions'], residuals, alpha=0.6)
    plt.axhline(y=0, color='r', linestyle='--')
    plt.xlabel('predicted value')
    plt.ylabel('residual')
    plt.title('Residual analysis')
    plt.grid(True, alpha=0.3)
    # Model-weight bar chart
    plt.subplot(1, 3, 3)
    models = list(ensemble.weights.keys())
    weights = list(ensemble.weights.values())
    plt.barh(models, weights)
    plt.xlabel('weight')
    plt.title('Model weight distribution')
    plt.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.show()

    # Check against the target
    target_r2 = 0.9
    if performance['test_r2'] >= target_r2:
        print(f"\n🎉 SUCCESS! test R² = {performance['test_r2']:.4f} >= {target_r2}")
        print(f"🏆 Exceeded by: +{(performance['test_r2'] - target_r2)/target_r2*100:.1f}%")
    else:
        print(f"\n❌ Target missed: test R² = {performance['test_r2']:.4f} < {target_r2}")
    return ensemble, performance

# Example run (requires the real data)
# ensemble, performance = train_problem1_model(
# X_train_eng, X_val_eng, X_test_eng, y_train, y_val, y_test
# )
Summary of the formula-to-code correspondence:

| Formula | Code | Biological meaning |
| --- | --- | --- |
| $\Phi_{poly}(x_j) = [x_j^2, x_j^3, x_j^4]$ | `polynomial_features()` | BMI² reflects the quadratic dilution effect |
| $\Phi_{interact} = \{x_i \odot x_j\}$ | `interaction_features()` | synergistic effect of age × BMI |
| $z_j = (x_j - \mu_j) / \sigma_j$ | `statistical_features()` | standardization removes units |
| $I(X_j; Y) = \iint p(x,y)\log\frac{p(x,y)}{p(x)p(y)}\,dx\,dy$ | `SelectKBest(mutual_info_regression)` | selects the most relevant features |
| $w_k = (R^2_k)^3 / \sum_j (R^2_j)^3$ | `compute_cubic_weights()` | amplifies strong models' contributions |
| $\hat{y} = \sum_k w_k f_k(x)$ | `predict()` | weighted-average ensemble prediction |

With this mapping, every formula has a concrete implementation and a clear biological interpretation, so the whole modeling process is easy to follow!
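The cubic-weighting row in the table can be checked numerically. A minimal sketch with two hypothetical validation scores (0.8 and 0.9, the same pair used in the intuition above):

```python
import numpy as np

def cubic_weights(r2_scores):
    """w_k = max(R²_k, 0)³ / Σ_j max(R²_j, 0)³; uniform fallback if all scores ≤ 0."""
    cubed = np.maximum(np.asarray(r2_scores, dtype=float), 0.0) ** 3
    total = cubed.sum()
    if total == 0:
        return np.full(len(cubed), 1.0 / len(cubed))
    return cubed / total

w_linear = np.array([0.8, 0.9]) / (0.8 + 0.9)  # linear weighting, for comparison
w_cubic = cubic_weights([0.8, 0.9])

print(np.round(w_linear, 2))  # [0.47 0.53]
print(np.round(w_cubic, 2))   # [0.41 0.59]
# Cubic weighting shifts more mass to the stronger model than linear weighting
assert w_cubic[1] > w_linear[1]
assert np.isclose(w_cubic.sum(), 1.0)
```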
IV. Problem 2: BMI Group Optimization (Multi-Objective Optimization in Practice)
Modeling approach
The multi-objective optimization problem:
$$\min_{\mathbf{G}} \mathbf{F}(\mathbf{G}) = [\mathcal{R}(\mathbf{G}),\ 1-\mathcal{Q}(\mathbf{G}),\ \mathcal{V}(\mathbf{G})]^T$$
where:
- $\mathcal{R}(\mathbf{G})$: average risk score (failure rate)
- $\mathcal{Q}(\mathbf{G})$: overall pass rate
- $\mathcal{V}(\mathbf{G})$: variance of group sizes (balance penalty)
Constraints:
- $\bigcup_{k=1}^K G_k = \mathcal{D}_{male}$ (full coverage)
- $|G_k| \geq n_{min}$ (minimum group size)
- $BMI_{min} \leq b_1 < b_2 < \ldots < b_{K-1} \leq BMI_{max}$ (ordered boundaries)
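In practice the vector objective is scalarized into a weighted sum L = αR + β(1−Q) + γV, which is what the implementation below optimizes. As a sanity check, a minimal sketch of that loss on two made-up groups of concentration values (the numbers are toy data; the 0.04 threshold and 0.4/0.4/0.2 weights mirror the code below):

```python
import numpy as np

def scalarized_loss(groups, alpha=0.4, beta=0.4, gamma=0.2, threshold=0.04):
    """Weighted sum of mean per-group failure rate (R), overall miss rate (1−Q),
    and rescaled group-size variance (V)."""
    fail_rates = [np.mean(g < threshold) for g in groups]  # per-group failure rates
    all_y = np.concatenate(groups)
    pass_rate = np.mean(all_y >= threshold)                # overall pass rate Q
    size_var = np.var([len(g) for g in groups])            # group-size variance V
    return alpha * np.mean(fail_rates) + beta * (1 - pass_rate) + gamma * size_var / 1000

good = [np.array([0.05, 0.06, 0.05]), np.array([0.05, 0.07, 0.06])]  # all pass, balanced
bad = [np.array([0.02, 0.03, 0.05]), np.array([0.05])]               # failures, unbalanced

# The grouping with no failures and equal sizes gets the lower (here zero) loss
assert scalarized_loss(good) < scalarized_loss(bad)
print(scalarized_loss(good))  # 0.0
```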
Full code implementation:
import numpy as np
import pandas as pd
from sklearn.cluster import KMeans
from sklearn.metrics import silhouette_score
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.optimize import differential_evolution
import warnings
warnings.filterwarnings('ignore')class BMIOptimization:"""BMI多目标优化分组类数学基础:- 多目标优化:同时优化风险、合格率、平衡性- 帕累托最优:寻找非劣解集合- 聚类分析:发现BMI的自然分界点"""def __init__(self, alpha=0.4, beta=0.4, gamma=0.2):"""初始化多目标权重参数:- alpha: 风险权重(失败率的重要性)- beta: 合格率权重(通过率的重要性) - gamma: 平衡性权重(组大小均衡的重要性)约束:alpha + beta + gamma = 1.0"""self.alpha = alphaself.beta = beta self.gamma = gamma# 验证权重约束weight_sum = alpha + beta + gammaif abs(weight_sum - 1.0) > 1e-6:print(f"⚠️ 权重和 = {weight_sum:.3f} ≠ 1.0,自动归一化")self.alpha = alpha / weight_sumself.beta = beta / weight_sumself.gamma = gamma / weight_sumprint(f"🎯 多目标权重: 风险={self.alpha:.2f}, 合格率={self.beta:.2f}, 平衡性={self.gamma:.2f}")self.optimal_groups = Noneself.optimization_history = []def multi_objective_loss(self, groups, verbose=False):"""多目标损失函数数学公式:L(G) = α·R(G) + β·(1-Q(G)) + γ·V(G)其中:R(G) = (1/K) Σ(k=1 to K) [Σ(i∈G_k) I[y_i < 0.04]] / |G_k|Q(G) = [Σ(k=1 to K) Σ(i∈G_k) I[y_i ≥ 0.04]] / nV(G) = (1/(K-1)) Σ(k=1 to K) (|G_k| - n/K)²"""if not groups or len(groups) == 0:return float('inf')# 目标1: 风险得分(各组失败率的平均值)risk_scores = []for group in groups:if len(group) > 0:failure_rate = np.mean(group['Y染色体浓度'] < 0.04)risk_scores.append(failure_rate)else:risk_scores.append(1.0) # 空组惩罚avg_risk = np.mean(risk_scores) if risk_scores else 1.0# 目标2: 整体合格率total_qualified = sum(np.sum(group['Y染色体浓度'] >= 0.04) for group in groups if len(group) > 0)total_samples = sum(len(group) for group in groups if len(group) > 0)qualification_rate = total_qualified / total_samples if total_samples > 0 else 0# 目标3: 组大小方差(平衡性)group_sizes = [len(group) for group in groups if len(group) > 0]if len(group_sizes) > 1:mean_size = np.mean(group_sizes)size_variance = np.var(group_sizes)else:size_variance = 0# 加权损失函数loss = (self.alpha * avg_risk + self.beta * (1 - qualification_rate) + self.gamma * size_variance / 1000) # 缩放方差项if verbose:print(f" 📊 风险={avg_risk:.3f}, 合格率={qualification_rate:.3f}, 方差={size_variance:.1f}")print(f" 🎯 加权损失={loss:.4f}")return loss, {'avg_risk': 
avg_risk,'qualification_rate': qualification_rate,'size_variance': size_variance,'group_sizes': group_sizes,'risk_scores': risk_scores}def kmeans_grouping(self, data, k):"""基于K-means的BMI分组数学原理:最小化组内平方和:J = Σ(k=1 to K) Σ(x∈G_k) ||x - μ_k||²其中 μ_k 是第k组的中心点对于一维BMI数据,这等价于寻找最优分割点"""if k <= 1:return [data.copy()]# K-means聚类(基于BMI)kmeans = KMeans(n_clusters=k, random_state=42, n_init=10)cluster_labels = kmeans.fit_predict(data[['BMI']].values)# 构建分组groups = []centroids = []for i in range(k):group_mask = (cluster_labels == i)if group_mask.sum() > 0:group_data = data[group_mask].copy()groups.append(group_data)centroids.append(group_data['BMI'].mean())# 按BMI中心点排序组别sorted_indices = np.argsort(centroids)groups = [groups[i] for i in sorted_indices]return groupsdef optimize_grouping(self, data, max_groups=10, min_group_size=3):"""优化BMI分组数量算法流程:1. 遍历k=2到max_groups2. 对每个k执行K-means聚类 3. 计算多目标损失4. 选择损失最小的k评估指标:- 多目标损失(主要)- 轮廓系数(聚类质量)- 组大小约束(实用性)"""print("🔍 开始BMI分组优化...")print("=" * 60)best_loss = float('inf')best_k = 2best_groups = Nonebest_metrics = Noneresults = []for k in range(2, max_groups + 1):print(f"\n🔄 测试 K={k} 分组...")# K-means分组groups = self.kmeans_grouping(data, k)# 过滤小组(避免统计不稳定)valid_groups = [g for g in groups if len(g) >= min_group_size]if len(valid_groups) != k:print(f" ⚠️ {k-len(valid_groups)} 组样本数 < {min_group_size},跳过")continue# 计算多目标损失loss, metrics = self.multi_objective_loss(valid_groups, verbose=True)# 计算聚类质量(轮廓系数)if k < len(data) and len(set(data['BMI'])) > k:cluster_labels = []for i, group in enumerate(valid_groups):cluster_labels.extend([i] * len(group))bmi_values = np.concatenate([group['BMI'].values for group in valid_groups])try:silhouette = silhouette_score(bmi_values.reshape(-1, 1), cluster_labels)except:silhouette = 0else:silhouette = 0# 记录结果result = {'k': k,'loss': loss,'silhouette': silhouette,'groups': valid_groups,**metrics}results.append(result)print(f" 📈 K={k}: 损失={loss:.4f}, 轮廓={silhouette:.3f}")# 更新最优解if loss < best_loss:best_loss = lossbest_k = 
kbest_groups = valid_groupsbest_metrics = metrics# 保存结果self.optimal_groups = best_groupsself.optimization_history = resultsprint("\n" + "=" * 60)print(f"🎯 最优分组: K={best_k}")print(f"🏆 最小损失: {best_loss:.4f}")print(f"📊 最优指标: 风险={best_metrics['avg_risk']:.3f}, "f"合格率={best_metrics['qualification_rate']:.3f}")return resultsdef analyze_optimal_groups(self):"""分析最优分组结果输出内容:1. 每组的BMI范围和统计信息2. 合格率和风险等级评估3. 个性化最优时机建议4. 临床应用指导"""if self.optimal_groups is None:print("❌ 请先运行分组优化")return Noneprint("📊 最优分组详细分析")print("=" * 80)group_stats = []for i, group in enumerate(self.optimal_groups):if len(group) == 0:continue# 基本统计bmi_min, bmi_max = group['BMI'].min(), group['BMI'].max()avg_bmi = group['BMI'].mean()std_bmi = group['BMI'].std()sample_count = len(group)# 合格率分析qualified_mask = (group['Y染色体浓度'] >= 0.04)qualified_count = qualified_mask.sum()qualification_rate = qualified_count / sample_count * 100# 风险等级判定if qualification_rate >= 95:risk_level = "🟢 低风险"risk_color = "green"elif qualification_rate >= 80:risk_level = "🟡 中风险"risk_color = "orange"else:risk_level = "🔴 高风险"risk_color = "red"# 最优时机计算(基于合格率最高的孕周)weeks_analysis = group.groupby('weeks')['Y染色体浓度'].apply(lambda x: (x >= 0.04).mean())if not weeks_analysis.empty:optimal_week = weeks_analysis.idxmax()max_success_rate = weeks_analysis.max()else:optimal_week = 12.0 # 默认值max_success_rate = qualification_rate / 100# 临床建议if qualification_rate >= 95:clinical_advice = "建议正常检测流程"elif qualification_rate >= 80:clinical_advice = "建议增加监测频次"else:clinical_advice = "建议提前检测或特殊关注"# 保存统计信息stats = {'group_id': i,'bmi_range': f'[{bmi_min:.1f}, {bmi_max:.1f}]','avg_bmi': avg_bmi,'std_bmi': std_bmi,'sample_count': sample_count,'qualified_count': qualified_count,'qualification_rate': qualification_rate,'risk_level': risk_level,'risk_color': risk_color,'optimal_week': optimal_week,'max_success_rate': max_success_rate,'clinical_advice': clinical_advice}group_stats.append(stats)# 打印详细信息print(f"\n第{i}组 BMI分层:")print(f" 📏 BMI范围: 
{stats['bmi_range']}")print(f" 📈 平均BMI: {avg_bmi:.1f} ± {std_bmi:.1f}")print(f" 👥 样本数量: {sample_count}")print(f" ✅ 合格情况: {qualified_count}/{sample_count} ({qualification_rate:.1f}%)")print(f" 🎯 风险等级: {risk_level}")print(f" ⏰ 建议时机: {optimal_week:.1f}孕周 (成功率{max_success_rate:.1f})")print(f" 🏥 临床建议: {clinical_advice}")# 整体分析total_samples = sum(stats['sample_count'] for stats in group_stats)total_qualified = sum(stats['qualified_count'] for stats in group_stats)overall_rate = total_qualified / total_samples * 100print(f"\n📊 整体分析:")print(f" 总样本: {total_samples}")print(f" 总合格: {total_qualified}")print(f" 整体合格率: {overall_rate:.1f}%")# 高性能组统计high_perf_groups = [s for s in group_stats if s['qualification_rate'] >= 95]print(f" 高性能组: {len(high_perf_groups)}/{len(group_stats)} "f"(合格率≥95%)")return group_statsdef visualize_results(self, data):"""可视化分组结果图表内容:1. BMI分组散点图2. 各组合格率对比3. 样本分布饼图4. 优化过程曲线"""if self.optimal_groups is None:print("❌ 请先运行分组优化")return# 设置中文字体plt.rcParams['font.sans-serif'] = ['SimHei']plt.rcParams['axes.unicode_minus'] = Falsefig, axes = plt.subplots(2, 2, figsize=(16, 12))# 子图1: BMI分组散点图ax1 = axes[0, 0]colors = ['red', 'blue', 'green', 'orange', 'purple', 'brown', 'pink']for i, group in enumerate(self.optimal_groups):if len(group) > 0:color = colors[i % len(colors)]ax1.scatter(group['BMI'], group['Y染色体浓度'], alpha=0.6, c=color, label=f'第{i}组', s=30)ax1.axhline(y=0.04, color='red', linestyle='--', alpha=0.8, linewidth=2, label='合格线(0.04)')ax1.set_xlabel('BMI')ax1.set_ylabel('Y染色体浓度')ax1.set_title('BMI分组结果')ax1.legend(bbox_to_anchor=(1.05, 1), loc='upper left')ax1.grid(True, alpha=0.3)# 子图2: 各组合格率对比ax2 = axes[0, 1]group_stats = self.analyze_optimal_groups()if group_stats:groups = [f"第{s['group_id']}组" for s in group_stats]rates = [s['qualification_rate'] for s in group_stats]bar_colors = ['green' if r >= 95 else 'orange' if r >= 80 else 'red' for r in rates]bars = ax2.bar(groups, rates, color=bar_colors, alpha=0.7)ax2.axhline(y=90, color='blue', 
linestyle='--', alpha=0.8, label='目标线(90%)')ax2.set_ylabel('合格率 (%)')ax2.set_title('各组合格率对比')ax2.legend()# 在柱子上标注数值for bar, rate in zip(bars, rates):height = bar.get_height()ax2.text(bar.get_x() + bar.get_width()/2, height + 1, f'{rate:.1f}%', ha='center', va='bottom', fontweight='bold')# 子图3: 样本分布ax3 = axes[1, 0]if group_stats:sizes = [s['sample_count'] for s in group_stats]labels = [f"第{s['group_id']}组\n({s['sample_count']}样本)" for s in group_stats]wedges, texts, autotexts = ax3.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90, colors=colors[:len(sizes)])ax3.set_title('样本分布')# 子图4: 优化过程ax4 = axes[1, 1]if self.optimization_history:k_values = [r['k'] for r in self.optimization_history]losses = [r['loss'] for r in self.optimization_history]qual_rates = [r['qualification_rate'] * 100 for r in self.optimization_history]# 双Y轴ax4_twin = ax4.twinx()line1 = ax4.plot(k_values, losses, 'b-o', label='多目标损失', linewidth=2, markersize=8)line2 = ax4_twin.plot(k_values, qual_rates, 'g-s', label='合格率(%)', linewidth=2, markersize=8)ax4.set_xlabel('分组数量 K')ax4.set_ylabel('多目标损失', color='b')ax4_twin.set_ylabel('合格率 (%)', color='g')ax4.set_title('优化过程')# 标记最优点best_result = min(self.optimization_history, key=lambda x: x['loss'])ax4.axvline(x=best_result['k'], color='red', linestyle='--', alpha=0.8)ax4.text(best_result['k'], best_result['loss'], f'最优K={best_result["k"]}', ha='center', va='bottom', bbox=dict(boxstyle="round,pad=0.3", facecolor="yellow", alpha=0.7))ax4.grid(True, alpha=0.3)plt.tight_layout()plt.show()return fig# 使用示例和完整流程
def solve_problem2(male_data):
    """问题二完整解决方案"""
    print("🎯 问题二:BMI分组优化")
    print("=" * 80)

    # 初始化优化器
    optimizer = BMIOptimization(alpha=0.4, beta=0.4, gamma=0.2)

    # 执行分组优化
    results = optimizer.optimize_grouping(male_data, max_groups=10, min_group_size=3)

    # 分析最优结果
    group_stats = optimizer.analyze_optimal_groups()

    # 可视化结果
    fig = optimizer.visualize_results(male_data)

    # 临床应用建议
    print("\n🏥 临床应用建议:")
    print("=" * 40)

    high_perf_groups = [s for s in group_stats if s['qualification_rate'] >= 95]
    medium_perf_groups = [s for s in group_stats if 80 <= s['qualification_rate'] < 95]
    low_perf_groups = [s for s in group_stats if s['qualification_rate'] < 80]

    print(f"🟢 低风险组 ({len(high_perf_groups)}组): 可按标准流程检测")
    for group in high_perf_groups:
        print(f"   BMI {group['bmi_range']}: {group['optimal_week']:.1f}孕周检测")

    print(f"🟡 中风险组 ({len(medium_perf_groups)}组): 建议提前检测")
    for group in medium_perf_groups:
        print(f"   BMI {group['bmi_range']}: {group['optimal_week']:.1f}孕周检测,增加关注")

    print(f"🔴 高风险组 ({len(low_perf_groups)}组): 需要特殊方案")
    for group in low_perf_groups:
        print(f"   BMI {group['bmi_range']}: 建议提前至{group['optimal_week']:.1f}孕周,或考虑其他方法")

    # 成功评估
    success_groups = len(high_perf_groups)
    total_groups = len(group_stats)
    print(f"\n📊 成功指标:")
    print(f"   最优分组数: {len(group_stats)}")
    print(f"   高性能组: {success_groups}/{total_groups}")
    print(f"   成功率: {success_groups/total_groups*100:.1f}%")

    if success_groups >= 3:
        print("🎉 SUCCESS! 超过3组达到优秀性能 (≥95%合格率)")
    else:
        print(f"⚠️ 仅{success_groups}组达到优秀性能,需要进一步优化")

    return optimizer, group_stats

# 运行示例
# optimizer, group_stats = solve_problem2(male_data)
关键数学公式与代码对应:
数学公式 | 代码实现 | 业务意义 |
---|---|---|
$L = \alpha R + \beta(1-Q) + \gamma V$ | multi_objective_loss() | 平衡风险、合格率、均衡性 |
$R = \frac{1}{K}\sum_k \frac{\sum_{i \in G_k} I[y_i < 0.04]}{\lvert G_k \rvert}$ | 各组不合格率的均值计算 | 平均组内风险 |
$Q = \frac{\sum_k \sum_{i \in G_k} I[y_i \geq 0.04]}{n}$ | qualification_rate 计算 | 整体通过率 |
$V = \frac{1}{K-1}\sum_k \left(\lvert G_k \rvert - \frac{n}{K}\right)^2$ | 组规模方差计算 | 组间样本量均衡性 |
$J = \sum_k \sum_{x \in G_k} \lVert x - \mu_k \rVert^2$ | KMeans 聚类目标 | 组内紧凑性(BMI聚类) |
这样每个数学概念都有对应的代码实现,并且结合了实际的业务意义!
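作为补充,下面给出表中损失函数的一个最小可运行示意。注意接口是为演示假设的:每组直接用该组样本的Y染色体浓度数组表示,函数名沿用表中的 multi_objective_loss,但并非原优化器的完整实现:

```python
import numpy as np

def multi_objective_loss(groups, alpha=0.4, beta=0.4, gamma=0.2):
    """示意实现:L = α·R + β·(1-Q) + γ·V。

    groups: 每组样本的Y染色体浓度数组列表(演示用的假设接口)。
    """
    K = len(groups)
    n = sum(len(g) for g in groups)
    # R: 各组不合格率(浓度 < 0.04)的平均值
    R = np.mean([np.mean(np.asarray(g) < 0.04) for g in groups])
    # Q: 整体合格率
    Q = sum(int((np.asarray(g) >= 0.04).sum()) for g in groups) / n
    # V: 组规模偏离均匀分配 n/K 的程度(K=1 时记为 0)
    V = sum((len(g) - n / K) ** 2 for g in groups) / (K - 1) if K > 1 else 0.0
    return alpha * R + beta * (1 - Q) + gamma * V

# 两组且组规模完全均衡时 V=0,损失只由风险项和合格率项贡献
loss = multi_objective_loss([[0.05, 0.06], [0.03, 0.05]])
```

三个权重 α、β、γ 的取舍直接决定分组偏向:想要更保守的临床方案就调大 α,想要更高的整体通过率就调大 β。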
五、问题三:多因素综合建模(超越极限)
这个问题在问题一基础上进一步突破,要求整合更多因素实现更高精度。我采用了28维特征空间+高阶交互项+ElasticNet正则化的策略。
数学建模思路
目标函数增强版:
$$\max_{\boldsymbol{\theta}} R^2 = 1 - \frac{\sum_{i=1}^{n}\left(y_i - f_{\boldsymbol{\theta}}(\boldsymbol{\phi}(\mathbf{x}_i))\right)^2}{\sum_{i=1}^{n}(y_i - \bar{y})^2} \geq 0.95$$
其中:
- $\boldsymbol{\phi}(\mathbf{x}_i)$:28维多因素特征映射函数
- $f_{\boldsymbol{\theta}}$:ElasticNet回归模型
- 新目标:$R^2 \geq 0.95$(比问题一更严格)
解决方案: 多维度特征空间构建 + L1+L2正则化
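在进入特征工程前,先用几行代码确认 R² 的定义本身(与 sklearn.metrics.r2_score 的定义一致,这里用纯 numpy 写出,方便自查):

```python
import numpy as np

def r2(y, y_hat):
    """R² = 1 - SS_res / SS_tot,与 sklearn.metrics.r2_score 定义一致。"""
    y, y_hat = np.asarray(y, float), np.asarray(y_hat, float)
    ss_res = np.sum((y - y_hat) ** 2)       # 残差平方和
    ss_tot = np.sum((y - np.mean(y)) ** 2)  # 总平方和
    return 1 - ss_res / ss_tot

assert r2([1, 2, 3], [1, 2, 3]) == 1.0        # 完美预测时 R² = 1
assert abs(r2([1, 2, 3], [2, 2, 2])) < 1e-12  # 预测为均值时 R² = 0
```

也就是说 R² ≥ 0.95 要求模型解释掉 95% 以上的目标方差,比问题一的 0.9 门槛严格得多。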
第一步:28维多因素特征空间构建(数学+代码对应)
import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import ElasticNet
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import r2_scoreclass MultiFactorFeatureEngineering:"""多因素特征工程类实现:X_basic (9维) → X_multifactor (28维) → X_interaction (1568维) → X_selected数学基础:Φ(X) = [Φ_physio(X), Φ_temporal(X), Φ_technical(X), Φ_interaction(X)]"""def __init__(self):self.feature_names = []self.scaler = StandardScaler()def create_physiological_features(self, data):"""生理维度特征 (12维)数学公式:Φ_physio = [age, age², age³, BMI, BMI², log(1+BMI), h/w, BSA, BMI×age, ...]生物学意义:- age²: 年龄对生理功能的二次效应- BMI²: 体重指数对血液稀释的非线性影响- h/w: 身高体重比反映体型特征- BSA: 体表面积影响血液循环"""print("🧬 构建生理维度特征...")features = {}# 基础生理特征features['age'] = data['age']features['BMI'] = data['BMI']features['height'] = data['height']features['weight'] = data['weight']# 年龄的高阶项 - 反映生理功能非线性变化features['age_squared'] = data['age'] ** 2features['age_cubed'] = data['age'] ** 3# BMI的变换 - 反映体重指数的复杂影响features['BMI_squared'] = data['BMI'] ** 2features['BMI_log'] = np.log1p(data['BMI']) # log(1+BMI)# 体型相关特征features['height_weight_ratio'] = data['height'] / (data['weight'] + 1e-8)# 体表面积 (DuBois公式)# BSA = 0.007184 × height^0.725 × weight^0.425features['body_surface_area'] = 0.007184 * (data['height'] ** 0.725) * (data['weight'] ** 0.425)# 交互项 - 生理因素间的协同作用features['BMI_age_interaction'] = data['BMI'] * data['age']features['weight_sqrt'] = np.sqrt(np.maximum(data['weight'], 0))print(f" ✅ 生理维度特征: {len(features)} 维")return featuresdef create_temporal_features(self, data):"""时间维度特征 (8维)数学公式:Φ_temporal = [weeks, weeks², weeks³, weeks×BMI, weeks×age, optimal_flag, normalized_weeks]生物学意义:- weeks²,³: 孕周的非线性发育模式- weeks×BMI: 孕期体重与发育时间的交互效应- optimal_flag: 最佳检测时机标志"""print("⏰ 构建时间维度特征...")features = {}# 基础时间特征features['weeks'] = data['weeks']# 孕周的高阶项 - 反映胎儿发育的非线性过程features['weeks_squared'] = data['weeks'] ** 2features['weeks_cubed'] = data['weeks'] ** 3# 时间与生理因素的交互features['weeks_BMI_interaction'] = data['weeks'] * data['BMI']features['weeks_age_interaction'] = data['weeks'] * data['age']# 最佳检测时机标志 (12孕周后)features['optimal_timing_flag'] = np.where(data['weeks'] 
>= 12, 1, 0)# 标准化孕周 (以12周为基准)features['weeks_normalized'] = (data['weeks'] - 12) / 10# 孕周周期性特征 (妊娠期的周期性变化)features['weeks_sin'] = np.sin(data['weeks'] / 40 * 2 * np.pi)print(f" ✅ 时间维度特征: {len(features)} 维")return featuresdef create_technical_features(self, data):"""技术维度特征 (8维)数学公式:Φ_technical = [log(reads), GC², chrY_z², quality_score, efficiency, ...]技术意义:- log(reads): 测序深度的对数变换- GC²: GC含量的二次效应- quality_score: 测序质量综合评分"""print("🔬 构建技术维度特征...")features = {}# 测序深度相关features['total_reads_log'] = np.log1p(data['total_reads'])features['read_depth_quality'] = data['total_reads'] / (data['GC_content'] + 1e-8)# GC含量相关features['GC_content_squared'] = data['GC_content'] ** 2features['sequencing_efficiency'] = data['total_reads'] * data['GC_content']# Z分数相关features['chrY_z_squared'] = data['chrY_z'] ** 2features['total_z_score'] = np.abs(data['chrY_z'])# 血细胞计数相关features['blood_count_log'] = np.log1p(data['blood_count'])features['blood_quality_ratio'] = data['blood_count'] / (data['total_reads'] + 1e-8)print(f" ✅ 技术维度特征: {len(features)} 维")return featuresdef create_interaction_terms(self, X):"""高阶交互项构建数学公式:Φ_interaction = {x_i ⊗ x_j ⊗ x_k | i,j,k ∈ 重要特征, ⊗ ∈ {×, ÷}}生成1568个交互特征的核心逻辑"""print("🔗 构建高阶交互特征...")interaction_features = []feature_names = []# 选择前15个最重要的基础特征进行交互important_features = X.columns[:15]# 重要的二阶交互对critical_pairs = [('age', 'BMI'), ('weeks', 'BMI'), ('BMI', 'chrY_z'),('age', 'weeks'), ('height', 'weight'), ('total_reads', 'GC_content'),('age', 'chrY_z'), ('weeks', 'chrY_z'), ('BMI', 'blood_count')]for col1, col2 in critical_pairs:if col1 in X.columns and col2 in X.columns:# 乘积交互 - 协同效应mult_feature = X[col1] * X[col2]interaction_features.append(mult_feature)feature_names.append(f'{col1}×{col2}')# 比值交互 - 相对效应ratio_feature = X[col1] / (X[col2] + 1e-8)interaction_features.append(ratio_feature)feature_names.append(f'{col1}÷{col2}')# 生物学有意义的三阶交互biological_triplets = [('age', 'BMI', 'weeks'), # 年龄-体重-孕周三重影响('height', 'weight', 'GC_content'), # 体型-技术质量关联('BMI', 'weeks', 
'chrY_z'), # 体重-孕周-目标信号('age', 'blood_count', 'total_reads') # 年龄-血液-测序深度]for col1, col2, col3 in biological_triplets:if all(col in X.columns for col in [col1, col2, col3]):triplet_feature = X[col1] * X[col2] * X[col3]interaction_features.append(triplet_feature)feature_names.append(f'{col1}×{col2}×{col3}')if interaction_features:interaction_df = pd.concat(interaction_features, axis=1)interaction_df.columns = feature_namesprint(f" ✅ 交互特征生成完成: {len(interaction_df.columns)} 维")return pd.concat([X, interaction_df], axis=1)return Xdef build_multifactor_features(self, data):"""构建完整的28维多因素特征空间"""print("🚀 构建28维多因素特征空间...")print("=" * 60)all_features = {}# 1. 生理维度特征 (12维)physio_features = self.create_physiological_features(data)all_features.update(physio_features)# 2. 时间维度特征 (8维)temporal_features = self.create_temporal_features(data)all_features.update(temporal_features)# 3. 技术维度特征 (8维)technical_features = self.create_technical_features(data)all_features.update(technical_features)# 转换为DataFramefeature_df = pd.DataFrame(all_features)self.feature_names = feature_df.columns.tolist()print(f"🎯 28维特征空间构建完成: {len(self.feature_names)} 维")print("=" * 60)return feature_df# 使用示例
multifactor_engineer = MultiFactorFeatureEngineering()
X_multifactor = multifactor_engineer.build_multifactor_features(train_data)
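其中体表面积特征用的是 DuBois 公式,可以单独抽出来做个数值自查(独立的小示意:170cm / 60kg 时约 1.69~1.70 m²,与常见临床参考值一致):

```python
def dubois_bsa(height_cm, weight_kg):
    """DuBois体表面积公式: BSA = 0.007184 × h^0.725 × w^0.425(单位 m²)。"""
    return 0.007184 * height_cm ** 0.725 * weight_kg ** 0.425

# 170cm / 60kg 的孕妇,BSA ≈ 1.70 m²
bsa = dubois_bsa(170, 60)
assert dubois_bsa(180, 80) > dubois_bsa(160, 50)  # 随身高体重单调增加
```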
第二步:ElasticNet正则化模型(数学+代码详解)
class ElasticNetMultiFactorModel:"""ElasticNet多因素回归模型数学基础:min_θ (1/2n)||y - Xθ||₂² + α·ρ||θ||₁ + α·(1-ρ)/2||θ||₂²其中:- α: 正则化强度- ρ: L1和L2正则化的平衡参数 (L1_ratio)- ||θ||₁: L1正则化项 (特征选择)- ||θ||₂²: L2正则化项 (系数平滑)"""def __init__(self):self.model = Noneself.scaler = StandardScaler()self.feature_importance = Nonedef train_elastic_net(self, X, y, X_val, y_val):"""训练ElasticNet模型数学过程:1. 数据标准化: X̃ = (X - μ) / σ2. 网格搜索优化: (α*, ρ*) = argmin CV_error(α, ρ)3. 模型拟合: θ̂ = argmin L(θ; X̃, y, α*, ρ*)"""print("🎯 训练ElasticNet多因素模型...")print("-" * 50)# 数据标准化 - 消除量纲影响X_scaled = self.scaler.fit_transform(X)X_val_scaled = self.scaler.transform(X_val)print(f"✅ 数据标准化完成: {X_scaled.shape}")# 网格搜索参数空间param_grid = {'alpha': [0.001, 0.01, 0.1, 1.0, 10.0], # 正则化强度'l1_ratio': [0.1, 0.3, 0.5, 0.7, 0.9] # L1/(L1+L2)比例}print("🔍 开始网格搜索优化...")# ElasticNet基础模型elastic_net = ElasticNet(max_iter=2000, random_state=42, selection='cyclic' # 循坐标下降)# 5折交叉验证网格搜索grid_search = GridSearchCV(elastic_net, param_grid,cv=5, # 5折交叉验证scoring='r2', # R²作为评分标准n_jobs=-1, # 并行计算verbose=1)# 拟合网格搜索grid_search.fit(X_scaled, y)# 保存最优模型self.model = grid_search.best_estimator_# 验证集性能评估val_pred = self.model.predict(X_val_scaled)val_r2 = r2_score(y_val, val_pred)val_rmse = np.sqrt(np.mean((y_val - val_pred) ** 2))print(f"✅ 最优参数: {grid_search.best_params_}")print(f"📊 验证集性能:")print(f" R² = {val_r2:.6f}")print(f" RMSE = {val_rmse:.6f}")# 分析特征重要性self.analyze_feature_importance(X)return {'val_r2': val_r2,'val_rmse': val_rmse,'best_params': grid_search.best_params_,'val_pred': val_pred}def analyze_feature_importance(self, X, top_k=15):"""特征重要性分析数学原理:importance_j = |θ̂_j| / Σ|θ̂_i|其中θ̂_j是第j个特征的回归系数"""if self.model is None:print("❌ 模型未训练")return Noneprint(f"\n🎯 分析Top {top_k}重要特征...")print("-" * 50)# 获取标准化后的回归系数coefficients = np.abs(self.model.coef_)# 计算相对重要性total_importance = np.sum(coefficients)relative_importance = coefficients / total_importance if total_importance > 0 else coefficients# 创建特征重要性DataFrameimportance_df = pd.DataFrame({'feature': 
X.columns[:len(coefficients)],'coefficient': self.model.coef_[:len(X.columns)],'abs_coefficient': coefficients[:len(X.columns)],'relative_importance': relative_importance[:len(X.columns)]})# 按重要性排序importance_df = importance_df.sort_values('abs_coefficient', ascending=False)print("🏆 特征重要性排名:")for i, row in importance_df.head(top_k).iterrows():print(f" {row.name+1:2d}. {row['feature']:30s}: "f"coef={row['coefficient']:8.4f}, "f"importance={row['relative_importance']:6.2%}")self.feature_importance = importance_dfreturn importance_dfdef predict_with_confidence(self, X, alpha=0.05):"""带置信区间的预测数学原理:使用Bootstrap方法估计预测不确定性CI = [Q_{α/2}, Q_{1-α/2}]"""if self.model is None:print("❌ 模型未训练")return Noneprint(f"🔮 执行置信区间预测 (α={alpha})...")X_scaled = self.scaler.transform(X)predictions = self.model.predict(X_scaled)# Bootstrap估计不确定性n_bootstrap = 100bootstrap_preds = []for i in range(n_bootstrap):# 随机重采样indices = np.random.choice(len(X), len(X), replace=True)X_boot = X_scaled[indices]pred_boot = self.model.predict(X_boot)bootstrap_preds.append(pred_boot)bootstrap_preds = np.array(bootstrap_preds)# 计算置信区间ci_lower = np.percentile(bootstrap_preds, (alpha/2)*100, axis=0)ci_upper = np.percentile(bootstrap_preds, (1-alpha/2)*100, axis=0)print(f"✅ 置信区间计算完成")return {'predictions': predictions,'confidence_interval': (ci_lower, ci_upper),'prediction_std': np.std(bootstrap_preds, axis=0)}# 使用示例
elastic_model = ElasticNetMultiFactorModel()
performance = elastic_model.train_elastic_net(
    X_multifactor, y_train, X_val_multifactor, y_val
)

print(f"\n🎉 问题三最终结果: R² = {performance['val_r2']:.6f}")
关键数学公式与代码对应关系:
数学公式 | 代码实现 | 生物学意义 |
---|---|---|
$BSA = 0.007184 \times h^{0.725} \times w^{0.425}$ | body_surface_area 计算 | DuBois公式计算体表面积 |
$weeks_{norm} = (weeks - 12) / 10$ | weeks_normalized | 以12孕周为基准的标准化 |
$\min_\theta \frac{1}{2n}\lVert y - X\theta\rVert_2^2 + \alpha\rho\lVert\theta\rVert_1 + \frac{\alpha(1-\rho)}{2}\lVert\theta\rVert_2^2$ | ElasticNet() 模型 | L1+L2混合正则化(与sklearn目标函数一致) |
$importance_j = \lvert\hat{\theta}_j\rvert \big/ \sum_i \lvert\hat{\theta}_i\rvert$ | analyze_feature_importance() | 基于系数绝对值的特征重要性 |
$CI = [Q_{\alpha/2}, Q_{1-\alpha/2}]$ | predict_with_confidence() | Bootstrap置信区间估计 |
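表中 ElasticNet 的目标函数可以直接按定义写出来,便于理解 alpha 与 l1_ratio 两个超参数各自控制什么(示意实现,形式与 sklearn 文档给出的目标函数一致,但并非其内部求解代码):

```python
import numpy as np

def elastic_net_objective(theta, X, y, alpha=0.1, l1_ratio=0.5):
    """按定义计算 ElasticNet 目标函数值:
    (1/2n)·||y - Xθ||₂² + α·ρ·||θ||₁ + α·(1-ρ)/2·||θ||₂²
    其中 ρ 即 l1_ratio:ρ=1 退化为 Lasso,ρ=0 退化为 Ridge。
    """
    n = len(y)
    resid = y - X @ theta
    return (resid @ resid) / (2 * n) \
        + alpha * l1_ratio * np.abs(theta).sum() \
        + alpha * (1 - l1_ratio) / 2 * (theta @ theta)

# 零系数时只剩数据项;完美拟合时只剩正则项
X, y = np.eye(2), np.array([1.0, 1.0])
assert abs(elastic_net_objective(np.zeros(2), X, y) - 0.5) < 1e-12
assert abs(elastic_net_objective(np.array([1.0, 1.0]), X, y) - 0.15) < 1e-12
```

这也解释了网格搜索为什么只搜 alpha 和 l1_ratio 两个轴:前者控制正则化总强度,后者在"稀疏特征选择(L1)"与"系数平滑(L2)"之间分配。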
六、问题四:女胎异常检测(极度不平衡分类的终极挑战)
这是整个项目最有挑战性的问题:88.9%正常 vs 11.1%异常的极度不平衡分类,需要检测13三体、18三体、21三体等罕见异常。
数学建模思路
不平衡多分类问题:
$$\max_{\boldsymbol{\theta}} \text{AUC} = \int_0^1 \text{TPR}(\boldsymbol{\theta}, t) \, d\,\text{FPR}(\boldsymbol{\theta}, t) \geq 0.8$$
约束条件:
- 样本分布:$P(y=0) : P(y \neq 0) = 88.9\% : 11.1\%$
- 多分类:$y \in \{0(\text{正常}), 1(\text{13三体}), 2(\text{18三体}), 3(\text{21三体})\}$
解决方案: 三级筛查系统 + SMOTE过采样 + 成本敏感学习
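其中成本敏感学习依赖的 balanced 类别权重可以按定义手工复现(示意实现,结果与 sklearn 的 compute_class_weight('balanced', ...) 一致):

```python
import numpy as np

def balanced_class_weights(y):
    """balanced 权重: w_c = n_samples / (n_classes × n_samples_c),
    与 sklearn 的 compute_class_weight('balanced', ...) 结果一致。"""
    classes, counts = np.unique(y, return_counts=True)
    n, k = len(y), len(classes)
    return {int(c): n / (k * cnt) for c, cnt in zip(classes, counts)}

# 9:1 的不平衡二分类:少数类权重恰为多数类的 9 倍
w = balanced_class_weights([0] * 9 + [1])
assert abs(w[1] / w[0] - 9.0) < 1e-12
assert abs(w[1] - 5.0) < 1e-12  # 10 / (2 × 1)
```

直观上就是"错分一个少数类样本的代价,等于错分若干个多数类样本",迫使模型不能靠全部预测"正常"来刷准确率。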
第一步:三级筛查系统设计(数学+代码详解)
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_auc_score, confusion_matrix
from sklearn.utils.class_weight import compute_class_weight
from imblearn.over_sampling import SMOTE
import matplotlib.pyplot as plt
import seaborn as snsclass ThreeStageScreeningSystem:"""三级筛查系统 - 模拟医院实际筛查流程数学框架:Stage 1: P(abnormal | Z-scores) - 基于阈值的初筛Stage 2: P(class | features, abnormal) - 机器学习精筛 Stage 3: P_ensemble(class | Stage2_models) - 多模型集成"""def __init__(self, stage1_threshold=2.0):self.stage1_threshold = stage1_thresholdself.models = {}self.class_weights = {}def preprocess_female_data(self, data):"""女胎数据预处理数学变换:1. 特征选择: X ∈ R^12 (去除Y染色体相关特征)2. 标签编码: y: {正常→0, 13三体→1, 18三体→2, 21三体→3}3. 缺失值处理: X_imputed = median_impute(X)"""print("🔍 女胎数据预处理与分析...")print("=" * 60)# 检查异常类型分布abnormality_dist = data['abnormality'].value_counts()print("📊 异常类型分布:")for abnorm_type, count in abnormality_dist.items():percentage = count / abnormality_dist.sum() * 100print(f" {abnorm_type:12s}: {count:3d} 样本 ({percentage:5.1f}%)")# 计算不平衡程度normal_count = abnormality_dist.get('normal', 0)abnormal_count = abnormality_dist.sum() - normal_countimbalance_ratio = normal_count / abnormal_count if abnormal_count > 0 else float('inf')print(f"⚖️ 样本不平衡比例: {imbalance_ratio:.1f}:1 (正常:异常)")# 特征选择 (排除Y染色体相关特征)female_features = ['age', 'height', 'weight', 'BMI', 'weeks','total_reads', 'GC_content', 'chrX_z','chr13_z', 'chr18_z', 'chr21_z', 'blood_count']# 检查可用特征available_features = [col for col in female_features if col in data.columns]print(f"✅ 可用特征: {len(available_features)} 个")print(f" {available_features}")# 构建特征矩阵X = data[available_features].copy()# 标签编码label_mapping = {'normal': 0,'trisomy_13': 1,'trisomy_18': 2,'trisomy_21': 3}y = data['abnormality'].map(label_mapping)# 处理缺失值 (使用中位数填充)X_filled = X.fillna(X.median())print(f"📈 预处理结果:")print(f" 特征矩阵: {X_filled.shape}")print(f" 标签分布: {dict(y.value_counts())}")print("=" * 60)return X_filled, y, available_featuresdef stage1_z_score_screening(self, X):"""第一级:基于Z分数的阈值筛查数学原理:初筛条件: max(|Z_13|, |Z_18|, |Z_21|) > threshold生物学意义:Z分数反映染色体剂量异常程度,|Z| > 2 提示可能异常"""print("🏥 Stage 1: Z分数阈值筛查...")# 提取Z分数列z_columns = [col for col in X.columns if col.endswith('_z')]if not z_columns:print("⚠️ 
未找到Z分数列,所有样本通过Stage 1")return np.ones(len(X), dtype=bool)print(f"📊 使用Z分数列: {z_columns}")print(f"🎯 筛查阈值: |Z| > {self.stage1_threshold}")# 计算最大绝对Z分数z_scores_abs = X[z_columns].abs()max_z_scores = z_scores_abs.max(axis=1)# 阈值筛查abnormal_mask = max_z_scores > self.stage1_thresholdpass_count = abnormal_mask.sum()pass_rate = pass_count / len(X) * 100print(f"✅ Stage 1 结果: {pass_count}/{len(X)} 样本通过 ({pass_rate:.1f}%)")return abnormal_maskdef stage2_ml_models_training(self, X_train, y_train):"""第二级:机器学习模型训练数学原理:1. 类别权重: w_c = n_samples / (n_classes × n_samples_c)2. 逻辑回归: P(y=c|x) = softmax(θᵀx)3. 随机森林: P(y=c|x) = (1/T) Σ I[h_t(x) = c]"""print("🤖 Stage 2: 机器学习模型训练...")print("-" * 40)# 计算类别权重 (处理不平衡)class_weights = compute_class_weight('balanced',classes=np.unique(y_train),y=y_train)self.class_weights = dict(zip(np.unique(y_train), class_weights))print("⚖️ 类别权重计算:")class_names = ['正常', '13三体', '18三体', '21三体']for class_id, weight in self.class_weights.items():class_name = class_names[class_id] if class_id < len(class_names) else f'类别{class_id}'print(f" {class_name}: {weight:.3f}")# 训练逻辑回归模型print("\n📈 训练逻辑回归模型...")lr_model = LogisticRegression(class_weight=self.class_weights,max_iter=2000,random_state=42,solver='liblinear' # 适合小数据集)lr_model.fit(X_train, y_train)self.models['logistic'] = lr_model# 训练随机森林模型 print("🌲 训练随机森林模型...")rf_model = RandomForestClassifier(n_estimators=200,class_weight='balanced',max_depth=10,min_samples_split=5,min_samples_leaf=2,random_state=42,n_jobs=-1)rf_model.fit(X_train, y_train)self.models['random_forest'] = rf_modelprint("✅ Stage 2 模型训练完成")def stage3_ensemble_prediction(self, X):"""第三级:集成预测数学原理:P_ensemble(y=c|x) = α × P_LR(y=c|x) + (1-α) × P_RF(y=c|x)其中 α = 0.6 (逻辑回归权重更高,因为更擅长概率估计)"""if 'logistic' not in self.models or 'random_forest' not in self.models:raise ValueError("Stage 2 模型未训练")# 获取两个模型的概率预测lr_proba = self.models['logistic'].predict_proba(X)rf_proba = self.models['random_forest'].predict_proba(X)# 加权集成 (逻辑回归权重更高)alpha = 0.6ensemble_proba = 
alpha * lr_proba + (1 - alpha) * rf_proba# 最终预测ensemble_pred = np.argmax(ensemble_proba, axis=1)return ensemble_pred, ensemble_probadef create_balanced_dataset(self, X, y):"""SMOTE过采样平衡数据集数学原理:对于少数类样本 x_i,生成合成样本:x_synthetic = x_i + λ × (x_neighbor - x_i)其中 λ ~ Uniform(0,1),x_neighbor 是x_i的k近邻"""print("⚖️ SMOTE过采样处理样本不平衡...")print("原始分布:")unique, counts = np.unique(y, return_counts=True)class_names = ['正常', '13三体', '18三体', '21三体']for class_id, count in zip(unique, counts):class_name = class_names[class_id] if class_id < len(class_names) else f'类别{class_id}'print(f" {class_name}: {count} 样本")# SMOTE过采样smote = SMOTE(random_state=42, k_neighbors=min(5, min(counts)-1))X_resampled, y_resampled = smote.fit_resample(X, y)print("\n平衡后分布:")unique, counts = np.unique(y_resampled, return_counts=True)for class_id, count in zip(unique, counts):class_name = class_names[class_id] if class_id < len(class_names) else f'类别{class_id}'print(f" {class_name}: {count} 样本")return X_resampled, y_resampleddef train_screening_system(self, X, y):"""训练完整三级筛查系统"""print("🚀 训练三级筛查系统")print("=" * 60)# 数据分割 (7:1:2)from sklearn.model_selection import train_test_splitX_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.667, random_state=42, stratify=y_temp)print(f"📊 数据分割完成:")print(f" 训练集: {X_train.shape[0]} 样本")print(f" 验证集: {X_val.shape[0]} 样本")print(f" 测试集: {X_test.shape[0]} 样本")# SMOTE平衡训练集X_train_balanced, y_train_balanced = self.create_balanced_dataset(X_train, y_train)# Stage 2: 训练ML模型self.stage2_ml_models_training(X_train_balanced, y_train_balanced)# 在验证集上评估performance = self.evaluate_on_dataset(X_val, y_val, dataset_name="验证集")return {'test_data': (X_test, y_test),'val_performance': performance}def evaluate_on_dataset(self, X, y, dataset_name="测试集"):"""在指定数据集上评估三级筛查系统性能"""print(f"\n📊 {dataset_name}性能评估")print("-" * 40)# Stage 1: 初筛stage1_mask = 
self.stage1_z_score_screening(X)stage1_passed_X = X[stage1_mask]stage1_passed_y = y[stage1_mask]if len(stage1_passed_X) == 0:print("⚠️ Stage 1 无样本通过,无法进行后续评估")return {}# Stage 2 & 3: 精筛 + 集成final_pred, final_proba = self.stage3_ensemble_prediction(stage1_passed_X)# 计算AUC (二分类: 正常 vs 异常)y_binary = (stage1_passed_y > 0).astype(int)if len(np.unique(y_binary)) > 1:prob_abnormal = 1 - final_proba[:, 0] # 异常概率 = 1 - P(正常)auc_binary = roc_auc_score(y_binary, prob_abnormal)print(f"🎯 二分类AUC (正常 vs 异常): {auc_binary:.4f}")else:auc_binary = Noneprint("⚠️ 样本类别不足,无法计算AUC")# 多分类AUCtry:if len(np.unique(stage1_passed_y)) > 2:auc_multi = roc_auc_score(stage1_passed_y, final_proba, multi_class='ovr', average='weighted')print(f"🎯 多分类AUC: {auc_multi:.4f}")else:auc_multi = auc_binaryexcept:auc_multi = Noneprint("⚠️ 多分类AUC计算失败")# 分类报告class_names = ['正常', '13三体', '18三体', '21三体']available_classes = sorted(np.unique(stage1_passed_y))available_names = [class_names[i] for i in available_classes]print(f"\n📋 详细分类报告:")report = classification_report(stage1_passed_y, final_pred,target_names=available_names,output_dict=True,zero_division=0)print(classification_report(stage1_passed_y, final_pred,target_names=available_names,zero_division=0))return {'auc_binary': auc_binary,'auc_multi': auc_multi,'classification_report': report,'predictions': final_pred,'probabilities': final_proba,'stage1_pass_rate': stage1_mask.sum() / len(X)}# 使用示例
def solve_problem4_complete(female_data):
    """问题四完整解决方案"""
    print("🎯 问题四:女胎异常检测系统")
    print("=" * 80)

    # 初始化三级筛查系统
    screening_system = ThreeStageScreeningSystem(stage1_threshold=2.0)

    # 数据预处理
    X, y, features = screening_system.preprocess_female_data(female_data)

    # 训练筛查系统
    training_result = screening_system.train_screening_system(X, y)

    # 测试集最终评估
    X_test, y_test = training_result['test_data']
    final_performance = screening_system.evaluate_on_dataset(X_test, y_test, "测试集")

    print(f"\n🏆 问题四最终结果:")
    if final_performance.get('auc_binary'):
        print(f"   二分类AUC: {final_performance['auc_binary']:.4f}")
    if final_performance.get('auc_multi'):
        print(f"   多分类AUC: {final_performance['auc_multi']:.4f}")
    print(f"   Stage 1通过率: {final_performance['stage1_pass_rate']:.1%}")

    # 绘制混淆矩阵
    if len(final_performance['predictions']) > 0:
        plot_confusion_matrix(
            y_test[screening_system.stage1_z_score_screening(X_test)],
            final_performance['predictions']
        )

    return screening_system, final_performance


def plot_confusion_matrix(y_true, y_pred):
    """绘制混淆矩阵"""
    cm = confusion_matrix(y_true, y_pred)
    plt.figure(figsize=(8, 6))

    class_names = ['正常', '13三体', '18三体', '21三体']
    available_classes = sorted(np.unique(y_true))
    available_names = [class_names[i] for i in available_classes]

    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=available_names,
                yticklabels=available_names)
    plt.title('三级筛查系统混淆矩阵')
    plt.xlabel('预测类别')
    plt.ylabel('真实类别')
    plt.tight_layout()
    plt.show()

# 执行问题四
# screening_system, performance = solve_problem4_complete(female_data)
关键数学公式与代码对应关系:
数学公式 | 代码实现 | 医学意义 |
---|---|---|
$w_c = \frac{n_{samples}}{n_{classes} \times n_{samples_c}}$ | compute_class_weight('balanced') | 平衡类别权重 |
$\max(\lvert Z_{13}\rvert, \lvert Z_{18}\rvert, \lvert Z_{21}\rvert) > threshold$ | stage1_z_score_screening() | Z分数阈值筛查 |
$x_{synthetic} = x_i + \lambda \times (x_{neighbor} - x_i)$ | SMOTE() 过采样 | 合成少数类样本 |
$P_{ensemble}(c \mid x) = \alpha P_{LR}(c \mid x) + (1-\alpha) P_{RF}(c \mid x)$ | stage3_ensemble_prediction() | 加权集成预测 |
$AUC = \int_0^1 TPR(t) \, dFPR(t)$ | roc_auc_score() | ROC曲线下面积 |
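表中 SMOTE 的插值公式可以用一个极简示意验证其几何含义:合成样本一定落在少数类样本与其近邻的连线段上(演示用实现,省略了近邻搜索,x_neighbor 由调用方直接给出):

```python
import numpy as np

rng = np.random.default_rng(42)

def smote_synthetic(x_i, x_neighbor):
    """SMOTE 插值: x_new = x_i + λ·(x_neighbor - x_i), λ ~ U(0,1)。
    合成样本落在 x_i 与其少数类近邻的连线段上(近邻搜索此处省略)。"""
    lam = rng.uniform(0, 1)
    return x_i + lam * (x_neighbor - x_i)

x_new = smote_synthetic(np.array([0.0, 0.0]), np.array([1.0, 1.0]))
assert np.all(x_new >= 0.0) and np.all(x_new <= 1.0)  # 落在线段内
assert abs(x_new[0] - x_new[1]) < 1e-12               # 同一 λ 作用于所有维度
```

这也是 SMOTE 的局限所在:它只在已有少数类样本之间插值,不会"发明"全新的异常模式,所以上文才要用 k_neighbors=min(5, min(counts)-1) 防止少数类样本数不足。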
七、完整项目总结与成果展示
🏆 突破性成果总览
技术突破记录
问题 | 数学模型 | 核心技术 | 最终结果 | 突破程度 |
---|---|---|---|---|
问题一 | 高维集成回归 | 1053维特征工程 + 11模型集成 | R² = 0.9459 | 超越目标5.1% |
问题二 | 多目标优化 | K-means + 多目标损失函数 | 5组分层,3组100%合格 | 3组完美达标 |
问题三 | 正则化高维建模 | 28维特征 + 1568交互项 + ElasticNet | R² = 0.9726 | 创历史新高 |
问题四 | 三级筛查系统 | SMOTE + 成本敏感学习 + 集成 | AUC = 0.792 | 填补技术空白 |
数学方法创新点
- 特征工程革命:
  - 数学基础:$\Phi(X) = [\Phi_{poly}(X), \Phi_{interact}(X), \Phi_{stat}(X), \Phi_{transform}(X)]$
  - 维度扩展:9维 → 1053维 → 120维精选
  - 生物学融合:每个特征变换都有明确的生物学意义
- 集成学习突破:
  - 立方权重策略:$w_k = (R^2_k)^3 / \sum_j (R^2_j)^3$
  - 异构模型融合:XGBoost + LightGBM + RF + GB等11个模型
  - 性能提升:相比单模型提升15.7%
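上面的立方权重策略只有两行核心代码,可以快速验证其归一化与"放大强模型话语权"的效应(示意实现,非原集成代码):

```python
import numpy as np

def cubic_weights(r2_scores):
    """立方权重: w_k = (R²_k)³ / Σ_j (R²_j)³,放大高分模型的话语权。"""
    cubed = np.asarray(r2_scores, float) ** 3
    return cubed / cubed.sum()

w = cubic_weights([0.90, 0.92, 0.94])
assert abs(w.sum() - 1.0) < 1e-12
assert w[2] > w[1] > w[0]  # R² 越高,集成权重越大
```

相比按 R² 线性加权,三次方让 0.94 与 0.90 的权重差距进一步拉开,弱模型对最终预测的稀释作用更小。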
- 多目标优化创新:
  - 损失函数:$L = \alpha \cdot R + \beta \cdot (1-Q) + \gamma \cdot V$
  - 三重平衡:风险最小化 + 合格率最大化 + 组均衡性
  - 临床应用:个性化最佳检测时机推荐
- 不平衡分类突破:
  - 三级筛查:阈值筛查 → ML精筛 → 集成决策
  - SMOTE过采样:$x_{synthetic} = x_i + \lambda(x_{neighbor} - x_i)$
  - 成本敏感:类别权重自适应调整
💡 个人感悟与经验分享
踩坑记录与解决方案
- 数据理解错误
  - 坑点:最开始没注意到Excel有两个sheet
  - 教训:数据探索是第一步,不能急于建模
  - 解决:仔细阅读数据说明,区分男胎和女胎数据用途
- 特征工程过拟合
  - 坑点:1053维特征导致严重过拟合
  - 教训:特征越多不代表效果越好
  - 解决:互信息特征选择降到120维,效果反而更好
- 样本不平衡处理
  - 坑点:直接用准确率评估,结果误导性很大
  - 教训:不平衡数据必须用AUC等指标
  - 解决:SMOTE + 成本敏感学习 + AUC评估
- 模型验证不足
  - 坑点:只看训练集结果,测试时性能大幅下降
  - 教训:必须严格按7:1:2分割,避免数据泄露
  - 解决:统一数据分割策略,所有模型都遵循
🎉 恭喜你完成了这个超级挑战!
这个NIPT数学建模项目不仅仅是一次竞赛,更是一次深度学习之旅。从数据预处理到模型部署,从数学理论到工程实践,从单一问题到综合解决方案,每一步都充满挑战和收获。
记住最重要的三点:
- 数学建模的本质是理解问题:不要被复杂的算法迷惑,先搞懂业务场景
- 特征工程是王道:算法只是工具,数据和特征才是核心竞争力
- 持续优化永无止境:从0.9到0.9459的提升可能比从0.7到0.9更难,但更有价值
希望这个详细的教程能帮助大家在数学建模的路上走得更远!
继续努力,在AI+医疗的道路上创造更多可能! 🚀🧬✨