딥러닝 이야기 / Transformer / 3. Transformer를 이용한 한국어 대화 챗봇

Transformer를 이용한 한국어 대화 챗봇

작성자: 여행 초짜
작성일: 2022.12.13

시작하기 앞서 틀린 부분이 있을 수 있으니, 틀린 부분이 있다면 지적해주시면 감사하겠습니다.

이전글에서는 transformer를 이용하여 기계 번역 모델 학습 코드 대해 설명하였습니다. 이번글에서는 transformer를 이용하여 한국어 대화 챗봇 모델을 학습해보겠습니다. 본 글에서 설명하는 코드는 기본적으로 학습을 진행할 때 모델 저장의 지표로 사용되는 BLEU-4 계산은 NLTK를 사용합니다.

학습에 사용한 데이터는 다음 카페 "사랑보다 아름다운 실연" 데이터를 사용하여 송영숙님께서 만든 챗봇 데이터를 사용합니다.

그리고 transformer에 대한 글은 Transformer (Attention Is All You Need)를 참고하시면 됩니다. 본 글에서 설명하는 transformer 구현 코드는 GitHub에 올려놓았으니 아래 링크를 참고하시기 바랍니다(본 글에서는 모델의 구현에 초점을 맞추고 있기 때문에 데이터 전처리, 토크나이저 학습 등 학습을 위한 전체 코드는 아래 GitHub 링크를 참고하시기 바랍니다).

오늘의 컨텐츠입니다. 오늘 사용하는 모델은 이전 기계 번역 학습했던 transformer 모델과 거의 동일합니다.

  1. Transformer 구현
  2. Transformer 학습
  3. Transformer 학습 결과

Transformer 기계 번역 모델

Transformer 구현

여기서는 transformer 구현 코드를 살펴보겠습니다. 먼저 encoder, decoder를 살펴보기 전에 embedding layer 코드부터 확인해보겠습니다. 한 줄씩 자세한 설명은 아래를 참고하시기 바랍니다.

Token Embedding & Positional Encoding (Embedding)

class Embeddings(nn.Module):
    def __init__(self, vocab_size, hidden_dim, pad_token_id):
        super(Embeddings, self).__init__()
        self.vocab_size = vocab_size
        self.hidden_dim = hidden_dim
        self.pad_token_id = pad_token_id
        self.emb_layer = nn.Embedding(self.vocab_size, self.hidden_dim, padding_idx=self.pad_token_id)


    def forward(self, x):
        output = self.emb_layer(x)
        return output



# positional encoding layer
class PositionalEncoding(nn.Module):
    def __init__(self, max_len, hidden_dim, pos_encoding, device):
        super(PositionalEncoding, self).__init__()
        self.max_len = max_len
        self.hidden_dim = hidden_dim
        self.pos_encoding = pos_encoding
        self.device = device

        self.pos = torch.arange(0, self.max_len)
        if self.pos_encoding:
            self.pe = torch.zeros(self.max_len, self.hidden_dim)
            for i in range(0, self.hidden_dim, 2):
                self.pe[:, i] = np.sin(self.pos/(10000**(i/self.hidden_dim)))
                self.pe[:, i+1] = np.cos(self.pos/(10000**(i/self.hidden_dim)))         
            self.pe = nn.Parameter(self.pe.unsqueeze(0), requires_grad=False)
        else:
            self.emb_layer = nn.Embedding(self.max_len, self.hidden_dim)


    def forward(self, x):
        if self.pos_encoding:
            return self.pe[:, :x.size(1)]
        return self.emb_layer(self.pos.unsqueeze(0).to(self.device))[:, :x.size(1)]

  • 4 ~ 7번째 줄: Token embbeding을 위해 필요한 파라미터 정의.
  • 10 ~ 12번째 줄: Token embedding을 지나는 부분.
  • 20 ~ 23번째 줄: Positional encoding을 위해 필요한 파라미터 정의.
  • 22번째 줄: pos_encoding이 1이면 positional encoding, 0이면 positional embedding을 수행.
  • 26 ~ 31번째 줄: sin, cos을 이용한 positional encoding 정의.
  • 32 ~ 33번째 줄: Positional embedding을 정의.
  • 36 ~ 39번째 줄: Positional Encoding (or positional embedding)을 거치는 부분.




이제는 multi-head attention 부분입니다.

Multi-head Attention

