目录
打卡
序列标注
条件随机场(Conditional Random Field, CRF)
SCORE计算
SCORE 计算的代码
Normalizer计算
Normalizer 计算的代码实现
Viterbi算法
CRF层组装
BiLSTM+CRF模型
模型实例化
模型训练
模型预测
新造一个句子重新预测看效果
序列标注指给定输入序列,给序列中每个Token进行标注标签的过程。序列标注问题通常用于从文本中进行信息抽取,包括分词(Word Segmentation)、词性标注(Position Tagging)、命名实体识别(Named Entity Recognition, NER)等。
一种常见的命名实体识别的标注方法——“BIOE”标注,将一个实体(Entity)的开头标注为B,其他部分标注为I,非实体标注为O。
例子:
输入序列 | 清 | 华 | 大 | 学 | 座 | 落 | 于 | 首 | 都 | 北 | 京 |
输出标注 | B | I | I | I | O | O | O | O | O | B | I |
如上表所示,`清华大学` 和 `北京`是地名,需要将其识别,我们对每个输入的单词预测其标签,最后根据标签来识别实体。
对序列进行标注,实际上是对序列中每个Token进行标签预测,可以直接视作简单的多分类问题。但是序列标注不仅仅需要对单个Token进行分类预测,同时相邻Token直接有关联关系。即,对于“BIOE”标注方法,I前必须是B或I。
将命名实体识别视为多分类问题,则每个词的预测概率都是独立的,易产生类似的问题,因此需要引入一种能够学习到此种关联关系的算法来保证预测结果的正确性。而条件随机场是适合此类场景的一种概率图模型(概率图模型利用图的结构来表达变量间的概率关系,简化了复杂系统的概率计算。在概率图模型中,节点通常代表随机变量,边则代表变量间的依赖关系。)。
下面对条件随机场的定义和参数化形式进行简析。
考虑到序列标注问题的线性序列特点,本节所述的条件随机场特指线性链条件随机场(Linear Chain CRF)。
设 为输入序列,,为输出的标注序列,其中 为序列的最大长度, 表示 对应的所有可能的输出序列集合。则输出序列 的概率为:设 , 为序列的第 i 个Token和对应的标签,则Score 需要能够在计算 和 的映射的同时,捕获相邻标签 和 之间的关系,因此我们定义两个概率函数:
- 发射概率函数 :表示 的概率。
- 转移概率函数 :表示 的概率。
则可以得到 Score 的计算公式:
设标签集合为,构造大小为 的矩阵 ,用于存储标签间的转移概率;由编码层(如Dense、LSTM等) 输出的隐状态 可以直接视作发射概率,此时 的计算公式可以转化为:
根据上述公式,使用 MindSpore 来实现 CRF 的参数化形式。
首先实现CRF层的前向训练部分,将CRF和损失函数做合并,选择分类问题常用的负对数似然函数(Negative Log Likelihood, NLL),则有:
由公式(1)与(4)可得,
根据公式(5),被减数为 Normalizer,减数为 Score,分别实现后相减得到最终 Loss。
注意上面两个概率函数的逻辑:(两个逻辑:输入决定输出的发射概率 + 上一个输出决定下一个输出 的转移概率)
上面推到的结论:计算出 Normalizer 和 Score,就可获得最终 Loss = Normalizer - Score。
CRF完整推导的论文:https://www.cs.columbia.edu/~mcollins/crf.pdf
首先根据公式(3),计算正确标签序列所对应的得分,需要注意,除了转移概率矩阵 P 外,还需要维护两个大小为 |𝑇| 的向量,分别作为序列开始和结束时的转移概率。同时引入了一个掩码矩阵𝑚𝑎𝑠𝑘 ,将多个序列打包为一个Batch时填充的值忽略,使得 Score 计算仅包含有效的Token。
def compute_score(emissions, tags, seq_ends, mask, trans, start_trans, end_trans): """ emissions: 表示每个时间步(序列位置)和每个批次中每个可能标签的发射概率。形状为(seq_length, batch_size, num_tags)。 tags: 表示预测的标签序列,形状为(seq_length, batch_size)。 seq_ends: 表示每个批次序列结束的位置。 mask: 表示有效的时间步(忽略填充部分),形状为(seq_length, batch_size)。 trans: 表示标签之间的转移概率矩阵,形状为(num_tags, num_tags)。 start_trans: 表示初始状态到每个标签的转移概率,形状为(num_tags,)。 end_trans: 表示每个标签到结束状态的转移概率,形状为(num_tags,)。 """ # emissions: (seq_length, batch_size, num_tags) # tags: (seq_length, batch_size) # mask: (seq_length, batch_size) # 获取序列长度seq_length和批次大小batch_size。 seq_length, batch_size = tags.shape # 将mask转换为与emissions相同的数据类型。 mask = mask.astype(emissions.dtype) # 将score设置为初始转移概率 # shape: (batch_size,) 初始化得分score为初始状态到序列第一个标签的转移概率,并加上第一个时间步的发射概率。 score = start_trans[tags[0]] # 选择每个批次第一个时间步的标签对应的初始转移概率。 # score += 第一次发射概率 # shape: (batch_size,) score += emissions[0, mnp.arange(batch_size), tags[0]] ## 加上第一个时间步的发射概率。 ## 遍历序列的其余时间步(从第二个时间步开始): for i in range(1, seq_length): # 标签由i-1转移至i的转移概率(当mask == 1时有效) # shape: (batch_size,) 在每个时间步,加上前一个标签到当前标签的转移概率,乘以mask确保只计算有效时间步。 score += trans[tags[i - 1], tags[i]] * mask[i] # 预测tags[i]的发射概率(当mask == 1时有效) # shape: (batch_size,) 加上当前时间步的发射概率。 score += emissions[i, mnp.arange(batch_size), tags[i]] * mask[i] # 结束转移---对于每个批次,计算最后一个有效时间步的标签到结束状态的转移概率,并加到score上。 # shape: (batch_size,) last_tags = tags[seq_ends, mnp.arange(batch_size)] # score += 结束转移概率, 返回计算得到的总得分score。 # shape: (batch_size,) 加上最后一个有效时间步的标签到结束状态的转移概率。 score += end_trans[last_tags] return score
根据公式 (5) ,Normalizer是 x 对应的所有可能的输出序列的 Score 的对数指数和(Log-Sum-Exp)。若按穷举法进行计算,需要将每个可能的输出序列Score都计算一遍,共有 个结果,此处采用动态规划算法,通过复用计算结果来提高效率。
假设需要计算从第 0 至第 i 个Token所有可能的输出序列得分 ,则可以先计算出从第 0 至第 个Token所有可能的输出序列得分 。因此,Normalizer 可以改写为以下形式:
其中 为第i 个Token的发射概率,是转移矩阵。由于发射概率矩阵 和转移概率矩阵 P 独立于 y 的序列路径计算,可以将其提出,可得:
def compute_normalizer(emissions, mask, trans, start_trans, end_trans): """给定发射概率、转移概率、初始和结束转移概率下,所有可能标签序列的总概率的对数。 emissions: 表示每个时间步(序列位置)和每个批次中每个可能标签的发射概率。形状为(seq_length, batch_size, num_tags)。 mask: 表示有效的时间步(忽略填充部分),形状为(seq_length, batch_size)。 trans: 表示标签之间的转移概率矩阵,形状为(num_tags, num_tags)。 start_trans: 表示初始状态到每个标签的转移概率,形状为(num_tags,)。 end_trans: 表示每个标签到结束状态的转移概率,形状为(num_tags,)。 """ # emissions: (seq_length, batch_size, num_tags) # mask: (seq_length, batch_size) seq_length = emissions.shape[0] # 将score设置为初始转移概率,并加上第一次发射概率 # shape: (batch_size, num_tags) 初始化得分score为初始状态到每个标签的转移概率加上第一个时间步的发射概率。 score = start_trans + emissions[0] # 遍历序列的其余时间步(从第二个时间步开始) for i in range(1, seq_length): # 扩展score的维度用于总score的计算 # shape: (batch_size, num_tags, 1) 将得分score扩展一个维度,以便进行广播操作。 broadcast_score = score.expand_dims(2) # 扩展emission的维度用于总score的计算 # shape: (batch_size, 1, num_tags) 将当前时间步的发射概率emissions[i]也扩展一个维度,以便进行广播操作。 broadcast_emissions = emissions[i].expand_dims(1) # 根据公式(7),计算score_i # 此时broadcast_score是由第0个到当前Token所有可能路径 # 对应score的log_sum_exp # shape: (batch_size, num_tags, num_tags) 计算下一个时间步的所有可能标签的得分,这包括前一个时间步的得分、转移概率和当前时间步的发射概率。 next_score = broadcast_score + trans + broadcast_emissions # 对score_i做log_sum_exp运算,用于下一个Token的score计算 # shape: (batch_size, num_tags) next_score = ops.logsumexp(next_score, axis=1) # 当mask == 1时,score才会变化 # shape: (batch_size, num_tags) score = mnp.where(mask[i].expand_dims(1), next_score, score) # 最后加结束转移概率 # shape: (batch_size, num_tags) score += end_trans # 对所有可能的路径得分求log_sum_exp # shape: (batch_size,) return ops.logsumexp(score, axis=1)
在完成前向训练部分后,需要实现解码部分。这里我们选择适合求解序列最优路径的Viterbi算法。与计算Normalizer类似,使用动态规划求解所有可能的预测序列得分。不同的是在解码时同时需要将第𝑖个Token对应的score取值最大的标签保存,供后续使用Viterbi算法求解最优预测序列使用。
取得最大概率得分Score ,以及每个Token对应的标签历史History 后,根据Viterbi算法可以得到公式:
从第0个至第i 个Token对应概率最大的序列,只需要考虑从第0个至第(i-1) 个Token对应概率最大的序列,以及从第i 个至第 (i-1) 个概率最大的标签即可。因此我们逆序求解每一个概率最大的标签,构成最佳的预测序列。
由于mindSpore静态图语法限制,我们将Viterbi算法求解最佳预测序列的部分作为后处理函数,不纳入后续CRF层的实现。
def viterbi_decode(emissions, mask, trans, start_trans, end_trans): # emissions: (seq_length, batch_size, num_tags) # mask: (seq_length, batch_size) seq_length = mask.shape[0] score = start_trans + emissions[0] history = () for i in range(1, seq_length): broadcast_score = score.expand_dims(2) broadcast_emission = emissions[i].expand_dims(1) next_score = broadcast_score + trans + broadcast_emission # 求当前Token对应score取值最大的标签,并保存 indices = next_score.argmax(axis=1) history += (indices,) next_score = next_score.max(axis=1) score = mnp.where(mask[i].expand_dims(1), next_score, score) score += end_trans return score, history def post_decode(score, history, seq_length): # 使用Score和History计算最佳预测序列 batch_size = seq_length.shape[0] seq_ends = seq_length - 1 # shape: (batch_size,) best_tags_list = [] # 依次对一个Batch中每个样例进行解码 for idx in range(batch_size): # 查找使最后一个Token对应的预测概率最大的标签, # 并将其添加至最佳预测序列存储的列表中 best_last_tag = score[idx].argmax(axis=0) best_tags = [int(best_last_tag.asnumpy())] # 重复查找每个Token对应的预测概率最大的标签,加入列表 for hist in reversed(history[:seq_ends[idx]]): best_last_tag = hist[idx][best_tags[-1]] best_tags.append(int(best_last_tag.asnumpy())) # 将逆序求解的序列标签重置为正序 best_tags.reverse() best_tags_list.append(best_tags) return best_tags_list
将上述前向训练和解码部分的代码组装完整的CRF层。使用nn.Cell
进行封装。
考虑到输入序列可能存在 Padding 的情况,CRF的输入需要考虑输入序列的真实长度,因此除发射矩阵和标签外,加入 seq_length
参数传入序列 Padding 前的长度,并实现生成mask矩阵的sequence_mask
方法。
import mindspore as ms import mindspore.nn as nn import mindspore.ops as ops import mindspore.numpy as mnp from mindspore.common.initializer import initializer, Uniform def sequence_mask(seq_length, max_length, batch_first=False): """根据序列实际长度和最大长度生成mask矩阵""" range_vector = mnp.arange(0, max_length, 1, seq_length.dtype) result = range_vector < seq_length.view(seq_length.shape + (1,)) if batch_first: return result.astype(ms.int64) return result.astype(ms.int64).swapaxes(0, 1) class CRF(nn.Cell): def __init__(self, num_tags: int, batch_first: bool = False, reduction: str = 'sum') -> None: if num_tags <= 0: raise ValueError(f'invalid number of tags: {num_tags}') super().__init__() if reduction not in ('none', 'sum', 'mean', 'token_mean'): raise ValueError(f'invalid reduction: {reduction}') self.num_tags = num_tags self.batch_first = batch_first self.reduction = reduction self.start_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='start_transitions') self.end_transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags,)), name='end_transitions') self.transitions = ms.Parameter(initializer(Uniform(0.1), (num_tags, num_tags)), name='transitions') def construct(self, emissions, tags=None, seq_length=None): if tags is None: return self._decode(emissions, seq_length) return self._forward(emissions, tags, seq_length) def _forward(self, emissions, tags=None, seq_length=None): if self.batch_first: batch_size, max_length = tags.shape emissions = emissions.swapaxes(0, 1) tags = tags.swapaxes(0, 1) else: max_length, batch_size = tags.shape if seq_length is None: seq_length = mnp.full((batch_size,), max_length, ms.int64) mask = sequence_mask(seq_length, max_length) # shape: (batch_size,) numerator = compute_score(emissions, tags, seq_length-1, mask, self.transitions, self.start_transitions, self.end_transitions) # shape: (batch_size,) denominator = compute_normalizer(emissions, mask, self.transitions, self.start_transitions, self.end_transitions) # shape: (batch_size,) llh = denominator - numerator if self.reduction == 'none': return llh if self.reduction == 'sum': return llh.sum() if self.reduction == 'mean': return llh.mean() return llh.sum() / mask.astype(emissions.dtype).sum() def _decode(self, emissions, seq_length=None): if self.batch_first: batch_size, max_length = emissions.shape[:2] emissions = emissions.swapaxes(0, 1) else: batch_size, max_length = emissions.shape[:2] if seq_length is None: seq_length = mnp.full((batch_size,), max_length, ms.int64) mask = sequence_mask(seq_length, max_length) ### viterbi_decode 前述算法 return viterbi_decode(emissions, mask, self.transitions, self.start_transitions, self.end_transitions)
设计一个双向 LSTM+CRF 的模型来进行命名实体识别任务的训练。
class BiLSTM_CRF(nn.Cell): def __init__(self, vocab_size, embedding_dim, hidden_dim, num_tags, padding_idx=0): super().__init__() self.embedding = nn.Embedding( vocab_size, ## 词表长度 embedding_dim, ## 每个嵌入向量的大小。 padding_idx=padding_idx) self.lstm = nn.LSTM(embedding_dim, hidden_dim // 2, bidirectional=True, batch_first=True) self.hidden2tag = nn.Dense( hidden_dim, num_tags, ### 标签集长度 'he_uniform') self.crf = CRF( num_tags, batch_first=True) def construct(self, inputs, seq_length, tags=None): embeds = self.embedding(inputs) outputs, _ = self.lstm(embeds, seq_length=seq_length) feats = self.hidden2tag(outputs) crf_outs = self.crf(feats, tags, seq_length) return crf_outs ### 生成两句例子和对应的标签,并构造词表和标签表。 embedding_dim = 16 hidden_dim = 32 training_data = [( "清 华 大 学 坐 落 于 首 都 北 京".split(), "B I I I O O O O O B I".split() ), ( "重 庆 是 一 个 魔 幻 城 市".split(), "B I O O O O O O O".split() )] word_to_idx = {} word_to_idx[''] = 0 for sentence, tags in training_data: for word in sentence: if word not in word_to_idx: word_to_idx[word] = len(word_to_idx) tag_to_idx = {"B": 0, "I": 1, "O": 2}
由于CRF层已经进行了NLLLoss的计算,因此不需要再设置Loss。
model = BiLSTM_CRF( len(word_to_idx), ## 词表长度 embedding_dim, ## embedding大小 hidden_dim, ## 隐藏层大小 len(tag_to_idx)) ## 标签集长度 optimizer = nn.SGD( model.trainable_params(), learning_rate=0.01, weight_decay=1e-4) ### 前向传播和训练步骤定义 grad_fn = ms.value_and_grad(model, None, optimizer.parameters) def train_step(data, seq_length, label): loss, grads = grad_fn(data, seq_length, label) optimizer(grads) return loss def prepare_sequence(seqs, word_to_idx, tag_to_idx): # 将生成的数据打包成Batch,按照序列最大长度,对长度不足的序列进行填充,分别返回输入序列、输出标签和序列长度构成的Tensor。 seq_outputs, label_outputs, seq_length = [], [], [] max_len = max([len(i[0]) for i in seqs]) for seq, tag in seqs: seq_length.append(len(seq)) idxs = [word_to_idx[w] for w in seq] labels = [tag_to_idx[t] for t in tag] idxs.extend([word_to_idx[''] for i in range(max_len - len(seq))]) labels.extend([tag_to_idx['O'] for i in range(max_len - len(seq))]) seq_outputs.append(idxs) label_outputs.append(labels) return ms.Tensor(seq_outputs, ms.int64), \ ms.Tensor(label_outputs, ms.int64), \ ms.Tensor(seq_length, ms.int64)
设计的模型查看:
输出的模型实例化例子查看:
from tqdm import tqdm steps = 500 with tqdm(total=steps) as t: for i in range(steps): loss = train_step(data, seq_length, label) t.set_postfix(loss=loss) t.update(1)
500步长的训练过程查看:
score, history = model(data, seq_length) print(score) predict = post_decode(score, history, seq_length) print(predict) ######################### ## 最后将预测的index序列转换为标签序列,打印输出结果,查看效果。 idx_to_tag = {idx: tag for tag, idx in tag_to_idx.items()} def sequence_to_tag(sequences, idx_to_tag): outputs = [] for seq in sequences: outputs.append([idx_to_tag[i] for i in seq]) return outputs
从预测输出结果查看,没有什么误差,毕竟训练集就那两句话,我们新造一个句子重新识别,看效果怎么样。
首先,新造的句子两个地点关键字不变,词表做一个扩充,如下,词表由20个增加到26个。新造的句子 new_data 是 20个字符组成。
由此看到,效果不行,可以理解,毕竟训练样本少,复杂一点就不能识别了。