打卡第44天:无人机数据集分类
重复以下内容
作业:
在 Kaggle 上找到一个图像数据集,用 CNN 网络进行训练,并用 Grad-CAM 做可视化
进阶:
将代码拆分成多个文件
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import datasets, transforms, models
from PIL import Image # 添加缺失的导入
import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm# 设置随机种子确保结果可复现
torch.manual_seed(42)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(42)


class CustomDataset(Dataset):
    """Image-classification dataset backed by an image folder and a label folder.

    Every ``.jpg``/``.jpeg``/``.png`` file (case-insensitive) in ``image_dir``
    is paired with the ``.txt`` file in ``label_dir`` that has the same base
    name. Each label file is expected to contain a single integer class id.

    NOTE(review): ``CustomDataset`` is defined a second time further down in
    this file; that later definition rebinds the name and shadows this one.

    Args:
        image_dir: Directory containing the image files.
        label_dir: Directory containing one ``.txt`` label file per image.
        transform: Optional callable applied to the PIL image before it is
            returned.

    Raises:
        AssertionError: If the number of images and labels differ, or if an
            image has no label file with a matching base name.
    """

    def __init__(self, image_dir, label_dir, transform=None):
        self.image_paths = sorted(
            os.path.join(image_dir, name)
            for name in os.listdir(image_dir)
            if name.lower().endswith(('.jpg', '.jpeg', '.png'))
        )
        label_paths = sorted(
            os.path.join(label_dir, name)
            for name in os.listdir(label_dir)
            if name.lower().endswith('.txt')
        )
        self.transform = transform

        # Raised explicitly (not via ``assert``) so the check still runs under
        # ``python -O`` while keeping the exception type unchanged.
        if len(self.image_paths) != len(label_paths):
            raise AssertionError("图像数量与标签数量不匹配")

        # Pair by base name rather than by sorted position: sorting
        # "a.png"/"a.q.png" and "a.txt"/"a.q.txt" yields different orders, so
        # positional pairing can silently attach the wrong label.
        label_by_stem = {}
        for label_path in label_paths:
            label_by_stem.setdefault(self._stem(label_path), label_path)

        self.label_paths = []
        for image_path in self.image_paths:
            label_path = label_by_stem.get(self._stem(image_path))
            if label_path is None:
                raise AssertionError(
                    f"找不到图像 '{os.path.basename(image_path)}' 对应的标签文件"
                )
            self.label_paths.append(label_path)

    @staticmethod
    def _stem(path):
        """Return the file name of ``path`` without directory or extension."""
        return os.path.splitext(os.path.basename(path))[0]

    def __len__(self):
        """Return the number of image/label pairs."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given; the label is the integer parsed from the label file.
        """
        image = Image.open(self.image_paths[idx]).convert('RGB')
        with open(self.label_paths[idx], 'r') as f:
            label = int(f.read().strip())  # label file is assumed to hold one integer
        if self.transform:
            image = self.transform(image)
        return image, label