# mulithead attention
class MultiHeadAttention(nn.Module):
    def __init__(self, hidden_dim, num_head, bias, self_attn, causal):
        super(MultiHeadAttention, self).__init__()
        self.hidden_dim = hidden_dim
        self.num_head = num_head
        self.bias = bias
        self.self_attn = self_attn
        self.causal = causal
        self.head_dim = self.hidden_dim // self.num_head
        assert self.hidden_dim == self.num_head * self.head_dim

        self.q_proj = nn.Linear(self.hidden_dim, self.hidden_dim, bias=self.bias)
        self.k_proj = nn.Linear(self.hidden_dim, self.hidden_dim, bias=self.bias)
        self.v_proj = nn.Linear(self.hidden_dim, self.hidden_dim, bias=self.bias)
        self.attn_proj = nn.Linear(self.hidden_dim, self.hidden_dim, bias=self.bias)


    def head_split(self, x):
        x = x.view(self.batch_size, -1, self.num_head, self.head_dim)
        x = x.permute(0, 2, 1, 3)
        return x


    def scaled_dot_product(self, q, k, v, mask):
        attn_wts = torch.matmul(q, torch.transpose(k, 2, 3))/(self.head_dim ** 0.5)
        if not mask == None:
            attn_wts = attn_wts.masked_fill(mask==0, float('-inf'))
        attn_wts = F.softmax(attn_wts, dim=-1)
        attn_out = torch.matmul(attn_wts, v)
        return attn_wts, attn_out


    def reshaping(self, attn_out):
        attn_out = attn_out.permute(0, 2, 1, 3).contiguous()
        attn_out = attn_out.view(self.batch_size, -1, self.hidden_dim)
        return attn_out


    def forward(self, query, key, value, mask):
        if self.self_attn:
            assert (query == key).all() and (key==value).all()

        self.batch_size = query.size(0)
        q = self.head_split(self.q_proj(query))
        k = self.head_split(self.k_proj(key))
        v = self.head_split(self.v_proj(value))

        attn_wts, attn_out = self.scaled_dot_product(q, k, v, mask)
        attn_out = self.attn_proj(self.reshaping(attn_out))

        return attn_wts, attn_out

  • 5 ~ 10번째 줄: Attention을 위해 필요한 파라미터 정의.
  • 11번째 줄: num_head에 따른 head dimension sanity check.
  • 13 ~ 16번째 줄: Query, key, value 및 attention 결과를 각각 mapping하는 linear layer.
  • 19 ~ 22번째 줄: Multi-head attention을 위해 행렬 차원 바꿔주는 부분(B x L x hidden_dim → B x num_head x L x head_dim).
  • 25 ~ 31번째 줄: Scaled dot product를 하는 부분. Encoder의 경우에는 pad mask, decoder일 경우 mask에 causal mask가 들어옴.
  • 34 ~ 37번째 줄: Multi-head attention 때문에 바뀌었던 행렬을 차원을 복구하는 부분(B x num_head x L x head_dim → B x L x hidden_dim).
  • 40 ~ 52번째 줄: Multi-head attention을 수행하는 부분.




이제는 postion wise feed forward network 부분입니다.

Postion Wise Feed Forward Network

# postion wise feed forward
class PositionWiseFeedForward(nn.Module):
    def __init__(self, hidden_dim, ffn_dim, dropout, bias):
        super(PositionWiseFeedForward, self).__init__()
        self.hidden_dim = hidden_dim
        self.ffn_dim = ffn_dim
        self.dropout = dropout
        self.bias = bias

        self.FFN1 = nn.Sequential(
            nn.Linear(self.hidden_dim, self.ffn_dim, bias=self.bias),
            nn.GELU(),
            nn.Dropout(self.dropout)
        )
        self.FFN2 = nn.Sequential(
            nn.Linear(self.ffn_dim, self.hidden_dim, bias=self.bias),
        )
        self.init_weights()


    def init_weights(self):
        for _, param in self.named_parameters():
            if param.requires_grad:
                nn.init.normal_(param.data, mean=0, std=0.5)

    
    def forward(self, x):
        output = self.FFN1(x)
        output = self.FFN2(output)
        return output

  • 5 ~ 8번째 줄: Feed forward network를 위해 필요한 파라미터 정의.
  • 10 ~ 17번째 줄: 첫 번째, 두 번째 linear layer 정의.
  • 18 ~ 24번째 줄: Feed forward network 가중치 초기화.
  • 27 ~ 30번째 줄: Feed forward network 거치는 부분.




이제는 transformer의 encoder 부분입니다. 아래 코드의 config.의 부분은 GitHub 코드에 보면 src/config.json이라는 파일에 존재하는 변수 값들을 모델에 적용하여 초기화 하는 것입니다.

Encoder

