Машинный перевод, задача автоматического перевода текста с одного языка на другой, значительно продвинулся в последние годы. Одним из широко распространенных подходов к машинному переводу является модель «последовательность к последовательности» (Seq2Seq) с механизмом внимания (и в настоящее время Transformer доминирует над всем). Эта архитектура продемонстрировала замечательную производительность при захвате контекстной информации исходного языка и создании плавных и точных переводов. Однако понимание сложных деталей входных и выходных измерений в этой модели часто может быть сложной задачей для начинающих. В этой статье мы внимательно погрузимся в мир моделей Seq2Seq и исследуем сложности, связанные с входными и выходными измерениями.

В этой статье мы сосредоточимся на конкретном проекте машинного перевода, целью которого является создание модели перевода, преобразующей английские предложения в немецкие. Архитектура нашей модели состоит из двунаправленного GRU (Gated Recurrent Unit), выступающего в качестве кодировщика и собирающего контекстную информацию как в прямом, так и в обратном направлении. Для декодера мы используем однонаправленный GRU для генерации переведенного вывода. Чтобы облегчить процесс обучения, мы используем концепцию принуждения учителей, которая включает в себя использование целевых токенов достоверности в качестве входных данных во время обучения. Этот метод помогает стабилизировать процесс обучения и ускорить конвергенцию. Кроме того, мы включаем дополнительный механизм внимания, позволяющий модели сосредоточиться на соответствующих частях исходного предложения при создании перевода. Если вы новичок в этом, не волнуйтесь, эта статья именно для вас!

ЧАСТЬ I. Загрузка основных библиотек и предварительная обработка данных для повышения качества перевода

Как только мы закончим этот раздел, наши данные будут полностью подготовлены. Чтобы лучше понять, что будет дальше, давайте рассмотрим пример. В наших подготовленных данных каждое целое число соответствует индексу в нашем словаре. Эти индексы действуют как мост, соединяющий наши слова с их числовыми представлениями. Они играют жизненно важную роль в процессе перевода, позволяя нам преобразовывать лингвистическую информацию в формат, с которым наша модель может эффективно работать. Незнакомы со словом «словарь»? Ничего страшного, мы пройдем через это.

«Будь милым». -› [будь, милым, . ]-›[‹SOS›, be, nice, ., ‹EOS›]-> тензоры выше.

%%capture
!python -m spacy download en
!python -m spacy download de
!pip install spacy

import os
import re
import time
import math
import random
import unicodedata

import numpy as np
import pandas as pd
from tqdm import tqdm
import spacy
from sklearn.model_selection import train_test_split
import torch
from torch import nn, optim
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader, Dataset
import torch.nn.functional as F

Давайте засеем все, используя ответ на главный вопрос жизни, вселенной и всего — 42!

SEED = 42

random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
torch.cuda.manual_seed(SEED)
torch.backends.cudnn.deterministic = True
# create our train_df and valid_df
train_df, valid_df = train_test_split(data_df, test_size = 0.1, shuffle = True,random_state = 42)

train_df = train_df.reset_index(drop = True)
valid_df = valid_df.reset_index(drop = True)


# reference to: https://zhuanlan.zhihu.com/p/93029007
def unicode_to_ascii(s):
  '''
  take input as a sentence, then return the sentence after
  fillterd all accent marks or diacritical marks
  unicodedata.normalize('NFD', s) can split à into a and '
  '''
  return ''.join(c for c in unicodedata.normalize('NFD', s) if unicodedata.category(c) != 'Mn')

def preprocess_sentence(sen):
  #Convert the sentence to lower case, remove the first space and remove the diacritical marks
  sen = unicode_to_ascii(sen.lower().strip()) 
  # Add (?, . , ! and ¿) with a space before and after
  sen = re.sub(r"([?.!,¿])", r" \1 ", sen) 
  # replace everything other than [^a-zA-Z?.,!¿] with space
  sen = re.sub(r"[^a-zA-Z?.,!¿]+", " ", sen) 
  sen = sen.strip() 

  return sen

"""
class vocabulary

mathed 1: tokenize(self, text)
Take input as a sentence and return the tokens in list

method 2: build_dictionary(self, sequence_list)
Take input as a list of sentences, then build 2 dictionaries, 1 for word2idx and 1 for idx2word
these two dictionaries save the vacabulary only if when frequence of the word > threshold

method 3: numericalize(self, text)
Take input as a sentence, return the list contain correponding idx of the dictionary of word2idx
"""

