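# `conv_arch` specifies the number of convolutional layers and the number of
# output channels in each VGG block. The fully connected module is the same
# as the one in AlexNet.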
import os

import torch
import torch.nn.functional as F
from alive_progress import alive_bar
from tensorboardX import SummaryWriter
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision.datasets import FashionMNIST
from torchvision.transforms import Compose, ToTensor, Resize
def load_dataset(size):
    """Create the FashionMNIST training and test ``DataLoader`` pair.

    Args:
        size: Target size handed to ``Resize`` before ``ToTensor``;
            ``None`` means the images are only converted to tensors.

    Returns:
        ``(dataloader_train, dataloader_test)``. Only the training loader
        shuffles its samples.

    Note:
        ``batch_size`` and ``num_workers`` are not parameters; they are read
        from module-level globals and must be defined before calling. The
        datasets are created under ``./dataset`` with ``download=True``.
    """
    steps = [ToTensor()] if size is None else [Resize(size), ToTensor()]
    transform = Compose(steps)

    data_root = "./dataset"
    # Second positional argument of FashionMNIST is the train/test switch.
    train_set = FashionMNIST(data_root, True, transform, download=True)
    test_set = FashionMNIST(data_root, False, transform, download=True)

    # Options shared by both loaders.
    loader_options = dict(num_workers=num_workers, pin_memory=True)
    train_loader = DataLoader(train_set, batch_size, shuffle=True,
                              **loader_options)
    test_loader = DataLoader(test_set, batch_size, shuffle=False,
                             **loader_options)
    return train_loader, test_loader
def train_on_FashionMNIST(title, size=None):
    """Train the module-level ``net`` on FashionMNIST and log per-epoch metrics.

    The model and hyperparameters are not passed in: ``net``, ``lr``,
    ``device`` and ``num_epochs`` are read from module globals (and
    ``load_dataset`` additionally reads ``batch_size`` / ``num_workers``), so
    all of them must be defined before this function is called.

    After every epoch the network is switched to eval mode and the training
    loss, training accuracy and test accuracy are recomputed over the full
    loaders and written under the ``'metrics'`` tag to a fresh
    ``runs/exp<N>`` directory.

    Args:
        title: Label shown on the progress bar.
        size: Forwarded to ``load_dataset``; ``None`` disables resizing.
    """
    def log_dir():
        """Return an unused ``runs/exp<N>`` path, creating ``runs`` if needed."""
        root = "runs"
        # exist_ok avoids the check-then-create race of exists() + mkdir().
        os.makedirs(root, exist_ok=True)
        order = len(os.listdir(root)) + 1
        # The entry count can point at a directory that already exists (for
        # example after an older run was deleted); skip forward rather than
        # mixing two runs in one log directory.
        while os.path.exists(f'{root}/exp{order}'):
            order += 1
        return f'{root}/exp{order}'

    def init_weights(m):
        """Xavier-initialise weights and zero the biases of linear/conv layers."""
        if isinstance(m, (nn.Linear, nn.Conv2d)):
            nn.init.xavier_uniform_(m.weight)
            # Layers created with bias=False carry m.bias = None.
            if m.bias is not None:
                nn.init.zeros_(m.bias)

    # Create the metrics recorder.
    writer = SummaryWriter(log_dir=log_dir())
    try:
        # Dataset configuration.
        dataloader_train, dataloader_test = load_dataset(size)

        # Model configuration.
        net.apply(init_weights)
        # reduction='none' keeps per-sample losses: the mean drives the
        # gradient step, the sum feeds the epoch-level loss metric.
        criterion = nn.CrossEntropyLoss(reduction='none')
        optimizer = optim.SGD(net.parameters(), lr=lr)

        # Accumulators live on `device` so the inner loops never sync to host.
        # metrics_train: [loss sum, correct predictions, sample count]
        metrics_train = torch.zeros(3, device=device)
        # metrics_test: [correct predictions, sample count]
        metrics_test = torch.zeros(2, device=device)

        # Training loop.
        with alive_bar(num_epochs, theme='classic', title=title) as bar:
            for epoch in range(num_epochs):
                net.train()
                for X, y in dataloader_train:
                    X = X.to(device, non_blocking=True)
                    y = y.to(device, non_blocking=True)
                    loss = criterion(net(X), y)
                    optimizer.zero_grad()
                    loss.mean().backward()
                    optimizer.step()

                # Re-evaluate both splits with the weights reached at the end
                # of the epoch.
                net.eval()
                with torch.no_grad():
                    metrics_train.zero_()
                    for X, y in dataloader_train:
                        X = X.to(device, non_blocking=True)
                        y = y.to(device, non_blocking=True)
                        y_hat = net(X)
                        loss = criterion(y_hat, y)
                        metrics_train[0] += loss.sum()
                        metrics_train[1] += (y_hat.argmax(dim=1) == y).sum()
                        metrics_train[2] += y.numel()
                    # Turn the sums into per-sample averages.
                    metrics_train[0] /= metrics_train[2]
                    metrics_train[1] /= metrics_train[2]

                    metrics_test.zero_()
                    for X, y in dataloader_test:
                        X = X.to(device, non_blocking=True)
                        y = y.to(device, non_blocking=True)
                        y_hat = net(X)
                        metrics_test[0] += (y_hat.argmax(dim=1) == y).sum()
                        metrics_test[1] += y.numel()
                    metrics_test[0] /= metrics_test[1]

                    _metrics_train = metrics_train.cpu().numpy()
                    _metrics_test = metrics_test.cpu().numpy()
                    writer.add_scalars('metrics', {
                        'train_loss': _metrics_train[0],
                        'train_acc': _metrics_train[1],
                        'test_acc': _metrics_test[0]
                    }, epoch)
                # One progress-bar tick per finished epoch.
                bar()
    finally:
        # Close the writer even when training is interrupted by an error.
        writer.close()
if __name__ == "__main__":
    # Experiment: AlexNet on FashionMNIST images resized to 224.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs, batch_size, num_workers = 10, 128, 3
    lr = 0.01
    device = torch.device('cuda')

    # Model configuration: convolutional feature extractor ...
    alexnet_features = [
        nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.ReLU(),
        nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2),
    ]
    # ... followed by the fully connected classifier. 6400 = 256 * 5 * 5,
    # the flattened feature size the layers above produce for a 224x224 input.
    alexnet_classifier = [
        nn.Flatten(),
        nn.Linear(6400, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 4096), nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(4096, 10),
    ]
    net = nn.Sequential(*alexnet_features, *alexnet_classifier)
    net = net.to(device, non_blocking=True)

    train_on_FashionMNIST('AlexNet', 224)
if __name__ == '__main__':
    # Experiment: a narrowed VGG on FashionMNIST images resized to 224.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs, batch_size, num_workers = 10, 128, 3
    lr = 0.1
    device = torch.device('cuda')

    # Model configuration.
    def vgg_block(num_convs, in_channels, out_channels):
        """Build one VGG block.

        The block is ``num_convs`` repetitions of a 3x3 convolution
        (padding 1) followed by ReLU, closed by a 2x2 max pooling with
        stride 2. Only the first convolution takes ``in_channels``; the
        rest map ``out_channels`` to ``out_channels``.
        """
        layers = []
        for conv_index in range(num_convs):
            conv_in = in_channels if conv_index == 0 else out_channels
            layers += [
                nn.Conv2d(conv_in, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
            ]
        layers.append(nn.MaxPool2d(kernel_size=2, stride=2))
        return nn.Sequential(*layers)

    def vgg(conv_arch):
        """Build the VGG network.

        Args:
            conv_arch: Iterable of ``(num_convs, out_channels)`` pairs, one
                per VGG block. The first block takes a 1-channel input.

        The classifier hard-codes a 7x7 final feature map, which is what
        five halvings of a 224x224 input give.
        """
        stages = []
        in_channels, out_channels = 1, 0
        for num_convs, out_channels in conv_arch:
            stages.append(vgg_block(num_convs, in_channels, out_channels))
            in_channels = out_channels
        classifier = [
            nn.Flatten(),
            nn.Linear(out_channels * 7 * 7, 4096), nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 4096), nn.ReLU(),
            nn.Dropout(p=0.5),
            nn.Linear(4096, 10),
        ]
        return nn.Sequential(*stages, *classifier)

    # Shrink every block's channel count by `ratio` to get a lighter model.
    ratio = 4
    conv_arch = ((1, 64), (1, 128), (2, 256), (2, 512), (2, 512))
    small_conv_arch = [
        (num_convs, channels // ratio) for num_convs, channels in conv_arch
    ]
    net = vgg(small_conv_arch)
    net = net.to(device, non_blocking=True)

    # Train the model on the FashionMNIST dataset.
    train_on_FashionMNIST('VGG', 224)
if __name__ == '__main__':
    # Experiment: NiN on FashionMNIST images resized to 224.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs, batch_size, num_workers = 10, 128, 3
    lr = 0.1
    device = torch.device('cuda')

    # Model configuration.
    def nin_block(in_channels, out_channels, kernel_size, strides, padding):
        """Build one NiN block.

        A convolution with the given ``kernel_size`` / ``strides`` /
        ``padding`` is followed by two 1x1 convolutions that keep
        ``out_channels``; each convolution is followed by ReLU.
        """
        spatial_conv = nn.Conv2d(in_channels, out_channels,
                                 kernel_size, strides, padding)
        pointwise_first = nn.Conv2d(out_channels, out_channels, kernel_size=1)
        pointwise_second = nn.Conv2d(out_channels, out_channels, kernel_size=1)
        return nn.Sequential(
            spatial_conv, nn.ReLU(),
            pointwise_first, nn.ReLU(),
            pointwise_second, nn.ReLU(),
        )

    nin_layers = [
        nin_block(1, 96, kernel_size=11, strides=4, padding=0),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nin_block(96, 256, kernel_size=5, strides=1, padding=2),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nin_block(256, 384, kernel_size=3, strides=1, padding=1),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Dropout(0.5),
        # The last block emits 10 channels, one per class; global average
        # pooling plus Flatten turns them into the 10 output scores.
        nin_block(384, 10, kernel_size=3, strides=1, padding=1),
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten(),
    ]
    net = nn.Sequential(*nin_layers).to(device, non_blocking=True)

    # Train the model on the FashionMNIST dataset.
    train_on_FashionMNIST('NiN', 224)
if __name__ == '__main__':
    # Experiment: GoogLeNet on FashionMNIST images resized to 96.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs = 10
    batch_size = 128
    num_workers = 3
    lr = 0.1
    device = torch.device('cuda')

    # Model configuration.
    class Inception(nn.Module):
        """Inception block: four parallel paths concatenated along channels.

        Args:
            in_channels: Number of channels of the input feature map.
            c1: Output channels of path 1 (a single 1x1 convolution).
            c2: Pair ``(mid, out)`` for path 2 (1x1 conv, then 3x3 conv).
            c3: Pair ``(mid, out)`` for path 3 (1x1 conv, then 5x5 conv).
            c4: Output channels of path 4 (3x3 max pool, then 1x1 conv).
            **kwargs: Forwarded to ``nn.Module.__init__``.

        Every path uses stride 1 with matching padding, so all four outputs
        share the input's height and width and can be concatenated.
        """

        def __init__(self, in_channels, c1, c2, c3, c4, **kwargs):
            super().__init__(**kwargs)
            # Path 1: a single 1x1 convolution.
            self.p1_1 = nn.Conv2d(in_channels, c1, kernel_size=1)
            # Path 2: 1x1 convolution followed by a 3x3 convolution.
            self.p2_1 = nn.Conv2d(in_channels, c2[0], kernel_size=1)
            self.p2_2 = nn.Conv2d(c2[0], c2[1], kernel_size=3, padding=1)
            # Path 3: 1x1 convolution followed by a 5x5 convolution.
            self.p3_1 = nn.Conv2d(in_channels, c3[0], kernel_size=1)
            self.p3_2 = nn.Conv2d(c3[0], c3[1], kernel_size=5, padding=2)
            # Path 4: 3x3 max pooling followed by a 1x1 convolution.
            self.p4_1 = nn.MaxPool2d(kernel_size=3, stride=1, padding=1)
            self.p4_2 = nn.Conv2d(in_channels, c4, kernel_size=1)

        def forward(self, x):
            """Run the four paths on ``x`` and concatenate them on dim 1.

            The result has ``c1 + c2[1] + c3[1] + c4`` channels.
            """
            p1 = F.relu(self.p1_1(x))
            p2 = F.relu(self.p2_2(F.relu(self.p2_1(x))))
            p3 = F.relu(self.p3_2(F.relu(self.p3_1(x))))
            p4 = F.relu(self.p4_2(self.p4_1(x)))
            # Concatenate the outputs along the channel dimension.
            return torch.cat((p1, p2, p3, p4), dim=1)

    # Stem: 7x7 convolution and max pooling.
    b1 = nn.Sequential(
        nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    )
    b2 = nn.Sequential(
        nn.Conv2d(64, 64, kernel_size=1),
        nn.ReLU(),
        nn.Conv2d(64, 192, kernel_size=3, padding=1),
        nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    )
    # Each Inception's in_channels equals the summed path outputs of the
    # block before it (e.g. 64 + 128 + 32 + 32 = 256).
    b3 = nn.Sequential(
        Inception(192, 64, (96, 128), (16, 32), 32),
        Inception(256, 128, (128, 192), (32, 96), 64),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    )
    b4 = nn.Sequential(
        Inception(480, 192, (96, 208), (16, 48), 64),
        Inception(512, 160, (112, 224), (24, 64), 64),
        Inception(512, 128, (128, 256), (24, 64), 64),
        Inception(512, 112, (144, 288), (32, 64), 64),
        Inception(528, 256, (160, 320), (32, 128), 128),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
    )
    # Global average pooling and Flatten leave 384 + 384 + 128 + 128 = 1024
    # features per sample for the final linear layer.
    b5 = nn.Sequential(
        Inception(832, 256, (160, 320), (32, 128), 128),
        Inception(832, 384, (192, 384), (48, 128), 128),
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten(),
    )
    net = nn.Sequential(
        b1, b2, b3, b4, b5,
        nn.Linear(1024, 10)
    ).to(device, non_blocking=True)

    # Train the model on the FashionMNIST dataset.
    train_on_FashionMNIST('GoogLeNet', 96)
if __name__ == '__main__':
    # Experiment: LeNet with batch normalisation on un-resized FashionMNIST.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs, batch_size, num_workers = 10, 256, 3
    lr = 1.0
    device = torch.device('cuda')

    # Model configuration: every conv / hidden linear layer is followed by
    # batch normalisation and then a sigmoid.
    conv_part = [
        nn.Conv2d(1, 6, kernel_size=5), nn.BatchNorm2d(6), nn.Sigmoid(),
        nn.AvgPool2d(kernel_size=2, stride=2),
        nn.Conv2d(6, 16, kernel_size=5), nn.BatchNorm2d(16), nn.Sigmoid(),
        nn.AvgPool2d(kernel_size=2, stride=2),
    ]
    # 256 input features = 16 channels * 4 * 4, i.e. what the layers above
    # produce for a 28x28 input.
    dense_part = [
        nn.Flatten(),
        nn.Linear(256, 120), nn.BatchNorm1d(120), nn.Sigmoid(),
        nn.Linear(120, 84), nn.BatchNorm1d(84), nn.Sigmoid(),
        nn.Linear(84, 10),
    ]
    net = nn.Sequential(*conv_part, *dense_part)
    net = net.to(device, non_blocking=True)

    train_on_FashionMNIST('LeNet_BN')
if __name__ == '__main__':
    # Experiment: ResNet-18-style network on FashionMNIST resized to 96.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs = 10
    batch_size = 256
    num_workers = 3
    lr = 0.05
    device = torch.device('cuda')

    # Model configuration.
    class Residual(nn.Module):
        """Residual unit: two 3x3 conv + batch-norm layers plus a shortcut.

        Args:
            input_channels: Channels of the incoming feature map.
            num_channels: Channels produced by both convolutions.
            use_1x1conv: If true, the shortcut goes through a 1x1 convolution
                (with the same stride) so its shape matches the main path.
            strides: Stride of the first convolution (and of the 1x1
                shortcut convolution when present).
        """

        def __init__(self, input_channels, num_channels,
                     use_1x1conv=False, strides=1):
            super().__init__()
            self.conv1 = nn.Conv2d(input_channels, num_channels,
                                   kernel_size=3, padding=1, stride=strides)
            self.conv2 = nn.Conv2d(num_channels, num_channels,
                                   kernel_size=3, padding=1)
            if use_1x1conv:
                self.conv3 = nn.Conv2d(input_channels, num_channels,
                                       kernel_size=1, stride=strides)
            else:
                self.conv3 = None
            self.bn1 = nn.BatchNorm2d(num_channels)
            self.bn2 = nn.BatchNorm2d(num_channels)

        def forward(self, X):
            """Return ``relu(main_path(X) + shortcut(X))``."""
            Y = F.relu(self.bn1(self.conv1(X)))
            Y = self.bn2(self.conv2(Y))
            # Explicit None check instead of relying on module truthiness.
            if self.conv3 is not None:
                X = self.conv3(X)
            Y += X
            return F.relu(Y)

    def resnet_block(input_channels, num_channels, num_residuals,
                     first_block=False):
        """Return a list of ``num_residuals`` ``Residual`` units for one stage.

        Unless ``first_block`` is true, the first unit uses stride 2 and a
        1x1 shortcut convolution to go from ``input_channels`` to
        ``num_channels``. All other units map ``num_channels`` to
        ``num_channels`` at stride 1, so with ``first_block=True``
        ``input_channels`` is not used and must already equal
        ``num_channels``.
        """
        blk = []
        for i in range(num_residuals):
            if i == 0 and not first_block:
                blk.append(Residual(input_channels, num_channels,
                                    use_1x1conv=True, strides=2))
            else:
                blk.append(Residual(num_channels, num_channels))
        return blk

    # Stem: 7x7 convolution, batch norm, ReLU and max pooling.
    b1 = nn.Sequential(
        nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(64), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
    )
    # Four stages of two residual units each; channels double per stage.
    b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
    b3 = nn.Sequential(*resnet_block(64, 128, 2))
    b4 = nn.Sequential(*resnet_block(128, 256, 2))
    b5 = nn.Sequential(*resnet_block(256, 512, 2))
    net = nn.Sequential(
        b1, b2, b3, b4, b5,
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten(), nn.Linear(512, 10)
    ).to(device, non_blocking=True)

    train_on_FashionMNIST('ResNet', 96)
if __name__ == '__main__':
    # Experiment: DenseNet on FashionMNIST images resized to 96.
    # Global settings; load_dataset() and train_on_FashionMNIST() read
    # these module-level names directly.
    num_epochs, batch_size, num_workers = 10, 256, 3
    lr = 0.1
    device = torch.device('cuda')

    # Model configuration.
    def conv_block(input_channels, num_channels):
        """Return a batch norm -> ReLU -> 3x3 convolution (padding 1) unit."""
        norm = nn.BatchNorm2d(input_channels)
        conv = nn.Conv2d(input_channels, num_channels,
                         kernel_size=3, padding=1)
        return nn.Sequential(norm, nn.ReLU(), conv)

    class DenseBlock(nn.Module):
        """Dense block: each unit's output is concatenated onto its input.

        Args:
            num_convs: Number of ``conv_block`` units.
            input_channels: Channels entering the block.
            num_channels: Channels added by every unit.

        The output therefore has
        ``input_channels + num_convs * num_channels`` channels.
        """

        def __init__(self, num_convs, input_channels, num_channels):
            super().__init__()
            # Unit k receives the block input plus the k earlier outputs.
            units = [
                conv_block(input_channels + k * num_channels, num_channels)
                for k in range(num_convs)
            ]
            self.net = nn.Sequential(*units)

        def forward(self, X):
            """Apply the units in order, growing ``X`` along the channel dim."""
            for unit in self.net:
                # Concatenate each unit's input and output along channels.
                X = torch.cat((X, unit(X)), dim=1)
            return X

    def transition_block(input_channels, num_channels):
        """Return a transition layer.

        Batch norm and ReLU are followed by a 1x1 convolution to
        ``num_channels`` and a 2x2 average pooling with stride 2.
        """
        norm = nn.BatchNorm2d(input_channels)
        squeeze = nn.Conv2d(input_channels, num_channels, kernel_size=1)
        return nn.Sequential(
            norm, nn.ReLU(),
            squeeze,
            nn.AvgPool2d(kernel_size=2, stride=2),
        )

    # Stem: 7x7 convolution, batch norm, ReLU and max pooling.
    b1 = nn.Sequential(
        nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
        nn.BatchNorm2d(64), nn.ReLU(),
        nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
    )

    # num_channels tracks the current number of channels.
    num_channels, growth_rate = 64, 32
    num_convs_in_dense_blocks = [4, 4, 4, 4]
    blks = []
    last_block = len(num_convs_in_dense_blocks) - 1
    for i, num_convs in enumerate(num_convs_in_dense_blocks):
        blks.append(DenseBlock(num_convs, num_channels, growth_rate))
        # Output channels of the dense block just added.
        num_channels += num_convs * growth_rate
        if i == last_block:
            continue
        # Between dense blocks, a transition layer halves the channel count.
        blks.append(transition_block(num_channels, num_channels // 2))
        num_channels = num_channels // 2

    head = [
        nn.BatchNorm2d(num_channels), nn.ReLU(),
        nn.AdaptiveAvgPool2d((1, 1)),
        nn.Flatten(),
        nn.Linear(num_channels, 10),
    ]
    net = nn.Sequential(b1, *blks, *head).to(device, non_blocking=True)

    train_on_FashionMNIST('DenseNet', 96)