# a single encoder layer
class EncoderLayer(nn.Module):
    def __init__(self, hidden_dim, ffn_dim, num_head, bias, dropout, layernorm_eps):
        super(EncoderLayer, self).__init__()
        self.hidden_dim = hidden_dim
        self.ffn_dim = ffn_dim
        self.num_head = num_head
        self.bias = bias
        self.dropout = dropout
        self.layernorm_eps = layernorm_eps
        self.dropout_layer = nn.Dropout(self.dropout)
        self.layer_norm = nn.LayerNorm(self.hidden_dim, eps=self.layernorm_eps)

        self.self_attention = MultiHeadAttention(self.hidden_dim, self.num_head, self.bias, self_attn=True, causal=False)
        self.positionWiseFeedForward = PositionWiseFeedForward(self.hidden_dim, self.ffn_dim, self.dropout, self.bias)


    def forward(self, x, mask):
        attn_wts, output = self.self_attention(query=x, key=x, value=x, mask=mask)
        output = self.dropout_layer(output)
        output = self.layer_norm(x + output)

        x = output
        output = self.positionWiseFeedForward(output)
        output = self.dropout_layer(output)
        output = self.layer_norm(x + output)

        return attn_wts, output



# all encoders
class Encoder(nn.Module):
    def __init__(self, config, tokenizer, device):
        super(Encoder, self).__init__()
        self.vocab_size = tokenizer.vocab_size
        self.pad_token_id = tokenizer.pad_token_id
        self.device = device

        self.enc_num_layers = config.enc_num_layers
        self.hidden_dim = config.hidden_dim
        self.ffn_dim = config.ffn_dim
        self.num_head = config.num_head
        self.max_len = config.max_len
        self.bias = bool(config.bias)
        self.dropout = config.dropout
        self.layernorm_eps = config.layernorm_eps
        self.pos_encoding = config.pos_encoding
        
        self.dropout_layer = nn.Dropout(self.dropout)
        self.emb_layer = Embeddings(self.vocab_size, self.hidden_dim, self.pad_token_id)
        self.pos_layer = PositionalEncoding(self.max_len, self.hidden_dim, self.pos_encoding, self.device)
        self.encoders = nn.ModuleList([EncoderLayer(self.hidden_dim, self.ffn_dim, self.num_head, self.bias, self.dropout, self.layernorm_eps) for _ in range(self.enc_num_layers)])


    def forward(self, x, mask=None):
        output = self.emb_layer(x) + self.pos_layer(x)
        output = self.dropout_layer(output)

        all_attn_wts = []
        for encoder in self.encoders:
            attn_wts, output = encoder(output, mask)
            all_attn_wts.append(attn_wts.detach().cpu())
        
        return all_attn_wts, output

  • 2 ~ 28번째 줄: 하나의 encoder block을 정의하는 코드.
  • 33 ~ 65번째 줄: Encoder block을 레이어 개수만큼 쌓아 전체 encoder를 정의하는 코드.
  • 5 ~ 12번째 줄: Encoder block을 제작할 때 필요한 파라미터 정의.
  • 14번째 줄: Self attention 정의.
  • 15번째 줄: Feed forward network 정의.
  • 18 ~ 28번째 줄: Encoder block을 거치는 부분.
  • 21, 26번째 줄: Residual connection 부분.
  • 36 ~ 48번째 줄: 전체 encoder를 제작하기 위한 파라미터 정의.
  • 51 ~ 52번째 줄: Encoder의 embedding 레이어 정의.
  • 53번째 줄: Encoder의 레이어 개수만큼 encoder block을 쌓아 정의하는 부분.
  • 56 ~ 65번째 줄: Encoder(모든 encoder block)를 거치는 부분.




이제는 transformer의 decoder 부분입니다. 아래 코드의 config.의 부분은 GitHub 코드에 보면 src/config.json이라는 파일에 존재하는 변수 값들을 모델에 적용하여 초기화 하는 것입니다.

Decoder

