안녕하세요 '코딩 오페라'입니다.현재 저는 '초등학생도 이해하는 자연어 처리'라는 주제로 자연어 처리(NLP)에 대해 포스팅을 하고 있습니다. 제목처럼 진짜 핵심 내용을 쉽게 설명하는 것을 목표로 하고 있으니 자연어 처리(NLP)에 입문하고 싶은 분들은 많은 관심 부탁드립니다.
오늘은 저번에 알아본 'Transformer'를 하버드에서 구현한 python code를 리뷰해드리고자 합니다. 만약 아직 Transformer의 개념에 대해 모르시는 분들을 아래글들을 먼저 읽고 오시기 바랍니다.(안 그러시면 이해가 1도 안되실 겁니다 TT.)
https://codingopera.tistory.com/41
https://codingopera.tistory.com/43
https://codingopera.tistory.com/44
https://codingopera.tistory.com/45?category=1094804
Tansformer의 구조
간단하게 복습을 해보자면 Transformer는 크게 Encoder와 Decoder로 이루어져 있고 각각은 Multi-Head Attention, Normalization(정규화), Residual Connection, Feed Forward network로 구성되어 있습니다. 또한 Input 부분에는 Embedding과 Positional Encoding으로 구성되어 있고, Output 부분은 Linear layer와 Softmax함수로 구성되어 있습니다.
지금부터 이러한 Transformer의 구조들을 python 코드로 하나씩 구현하여 알아보도록 하겠습니다. 코드는 하버드에서 제작한 Harvard Transformer Code를 참고하였습니다. 그런데 제가 실행해 보니 몇 가지 에러들이 있어 이를 디버깅하여 아래 colab에 저장하였으니, 코드를 참고하시기 바랍니다.
https://colab.research.google.com/drive/1-B8obSiAgAcq-VEJBVVzKIijySDRwm3k?usp=sharing
Harvard Transformer Code는 위와 같은 구조로 이루어져 있습니다. 좀 복잡합니다 ㅎㅎ. 그러나 포기하지 마세요, 저 코딩 오페라가 지금부터 하나씩 차근차근 설명해 드리겠습니다. 우선 요약을 하자면 다음과 같습니다.
- run_epoch: 모델의 학습을 돌림
- make_model: 모델을 만듦
- NoamOpt: 모델의 옵티마이저
- data_gen: 임의의 데이터 생성
- Batch: 데이터를 배치화하여 생성
- subsequent_mask: mask 생성
- Embeddings: 단어 및 문장 임베딩 생성
- EncoderDecoder: Encoder와 Decoder 생성
- Encoder: Encoder 생성
- Decoder: Decoder 생성
- EncoderLayer: Encoder를 구성하는 EncoderLayer 생성
- DecoderLayer: Decoder를 구성하는 DecoderLayer 생성
- clones: Layer를 복사
- LayerNorm: Layer Normalization
- SublayerConnection: Residual Connection
- MultiHeadedAttention: Multi-Head Attention
- attention: attention
- PositionwiseFeedForward: Feed Forward
- PositionalEncoding: Positional Encoding
- SimpleLossCompute: loss값을 통해 backpropagation진행
- loss: loss함수 생성
- Generator: Transformer의 output 단 Linear layer와 Softmax함수 생성
- LabelSmoothing: Label smoothing
라이브러리 불러오기
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math, copy, time
from torch.autograd import Variable
import matplotlib.pyplot as plt
import seaborn
seaborn.set_context(context="talk")
%matplotlib inline
제일 먼저 위와 같이 numpy, torch, math 등 라이브러리를 불러와줍니다. Python에서는 import 함수를 통해 라이브러리를 손쉽게 불러올 수 있습니다. 만약 라이브러리가 불러와지지 않는다면, 본인의 컴퓨터 가상환경에 해당 라이브러리가 없는 것이므로, 설치를 해줘야 합니다. 이때 "!pip install 라이브러리_이름"코드를 입력하여 라이브러리를 가상환경에 설치해 줍니다. (e.g. numpy를 설치하고 싶다: !pip install numpy, torch를 설치하고 싶다: !pip install pytorch)
EncoderDecoder
class EncoderDecoder(nn.Module):
def __init__(self, encoder, decoder, src_embed, tgt_embed, generator):
super(EncoderDecoder, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.src_embed = src_embed
self.tgt_embed = tgt_embed
self.generator = generator
def forward(self, src, tgt, src_mask, tgt_mask):
return self.decode(self.encode(src, src_mask), src_mask, tgt, tgt_mask)
def encode(self, src, src_mask):
return self.encoder(self.src_embed(src), src_mask)
def decode(self, memory, src_mask, tgt, tgt_mask):
return self.decoder(self.tgt_embed(tgt), memory, src_mask, tgt_mask)
Encoder와 Decoder를 설정해 주는 Class입니다. 위를 보시면 Class안의 encode, decode함수를 통해 Encoder와 Decoder를 만들어줍니다. 여기서 src는 번역을 하려는 대상(source)을 가리키고, tgt는 번역한 대상(target)을 의미합니다. 즉 src_embed, src_mask는 각각 source의 임베딩과 mask를 가리킵니다.(decoder도 마찬가지입니다.) 마지막으로 forward함수는 Encoder와 Decoder를 이어주는 연동단 역할을 합니다. 아직 잘 이해가 안 되신다고요? 앞으로 한 땀 한 땀 제가 이해시켜 드리겠습니다.
Generator
class Generator(nn.Module):
def __init__ (self, d_model, vocab):
super(Generator, self).__init__()
self.proj = nn.Linear(d_model, vocab)
def forward(self, x):
return F.log_softmax(self.proj(x), dim=-1)
Transformer 마지막단의 Linear layer과 Softmax 함수를 만들어주는 Class입니다. nn.Linear() 함수를 통해 Linear layer를 만들어주고, F.log_softmax() 함수를 통해 활성함수 Softmax 함수를 통과한 구조를 만들어줍니다.
clones
def clones(module, N):
return nn.ModuleList([copy.deepcopy(module) for _ in range(N)])
Neural network layer와 같은 module을 복제할 수 있는 함수입니다. 여기서는 copy.deepcopy()를 사용하여 복제를 진행합니다. 이 함수는 완벽히 다른 객체로 복제를 진행하여 유용하게 사용할 수 있습니다. 그럼 일반적인 copy와는 어떤 차이가 있을까요?
a = [1, 2, 3]
b = a
a[0] = 100
print(a)
print(b)
예를 들어 위와 같이 list a를 먼저 정의하고, b를 a로 정의했을 때 a = [100, 2, 3], b = [100, 2, 3]로 a와 b가 서로 연결되어 있어, 둘 중 하나를 변환하면 다른 객체 역시 동일하게 바뀝니다.
Encoder
class Encoder(nn.Module):
"Core encoder is a stack of N layers"
def __init__(self, layer, N):
super(Encoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, mask):
"Pass the input (and mask) through each layer in turn."
for layer in self.layers:
x = layer(x, mask)
return self.norm(x)
Transformer의 Encoder 코드 구조입니다. Encoder layer를 받아와 layer 수 N만큼 clone함수를 통해 복사해 줍니다. 그리고 아래 forward함수를 통해 각 layer들을 거쳐서 마지막에 LayerNorm을 통해 정규화를 진행해 줍니다.
LayerNorm
class LayerNorm(nn.Module):
"Construct a layernorm module (See citation for details)."
def __init__(self, features, eps=1e-6):
super(LayerNorm, self).__init__()
self.a_2 = nn.Parameter(torch.ones(features))
self.b_2 = nn.Parameter(torch.zeros(features))
self.eps = eps
def forward(self, x):
mean = x.mean(-1, keepdim=True) # 텐서 x의 마지막차원(-1)을 따라 평균을 구하고 차원을 유지하는 함수
std = x.std(-1, keepdim=True)
return self.a_2 * (x - mean) / (std + self.eps) + self.b_2
Transformer에서 정규화 부분을 담당하는 LayerNorm 파트입니다. LayerNorm은 NN의 layer들의 평균값과 분산값을 동일하게 해 주어 각 layer들을 정규화해 주는 방법입니다. 공식은 아래와 같습니다. 여기서 E(x)는 layer의 평균값을, Var(x)은 분산을, $\epsilon$은 분모가 0이 되지 않게 하는 상수를, $\gamma$와 $\beta$는 각각 이를 학습하는 NN들입니다.
위 코드에서는 mean함수를 통해 layer의 평균을 구하고, std를 통해 표준편차를 구했습니다. $\epsilon$의 값이 작을 때 $\sqrt{Var + \epsilon} \approx std + \epsilon$ 이므로 코드에서는 $std + \epsilon$ 를 사용한 것으로 보입니다. 또한$\gamma$와 $\beta$는 self.a_2와 slef.b_2 layer을 이용했습니다.
SublayerConnection
class SublayerConnection(nn.Module):
"""
A residual connection followed by a layer norm.
Note for code simplicity the norm is first as opposed to last.
"""
def __init__(self, size, dropout):
super(SublayerConnection, self).__init__()
self.norm = LayerNorm(size)
self.dropout = nn.Dropout(dropout)
def forward(self, x, sublayer):
"Apply residual connection to any sublayer with the same size."
return x + self.dropout(sublayer(self.norm(x)))
이 부분은 Transformer 구조의 Add & Norm 부분입니다. 즉 잔차연결(Residual connection)과 정규화(Normalization) 부분입니다. SublayerConnection 클래스의 forward 함수 부분을 보면 처음 self.norm() 함수로 정규화를 진행하고 self.dropout() 함수를 이용해 dropout을 진행해 줍니다. 마지막으로는 이 값을 원래값인 x에 더해줌으로써 잔차연결을 진행하는 것을 알 수 있습니다.
EncoderLayer
class EncoderLayer(nn.Module):
"Encoder is made up of self-attn and feed forward (defined below)"
def __init__(self, size, self_attn, feed_forward, dropout):
super(EncoderLayer, self).__init__()
self.self_attn = self_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 2)
self.size = size
def forward(self, x, mask):
"Follow Figure 1 (left) for connections."
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, mask))
return self.sublayer[1](x, self.feed_forward)
Encoder를 구성하는 layer입니다. __init__함수를 보시면 처음에 attention, feed forward를 받아와 Encoder구조를 만들고 forward단에서 실제 attention과 feed forward과정을 진행하는 모습을 볼 수 있습니다. 추가적으로 clones함수를 통해 SublayerConnection 즉 잔차연결(Residual connection) 부분을 만들어줍니다(Encoder는 Residual connection이 2개가 들어가므로 2개를 만들어줍니다.). forward 함수 부분에서 self.self_attn(x, x, x, mask)라고 되어있는데 이는 self.self_attn(Query, Key, Value, mask)를 나타냅니다. self attention을 구하려는 경우 Query = Key = Value 이기 때문에 이 값들이 모두 x로 동일한 것입니다.
Decoder
class Decoder(nn.Module):
"Generic N layer decoder with masking."
def __init__(self, layer, N):
super(Decoder, self).__init__()
self.layers = clones(layer, N)
self.norm = LayerNorm(layer.size)
def forward(self, x, memory, src_mask, tgt_mask):
for layer in self.layers:
x = layer(x, memory, src_mask, tgt_mask)
return self.norm(x)
Transformer의 Decoder 코드 구조입니다. Encoder와 동일하게, Decoder layer를 받아와 layer 수 N만큼 clone함수를 통해 복사해 줍니다. 그리고 아래 forward함수를 통해 각 layer들을 거쳐서 마지막에 LayerNorm을 통해 정규화를 진행해 줍니다. Encoder와 다른 점은, forward함수 입력 부분에 memory, src_mask, tgt_mask가 추가되었다는 것입니다. 여기서 src_ mask , tgt_mask는 각각 source와 target의 mask를 의미합니다. 또한 memory는 Encoder의 output을 의미합니다.(Encoder의 ouput과 Decoder의 중간단에서 attention을 진행하기 때문)
DecoderLayer
class DecoderLayer(nn.Module):
"Decoder is made of self-attn, src-attn, and feed forward (defined below)"
def __init__(self, size, self_attn, src_attn, feed_forward, dropout):
super(DecoderLayer, self).__init__()
self.size = size
self.self_attn = self_attn
self.src_attn = src_attn
self.feed_forward = feed_forward
self.sublayer = clones(SublayerConnection(size, dropout), 3)
def forward(self, x, memory, src_mask, tgt_mask):
"Follow Figure 1 (right) for connections."
m = memory
x = self.sublayer[0](x, lambda x: self.self_attn(x, x, x, tgt_mask))
x = self.sublayer[1](x, lambda x: self.src_attn(x, m, m, src_mask))
return self.sublayer[2](x, self.feed_forward)
Decoder를 구성하는 layer입니다. Encoder와 동일하게 attention, feed forward를 받아와 Decoder구조를 만들고 forward단에서 실제 attention과 feed forward과정을 진행하는 모습을 볼 수 있습니다. Encoder와 다른 점이 있다면, attention을 2번 진행한다는 점입니다. forward부분의 첫 번째 self_attn 함수는 self attention을 진행하는 부분으로 Query = Key = Value = x인 것을 확인할 수 있습니다. 그다음 src_attn(x, m, m)은 Encoder, Decoder attention으로 여기서 'm'은 위에서 언급했듯이 Encoder output을 의미합니다. 즉 Query = x, Key = Value = m을 의미합니다.
subsequent_mask
def subsequent_mask(size):
"Mask out subsequent positions."
attn_shape = (1, size, size)
subsequent_mask = torch.triu(torch.ones(attn_shape), diagonal=1).type(
torch.uint8
)
return subsequent_mask == 0
mask를 만드는 함수입니다. torch.triu() 함수는 삼각행렬을 만드는 함수로, 위 상삼각행렬은 '1'로, 하삼각행렬은 '0'으로 만들어줍니다. 확인을 위해 아래코드를 실행시켜 주면 다음과 같은 그래프가 나와 masking이 되는 것을 확인할 수 있습니다.
plt.figure(figsize=(5,5))
plt.imshow(subsequent_mask(20)[0])
attention
def attention(query, key, value, mask=None, dropout=None):
"Compute 'Scaled Dot Product Attention'"
d_k = query.size(-1)
scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask == 0, -1e9)
p_attn = scores.softmax(dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, value), p_attn
Attention을 진행하는 부분입니다. attention은 아래식과 같이 구성되어 있는데, torch.matmul() 함수를 통해 QK^T를 구하고 softmax() 함수를 사용하는 것을 알 수 있습니다. 여기서 masked_fill() 함수는 데이터를 변환해 주는 함수로 masked_fill(mask==0, -1e9)는 '0'값을 '-1e9'로 변환해주는 것을 의미합니다. 이렇게 변환해주는 이유는 softmax함수를 통과한 값이 '0'이 되기 위함입니다.(softmax( -1e9 ) = 0)
return은 attention score와 Value값을 행렬곱한 attention과 attention score를 반환합니다.
MultiHeadedAttention
class MultiHeadedAttention(nn.Module):
def __init__(self, h, d_model, dropout=0.1):
"Take in model size and number of heads."
super(MultiHeadedAttention, self).__init__()
assert d_model % h == 0
# We assume d_v always equals d_k
self.d_k = d_model // h
self.h = h
self.linears = clones(nn.Linear(d_model, d_model), 4)
self.attn = None
self.dropout = nn.Dropout(p=dropout)
def forward(self, query, key, value, mask=None):
"Implements Figure 2"
if mask is not None:
# Same mask applied to all h heads.
mask = mask.unsqueeze(1)
nbatches = query.size(0)
# 1) Do all the linear projections in batch from d_model => h x d_k
query, key, value = [
lin(x).view(nbatches, -1, self.h, self.d_k).transpose(1, 2)
for lin, x in zip(self.linears, (query, key, value))
]
# 2) Apply attention on all the projected vectors in batch.
x, self.attn = attention(
query, key, value, mask=mask, dropout=self.dropout
)
# 3) "Concat" using a view and apply a final linear.
x = (
x.transpose(1, 2)
.contiguous()
.view(nbatches, -1, self.h * self.d_k)
)
del query
del key
del value
return self.linears[-1](x)
Multi-Head Attention입니다. Multi-Head Attention은 attention을 head 수만큼 분할하여 진행하는 것으로 위에서 정의했던 attention함수를 불러오고 head 수 h를 정의해 진행합니다. 이때 d_k는 분할된 head의 벡터크기로 원래 벡터크기 d_model을 head로 나눠준 것을 확인할 수 있습니다. 중간 mask.unsqueeze(1) 부분이 있는데, 이는 텐서의 차원을 계산이 용이하도록 맞춰주는 역할을 합니다. 참고로 squeeze()는 차원을 삭제하는 함수, unsqueeze()는 차원을 생성하는 함수입니다. 또한 contiguous()는 컴퓨터 메모리적으로 비연속적인 텐서를 연속적으로 만들어주어 연산이 가능하도록 하는 함수입니다. 제 생각에는 head의 수만큼 텐서를 나눠 비연속적으로 변환된 값을 다시 연속적으로 만들어주는 역할을 하는 것 같습니다. 마지막을 정해준 NN layer를 통화하여 정제된 Multi-Head Attention값을 반환합니다.
참고로 여러분들을 위해 제가 유튜브 영상도 제작했으니 많은 관심 부탁드리겠습니다. 아마 영상 보시고 따라하시는게 더 편하실 겁니다. 앞으로 꾸준히 영상을 업로드할 예정이니 구독 좋아요도 부탁드려요 😍😍
https://youtu.be/8m6d28zV-F8?si=sD7lfzStBdj6_W67
https://youtu.be/5FRPLOIfZic?si=YpeEC4YELaWzPlp6
PositionwiseFeedForward
class PositionwiseFeedForward(nn.Module):
"Implements FFN equation."
def __init__(self, d_model, d_ff, dropout=0.1):
super(PositionwiseFeedForward, self).__init__()
self.w_1 = nn.Linear(d_model, d_ff)
self.w_2 = nn.Linear(d_ff, d_model)
self.dropout = nn.Dropout(dropout)
def forward(self, x):
return self.w_2(self.dropout(self.w_1(x).relu()))
Encoder와 Decoder에서 attention을 지나면 나오는 Feed Forward부분입니다. linear(선형) NN로 구성되어 있고 dropout도 진행해 줍니다.
Embeddings
class Embeddings(nn.Module):
def __init__(self, d_model, vocab):
super(Embeddings, self).__init__()
self.lut = nn.Embedding(vocab, d_model)
self.d_model = d_model
def forward(self, x):
return self.lut(x) * math.sqrt(self.d_model)
단어들의 벡터차원을 Transformer 모델의 차원인 $d_{model}$로 변환해 주는 역할을 하는 클래스입니다. 보시면 NN을 통해 변환을 해주고 마지막에 $\sqrt{d_{model}}$을 곱하여 값을 보정해 줍니다.
PositionalEncoding
class PositionalEncoding(nn.Module):
"Implement the PE function."
def __init__(self, d_model, dropout, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
# Compute the positional encodings once in log space.
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len).unsqueeze(1)
div_term = torch.exp(
torch.arange(0, d_model, 2) * -(math.log(10000.0) / d_model)
)
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0)
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[:, : x.size(1)].requires_grad_(False)
return self.dropout(x)
병렬처리로 인해 단어들의 위치를 모르는 Transformer 모델에게, 이를 알려주는 역할을 하는 클래스입니다. 우선 torch.arange(0, max_len) 함수를 통해 max length까지의 position(위치) 값들을 정의해 줍니다.(0, 1, 2, ... , max_len) 그다음 div_term을 통해 $10000^{2i/d_{model}}$을 정의하고 마지막으로 torch.sin()과 torch.cos() 함수를 통해 Positional Encoding을 정의해 줍니다. 이때 pe[:, 0::2]는 i가 2의 배수일 때 만을 선택하는 함수입니다. 때문에 i = 0, 2, 4, ... 일 때는 sin함수를, i = 1, 3, 5, ... 일때는 cos함수를 적용할 수 있습니다. 이를 그래프로 나타내어 확인하면 아래와 같습니다.
plt.figure(figsize=(15, 5))
pe = PositionalEncoding(20, 0)
y = pe.forward(Variable(torch.zeros(1, 100, 20)))
plt.plot(np.arange(100), y[0, :, 4:8].data.numpy())
plt.legend(["dim %d"%p for p in [4,5,6,7]])
make_model
def make_model(
src_vocab, tgt_vocab, N=6, d_model=512, d_ff=2048, h=8, dropout=0.1
):
"Helper: Construct a model from hyperparameters."
c = copy.deepcopy
attn = MultiHeadedAttention(h, d_model)
ff = PositionwiseFeedForward(d_model, d_ff, dropout)
position = PositionalEncoding(d_model, dropout)
model = EncoderDecoder(
Encoder(EncoderLayer(d_model, c(attn), c(ff), dropout), N),
Decoder(DecoderLayer(d_model, c(attn), c(attn), c(ff), dropout), N),
nn.Sequential(Embeddings(d_model, src_vocab), c(position)),
nn.Sequential(Embeddings(d_model, tgt_vocab), c(position)),
Generator(d_model, tgt_vocab),
)
# This was important from their code.
# Initialize parameters with Glorot / fan_avg.
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
전체적인 Transformer모델을 만들어주는 클래스입니다. 보시다시피 Multi-Head Attention을 이용한 attention, Feed Forward, Positional Encoding, Encoder, Decoder 등 이전에 정의해 주었던 클래스들을 모두 불러와 융합하여 최종적으로 model을 생성합니다.
Batch
class Batch:
"""Object for holding a batch of data with mask during training."""
def __init__(self, src, tgt=None, pad=2): # 2 = <blank>
self.src = src
self.src_mask = (src != pad).unsqueeze(-2)
if tgt is not None:
self.tgt = tgt[:, :-1]
self.tgt_y = tgt[:, 1:]
self.tgt_mask = self.make_std_mask(self.tgt, pad)
self.ntokens = (self.tgt_y != pad).data.sum()
@staticmethod
def make_std_mask(tgt, pad):
"Create a mask to hide padding and future words."
tgt_mask = (tgt != pad).unsqueeze(-2)
tgt_mask = tgt_mask & subsequent_mask(tgt.size(-1)).type_as(
tgt_mask.data
)
return tgt_mask
source와 target의 값, mask, 토큰수를 지정해 주는 클래스입니다. 이 중에서 make_std_mask함수를 이용해 target mask를 지정해 줍니다. 이때 target data가 '0', 즉 pad면 mask를 False로 두어 masking작업을 하지 않고, 반대로 '0'이 아니면 pad가 아닌 실제 단어이므로 masking을 True로 하여 Transformer모델의 학습을 진행합니다.
run_epoch
def run_epoch(data_iter, model, loss_compute):
"Standard Training and Logging Function"
start = time.time()
total_tokens = 0
total_loss = 0
tokens = 0
for i, batch in enumerate(data_iter):
out = model.forward(batch.src, batch.tgt,
batch.src_mask, batch.tgt_mask)
loss = loss_compute(out, batch.tgt_y, batch.ntokens)
total_loss += loss
total_tokens += batch.ntokens
tokens += batch.ntokens
if i % 50 == 1:
elapsed = time.time() - start
print("Epoch Step: %d Loss: %f Tokens per Sec: %f" %
(i, loss / batch.ntokens, tokens / elapsed))
start = time.time()
tokens = 0
return total_loss / total_tokens
정의된 Transformer모델에 데이터를 불러와 전체적으로 모델을 학습하게 하는 함수입니다. 50 epoch마다 loss와 토큰별 학습시간을 반환하는데, 이 값은 원하시는 데로 설정하시면 됩니다.
NoamOpt
class NoamOpt:
"Optim wrapper that implements rate."
def __init__(self, model_size, factor, warmup, optimizer):
self.optimizer = optimizer
self._step = 0
self.warmup = warmup
self.factor = factor
self.model_size = model_size
self._rate = 0
def step(self):
"Update parameters and rate"
self._step += 1
rate = self.rate()
for p in self.optimizer.param_groups:
p['lr'] = rate
self._rate = rate
self.optimizer.step()
def rate(self, step = None):
"Implement `lrate` above"
if step is None:
step = self._step
return self.factor * \
(self.model_size ** (-0.5) *
min(step ** (-0.5), step * self.warmup ** (-1.5)))
def get_std_opt(model):
return NoamOpt(model.src_embed[0].d_model, 2, 4000,
torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
NoamOpt는 Transformer을 학습시킬 때 필요한 옵티마이저로 Adam 옵티마이저를 사용합니다. 여기서 step함수는 학습의 step을 진행시키고, rate함수는 self.warmup 변수를 이용해, 학습 learning rate를 조절해 줍니다. 여기서 self.warmup은 모델의 학습에서 준비과정(warmup)을 얼마나 진행할지 결정하는 변수로, 이 값이 크면 클수록 learning rate가 천천히 올라가 학습이 천천히 이루어지고, 작으면 작을수록 learning rate가 빠르게 올라가 학습이 빨리 이루어집니다. 아래 코드와 그래프를 통해 이를 확인하실 수 있습니다.
# Three settings of the lrate hyperparameters.
opts = [NoamOpt(512, 1, 4000, None),
NoamOpt(512, 1, 8000, None),
NoamOpt(256, 1, 4000, None)]
plt.plot(np.arange(1, 20000), [[opt.rate(i) for opt in opts] for i in range(1, 20000)])
plt.legend(["512:4000", "512:8000", "256:4000"])
LabelSmoothing
class LabelSmoothing(nn.Module):
"Implement label smoothing."
def __init__(self, size, padding_idx, smoothing=0.0):
super(LabelSmoothing, self).__init__()
self.criterion = nn.KLDivLoss(reduction="sum")
self.padding_idx = padding_idx
self.confidence = 1.0 - smoothing
self.smoothing = smoothing
self.size = size
self.true_dist = None
def forward(self, x, target):
assert x.size(1) == self.size
true_dist = x.data.clone()
true_dist.fill_(self.smoothing / (self.size - 2))
true_dist.scatter_(1, target.data.unsqueeze(1), self.confidence)
true_dist[:, self.padding_idx] = 0
mask = torch.nonzero(target.data == self.padding_idx)
if mask.dim() > 0:
true_dist.index_fill_(0, mask.squeeze(), 0.0)
self.true_dist = true_dist
return self.criterion(x, true_dist.clone().detach())
Label Smoothing 기법은 정답 라벨에 작은 값을 더하고, 나머지 라벨에 균일하게 작은 값을 분배하여 모델이 과도하게 확신하지 않도록 만드는 데 사용됩니다. 아래 코드와 그래프는 Label Smoothing 예시를 나타냅니다.
# Example of label smoothing.
crit = LabelSmoothing(5, 0, 0.4)
predict = torch.FloatTensor([[0, 0.2, 0.7, 0.1, 0],
[0, 0.2, 0.7, 0.1, 0],
[0, 0.2, 0.7, 0.1, 0]])
v = crit(Variable(predict.log()),
Variable(torch.LongTensor([2, 1, 0])))
# Show the target distributions expected by the system.
plt.imshow(crit.true_dist)
data_gen
def data_gen(V, batch, nbatches):
"Generate random data for a src-tgt copy task."
for i in range(nbatches):
data = torch.from_numpy(np.random.randint(1, V, size=(batch, 10)))
data[:, 0] = 1
src = Variable(data, requires_grad=False)
tgt = Variable(data, requires_grad=False)
yield Batch(src, tgt, 0)
Transformer 모델이 잘 돌아가는지 확인하기 위해 모의 데이터를 생성하는 함수입니다. 모의 데이터는 np.random.randint() 함수를 사용하여 랜덤한 데이터를 생성합니다. data_gen함수를 통해 생성된 데이터는 다음과 같은 코드를 통해 확인 가능합니다.
gener = data_gen(11, 30, 20)
for i, batch in enumerate(gener):
print(batch.src.shape)
print(batch.tgt.shape)
print(batch.src_mask.shape)
print(batch.tgt_mask.shape)
if i == 0:
break
torch.Size([30, 10])
torch.Size([30, 9])
torch.Size([30, 1, 10])
torch.Size([30, 9, 9])
SimpleLossCompute
class SimpleLossCompute:
"A simple loss compute and train function."
def __init__(self, generator, criterion, opt=None):
self.generator = generator
self.criterion = criterion
self.opt = opt
def __call__(self, x, y, norm):
x = self.generator(x)
loss = self.criterion(x.contiguous().view(-1, x.size(-1)), y.contiguous().view(-1)) / norm
loss.backward()
if self.opt is not None:
self.opt.step()
self.opt.optimizer.zero_grad()
return loss.data * norm
Transformer 모델의 전체적인 loss 함수를 정의 내리는 Class입니다. 도입부에서 self.generator, self.criterion, self.opt를 정의하는데, 각각 Generator(), LabelSmoothing(), NoamOpt() 함수를 통해 정의됩니다. 최종적으로 loss값을 반환합니다.
Training
# Train the simple copy task.
V = 11
criterion = LabelSmoothing(size=V, padding_idx=0, smoothing=0.0)
model = make_model(V, V, N=2)
model_opt = NoamOpt(model.src_embed[0].d_model, 1, 400,
torch.optim.Adam(model.parameters(), lr=0, betas=(0.9, 0.98), eps=1e-9))
for epoch in range(10):
model.train()
run_epoch(data_gen(V, 30, 20), model, SimpleLossCompute(model.generator, criterion, model_opt))
model.eval()
print(run_epoch(data_gen(V, 30, 5), model,
SimpleLossCompute(model.generator, criterion, None)))
이제 본격적으로 Transformer 모델을 학습시키겠습니다. 먼저 LabelSmoothing()을 통해 criterion을 정의하고, make_model()을 통해 model을, NoamOpt()을 이용해 model_opt를 정의합니다. 정의된 model은 run_epoch 함수를 통해 학습이 진행됩니다. 학습 step별로 loss 값과 학습시간이 print 되는데, 그 과정은 아래와 같습니다.
Epoch Step: 1 Loss: 2.800094 Tokens per Sec: 343.285950
Epoch Step: 1 Loss: 1.381659 Tokens per Sec: 589.220520
tensor(1.4438)
Epoch Step: 1 Loss: 1.348317 Tokens per Sec: 458.075989
Epoch Step: 1 Loss: 0.702469 Tokens per Sec: 587.778076
tensor(0.7410)
Epoch Step: 1 Loss: 0.766169 Tokens per Sec: 486.482666
Epoch Step: 1 Loss: 0.290603 Tokens per Sec: 527.829834
tensor(0.3223)
Epoch Step: 1 Loss: 0.369620 Tokens per Sec: 456.586304
Epoch Step: 1 Loss: 0.159743 Tokens per Sec: 552.635620
tensor(0.1716)
Epoch Step: 1 Loss: 0.186084 Tokens per Sec: 452.356049
Epoch Step: 1 Loss: 0.031851 Tokens per Sec: 579.222290
tensor(0.0368)
Epoch Step: 1 Loss: 0.041815 Tokens per Sec: 332.412354
Epoch Step: 1 Loss: 0.014791 Tokens per Sec: 351.623566
tensor(0.0079)
Epoch Step: 1 Loss: 0.013618 Tokens per Sec: 481.405365
Epoch Step: 1 Loss: 0.008430 Tokens per Sec: 529.480713
tensor(0.0064)
Epoch Step: 1 Loss: 0.015100 Tokens per Sec: 418.758514
Epoch Step: 1 Loss: 0.008935 Tokens per Sec: 568.575562
tensor(0.0367)
Epoch Step: 1 Loss: 0.096136 Tokens per Sec: 471.545715
Epoch Step: 1 Loss: 0.037642 Tokens per Sec: 533.718140
tensor(0.0470)
Epoch Step: 1 Loss: 0.064408 Tokens per Sec: 374.028656
Epoch Step: 1 Loss: 0.007073 Tokens per Sec: 421.778076
tensor(0.0118)
Greedy_decode
def greedy_decode(model, src, src_mask, max_len, start_symbol):
memory = model.encode(src, src_mask)
ys = torch.ones(1, 1).fill_(start_symbol).type_as(src.data)
for i in range(max_len-1):
out = model.decode(memory, src_mask,
Variable(ys),
Variable(subsequent_mask(ys.size(1))
.type_as(src.data)))
prob = model.generator(out[:, -1])
_, next_word = torch.max(prob, dim = 1)
next_word = next_word.data[0]
ys = torch.cat([ys,
torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
return ys
model.eval()
src = Variable(torch.LongTensor([[1,2,3,4,5,6,7,8,9,10]]) )
src_mask = Variable(torch.ones(1, 1, 10) )
print(greedy_decode(model, src, src_mask, max_len=10, start_symbol=1))
마지막으로 decoding을 진행해 주는 greedy_decode Class를 설명하겠습니다. Transformer의 본래 목적은 언어 간 번역입니다. 즉 학습된 모델을 통해 실제 번역을 진행해 주는 클래스가 되겠습니다. 하단을 보시면 src가 있는데, 이 부분은 source로 번역을 할 대상입니다. 예를 들어 한영번역을 한다고 가정하면 한국어에 해당합니다.(숫자인 이유는 문자를 계산을 위해 이미 임베딩 했기 때문입니다.) 그러면 아래와 같이 decoding 된 결과가 나옵니다. 한영번영의 예로 설명을 하자면 이 결과는 번역된 영어입니다. 여기에 실제 번역에는 단어 간 embedding도 알고 있기 때문에, 이 숫자를 다시 문자로 변환이 가능합니다.
tensor([[ 1, 1, 1, 1, 1, 1, 1, 10, 1, 10]])
정리를 하자면 이번 Transformer 코드는 위오 같은 함수와 클래스들로 구성되어 있습니다. 정말 복잡하죠? 하지만 글을 계속해서 읽으시면서 코드를 실행하면서 분석하시면 이해가 되리라 믿습니다. 여러분들의 이해를 돕기 위해 유튜브 영상도 제작 예정이니 많은 관심 부탁드리겠습니다. 관련해 질문 및 궁금한 사항이 있으시면 언제든 댓글로 알려주시기 바랍니다.
추가적으로 제가 AI 학습 관련 오픈 카카오톡방을 만들었습니다. AI와 코딩 학습에 목말라 있으신 분들이 라면 누구나 들어와서 즐겁게 맘껏 정보공유와 AI 공부하시면 될 것 같습니다. 링크는 다음과 같이니 꼭 한번 참석 및 홍보 부탁드리겠습니다.
https://open.kakao.com/o/ggxse9sg
지금 까지 저희는 'Transformer의 python 코드'에 대해 알아보았습니다. 도움이 되셨나요? 만약 되셨다면 구독 및 좋아요로 표현해 주시면 정말 많은 힘이 됩니다. 궁금한 사항 혹은 앞으로 다루어 주었으면 좋을 주제가 있으시면 댓글 남겨주시면 감사하겠습니다. 저는 '코딩 오페라'였습니다.
Reference
https://nlp.seas.harvard.edu/2018/04/03/attention.html