class Vocabulary:
  def __init__(self, freq_threshold = 2, language='en_core_web_sm', preprocessor = None, reverse = False):
    self.idx2word = {0:"<pad>", 1:"<SOS>", 2:"<EOS>", 3:"<UNK>"}
    self.word2idx = {"<pad>":0, "<SOS>":1, "<EOS>":2, "<UNK>":3}
    self.tokenizer = spacy.load(language)
    self.freq_threshold = freq_threshold
    self.language = language
    self.preprocessor = preprocessor
    self.reverse = reverse

  def __len__(self):
    return len(self.idx2word)

  def tokenize(self, text):
    if self.reverse:
      return [token.text.lower() for token in self.tokenizer(text)][::-1]
    else:
      return [token.text.lower() for token in self.tokenizer(text)]

  def build_dictionary(self, sequence_list):
    frequency = {}
    counter = 4
    for seq in sequence_list:
      if self.preprocessor:
        seq = self.preprocessor(seq)

      for word in self.tokenize(seq):
        if word in frequency:
          frequency[word] += 1
        else:
          frequency[word] = 1

        if frequency[word] == self.freq_threshold:
          self.word2idx[word] = counter
          self.idx2word[counter] = word
          counter += 1

  def numericalize(self, text):
    tokenized_text = self.tokenize(text)

    return [self.word2idx[token] if token in self.word2idx else self.word2idx["<UNK>"] for token in tokenized_text]
en_vocab = Vocabulary(freq_threshold=2, preprocessor=preprocess_sentence, reverse=False)
en_vocab.build_dictionary(train_df["en"].tolist())

de_vocab = Vocabulary(freq_threshold=2, language="de_core_news_sm", preprocessor=preprocess_sentence, reverse=False)
de_vocab.build_dictionary(train_df['de'].tolist())

Суммируя все вышесказанное:

  1. Мы определяем функцию для удаления всех знаков ударения или диакритических знаков в предложении. Мы называем это unicode_to_ascii.
  2. Определите функцию Preprocess_sentencen(sentence), принимайте входные данные как предложение. Пример: «Мне действительно нравится глубокое обучение! ǎ ’ —› ‘ мне действительно нравится глубокое обучение! а’
  3. Создайте класс Vocabulary, содержащий три метода: ···Метод 1··· tokenize(self, text). Примите ввод как предложение и верните токены в список. ···Метод 2··· build_dictionary(self, sequence_list). Возьмите ввод в виде списка предложений, затем создайте 2 словаря, 1 для word2idx и 1 для idx2word. Эти два словаря сохраняют слова только в том случае, если частота слова › пороговая. ···Метод 3··· numericalize(self, text). Принять ввод как предложение, вернуть список, содержащий соответствующий idx словаря word2idx.
  4. Вызов метода build_dictionary в классе словаря для создания словарей word2idx и idx2word для английского и немецкого языков.

Помните, что мы стремимся передать предложение в тензор pytorch с помощью загрузчика данных. Урок словарного запаса будет очень полезен. На данный момент мы успешно создали словари пословного индекса и индексного словаря. Теперь приступим к следующему шагу: подготовке данных для загрузчика данных.

from numpy.lib.arraypad import pad
from pandas.core.arrays import numeric
#Create Dataset and Dataloader
#Here we define our own custom collate function, TO SAVE MEMORY AND BE EFFICIENT!
#We will pad batch by batch

class customed_collect:
  def __init__(self, pad_idx):
    self.pad_idx = pad_idx

  def __call__(self, batch):
    source = [item[0] for item in batch]
    #We set batch_first = False here because the input of the model should be every ith word of the sequence(THIS MAKES MORE SENSE)
    source = pad_sequence(source, batch_first = False, padding_value=self.pad_idx) 
    target = [item[1] for item in batch]
    target = pad_sequence(target, batch_first = False, padding_value= self.pad_idx)

    return source,target



class dataset(Dataset):
  def __init__(self, df, en_vocab, de_vocab):
    super().__init__()
    self.en_vocab = en_vocab
    self.de_vocab = de_vocab
    self.df = df


  def __len__(self):
    return len(self.df)



  def _get_numericalized(self, sentence, vocab):

        """
        Add <SOS> and <EOS> in the begin and the end of the encoded setence
        Input as a sentence
        Output as a list of integers(index)
        """
        numericalized = [vocab.word2idx["<SOS>"]]
        numericalized.extend(vocab.numericalize(sentence))
        numericalized.append(vocab.word2idx["<EOS>"])
        return numericalized

  def __getitem__(self, index):
        en_numericalized = self._get_numericalized(self.df.iloc[index]["en"], self.en_vocab)
        de_numericalized = self._get_numericalized(self.df.iloc[index]["de"], self.de_vocab)

        return torch.tensor(de_numericalized), torch.tensor(en_numericalized)