# a single decoder layer
class DecoderLayer(nn.Module):
    def __init__(self, hidden_dim, ffn_dim, num_head, bias, dropout, layernorm_eps):
        super(DecoderLayer, self).__init__()
        self.hidden_dim = hidden_dim
        self.ffn_dim = ffn_dim
        self.num_head = num_head
        self.bias = bias
        self.dropout = dropout
        self.layernorm_eps = layernorm_eps
        self.dropout_layer = nn.Dropout(self.dropout)
        self.layer_norm = nn.LayerNorm(self.hidden_dim, eps=self.layernorm_eps)

        self.masked_self_attention = MultiHeadAttention(self.hidden_dim, self.num_head, self.bias, self_attn=True, causal=True)
        self.enc_dec_attention = MultiHeadAttention(self.hidden_dim, self.num_head, self.bias, self_attn=False, causal=False)
        self.positionWiseFeedForward = PositionWiseFeedForward(self.hidden_dim, self.ffn_dim, self.dropout, self.bias)


    def forward(self, x, enc_output, dec_causal_mask, enc_dec_mask):
        dec_self_attn_wts, output = self.masked_self_attention(query=x, key=x, value=x, mask=dec_causal_mask)
        output = self.dropout_layer(output)
        output = self.layer_norm(x + output)

        x = output
        cross_attn_wts, output = self.enc_dec_attention(query=x, key=enc_output, value=enc_output, mask=enc_dec_mask)
        output = self.dropout_layer(output)
        output = self.layer_norm(x + output)

        x = output
        output = self.positionWiseFeedForward(output)
        output = self.dropout_layer(output)
        output = self.layer_norm(x + output)

        return dec_self_attn_wts, cross_attn_wts, output



# all decoders
class Decoder(nn.Module):
    def __init__(self, config, tokenizer, device):
        super(Decoder, self).__init__()
        self.vocab_size = tokenizer.vocab_size
        self.pad_token_id = tokenizer.pad_token_id
        self.device = device

        self.dec_num_layers = config.dec_num_layers
        self.hidden_dim = config.hidden_dim
        self.ffn_dim = config.ffn_dim
        self.num_head = config.num_head
        self.max_len = config.max_len
        self.bias = bool(config.bias)
        self.dropout = config.dropout
        self.layernorm_eps = config.layernorm_eps
        self.pos_encoding = config.pos_encoding

        self.dropout_layer = nn.Dropout(self.dropout)
        self.emb_layer = Embeddings(self.vocab_size, self.hidden_dim, self.pad_token_id)
        self.pos_layer = PositionalEncoding(self.max_len, self.hidden_dim, self.pos_encoding, self.device)
        self.decoders = nn.ModuleList([DecoderLayer(self.hidden_dim, self.ffn_dim, self.num_head, self.bias, self.dropout, self.layernorm_eps) for _ in range(self.dec_num_layers)])


    def forward(self, x, enc_output, dec_causal_mask=None, enc_dec_mask=None):
        output = self.emb_layer(x) + self.pos_layer(x)
        output = self.dropout_layer(output)

        all_self_attn_wts, all_cross_attn_wts = [], []
        for decoder in self.decoders:
            dec_self_attn_wts, cross_attn_wts, output = decoder(output, enc_output, dec_causal_mask, enc_dec_mask)
            all_self_attn_wts.append(dec_self_attn_wts.detach().cpu())
            all_cross_attn_wts.append(cross_attn_wts.detach().cpu())
        
        return all_cross_attn_wts, output

  • 2 ~ 34번째 줄: 하나의 decoder block을 정의하는 코드.
  • 39 ~ 72번째 줄: Decoder block을 레이어 개수만큼 쌓아 전체 decoder를 정의하는 코드.
  • 5 ~ 12번째 줄: Decoder block을 제작할 때 필요한 파라미터 정의.
  • 14번째 줄: Causal mask를 적용한 maked self attention 정의.
  • 15번째 줄: encoder-decoder attention 정의.
  • 16번째 줄: Feed forward network 정의.
  • 19 ~ 34번째 줄: Decoder block을 거치는 부분.
  • 22, 27, 32번째 줄: Residual connection 부분.
  • 42 ~ 54번째 줄: 전체 decoder를 제작하기 위한 파라미터 정의.
  • 57 ~ 58번째 줄: Decoder의 embedding 레이어 정의.
  • 59번째 줄: Decoder의 레이어 개수만큼 decoder block을 쌓아 정의하는 부분.
  • 62 ~ 75번째 줄: Decoder(모든 decoder block)를 거치는 부분.




이제는 encoder와 decoder를 합쳐 transformer를 구성하는 부분입니다.

Transformer

