目录
? ? ? ? 6.1 使用Intel Extension for PyTorch优化
? ? ? ? 6.3?Intel? Neural Compressor 量化模型?
? ? ? ? 杂草是农业经营中不受欢迎的入侵者,它们通过窃取营养、水、土地和其他关键资源来破坏种植,这些入侵者会导致产量下降和资源部署效率低下。一种已知的方法是使用杀虫剂来清除杂草,但杀虫剂会给人类带来健康风险。我们的目标是利用计算机视觉技术可以自动检测杂草的存在,开发一种只在杂草上而不是在作物上喷洒农药的系统,并使用针对性的修复技术将其从田地中清除,从而最小化杂草对环境的负面影响。
????????我们期待您将其部署到模拟的生产环境中——这里推理时间和二分类准确度(F1分数)将作为评分的主要依据。
????????https://filerepo.idzcn.com/hack2023/Weed_Detection5a431d7.zip
????????
????????
数据集结构
? ? ? ? 由上图可知,数据集由agri_0开头的文件组成,其中一个植物包含两个文件,分别是jpeg的图像和txt的文档,jpeg的文件存储植物的图像,而txt文档中的第一个数据表示该植物的种类,0表示农作物,1表示杂草
农作物文件
? ? ? ? 为了分析数据,截取了数据集中的随机3个农作物样本和3个杂草样本,分别展示其图像
import cv2
train_dir = r'task1Data/data' # 图片路径
crop_imgs = [] # 存放农作物的图像路径
weed_imgs = [] # 存放杂草的图像路径
Data = [] # 存放所有数据,用于后续分类
# 通过txt文件的第一个数字分辨是农作物还是杂草,获取第一个数字
for file in os.listdir(train_dir):
if file.endswith(".txt"):
file_path = os.path.join(train_dir, file)
with open(file_path, 'r') as f:
data = f.readline()
root, ext = os.path.splitext(file_path)
new_file_path = root + ".jpeg"
# 如果标签为0,即该图片是农作物
if(data[0][0] == '0'):
# 农作物的路径
crop_imgs.append(new_file_path)
else:
# 杂草的路径
weed_imgs.append(new_file_path)
# 图片和标签对应
Data.append((new_file_path, int(data[0][0])))
# 随机不重样的抽选3个农作物,3个杂草
select_CROP = np.random.choice(crop_imgs, 3, replace = False)
select_WEED = np.random.choice(weed_imgs, 3, replace = False)
# 使用pit打印出来,大小为20*10英寸
fig = plt.figure(figsize = (20,10))
for i in range(6):
if i < 3:
fp = f'{select_CROP[i]}'
label = '0'
else:
fp = f'{select_WEED[i-3]}'
label = '1'
ax = fig.add_subplot(2, 3, i+1)#两行三列
# to plot without rescaling, remove target_size
fn = cv2.imread(fp) # 加载图片
fn_gray = cv2.cvtColor(fn, cv2.COLOR_BGR2GRAY) # 将彩色图像转换为灰度图像
plt.imshow(fn, cmap='Greys_r') # 显示灰度反转后的图像
plt.title(label) # 设置label标题
plt.axis('off') # 关闭坐标轴
plt.show()
# 总的训练集样本数
print(f'农作物数量为: {len(crop_imgs)}')
print(f'杂草数量为: {len(weed_imgs)}')
? ? ? ? 可以看到农作物的总数据量为634,杂草的总数据量为666?
? ? ? ? 因为要使用VGG-16来进行训练,所以这里需要对数据集进行处理,首先是把原始的数据集分开,分成三部分,分别按照0.7:0.2:0.1的比例分为train,val,test数据集,然后再根据是农作物还是杂草,分别在三个数据集下又进行分类,分为crop,weed数据集
# 数据集路径
data_dir = 'task1Data/data'
# 将数据集分成3部分, train, val, test
for divide in ['train', 'val', 'test']:
divide_dir = os.path.join(data_dir, divide)
# 每部分的数据集都包含两部分,农作物和杂草
for kinds in ['crop', 'weed']:
kinds_dir = os.path.join(divide_dir, kinds)
os.makedirs(kinds_dir, exist_ok = True) # exist_ok为true表示当目标目录已经存在时,函数不会引发错误
# 将train, val, test按比例分别划分为0.7, 0.2, 0.1
train_data, testVal_data = train_test_split(Data, test_size = 0.3, random_state = 42)
test_data, val_data = train_test_split(testVal_data, test_size = 0.2, random_state = 42)
# 将train_data中的文件分类,并复制到train/crop or weed下
for file in train_data:
if (file[1] == 0): # 农作物
shutil.copy(file[0], 'task1Data/data/train/crop')
else:
shutil.copy(file[0], 'task1Data/data/train/weed')
# 将val_data中的文件分类,并复制到val/crop or weed下
for file in val_data:
if (file[1] == 0):
shutil.copy(file[0], 'task1Data/data/val/crop')
else:
shutil.copy(file[0], 'task1Data/data/val/weed')
# 将test_data中的文件分类,并复制到test/crop or weed下
for file in test_data:
if (file[1] == 0):
shutil.copy(file[0], 'task1Data/data/test/crop')
else:
shutil.copy(file[0], 'task1Data/data/test/weed')
????????可以看到data目录下已经有了3个数据集
?????????每个数据集下分别由两个crop和weed文件夹
? ? ? ? ?每个crop和weed文件夹都已经随机放好了植物图片
? ? ? ? 然后自定义一个数据集,将图片和标签联系起来
# 创建自定义数据集
class SelfDataset(Dataset):
def __init__(self, root_dir, transform=None):
self.root_dir = root_dir
self.transform = transform
self.classes = ['crop', 'weed']
self.data = self.load_data()
def load_data(self):
data = []
for class_idx, class_name in enumerate(self.classes): # 每次迭代返回标签和class_name
class_path = os.path.join(self.root_dir, class_name)
for file_name in os.listdir(class_path):
file_path = os.path.join(class_path, file_name)
if os.path.isfile(file_path) and file_name.lower().endswith('.jpeg'):
data.append((file_path, class_idx))
return data
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
img_path, label = self.data[idx]
img = Image.open(img_path).convert('RGB')
if self.transform:
img = self.transform(img)
return img, label
? ? ? ? 进行数据增强,能够在训练模型时,增加数据集的大小和多样性,从而提高模型的泛化能力
# 数据增强
transform = transforms.Compose([
transforms.RandomResizedCrop(64), # 随机裁剪为64*64像素大小
transforms.RandomHorizontalFlip(), # 随机翻转转换
transforms.ToTensor(), # 转换为PyTorch Tensor,有助于模型的输入与PyTorch的期望格式匹配
])
? ? ? ? 对三个数据集进行了数据增强,而DataLoader用于批量加载数据,并在train_loader中启用随机打乱,能够在模型的训练时,提高模型的泛化能力
# 创建数据集实例
train_dataset = SelfDataset(root_dir=train_dataset_path, transform=transform)
test_dataset = SelfDataset(root_dir=test_dataset_path, transform=transform)
val_dataset = SelfDataset(root_dir=val_dataset_path, transform=transform)
# 创建 DataLoader
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
????????经典CNN是指卷积神经网络(Convolutional Neural Networks,CNN),这是一种包含卷积计算且具有深度结构的前馈神经网络。CNN是深度学习领域中非常重要的算法之一,尤其在计算机视觉领域有广泛应用,如人脸识别、视频追踪、遥感解译等。CNN的经典结构包括LeNet-5、AlexNet、VGG、GoogleLeNet、ResNet和SENet等。
????????CNN的基本结构包括输入层、卷积层、池化层、全连接层和输出层。卷积层和池化层是CNN的核心部分,卷积层通过卷积运算提取输入数据中的局部特征,池化层则对特征图进行下采样,减少数据的维度和参数数量,提高模型的泛化能力。全连接层则将前面层的输出作为输入,对每个样本进行分类。
????????VGG-16是一个深度学习的卷积神经网络模型,由牛津大学的Visual Geometry Group(VGG)提出,网络中包含16个层次。VGG-16模型的特点是具有连续的卷积层,每一层都使用3x3的卷积核进行卷积操作,这样可以更有效地提取图像中的特征。同时,该模型还采用了Inception模块,将不同尺度的特征进行融合,提高了模型的识别能力。
????????传统的VGG-16网络的全连接层的输出类别是1000的大小,而本项目只有两类作物,农作物和杂草,所以将网络改成了2分类问题并对一部分网络进行了优化。
vgg16_model = models.vgg16(pretrained=True)
# 如果需要微调,可以解冻最后几层
for param in vgg16_model.features.parameters():
param.requires_grad = False
# 修改分类层
num_features = vgg16_model.classifier[6].in_features
vgg16_model.classifier[6] = nn.Sequential(
nn.Linear(num_features, 512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, 2)
)
????????nn.CrossEntropyLoss()
是PyTorch中的一个损失函数,它用于多分类问题。
????????使用Adam优化器,获取了VGG-16模型的参数,学习率为0.001,权重衰减为0.0001
????????使用 ReduceLROnPlateau 调度器,用于动态调整学习率
criterion = nn.CrossEntropyLoss() # 定义了一个损失函数,用于衡量模型的预测与真实标签之间的差异
optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4) # 定义了一个优化器,用于更新模型的权重,设置权重衰减为0.0001
# 添加学习率调度器
# 使用 ReduceLROnPlateau 调度器,当验证损失在一定时间内不再下降时,它会降低学习率
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.1, patience=3, verbose=True)
# 训练参数
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
vgg16_model.to(device)
? ? ? ? 下面是VGG的结构:
? ? ? ? 在gpu上进行50次的训练,使用了之前定义的三个数据集
# 训练循环
num_epochs = 0
consecutive_f1_count = 0
while num_epochs < 100:
print(f'第{num_epochs+1}次训练开始了')
vgg16_model.train() # 设置模型为训练模式
train_loss = 0.0
for inputs, labels in train_loader:
inputs, labels = inputs.to(device), labels.to(device)
# 将数据传递给模型
outputs = vgg16_model(inputs)
# 计算损失
loss = criterion(outputs, labels)
# 反向传播和优化
optimizer.zero_grad()
loss.backward()
optimizer.step()
train_loss += loss.item()
# 在每个 epoch 结束时进行验证
val_loss = 0.0
with torch.no_grad():
for inputs, labels in val_loader:
inputs, labels = inputs.to(device), labels.to(device)
# 在验证集上进行推理,可根据需要添加评估代码
val_outputs = vgg16_model(inputs)
val_loss += criterion(val_outputs, labels).item()
# 计算平均训练损失
avg_train_loss = train_loss / len(train_loader)
# 计算平均验证损失
avg_val_loss = val_loss / len(val_loader)
# 打印训练过程中的损失和验证损失
print(f'Epoch [{num_epochs+1}], 第{num_epochs+1}轮:训练集损失: {avg_train_loss:.4f}, 验证集损失: {avg_val_loss:.4f}')
# 在模型训练完后,使用测试集进行最终评估
vgg16_model.eval()
all_predictions = []
all_labels = []
start_time = time.time() # 记录开始时间
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
# 在测试集上进行推理
outputs = vgg16_model(inputs)
# 将预测结果和真实标签保存
_, predicted = torch.max(outputs, 1)
all_predictions.extend(predicted.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time
print(f'测试集用的时间为: {elapsed_time:.2f} seconds')
# 计算F1分数
f1 = f1_score(all_labels, all_predictions, average='binary') # 适用于二分类问题
# 打印每轮的测试F1分数
print(f'第{num_epochs+1}轮的测试F1分数: {f1:.4f}')
# 调整学习率
scheduler.step(f1)
# 增加训练次数
num_epochs += 1
? ? ? ? 保存模型然后进行测试并打印图像?
? ? ? ? 首先保存模型
# 保存模型
torch.save(vgg16_model.state_dict(), 'vgg16_gpu.pth')
# 打印保存成功的消息
print("模型已保存为 vgg16_gpu.pth")
? ? ? ? ?保存成功
? ? ? ? 然后进行测试
import matplotlib.pyplot as plt
import numpy as np
# 选择一张 test_loader 中的图片
sample_image, true_label = next(iter(test_loader))
# 将图片传递给模型进行预测
sample_image = sample_image.to(device)
with torch.no_grad():
model_output = vgg16_model(sample_image)
# 获取预测结果
_, predicted_label = torch.max(model_output, 1)
# 转换为 NumPy 数组
sample_image = sample_image.cpu().numpy()[0] # 将数据从 GPU 移回 CPU 并取出第一张图片
predicted_label = predicted_label[0].item()
true_label = true_label[0].item() # 直接获取标量值
# 获取类别标签
class_labels = ['crop', 'weed']
# 显示图像
plt.imshow(np.transpose(sample_image, (1, 2, 0))) # 转置图片的维度顺序
plt.title(f'TRUE LABEL IS: {class_labels[true_label]}, PREDICT LABEL IS: {class_labels[predicted_label]}')
plt.axis('off')
plt.show()
?
????????这里将GPU训练的模型保存到了vgg16_gpu.pth中,在CPU上进行加载。
class CustomVGG16(nn.Module):
def __init__(self):
super(CustomVGG16, self).__init__()
self.vgg16_model = models.vgg16(pretrained=True)
for param in self.vgg16_model.features.parameters():
param.requires_grad = False
num_features = self.vgg16_model.classifier[6].in_features
self.vgg16_model.classifier[6] = nn.Sequential(
nn.Linear(num_features, 512),
nn.ReLU(),
nn.Dropout(0.5),
nn.Linear(512, 2)
)
def forward(self, x):
return self.vgg16_model(x)
# 创建 CustomVGG16 模型实例
vgg16_model = CustomVGG16()
# 加载权重
vgg16_model.vgg16_model.load_state_dict(torch.load('vgg16_gpu.pth', map_location=torch.device('cpu')))
vgg16_model.eval()
# Assuming you have a DataLoader for the test dataset (test_loader)
all_predictions = []
all_labels = []
start_time = time.time()
with torch.no_grad():
for inputs, labels in test_loader:
inputs, labels = inputs.to(device), labels.to(device)
outputs = vgg16_model(inputs)
_, predicted = torch.max(outputs, 1)
all_predictions.extend(predicted.cpu().numpy())
all_labels.extend(labels.cpu().numpy())
end_time = time.time() # 记录结束时间
elapsed_time = end_time - start_time
print(f'测试集用的时间为: {elapsed_time:.2f} seconds')
f1 = f1_score(all_labels, all_predictions, average='binary') # 适用于二分类问题
print(f'F1分数为: {f1:.4f}')
? ? ? ? 可以看到时间还是比较快的,运行需要2.74s
? ? ? ? F1分数为0.9603?
????????这里使用Intel Extension for PyTorch来进行模型的优化
# 将模型移动到CPU
device = torch.device('cpu')
vgg16_model.to(device)
# 重新构建优化器
optimizer = optim.Adam(vgg16_model.parameters(), lr=0.001, weight_decay=1e-4)
# 使用Intel Extension for PyTorch进行优化
vgg16_model, optimizer = ipex.optimize(model=vgg16_model, optimizer=optimizer, dtype=torch.float32)
? ? ? ? 然后进行测试,可以看到时间还是比直接运行快了0.3s,但是可能因为本来运行时间就比较短,所以优化的效果不太明显
# 保存模型参数
torch.save(vgg16_model.state_dict(), 'vgg16_optimized_gpu.pth')
# 加载模型参数
loaded_model = CustomVGG16()
loaded_model.load_state_dict(torch.load('vgg16_optimized_gpu.pth'))
????????检查之前保存的优化后的模型是否存在
import os
import torch
# 检查文件是否存在
assert os.path.exists("./vgg16_optimized_gpu.pth"), "文件不存在"
# 尝试加载模型
model = torch.load("./vgg16_optimized_gpu.pth")
print("模型加载成功")
????????加载完成后以准确度为评估函数进行量化
from neural_compressor.config import PostTrainingQuantConfig, AccuracyCriterion
from neural_compressor import quantization
import os
# 加载模型
model = CustomVGG16()
model.load_state_dict(torch.load('vgg16_optimized_gpu.pth'))
model.to('cpu') # 将模型移动到 CPU
model.eval()
# 定义评估函数
def eval_func(model):
with torch.no_grad():
y_true = []
y_pred = []
for inputs, labels in train_loader:
inputs = inputs.to('cpu')
labels = labels.to('cpu')
preds_probs = model(inputs)
preds_class = torch.argmax(preds_probs, dim=-1)
y_true.extend(labels.numpy())
y_pred.extend(preds_class.numpy())
return accuracy_score(y_true, y_pred)
# 配置量化参数
conf = PostTrainingQuantConfig(backend='ipex', # 使用 Intel PyTorch Extension
accuracy_criterion=AccuracyCriterion(higher_is_better=True,
criterion='relative',
tolerable_loss=0.01))
# 执行量化
q_model = quantization.fit(model,
conf,
calib_dataloader=train_loader,
eval_func=eval_func)
# 保存量化模型
quantized_model_path = './quantized_models'
if not os.path.exists(quantized_model_path):
os.makedirs(quantized_model_path)
q_model.save(quantized_model_path)
? ? ? ? 运行完成后可得到以下输出结果?
???????
????????查看量化后的模型,分别为pt文件和json文件 ,然后可以查看量化后模型的大小为130.2MB,而量化之前的模型大小为520.2MB,量化之后缩小了接近4倍,空间占有率大大减小,量化的效果还是非常好的
? ? ? ? 加载模型
import torch
import json
from neural_compressor import quantization
# 指定量化模型的路径
quantized_model_path = './quantized_models'
# 加载 Qt 模型和 JSON 配置
vgg16_model_path = f'{quantized_model_path}/best_model.pt'
json_config_path = f'{quantized_model_path}/best_configure.json'
# 加载 Qt 模型
vgg16_model = torch.jit.load(vgg16_model_path, map_location='cpu')
# 加载 JSON 配置
with open(json_config_path, 'r') as json_file:
json_config = json.load(json_file)
? ? ? ? 进行推理
import torch
from sklearn.metrics import f1_score
import time
# 假设 test_loader 是你的测试数据加载器
# 请确保它返回 (inputs, labels) 的形式
# 将模型设置为评估模式
vgg16_model.eval()
# 初始化变量用于存储真实标签和预测标签
y_true = []
y_pred = []
# 开始推理
start_time = time.time()
# 设置 batch_size
batch_size = 64
# 使用 DataLoader 时设置 batch_size
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 在推理时处理每个批次
with torch.no_grad():
for inputs, labels in test_loader:
# 将输入数据移动到 CPU(如果尚未在 CPU 上)
inputs = inputs.to('cpu')
labels = labels.to('cpu')
# 获取模型预测
preds_probs = vgg16_model(inputs)
preds_class = torch.argmax(preds_probs, dim=-1)
# 扩展真实标签和预测标签列表
y_true.extend(labels.numpy())
y_pred.extend(preds_class.numpy())
# 计算 F1 分数
f1 = f1_score(y_true, y_pred, average='weighted')
# 计算推理时间
inference_time = time.time() - start_time
# 打印结果
print(f"F1 Score: {f1}")
print(f"Inference Time: {inference_time} seconds")
? ? ? ? 运行结果如下:?
import torch
from sklearn.metrics import f1_score
import time
# 假设 test_loader 是你的测试数据加载器
# 请确保它返回 (inputs, labels) 的形式
# 数据集放在testData文件夹里
test_dataset_path = './testData'
# 将测试集分类,分为crop和weed
for kinds in ['crop', 'weed']:
kinds_dir = os.path.join('testData', kinds)
os.makedirs(kinds_dir, exist_ok = True)
Data = [] # 存放数据集里全部数据
# 通过txt文件的第一个数字分辨是农作物还是杂草,获取第一个数字
for file in os.listdir('testData'):
if file.endswith(".txt"):
file_path = os.path.join(test_dataset_path, file)
with open(file_path, 'r') as f:
data = f.readline()
root, ext = os.path.splitext(file_path)
new_file_path = root + ".jpeg"
# 图片和标签一一对应
Data.append((new_file_path, int(data[0][0])))
# 将农作物和杂草分别复制到对应文件夹里
for file in Data:
if (file[1] == 0):
shutil.copy(file[0], 'testData/crop')
else:
shutil.copy(file[0], 'testData/weed')
# 数据增强
transform = transforms.Compose([
transforms.RandomResizedCrop(64),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
])
test_dataset = SelfDataset(root_dir=test_dataset_path, transform=transform)
# 将模型设置为评估模式
vgg16_model.eval()
# 初始化变量用于存储真实标签和预测标签
y_true = []
y_pred = []
# 开始推理
start_time = time.time()
# 设置 batch_size
batch_size = 64
# 使用 DataLoader 时设置 batch_size
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
# 在推理时处理每个批次
with torch.no_grad():
for inputs, labels in test_loader:
# 将输入数据移动到 CPU(如果尚未在 CPU 上)
inputs = inputs.to('cpu')
labels = labels.to('cpu')
# 获取模型预测
preds_probs = vgg16_model(inputs)
preds_class = torch.argmax(preds_probs, dim=-1)
# 扩展真实标签和预测标签列表
y_true.extend(labels.numpy())
y_pred.extend(preds_class.numpy())
# 计算 F1 分数
f1 = f1_score(y_true, y_pred, average='weighted')
# 计算推理时间
inference_time = time.time() - start_time
# 打印结果
print(f"测试集用的时间为: {inference_time} seconds")
print(f"F1分数: {f1}")
? ? ? ? 最后运行结果为
????????在使用oneAPI的优化组件以后,经过多次测试,推理的时间下降了0.3s左右,考虑到本身运行的时间就很短,且数据集也不算很大,所以优化后的效果不明显。如果之后还有其它机会,可以拿到更大的数据集进行测试,我相信Intel Extension for PyTorch的优化效果会更加明显。
????????然后,在使用量化工具以后,模型的空间占用大幅减小,且模型的精确度F1一直稳定在0.95左右。证明了oneAPI优秀的模型压缩能力,在保证模型精确度F1值的基础上还能够缩小模型的规模。