Python/NLP Code

NLP Question Answering 전처리, 학습 및 성능 평가 코드 예제

jimmy_AI 2022. 10. 19. 22:38
반응형

HuggingFace에서 제공하는 transformers 모듈을 활용하여

pre-train model을 불러와 자연어 처리의 대표 task 중 하나인 Question Answering에

대하여 학습하고 검증하는 과정들의 파이썬 실습 코드 예제를 다루어보도록 하겠습니다.

 

 

Step 1. 데이터셋 로드 및 전처리 과정

먼저, 학습을 위하여 데이터셋을 불러오고 모델 학습을 위한 전처리 과정을

수행해주어야 합니다.

여기서는 QA task의 대표 벤치마크 데이터셋인 SQuAD를 사용해보도록 하겠습니다.

 

전처리 과정에 대해서는 아래 링크의 허깅페이스 공식 페이지를 참고하였으니,

상세한 내용이 필요하신 분들은 해당 글을 참고해주세요.

 

Question answering - Hugging Face Course

Time to look at question answering! This task comes in many flavors, but the one we’ll focus on in this section is called extractive question answering. This involves posing questions about a document and identifying the answers as spans of text in the d

huggingface.co

 

먼저, 데이터셋 로드는 datasets 모듈을 활용하여 아래의 코드로 간단히 수행할 수 있습니다.

# 모듈 미설치 시 아래의 코드 선실행
!pip install transformers
!pip install datasets

# 데이터셋 로드
from datasets import load_dataset

raw_datasets = load_dataset("squad")

 

이제, 학습할 모델의 토크나이저를 불러와 전처리를 진행해보도록 하겠습니다.

이번 글에서는 학습할 모델을 RoBERTa로 정하고 코드를 설명하겠습니다.

from transformers import RobertaTokenizerFast

# 토크나이저 불러오기
tokenizer = RobertaTokenizerFast.from_pretrained('roberta-base', add_prefix_space=True, truncation=True)

# 전처리 조건 설정
max_length = 384
stride = 128

# 학습용 데이터 전처리 함수(윗 링크 공식글 참조)
def preprocess_training_examples(examples):
    questions = [q.strip() for q in examples["question"]]
    inputs = tokenizer(
        questions,
        examples["context"],
        max_length=max_length,
        truncation="only_second",
        stride=stride,
        return_overflowing_tokens=True,
        return_offsets_mapping=True,
        padding="max_length",
    )

    offset_mapping = inputs.pop("offset_mapping")
    sample_map = inputs.pop("overflow_to_sample_mapping")
    answers = examples["answers"]
    start_positions = []
    end_positions = []

    for i, offset in enumerate(offset_mapping):
        sample_idx = sample_map[i]
        answer = answers[sample_idx]
        start_char = answer["answer_start"][0]
        end_char = answer["answer_start"][0] + len(answer["text"][0])
        sequence_ids = inputs.sequence_ids(i)

        # 답변의 시작 및 끝 토큰 위치 인덱스 탐색
        idx = 0
        while sequence_ids[idx] != 1:
            idx += 1
        context_start = idx
        while sequence_ids[idx] == 1:
            idx += 1
        context_end = idx - 1

        # 답변이 문맥 내에 온전하게 없는 경우 (0, 0)으로 라벨링
        if offset[context_start][0] > start_char or offset[context_end][1] < end_char:
            start_positions.append(0)
            end_positions.append(0)
        else:
            idx = context_start
            while idx <= context_end and offset[idx][0] <= start_char:
                idx += 1
            start_positions.append(idx - 1)

            idx = context_end
            while idx >= context_start and offset[idx][1] >= end_char:
                idx -= 1
            end_positions.append(idx + 1)

    inputs["start_positions"] = start_positions
    inputs["end_positions"] = end_positions
    return inputs

# 불러온 데이터셋에 대하여 전처리 진행(수 분 가량 소요 가능)
train_dataset = raw_datasets["train"].map(
    preprocess_training_examples,
    batched=True,
    remove_columns=raw_datasets["train"].column_names,
)

train_dataset.set_format(type='torch')

 

전처리된 학습용 데이터셋의 feature 구성은 다음과 같이 문장 토큰의

input_ids 번호, attention_mask 및 답변의 시작과 끝 인덱스 번호가 저장되어 있습니다.

print(train_dataset)

# 출력 결과
Dataset({
    features: ['input_ids', 'attention_mask', 'start_positions', 'end_positions'],
    num_rows: 88567
})

 

만일, 학습 중간에 검증을 진행하고 싶은 경우, 검증용 데이터셋도 비슷한 원리로 전처리를

