pytorch02:数据读取DataLoader与Dataset、数据预处理transform

发布时间:2023年12月31日

模型训练数据处理

在这里插入图片描述

一、DataLoader

1.1torch.utils.data.DataLoader

功能:构建可迭代的数据装载器
在这里插入图片描述
常用参数
? dataset: Dataset类,决定数据从哪读取及如何读取
? batchsize : 批大小
? num_works: 是否多进程读取数据
? shuffle: 每个epoch是否乱序
? drop_last:当样本数不能被batchsize整除时,是否舍弃最后一批数据

# 构建MyDataset实例  用来获取图片路径以及图片类别,图片数量
train_data = RMBDataset(data_dir=train_dir, transform=train_transform)
valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)

# 构建DataLoder
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True)
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE)

当使用dataset获取到每张图片路径的时候可以使用DataLoader对数据图片进行打包封装。

1.2数据常见概念

Epoch: 所有训练样本都已输入到模型中,称为一个Epoch
Iteration:一批样本输入到模型中,称之为一个Iteration
Batchsize:批大小,决定一个Epoch有多少个Iteration
举例:样本总数:80, Batchsize:8 ;1 Epoch 为80/8=10 Iteration

样本总数:87, Batchsize:8
当drop_last = True时,1 Epoch = 10 Iteration ,因为舍弃了后面7个样本
当 drop_last = False时,1 Epoch = 11 Iteration,后面7个样本保留为一个Iteration

二、Dataset

2.1torch.utils.data.Dataset

功能:Dataset抽象类,所有自定义的
Dataset需要继承它,并且复写__getitem__()
getitem :接收一个索引,返回一个样本

在这里插入图片描述

# 构建MyDataset实例  用来获取图片路径以及图片类别,图片数量
train_data = RMBDataset(data_dir=train_dir, transform=train_transform)
valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)
import numpy as np
import torch
import os
import random
from PIL import Image
from torch.utils.data import Dataset

random.seed(1)
rmb_label = {"1": 0, "100": 1}


class RMBDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        """
        rmb面额分类任务的Dataset
        :param data_dir: str, 数据集所在路径
        :param transform: torch.transform,数据预处理
        """
        self.label_name = {"1": 0, "100": 1}
        self.data_info = self.get_img_info(data_dir)  # data_info存储所有图片路径和标签,在DataLoader中通过index读取样本
        self.transform = transform

    def __getitem__(self, index):
        path_img, label = self.data_info[index]
        img = Image.open(path_img).convert('RGB')     # 0~255

        if self.transform is not None:
            img = self.transform(img)   # 在这里做transform,转为tensor等等

        return img, label

    def __len__(self):
        return len(self.data_info)

    @staticmethod
    def get_img_info(data_dir):
        data_info = list()
        for root, dirs, _ in os.walk(data_dir):
            # 遍历类别
            for sub_dir in dirs:
                img_names = os.listdir(os.path.join(root, sub_dir))
                img_names = list(filter(lambda x: x.endswith('.jpg'), img_names))

                # 遍历图片
                for i in range(len(img_names)):
                    img_name = img_names[i]
                    path_img = os.path.join(root, sub_dir, img_name)
                    label = rmb_label[sub_dir]
                    data_info.append((path_img, int(label)))

        return data_info

2.2代码展示

思考一下下面三个问题:
在这里插入图片描述

文件夹里面的数据读取处理如下:
在这里插入图片描述

2.2.1划分数据集

以人民币二分类为案例,需要将人民币数据集划分为训练集、验证集、测试集;代码如下:

# -*- coding: utf-8 -*-  # 设置文件编码格式为 UTF-8

import os  # 导入处理操作系统相关功能的模块
import random  # 导入随机数相关的模块
import shutil  # 导入文件和目录操作相关的模块


def makedir(new_dir):  # 定义一个函数,用于创建目录
    if not os.path.exists(new_dir):  # 如果目录不存在
        os.makedirs(new_dir)  # 创建目录


