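Classification is one of the most basic and most important tasks in machine learning. This post shows how to solve classification problems in PyTorch with a linear layer and the cross-entropy loss. We start with the simple Iris dataset to see how well a linear model handles data that is close to linearly separable. We then apply the same linear model to the much harder CIFAR-10 image dataset and analyze how it performs.
Iris dataset: a classic small dataset with three classes of iris flowers, 50 samples per class and 4 features per sample. One class (setosa) is linearly separable from the other two, which overlap slightly, so a linear classifier does well on it.
CIFAR-10 dataset: an image dataset with 10 classes, each containing 6000 32x32 color images, for 60000 images in total.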
The read_data function reads the data from a CSV file, shuffles it, splits it into a training set and a test set, and standardizes it.
def read_data(file_path, only_test = False, normalize = True):
    # The UCI iris.data file has no header row; header=None keeps the first sample as data
    np_data = pd.read_csv(file_path, header=None).values
    np.random.shuffle(np_data)
    # Two-way mapping: index -> class name and class name -> index
    classes = np.unique(np_data[:,-1])
    class_dict = {}
    for index, class_name in enumerate(classes):
        class_dict[index] = class_name
        class_dict[class_name] = index
    # First 80% of the shuffled rows for training, the remaining 20% for testing
    split = int(len(np_data)*0.8)
    train_src = np_data[:split]
    test_src = np_data[split:]
    train_data = train_src[:,:-1].astype("float32")
    train_labels = train_src[:, -1].reshape(-1,1)
    test_data = test_src[:, :-1].astype("float32")
    test_labels = test_src[:, -1].reshape(-1,1)
    if (normalize):
        # Standardize both splits with the statistics of the training split only
        mean = np.mean(train_data)
        std = np.std(train_data)
        train_data = (train_data - mean) / std
        test_data = (test_data - mean) / std
    if (only_test):
        return test_data, test_labels, class_dict
    return train_data, train_labels, test_data, test_labels, class_dict
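The function above standardizes with a single mean and standard deviation computed over all features together. A common alternative, sketched here as an assumption rather than as what the post's code does, is to standardize each feature column separately and to reuse the training statistics for the test split. The helper name standardize_per_feature and the toy arrays are hypothetical.

```python
import numpy as np

def standardize_per_feature(train_data, test_data):
    """Standardize each column with the mean/std of the training split only."""
    mean = train_data.mean(axis=0)
    std = train_data.std(axis=0)
    std[std == 0] = 1.0  # avoid dividing by zero for constant columns
    return (train_data - mean) / std, (test_data - mean) / std

# Made-up rows with four features each, only for illustration
train = np.array([[5.1, 3.5, 1.4, 0.2],
                  [7.0, 3.2, 4.7, 1.4],
                  [6.3, 3.3, 6.0, 2.5]], dtype="float32")
test = np.array([[5.9, 3.0, 5.1, 1.8]], dtype="float32")

train_std, test_std = standardize_per_feature(train, test)
print(train_std.mean(axis=0))  # close to 0 for every column
print(train_std.std(axis=0))   # close to 1 for every column
```

Per-feature scaling matters most when the columns have very different ranges; the test split never contributes to the statistics, so no information leaks from it into training.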
The Linear_classify class defines a simple linear model that contains a single linear layer.
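class Linear_classify(th.nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super(Linear_classify, self).__init__()
        # args[0]: number of input features, args[1]: number of classes
        self.linear = th.nn.Linear(args[0], args[1])

    def forward(self, x):
        # Returns raw class scores (logits); CrossEntropyLoss applies log-softmax itself
        y_pred = self.linear(x)
        return y_pred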
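To make the model's output concrete, here is a small sketch of what a single linear layer produces and how th.nn.CrossEntropyLoss consumes it. The batch, the labels, and the layer sizes (4 features, 3 classes) are made-up values chosen for illustration.

```python
import torch as th

th.manual_seed(0)
layer = th.nn.Linear(4, 3)            # 4 features in, 3 class scores out
x = th.randn(5, 4)                    # a made-up batch of 5 samples
labels = th.tensor([0, 2, 1, 1, 0])   # made-up class indices

logits = layer(x)
print(logits.shape)                   # torch.Size([5, 3])

loss = th.nn.CrossEntropyLoss()(logits, labels)
# The same value by hand: mean negative log-probability of the true class
manual = -th.log_softmax(logits, dim=1)[th.arange(5), labels].mean()
print(loss.item(), manual.item())
```

Because the loss applies log-softmax internally, the model should return raw scores and not probabilities.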
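In the main function we initialize the model, the loss function, and the optimizer. We then train the model over many iterations and record how the loss changes.
file_path = "J:\\MachineLearning\\数据集\\Iris\\iris.data"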
train_data, train_labels, test_data, test_labels, label_dict = read_data(file_path)
print(train_data.shape)
print(train_labels.shape)
print(label_dict)
# Map class names to integer indices, as CrossEntropyLoss expects
int_labels = np.vectorize(lambda x: int(label_dict[x]))(train_labels).flatten()
print(int_labels[:10])
tensor_labels = th.from_numpy(int_labels).type(th.long)
# label_dict stores both directions, so it holds two entries per class
num_classes = int(len(label_dict)/2)
train_data = th.from_numpy(train_data.astype("float32"))
print (train_data.shape)
print (train_data[:2])
linear_classifier = Linear_classify(int(train_data.shape[1]), num_classes)
loss_function = th.nn.CrossEntropyLoss()
optimizer = th.optim.SGD(linear_classifier.parameters(), lr = 0.001)
epochs = 10000
best_loss = 100
turn_to_bad_loss_count = 0
loss_history = []
for epoch in range(epochs):
    y_pred = linear_classifier(train_data)
    loss = loss_function(y_pred, tensor_labels)
    # Stop early once the loss has been worse than the best value more than 1000 times
    if (float(loss.item()) > best_loss):
        turn_to_bad_loss_count += 1
    else:
        best_loss = float(loss.item())
    if (turn_to_bad_loss_count > 1000):
        break
    if (epoch % 10 == 0):
        print("epoch {} loss is {}".format(epoch, loss.item()))
    loss_history.append(float(loss.item()))
    # Clear the gradients of the previous iteration before backpropagating
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
plt.plot(loss_history)
plt.show()
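PyTorch adds each new gradient to whatever is already stored in a parameter's .grad, so a training loop normally clears the gradients once per iteration, for example with optimizer.zero_grad(). The toy tensors below are made up to show the effect in isolation.

```python
import torch as th

w = th.ones(3, requires_grad=True)
x = th.tensor([1.0, 2.0, 3.0])

(w * x).sum().backward()
first = w.grad.clone()        # gradient of sum(w * x) with respect to w is x

(w * x).sum().backward()      # second backward pass without clearing
accumulated = w.grad.clone()  # now 2 * x, the two gradients were added

w.grad.zero_()                # clear, the role optimizer.zero_grad() plays in a loop
(w * x).sum().backward()
after_reset = w.grad.clone()  # back to x

print(first, accumulated, after_reset)
```

Without the clearing step, every update would use the sum of all gradients computed so far, which changes the effective step size as training goes on.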
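Next, we evaluate the model's accuracy on test data. Together with the loss curve plotted above, this shows how well the model has learned.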
accuracy = []
for _ in range(10):
    # Each call reshuffles the file, so these test splits can overlap the training rows
    test_data, test_labels, label_dict = read_data(file_path, only_test = True)
    with th.no_grad():
        test_result = linear_classifier(th.from_numpy(test_data.astype("float32")))
    print(test_result[:10])
    result_index = test_result.argmax(dim=1).numpy()
    # Map predicted indices back to class names and compare with the true names
    iris_name_result = np.vectorize(lambda x: str(label_dict[x]))(result_index).reshape(-1,1)
    accuracy.append(len(iris_name_result[iris_name_result == test_labels]) / len(test_labels))
print("Accuracy is {}".format(np.mean(accuracy)))
Accuracy is 0.9466666666666667
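The evaluation above calls read_data again, which reshuffles the whole file, so the rows it scores can include rows the model was trained on. A stricter alternative is to split once before training and score only the held-out rows. The sketch below illustrates this on synthetic data; the three clusters, the 120/30 split, and the hyperparameters are assumptions made for the example, not the post's setup.

```python
import torch as th

th.manual_seed(0)
# Synthetic stand-in for a 3-class dataset: three well-separated 2-D clusters
centers = th.tensor([[6.0, 0.0], [-3.0, 5.0], [-3.0, -5.0]])
labels = th.arange(3).repeat_interleave(50)
data = centers[labels] + 0.5 * th.randn(150, 2)

# Split once, before training, and never reshuffle afterwards
perm = th.randperm(150)
train_idx, test_idx = perm[:120], perm[120:]

model = th.nn.Linear(2, 3)
loss_function = th.nn.CrossEntropyLoss()
optimizer = th.optim.SGD(model.parameters(), lr=0.05)
for _ in range(300):
    optimizer.zero_grad()
    loss = loss_function(model(data[train_idx]), labels[train_idx])
    loss.backward()
    optimizer.step()

# Score only rows the model has never seen, without tracking gradients
with th.no_grad():
    predictions = model(data[test_idx]).argmax(dim=1)
accuracy = (predictions == labels[test_idx]).float().mean().item()
print("held-out accuracy: {:.3f}".format(accuracy))
```

An accuracy measured this way is usually a more honest estimate of how the model behaves on new data.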
def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        # encoding='bytes' leaves the dictionary keys as bytes, e.g. b"data"
        batch = pickle.load(fo, encoding='bytes')
    return batch

def read_data(file_path, gray = False, percent = 0, normalize = True):
    data_src = unpickle(file_path)
    np_data = np.array(data_src["data".encode()]).astype("float32")
    np_labels = np.array(data_src["labels".encode()]).astype("float32").reshape(-1,1)
    single_data_length = 32*32
    image_ret = None
    if (gray):
        # Average the red, green and blue planes into one grayscale plane
        np_data = (np_data[:, :single_data_length] + np_data[:, single_data_length:(2*single_data_length)] + np_data[:, 2*single_data_length : 3*single_data_length])/3
        image_ret = np_data.reshape(len(np_data),32,32)
    else:
        # Each row stores 1024 red, then 1024 green, then 1024 blue values,
        # so reshape to (N, 3, 32, 32) first and then move the channels last
        image_ret = np_data.reshape(len(np_data),3,32,32).transpose(0,2,3,1)
    if(normalize):
        mean = np.mean(np_data)
        std = np.std(np_data)
        np_data = (np_data - mean) / std
    if (percent == 0):
        # percent == 0 means "return everything"
        return np_data, np_labels, image_ret
    else:
        return np_data[:int(len(np_data)*percent)], np_labels[:int(len(np_labels)*percent)], image_ret[:int(len(image_ret)*percent)]
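In the Python version of CIFAR-10, each 3072-value row holds the 1024 red values first, then the 1024 green values, then the 1024 blue values, each plane in row-major order. The sketch below uses a fabricated row (not real CIFAR data) to show how such a row maps to a height x width x channel image, and what goes wrong when the row is reshaped directly to (32, 32, 3).

```python
import numpy as np

# A fake CIFAR-style row: 1024 red values, then 1024 green, then 1024 blue
row = np.concatenate([
    np.full(1024, 255, dtype=np.uint8),  # red plane
    np.full(1024, 128, dtype=np.uint8),  # green plane
    np.zeros(1024, dtype=np.uint8),      # blue plane
])
batch = row.reshape(1, 3072)

# Channels-first reshape, then move the channel axis last for plt.imshow
images = batch.reshape(-1, 3, 32, 32).transpose(0, 2, 3, 1)
print(images.shape)     # (1, 32, 32, 3)
print(images[0, 0, 0])  # [255 128   0]

# Reshaping straight to (N, 32, 32, 3) mixes the planes instead
wrong = batch.reshape(-1, 32, 32, 3)
print(wrong[0, 0, 0])   # [255 255 255]
```

The layout only affects the image array used for display; the flattened rows fed to the linear model keep the same 3072 values either way.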
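As can be seen, the training loss only dropped from 2.x to 1.x. For reference, a 10-class classifier that assigns equal probability to every class has a cross-entropy loss of ln 10 ≈ 2.30.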
train Accuracy is 0.6048
test Accuracy is 0.282
First, we have an unnormalized input vector $z$ of shape $(n,)$, where $n$ is the number of classes. We want to turn this vector into a probability distribution whose elements sum to 1.
The softmax function can be computed in the following steps:
1. Find the largest element of $z$ and call it $\hat{z}$.
2. Subtract $\hat{z}$ from every element of $z$ to get a new vector $v$. This shift does not change the result, but it keeps the exponentials from overflowing.
3. Apply the exponential function to every element of $v$ to get a new vector $e^v$.
4. Add up the elements of $e^v$ and call the sum $\hat{v}$.
5. Divide every element of $e^v$ by $\hat{v}$ to get the final probability distribution.
These steps can be written as:
$$
\begin{aligned}
z &= (z_1, z_2, \ldots, z_n)^T \\
\hat{z} &= \max_i z_i \\
v &= z - \hat{z} \\
e^v &= (e^{v_1}, e^{v_2}, \ldots, e^{v_n})^T \\
\hat{v} &= \sum_{i=1}^{n} e^{v_i} \\
p &= \frac{e^v}{\hat{v}}
\end{aligned}
$$
Here $p$ is the final probability distribution, so that $p_i = e^{z_i - \hat{z}} / \sum_{j} e^{z_j - \hat{z}}$.
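As a concrete check, here is a minimal NumPy sketch of the numerically stable softmax, $p_i = e^{z_i - \max(z)} / \sum_j e^{z_j - \max(z)}$. The function name softmax and the sample score vectors are chosen for illustration.

```python
import numpy as np

def softmax(z):
    """Numerically stable softmax of a 1-D score vector."""
    v = z - np.max(z)          # shift so the largest score becomes 0
    e_v = np.exp(v)            # every entry is now at most 1
    return e_v / np.sum(e_v)   # normalize so the entries sum to 1

z = np.array([1.0, 2.0, 3.0])
p = softmax(z)
print(p)
print(p.sum())

# Scores this large would overflow np.exp without the shift
big = softmax(np.array([1000.0, 1001.0, 1002.0]))
print(big)
```

Adding the same constant to every score leaves the result unchanged, which is why the two score vectors above give the same probabilities.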
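Conclusion:
Our experiments show that the linear model performs well on the Iris dataset, with an accuracy of about 94.7%, but poorly on CIFAR-10, where it reaches 60.5% on the training data and only 28.2% on the test set. This shows the limits of linear models on complex, non-linear problems. To address this, a follow-up post will show how to use a convolutional neural network to improve image classification accuracy.
Postscript:
Thank you for reading. We hope this post helps you understand how to classify data in PyTorch with a linear layer and the cross-entropy loss. Stay tuned for the next post, "Image Classification with Convolutional Neural Networks in PyTorch".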
# Full listing: Iris example
import torch as th
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def read_data(file_path, only_test = False, normalize = True):
    # The UCI iris.data file has no header row; header=None keeps the first sample as data
    np_data = pd.read_csv(file_path, header=None).values
    np.random.shuffle(np_data)
    # Two-way mapping: index -> class name and class name -> index
    classes = np.unique(np_data[:,-1])
    class_dict = {}
    for index, class_name in enumerate(classes):
        class_dict[index] = class_name
        class_dict[class_name] = index
    # First 80% of the shuffled rows for training, the remaining 20% for testing
    split = int(len(np_data)*0.8)
    train_src = np_data[:split]
    test_src = np_data[split:]
    train_data = train_src[:,:-1].astype("float32")
    train_labels = train_src[:, -1].reshape(-1,1)
    test_data = test_src[:, :-1].astype("float32")
    test_labels = test_src[:, -1].reshape(-1,1)
    if (normalize):
        # Standardize both splits with the statistics of the training split only
        mean = np.mean(train_data)
        std = np.std(train_data)
        train_data = (train_data - mean) / std
        test_data = (test_data - mean) / std
    if (only_test):
        return test_data, test_labels, class_dict
    return train_data, train_labels, test_data, test_labels, class_dict
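
class Linear_classify(th.nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super(Linear_classify, self).__init__()
        # args[0]: number of input features, args[1]: number of classes
        self.linear = th.nn.Linear(args[0], args[1])

    def forward(self, x):
        # Returns raw class scores (logits); CrossEntropyLoss applies log-softmax itself
        y_pred = self.linear(x)
        return y_pred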

def main():
    file_path = "J:\\MachineLearning\\数据集\\Iris\\iris.data"
    train_data, train_labels, test_data, test_labels, label_dict = read_data(file_path)
    print(train_data.shape)
    print(train_labels.shape)
    print(label_dict)
    # Map class names to integer indices, as CrossEntropyLoss expects
    int_labels = np.vectorize(lambda x: int(label_dict[x]))(train_labels).flatten()
    print(int_labels[:10])
    tensor_labels = th.from_numpy(int_labels).type(th.long)
    # label_dict stores both directions, so it holds two entries per class
    num_classes = int(len(label_dict)/2)
    train_data = th.from_numpy(train_data.astype("float32"))
    print (train_data.shape)
    print (train_data[:2])
    linear_classifier = Linear_classify(int(train_data.shape[1]), num_classes)
    loss_function = th.nn.CrossEntropyLoss()
    optimizer = th.optim.SGD(linear_classifier.parameters(), lr = 0.001)
    epochs = 10000
    best_loss = 100
    turn_to_bad_loss_count = 0
    loss_history = []
    for epoch in range(epochs):
        y_pred = linear_classifier(train_data)
        loss = loss_function(y_pred, tensor_labels)
        # Stop early once the loss has been worse than the best value more than 1000 times
        if (float(loss.item()) > best_loss):
            turn_to_bad_loss_count += 1
        else:
            best_loss = float(loss.item())
        if (turn_to_bad_loss_count > 1000):
            break
        if (epoch % 10 == 0):
            print("epoch {} loss is {}".format(epoch, loss.item()))
        loss_history.append(float(loss.item()))
        # Clear the gradients of the previous iteration before backpropagating
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    plt.plot(loss_history)
    plt.show()
    accuracy = []
    for _ in range(10):
        # Each call reshuffles the file, so these test splits can overlap the training rows
        test_data, test_labels, label_dict = read_data(file_path, only_test = True)
        with th.no_grad():
            test_result = linear_classifier(th.from_numpy(test_data.astype("float32")))
        print(test_result[:10])
        result_index = test_result.argmax(dim=1).numpy()
        # Map predicted indices back to class names and compare with the true names
        iris_name_result = np.vectorize(lambda x: str(label_dict[x]))(result_index).reshape(-1,1)
        accuracy.append(len(iris_name_result[iris_name_result == test_labels]) / len(test_labels))
    print("Accuracy is {}".format(np.mean(accuracy)))

if (__name__ == "__main__"):
    main()

# Full listing: CIFAR-10 example
import torch as th
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

def unpickle(file):
    import pickle
    with open(file, 'rb') as fo:
        # encoding='bytes' leaves the dictionary keys as bytes, e.g. b"data"
        batch = pickle.load(fo, encoding='bytes')
    return batch

def read_data(file_path, gray = False, percent = 0, normalize = True):
    data_src = unpickle(file_path)
    np_data = np.array(data_src["data".encode()]).astype("float32")
    np_labels = np.array(data_src["labels".encode()]).astype("float32").reshape(-1,1)
    single_data_length = 32*32
    image_ret = None
    if (gray):
        # Average the red, green and blue planes into one grayscale plane
        np_data = (np_data[:, :single_data_length] + np_data[:, single_data_length:(2*single_data_length)] + np_data[:, 2*single_data_length : 3*single_data_length])/3
        image_ret = np_data.reshape(len(np_data),32,32)
    else:
        # Each row stores 1024 red, then 1024 green, then 1024 blue values,
        # so reshape to (N, 3, 32, 32) first and then move the channels last
        image_ret = np_data.reshape(len(np_data),3,32,32).transpose(0,2,3,1)
    if(normalize):
        mean = np.mean(np_data)
        std = np.std(np_data)
        np_data = (np_data - mean) / std
    if (percent == 0):
        # percent == 0 means "return everything"
        return np_data, np_labels, image_ret
    else:
        return np_data[:int(len(np_data)*percent)], np_labels[:int(len(np_labels)*percent)], image_ret[:int(len(image_ret)*percent)]
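
class Linear_classify(th.nn.Module):
    def __init__(self, *args, **kwargs) -> None:
        super(Linear_classify, self).__init__()
        # args[0]: number of input features, args[1]: number of classes
        self.linear = th.nn.Linear(args[0], args[1])

    def forward(self, x):
        # Returns raw class scores (logits); CrossEntropyLoss applies log-softmax itself
        x = self.linear(x)
        return x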

def main():
    file_path = "J:\\MachineLearning\\数据集\\cifar-10-batches-py\\data_batch_1"
    # Use the first 50% of data_batch_1 (5000 images) for training
    train_data, train_labels, image_data = read_data(file_path, percent=0.5)
    print(train_data.shape)
    print(train_labels.shape)
    print(image_data.shape)
    '''
    # Optional preview of the first nine images (pixel values are floats in 0..255)
    fig, axs = plt.subplots(3, 3)
    for i, ax in enumerate(axs.flat):
        ax.imshow(image_data[i].astype("uint8"))
        ax.axis('off') # hide the axes
    plt.show()
    '''
    int_labels = train_labels.flatten()
    print(int_labels[:10])
    tensor_labels = th.from_numpy(int_labels).type(th.long)
    num_classes = int(len(np.unique(int_labels)))
    train_data = th.from_numpy(train_data)
    print (train_data.shape)
    print (train_data[:2])
    linear_classifier = Linear_classify(int(train_data.shape[1]), num_classes)
    loss_function = th.nn.CrossEntropyLoss()
    optimizer = th.optim.SGD(linear_classifier.parameters(), lr = 0.01)
    epochs = 7000
    best_loss = 100
    turn_to_bad_loss_count = 0
    loss_history = []
    for epoch in range(epochs):
        y_pred = linear_classifier(train_data)
        loss = loss_function(y_pred, tensor_labels)
        # Stop early once the loss has been worse than the best value more than 100 times
        if (float(loss.item()) > best_loss):
            turn_to_bad_loss_count += 1
        else:
            best_loss = float(loss.item())
        if (turn_to_bad_loss_count > 100):
            break
        if (epoch % 10 == 0):
            print("epoch {} loss is {}".format(epoch, loss.item()))
        loss_history.append(float(loss.item()))
        # Clear the gradients of the previous iteration before backpropagating
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    plt.plot(loss_history)
    plt.show()
    # Accuracy on the data the model was trained on
    with th.no_grad():
        test_result = linear_classifier(train_data)
    print(test_result[:10])
    result_index = test_result.argmax(dim=1).reshape(-1,1)
    accuracy = float(np.mean(result_index.numpy() == train_labels))
    print("train Accuracy is {}".format(accuracy))
    # Accuracy on the separate test batch
    file_path = "J:\\MachineLearning\\数据集\\cifar-10-batches-py\\test_batch"
    test_data, test_labels, image_data = read_data(file_path)
    with th.no_grad():
        test_result = linear_classifier(th.from_numpy(test_data))
    print(test_result[:10])
    result_index = test_result.argmax(dim=1).reshape(-1,1)
    accuracy = float(np.mean(result_index.numpy() == test_labels))
    print("test Accuracy is {}".format(accuracy))

if (__name__ == "__main__"):
    main()