深度学习(九):bert之代码实现

发布时间:2023年12月26日

任务1: Masked LM

即随机屏蔽(masking)部分输入token,然后只预测那些被屏蔽的token。在模型中,随机地屏蔽了每个序列中15%的WordPiece token。
训练数据生成器随机选择15%的token。例如在这个句子“my dog is hairy”中,它选择的token是“hairy”。然后,执行以下过程:

数据生成器将执行以下操作,而不是始终用[MASK]替换所选单词:

80%的时间:用[MASK]标记替换单词,例如,my dog is hairy → my dog is [MASK]

10%的时间:用一个随机的单词替换该单词,例如,my dog is hairy → my dog is apple

10%的时间:保持单词不变,例如,my dog is hairy → my dog is hairy.
这样做的目的是将表示偏向于实际观察到的单词。

Transformer encoder不知道它将被要求预测哪些单词或哪些单词已被随机单词替换,因此它被迫保持每个输入token的分布式上下文表示。此外,因为随机替换只发生在所有token的1.5%(即15%的10%),这似乎不会损害模型的语言理解能力。

任务2:下一句预测

语料中50%的句子,选择其相应的下一句一起形成上下句,作为正样本;其余50%的句子随机选择一句非下一句一起形成上下句,作为负样本。而后进行训练,有利于sentence-level tasks,例如问答。

总的来说,BERT本质上是在海量语料的基础上,通过自监督学习的方法为单词学习一个好的特征表示。该模型的优点是可以根据具体的人物进行微调,或者直接使用预训练的模型作为特征提取器。

代码实现

model.py

预训练模型

  1. https://huggingface.co/bert-base-chinese
  2. 在我的资源里

模型


import torch.nn as nn
from transformers import BertModel
from config import parsers
import torch


class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.args = parsers()
        self.device = "cuda:0" if torch.cuda.is_available() else "cpu"
        # 加载 bert 中文预训练模型
        self.bert = BertModel.from_pretrained(self.args.bert_pred)
        # 让 bert 模型进行微调(参数在训练过程中变化)
        for param in self.bert.parameters():
            param.requires_grad = True
        # 全连接层
        self.linear = nn.Linear(self.args.num_filters, self.args.class_num)

    def forward(self, x):
        input_ids, attention_mask = x[0].to(self.device), x[1].to(self.device)
        hidden_out = self.bert(input_ids, attention_mask=attention_mask,
                               output_hidden_states=False)  # 控制是否输出所有encoder层的结果
        # shape (batch_size, hidden_size)  pooler_output -->  hidden_out[0]
        pred = self.linear(hidden_out.pooler_output)
        # 返回预测结果
        return pred

# bert的输出结果有四个维度: last_hidden_state:shape是(batch_size, sequence_length, hidden_size),hidden_size=768,它是模型最后一层输出的隐藏状态。
# pooler_output:shape是(batch_size, hidden_size),这是序列的第一个token(classification token)的最后一层的隐藏状态,它是由线性层和Tanh激活函数进一步处理的。
# (通常用于句子分类,至于是使用这个表示,还是使用整个输入序列的隐藏状态序列的平均化或池化,视情况而定)

# hidden_states:这是输出的一个可选项,如果输出,需要指定config.output_hidden_states=True,它也是一个元组,它的第一个元素是embedding,其余元素是各层的输出,每个元素的形状是(
# batch_size, sequence_length, hidden_size)
# attentions:这也是输出的一个可选项,如果输出,需要指定config.output_attentions=True, 它也是一个元组,它的元素是每一层的注意力权重,用于计算self-attention heads的加权平均值。

# cross_attentions:shape是(batch_size, num_heads, encoder_sequence_length, embed_size_per_head)

# 我们是微调模式,需要获取bert最后一个隐藏层的输出输入到下一个全连接层,所以取第一个维度,也就是hiden_outputs[0]

这段代码定义了一个名为MyModel的深度学习模型,该模型是PyTorch的nn.Module的子类。这个模型使用了预训练的BERT模型,并在其基础上添加了一个全连接层进行分类。

在__init__方法中,首先调用了父类的初始化方法,然后定义了一些属性。self.args是一个解析器对象,它可能包含了一些模型的参数。self.device是一个字符串,表示模型运行的设备,如果有可用的CUDA设备,就使用第一个CUDA设备,否则使用CPU。

接着,加载了预训练的BERT模型。self.bert是一个BertModel对象,它是从self.args.bert_pred指定的路径加载的。然后,设置了BERT模型的所有参数的requires_grad属性为True,这意味着在训练过程中,BERT模型的参数会被更新。