if __name__ == '__main__':  # 程序入口点

    random.seed(1)  # 设置随机数种子为1,保证随机性可重复

    dataset_dir = os.path.join("..", "..", "data", "RMB_data")  # 数据集目录路径
    split_dir = os.path.join("..", "..", "data", "rmb_split")  # 划分后数据集目录路径
    train_dir = os.path.join(split_dir, "train")  # 训练集目录路径
    valid_dir = os.path.join(split_dir, "valid")  # 验证集目录路径
    test_dir = os.path.join(split_dir, "test")  # 测试集目录路径

    train_pct = 0.8  # 训练集占比
    valid_pct = 0.1  # 验证集占比
    test_pct = 0.1  # 测试集占比

    for root, dirs, files in os.walk(dataset_dir):  # 遍历数据集目录中的文件夹
        for sub_dir in dirs:  # 遍历每个文件夹

            imgs = os.listdir(os.path.join(root, sub_dir))  # 获取文件夹中所有文件列表
            imgs = list(filter(lambda x: x.endswith('.jpg'), imgs))  # 过滤出以'.jpg'结尾的文件
            random.shuffle(imgs)  # 随机打乱文件列表顺序
            img_count = len(imgs)  # 获取文件数量

            train_point = int(img_count * train_pct)  # 计算训练集划分点
            valid_point = int(img_count * (train_pct + valid_pct))  # 计算验证集划分点

            for i in range(img_count):  # 遍历每个文件
                if i < train_point:  # 如果在训练集范围内
                    out_dir = os.path.join(train_dir, sub_dir)  # 输出目录为训练集目录下的子文件夹
                elif i < valid_point:  # 如果在验证集范围内
                    out_dir = os.path.join(valid_dir, sub_dir)  # 输出目录为验证集目录下的子文件夹
                else:  # 其余情况
                    out_dir = os.path.join(test_dir, sub_dir)  # 输出目录为测试集目录下的子文件夹

                makedir(out_dir)  # 创建输出目录

                target_path = os.path.join(out_dir, imgs[i])  # 目标路径为输出目录下的文件路径
                src_path = os.path.join(dataset_dir, sub_dir, imgs[i])  # 源文件路径为数据集目录下的文件路径

                shutil.copy(src_path, target_path)  # 复制文件到目标路径

            print('Class:{}, train:{}, valid:{}, test:{}'.format(sub_dir, train_point, valid_point-train_point,
                                                                 img_count-valid_point))  # 输出每个类别的训练、验证和测试集数量
                                                               
窗口输出结果:
Class:1, train:80, valid:10, test:10
Class:100, train:80, valid:10, test:10

在这里插入图片描述

2.2.2训练代码

# -*- coding: utf-8 -*-

import os  
import random # 导入随机数生成库
import numpy as np # 导入numpy库
import torch # 导入pytorch库
import torch.nn as nn # 导入pytorch的神经网络模块
from torch.utils.data import DataLoader # 导入pytorch的数据加载模块
import torchvision.transforms as transforms # 导入图像转换模块
import torch.optim as optim # 导入优化器模块 
from matplotlib import pyplot as plt # 导入matplotlib的绘图模块
from model.lenet import LeNet # 导入LeNet模型
from tools.my_dataset import RMBDataset # 导入自己构建的数据集


def set_seed(seed=1): # 设置随机种子
    random.seed(seed)  
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)


set_seed()  # 设置随机种子
rmb_label = {"1": 0, "100": 1} # 构建分类标签字典

# 参数设置
MAX_EPOCH = 10  
BATCH_SIZE = 16
LR = 0.01
log_interval = 10  
val_interval = 1

# ============================ step 1/5 数据 ============================ 

split_dir = os.path.join("..", "..", "data", "rmb_split") # 拼接数据集路径
train_dir = os.path.join(split_dir, "train")  # 训练数据路径
valid_dir = os.path.join(split_dir, "valid") # 验证数据路径

norm_mean = [0.485, 0.456, 0.406] # 图像规范化的均值
norm_std = [0.229, 0.224, 0.225] # 图像规范化的方差

train_transform = transforms.Compose([  
    transforms.Resize((32, 32)), # 将图片resize到32x32
    transforms.RandomCrop(32, padding=4), # 进行随机裁剪
    transforms.ToTensor(), # 转换为Tensor
    transforms.Normalize(norm_mean, norm_std), # 规范化
])

valid_transform = transforms.Compose([
    transforms.Resize((32, 32)), # 将图片resize到32x32  
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std), # 规范化
]) 

# 构建MyDataset实例
train_data = RMBDataset(data_dir=train_dir, transform=train_transform)  
valid_data = RMBDataset(data_dir=valid_dir, transform=valid_transform)

# 构建DataLoder
train_loader = DataLoader(dataset=train_data, batch_size=BATCH_SIZE, shuffle=True) # 训练数据加载器
valid_loader = DataLoader(dataset=valid_data, batch_size=BATCH_SIZE) # 验证数据加载器

# ============================ step 2/5 模型 ============================

net = LeNet(classes=2) # 实例化LeNet模型
net.initialize_weights() # 初始化权重

# ============================ step 3/5 损失函数 ============================
criterion = nn.CrossEntropyLoss() # 选择损失函数,交叉熵损失                                                  

# ============================ step 4/5 优化器 ============================
optimizer = optim.SGD(net.parameters(), lr=LR, momentum=0.9) # 选择优化器,SGD随机梯度下降                                                              
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1) # 设置学习率下降策略