В этом разделе мы представляем нашу собственную пользовательскую функцию сопоставления. Его цель состоит в том, чтобы эффективно дополнять предложения в каждом пакете, тем самым оптимизируя использование памяти и общую эффективность. Дополняя предложения пакет за пакетом, мы гарантируем, что они имеют одинаковую длину, что обеспечивает бесшовную обработку в рамках нашей модели. Такой подход не только улучшает использование памяти, но и упрощает параллельные вычисления, что приводит к более быстрому и эффективному обучению или выводу.

Ссылка на пользовательскую функцию сопоставления: understanding-collate-fn-in-pytorch-f9d1742647d3

train_dataset = dataset(train_df, en_vocab, de_vocab)
train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=128,
    shuffle=False,
    collate_fn=customed_collect(pad_idx=en_vocab.word2idx["<pad>"])
)

valid_dataset = dataset(valid_df, en_vocab, de_vocab)
valid_loader = DataLoader(
    dataset=valid_dataset,
    batch_size=256,
    shuffle=False,
    collate_fn=customed_collect(pad_idx=en_vocab.word2idx["<pad>"])
)

ЧАСТЬ II. Определение кодировщика, внимания и декодера.

Во-первых, мы создадим кодировщик. Здесь мы используем двунаправленный GRU. С двунаправленным GRU у нас есть две RNN на каждом уровне. прямой GRU, проходящий по встроенному предложению слева направо (показан ниже зеленым), и обратный RNN, проходящий по встроенному предложению справа налево (бирюзовый). Все, что нам нужно сделать в коде, это установить bidirectional = True, а затем передать встроенное предложение в RNN, как и раньше.

мы передаем только ввод (embedded) в RNN, который сообщает PyTorch инициализировать как прямые, так и обратные начальные скрытые состояния (h0→ и h0 ← соответственно) тензором всех нулей. Мы также получим два вектора контекста: один от прямого RNN после того, как он увидит последнее слово в предложении, z→ = hT→, и один от обратного RNN после того, как он увидит первое слово в предложении, z ← = hT ← (см. график).

'''
We are using Bidirectional GRU here!!
hidden is of size [n layers * num directions, batch size, hid dim],
where [-2, :, :] gives the top layer forward RNN hidden state after the final time-step
(i.e. after it has seen the last word in the sentence)
and [-1, :, :] gives the top layer backward RNN hidden state after the final time-step
(i.e. after it has seen the first word in the sentence).
'''

class Encoder(nn.Module):
  def __init__(self, input_dim, emb_dim, hidden_dim, n_layers = 1, dropout = 0.2):
    super().__init__()
    self.hidden_dim = hidden_dim
    self.n_layers = n_layers
    self.embedding = nn.Embedding(input_dim, emb_dim)
    self.gru = nn.GRU(emb_dim, hidden_dim, n_layers, bidirectional=True, dropout=0.0 if n_layers==1 else dropout)
    self.fc = nn.Linear(2*hidden_dim, hidden_dim)
    self.dropout = nn.Dropout(dropout)

  def forward(self, sentence):
    #sentence is of size [max_length, batch_size]
    embedded = self.dropout(self.embedding(sentence))

    #embedded is of size [max_length, batch_size, embeded_dim]
    # we need the last hidden state,make it as the input of decoder 
    output, hidden = self.gru(embedded)

    #output is of size [max_length, batch_size, hidden_size * num of directions]
    #hidden if of size [num of directions * num of layers, batch_size, hidden_size]

    hidden = torch.tanh(self.fc(torch.cat((hidden[-2,:,:], hidden[-1,:,:]), dim = 1)))
    # Apply tanh here to prevent gradient vanishing / exploding
    # output is of size [max_length, batch_size, decoder_hidden_dim]
    # hidden is of size [batch_size, decoder_hidden_dim]

    return output, hidden

Визуализация размеров:

ДОПОЛНИТЕЛЬНЫЙ МЕХАНИЗМ ВНИМАНИЯ

Ключевая идея аддитивного внимания состоит в том, чтобы вычислить показатель релевантности между скрытым состоянием декодера и выходом кодировщика на каждом временном шаге. Эта оценка представляет важность или вес внимания, присвоенный каждому элементу во входной последовательности.