class CustomDataset(Dataset):
    """Image-classification dataset that maps each image to its label file by name.

    Images (``.jpg``/``.jpeg``/``.png``, case-insensitive) are read from
    ``image_dir``; each must have a ``.txt`` file with the same base name in
    ``label_dir``. Each label file is expected to contain a single integer.

    Args:
        image_dir: Directory containing the image files.
        label_dir: Directory containing the label files.
        transform: Optional callable applied to the PIL image before it is
            returned.
        debug: When true, print the directories, file counts, the first ten
            file names of each kind and the mapping result.

    Attributes:
        image_files: Sorted image file names (not full paths).
        label_files: Sorted label file names (not full paths).
        image_to_label: Dict from image file name to label file name.

    Raises:
        AssertionError: If image and label counts differ, or if any image has
            no label file with the same base name.
    """

    def __init__(self, image_dir, label_dir, transform=None, debug=False):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        self.debug = debug

        # Collect image and label file names.
        self.image_files = sorted(
            name for name in os.listdir(image_dir)
            if name.lower().endswith(('.jpg', '.jpeg', '.png'))
        )
        self.label_files = sorted(
            name for name in os.listdir(label_dir)
            if name.lower().endswith('.txt')
        )

        if self.debug:
            print(f"图像目录: {image_dir}")
            print(f"标签目录: {label_dir}")
            print(f"找到 {len(self.image_files)} 个图像文件")
            print(f"找到 {len(self.label_files)} 个标签文件")
            # Show the first ten names of each kind so the ordering can be
            # eyeballed.
            print("\n前10个图像文件:")
            for name in self.image_files[:10]:
                print(f" {name}")
            print("\n前10个标签文件:")
            for name in self.label_files[:10]:
                print(f" {name}")

        # Raised explicitly (not via ``assert``) so the checks still run under
        # ``python -O`` while keeping the exception type unchanged.
        if len(self.image_files) != len(self.label_files):
            raise AssertionError(
                f"图像数量({len(self.image_files)})与标签数量({len(self.label_files)})不匹配"
            )

        # Index labels by base name once, instead of rescanning the whole
        # label list for every image. ``setdefault`` keeps the first label in
        # sorted order when two labels share a base name, matching the
        # previous first-match behaviour.
        label_by_stem = {}
        for lbl_file in self.label_files:
            label_by_stem.setdefault(os.path.splitext(lbl_file)[0], lbl_file)

        self.image_to_label = {}
        for img_file in self.image_files:
            lbl_file = label_by_stem.get(os.path.splitext(img_file)[0])
            if lbl_file is not None:
                self.image_to_label[img_file] = lbl_file
            elif self.debug:
                print(f"警告: 找不到图像 '{img_file}' 对应的标签文件")

        # Every image must have found a label.
        if len(self.image_to_label) != len(self.image_files):
            raise AssertionError(
                f"只有 {len(self.image_to_label)} 个图像找到了对应的标签,总数应为 {len(self.image_files)}"
            )

        if self.debug:
            print(f"成功建立 {len(self.image_to_label)} 个图像-标签映射")

    def __len__(self):
        """Return the number of images in the dataset."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is loaded as RGB and passed through ``transform`` when one
        was given; the label is the integer parsed from the mapped label file.
        """
        img_file = self.image_files[idx]
        lbl_file = self.image_to_label[img_file]

        image_path = os.path.join(self.image_dir, img_file)
        label_path = os.path.join(self.label_dir, lbl_file)

        image = Image.open(image_path).convert('RGB')
        with open(label_path, 'r') as f:
            label = int(f.read().strip())

        if self.transform:
            image = self.transform(image)
        return image, label