# ============================ step 5/5 训练 ============================
train_curve = list()  
valid_curve = list()

for epoch in range(MAX_EPOCH): 

    loss_mean = 0.
    correct = 0.
    total = 0.

    net.train() # 设置模型为训练模式
    for i, data in enumerate(train_loader): # 遍历训练数据

        # forward,前向传播
        inputs, labels = data  
        outputs = net(inputs)

        # backward,反向传播
        optimizer.zero_grad() # 梯度置零
        loss = criterion(outputs, labels) # 计算loss
        loss.backward() # 反向传播计算梯度

        # update weights,更新权重
        optimizer.step()  

        # 统计分类情况
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0) 
        correct += (predicted == labels).squeeze().sum().numpy()

        # 打印训练信息
        loss_mean += loss.item()
        train_curve.append(loss.item())  
        if (i+1) % log_interval == 0:
            loss_mean = loss_mean / log_interval
            print("Training:Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, i+1, len(train_loader), loss_mean, correct / total))
            loss_mean = 0.

    scheduler.step()  # 更新学习率

    # validate the model
    if (epoch+1) % val_interval == 0:

        correct_val = 0.
        total_val = 0.
        loss_val = 0.
        net.eval() # 设置模型为评估模式
        with torch.no_grad():
            for j, data in enumerate(valid_loader):
                inputs, labels = data
                outputs = net(inputs)
                loss = criterion(outputs, labels)

                _, predicted = torch.max(outputs.data, 1)
                total_val += labels.size(0)
                correct_val += (predicted == labels).squeeze().sum().numpy()

                loss_val += loss.item()

            loss_val_epoch = loss_val / len(valid_loader)
            valid_curve.append(loss_val_epoch)
            print("Valid:\t Epoch[{:0>3}/{:0>3}] Iteration[{:0>3}/{:0>3}] Loss: {:.4f} Acc:{:.2%}".format(
                epoch, MAX_EPOCH, j+1, len(valid_loader), loss_val_epoch, correct_val / total_val))


train_x = range(len(train_curve))  
train_y = train_curve

train_iters = len(train_loader)  
valid_x = np.arange(1, len(valid_curve)+1) * train_iters*val_interval # 转换x轴记录点
valid_y = valid_curve

plt.plot(train_x, train_y, label='Train') # 画训练loss图
plt.plot(valid_x, valid_y, label='Valid') # 画验证loss图 

plt.legend(loc='upper right')  
plt.ylabel('loss value')
plt.xlabel('Iteration')
plt.show()

# ============================ inference ============================

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
test_dir = os.path.join(BASE_DIR, "test_data")

test_data = RMBDataset(data_dir=test_dir, transform=valid_transform)  
valid_loader = DataLoader(dataset=test_data, batch_size=1)

for i, data in enumerate(valid_loader):
    # forward
    inputs, labels = data
    outputs = net(inputs)
    _, predicted = torch.max(outputs.data, 1)

    rmb = 1 if predicted.numpy()[0] == 0 else 100 
    print("模型获得{}元".format(rmb))

三、transform

3.1 torchvision视觉工具包

torchvision.transforms : 常用的图像预处理方法:图像翻转、裁剪等
torchvision.datasets : 常用数据集的dataset实现,MNIST,CIFAR-10,ImageNet等
torchvision.model : 常用的模型预训练,AlexNet,VGG, ResNet,GoogLeNet等

torchvision.transforms : 常用的图像预处理方法
? 数据中心化
? 数据标准化
? 缩放
? 裁剪
? 旋转
? 翻转
? 填充
? 噪声添加
? 灰度变换
? 线性变换
? 仿射变换
? 亮度、饱和度及对比度变换

3.2 代码展示

Compose:将一系列的图像处理方法整合到一块包装。

split_dir = os.path.join("..", "..", "data", "rmb_split")  # 从哪读取数据
train_dir = os.path.join(split_dir, "train")
valid_dir = os.path.join(split_dir, "valid")

norm_mean = [0.485, 0.456, 0.406]
norm_std = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize((32, 32)), #设置图像大小
    transforms.RandomCrop(32, padding=4), #随机裁剪
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),  #将数据均值变为0,标准差变为1
])
valid_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    transforms.Normalize(norm_mean, norm_std),
])

3.3 transforms.Normalize

功能:逐channel的对图像进行标准化,也就是output = (input - mean) / std,可以加快模型的收敛。
在这里插入图片描述
? mean:各通道的均值
? std:各通道的标准差
? inplace:是否原地操作

代码展示:

