Technology is advancing at a breathtaking pace, and Artificial Intelligence is rapidly changing the way we live and work, especially in fields such as Natural Language Processing (NLP) and Computer Vision.
Traditional research on multimodal dialogue has mostly focused on the interaction between a single user and the system, overlooking the complexity of multi-user scenarios. Visual information is often marginalized, treated merely as auxiliary context rather than a core part of the conversation. In real applications, however, the algorithm needs to "observe" and interact with multiple users, and those users may not be the current speaker.
[CCF BDCI 2023] Speaker identification in multimodal multi-party dialogue scenarios: the core idea is to identify the speaker of each turn, using the content of several consecutive dialogue turns, the video frame of each turn, and the corresponding face bboxes and name labels.
Picking up where we left off: in the previous post, Xiaobai walked you through the CNN model part of the baseline in detail. Today we dig into the NLP part, covering the Roberta and Deberta models and how they are used.
Text processing is the first step of any NLP task: we need to convert raw text into a format the model can work with.
The steps include tokenizing the text and mapping each token to its token id. For example:

- Rachel → token id 5586
- Chandler → token id 13814
- Phoebe → token id 18188
- [CLS]: token id 101, marks the beginning of a sequence
- [SEP]: token id 102, separates sentences or text segments
- [PAD]: token id 0, used for padding; when the text is shorter than the specified length, e.g. 512, it is padded out with [PAD]
- [MASK]: token id 103, the mask token used in BERT's masked language modeling pre-training

These special tokens play a crucial role in BERT & BERT-like models. Their token ids are fixed within a given vocabulary across tasks; BERT, for example, has a vocabulary of 30522 tokens.
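As a quick illustration, here is a minimal sketch using the Hugging Face `transformers` library with the `bert-base-uncased` checkpoint (the checkpoint choice is my assumption for this demo; the exact ids always depend on the vocabulary you load):

```python
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")

# Special tokens have fixed ids inside this vocabulary (30522 tokens in total).
print(tokenizer.vocab_size)  # 30522
print(tokenizer.cls_token_id, tokenizer.sep_token_id, tokenizer.pad_token_id, tokenizer.mask_token_id)
# 101 102 0 103

# Ordinary words map to ids from the same vocabulary.
print(tokenizer.convert_tokens_to_ids(["rachel", "chandler", "phoebe"]))  # e.g. [5586, 13814, 18188]
```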
FYI: the hyperlink above is a simple example of jieba word segmentation.
Word Embedding is the process of mapping the words in a text into a vector space. Word vectors capture the semantic information of words: words with similar meanings end up close to each other in that vector space.
Common word embedding techniques include Word2Vec and related methods.
FYI: if you want to understand word vectors and how Word2Vec works in detail, see the blog post linked above.
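For intuition, here is a tiny sketch of training word vectors with gensim's Word2Vec. Note that gensim is not part of the baseline; the toy corpus and parameters below are purely illustrative:

```python
from gensim.models import Word2Vec

# Toy corpus; real word vectors need a large tokenized corpus.
corpus = [
    ["phoebe", "has", "all", "the", "good", "words"],
    ["rachel", "wants", "to", "switch"],
    ["phoebe", "wants", "to", "switch", "too"],
]

model = Word2Vec(sentences=corpus, vector_size=50, window=3, min_count=1, sg=1, epochs=100)
print(model.wv["switch"].shape)         # (50,) -- one 50-dimensional vector per word
print(model.wv.most_similar("switch"))  # nearest neighbours in the embedding space
```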
In multi-party dialogue, understanding the context is crucial: the conversational setting, the relationships between participants, and the flow of the dialogue all matter.
The specific techniques used are described below.
`SpeakerIdentificationDataset` is the class that loads the data for the speaker identification task in the multimodal multi-party dialogue setting. Let's go through it line by line.
The `to_device` function moves data onto a specified device, e.g. `GPU:0`.
def to_device(obj, dev):
if isinstance(obj, dict):
return {k: to_device(v, dev) for k, v in obj.items()}
if isinstance(obj, list):
return [to_device(v, dev) for v in obj]
if isinstance(obj, tuple):
return tuple([to_device(v, dev) for v in obj])
if isinstance(obj, torch.Tensor):
return obj.to(dev)
return obj
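A minimal usage sketch (the batch contents and the device string are illustrative, and a CUDA device is assumed to be available):

```python
import torch

batch = {
    "input_ids": torch.zeros(2, 16, dtype=torch.long),
    "labels": [torch.tensor([[0, 2]]), torch.tensor([[1, 3]])],
}
batch = to_device(batch, "cuda:0")  # every tensor inside the nested dict/list is now on GPU 0
```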
Line by line:

- If `obj` is a dict, it recursively applies `to_device` to every value and collects the results into a new dict: the keys stay the same, and every value becomes `value.to(device)`.
- If `obj` is a list, it recursively applies `to_device` to every element and collects the results into a new list.
- If `obj` is a tuple, same idea, returning a tuple.
- If `obj` is a `torch.Tensor`, it moves the tensor to the specified device, e.g. CPU -> GPU.

Now, the dataset class itself:
class SpeakerIdentificationDataset:
def __init__(self, base_folder, bos_token='<bos>', split='train', dataset='friends', data_aug=False, debug=False):
self.base_folder = base_folder
self.debug = debug
self.dataset = dataset
self.split = split
self.bos_token = bos_token
if dataset == 'friends':
if split == 'test':
metadata = json.load(open(os.path.join(base_folder, 'test-metadata.json')))
else:
if data_aug:
metadata = json.load(open(os.path.join(base_folder, 'train-metadata-aug.json')))
else:
metadata = json.load(open(os.path.join(base_folder, 'train-metadata.json')))
self.examples = list()
for dialog_data in metadata:
                # use season s01 as the validation split
if split == 'valid' and not dialog_data[0]['frame'].startswith('s01'):
continue
if split == 'train' and dialog_data[0]['frame'].startswith('s01'):
continue
self.examples.append(dialog_data)
else:
if dataset == 'ijcai2019':
self.examples = [json.loads(line) for line in open(os.path.join(base_folder, '%s.json' % (split.replace('valid', 'dev'))))]
if dataset == 'emnlp2016':
self.examples = [json.loads(line) for line in open(os.path.join(base_folder, '10_%s.json' % (split.replace('valid', 'dev'))))]
self.examples = [example for example in self.examples if len(example['ctx_spk']) != len(set(example['ctx_spk']))]
As with the CNN Dataset earlier, dialogs from season s01 are used as the validation set and the rest as the training set.
def __len__(self):
return len(self.examples) if not self.debug else 32
def __getitem__(self, index):
example = self.examples[index]
if self.dataset == 'friends':
speakers, contents, frame_names = [i['speaker'] for i in example], [i['content'] for i in example], [i['frame'] for i in example]
else:
speakers, contents = example['ctx_spk'], example['context']
frame_names = ['%d-%d' % (index, i) for i in range(len(speakers))]
labels = list()
for i, speaker_i in enumerate(speakers):
for j, speaker_j in enumerate(speakers):
if i != j and speaker_i == speaker_j:
labels.append([i, j])
input_text = self.bos_token + self.bos_token.join(contents)
return input_text, labels, frame_names
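A quick usage sketch (the `base_folder` path here is illustrative; it should point at wherever the competition metadata JSON files live):

```python
dataset = SpeakerIdentificationDataset(base_folder='data/friends', bos_token='<bos>', split='train')
print(len(dataset))                            # number of training dialogs
input_text, labels, frame_names = dataset[0]   # one dialog per example
```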
If that sounds confusing, let me demonstrate with the first dialog of the train split.
Dialog[0] (sample), made up of 5 utterances:
[{"frame": "s06e07-000377", "speaker": "phoebe", "content": "Yeah, I know because you have all the good words. What do I get? I get \"it\u2019s,\" \"and\" oh I'm sorry, I have \"A.\" Forget it.", "start": 297, "end": 491, "faces": [[[752, 135, 881, 336], "rachel"], [[395, 111, 510, 329], "leslie"]]}, {"frame": "s06e07-000504", "speaker": "rachel", "content": "Phoebe, come on that's silly.", "start": 498, "end": 535, "faces": [[[466, 129, 615, 328], "phoebe"]]}, {"frame": "s06e07-000552", "speaker": "phoebe", "content": "All right, so let's switch.", "start": 535, "end": 569, "faces": [[[426, 120, 577, 320], "phoebe"]]}, {"frame": "s06e07-000629", "speaker": "rachel", "content": "No, I have all of the good words. OK, fine, fine, we can switch.", "start": 569, "end": 689, "faces": [[[420, 125, 559, 328], "phoebe"], [[652, 274, 771, 483], "rachel"]]}, {"frame": "s06e07-000892", "speaker": "phoebe", "content": "Please...wait, how did you do that?", "start": 879, "end": 906, "faces": [[[424, 133, 573, 334], "phoebe"], [[816, 197, 925, 399], "bonnie"]]}]
The resulting input_text:
<bos>Yeah, I know because you have all the good words. What do I get? I get "it’s," "and" oh I'm sorry, I have "A." Forget it.<bos>Phoebe, come on that's silly.<bos>All right, so let's switch.<bos>No, I have all of the good words. OK, fine, fine, we can switch.<bos>Please...wait, how did you do that?
The resulting labels:
[[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]]
The resulting frame_names:
['s06e07-000377', 's06e07-000504', 's06e07-000552', 's06e07-000629', 's06e07-000892']
A few more words on the labels. The 5 utterances of the dialog above are spoken, in order, by: phoebe, rachel, phoebe, rachel, phoebe. Utterances 0, 2 and 4 share a speaker (phoebe), and utterances 1 and 3 share a speaker (rachel). Each label pair [i, j] records that utterance i and utterance j (with i != j) have the same speaker, which is exactly how we get:
[[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]]
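The pair construction is just the double loop from `__getitem__`; as a standalone snippet:

```python
speakers = ["phoebe", "rachel", "phoebe", "rachel", "phoebe"]
labels = [[i, j]
          for i, speaker_i in enumerate(speakers)
          for j, speaker_j in enumerate(speakers)
          if i != j and speaker_i == speaker_j]
print(labels)  # [[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]]
```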
And a few notes on the input_text part:
- `<bos>` is a special token that marks the beginning of a sentence; it helps the model pin down a starting point when generating text or processing a sequence.
- `<bos>` can also be used to separate different utterances.
- The difference between `<sep>` and `<bos>`: `<bos>` marks the start of a sentence, while `<sep>` separates different parts of a sequence.

The main job of the Collator class is to tokenize a batch of samples and convert them into the input format the model expects.

class Collator:
def __init__(self, tokenizer, max_length=512, temperature=1.0, use_turn_emb=False):
self.tokenizer = tokenizer
self.max_length = max_length
self.temperature = temperature
self.use_turn_emb = use_turn_emb
self.print_debug = True
def __call__(self, examples):
input_texts = [i[0] for i in examples]
labels = [i[1] for i in examples]
frame_names = [i[2] for i in examples]
model_inputs = self.tokenizer(input_texts, add_special_tokens=False, truncation=True, padding='longest', max_length=self.max_length, return_tensors='pt')
model_inputs = dict(model_inputs)
new_labels = list()
for input_id, label in zip(model_inputs['input_ids'], labels):
num_bos_tokens = torch.sum(input_id == self.tokenizer.bos_token_id).item()
            label = [l for l in label if l[0] < num_bos_tokens and l[1] < num_bos_tokens]  # if truncation occurred, drop label pairs that reference truncated turns
new_labels.append(torch.tensor(label))
model_inputs['labels'] = new_labels
For example:
input_text:
Yeah, I know because you have all the good words. What do I get? I get "it’s," "and" oh I'm sorry, I have "A." Forget it.[CLS]Phoebe, come on that's silly.[CLS]All right, so let's switch.[CLS]No, I have all of the good words. OK, fine, fine, we can switch.[CLS]Please...wait, how did you do that?
After tokenization:
[101, 3398, 1010, 1045, 2113, 2138, 2017, 2031, 2035, 1996, 2204, 2616, 1012, 2054, 2079, 1045, 2131, 1029, 1045, 2131, 1000, 2009, 1521, 1055, 1010, 1000, 1000, 1998, 1000, 2821, 1045, 1005, 1049, 3374, 1010, 1045, 2031, 1000, 1037, 1012, 1000, 5293, 2009, 1012, 101, 18188, 1010, 2272, 2006, 2008, 1005, 1055, 10021, 1012, 101, 2035, 2157, 1010, 2061, 2292, 1005, 1055, 6942, 1012, 101, 2053, 1010, 1045, 2031, 2035, 1997, 1996, 2204, 2616, 1012, 7929, 1010, 2986, 1010, 2986, 1010, 2057, 2064, 6942, 1012, 101, 3531, 1012, 1012, 1012, 3524, 1010, 2129, 2106, 2017, 2079, 2008, 1029, 102]
Note: I am using BERT here, so [CLS] (token id 101) plays the role of <bos>.
new_labels:
[[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]]
Because the 5 sentences above add up to fewer than 512 tokens, no labels are removed. If, however, only 4 <bos> tokens survived, i.e. the last sentence was truncated away, then every label pair involving sentence 5 (index 4) would have to be dropped.
Assuming only three and a half of the sentences survived truncation, new_labels would be:
[[0, 2], [1, 3], [2, 0], [3, 1]]
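A minimal sketch of that filtering rule (here [CLS], id 101, stands in for <bos>, and the other token ids are made up):

```python
import torch

bos_token_id = 101
input_id = torch.tensor([101, 11, 12, 101, 13, 101, 14, 101, 15])  # only 4 turns survived truncation
labels = [[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]]

num_bos_tokens = torch.sum(input_id == bos_token_id).item()  # -> 4
new_label = [l for l in labels if l[0] < num_bos_tokens and l[1] < num_bos_tokens]
print(new_label)  # [[0, 2], [1, 3], [2, 0], [3, 1]]
```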
if self.use_turn_emb:
model_inputs['token_type_ids'] = torch.cumsum(model_inputs['input_ids'] == self.tokenizer.bos_token_id, dim=1)
model_inputs['frame_names'] = frame_names
model_inputs['temperature'] = self.temperature
Computing the turn embedding ids: `torch.cumsum` computes a cumulative sum. `model_inputs['input_ids'] == self.tokenizer.bos_token_id` creates a boolean tensor that is True at each sentence-initial `<bos>` position and False everywhere else; the cumulative sum along the sequence then assigns turn index 1 to the first sentence's tokens, 2 to the second, and so on.
The dialog:
Yeah, I know because you have all the good words. What do I get? I get "it’s," "and" oh I'm sorry, I have "A." Forget it.
[CLS]
Phoebe, come on that's silly.
[CLS]
All right, so let's switch.
[CLS]
No, I have all of the good words. OK, fine, fine, we can switch.
[CLS]
Please...wait, how did you do that?
token_type_ids before the turn embedding step:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
token_type_ids after the turn embedding step:
[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]
Which corresponds roughly to:
1: ["Yeah", ",", "I", "know", "because", "you", "have", "all", "the", "good", "words", ".", "What", "do", "I", "get", "?", "I", "get", "it’s", ",", "and", "oh", "I", "'m", "sorry", ",", "I", "have", "A", ".", "Forget", "it", "."]
2: ["Phoebe", ",", "come", "on", "that", "'s", "silly", "."]
3: ["All", "right", ",", "so", "let", "'s", "switch", "."]
4: ["No", ",", "I", "have", "all", "of", "the", "good", "words", ".", "OK", ",", "fine", ",", "fine", ",", "we", "can", "switch", "."]
5: ["Please", "...", "wait", ",", "how", "did", "you", "do", "that", "?"]
What turn embeddings are for:
Turn embeddings are essential when handling dialogue and interactive text: they tell the model which turn each token belongs to, which matters a great deal for understanding the dialogue structure and context.
In the example above, the dialog consists of 5 utterances. After turn embedding, the tokens of the first sentence are tagged 1, the second 2, the third 3, the fourth 4, and the fifth 5. These 1-5 tags help the model separate the contexts of different sentences and handle each turn better.
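As a tiny standalone demo of the cumulative-sum trick (the ids below are made up; 101 again stands in for `<bos>`):

```python
import torch

bos_token_id = 101
input_ids = torch.tensor([[101, 11, 12, 101, 13, 101, 14, 15]])
token_type_ids = torch.cumsum(input_ids == bos_token_id, dim=1)
print(token_type_ids)  # tensor([[1, 1, 1, 2, 2, 3, 3, 3]])
```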
Roberta (Robustly Optimized BERT Approach) is an NLP model built on top of BERT (Bidirectional Encoder Representations from Transformers).
Roberta innovates on BERT's training procedure and data handling. First, Roberta is trained on a much larger corpus with more data, which helps the model understand and handle complex language patterns better. Second, Roberta drops BERT's Next Sentence Prediction objective. Third, Roberta also optimizes how the input is processed, most notably by training on longer sequences, so its long-text handling is stronger as well.
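If you want to poke at the model yourself, here is a hedged loading sketch with the Hugging Face `transformers` library (the `roberta-base` checkpoint name is just an example, not necessarily the one used in the baseline):

```python
from transformers import AutoTokenizer, AutoModel

tokenizer = AutoTokenizer.from_pretrained("roberta-base")
model = AutoModel.from_pretrained("roberta-base")

# RoBERTa ships with a sentence-start token, <s>, which can play the role of <bos>.
print(tokenizer.bos_token, tokenizer.bos_token_id)  # '<s>' 0
outputs = model(**tokenizer("All right, so let's switch.", return_tensors="pt"))
print(outputs.last_hidden_state.shape)              # (1, sequence_length, hidden_size)
```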
The constructor:
def __init__(self, config):
super().__init__(config)
self.bos_token_id = config.bos_token_id
self.loss_fct = CrossEntropyLoss(reduction='none')
        ... (rest omitted)
def forward(...):
        ... (code above omitted)
outputs = self.roberta(...)
last_hidden_state = outputs[0]
        ... (rest omitted)
The speaker-similarity computation has two modes: inference mode and training mode.
When labels == None, the model runs in inference mode. In this mode its main task is to compute and return the hidden state of each sentence and the pairwise similarity scores, rather than to train; this path is used for valid and test.
if labels is None:
# inference mode
selected_hidden_state_list, logits_list = list(), list()
for i, (hidden_state, input_id) in enumerate(zip(last_hidden_state, input_ids)):
indices = input_id == self.bos_token_id
selected_hidden_state = hidden_state[indices]
if not self.linear_sim:
selected_hidden_state = F.normalize(selected_hidden_state, p=2, dim=-1)
logits = torch.matmul(selected_hidden_state, selected_hidden_state.t())
logits += torch.eye(len(logits), device=logits.device) * -100000.0 # set elements on the diag to -inf
else:
num_sents, hidden_size = selected_hidden_state.size()
# concatenated_hidden_state = torch.zeros(num_sents, num_sents, hidden_size*4, device=selected_hidden_state.device)
concatenated_hidden_state = torch.zeros(num_sents, num_sents, hidden_size*3, device=selected_hidden_state.device)
concatenated_hidden_state[:, :, :hidden_size] = selected_hidden_state.unsqueeze(1)
concatenated_hidden_state[:, :, hidden_size:hidden_size*2] = selected_hidden_state.unsqueeze(0)
concatenated_hidden_state[:, :, hidden_size*2:hidden_size*3] = torch.abs(selected_hidden_state.unsqueeze(0) - selected_hidden_state.unsqueeze(1))
# concatenated_hidden_state[:, :, hidden_size*3:hidden_size*4] = selected_hidden_state.unsqueeze(0) + selected_hidden_state.unsqueeze(1)
                    logits = self.sim_head(self.dropout(concatenated_hidden_state)).squeeze()  # note: these logits are no longer guaranteed to lie in 0-1; a sigmoid is needed before using them downstream
                selected_hidden_state_list.append(selected_hidden_state)  # dialogs may have different numbers of turns, so the results cannot necessarily be stacked
logits_list.append(logits)
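To make the non-linear (`matmul`) branch concrete, here is a tiny standalone sketch of the cosine-similarity computation with the diagonal masked out (the shapes and values are illustrative):

```python
import torch
import torch.nn.functional as F

selected_hidden_state = torch.randn(5, 768)                               # one <bos> vector per turn
selected_hidden_state = F.normalize(selected_hidden_state, p=2, dim=-1)   # unit norm -> dot product = cosine
logits = selected_hidden_state @ selected_hidden_state.t()                # (5, 5) pairwise similarity matrix
logits += torch.eye(5) * -100000.0                                        # a turn must not be matched with itself
print(logits.argmax(dim=-1))  # for each turn, the most similar other turn
```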
Steps in inference mode: take the hidden state at each `<bos>` position as that turn's representation, then either L2-normalize and take pairwise dot products, or concatenate pairs and feed them through `sim_head` to get similarity scores between sentences. These scores express how similar two turns are and are used to judge whether they come from the same speaker.

When labels != None, the model runs in training mode. In this mode the model's main task is to optimize itself through the loss function.
else:
# training mode
selected_hidden_state_list = list()
batch_size = len(labels)
for hidden_state, input_id in zip(last_hidden_state, input_ids):
indices = input_id == self.bos_token_id
selected_hidden_state = hidden_state[indices]
if not self.linear_sim:
selected_hidden_state = F.normalize(selected_hidden_state, p=2, dim=-1)
selected_hidden_state_list.append(selected_hidden_state)
losses, logits_list = list(), list()
for i, (selected_hidden_state, label) in enumerate(zip(selected_hidden_state_list, labels)):
if not self.linear_sim:
other_selected_hidden_states = torch.cat([selected_hidden_state_list[j] for j in range(batch_size) if j != i])
all_selected_hidden_states = torch.cat([selected_hidden_state, other_selected_hidden_states])
logits = torch.matmul(selected_hidden_state, all_selected_hidden_states.t())
logits += torch.cat([torch.eye(len(logits), device=logits.device) * -100000.0, torch.zeros(len(logits), len(other_selected_hidden_states), device=logits.device)], dim=-1)
if label.numel():
losses.append(self.loss_fct(logits[label[:, 0]] / temperature, label[:, 1]))
else:
num_sents, hidden_size = selected_hidden_state.size()
# concatenated_hidden_state = torch.zeros(num_sents, num_sents, hidden_size*4, device=selected_hidden_state.device)
concatenated_hidden_state = torch.zeros(num_sents, num_sents, hidden_size*3, device=selected_hidden_state.device)
concatenated_hidden_state[:, :, :hidden_size] = selected_hidden_state.unsqueeze(1)
concatenated_hidden_state[:, :, hidden_size:hidden_size*2] = selected_hidden_state.unsqueeze(0)
concatenated_hidden_state[:, :, hidden_size*2:hidden_size*3] = torch.abs(selected_hidden_state.unsqueeze(0) - selected_hidden_state.unsqueeze(1))
# concatenated_hidden_state[:, :, hidden_size*3:hidden_size*4] = selected_hidden_state.unsqueeze(0) + selected_hidden_state.unsqueeze(1)
                    logits = self.sim_head(self.dropout(concatenated_hidden_state)).squeeze()  # note: these logits are no longer guaranteed to lie in 0-1; a sigmoid is needed before using them downstream
                    # use MSE as the loss; it has two parts: one against the gold labels and one against the matrix's own transpose
logits = nn.Sigmoid()(logits)
real_labels = torch.zeros_like(logits)
if label.numel():
real_labels[label[:, 0], label[:, 1]] = 1
real_labels += torch.eye(len(logits), device=logits.device)
loss = nn.MSELoss()(real_labels, logits) + nn.MSELoss()(logits, logits.transpose(0, 1))
losses.append(loss)
logits_list.append(logits)
loss = torch.mean(torch.stack(losses))
return MaskedLMOutput(loss=loss, logits=logits_list, hidden_states=selected_hidden_state_list)
Those are the concrete steps in training mode. In case the code above was hard to follow, let's go through it line by line:
Extracting the hidden states:
selected_hidden_state_list = list()
for hidden_state, input_id in zip(last_hidden_state, input_ids):
indices = input_id == self.bos_token_id
selected_hidden_state = hidden_state[indices]
if not self.linear_sim:
selected_hidden_state = F.normalize(selected_hidden_state, p=2, dim=-1)
selected_hidden_state_list.append(selected_hidden_state)
`<bos>` marks the start of each sentence, and the hidden state at each `<bos>` position is selected as that sentence's representation (L2-normalized in the non-linear case).

Computing similarity with the linear head:
losses, logits_list = list(), list()
for i, (selected_hidden_state, label) in enumerate(zip(selected_hidden_state_list, labels)):
        # choose the similarity computation according to the config
if not self.linear_sim:
            # non-linear (cosine) similarity
...
else:
            # linear similarity head
concatenated_hidden_state = ...
logits = self.sim_head(self.dropout(concatenated_hidden_state)).squeeze()
logits = nn.Sigmoid()(logits)
real_labels = torch.zeros_like(logits)
if label.numel():
real_labels[label[:, 0], label[:, 1]] = 1
real_labels += torch.eye(len(logits), device=logits.device)
loss = nn.MSELoss()(real_labels, logits) + nn.MSELoss()(logits, logits.transpose(0, 1))
losses.append(loss)
logits_list.append(logits)
`sim_head` computes the similarity score between each pair of sentences, and the loss is built from two terms:

loss_similarity = nn.MSELoss()(real_labels, logits)
loss_symmetry = nn.MSELoss()(logits, logits.transpose(0, 1))
loss = loss_similarity + loss_symmetry

Note: the baseline code writes this as loss = nn.MSELoss()(real_labels, logits) + nn.MSELoss()(logits, logits.transpose(0, 1)); I have simply split it into two named terms for clarity.
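A small self-contained sketch of this symmetric MSE loss (the label pairs are the ones from the example dialog above; the logits are random placeholders):

```python
import torch
import torch.nn as nn

logits = torch.sigmoid(torch.randn(5, 5))        # pairwise same-speaker scores in (0, 1)

label = torch.tensor([[0, 2], [0, 4], [1, 3], [2, 0], [2, 4], [3, 1], [4, 0], [4, 2]])
real_labels = torch.zeros_like(logits)
real_labels[label[:, 0], label[:, 1]] = 1        # 1 where two turns share a speaker
real_labels += torch.eye(5)                      # and every turn trivially matches itself

loss = nn.MSELoss()(real_labels, logits) + nn.MSELoss()(logits, logits.transpose(0, 1))
print(loss)
```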
Deberta (Decoding-enhanced BERT with Disentangled Attention) is another NLP model. It builds on BERT (Bidirectional Encoder Representations from Transformers) and Roberta (Robustly Optimized BERT Approach) with further innovations, chiefly a distinctive attention mechanism and encoding strategy, which let Deberta perform very well on NLP tasks.
Deberta's main innovations: disentangled attention, which represents each token with separate content and position vectors, and an enhanced mask decoder that injects absolute position information when predicting masked tokens.
Deberta's constructor:
def __init__(self, config):
super().__init__(config)
self.bos_token_id = config.bos_token_id
self.loss_fct = CrossEntropyLoss(reduction='none')
- Same as Roberta
- Same as the CNN part
- Same as the CNN part