# transformer
class Transformer(nn.Module):
    def __init__(self, config, tokenizer, device):
        super(Transformer, self).__init__()
        self.config = config
        self.tokenizer = tokenizer
        self.device = device
        
        self.hidden_dim = self.config.hidden_dim

        self.encoder = Encoder(self.config, self.tokenizer, self.device)
        self.decoder = Decoder(self.config, self.tokenizer, self.device)
        self.fc = nn.Linear(self.hidden_dim, self.tokenizer.vocab_size)


    def make_mask(self, src, trg):
        enc_mask = torch.where(src==self.tokenizer.pad_token_id, 0, 1).unsqueeze(1).unsqueeze(2)
        dec_causal_mask = torch.tril(torch.ones(trg.size(1), trg.size(1))).unsqueeze(0).unsqueeze(1).to(self.device) + torch.where(trg==self.tokenizesr.pad_token_id, 0, 1).unsqueeze(1).unsqueeze(2)
        dec_causal_mask = torch.where(dec_causal_mask < 2, 0, 1)
        enc_dec_mask = enc_mask
        return enc_mask, dec_causal_mask, enc_dec_mask


    def forward(self, src, trg):
        enc_mask, dec_causal_mask, enc_dec_mask = self.make_mask(src, trg)
        all_attn_wts, enc_output = self.encoder(src, enc_mask)
        all_cross_attn_wts, output = self.decoder(trg, enc_output, dec_causal_mask, enc_dec_mask)
        output = self.fc(output)
        return all_cross_attn_wts, output

  • 5 ~ 13번째 줄: Fully-connected layer, encoder, decoder 등을 정의하는 부분.
  • 16 ~ 21번째 줄: pad mask, causal mask 등을 제작하는 부분.
  • 24 ~ 29번째 줄: Encoder, decoder를 거치는 전체 transformer 부분.

Transformer 학습

이제 기계 번역 모델 학습 코드를 통해 어떻게 학습이 이루어지는지 살펴보겠습니다. 아래 코드에 self. 이라고 나와있는 부분은 GitHub 코드에 보면 알겠지만 학습하는 코드가 class 내부의 변수이기 때문에 있는 것입니다. 여기서는 무시해도 좋습니다.

그리고 아래 학습 코드는 실제 학습 코드를 간소화한 것입니다. Scheduler 등 전체 학습 코드는 GitHub 코드를 참고하면 됩니다.

self.model = Transformer(self.config, self.tokenizer, self.device).to(self.device)
self.criterion = nn.CrossEntropyLoss(ignore_index=self.tokenizer.pad_token_id)
self.optimizer = optim.Adam(self.model.parameters(), lr=self.lr)

self.model.train()

for i, (src, trg) in enumerate(self.dataloaders['train']):
    batch = src.size(0)
    src, trg = src.to(self.device), trg.to(self.device)
    self.optimizer.zero_grad()

    with torch.set_grad_enabled(phase=='train'):
        _, output = self.model(src, trg)
        loss = self.criterion(output[:, :-1, :].reshape(-1, output.size(-1)), trg[:, 1:].reshape(-1))
        loss.backward()
        self.optimizer.step()

학습에 필요한 것들 선언
먼저 위에 코드에서 정의한 모델을 불러오고 학습에 필요한 loss function, optimizer 등을 선언하는 부분입니다.

  • 1 ~ 3번째 줄: Loss function, transformer 모델 및 optimizer 선언.

모델 학습
  • 5 ~ 16번째 줄: Cross entropy loss를 이용하여 모델 학습하는 부분.
  • 14 ~ 16번째 줄: Loss를 계산하고 모델을 업데이트 하는 부분.

Transformer 학습 결과

이제 챗봇 데이터 중 validation set의 BLEU, NIST 점수의 history와 실제 문장을 입력했을 때 어떤 답을 내어주는지 살펴보겠습니다.

BLEU score

NIST score


대화 샘플

# Sample 1
Q : 어디로 여행 가면 좋을까?
A: 온 가족이 모두 마음에 드는 곳으로 가보세요.


# Sample 2
Q: 나 좋아하는 남자가 생겼어
A: 충분히 그럴 수 있어여.


# Sample 3
Q: 오늘 저녁 뭐 먹을까?
A: 맛있는 거 드세요.




지금까지 transformer를 통해 한국어 챗봇 모델을 학습해보았습니다. 본 설명에서 사용한 데이터는 일반적인 딥러닝 모델 학습 데이터에 비해 많은 양이 아닙니다. 따라서 만족할만한 성능을 가진 모델을 학습하기에 어려움이 있고, 연애 데이터라는 bias를 가지고 있기 때문에 open-domain 챗봇을 학습하기에는 무리가 있습니다. 만약 open-domain 챗봇을 구현하고싶다면 AI Hub에 공개되어있는 많은 양의 소상공인, SNS 데이터를 섞어서 학습하는 것을 추천합니다. 그리고 영어 챗봇같은 경우는 multi-turn으로 구성된 DailyDialog 데이터를 추천합니다.

마지막으로 학습 과정에 대한 전체 코드는 GitHub에 있으니 참고하시면 될 것 같습니다. 다음에는 transformer 기반의 아주 강력한 encoder 모델인 BERT에 대해 설명해보겠습니다.

태그 #Transformer 한국어대화챗봇
⟨ 이전글
Transformer를 이용한 WMT'14, IWSLT'14 (En-De) 기계 번역