def normalize(tensor: Tensor, mean: List[float], std: List[float], inplace: bool = False) -> Tensor:
    _assert_image_tensor(tensor)

    if not tensor.is_floating_point():
        raise TypeError(f"Input tensor should be a float tensor. Got {tensor.dtype}.")

    if tensor.ndim < 3:
        raise ValueError(
            f"Expected tensor to be a tensor image of size (..., C, H, W). Got tensor.size() = {tensor.size()}"
        )

    if not inplace:
        tensor = tensor.clone()

    dtype = tensor.dtype
    mean = torch.as_tensor(mean, dtype=dtype, device=tensor.device)
    std = torch.as_tensor(std, dtype=dtype, device=tensor.device)
    if (std == 0).any():
        raise ValueError(f"std evaluated to zero after conversion to {dtype}, leading to division by zero.")
    if mean.ndim == 1:
        mean = mean.view(-1, 1, 1)
    if std.ndim == 1:
        std = std.view(-1, 1, 1)
    return tensor.sub_(mean).div_(std)  #减均值并且除以标准差

3.4 Normalize实验

为什么Normalize可以加快模型的收敛,以逻辑回归代码为例,此时bias = 1,均值接近0,当acc > 0.99停止训练,在380轮左右精确度达到99.5%

# -*- coding: utf-8 -*-

import torch
import torch.nn as nn
import matplotlib.pyplot as plt
import numpy as np

torch.manual_seed(10)

# ============================ step 1/5 生成数据 ============================
sample_nums = 100
mean_value = 1.7
bias = 1
n_data = torch.ones(sample_nums, 2)
x0 = torch.normal(mean_value * n_data, 1) + bias  # 类别0 数据 shape=(100, 2)
y0 = torch.zeros(sample_nums)  # 类别0 标签 shape=(100, 1)
x1 = torch.normal(-mean_value * n_data, 1) + bias  # 类别1 数据 shape=(100, 2)
y1 = torch.ones(sample_nums)  # 类别1 标签 shape=(100, 1)
train_x = torch.cat((x0, x1), 0)  # 在0维进行拼接
train_y = torch.cat((y0, y1), 0)


# ============================ step 2/5 选择模型 ============================
class LR(nn.Module):
    def __init__(self):
        super(LR, self).__init__()
        self.features = nn.Linear(2, 1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.features(x)
        x = self.sigmoid(x)
        return x


lr_net = LR()  # 实例化逻辑回归模型

# ============================ step 3/5 选择损失函数 ============================
loss_fn = nn.BCELoss()  # 二分类交叉熵损失函数

# ============================ step 4/5 选择优化器   ============================
lr = 0.01  # 学习率
optimizer = torch.optim.SGD(lr_net.parameters(), lr=lr, momentum=0.9)

# ============================ step 5/5 模型训练 ============================
for iteration in range(1000):

    # 前向传播
    y_pred = lr_net(train_x)

    # 计算 loss
    loss = loss_fn(y_pred.squeeze(), train_y)

    # 反向传播
    loss.backward()

    # 更新参数
    optimizer.step()

    # 清空梯度
    optimizer.zero_grad()

    # 绘图
    if iteration % 20 == 0:

        mask = y_pred.ge(0.5).float().squeeze()  # 以0.5为阈值进行分类
        correct = (mask == train_y).sum()  # 计算正确预测的样本个数
        acc = correct.item() / train_y.size(0)  # 计算分类准确率

        plt.scatter(x0.data.numpy()[:, 0], x0.data.numpy()[:, 1], c='r', label='class 0')
        plt.scatter(x1.data.numpy()[:, 0], x1.data.numpy()[:, 1], c='b', label='class 1')

        w0, w1 = lr_net.features.weight[0]
        w0, w1 = float(w0.item()), float(w1.item())
        plot_b = float(lr_net.features.bias[0].item())
        plot_x = np.arange(-6, 6, 0.1)
        plot_y = (-w0 * plot_x - plot_b) / w1

        plt.xlim(-5, 7)
        plt.ylim(-7, 7)
        plt.plot(plot_x, plot_y)

        plt.text(-5, 5, 'Loss=%.4f' % loss.data.numpy(), fontdict={'size': 20, 'color': 'red'})
        plt.title("Iteration: {}\nw0:{:.2f} w1:{:.2f} b: {:.2f} accuracy:{:.2%}".format(iteration, w0, w1, plot_b, acc))
        plt.legend()

        plt.show()
        plt.pause(0.5)

        if acc > 0.99:
            break

在这里插入图片描述

当修改bias = 5,均值远离0,当acc > 0.99停止训练,在580轮左右精确度还未达到99%。
在这里插入图片描述

文章来源:https://blog.csdn.net/J_oshua/article/details/135240585
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。