RNN 20 (Attention and Seq2Seq Learning Using a Date Dataset in PyTorch)
Learning Date-Format Conversion Rules with Sequence-to-Sequence Learning
import os
import random
import string
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torch.utils.data as data
import pickle
from copy import deepcopy
from sklearn.model_selection import train_test_split
# Fix random seeds for reproducibility
seed = 50
os.environ['PYTHONHASHSEED'] = str(seed)
random.seed(seed)                          # Python random module
np.random.seed(seed)                       # NumPy
torch.manual_seed(seed)                    # PyTorch (CPU)
torch.cuda.manual_seed(seed)               # PyTorch (single GPU)
torch.cuda.manual_seed_all(seed)           # PyTorch (multi-GPU)
torch.backends.cudnn.deterministic = True  # use deterministic cuDNN kernels
torch.backends.cudnn.benchmark = False     # disable cuDNN benchmarking
torch.backends.cudnn.enabled = False       # disable cuDNN entirely (slower on GPU, but fully reproducible)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
device
device(type='cpu')
1. Downloading the Data
from google.colab import files
# upload date.txt
t = files.upload()
Saving date.txt to date.txt
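Each line of date.txt holds a date written in a free-form format, space-padded to a fixed question width of 29 characters, followed by a '_' marker and the normalized answer in YYYY-MM-DD form. An illustrative line (spacing shown approximately):
september 27, 1994           _1994-09-27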
2. Loading the Data
Vocabulary
class Vocabulary():
def __init__(self, file_path, vocab_from_file, vocab_file='./vocab.pkl'):
        # initialize the char-to-index dictionaries
self.char2idx = {}
self.idx2char = {}
self.idx = 0
questions, answers = [], []
for line in open(file_path, 'r'):
idx = line.find('_')
questions.append(line[:idx])
answers.append(line[idx:-1])
self.questions, self.answers = questions, answers
if vocab_from_file:
with open(vocab_file, 'rb') as f:
vocab = pickle.load(f)
self.char2idx = vocab.char2idx
self.idx2char = vocab.idx2char
print('Vocabulary successfully loaded from vocab.pkl file!')
else:
self.build_vocab()
with open(vocab_file, 'wb') as f:
pickle.dump(self, f)
def build_vocab(self):
for i in range(len(self.questions)):
question, answer = self.questions[i], self.answers[i]
self.add_char(question)
self.add_char(answer)
        print('Vocabulary length : ', len(self.char2idx))
def add_char(self, txt):
        chars = list(txt)  # split the string into individual characters
for i, char in enumerate(chars):
if char not in self.char2idx:
tmp_id = len(self.char2idx)
self.char2idx[char] = tmp_id
self.idx2char[tmp_id] = char
def __len__(self):
return len(self.char2idx)
# Note: the Dataset class below builds its own Vocabulary(), but for convenience we also
# keep a global vocab so it can be used independently of the Dataset class.
file_path='./date.txt'
questions, answers = [], []
for line in open(file_path, 'r'):
idx = line.find('_')
questions.append(line[:idx])
answers.append(line[idx:-1])
vocab = Vocabulary(file_path, vocab_from_file=False)
Vocabulary length :  59
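With the global vocab built, any string whose characters occur in date.txt can be round-tripped between characters and indices. A small illustrative example (the printed indices depend on the order in which characters first appear in the file):
sample = 'september 27, 1994'
ids = [vocab.char2idx[c] for c in sample]          # encode characters to indices
print(ids)
print(''.join(vocab.idx2char[i] for i in ids))     # decodes back to 'september 27, 1994'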
Dataset
class DateDataset(Dataset):
def __init__(self, file_path, questions, answers, vocab_from_file, vocab_file='./vocab.pkl'):
vocab = Vocabulary(file_path, vocab_from_file, vocab_file)
self.questions, self.answers = questions, answers
self.x = []
self.t = []
for i, question in enumerate(self.questions):
self.x.append([vocab.char2idx[c] for c in list(question)])
for i, answer in enumerate(self.answers):
self.t.append([vocab.char2idx[c] for c in list(answer)])
def __getitem__(self, index):
#return torch.LongTensor(self.x[index]), torch.LongTensor(self.t[index])
return torch.LongTensor(self.x[index][::-1]), torch.LongTensor(self.t[index])
def __len__(self):
return len(self.t)
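Note that __getitem__ returns the question reversed ([::-1]). Reversing the source sequence is a common seq2seq trick: it shortens the distance between the start of the question and the start of the answer the decoder must produce first. The effect on an encoded question is simply:
# Illustrative only; the index values are made up
x = [14, 3, 9, 7, 7, 7]   # encoded question, padded at the end
print(x[::-1])            # [7, 7, 7, 9, 3, 14] -> what the encoder actually receives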
# split into train / valid / test sets
train_indices, test_indices = train_test_split(range(len(questions)), test_size=0.1)
train_indices, valid_indices = train_test_split(train_indices, test_size=0.1)  # split the remaining train indices so valid never overlaps test
questions, answers = np.array(questions), np.array(answers)
questions_train, questions_valid, questions_test = questions[train_indices], questions[valid_indices], questions[test_indices]
answers_train, answers_valid, answers_test = answers[train_indices], answers[valid_indices], answers[test_indices]
questions_train.shape, answers_train.shape
((40500,), (40500,))
trainset = DateDataset(file_path, questions_train, answers_train, vocab_from_file=False)
validset = DateDataset(file_path, questions_valid, answers_valid, vocab_from_file=True)
testset = DateDataset(file_path, questions_test, answers_test, vocab_from_file=True)
Vocabulary length :  59
Vocabulary successfully loaded from vocab.pkl file!
Vocabulary successfully loaded from vocab.pkl file!
trainset[0][0], trainset[0][1]
(tensor([ 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7,
20, 16, 16, 8, 7, 10, 12, 7, 45, 37, 30]),
tensor([14, 8, 16, 16, 20, 15, 16, 33, 15, 16, 12]))
# input (the reversed question)
for i in trainset[1][0]:
i = int(i)
char = vocab.idx2char[i]
print(char, end='')
print()
# target
for i in trainset[1][1]:
i = int(i)
char = vocab.idx2char[i]
print(char, end='')
1002 ,31 yluj
_2001-07-13
3. Data Loading: DataLoader
batch_size = 128
trainloader = DataLoader(dataset=trainset, batch_size=batch_size, shuffle=True)
validloader = DataLoader(dataset=validset, batch_size=batch_size, shuffle=True)
testloader = DataLoader(dataset=testset, batch_size=batch_size, shuffle=True)
batch = next(iter(trainloader))
batch[0].size(), batch[1].size()
(torch.Size([128, 29]), torch.Size([128, 11]))
batch = next(iter(validloader))
batch[0].size(), batch[1].size()
(torch.Size([128, 29]), torch.Size([128, 11]))
batch = next(iter(testloader))
batch[0].size(), batch[1].size()
(torch.Size([128, 29]), torch.Size([128, 11]))
len(trainloader), len(validloader), len(testloader)
(317, 36, 40)
5. Model Creation: Seq2Seq
class AttentionEncoder(nn.Module):
def __init__(self, vocab_size, wordvec_size, hidden_size):
super().__init__()
self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=wordvec_size)
self.lstm = nn.LSTM(input_size=wordvec_size, hidden_size=hidden_size, batch_first=True)
    def forward(self, inputs):          # inputs shape (N=128, T=29)
        embed = self.embed(inputs)      # embed shape (N=128, T=29, D=16)
        out, (h, c) = self.lstm(embed)  # out shape (N=128, T=29, H=256)
                                        # h, c shape (num_layers=1, N=128, H=256)
        return out                      # return all hidden states so the decoder can attend over them
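A quick, purely illustrative smoke test of the encoder shapes using the sizes this notebook ends up with (59-character vocabulary, question length 29, hidden size 256):
enc = AttentionEncoder(vocab_size=59, wordvec_size=16, hidden_size=256)
dummy = torch.randint(0, 59, (2, 29))  # a fake batch of 2 encoded questions
print(enc(dummy).shape)                # torch.Size([2, 29, 256])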
class Attention(nn.Module):
def __init__(self):
super().__init__()
    def forward(self, enc_hs, dec_hs):
        N, D_T, H = dec_hs.shape        # N=128, D_T=10, H=256
        _, E_T, _ = enc_hs.shape        # N=128, E_T=29, H=256
        out = torch.empty_like(dec_hs)  # out shape (N=128, D_T=10, H=256)
        attention_weights = []
        for t in range(D_T):            # D_T : 10
            # attention weights
            h = dec_hs[:, t, :]         # h shape (N=128, H=256)
            h = h.reshape(N, 1, H)      # h shape (N=128, 1, H=256)
            hr = h.repeat(1, E_T, 1)    # hr shape (N=128, E_T=29, H=256)
            t1 = enc_hs * hr            # t1 shape (N=128, E_T=29, H=256)
            s = torch.sum(t1, dim=2)    # s shape (N=128, E_T=29)
            a = torch.softmax(s, dim=1) # a shape (N=128, E_T=29)
            attention_weights.append(a) # kept for later visualization
            # weighted sum (context vector)
            a = a.reshape(N, E_T, 1)    # a shape (N=128, E_T=29, 1)
            ar = a.repeat(1, 1, H)      # ar shape (N=128, E_T=29, H=256)
            t2 = enc_hs * ar            # t2 shape (N=128, E_T=29, H=256)
            c = torch.sum(t2, dim=1)    # c shape (N=128, H=256)
            out[:, t, :] = c
        return out, attention_weights   # out shape (N=128, D_T=10, H=256)
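The per-timestep loop above computes dot-product scores, a softmax over the encoder time steps, and a weighted sum of encoder states. The same computation can be written with batched matrix multiplication; a minimal equivalent sketch, shown only for clarity (the model keeps the loop version):
def attention_vectorized(enc_hs, dec_hs):
    # enc_hs: (N, E_T, H), dec_hs: (N, D_T, H)
    scores = torch.bmm(dec_hs, enc_hs.transpose(1, 2))  # (N, D_T, E_T) dot-product scores
    weights = torch.softmax(scores, dim=2)              # normalize over encoder time steps
    context = torch.bmm(weights, enc_hs)                # (N, D_T, H) weighted sum of encoder states
    return context, weights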
class AttentionDecoder(nn.Module):
def __init__(self, vocab_size, wordvec_size, hidden_size):
super().__init__()
self.hidden_size = hidden_size
self.vocab_size = vocab_size
self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=wordvec_size)
self.lstm = nn.LSTM(input_size=wordvec_size, hidden_size=hidden_size, batch_first=True)
self.attention = Attention()
self.affine = nn.Linear(in_features=hidden_size+hidden_size, out_features=vocab_size)
    def forward(self, inputs, enc_hs):   # inputs shape (N=128, T=10)
                                         # enc_hs shape (N=128, T=29, H=256)
        N, T = inputs.shape
        N, T, H = enc_hs.shape
        h = enc_hs[:, -1].unsqueeze(0)   # h shape (num_layers=1, N=128, H=256)
        c = self.init_cell(N)
        embed = self.embed(inputs)       # embed shape (N=128, T=10, D=16)
        dec_hs, _ = self.lstm(embed, (h, c))               # dec_hs shape (N=128, T=10, H=256)
        context, weights = self.attention(enc_hs, dec_hs)  # enc_hs shape (N=128, T=29, H=256)
                                                           # dec_hs shape (N=128, T=10, H=256)
                                                           # context shape (N=128, D_T=10, H=256)
        out = torch.cat((context, dec_hs), dim=2)          # out shape (N=128, T=10, H+H=256+256)
        out = self.affine(out)                             # out shape (N=128, T=10, V=59)
        # out = out.view(-1, self.vocab_size)              # would give shape (N*T=128*10, V=59)
        return out
def init_cell(self, batch_size):
weight = next(self.parameters())
return weight.new_zeros(1, batch_size, self.hidden_size)
    def generate(self, enc_hs, start_id, sample_size):
        sampled = []
        sample_id = start_id
        h = enc_hs[:, -1].unsqueeze(0)    # h shape : (num_layers=1, N=1, H=256)
        c = self.init_cell(batch_size=1)  # c shape : (num_layers=1, N=1, H=256)
        for _ in range(sample_size):
            # sample_id = torch.tensor(sample_id).reshape(1, 1)   # sample_id shape : (N=1, T=1)
            sample_id = sample_id.clone().detach().reshape(1, 1)  # avoids a UserWarning
            embed = self.embed(sample_id)                 # embed shape : (N=1, T=1, D=16)
            dec_hs, (h, c) = self.lstm(embed, (h, c))     # dec_hs shape : (N=1, T=1, H=256)
            context, _ = self.attention(enc_hs, dec_hs)   # attention returns (context, weights)
            out = torch.cat((context, dec_hs), dim=2)     # out shape : (N=1, T=1, H+H=256+256)
            score = self.affine(out)                      # score shape : (N=1, T=1, V=59)
            sample_id = torch.max(score, dim=2)[1]        # greedy decoding: index of the largest logit
            sampled.append(int(sample_id))
        return sampled
class AttentionSeq2Seq(nn.Module):
def __init__(self, vocab_size, wordvec_size, hidden_size):
super().__init__()
self.encoder = AttentionEncoder(vocab_size, wordvec_size, hidden_size)
self.decoder = AttentionDecoder(vocab_size, wordvec_size, hidden_size)
    def forward(self, inputs, targets):  # inputs shape (N=128, T=29)
                                         # targets shape (N=128, T=11)
        decoder_in = targets[:, :-1]     # decoder_in shape (N=128, T=10)
        enc_hs = self.encoder(inputs)    # enc_hs shape (N=128, T=29, H=256)
        out = self.decoder(decoder_in, enc_hs)  # out shape (N=128, T=10, V=59)
        return out
    def generate(self, inputs, start_id, sample_size):  # inputs shape (N=1, T=29)
        enc_hs = self.encoder(inputs)    # enc_hs shape (N=1, T=29, H=256)
        sampled = self.decoder.generate(enc_hs, start_id, sample_size)  # start_id = index of '_', sample_size = 10
        return sampled
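forward trains with teacher forcing: the target sequence minus its last character (targets[:, :-1]) is fed to the decoder, and the loss later compares each output against the next character (targets[:, 1:]). For the sample target seen in section 2:
target = '_2001-07-13'     # a label from the dataset; '_' acts as the start token
decoder_in = target[:-1]   # '_2001-07-1'  -> decoder input
label = target[1:]         # '2001-07-13'  -> what the logits are compared against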
Hyperparameter Settings
vocab_size = len(vocab)
wordvec_size = 16
hidden_size = 256
batch_size = 128
learning_rate = 0.01
num_epochs = 1  # increase to 10 for a full training run
model = AttentionSeq2Seq(vocab_size=vocab_size,
wordvec_size=wordvec_size,
hidden_size=hidden_size)
model = model.to(device)
model
AttentionSeq2Seq(
(encoder): AttentionEncoder(
(embed): Embedding(59, 16)
(lstm): LSTM(16, 256, batch_first=True)
)
(decoder): AttentionDecoder(
(embed): Embedding(59, 16)
(lstm): LSTM(16, 256, batch_first=True)
(attention): Attention()
(affine): Linear(in_features=512, out_features=59, bias=True)
)
)
out = model(batch[0].to(device), batch[1].to(device))
out.shape
torch.Size([128, 10, 59])
6. Model Setup (Loss Function and Optimizer)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer=optimizer,
mode='min', factor=0.4,
patience=3, verbose=True)
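nn.CrossEntropyLoss expects 2-D logits of shape (number of predictions, number of classes) and a 1-D tensor of class indices, which is why the training and validation loops below flatten the (N, T, V) decoder output with .view(-1, vocab_size). A minimal shape sketch:
logits = torch.randn(128, 10, 59)         # (N, T, V), as produced by the model
labels = torch.randint(0, 59, (128, 10))  # (N, T) character indices
loss = loss_fn(logits.view(-1, 59), labels.view(-1))
print(loss.item())                        # one scalar averaged over all 128*10 predictions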
7. Model Training
def validate(model, validloader, loss_fn):
model.eval()
total = 0
correct = 0
valid_loss = []
valid_epoch_loss=0
with torch.no_grad():
for batch_data in validloader:
inputs = batch_data[0].to(device)
targets = batch_data[1].to(device)
            # forward pass and loss (no parameter updates during validation)
            logits = model(inputs, targets)
            targets = targets[:, 1:].clone()  # shift by one step to form the decoder labels
loss = loss_fn(logits.view(-1, vocab_size), targets.view(-1))
valid_loss.append(loss.item())
valid_epoch_loss = np.mean(valid_loss)
total_loss["val"].append(valid_epoch_loss)
return valid_epoch_loss
def eval_seq2seq(model, question, correct, idx2char, verbose=False, is_reverse=True):
model.eval()
correct = correct.flatten()
    # first character (the start token)
start_id = correct[0]
correct = correct[1:]
guess = model.generate(question, start_id, len(correct))
    # convert index sequences back to strings
question = ''.join([idx2char[int(c)] for c in question.flatten()])
correct = ''.join([idx2char[int(c)] for c in correct])
guess = ''.join([idx2char[int(c)] for c in guess])
if verbose :
if is_reverse:
question = question[::-1]
print('Question : ', question)
print('True : ', correct)
print('Guess : ', guess)
print()
return 1 if guess == correct else 0
def train_loop(model, trainloader, loss_fn, epochs, optimizer):
min_loss = 1000000
trigger = 0
patience = 10
max_grad = 5.0
for epoch in range(epochs):
model.train()
train_loss = []
for batch_data in (trainloader):
inputs = batch_data[0].to(device)
targets = batch_data[1].to(device)
optimizer.zero_grad()
logits = model(inputs, targets)
            targets = targets[:, 1:].clone()  # shift by one step to form the decoder labels
loss = loss_fn(logits.view(-1, vocab_size), targets.view(-1))
loss.backward()
# clipping gradient
torch.nn.utils.clip_grad_norm_(model.parameters(), max_grad)
optimizer.step()
train_loss.append(loss.item())
train_epoch_loss = np.mean(train_loss)
total_loss["train"].append(train_epoch_loss)
valid_epoch_loss = validate(model, validloader, loss_fn)
# for valid accuracy (it takes time!!)
correct_num = 0
for i in range(len(validset)):
question = validset[i][0].unsqueeze(0).to(device)
correct = validset[i][1].unsqueeze(0).to(device)
correct_num += eval_seq2seq(model, question, correct, vocab.idx2char, verbose=False, is_reverse=True)
valid_accuracy = correct_num /len(validset)
print("Epoch: {}/{}, Train Loss={:.4f}, Val Loss={:.4f}, Val Accuracy={:.2f}".format(
epoch + 1, epochs,
total_loss["train"][-1],
total_loss["val"][-1],
valid_accuracy
))
        # Early stopping
        if valid_epoch_loss > min_loss:  # validation loss did not improve on the best loss so far
trigger += 1
print('trigger : ', trigger)
if trigger > patience:
print('Early Stopping !!!')
print('Training loop is finished !!')
return
else:
trigger = 0
min_loss = valid_epoch_loss
best_model_state = deepcopy(model.state_dict())
torch.save(best_model_state, 'best_checkpoint.pth')
# -------------------------------------------
# Learning Rate Scheduler
scheduler.step(valid_epoch_loss)
# -------------------------------------------
total_loss = {"train": [], "val": []}
%time train_loop(model, trainloader, loss_fn, num_epochs, optimizer)
import matplotlib.pyplot as plt
plt.plot(total_loss['train'], label="train_loss")
plt.plot(total_loss['val'], label="valid_loss")
plt.legend()
plt.show()
8. Model Evaluation
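train_loop saves the best weights to best_checkpoint.pth whenever the validation loss improves. Before evaluating, those weights can be restored (optional in a 1-epoch run, where the in-memory model is already the best one):
model.load_state_dict(torch.load('best_checkpoint.pth', map_location=device))
model.eval()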
# evaluate on the entire test set
correct_num = 0
for i in range(len(testset)):
question = testset[i][0].unsqueeze(0).to(device)
correct = testset[i][1].unsqueeze(0).to(device)
correct_num += eval_seq2seq(model, question, correct, vocab.idx2char, verbose=False, is_reverse=True)
test_accuracy = correct_num / len(testset)
test_accuracy
1.0
9. Model Prediction
for i in range(0, 3):
question = testset[i][0].unsqueeze(0).to(device)
correct = testset[i][1].unsqueeze(0).to(device)
eval_seq2seq(model, question, correct, vocab.idx2char, verbose=True, is_reverse=True)
Question : 11/14/03
True : 2003-11-14
Guess : 2003-11-14
Question : APRIL 2, 1972
True : 1972-04-02
Guess : 1972-04-02
Question : apr 14, 2004
True : 2004-04-14
Guess : 2004-04-14
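Finally, an arbitrary date string can be pushed through the trained model by reproducing the DateDataset preprocessing: space-pad the question to the fixed width of 29 characters, encode it with vocab.char2idx, reverse it, and decode greedily starting from the '_' token. A sketch under those assumptions (the query may only use characters that occur in date.txt):
query = 'october 5, 1983'.ljust(29)  # hypothetical query, padded like the training questions
x = torch.LongTensor([vocab.char2idx[c] for c in query][::-1]).unsqueeze(0).to(device)
start_id = torch.tensor(vocab.char2idx['_']).to(device)
guess = model.generate(x, start_id, sample_size=10)   # 10 output characters: 'YYYY-MM-DD'
print(''.join(vocab.idx2char[i] for i in guess))      # should print the date in YYYY-MM-DD form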