# Dataset location. Each split (train / validation / test) has an `images`
# folder and a `labels` folder underneath `data_dir`.
data_dir = r"C:\Users\许兰\Desktop\打卡文件\mix20230204"  # replace with your dataset path
train_image_dir, train_label_dir = (
    os.path.join(data_dir, 'train/images'),
    os.path.join(data_dir, 'train/labels'),
)
val_image_dir, val_label_dir = (
    os.path.join(data_dir, 'validation/images'),
    os.path.join(data_dir, 'validation/labels'),
)
test_image_dir, test_label_dir = (
    os.path.join(data_dir, 'test/images'),
    os.path.join(data_dir, 'test/labels'),
)
# Data preprocessing and augmentation
# Per-split preprocessing. Training uses random crop / flip / rotation /
# colour jitter; validation and test use the same fixed resize + centre crop.
# All splits end with tensor conversion and the same Normalize mean/std.
data_transforms = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(10),
        transforms.ColorJitter(brightness=0.2, contrast=0.2),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]),
}
# A separate Compose instance is built for each evaluation split.
data_transforms.update({
    split: transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ])
    for split in ('val', 'test')
})
# Load the data with the custom dataset class
# One CustomDataset per split, each wired to its own image/label folders and
# the transform pipeline registered under the same split name.
image_datasets = {
    split: CustomDataset(image_dir, label_dir, data_transforms[split])
    for split, image_dir, label_dir in (
        ('train', train_image_dir, train_label_dir),
        ('val', val_image_dir, val_label_dir),
        ('test', test_image_dir, test_label_dir),
    )
}
# Create the data loaders
batch_size = 32
# On Windows, DataLoader workers are started with `spawn`, which re-imports
# this module in every worker. This script has no `if __name__ == '__main__':`
# guard, so worker start-up would re-run the whole script and fail. Load in
# the main process there; keep four workers elsewhere.
num_workers = 0 if os.name == 'nt' else 4
# Only the training split is shuffled; validation and test keep a fixed order.
dataloaders = {
    split: DataLoader(
        image_datasets[split],
        batch_size=batch_size,
        shuffle=(split == 'train'),
        num_workers=num_workers,
    )
    for split in ('train', 'val', 'test')
}
# Determine the number of classes (assumes class ids are consecutive from 0)
# Count the distinct labels seen in the training split.
# NOTE(review): iterating the dataset loads and transforms every training
# image just to read its label; reading the label files directly would be
# much cheaper.
# NOTE(review): this equals the required output size only if the labels are
# exactly 0..K-1 — confirm against the dataset.
num_classes = len({label for _, label in image_datasets['train']})
print(f"数据集包含 {num_classes} 个类别")
# Check whether a GPU is available
# Use the first CUDA device when CUDA is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")
print(f"使用设备: {device}")
# Define the CNN model
class CNNModel(nn.Module):
    """Convolutional image classifier with a three-layer fully connected head.

    ``features`` is a stack of 3x3 convolutions with ReLU and four 2x2
    max-pools, ending in 512 channels. The spatial size is then adaptively
    averaged to 7x7 so the classifier always receives ``512 * 7 * 7`` values.
    Without that step a 224x224 input leaves a 14x14 map after four poolings
    and the first ``Linear`` layer fails with a shape mismatch.

    Args:
        num_classes: Size of the output logits vector.
        hidden_dim: Width of the two hidden fully connected layers
            (default 4096).
    """

    def __init__(self, num_classes, hidden_dim=4096):
        super().__init__()

        # Feature extractor.
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        # Parameter-free, so existing state_dict keys are unaffected. A 7x7
        # feature map passes through unchanged.
        self.avgpool = nn.AdaptiveAvgPool2d((7, 7))

        # Classifier head.
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(512 * 7 * 7, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Dropout(0.5),
            nn.Linear(hidden_dim, hidden_dim),
            nn.ReLU(inplace=True),
            nn.Linear(hidden_dim, num_classes),
        )

        self._initialize_weights()

    def forward(self, x):
        """Return class logits of shape ``(batch, num_classes)`` for image batch ``x``."""
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x

    def _initialize_weights(self):
        """Initialise conv weights with Kaiming-normal and linear weights with N(0, 0.01); zero all biases."""
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
                if m.bias is not None:
                    nn.init.constant_(m.bias, 0)
            elif isinstance(m, nn.Linear):
                nn.init.normal_(m.weight, 0, 0.01)
                nn.init.constant_(m.bias, 0)
# Instantiate the model
# Build the network and place it on the selected device.
model = CNNModel(num_classes).to(device)

# Loss function, optimizer and learning-rate schedule.
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(
    model.parameters(),
    lr=0.001,
    momentum=0.9,
    weight_decay=1e-4,
)
# Multiply the learning rate by 0.1 every 7 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)
# Train the model
def train_model(model, dataloaders, criterion, optimizer, scheduler,
                num_epochs=25, show_progress=True):
    """Train ``model`` and return it loaded with its best validation weights.

    Each epoch runs a ``'train'`` phase (with backward pass and optimizer
    step) followed by a ``'val'`` phase (no gradients). The weights from the
    epoch with the highest validation accuracy are restored before returning.

    Args:
        model: Network to train; batches are moved to the device its
            parameters live on.
        dataloaders: Dict with ``'train'`` and ``'val'`` data loaders yielding
            ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        scheduler: Learning-rate scheduler stepped once per epoch after the
            training phase, or ``None`` to skip scheduling.
        num_epochs: Number of epochs to run.
        show_progress: Wrap each loader in ``tqdm`` when true; iterate it
            directly when false.

    Returns:
        ``(model, history)`` where ``history`` has the keys ``train_loss``,
        ``train_acc``, ``val_loss`` and ``val_acc``, each a list with one
        float per epoch.
    """
    import copy

    # Follow the model's own device rather than a module-level global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    # state_dict() returns references to the live tensors, so a snapshot must
    # be deep-copied or it silently follows later updates.
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    # Per-epoch training record.
    history = {
        'train_loss': [], 'train_acc': [],
        'val_loss': [], 'val_acc': [],
    }

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Each epoch has a training and a validation phase.
        for phase in ['train', 'val']:
            is_training = phase == 'train'
            if is_training:
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            batches = dataloaders[phase]
            if show_progress:
                batches = tqdm(batches)

            for inputs, labels in batches:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                # Track gradients only while training.
                with torch.set_grad_enabled(is_training):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if is_training:
                        loss.backward()
                        optimizer.step()

                # loss is a batch mean; weight it by batch size so the epoch
                # average is per-sample.
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels).item()

            if is_training and scheduler is not None:
                scheduler.step()

            num_samples = len(dataloaders[phase].dataset)
            epoch_loss = running_loss / num_samples
            epoch_acc = running_corrects / num_samples

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc)

            # Keep a copy of the best-performing weights.
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
                print(f'保存最佳模型,准确率: {best_acc:.4f}')

        print()

    # Restore the best weights.
    model.load_state_dict(best_model_wts)
    return model, history
