下面将详细介绍如何使用 Python 和深度学习框架 PyTorch,结合大规模平行语料库(以 WMT 英德数据集为例)实现一个基于注意力机制的神经机器翻译模型。
1. 安装必要的库
首先,确保你已经安装了以下库:
pip install torch torchtext spacy
python -m spacy download en_core_web_sm
python -m spacy download de_core_news_sm
2. 代码实现
import torch
import torch.nn as nn
import torch.optim as optim
from torchtext.datasets import Multi30k
from torchtext.data import Field, BucketIterator
import spacy
import random
import math
import time# 设置随机种子以保证结果可复现
SEED = 1234
random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.deterministic = True# 加载英语和德语的分词器
spacy_de = spacy.load('de_core_news_sm')
spacy_en = spacy.load('en_core_web_sm')# 定义分词函数
def tokenize_de(text):return [tok.text for tok in spacy_de.tokenizer(text)]def tokenize_en(text):return [tok.text for tok in spacy_en.tokenizer(text)]# 定义字段
SRC = Field(tokenize = tokenize_de,init_token = '<sos>',eos_token = '<eos>',lower = True)TRG = Field(tokenize = tokenize_en,init_token = '<sos>',eos_token = '<eos>',lower = True)# 加载数据集
train_data, valid_data, test_data = Multi30k.splits(exts = ('.de', '.en'),fields = (SRC, TRG))# 构建词汇表
SRC.build_vocab(train_data, min_freq = 2)
TRG.build_vocab(train_data, min_freq = 2)# 定义设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')# 创建迭代器
BATCH_SIZE = 128
train_iterator, valid_iterator, test_iterator = BucketIterator.splits((train_data, valid_data, test_data),batch_size = BATCH_SIZE,device = device)# 定义编码器
class Encoder(nn.Module):def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout):super().__init__()self.embedding = nn.Embedding(input_dim, emb_dim)self.rnn = nn.GRU(emb_dim, enc_hid_dim, bidirectional = True)self.fc = nn.Linear(enc_hid_dim * 2, dec_hid_dim)self.dropout = nn.Dropout(dropout)def forward(self, src):embedded = self.dropout(self.embedding(src))outputs, hidden = self.rnn(embedded)hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))return outputs, hidden# 定义注意力机制
class Attention(nn.Module):def __init__(self, enc_hid_dim, dec_hid_dim):super().__init__()self.attn = nn.Linear((enc_hid_dim * 2) + dec_hid_dim, dec_hid_dim)self.v = nn.Parameter(torch.rand(dec_hid_dim))stdv = 1. / math.sqrt(self.v.size(0))self.v.data.uniform_(-stdv, stdv)def forward(self, hidden, encoder_outputs):batch_size = encoder_outputs.shape[1]src_len = encoder_outputs.shape[0]hidden = hidden.unsqueeze(1).repeat(1, src_len, 1)encoder_outputs = encoder_outputs.permute(1, 0, 2)energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim = 2)))energy = energy.permute(0, 2, 1)v = self.v.repeat(batch_size, 1).unsqueeze(1)attention = torch.bmm(v, energy).squeeze(1)return torch.softmax(attention, dim = 1)# 定义解码器
class Decoder(nn.Module):def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):super().__init__()self.output_dim = output_dimself.attention = attentionself.embedding = nn.Embedding(output_dim, emb_dim)self.rnn = nn.GRU((enc_hid_dim * 2) + emb_dim, dec_hid_dim)self.fc_out = nn.Linear((enc_hid_dim * 2) + dec_hid_dim + emb_dim, output_dim)self.dropout = nn.Dropout(dropout)def forward(self, input, hidden, encoder_outputs):input = input.unsqueeze(0)embedded = self.dropout(self.embedding(input))a = self.attention(hidden, encoder_outputs)a = a.unsqueeze(1)encoder_outputs = encoder_outputs.permute(1, 0, 2)weighted = torch.bmm(a, encoder_outputs)weighted = weighted.permute(1, 0, 2)rnn_input = torch.cat((embedded, weighted), dim = 2)output, hidden = self.rnn(rnn_input, hidden.unsqueeze(0))assert (output == hidden).all()embedded = embedded.squeeze(0)output = output.squeeze(0)weighted = weighted.squeeze(0)prediction = self.fc_out(torch.cat((output, weighted, embedded), dim = 1))return prediction, hidden.squeeze(0)# 定义序列到序列模型
class Seq2Seq(nn.Module):def __init__(self, encoder, decoder, device):super().__init__()self.encoder = encoderself.decoder = decoderself.device = devicedef forward(self, src, trg, teacher_forcing_ratio = 0.5):batch_size = src.shape[1]trg_len = trg.shape[0]trg_vocab_size = self.decoder.output_dimoutputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)encoder_outputs, hidden = self.encoder(src)input = trg[0,:]for t in range(1, trg_len):output, hidden = self.decoder(input, hidden, encoder_outputs)outputs[t] = outputteacher_force = random.random() < teacher_forcing_ratiotop1 = output.argmax(1)input = trg[t] if teacher_force else top1return outputs# 初始化模型
INPUT_DIM = len(SRC.vocab)
OUTPUT_DIM = len(TRG.vocab)
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
ENC_HID_DIM = 512
DEC_HID_DIM = 512
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5attn = Attention(ENC_HID_DIM, DEC_HID_DIM)
enc = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)
dec = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attn)model = Seq2Seq(enc, dec, device).to(device)# 定义优化器和损失函数
optimizer = optim.Adam(model.parameters())
TRG_PAD_IDX = TRG.vocab.stoi[TRG.pad_token]
criterion = nn.CrossEntropyLoss(ignore_index = TRG_PAD_IDX)# 训练模型
def train(model, iterator, optimizer, criterion, clip):model.train()epoch_loss = 0for i, batch in enumerate(iterator):src = batch.srctrg = batch.trgoptimizer.zero_grad()output = model(src, trg)output_dim = output.shape[-1]output = output[1:].view(-1, output_dim)trg = trg[1:].view(-1)loss = criterion(output, trg)loss.backward()torch.nn.utils.clip_grad_norm_(model.parameters(), clip)optimizer.step()epoch_loss += loss.item()return epoch_loss / len(iterator)# 评估模型
def evaluate(model, iterator, criterion):model.eval()epoch_loss = 0with torch.no_grad():for i, batch in enumerate(iterator):src = batch.srctrg = batch.trgoutput = model(src, trg, 0)output_dim = output.shape[-1]output = output[1:].view(-1, output_dim)trg = trg[1:].view(-1)loss = criterion(output, trg)epoch_loss += loss.item()return epoch_loss / len(iterator)# 训练模型
N_EPOCHS = 10
CLIP = 1
best_valid_loss = float('inf')for epoch in range(N_EPOCHS):start_time = time.time()train_loss = train(model, train_iterator, optimizer, criterion, CLIP)valid_loss = evaluate(model, valid_iterator, criterion)end_time = time.time()epoch_mins, epoch_secs = divmod(end_time - start_time, 60)if valid_loss < best_valid_loss:best_valid_loss = valid_losstorch.save(model.state_dict(), 'tut3-model.pt')print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs:.2f}s')print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')# 在测试集上评估模型
model.load_state_dict(torch.load('tut3-model.pt'))
test_loss = evaluate(model, test_iterator, criterion)
print(f'| Test Loss: {test_loss:.3f} | Test PPL: {math.exp(test_loss):7.3f} |')
3. 代码解释
- 数据预处理:
- 使用
torchtext
加载 WMT 英德平行语料库。 - 定义分词函数和字段,构建词汇表。
- 使用
BucketIterator
创建数据迭代器。
- 使用
- 模型构建:
- 编码器:使用双向 GRU 处理输入序列,并将最后一个隐藏状态映射到解码器的隐藏状态维度。
- 注意力机制:计算解码器隐藏状态与编码器输出之间的注意力权重。
- 解码器:结合注意力权重和嵌入输入,使用 GRU 生成输出序列。
- 序列到序列模型:将编码器和解码器组合在一起,实现整个翻译过程。
- 训练和评估:
- 使用 Adam 优化器和交叉熵损失函数进行训练。
- 在训练过程中使用教师强制策略,以提高模型的学习效率。
- 在验证集上评估模型,并保存最佳模型。
- 最后在测试集上评估模型的性能。
4. 注意事项
- 此代码示例可以在 GPU 上运行,前提是你的机器上安装了 CUDA 并且 PyTorch 支持 CUDA。
- 训练时间可能较长,你可以根据需要调整训练轮数和模型参数。
- 可以进一步优化模型,如使用更复杂的注意力机制、更大的模型容量等,以提高翻译质量。