# Here used Additive Attention!
class Attention(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.attention = nn.Linear(3*hidden_dim, hidden_dim)
        self.v = nn.Linear(hidden_dim, 1, bias=False)

    def forward(self, hidden_state, encoder_outputs):
        # hidden_state has size of (batch_size, hidden_dim)
        # encoder_outputs is of size (max_length, batch_size, hidden_)
        batch_size = encoder_outputs.shape[1]
        src_len = encoder_outputs.shape[0]

        # repeat decoder hidden state src_len times
        # unsqueeze(1) -> make (batch_size, hidden_dim) to (batch_size, 1, hidden_dim)
        # repeat(1, sec_len, 1) -> make(batch_size, 1, hidden_dim) to (batch_size, sec_len, hidden_dim)
        hidden_state = hidden_state.unsqueeze(1).repeat(1, src_len, 1)

        # (src_len, batch_size, hidden_dim*2) -> (batch_size, src_len, hidden_dim*2)
        encoder_outputs = encoder_outputs.permute(1, 0, 2)

        energy = torch.tanh(self.attention(torch.cat((hidden_state, encoder_outputs), dim = 2)))

        attention = self.v(energy).squeeze(2)

        return F.softmax(attention, dim=1)

ОПРЕДЕЛИТЬ ДЕКОДЕР

Слой внимания декодера использует предыдущее скрытое состояние и все скрытые состояния кодировщика для вычисления вектора внимания. Этот вектор используется для создания взвешенного исходного вектора, который объединяется с входным словом и предыдущим скрытым состоянием для обновления скрытого состояния декодера. Обновленное скрытое состояние вместе с входным словом и взвешенным исходным вектором передается через линейный слой для предсказания следующего слова. Этот механизм внимания позволяет декодеру сосредоточиться на релевантной информации для точного перевода.

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hidden_dim, n_layers, attention, dropout=0.2):
        super().__init__()
        self.output_dim = output_dim
        self.hidden_dim = hidden_dim
        self.attention = attention
        self.n_layers = n_layers
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.gru = nn.GRU(emb_dim+(hidden_dim*2), hidden_dim, n_layers, dropout=0.0 if n_layers==1 else dropout)
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_dim*3 + emb_dim, output_dim)

    def forward(self, input, hidden, encoder_outputs):
        input = input.unsqueeze(0)
        #input = [1, batch size]

        embedded = self.dropout(self.embedding(input))
        #embedded = [1, batch size, emb dim]

        a = self.attention(hidden, encoder_outputs)
        #a = [batch size, src len]

        a = a.unsqueeze(1)
        #a = [batch size, 1, src len]

        encoder_outputs = encoder_outputs.permute(1, 0, 2)
        #encoder_outputs = [batch size, src len, enc hid dim * 2]

        weighted = torch.bmm(a, encoder_outputs)
        #weighted = [batch size, 1, enc hid dim * 2]

        weighted = weighted.permute(1, 0, 2)
        #weighted = [1, batch size, enc hid dim * 2]

        rnn_input = torch.cat((embedded, weighted), dim = 2)
        #rnn_input = [1, batch size, (enc hid dim * 2) + emb dim]

        output, hidden = self.gru(rnn_input, hidden.unsqueeze(0))
        #output = [seq len, batch size, dec hid dim * n directions]
        #hidden = [n layers * n directions, batch size, dec hid dim]

        #seq len, n layers and n directions will always be 1 in this decoder, therefore:
        #output = [1, batch size, dec hid dim]
        #hidden = [1, batch size, dec hid dim]
        #this also means that output == hidden
        assert (output == hidden).all()

        embedded = embedded.squeeze(0)
        output = output.squeeze(0)
        weighted = weighted.squeeze(0)

        prediction = self.fc(torch.cat((output, weighted, embedded), dim = 1))

        #prediction = [batch size, output dim]

        return prediction, hidden.squeeze(0)

Основная идея та же, что и раньше, я просто напишу основные шаги:

1. ['<SOS>'] is the first input of our decoder. 
   The next input determined by teacher-forcing

2. We first unsqueeze it into the size of (1, batch_size)

3. And Then send it into embeded layer, get embeded

4. Calculate the attention weight(score), represent as a.

5. unsqueeze a in dimension 1, of size (batch_size, 1, max_length)

6. Permute the encoder output from (0,1,2) to (1,0,2), sat P

7. Multiply a with P, get the weighted(hidden of the decoder)