# Train the model
# Run training; `model` is rebound to the network returned by train_model
# and `history` receives the per-epoch loss/accuracy lists.
num_epochs = 25
model, history = train_model(
    model=model,
    dataloaders=dataloaders,
    criterion=criterion,
    optimizer=optimizer,
    scheduler=scheduler,
    num_epochs=num_epochs,
)
# Evaluate the model on the test set
def evaluate_model(model, dataloader):
    """Compute the accuracy of ``model`` over ``dataloader`` and collect predictions.

    Args:
        model: Network to evaluate; batches are moved to the device its
            parameters live on.
        dataloader: Loader yielding ``(inputs, labels)`` batches; its
            ``dataset`` length is used as the accuracy denominator.

    Returns:
        ``(accuracy, all_preds, all_labels)``: accuracy as a float, and two
        lists holding the predicted and true class ids in loader order.
    """
    # Follow the model's own device rather than a module-level global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device('cpu')

    model.eval()
    running_corrects = 0
    all_preds = []
    all_labels = []

    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)

            # Accumulate as a Python int so the total is well-defined even
            # when the loader yields no batches.
            running_corrects += torch.sum(preds == labels).item()
            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = running_corrects / len(dataloader.dataset)
    print(f'测试集准确率: {accuracy:.4f}')
    return accuracy, all_preds, all_labels
# Evaluate the model
# Evaluate the trained model on the test split.
test_accuracy, predictions, true_labels = evaluate_model(model, dataloaders['test'])

# Plot the training history: loss on the left, accuracy on the right.
plt.figure(figsize=(12, 4))

plt.subplot(1, 2, 1)
plt.plot(history['train_loss'], label='Training Loss')
plt.plot(history['val_loss'], label='Validation Loss')
plt.title('Loss Over Time')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend()

plt.subplot(1, 2, 2)
plt.plot(history['train_acc'], label='Training Accuracy')
plt.plot(history['val_acc'], label='Validation Accuracy')
plt.title('Accuracy Over Time')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend()

plt.tight_layout()
# Save before show() so the file is written while the figure is still current.
plt.savefig('training_history.png')
plt.show()

# Save the trained weights (state_dict only, not the full module).
torch.save(model.state_dict(), 'cnn_image_classifier.pth')
print("模型已保存为 'cnn_image_classifier.pth'")
# Optional: transfer learning with a pretrained model
def use_pretrained_model(num_classes):
    """Build a pretrained ResNet-18 with a new ``num_classes``-way output layer.

    All parameters except the last four tensors returned by ``parameters()``
    are frozen, then the final fully connected layer is replaced by a freshly
    initialised ``nn.Linear``. The new layer is trainable, as newly created
    parameters require gradients by default.

    Args:
        num_classes: Number of output classes for the new ``fc`` layer.

    Returns:
        The modified model, moved to the module-level ``device``.
    """
    # Load ResNet-18 with pretrained weights.
    # NOTE(review): recent torchvision releases deprecate `pretrained=` in
    # favour of `weights=`. It is kept here for compatibility with older
    # versions; switch once the minimum torchvision version is known.
    model_ft = models.resnet18(pretrained=True)

    # Freeze everything except the last four parameter tensors.
    # NOTE(review): presumably these are the last BatchNorm's weight/bias and
    # the original fc weight/bias — confirm against the model's parameter
    # order.
    for param in list(model_ft.parameters())[:-4]:
        param.requires_grad = False

    # Replace the final fully connected layer.
    num_ftrs = model_ft.fc.in_features
    model_ft.fc = nn.Linear(num_ftrs, num_classes)

    return model_ft.to(device)