然后,定义了一个全连接层self.linear,它的输入维度是self.args.num_filters,输出维度是self.args.class_num。这个全连接层将被用来将BERT模型的输出转换为分类预测。

在forward方法中,首先将输入数据x移动到指定的设备,然后将x分解为input_ids和attention_mask。接着,将这两个输入传递给BERT模型,得到隐藏状态hidden_out。然后,将hidden_out.pooler_output传递给全连接层,得到预测结果pred。最后,返回预测结果。

数据集

util.py


import os
from config import parsers
# transformer库是一个把各种预训练模型集成在一起的库,导入之后,你就可以选择性的使用自己想用的模型,这里使用的BERT模型。
# 所以导入了bert模型,和bert的分词器,这里是对bert的使用,而不是bert自身的源码。
from transformers import BertTokenizer
from torch.utils.data import Dataset, DataLoader
import torch


def read_data(file):
    # 读取文件
    all_data = open(file, "r", encoding="utf-8").read().split("\n")
    # 得到所有文本、所有标签、句子的最大长度
    texts, labels, max_length = [], [], []
    for data in all_data:
        if data:
            text, label = data.split("\t")
            max_length.append(len(text))
            texts.append(text)
            labels.append(label)
    # 根据不同的数据集返回不同的内容
    if os.path.split(file)[1] == "train.txt":
        max_len = max(max_length)
        return texts, labels, max_len
    return texts, labels,


class MyDataset(Dataset):
    def __init__(self, texts, labels, max_length):
        self.all_text = texts
        self.all_label = labels
        self.max_len = max_length
        self.tokenizer = BertTokenizer.from_pretrained(parsers().bert_pred)

    def __getitem__(self, index):
        # 取出一条数据并截断长度
        text = self.all_text[index][:self.max_len]
        label = self.all_label[index]

        # 分词
        text_id = self.tokenizer.tokenize(text)
        # 加上起始标志
        text_id = ["[CLS]"] + text_id

        # 编码
        token_id = self.tokenizer.convert_tokens_to_ids(text_id)
        # 掩码  -》
        mask = [1] * len(token_id) + [0] * (self.max_len + 2 - len(token_id))
        # 编码后  -》长度一致
        token_ids = token_id + [0] * (self.max_len + 2 - len(token_id))
        # str -》 int
        label = int(label)

        # 转化成tensor
        token_ids = torch.tensor(token_ids)
        mask = torch.tensor(mask)
        label = torch.tensor(label)

        return (token_ids, mask), label

    def __len__(self):
        # 得到文本的长度
        return len(self.all_text)


if __name__ == "__main__":
    train_text, train_label, max_len = read_data("./data/train.txt")
    print(train_text[0], train_label[0])
    trainDataset = MyDataset(train_text, train_label, max_len)
    trainDataloader = DataLoader(trainDataset, batch_size=3, shuffle=False)
    for batch_text, batch_label in trainDataloader:
        print(batch_text, batch_label)


这段代码主要包含两个部分:一个是read_data函数,用于读取和处理数据;另一个是MyDataset类,用于创建PyTorch的数据集。

read_data函数接收一个文件路径作为参数,然后读取该文件中的所有数据。数据文件中的每一行都包含一个文本和一个标签,它们之间用制表符\t分隔。函数首先将文件中的所有数据读取到一个列表中,然后遍历这个列表,对每一行数据进行处理。处理的过程中,会将文本和标签分别添加到texts和labels两个列表中,同时计算每个文本的长度,并将长度添加到max_length列表中。如果数据文件是训练数据,函数还会计算出所有文本的最大长度max_len,然后返回texts、labels和max_len;如果数据文件不是训练数据,函数只返回texts和labels。

MyDataset类是PyTorch的Dataset类的子类,用于创建数据集。在初始化方法中,接收文本、标签和最大长度作为参数,并将它们保存为实例属性。然后,创建一个BERT分词器,用于将文本分词和编码。在__getitem__方法中,接收一个索引,然后返回该索引对应的数据和标签。数据的处理过程包括:截断文本、分词、添加起始标志、编码、创建掩码、将标签转换为整数、将数据和标签转换为张量。在__len__方法中,返回数据集中的文本数量。

这段代码的最后部分是一个测试代码,用于测试read_data函数和MyDataset类的功能。首先,使用read_data函数读取训练数据,然后使用MyDataset类创建数据集,接着创建一个数据加载器,最后遍历数据加载器,打印出每个批次的数据和标签。

配置

config.py


import argparse
import os.path