해주시면 됩니다. 이에 대한 내용은 윗 링크의 공식 포스팅을 참고해주세요.

 

 

Step 2. 학습 과정

1. transformers 모듈의 Trainer를 활용한 방법

허깅페이스에서 제공하는 trainer 기능을 활용하면

조건을 간편하게 지정할 수 있으면서 최적화된 학습 진행이 가능합니다.

 

학습할 모델을 불러와 학습을 진행하는 예시 코드는 아래와 같습니다.

from transformers import AutoModelForQuestionAnswering, TrainingArguments, Trainer

# 학습할 모델 불러오기
model = AutoModelForQuestionAnswering.from_pretrained("roberta-base")

# 모델 저장 경로
model_save_path = "model/baseline"

# 학습 조건 설정
# 상세 정보 참고 : https://huggingface.co/docs/transformers/main_classes/trainer
args = TrainingArguments(
    model_save_path,
    evaluation_strategy="no",
    save_strategy="epoch",
    learning_rate=2e-5,
    num_train_epochs=3,
    weight_decay=0.01,
    fp16=True,
)

# trainer 설정
trainer = Trainer(
    model=model,
    args=args,
    train_dataset=train_dataset,
    tokenizer=tokenizer,
)

# 학습 진행
trainer.train()

 

반응형

 

2. pytorch 수동 구현 방법

파이토치에서 QA 모델의 학습 과정을 수동으로 구현하는 것도 가능합니다.

이 경우에는 모델 구조나 loss 함수를 유연하게 변경하여 적용할 수 있다는 장점이 있습니다.

 

여기서는 아래와 같이 QA 모델에서 사용되는 기본적인 loss 함수를 적용해보겠습니다.

import torch.nn as nn

class QA_Loss(nn.Module):
    def __init__(self):
        super(QA_Loss, self).__init__()

        self.CE = nn.CrossEntropyLoss() # 크로스엔트로피 손실 함수 사용

    def forward(self, results, qa_pos_label): # qa_pos_label : tensor, [[s1, e1], [s2, e2], ...]
        start_logits, end_logits = results # 모델 예측 결과 logits 받아오기
		
        # batch_size 찾기
        batch_size = start_logits.shape[0]
        if batch_size == 512: batch_size = 1

        # 시작 위치 및 끝 위치에 대한 크로스엔트로피 결과 합산
        qa_loss = self.CE(start_logits, qa_pos_label[:, 0]) + self.CE(end_logits, qa_pos_label[:, 1])

        return qa_loss

 

위의 loss 함수를 통하여 학습을 진행하는 예시 코드 스니펫은 아래와 같습니다.

import torch
from torch.utils.data import TensorDataset, DataLoader
from torch.optim.lr_scheduler import LinearLR
from tqdm import tqdm

# 데이터로더 형태로 전처리
dataset = TensorDataset(train_dataset['input_ids'], train_dataset['attention_mask'],
                            train_dataset['start_positions'], train_dataset['end_positions'])

batch_size = 16

dataloader = DataLoader(
    dataset,
    batch_size=batch_size)

# gpu 설정
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

model = AutoModelForQuestionAnswering.from_pretrained("roberta-base")
model.to(device)

loss_fn = QA_Loss().to(device)

# 하이퍼파라미터 설정
epochs = 3
lr = 2e-5

optimizer = torch.optim.AdamW(model.parameters(), lr=float(lr), weight_decay=0.01)
scheduler = LinearLR(optimizer, start_factor=1.0, end_factor=0.0, total_iters=epochs * len(dataloader))

# 학습 모드 설정
model.train()

for i in range(1, epochs + 1):

    # 현재 epoch의 총 loss 값 초기화
    total_loss = 0
    
    for step, data in enumerate(tqdm(dataloader)):
		
        # 각 데이터를 gpu에 할당
        for j in range(len(data)):
            data[j] = data[j].to(device)
            
        # 라벨로 활용할 답변의 start 및 end 위치
        qa_pos_label = torch.stack([data[-2], data[-1]], axis=1).to(device)

        # input_ids 및 attention_mask 정보를 input으로 주어 결과 반환
        results = model(*data[:2])
        
        # start 및 end 위치 logits 받아오기
        logits = (results.start_logits, results.end_logits)
        
        # loss 값 계산
        loss = loss_fn(logits, qa_pos_label)
        total_loss += loss.item()
        
        # 모델 파라미터 업데이트
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        # epoch 중간에 loss 값 출력
        if step % 500 == 0 and step != 0:
            print('Step : %d, Avg Loss : %.4f, lr : %.10f' % (step, total_loss / (step * batch_size), optimizer.param_groups[0]['lr']))

    # epoch 종료 후 loss 값 출력
    print('Epoch : %d, Avg Loss : %.4f' % (i, total_loss / len(train_dataset['input_ids'])))

    # epoch 종료 후 모델 저장 예시
    torch.save(model, f'{model_save_path}.pt')

 

 

