Машинный перевод, задача автоматического перевода текста с одного языка на другой, значительно продвинулся в последние годы. Одним из широко распространенных подходов к машинному переводу является модель «последовательность к последовательности» (Seq2Seq) с механизмом внимания (и в настоящее время Transformer доминирует над всем). Эта архитектура продемонстрировала замечательную производительность при захвате контекстной информации исходного языка и создании плавных и точных переводов. Однако понимание сложных деталей входных и выходных измерений в этой модели часто может быть сложной задачей для начинающих. В этой статье мы внимательно погрузимся в мир моделей Seq2Seq и исследуем сложности, связанные с входными и выходными измерениями.
В этой статье мы сосредоточимся на конкретном проекте машинного перевода, целью которого является создание модели перевода, преобразующей английские предложения в немецкие. Архитектура нашей модели состоит из двунаправленного GRU (Gated Recurrent Unit), выступающего в качестве кодировщика и собирающего контекстную информацию как в прямом, так и в обратном направлении. Для декодера мы используем однонаправленный GRU для генерации переведенного вывода. Чтобы облегчить процесс обучения, мы используем концепцию принуждения учителей, которая включает в себя использование целевых токенов достоверности в качестве входных данных во время обучения. Этот метод помогает стабилизировать процесс обучения и ускорить конвергенцию. Кроме того, мы включаем дополнительный механизм внимания, позволяющий модели сосредоточиться на соответствующих частях исходного предложения при создании перевода. Если вы новичок в этом, не волнуйтесь, эта статья именно для вас!
ЧАСТЬ I. Загрузка основных библиотек и предварительная обработка данных для повышения качества перевода
Как только мы закончим этот раздел, наши данные будут полностью подготовлены. Чтобы лучше понять, что будет дальше, давайте рассмотрим пример. В наших подготовленных данных каждое целое число соответствует индексу в нашем словаре. Эти индексы действуют как мост, соединяющий наши слова с их числовыми представлениями. Они играют жизненно важную роль в процессе перевода, позволяя нам преобразовывать лингвистическую информацию в формат, с которым наша модель может эффективно работать. Незнакомы со словом «словарь»? Ничего страшного, мы пройдем через это.
«Будь милым». -› [будь, милым, . ]-›[‹SOS›, be, nice, ., ‹EOS›]-> тензоры выше.
%%capture !python -m spacy download en !python -m spacy download de !pip install spacy import os import re import time import math import random import unicodedata import numpy as np import pandas as pd from tqdm import tqdm import spacy from sklearn.model_selection import train_test_split import torch from torch import nn, optim from torch.nn.utils.rnn import pad_sequence from torch.utils.data import DataLoader, Dataset import torch.nn.functional as F
Давайте засеем все, используя ответ на главный вопрос жизни, вселенной и всего — 42!
SEED = 42 random.seed(SEED) np.random.seed(SEED) torch.manual_seed(SEED) torch.cuda.manual_seed(SEED) torch.backends.cudnn.deterministic = True # create our train_df and valid_df train_df, valid_df = train_test_split(data_df, test_size = 0.1, shuffle = True,random_state = 42) train_df = train_df.reset_index(drop = True) valid_df = valid_df.reset_index(drop = True) # reference to: https://zhuanlan.zhihu.com/p/93029007 def unicode_to_ascii(s): ''' take input as a sentence, then return the sentence after fillterd all accent marks or diacritical marks unicodedata.normalize('NFD', s) can split à into a and ' ''' return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn') def preprocess_sentence(sen): #Convert the sentence to lower case, remove the first space and remove the diacritical marks sen = unicode_to_ascii(sen.lower().strip()) # Add (?, . , ! and ¿) with a space before and after sen = re.sub(r"([?.!,¿])", r" \1 ", sen) # replace everything other than [^a-zA-Z?.,!¿] with space sen = re.sub(r"[^a-zA-Z?.,!¿]+", " ", sen) sen = sen.strip() return sen """ class vocabulary mathed 1: tokenize(self, text) Take input as a sentence and return the tokens in list method 2: build_dictionary(self, sequence_list) Take input as a list of sentences, then build 2 dictionaries, 1 for word2idx and 1 for idx2word these two dictionaries save the vacabulary only if when frequence of the word > threshold method 3: numericalize(self, text) Take input as a sentence, return the list contain correponding idx of the dictionary of word2idx """ class Vocabulary: def __init__(self, freq_threshold = 2, language='en_core_web_sm', preprocessor = None, reverse = False): self.idx2word = {0:"<pad>", 1:"<SOS>", 2:"<EOS>", 3:"<UNK>"} self.word2idx = {"<pad>":0, "<SOS>":1, "<EOS>":2, "<UNK>":3} self.tokenizer = spacy.load(language) self.freq_threshold = freq_threshold self.language = language self.preprocessor = preprocessor self.reverse = reverse def __len__(self): return len(self.idx2word) def tokenize(self, text): if self.reverse: return [token.text.lower() for token in self.tokenizer(text)][::-1] else: return [token.text.lower() for token in self.tokenizer(text)] def build_dictionary(self, sequence_list): frequency = {} counter = 4 for seq in sequence_list: if self.preprocessor: seq = self.preprocessor(seq) for word in self.tokenize(seq): if word in frequency: frequency[word] += 1 else: frequency[word] = 1 if frequency[word] == self.freq_threshold: self.word2idx[word] = counter self.idx2word[counter] = word counter += 1 def numericalize(self, text): tokenized_text = self.tokenize(text) return [self.word2idx[token] if token in self.word2idx else self.word2idx["<UNK>"] for token in tokenized_text] en_vocab = Vocabulary(freq_threshold=2, preprocessor=preprocess_sentence, reverse=False) en_vocab.build_dictionary(train_df["en"].tolist()) de_vocab = Vocabulary(freq_threshold=2, language="de_core_news_sm", preprocessor=preprocess_sentence, reverse=False) de_vocab.build_dictionary(train_df['de'].tolist())
Суммируя все вышесказанное:
- Мы определяем функцию для удаления всех знаков ударения или диакритических знаков в предложении. Мы называем это unicode_to_ascii.
- Определите функцию Preprocess_sentencen(sentence), принимайте входные данные как предложение. Пример: «Мне действительно нравится глубокое обучение! ǎ ’ —› ‘ мне действительно нравится глубокое обучение! а’
- Создайте класс Vocabulary, содержащий три метода: ···Метод 1··· tokenize(self, text). Примите ввод как предложение и верните токены в список. ···Метод 2··· build_dictionary(self, sequence_list). Возьмите ввод в виде списка предложений, затем создайте 2 словаря, 1 для word2idx и 1 для idx2word. Эти два словаря сохраняют слова только в том случае, если частота слова › пороговая. ···Метод 3··· numericalize(self, text). Принять ввод как предложение, вернуть список, содержащий соответствующий idx словаря word2idx.
- Вызов метода build_dictionary в классе словаря для создания словарей word2idx и idx2word для английского и немецкого языков.
Помните, что мы стремимся передать предложение в тензор pytorch с помощью загрузчика данных. Урок словарного запаса будет очень полезен. На данный момент мы успешно создали словари пословного индекса и индексного словаря. Теперь приступим к следующему шагу: подготовке данных для загрузчика данных.
from numpy.lib.arraypad import pad from pandas.core.arrays import numeric #Create Dataset and Dataloader #Here we define our own custom collate function, TO SAVE MEMORY AND BE EFFICIENT! #We will pad batch by batch class customed_collect: def __init__(self, pad_idx): self.pad_idx = pad_idx def __call__(self, batch): source = [item[0] for item in batch] #We set batch_first = False here because the input of the model should be every ith word of the sequence(THIS MAKES MORE SENSE) source = pad_sequence(source, batch_first = False, padding_value=self.pad_idx) target = [item[1] for item in batch] target = pad_sequence(target, batch_first = False, padding_value= self.pad_idx) return source,target class dataset(Dataset): def __init__(self, df, en_vocab, de_vocab): super().__init__() self.en_vocab = en_vocab self.de_vocab = de_vocab self.df = df def __len__(self): return len(self.df) def _get_numericalized(self, sentence, vocab): """ Add <SOS> and <EOS> in the begin and the end of the encoded setence Input as a sentence Output as a list of integers(index) """ numericalized = [vocab.word2idx["<SOS>"]] numericalized.extend(vocab.numericalize(sentence)) numericalized.append(vocab.word2idx["<EOS>"]) return numericalized def __getitem__(self, index): en_numericalized = self._get_numericalized(self.df.iloc[index]["en"], self.en_vocab) de_numericalized = self._get_numericalized(self.df.iloc[index]["de"], self.de_vocab) return torch.tensor(de_numericalized), torch.tensor(en_numericalized)
В этом разделе мы представляем нашу собственную пользовательскую функцию сопоставления. Его цель состоит в том, чтобы эффективно дополнять предложения в каждом пакете, тем самым оптимизируя использование памяти и общую эффективность. Дополняя предложения пакет за пакетом, мы гарантируем, что они имеют одинаковую длину, что обеспечивает бесшовную обработку в рамках нашей модели. Такой подход не только улучшает использование памяти, но и упрощает параллельные вычисления, что приводит к более быстрому и эффективному обучению или выводу.
Ссылка на пользовательскую функцию сопоставления: understanding-collate-fn-in-pytorch-f9d1742647d3
train_dataset = dataset(train_df, en_vocab, de_vocab) train_loader = DataLoader( dataset=train_dataset, batch_size=128, shuffle=False, collate_fn=customed_collect(pad_idx=en_vocab.word2idx["<pad>"]) ) valid_dataset = dataset(valid_df, en_vocab, de_vocab) valid_loader = DataLoader( dataset=valid_dataset, batch_size=256, shuffle=False, collate_fn=customed_collect(pad_idx=en_vocab.word2idx["<pad>"]) )
ЧАСТЬ II. Определение кодировщика, внимания и декодера.
Во-первых, мы создадим кодировщик. Здесь мы используем двунаправленный GRU. С двунаправленным GRU у нас есть две RNN на каждом уровне. прямой GRU, проходящий по встроенному предложению слева направо (показан ниже зеленым), и обратный RNN, проходящий по встроенному предложению справа налево (бирюзовый). Все, что нам нужно сделать в коде, это установить bidirectional = True
, а затем передать встроенное предложение в RNN, как и раньше.
мы передаем только ввод (embedded
) в RNN, который сообщает PyTorch инициализировать как прямые, так и обратные начальные скрытые состояния (h0→ и h0 ← соответственно) тензором всех нулей. Мы также получим два вектора контекста: один от прямого RNN после того, как он увидит последнее слово в предложении, z→ = hT→, и один от обратного RNN после того, как он увидит первое слово в предложении, z ← = hT ← (см. график).
''' We are using Bidirectional GRU here!! hidden is of size [n layers * num directions, batch size, hid dim], where [-2, :, :] gives the top layer forward RNN hidden state after the final time-step (i.e. after it has seen the last word in the sentence) and [-1, :, :] gives the top layer backward RNN hidden state after the final time-step (i.e. after it has seen the first word in the sentence). ''' class Encoder(nn.Module): def __init__(self, input_dim, emb_dim, hidden_dim, n_layers = 1, dropout = 0.2): super().__init__() self.hidden_dim = hidden_dim self.n_layers = n_layers self.embedding = nn.Embedding(input_dim, emb_dim) self.gru = nn.GRU(emb_dim, hidden_dim, n_layers, bidirectional=True, dropout=0.0 if n_layers==1 else dropout) self.fc = nn.Linear(2*hidden_dim, hidden_dim) self.dropout = nn.Dropout(dropout) def forward(self, sentence): #sentence is of size [max_length, batch_size] embedded = self.dropout(self.embedding(sentence)) #embedded is of size [max_length, batch_size, embeded_dim] # we need the last hidden state,make it as the input of decoder output, hidden = self.gru(embedded) #output is of size [max_length, batch_size, hidden_size * num of directions] #hidden if of size [num of directions * num of layers, batch_size, hidden_size] hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1))) # Apply tanh here to prevent gradient vanishing / exploding # output is of size [max_length, batch_size, decoder_hidden_dim] # hidden is of size [batch_size, decoder_hidden_dim] return output, hidden
Визуализация размеров:
ДОПОЛНИТЕЛЬНЫЙ МЕХАНИЗМ ВНИМАНИЯ
Ключевая идея аддитивного внимания состоит в том, чтобы вычислить показатель релевантности между скрытым состоянием декодера и выходом кодировщика на каждом временном шаге. Эта оценка представляет важность или вес внимания, присвоенный каждому элементу во входной последовательности.
# Here used Additive Attention! class Attention(nn.Module): def __init__(self, hidden_dim): super().__init__() self.attention = nn.Linear(3*hidden_dim, hidden_dim) self.v = nn.Linear(hidden_dim, 1, bias=False) def forward(self, hidden_state, encoder_outputs): # hidden_state has size of (batch_size, hidden_dim) # encoder_outputs is of size (max_length, batch_size, hidden_) batch_size = encoder_outputs.shape[1] src_len = encoder_outputs.shape[0] # repeat decoder hidden state src_len times # unsqueeze(1) -> make (batch_size, hidden_dim) to (batch_size, 1, hidden_dim) # repeat(1, sec_len, 1) -> make(batch_size, 1, hidden_dim) to (batch_size, sec_len, hidden_dim) hidden_state = hidden_state.unsqueeze(1).repeat(1, src_len, 1) # (src_len, batch_size, hidden_dim*2) -> (batch_size, src_len, hidden_dim*2) encoder_outputs = encoder_outputs.permute(1, 0, 2) energy = torch.tanh(self.attention(torch.cat((hidden_state, encoder_outputs), dim = 2))) attention = self.v(energy).squeeze(2) return F.softmax(attention, dim=1)
ОПРЕДЕЛИТЬ ДЕКОДЕР
Слой внимания декодера использует предыдущее скрытое состояние и все скрытые состояния кодировщика для вычисления вектора внимания. Этот вектор используется для создания взвешенного исходного вектора, который объединяется с входным словом и предыдущим скрытым состоянием для обновления скрытого состояния декодера. Обновленное скрытое состояние вместе с входным словом и взвешенным исходным вектором передается через линейный слой для предсказания следующего слова. Этот механизм внимания позволяет декодеру сосредоточиться на релевантной информации для точного перевода.
class Decoder(nn.Module): def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, attention, dropout=0.2): super().__init__() self.output_dim = output_dim self.hidden_dim = hidden_dim self.attention = attention self.n_layers = n_layers self.embedding = nn.Embedding(output_dim, emb_dim) self.gru = nn.GRU(emb_dim+(hidden_dim*2), hidden_dim, n_layers, dropout=0.0 if n_layers==1 else dropout) self.dropout = nn.Dropout(dropout) self.fc = nn.Linear(hidden_dim*3 + emb_dim, output_dim) def forward(self, input, hidden, encoder_outputs): input = input.unsqueeze(0) #input = [1, batch size] embedded = self.dropout(self.embedding(input)) #embedded = [1, batch size, emb dim] a = self.attention(hidden, encoder_outputs) #a = [batch size, src len] a = a.unsqueeze(1) #a = [batch size, 1, src len] encoder_outputs = encoder_outputs.permute(1, 0, 2) #encoder_outputs = [batch size, src len, enc hid dim * 2] weighted = torch.bmm(a, encoder_outputs) #weighted = [batch size, 1, enc hid dim * 2] weighted = weighted.permute(1, 0, 2) #weighted = [1, batch size, enc hid dim * 2] rnn_input = torch.cat((embedded, weighted), dim = 2) #rnn_input = [1, batch size, (enc hid dim * 2) + emb dim] output, hidden = self.gru(rnn_input, hidden.unsqueeze(0)) #output = [seq len, batch size, dec hid dim * n directions] #hidden = [n layers * n directions, batch size, dec hid dim] #seq len, n layers and n directions will always be 1 in this decoder, therefore: #output = [1, batch size, dec hid dim] #hidden = [1, batch size, dec hid dim] #this also means that output == hidden assert (output == hidden).all() embedded = embedded.squeeze(0) output = output.squeeze(0) weighted = weighted.squeeze(0) prediction = self.fc(torch.cat((output, weighted, embedded), dim = 1)) #prediction = [batch size, output dim] return prediction, hidden.squeeze(0)
Основная идея та же, что и раньше, я просто напишу основные шаги:
1. ['<SOS>'] is the first input of our decoder. The next input determined by teacher-forcing 2. We first unsqueeze it into the size of (1, batch_size) 3. And Then send it into embeded layer, get embeded 4. Calculate the attention weight(score), represent as a. 5. unsqueeze a in dimension 1, of size (batch_size, 1, max_length) 6. Permute the encoder output from (0,1,2) to (1,0,2), sat P 7. Multiply a with P, get the weighted(hidden of the decoder) 8. torch.cat the embeded and weighted along dimension = 2, this is called rnn_input 9. get output, hidden = self.gru(rnn_input, hidden.unsqueeze(0)), hidden is from the encoder output. 10. create a fully connected layer to do the prediction
ОБЪЕДИНИТЕ ИХ ВМЕСТЕ
Выход декодера имеет размер (batch_size, output_dim), обратите внимание, output_dim здесь равен размеру нашего немецкого словаря. Как только мы получим вывод кодировщика, мы можем использовать argmax для получения индекса наибольшего значения по размерности = 1 (размерность категории). Это будет нашим прогнозом перевода.
class EncoderDecoder(nn.Module): def __init__(self, encoder, decoder): super().__init__() self.encoder = encoder self.decoder = decoder assert self.encoder.hidden_dim == decoder.hidden_dim assert self.encoder.n_layers == decoder.n_layers def forward(self, x, y, teacher_forcing_ratio=0.75): target_len = y.shape[0] batch_size = y.shape[1] target_vocab_size = self.decoder.output_dim # Output dim outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device) # Encode the source text using encoder. Last hidden state of encoder is context vector. encoder_outputs, hidden_state = self.encoder(x) # First input is <sos> input = y[0,:] # Decode the encoded vector using decoder for t in range(1, target_len): output, hidden_state = self.decoder(input, hidden_state, encoder_outputs) outputs[t] = output teacher_force = random.random() < teacher_forcing_ratio pred = output.argmax(1) input = y[t] if teacher_force else pred return outputs
ЧАСТЬ III: ОБУЧЕНИЕ И ОЦЕНКА
Часть кодирования не сложна, поэтому я оставлю ее вам. Тем не менее, я все же хотел бы объяснить основную часть
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') device # Initialize all models input_dim = len(de_vocab) output_dim = len(en_vocab) emb_dim = 256 hidden_dim = 512 n_layers = 1 dropout = 0.5 attention = Attention(hidden_dim) encoder = Encoder(input_dim, emb_dim, hidden_dim, n_layers, dropout) decoder = Decoder(output_dim, emb_dim, hidden_dim, n_layers, attention, dropout) model = EncoderDecoder(encoder, decoder).to(device) def init_weights(m): for name, param in m.named_parameters(): if 'weight' in name: nn.init.normal_(param.data, mean=0, std=0.01) else: nn.init.constant_(param.data, 0) model.apply(init_weights) optimizer = optim.Adam(model.parameters()) criterion = nn.CrossEntropyLoss(ignore_index=en_vocab.word2idx["<pad>"]) def train(model, iterator, optimizer, criterion, clip): model.train() epoch_loss = 0 for i, batch in tqdm(enumerate(iterator), total=len(iterator), position=0, leave=True): src = batch[0].to(device) trg = batch[1].to(device) optimizer.zero_grad() output = model(src, trg) #trg = [trg len, batch size] #output = [trg len, batch size, output dim] output_dim = output.shape[-1] output = output[1:].view(-1, output_dim) trg = trg[1:].view(-1) #trg = [(trg len - 1) * batch size] #output = [(trg len - 1) * batch size, output dim] loss = criterion(output, trg) loss.backward() torch.nn.utils.clip_grad_norm_(model.parameters(), clip) optimizer.step() epoch_loss += loss.item() return epoch_loss / len(iterator) def evaluate(model, iterator, criterion): model.eval() epoch_loss = 0 with torch.no_grad(): for i, batch in tqdm(enumerate(iterator), total=len(iterator), position=0, leave=True): src = batch[0].to(device) trg = batch[1].to(device) output = model(src, trg, 0) #turn off teacher forcing #trg = [trg len, batch size] #output = [trg len, batch size, output dim] output_dim = output.shape[-1] output = output[1:].view(-1, output_dim) trg = trg[1:].view(-1) #trg = [(trg len - 1) * batch size] #output = [(trg len - 1) * batch size, output dim] loss = criterion(output, trg) epoch_loss += loss.item() return epoch_loss / len(iterator) N_EPOCHS = 10 CLIP = 1 best_valid_loss = float('inf') for epoch in range(N_EPOCHS): start_time = time.time() train_loss = train(model, train_loader, optimizer, criterion, CLIP) valid_loss = evaluate(model, valid_loader, criterion) end_time = time.time() epoch_mins, epoch_secs = epoch_time(start_time, end_time) if valid_loss < best_valid_loss: best_valid_loss = valid_loss torch.save(model.state_dict(), 'best_model.pt') print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s') print(f'\t Train Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}') print(f'\t Val. Loss: {valid_loss:.3f} | Val. PPL: {math.exp(valid_loss):7.3f}')
На протяжении всей этой статьи мы внимательно изучали мир моделей Seq2Seq, стремясь пролить свет на сложности, связанные с входными и выходными измерениями. Изучая фундаментальные концепции и методы, мы надеемся получить более четкое представление о том, как модели Seq2Seq с вниманием работают в задачах машинного перевода.