def parsers():
    parser = argparse.ArgumentParser(description="Bert model of argparse")
    parser.add_argument("--train_file", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion/dataset/text", "train.txt"))
    parser.add_argument("--dev_file", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion/dataset/text", "valid.txt"))
    parser.add_argument("--test_file", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion/dataset/text", "test.txt"))
    parser.add_argument("--classification", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion/dataset/text", "class.txt"))
    parser.add_argument("--bert_pred", type=str, default="D:\PycharmProjects\Multimodal emotion/model/bert-base-chinese")
    parser.add_argument("--class_num", type=int, default=3)
    parser.add_argument("--max_len", type=int, default=47)
    parser.add_argument("--batch_size", type=int, default=32)
    parser.add_argument("--epochs", type=int, default=10)
    parser.add_argument("--learn_rate", type=float, default=1e-5)
    parser.add_argument("--num_filters", type=int, default=768)
    parser.add_argument("--save_model_best", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion\output/text/bert/model", "best_model.pth"))
    parser.add_argument("--save_model_last", type=str, default=os.path.join("D:\PycharmProjects\Multimodal emotion\output/text/bert/model", "last_model.pth"))
    args = parser.parse_args()
    return args

这段代码定义了一个名为parsers的函数,该函数使用argparse库来解析命令行参数。这些参数包括训练、验证和测试数据的文件路径,BERT预训练模型的路径,分类的数量,最大文本长度,批次大小,训练周期数,学习率,BERT模型的过滤器数量,以及最佳模型和最后模型的保存路径。

在函数中,首先创建了一个argparse.ArgumentParser对象,然后使用add_argument方法添加了一系列的命令行参数。每个参数都有一个名称,一个类型,以及一个默认值。例如,"–train_file"参数的类型是字符串,其默认值是训练数据的文件路径。

在添加完所有参数后,使用parse_args方法解析命令行参数,并将结果保存在args变量中。最后,函数返回args变量。

这种方式的好处是,可以在命令行中方便地指定或更改参数,而无需修改代码。此外,argparse库还提供了一些其他功能,如生成帮助和使用消息,处理错误等。

训练

main.py


import torch
from utils import read_data, MyDataset
from config import parsers
from torch.utils.data import DataLoader
from model import MyModel
from torch.optim import AdamW
import torch.nn as nn
from sklearn.metrics import accuracy_score
import time
from test import test_data
from tqdm import tqdm

if __name__ == "__main__":
    start = time.time()
    args = parsers()

    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    train_text, train_label, max_len = read_data(args.train_file)
    dev_text, dev_label = read_data(args.dev_file)
    args.max_len = max_len

    train_dataset = MyDataset(train_text, train_label, args.max_len)
    train_dataloader = DataLoader(train_dataset, batch_size=args.batch_size, shuffle=True)

    dev_dataset = MyDataset(dev_text, dev_label, args.max_len)
    dev_dataloader = DataLoader(dev_dataset, batch_size=args.batch_size, shuffle=False)

    model = MyModel().to(device)
    opt = AdamW(model.parameters(), lr=args.learn_rate)
    loss_fn = nn.CrossEntropyLoss()

    acc_max = float("-inf")
    for epoch in range(args.epochs):
        print(f"Epoch {epoch + 1}")
        loss_sum, count = 0, 0
        model.train()
        for batch_index, (batch_text, batch_label) in enumerate(tqdm(train_dataloader)):
            batch_label = batch_label.to(device)
            pred = model(batch_text)

            loss = loss_fn(pred, batch_label)
            opt.zero_grad()
            loss.backward()
            opt.step()
            loss_sum += loss
            count += 1

        model.eval()
        all_pred, all_true = [], []
        with torch.no_grad():
            for batch_text, batch_label in tqdm(dev_dataloader):
                batch_label = batch_label.to(device)
                pred = model(batch_text)

                pred = torch.argmax(pred, dim=1).cpu().numpy().tolist()
                label = batch_label.cpu().numpy().tolist()

                all_pred.extend(pred)
                all_true.extend(label)

        acc = accuracy_score(all_pred, all_true)
        print(f"dev acc:{acc:.4f}")
        if acc > acc_max:
            print(acc, acc_max)
            acc_max = acc
            torch.save(model.state_dict(), args.save_model_best)
            print(f"以保存最佳模型")

    torch.save(model.state_dict(), args.save_model_last)

    end = time.time()
    print(f"运行时间:{(end-start)/60%60:.4f} min")
    test_data()

这段代码是一个深度学习模型的训练过程。首先,导入了所需的库和模块,包括PyTorch、数据处理函数、配置解析函数、数据加载器、模型、优化器、损失函数、评估指标等。

在主函数中,首先获取了当前时间,用于计算整个训练过程的运行时间。然后,调用parsers函数解析命令行参数,并将结果保存在args变量中。接着,检查是否有可用的CUDA设备,如果有,就使用第一个CUDA设备,否则使用CPU。

然后,使用read_data函数读取训练数据和验证数据,并将最大文本长度保存在args.max_len中。接着,使用MyDataset类创建训练数据集和验证数据集,然后使用DataLoader类创建对应的数据加载器。

接着,创建了一个MyModel模型,并将其移动到指定的设备。然后,创建了一个AdamW优化器,用于更新模型的参数。同时,定义了一个交叉熵损失函数。

然后,进入训练循环。在每个训练周期中,首先将模型设置为训练模式,然后遍历训练数据加载器,对每个批次的数据进行处理。处理的过程包括:将标签移动到指定的设备,将数据传递给模型得到预测结果,计算损失,清空优化器的梯度,反向传播,更新参数,累加损失。

在每个训练周期结束后,将模型设置为评估模式,然后遍历验证数据加载器,对每个批次的数据进行处理。处理的过程包括:将标签移动到指定的设备,将数据传递给模型得到预测结果,将预测结果和标签转换为列表,然后添加到总的预测结果和真实标签中。最后,计算验证集的准确率,如果准确率超过之前的最高准确率,就保存模型。

在所有训练周期结束后,保存最后的模型,然后计算并打印出整个训练过程的运行时间。最后,调用test_data函数进行测试。

评估

test.py


import torch
from tqdm import tqdm
from utils import read_data, MyDataset
from config import parsers
from torch.utils.data import DataLoader
from model import MyModel
from sklearn.metrics import accuracy_score
import numpy as np
import matplotlib.pyplot as plt

def max_iter(true_labels,predicted_labels):
    # 获取类别数量
    num_classes = max(max(true_labels), max(predicted_labels)) + 1

    # 计算混淆矩阵
    confusion_matrix = np.zeros((num_classes, num_classes), dtype=int)
    for true_label, predicted_label in zip(true_labels, predicted_labels):
        confusion_matrix[true_label][predicted_label] += 1

    # 类别标签
    labels = [f'Class {i}' for i in range(num_classes)]

    # 绘制混淆矩阵
    fig, ax = plt.subplots()
    im = ax.imshow(confusion_matrix, cmap='Blues')

    # 设置颜色条
    cbar = ax.figure.colorbar(im, ax=ax)

    # 设置标签
    ax.set(xticks=np.arange(num_classes),
        yticks=np.arange(num_classes),
        xticklabels=labels, yticklabels=labels,
        title='Confusion Matrix',
        ylabel='True label',
        xlabel='Predicted label')

    # 在矩阵方格中显示数值
    thresh = confusion_matrix.max() / 2.
    for i in range(num_classes):
        for j in range(num_classes):
            ax.text(j, i, format(confusion_matrix[i, j], 'd'),
                    ha="center", va="center",
                    color="white" if confusion_matrix[i, j] > thresh else "black")

    # 自动调整布局
    fig.tight_layout()
    plt.savefig("D:\PycharmProjects\Multimodal emotion\output/text/混淆矩阵.jpg")
    # 显示图形
    plt.show()

def test_data():
    args = parsers()
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    test_text, test_label = read_data(args.test_file)
    test_dataset = MyDataset(test_text, test_label, args.max_len)
    test_dataloader = DataLoader(test_dataset, batch_size=args.batch_size, shuffle=False)

    model = MyModel().to(device)
    model.load_state_dict(torch.load(args.save_model_best))
    model.eval()

    all_pred, all_true = [], []
    with torch.no_grad():
        for batch_text, batch_label in tqdm(test_dataloader):
            
            batch_label, batch_label = batch_label.to(device), batch_label.to(device)
            pred = model(batch_text)
            pred = torch.argmax(pred, dim=1)
            
            pred = pred.cpu().numpy().tolist()
            label = batch_label.cpu().numpy().tolist()
            
            all_pred.extend(pred)
            all_true.extend(label)
    with open(".\output/test.txt", "w") as file:
        for item1, item2, item3 in zip(test_text, test_label,all_pred):
            file.write(f"{item1}\t{item2}\t{item3}\n")
            
    accuracy = accuracy_score(all_true, all_pred)

    print(f"test dataset accuracy:{accuracy:.4f}")
    max_iter(all_true,all_pred)


if __name__ == "__main__":
    test_data()

这段代码是一个深度学习模型的测试过程。首先,检查是否有可用的CUDA设备,如果有,就使用第一个CUDA设备,否则使用CPU。

然后,使用read_data函数读取测试数据,使用MyDataset类创建测试数据集,然后使用DataLoader类创建对应的数据加载器。

接着,创建了一个MyModel模型,并将其移动到指定的设备。然后,加载保存的最佳模型的参数,并将模型设置为评估模式。

然后,进入测试循环。在测试循环中,首先创建了两个空列表all_pred和all_true,用于保存所有的预测结果和真实标签。然后,遍历测试数据加载器,对每个批次的数据进行处理。处理的过程包括:将标签移动到指定的设备,将数据传递给模型得到预测结果,将预测结果的最大值的索引作为预测的类别,然后将预测结果和标签转换为列表,最后添加到总的预测结果和真实标签中。

在测试循环结束后,将测试文本、真实标签和预测结果写入到一个文本文件中。然后,计算测试集的准确率,并打印出来。最后,调用max_iter函数进行一些额外的处理。

预测

predict.py


from model import MyModel
from config import parsers
import torch
from transformers import BertTokenizer
import time
import csv
import pandas as pd

def write_to_csv(preds):
    # 读取已有的CSV文件
    df = pd.read_csv('predictions.csv')

    # 添加新的列
    df['bert'] = preds

    # 写入CSV文件
    df.to_csv('predictions.csv', index=False)

def load_model(device, model_path):
    myModel = MyModel().to(device)
    myModel.load_state_dict(torch.load(model_path))
    myModel.eval()
    return myModel


def process_text(text, bert_pred):
    tokenizer = BertTokenizer.from_pretrained(bert_pred)
    token_id = tokenizer.convert_tokens_to_ids(["[CLS]"] + tokenizer.tokenize(text))
    mask = [1] * len(token_id) + [0] * (args.max_len + 2 - len(token_id))
    token_ids = token_id + [0] * (args.max_len + 2 - len(token_id))
    token_ids = torch.tensor(token_ids).unsqueeze(0)
    mask = torch.tensor(mask).unsqueeze(0)
    x = torch.stack([token_ids, mask])
    return x


                    
def text_class_name(pred):
    result = torch.argmax(pred, dim=1)
    result = result.cpu().numpy().tolist()
    classification = open(args.classification, "r", encoding="utf-8").read().split("\n")
    classification_dict = dict(zip(range(len(classification)), classification))
    #print(f"文本:{text}\t预测的类别为:{classification_dict[result[0]]}")
    return result[0]
    
    
if __name__ == "__main__":
    # Convert integer values in preds to a list of lists
    
    start = time.time()
    args = parsers()
    device = "cuda:0" if torch.cuda.is_available() else "cpu"

    model = load_model(device, args.save_model_best)
    # 打开文件
    file = open("dataset\\text\\test.txt", "r", encoding="utf-8")

    # 按行读取文件内容
    lines = file.readlines()
    texts=[]
    # 处理每一行
    for line in lines:
        text, number = line.strip().split("\t")
        texts.append(text)
        # 现在,你可以使用text和number了

    # 关闭文件
    file.close()

    preds=[]

    print("模型预测结果:")
    for text in texts:
        x = process_text(text, args.bert_pred)
        with torch.no_grad():
            pred = model(x)
        
        d=text_class_name(pred)
        preds.append(int(d))
    end = time.time()
    
    write_to_csv(preds)
    print(f"耗时为:{end - start} s")
    

这段代码主要包含了三个函数:write_to_csv,load_model和process_text。

write_to_csv函数用于将预测结果写入到一个CSV文件中。首先,使用pandas的read_csv函数读取已有的CSV文件,并将结果保存在df变量中。然后,向df中添加一个新的列bert,其值为传入的预测结果preds。最后,使用to_csv函数将df写入到CSV文件中,其中index=False表示不保存索引。

load_model函数用于加载预训练的模型。首先,创建一个MyModel模型,并将其移动到指定的设备。然后,使用load_state_dict函数加载保存的模型参数。最后,将模型设置为评估模式,并返回模型。

process_text函数用于处理文本数据。首先,使用BertTokenizer.from_pretrained函数加载预训练的BERT分词器。然后,使用分词器将文本分词,并添加[CLS]标记,然后将分词结果转换为对应的ID。接着,创建一个掩码列表mask,其长度与ID列表相同,值全为1,然后将其扩展到args.max_len + 2的长度,扩展部分的值为0。同时,将ID列表扩展到args.max_len + 2的长度,扩展部分的值为0。然后,将ID列表和掩码列表转换为张量,并增加一个维度。最后,使用torch.stack函数将ID张量和掩码张量堆叠起来,作为模型的输入。

结果

在这里插入图片描述

文章来源:https://blog.csdn.net/m0_68926749/article/details/135219239
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。