Step 3. 성능 평가 과정(EM, F1 점수 계산)

QA task에서는 모델의 성능으로 실제 일치 여부인 Exact Match(EM)

실제 답변과 예측 답변 간의 F1 점수를 주로 사용합니다.

 

이 점수들의 정확한 계산을 위하여 텍스트 전처리 과정이 필요한데,

아래 사이트의 글을 참고하여 가져온 텍스트 전처리 과정 코드는 다음과 같습니다.

 

Evaluating QA: Metrics, Predictions, and the Null Response

A deep dive into computing QA predictions and when to tell BERT to zip it!

qa.fastforwardlabs.com

def normalize_text(s):
    """Removing articles and punctuation, and standardizing whitespace are all typical text processing steps."""
    import string, re

    def remove_articles(text):
        regex = re.compile(r"\b(a|an|the)\b", re.UNICODE)
        return re.sub(regex, " ", text)

    def white_space_fix(text):
        return " ".join(text.split())

    def remove_punc(text):
        exclude = set(string.punctuation)
        return "".join(ch for ch in text if ch not in exclude)

    def lower(text):
        return text.lower()

    return white_space_fix(remove_articles(remove_punc(lower(s))))


def compute_exact_match(prediction, truth):
    return int(normalize_text(prediction) == normalize_text(truth))


def compute_f1(prediction, truth):
    pred_tokens = normalize_text(prediction).split()
    truth_tokens = normalize_text(truth).split()

    # if either the prediction or the truth is no-answer then f1 = 1 if they agree, 0 otherwise
    if len(pred_tokens) == 0 or len(truth_tokens) == 0:
        return int(pred_tokens == truth_tokens)

    common_tokens = set(pred_tokens) & set(truth_tokens)

    # if there are no common tokens then f1 = 0
    if len(common_tokens) == 0:
        return 0

    prec = len(common_tokens) / len(pred_tokens)
    rec = len(common_tokens) / len(truth_tokens)

    return 2 * (prec * rec) / (prec + rec)

 

transformers 모듈의 pipeline 기능을 활용하면 쉽게 답변을 가져올 수 있습니다.

이를 선언하는 코드 예시는 아래와 같습니다.

from transformers import pipeline

# task 종류, 모델, 토크나이저, 사용할 gpu 번호 지정
pipe = pipeline("question-answering", model=model, tokenizer=tokenizer, device = 0)

 

이제 테스트용 데이터셋을 가져와 EM 및 F1 점수를 계산해보도록 하겠습니다.

성능 평가를 진행하는 코드 스니펫은 다음과 같습니다.

# 테스트용 데이터셋 가져오기
test_dataset = load_dataset("squad")["validation"]

batch_size = 64
em = 0
f1 = 0

for i in tqdm(range(0, len(test_dataset), batch_size)):

    if i % 256 == 0 and i != 0: # 중간 결과 출력
        print("EM : %.4f, F1 : %.4f"%(em * 100 / i, f1 * 100 / i))

    if i != (len(test_dataset) // batch_size) * batch_size: # last batch가 아닌 경우
        out = pipe(question=test_dataset[i:i + batch_size]['question'],
                   context=test_dataset[i:i + batch_size]['context'],
                   batch_size=batch_size, truncation=True)
    else: # last batch
        new_batch_size = len(test_dataset[i:]['question'])
        out = pipe(question=test_dataset[i:]['question'],
                   context=test_dataset[i:]['context'],
                   batch_size=new_batch_size, truncation=True)

    for j in range(len(out)):
    
        # 각 후보 답변에 대한 EM 및 F1 점수 계산
        em_list = []
        f1_list = []
        for ground_answer in test_dataset[i + j]['answers']['text']:
            em_list.append(compute_exact_match(out[j]['answer'], ground_answer))
            f1_list.append(compute_f1(out[j]['answer'], ground_answer))
            
        # 후보 답변 중 최고 EM 및 F1 점수를 반영하여 누적
        em += max(em_list)
        f1 += max(f1_list)

em /= (data_num / 100)
f1 /= (data_num / 100)

print("EM : %.4f, F1 : %.4f"%(em, f1))