编码器
在序列到序列模型中,编码器将输入序列(如一个句子)转换为一个隐藏状态序列,供解码器生成输出。编码层通常由嵌入层和RNN(如GRU/LSTM)等组成
- Token:是模型处理文本时的基本单元,可以是词,子词,字符等,每个token都有一个对应的ID。是由原始文本中的词或子词通过分词器(Tokenizer)处理后得到的最小单位,这些 token 会被映射为词汇表中的唯一索引 ID
- 输入:
- 原始输入序列:通常是一个句子的词汇ID序列。例如 [“Hello”, “world”] 可能会被映射为 [1, 2],假设“Hello”的ID是1,“world”的ID是2;
- 嵌入向量: token ID 列表 [1, 2] 会作为模型的输入,每一个token即1和2经过嵌入层(embedding layer)都会转换为’encoder_embed_dim’大小的向量,即两个嵌入向量。嵌入向量最初是由模型在训练过程中学到的,初始时通常是随机的。在训练的过程中,嵌入向量会调整,使得语义相似的词在向量空间中更接近。
- 编码层(通常是RNN,GRU,LSTM,Transformer):
- 作用: 处理输入序列的时间依赖性,并生成隐藏状态
- 输入:嵌入向量序列,形状为(seq_len, embed_dim)
- 输出: 编码器对每个时间步(即每个token)计算一个隐藏状态,这些隐藏状态组成了一个隐藏状态序列,捕捉了当前 token 以及它的上下文信息,每个时间步的隐藏状态不仅考虑当前词的嵌入向量,还结合了之前所有时间步的信息。对于长度为seq_len的输入序列,隐藏状态序列的形状为 (seq_len, hidden_dim)。对于 LSTM 或 GRU,编码器还会输出最后一个时间步的隐藏状态,供解码器初始化使用
- 输出:
- 隐藏状态序列:编码器处理整个输入序列后,输出的隐藏状态序列通常被称为 encoder_outputs。其中每个隐藏状态序列对应于输入序列中的一个 token,这个序列的形状是 (seq_len, batch_size, hidden_dim)
- 最终隐藏状态:编码器的最后一个时间步的隐藏状态通常被用作解码器的初始状态。这被称为 encoder_hiddens,形状为 (num_layers, batch_size, hidden_dim),在双向RNN中,这个向量可能会有两倍的维度,变为(num_layers, batch_size, 2*hidden_dim)
'''定义了一个用于自然语言处理的编码器类 RNNEncoder,该类继承自 FairseqEncoder,
并实现了一个双向 GRU(门控循环单元,Gated Recurrent Unit)来对输入的文本进行编码'''
class RNNEncoder(FairseqEncoder):def __init__(self, args, dictionary, embed_tokens):super().__init__(dictionary)self.embed_tokens = embed_tokens # 嵌入层,用于将 token 索引转换为嵌入向量。self.embed_dim = args.encoder_embed_dim # 嵌入维度self.hidden_dim = args.encoder_ffn_embed_dim # 隐藏层维度self.num_layers = args.encoder_layers # GRU层数self.dropout_in_module = nn.Dropout(args.dropout)# 双向GRU层,用于处理输入序列self.rnn = nn.GRU(self.embed_dim, self.hidden_dim, self.num_layers, dropout=args.dropout, batch_first=False, bidirectional=True)self.dropout_out_module = nn.Dropout(args.dropout)self.padding_idx = dictionary.pad() # 填充索引,用于处理可变长度的输入序列。def combine_bidir(self, outs, bsz: int): # outs: 双向RNN的输出,[seq_len, batch_size, hidden_dim * 2],序列长度、批次大小和双向 RNN 的隐藏状态维度(2 倍的 hidden_dim)# bsz: 当前batch的大小# view: [self.num_layers, 2, bsz, -1], 2表示RNN双向的两个方向,-1 表示自动计算的隐藏状态维度hidden_dim# transpose: [self.num_layers, bsz, 2, -1]# 调用 contiguous() 来确保张量在内存中的布局是连续的out = outs.view(self.num_layers, 2, bsz, -1).transpose(1, 2).contiguous()# 将 out 重新调整为形状 [self.num_layers, bsz, hidden_dim * 2],即将双向的两个隐藏状态拼接在一起,成为一个新的隐藏状态张量。# 这里 -1 表示自动计算合并后的隐藏状态维度,等于 hidden_dim * 2。return out.view(self.num_layers, bsz, -1)# 执行编码器的前向传播,处理输入的 token 序列并生成输出def forward(self, src_tokens, **unused):bsz, seqlen = src_tokens.size()# get embedding 获取输入token的嵌入向量,并进行dropout操作x = self.embed_tokens(src_tokens)x = self.dropout_in_module(x)# [batch_size, sequence_length, hidden_dim] -> [sequence_length,batch_size,hidden_dim]# B x T x C -> T x B x Cx = x.transpose(0, 1)# pass thru bidirectional RNN# 初始化GRU的隐藏状态h0[2*num_layers,batch_size,hidden_dim]h0 = x.new_zeros(2 * self.num_layers, bsz, self.hidden_dim)x, final_hiddens = self.rnn(x, h0)outputs = self.dropout_out_module(x)# outputs = [sequence len, batch size, hid dim * directions]# hidden = [num_layers * directions, batch size , hid dim]# Since Encoder is bidirectional, we need to concatenate the hidden states of two directionsfinal_hiddens = self.combine_bidir(final_hiddens, bsz)# hidden = [num_layers , batch , num_directions*hidden]encoder_padding_mask = src_tokens.eq(self.padding_idx).t()return tuple((outputs, # seq_len , batch , hiddenfinal_hiddens, # num_layers , batch , num_directions*hiddenencoder_padding_mask, # seq_len , batch))def reorder_encoder_out(self, encoder_out, new_order):# This is used by fairseq's beam search. How and why is not particularly important here.return tuple((encoder_out[0].index_select(1, new_order), # outputsencoder_out[1].index_select(1, new_order), # final_hiddensencoder_out[2].index_select(1, new_order), # encoder_padding_mask))
解码器
根据编码器的输出生成目标序列,分为训练阶段和推理阶段,略有不同
- 训练阶段(teaching forcing)
在训练阶段,解码器知道整个目标序列,它使用前一个正确的 token(即目标序列的上一个 token)作为当前时间步的输入。这种方式称为 Teacher Forcing
-
输入:
- 初始输入:在序列开始时,解码器通常会接收到一个特殊的开始标记(如 ,表示 “Beginning of Sequence”)作为输入,是目标序列中上一个时间步的实际token ID,形状是(target_seq_len, batch_size)。可以稳定训练,加速收敛,因为训练早期模型生成的token可能不准确,通过使用实际的目标 token 作为输入,可以让模型在训练时保持在正确的轨道上,学习更稳定。能更快地学会生成目标序列的模式,训练过程更快收敛。
-
嵌入层:
解码器的每个输入 token(包括 和前一个时间步的输出 token)都会通过嵌入层转换成嵌入向量(target_seq_len, batch_size, embed_dim)。embed_dim 是嵌入向量的维度。 -
序列模型:
在每个时间步接收嵌入向量和隐藏状态,生成当前时间步的输出和更新的隐藏状态。- 隐藏状态:解码器会在每个时间步更新它的隐藏状态,这个隐藏状态将在下一个时间步作为输入的一部分。解码器的初始隐藏状态通常是由编码器的最终隐藏状态传递过来的。在双向 RNN 结构中,这个隐藏状态可以是编码器的最后一层前向和后向隐藏状态的拼接。
- RNN、LSTM、GRU:
- 形状:(num_layers, batch_size, hidden_dim)
- num_layers 是 RNN 层的数量。
- hidden_dim 是隐藏状态的维度。
- Transformer:
- (seq_len, batch_size, embed_dim)
- RNN、LSTM、GRU:
- 隐藏状态:解码器会在每个时间步更新它的隐藏状态,这个隐藏状态将在下一个时间步作为输入的一部分。解码器的初始隐藏状态通常是由编码器的最终隐藏状态传递过来的。在双向 RNN 结构中,这个隐藏状态可以是编码器的最后一层前向和后向隐藏状态的拼接。
-
注意力机制(可选):如果使用注意力机制,解码器还会基于编码器的输出和当前的隐藏状态计算注意力权重,以对编码器的隐藏状态进行加权求和。这有助于生成时更好地关注输入序列的相关部分。
-
输出:
- 生成的token:
- 预测的token概率分布:解码器的最后一层通常是一个全连接层,用于将隐藏状态映射到词汇表中的每个词的概率分布,(target_seq_len, batch_size, vocab_size),vocab_size 是词汇表的大小
- 最终生成的 token: 是根据这个概率分布选取的。解码器会在每个时间步生成一个 token,直到生成一个结束标记(如 ,表示 “End of Sequence”)或者达到最大长度,这个输出与实际目标序列的 token 进行比较,以计算损失,在训练时,目标序列通常包括 ,以帮助模型学习生成结束标记
- 隐藏状态
- 生成的token:
-
步骤:
-
初始化:用编码器的最终隐藏状态初始化解码器的隐藏状态,并输入 作为第一个 token。
-
每个时间步:
- 输入目标序列的上一个 token 以及当前隐藏状态到解码器。
- 解码器输出当前时间步的预测 token。
- 计算损失:将解码器的输出与实际目标序列的当前 token 进行比较,并计算损失。
-
更新:使用损失反向传播更新模型参数。
-
- 推理阶段
在推理阶段,解码器并不知道目标序列。它使用自己上一步生成的 token 作为当前时间步的输入,逐步生成整个序列。解码器通常是一个 token 一个 token 地进行输入和输出的
-
步骤:
- 初始化:与训练阶段相同,解码器的隐藏状态用编码器的最终隐藏状态初始化,并输入 作为第一个 token。
- 每个时间步:
- 使用解码器在前一个时间步生成的 token 作为当前时间步的输入。
- 解码器输出当前时间步的预测 token。
- 将预测 token 作为下一个时间步的输入。
- 如果生成了 ,则终止解码;否则继续。
- 输出:最终解码器生成的 token 序列作为输出序列。
-
输入:
- 在推理的开始阶段,解码器的输入通常是一个特殊的起始标记(),表示序列的开始。
- 形状:(1, batch_size),其中 1 是时间步的数量(在初始阶段只有一个 token),batch_size 是批处理的大小。
-
嵌入向量:(1, batch_size, embed_dim)
-
生成token概率分布:
解码器生成一个 token 的概率分布,这个概率分布表示当前时间步每个词汇的概率。(1, batch_size, vocab_size) -
更新输入,将生成的token作为下一个时间步的输入,经过嵌入曾,再次生成token分布,一直充分生成token,直到生成 或达到最大长度
class RNNDecoder(FairseqIncrementalDecoder):def __init__(self, args, dictionary, embed_tokens):super().__init__(dictionary)self.embed_tokens = embed_tokens# 解码器和编码器的层数必须相同assert args.decoder_layers == args.encoder_layers, f"""seq2seq rnn requires that encoder and decoder have same layers of rnn. got: {args.encoder_layers, args.decoder_layers}"""# 解码器的隐藏层维度必须是编码器隐藏层维度的两倍,因为在许多的seq2seq模型中,编码器的输出可能是双向的(双向GRU或LSTM)assert args.decoder_ffn_embed_dim == args.encoder_ffn_embed_dim*2, f"""seq2seq-rnn requires that decoder hidden to be 2*encoder hidden dim. got: {args.decoder_ffn_embed_dim, args.encoder_ffn_embed_dim*2}"""self.embed_dim = args.decoder_embed_dim # 解码器的嵌入维度self.hidden_dim = args.decoder_ffn_embed_dim # 解码器RNN的hidden layers维度self.num_layers = args.decoder_layers # 解码器RNN的层数self.dropout_in_module = nn.Dropout(args.dropout)self.rnn = nn.GRU(self.embed_dim, self.hidden_dim, self.num_layers, dropout=args.dropout, batch_first=False, bidirectional=False)self.attention = AttentionLayer(self.embed_dim, self.hidden_dim, self.embed_dim, bias=False) # self.attention = Noneself.dropout_out_module = nn.Dropout(args.dropout)if self.hidden_dim != self.embed_dim:self.project_out_dim = nn.Linear(self.hidden_dim, self.embed_dim)else:self.project_out_dim = Noneif args.share_decoder_input_output_embed:self.output_projection = nn.Linear(self.embed_tokens.weight.shape[1],self.embed_tokens.weight.shape[0],bias=False,)self.output_projection.weight = self.embed_tokens.weightelse:self.output_projection = nn.Linear(self.output_embed_dim, len(dictionary), bias=False)nn.init.normal_(self.output_projection.weight, mean=0, std=self.output_embed_dim ** -0.5)def forward(self, prev_output_tokens, encoder_out, incremental_state=None, **unused):# extract the outputs from encoderencoder_outputs, encoder_hiddens, encoder_padding_mask = encoder_out# outputs: seq_len x batch x num_directions*hidden# encoder_hiddens: num_layers x batch x num_directions*encoder_hidden# padding_mask: seq_len x batchif incremental_state is not None and len(incremental_state) > 0:# if the information from last timestep is retained, we can continue from there instead of starting from bosprev_output_tokens = prev_output_tokens[:, -1:]cache_state = self.get_incremental_state(incremental_state, "cached_state")prev_hiddens = cache_state["prev_hiddens"]else:# incremental state does not exist, either this is training time, or the first timestep of test time# prepare for seq2seq: pass the encoder_hidden to the decoder hidden statesprev_hiddens = encoder_hiddensbsz, seqlen = prev_output_tokens.size()# embed tokensx = self.embed_tokens(prev_output_tokens)x = self.dropout_in_module(x)# B x T x C -> T x B x Cx = x.transpose(0, 1)# decoder-to-encoder attentionif self.attention is not None:x, attn = self.attention(x, encoder_outputs, encoder_padding_mask)# pass thru unidirectional RNNx, final_hiddens = self.rnn(x, prev_hiddens)# outputs = [sequence len, batch size, hid dim]# hidden = [num_layers * directions, batch size , hid dim]x = self.dropout_out_module(x)# project to embedding size (if hidden differs from embed size, and share_embedding is True, # we need to do an extra projection)if self.project_out_dim != None:x = self.project_out_dim(x)# project to vocab sizex = self.output_projection(x)# T x B x C -> B x T x Cx = x.transpose(1, 0)# if incremental, record the hidden states of current timestep, which will be restored in the next timestepcache_state = {"prev_hiddens": final_hiddens,}self.set_incremental_state(incremental_state, "cached_state", cache_state)return x, Nonedef reorder_incremental_state(self,incremental_state,new_order,):# This is used by fairseq's beam search. How and why is not particularly important here.cache_state = self.get_incremental_state(incremental_state, "cached_state")prev_hiddens = cache_state["prev_hiddens"]prev_hiddens = [p.index_select(0, new_order) for p in prev_hiddens]cache_state = {"prev_hiddens": torch.stack(prev_hiddens),}self.set_incremental_state(incremental_state, "cached_state", cache_state)return