8. torch.cat the embeded and weighted along dimension = 2, this is called rnn_input

9. get output, hidden = self.gru(rnn_input, hidden.unsqueeze(0)), hidden 
   is from the encoder output.

10. create a fully connected layer to do the prediction

ОБЪЕДИНИТЕ ИХ ВМЕСТЕ

Выход декодера имеет размер (batch_size, output_dim), обратите внимание, output_dim здесь равен размеру нашего немецкого словаря. Как только мы получим вывод кодировщика, мы можем использовать argmax для получения индекса наибольшего значения по размерности = 1 (размерность категории). Это будет нашим прогнозом перевода.

class EncoderDecoder(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

        assert self.encoder.hidden_dim == decoder.hidden_dim
        assert self.encoder.n_layers == decoder.n_layers

    def forward(self, x, y, teacher_forcing_ratio=0.75):

        target_len = y.shape[0]
        batch_size = y.shape[1]
        target_vocab_size = self.decoder.output_dim  # Output dim

        outputs = torch.zeros(target_len, batch_size, target_vocab_size).to(device)

        # Encode the source text using encoder. Last hidden state of encoder is context vector.
        encoder_outputs, hidden_state = self.encoder(x)

        # First input is <sos>
        input = y[0,:]

        # Decode the encoded vector using decoder
        for t in range(1, target_len):
            output, hidden_state = self.decoder(input, hidden_state, encoder_outputs)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            pred = output.argmax(1)
            input = y[t] if teacher_force else pred

        return outputs

ЧАСТЬ III: ОБУЧЕНИЕ И ОЦЕНКА

Часть кодирования не сложна, поэтому я оставлю ее вам. Тем не менее, я все же хотел бы объяснить основную часть

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
# Initialize all models
input_dim = len(de_vocab)
output_dim = len(en_vocab)
emb_dim = 256
hidden_dim = 512
n_layers = 1
dropout = 0.5

attention = Attention(hidden_dim)
encoder = Encoder(input_dim, emb_dim, hidden_dim, n_layers, dropout)
decoder = Decoder(output_dim, emb_dim, hidden_dim, n_layers, attention, dropout)
model = EncoderDecoder(encoder, decoder).to(device)


def init_weights(m):
    for name, param in m.named_parameters():
        if 'weight' in name:
            nn.init.normal_(param.data, mean=0, std=0.01)
        else:
            nn.init.constant_(param.data, 0)

model.apply(init_weights)

optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=en_vocab.word2idx["<pad>"])
def train(model, iterator, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0

    for i, batch in tqdm(enumerate(iterator), total=len(iterator), position=0, leave=True):
        src = batch[0].to(device)
        trg = batch[1].to(device)

        optimizer.zero_grad()

        output = model(src, trg)

        #trg = [trg len, batch size]
        #output = [trg len, batch size, output dim]

        output_dim = output.shape[-1]
        output = output[1:].view(-1, output_dim)
        trg = trg[1:].view(-1)

        #trg = [(trg len - 1) * batch size]
        #output = [(trg len - 1) * batch size, output dim]

        loss = criterion(output, trg)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()

    return epoch_loss / len(iterator)
def evaluate(model, iterator, criterion):
    model.eval()
    epoch_loss = 0

    with torch.no_grad():
        for i, batch in tqdm(enumerate(iterator), total=len(iterator), position=0, leave=True):
            src = batch[0].to(device)
            trg = batch[1].to(device)

            output = model(src, trg, 0) #turn off teacher forcing

            #trg = [trg len, batch size]
            #output = [trg len, batch size, output dim]

            output_dim = output.shape[-1]
            output = output[1:].view(-1, output_dim)
            trg = trg[1:].view(-1)

            #trg = [(trg len - 1) * batch size]
            #output = [(trg len - 1) * batch size, output dim]

            loss = criterion(output, trg)
            epoch_loss += loss.item()

    return epoch_loss / len(iterator)
N_EPOCHS = 10
CLIP = 1

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()

    train_loss = train(model, train_loader, optimizer, criterion, CLIP)
    valid_loss = evaluate(model, valid_loader, criterion)

    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'best_model.pt')

    print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
    print(f'\t Train Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
    print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')

На протяжении всей этой статьи мы внимательно изучали мир моделей Seq2Seq, стремясь пролить свет на сложности, связанные с входными и выходными измерениями. Изучая фундаментальные концепции и методы, мы надеемся получить более четкое представление о том, как модели Seq2Seq с вниманием работают в задачах машинного перевода.