Direct Preference Optimization¶
- Preparing data for DPO
- DPO training with DPOTrainer
This notebook walks through these two steps.
See the Hugging Face documentation for details.
Following Hugging Face's Alignment Handbook, we loosely reproduce the recipe that produced Zephyr, using a Korean-language dataset.
In [ ]:
#%%capture
#!pip install unsloth "xformers==0.0.28.post2"
# Also get the latest nightly Unsloth!
#!pip uninstall unsloth -y && pip install --upgrade --no-cache-dir "unsloth[colab-new] @ git+https://github.com/unslothai/unsloth.git"
!pip install unsloth
- DPO first requires a model that has already been SFT-trained on a similar dataset.
- For reference, to reproduce Hugging Face's Zephyr you could use the
HuggingFaceH4/mistral-7b-sft-beta
model as the SFT base model, as sketched below.
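A minimal, hedged sketch of loading that SFT base with the same FastLanguageModel.from_pretrained call used later in this notebook; here we instead start from Qwen/Qwen2.5-7B-Instruct, which is already instruction-tuned:
# Illustrative sketch only (not executed in this notebook): load the Zephyr SFT base instead of Qwen.
from unsloth import FastLanguageModel
sft_model, sft_tokenizer = FastLanguageModel.from_pretrained(
    model_name = "HuggingFaceH4/mistral-7b-sft-beta", # SFT'ed base used in the Zephyr recipe
    max_seq_length = 4096,
    dtype = None,        # auto-detect
    load_in_4bit = True, # 4-bit quantization to reduce memory usage
)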
In [ ]:
# One must patch the DPO Trainer first!
from unsloth import PatchDPOTrainer
PatchDPOTrainer()
In [ ]:
from unsloth import FastLanguageModel
import torch
max_seq_length = 4096 # Choose any! We auto support RoPE Scaling internally!
dtype = None # None for auto detection. Float16 for Tesla T4, V100, Bfloat16 for Ampere+
load_in_4bit = True # Use 4bit quantization to reduce memory usage. Can be False.
model, tokenizer = FastLanguageModel.from_pretrained(
model_name = "Qwen/Qwen2.5-7B-Instruct",
max_seq_length = max_seq_length,
dtype = dtype,
load_in_4bit = load_in_4bit,
token = "", # use one if using gated models like meta-llama/Llama-2-7b-hf
)
In [4]:
#@title Alignment Handbook utils
import os
import re
from typing import List, Literal, Optional
from datasets import DatasetDict, concatenate_datasets, load_dataset, load_from_disk
from datasets.builder import DatasetGenerationError
DEFAULT_CHAT_TEMPLATE = "{% for message in messages %}\n{% if message['role'] == 'user' %}\n{{ '<|user|>\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'system' %}\n{{ '<|system|>\n' + message['content'] + eos_token }}\n{% elif message['role'] == 'assistant' %}\n{{ '<|assistant|>\n' + message['content'] + eos_token }}\n{% endif %}\n{% if loop.last and add_generation_prompt %}\n{{ '<|assistant|>' }}\n{% endif %}\n{% endfor %}"
def apply_chat_template(
example, tokenizer, task: Literal["sft", "generation", "rm", "dpo"] = "sft", assistant_prefix="<|assistant|>\n"
):
def _strip_prefix(s, pattern):
# Use re.escape to escape any special characters in the pattern
return re.sub(f"^{re.escape(pattern)}", "", s)
if task == "dpo":
if all(k in example.keys() for k in ("chosen", "rejected")):
# Compared to reward modeling, we filter out the prompt, so the text is everything after the last assistant token
prompt_messages = []
# TODO: handle case where chosen/rejected also have system messages
chosen_messages = example["chosen"][1:]
rejected_messages = example["rejected"][1:]
example["text_chosen"] = tokenizer.apply_chat_template(chosen_messages, tokenize=False)
example["text_rejected"] = tokenizer.apply_chat_template(rejected_messages, tokenize=False)
example["text_prompt"] = tokenizer.apply_chat_template(
prompt_messages, tokenize=False, add_generation_prompt=True
)
example["text_chosen"] = _strip_prefix(example["text_chosen"], assistant_prefix)
example["text_rejected"] = _strip_prefix(example["text_rejected"], assistant_prefix)
else:
raise ValueError(
f"Could not format example as dialogue for `dpo` task! Require `[chosen, rejected]` keys but found {list(example.keys())}"
)
else:
raise ValueError(
f"Task {task} not supported, please ensure that the provided task is one of {['sft', 'generation', 'rm', 'dpo']}"
)
return example
def get_datasets(
data_config: dict,
splits: List[str] = ["train", "test"],
shuffle: bool = True,
) -> DatasetDict:
"""
Loads one or more datasets with varying training set proportions.
Args:
data_config (`DataArguments` or `dict`):
Dataset configuration and split proportions.
splits (`List[str]`, *optional*, defaults to `['train', 'test']`):
Dataset splits to load and mix. Assumes the splits exist in all datasets and have a `train_` or `test_` prefix.
shuffle (`bool`, *optional*, defaults to `True`):
Whether to shuffle the training and testing/validation data.
Returns
[`DatasetDict`]: The dataset dictionary containing the loaded datasets.
"""
if type(data_config) is dict:
# Structure of the input is:
# dataset_mixer = {
# "dataset1": 0.5,
# "dataset1": 0.3,
# "dataset1": 0.2,
# }
dataset_mixer = data_config
else:
raise ValueError(f"Data config {data_config} not recognized.")
raw_datasets = mix_datasets(dataset_mixer, splits=splits, shuffle=shuffle)
return raw_datasets
def mix_datasets(dataset_mixer: dict, splits: Optional[List[str]] = None, shuffle=True) -> DatasetDict:
"""
Loads and mixes datasets according to proportions specified in `dataset_mixer`.
Args:
dataset_mixer (`dict`):
Dictionary containing the dataset names and their training proportions. By default, all test proportions are 1.
splits (Optional[List[str]], *optional*, defaults to `None`):
Dataset splits to load and mix. Assumes the splits exist in all datasets and have a `train_` or `test_` prefix.
shuffle (`bool`, *optional*, defaults to `True`):
Whether to shuffle the training and testing/validation data.
"""
raw_datasets = DatasetDict()
raw_train_datasets = []
raw_val_datasets = []
fracs = []
for ds, frac in dataset_mixer.items():
fracs.append(frac)
for split in splits:
try:
# Try first if dataset on a Hub repo
dataset = load_dataset(ds, split=split)
except DatasetGenerationError:
# If not, check local dataset
dataset = load_from_disk(os.path.join(ds, split))
if "train" in split:
raw_train_datasets.append(dataset)
elif "test" in split:
raw_val_datasets.append(dataset)
else:
raise ValueError(f"Split type {split} not recognized as one of test or train.")
if any(frac < 0 for frac in fracs):
raise ValueError("Dataset fractions cannot be negative.")
if len(raw_train_datasets) > 0:
train_subsets = []
for dataset, frac in zip(raw_train_datasets, fracs):
train_subset = dataset.select(range(int(frac * len(dataset))))
train_subsets.append(train_subset)
if shuffle:
raw_datasets["train"] = concatenate_datasets(train_subsets).shuffle(seed=42)
else:
raw_datasets["train"] = concatenate_datasets(train_subsets)
# No subsampling for test datasets to enable fair comparison across models
if len(raw_val_datasets) > 0:
if shuffle:
raw_datasets["test"] = concatenate_datasets(raw_val_datasets).shuffle(seed=42)
else:
raw_datasets["test"] = concatenate_datasets(raw_val_datasets)
if len(raw_datasets) == 0:
raise ValueError(
f"Dataset {dataset_mixer} not recognized with split {split}. Check the dataset has been correctly formatted."
)
return raw_datasets
Data Prep¶
To follow the Zephyr reproduction from the Alignment Handbook, we use the UltraFeedback dataset.
In this example we use a Korean-translated version of the dataset and sample only 10% of the 10k examples, since using all of them would take too long.
In [27]:
raw_datasets = get_datasets(
{"maywell/ko_Ultrafeedback_binarized_10k" : 0.1}, # 10% sampled
splits = ["train"],
)
column_names = list(raw_datasets["train"].features)
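Before formatting, it helps to confirm the raw schema that the next step relies on; a quick, illustrative check that is not part of the original notebook:
# Illustrative sanity check: the formatting step below expects "prompt", "chosen" and "rejected" columns.
print(column_names)
print(raw_datasets["train"][0]["prompt"][:200])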
In [36]:
from datasets import Dataset, DatasetDict
def prepare_dataset_for_dpo(raw_datasets):
formatted_data = []
for row in raw_datasets["train"]:
# Create a dictionary for each entry with structured fields
formatted_entry = {
"prompt": f'<|im_start|>system\\nYou are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>\\n<|im_start|>user\\n{row["prompt"]}<|im_end|>\\n<|im_start|>assistant\\n',
"chosen": f'{row["chosen"]}\n',
"rejected": f'{row["rejected"]}\n'
}
formatted_data.append(formatted_entry)
# Create a Hugging Face Dataset from the list of dictionaries
dpo_dataset = Dataset.from_list(formatted_data)
# Wrap it in a DatasetDict to match the format of raw_datasets
dpo_dataset = DatasetDict({"train": dpo_dataset})
return dpo_dataset
# Example usage
dpo_ready_dataset = prepare_dataset_for_dpo(raw_datasets)
Let's print the first item from the formatted dataset.
In [ ]:
import pprint
row = dpo_ready_dataset["train"][0]
pprint.pprint(row["prompt"])
pprint.pprint(row["chosen"])
pprint.pprint(row["rejected"])
In [ ]:
# Baseline generation with the not-yet-DPO-trained model, for comparison after training
FastLanguageModel.for_inference(model) # Enable native 2x faster inference
inputs = tokenizer(
[
row["prompt"]+"\n"
], return_tensors = "pt").to("cuda")
from transformers import TextStreamer
text_streamer = TextStreamer(tokenizer)
_ = model.generate(**inputs, streamer = text_streamer, max_new_tokens = 128)
We now add LoRA adapters so we only need to update 1 to 10% of all parameters!
In [ ]:
model = FastLanguageModel.get_peft_model(
model,
r = 64, # Choose any number > 0 ! Suggested 8, 16, 32, 64, 128
target_modules = ["q_proj", "k_proj", "v_proj", "o_proj",
"gate_proj", "up_proj", "down_proj",],
lora_alpha = 64,
lora_dropout = 0, # Currently only supports dropout = 0
bias = "none", # Currently only supports bias = "none"
# [NEW] "unsloth" uses 30% less VRAM, fits 2x larger batch sizes!
use_gradient_checkpointing = "unsloth", # True or "unsloth" for very long context
random_state = 3407,
use_rslora = False, # We support rank stabilized LoRA
loftq_config = None, # And LoftQ
)
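To verify that only a small fraction of parameters is trainable, you can print the adapter statistics; this assumes the Unsloth-wrapped model exposes the standard PEFT helper (an illustrative check, not in the original notebook):
# Illustrative check: reports trainable (LoRA) parameters vs. the total parameter count.
model.print_trainable_parameters()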
Train the DPO model¶
Now let's use Hugging Face TRL's DPOTrainer! More docs here: TRL DPO docs. We train for 3 epochs on the 10% sample of the dataset to speed things up.
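For reference, DPO minimizes the following loss, where pi_theta is the policy being trained, pi_ref is the frozen reference model, (x, y_w, y_l) is a prompt with its chosen and rejected responses, sigma is the logistic function, and beta is the hyperparameter set below:
$$\mathcal{L}_{\mathrm{DPO}}(\pi_\theta;\pi_{\mathrm{ref}}) = -\,\mathbb{E}_{(x,\,y_w,\,y_l)\sim\mathcal{D}}\Big[\log \sigma\Big(\beta \log \tfrac{\pi_\theta(y_w \mid x)}{\pi_{\mathrm{ref}}(y_w \mid x)} - \beta \log \tfrac{\pi_\theta(y_l \mid x)}{\pi_{\mathrm{ref}}(y_l \mid x)}\Big)\Big]$$
A larger beta keeps the policy closer to the reference model; we use beta = 0.1 below. With ref_model = None and a LoRA model, TRL computes the reference log-probabilities with the adapters disabled, so no second copy of the model is needed.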
In [ ]:
# One must patch the DPO Trainer first!
from unsloth import PatchDPOTrainer
PatchDPOTrainer()
In [ ]:
from trl import DPOTrainer, DPOConfig
from unsloth import is_bfloat16_supported
dpo_trainer = DPOTrainer(
model = model,
ref_model = None,
args = DPOConfig(
per_device_train_batch_size = 2,
gradient_accumulation_steps = 4,
warmup_ratio = 0.1,
num_train_epochs = 3,
learning_rate = 5e-6,
fp16 = not is_bfloat16_supported(),
bf16 = is_bfloat16_supported(),
logging_steps = 1,
optim = "adamw_8bit",
weight_decay = 0.0,
lr_scheduler_type = "cosine",
seed = 42,
output_dir = "outputs",
report_to = "none", # Use this for WandB etc
),
beta = 0.1,
train_dataset = dpo_ready_dataset["train"],
tokenizer = tokenizer,
max_length = 1024,
max_prompt_length = 512,
)
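Note that with per_device_train_batch_size = 2 and gradient_accumulation_steps = 4, each optimizer step sees an effective batch of 2 × 4 = 8 preference pairs per device; max_length = 1024 and max_prompt_length = 512 truncate longer examples.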
In [ ]:
dpo_trainer.train()
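After training, the LoRA adapter weights can be saved with the standard save_pretrained calls; a minimal sketch where the directory name is illustrative:
# Save the LoRA adapters and tokenizer locally ("dpo_lora_adapters" is an illustrative path).
model.save_pretrained("dpo_lora_adapters")
tokenizer.save_pretrained("dpo_lora_adapters")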
In [ ]:
# Generation after DPO training, using the same ChatML prompt format as the training data
FastLanguageModel.for_inference(model) # Enable native 2x faster inference
inputs = tokenizer(
[
"<|im_start|>system\nYou are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>\n<|im_start|>user\n이 과제에서는 뉴스 기사가 주어집니다. 여러분의 임무는 기사의 주요 주제가 세계, 스포츠, 비즈니스, 과학/기술과 관련이 있는 경우 기사를 '세계', '스포츠', '비즈니스', '과학/기술' 중 하나로 분류하는 것입니다. 주제가 확실하지 않은 경우 가장 가까운 옵션을 선택하세요. 텍스트의 URL은 [링크]로 대체되었습니다.예시: 인근 별 주변의 혜성, 소행성 및 행성(SPACE.com) SPACE.com - 혜성과 소행성이 있는 것으로 추정되는 인근 별이 행성의 본거지로 보입니다. 새로운 관측에 따르면 이 세계는 목성보다 작고 명왕성만큼 작을 수도 있습니다.예시 솔루션: 과학/기술예시 설명: 뉴스 기사의 주제는 과학(천문학)과 관련이 있습니다. 따라서 레이블은 'Sci/Tech'입니다.문제: 인도의 휴대폰 사용자가 유선 기반을 넘어섰다(FT.com) FT.com - 지난달 인도의 휴대폰 가입자 수는 인접한 중국만큼 속도와 규모가 둘째로 빠른 성장 궤도로 국내 고정 전화 기반을 뛰어넘었습니다.<|im_end|>\n<|im_start|>assistant\n"
], return_tensors = "pt").to("cuda")
from transformers import TextStreamer
text_streamer = TextStreamer(tokenizer)
_ = model.generate(**inputs, streamer = text_streamer, max_new_tokens = 256)