import os
import json
# from peft import PeftConfig, PeftModel
# from transformers import AutoModelForSeq2SeqLM, AutoModelForCausalLM, AutoTokenizer, AutoConfig
from typing import Optional, Literal
from dsp.modules.lm import LM
# from dsp.modules.finetuning.finetune_hf import preprocess_prompt
from dsp.modules.cache_utils import CacheMemory, NotebookCacheMemory, cache_turn_on
import functools
def openai_to_hf(**kwargs):
    """Map OpenAI-style generation kwargs to their Hugging Face generate() equivalents."""
hf_kwargs = {}
for k, v in kwargs.items():
if k == "n":
hf_kwargs["num_return_sequences"] = v
elif k == "frequency_penalty":
hf_kwargs["repetition_penalty"] = 1.0 - v
elif k == "presence_penalty":
hf_kwargs["diversity_penalty"] = v
elif k == "max_tokens":
hf_kwargs["max_new_tokens"] = v
elif k == "model":
pass
else:
hf_kwargs[k] = v
return hf_kwargs
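
# Illustrative mapping only (mirrors the function above, no new behavior): OpenAI-style
# arguments are renamed to the corresponding Hugging Face generate() parameters, e.g.
#   openai_to_hf(n=3, max_tokens=50, temperature=0.7)
#   -> {"num_return_sequences": 3, "max_new_tokens": 50, "temperature": 0.7}
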
class HFModel(LM):
def __init__(self, model: str, checkpoint: Optional[str] = None, is_client: bool = False,
hf_device_map: Literal["auto", "balanced", "balanced_low_0", "sequential"] = "auto"):
"""wrapper for Hugging Face models
Args:
model (str): HF model identifier to load and use
checkpoint (str, optional): load specific checkpoints of the model. Defaults to None.
is_client (bool, optional): whether to access models via client. Defaults to False.
hf_device_map (str, optional): HF config strategy to load the model.
Recommeded to use "auto", which will help loading large models using accelerate. Defaults to "auto".
"""
super().__init__(model)
self.provider = "hf"
self.is_client = is_client
self.device_map = hf_device_map
if not self.is_client:
try:
from transformers import AutoModelForSeq2SeqLM, AutoModelForCausalLM, AutoTokenizer, AutoConfig
import torch
except ImportError as exc:
raise ModuleNotFoundError(
"You need to install Hugging Face transformers library to use HF models."
) from exc
self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
try:
                # Determine from the model config whether this is an encoder-decoder or decoder-only architecture.
                architecture = AutoConfig.from_pretrained(model).architectures[0]
self.encoder_decoder_model = ("ConditionalGeneration" in architecture) or ("T5WithLMHeadModel" in architecture)
self.decoder_only_model = ("CausalLM" in architecture) or ("GPT2LMHeadModel" in architecture)
assert self.encoder_decoder_model or self.decoder_only_model, f"Unknown HuggingFace model class: {model}"
self.tokenizer = AutoTokenizer.from_pretrained(model if checkpoint is None else checkpoint)
self.rationale = True
AutoModelClass = AutoModelForSeq2SeqLM if self.encoder_decoder_model else AutoModelForCausalLM
if checkpoint:
# with open(os.path.join(checkpoint, '..', 'compiler_config.json'), 'r') as f:
# config = json.load(f)
self.rationale = False #config['rationale']
# if config['peft']:
# peft_config = PeftConfig.from_pretrained(checkpoint)
# self.model = AutoModelClass.from_pretrained(peft_config.base_model_name_or_path, return_dict=True, load_in_8bit=True, device_map=hf_device_map)
# self.model = PeftModel.from_pretrained(self.model, checkpoint)
# else:
self.model = AutoModelClass.from_pretrained(checkpoint).to(self.device)
else:
self.model = AutoModelClass.from_pretrained(model).to(self.device)
self.drop_prompt_from_output = False
except ValueError:
self.model = AutoModelForCausalLM.from_pretrained(
model if checkpoint is None else checkpoint,
device_map=hf_device_map
)
self.drop_prompt_from_output = True
                self.tokenizer = AutoTokenizer.from_pretrained(model)
self.history = []
    def basic_request(self, prompt, **kwargs):
        """Run a single generation request and record the prompt/response pair in self.history."""
raw_kwargs = kwargs
kwargs = {**self.kwargs, **kwargs}
response = self._generate(prompt, **kwargs)
history = {
"prompt": prompt,
"response": response,
"kwargs": kwargs,
"raw_kwargs": raw_kwargs,
}
self.history.append(history)
return response
def _generate(self, prompt, **kwargs):
assert not self.is_client
# TODO: Add caching
kwargs = {**openai_to_hf(**self.kwargs), **openai_to_hf(**kwargs)}
# print(prompt)
        if isinstance(prompt, dict):
            # Chat-style payloads arrive as {"messages": [{"role": ..., "content": ...}, ...]};
            # fall back to the text of the first message.
            try:
                prompt = prompt['messages'][0]['content']
            except (KeyError, IndexError, TypeError):
                print("Failed to extract 'content' from the prompt.")
inputs = self.tokenizer(prompt, return_tensors="pt").to(self.device)
# print(kwargs)
outputs = self.model.generate(**inputs, **kwargs)
if self.drop_prompt_from_output:
input_length = inputs.input_ids.shape[1]
outputs = outputs[:, input_length:]
completions = [
{"text": c}
for c in self.tokenizer.batch_decode(outputs, skip_special_tokens=True)
]
response = {
"prompt": prompt,
"choices": completions,
}
return response
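
    # Illustrative shape of the value returned by _generate (single completion, n == 1);
    # the prompt and answer text below are made up:
    #   {"prompt": "Question: What is 2 + 2?\nAnswer:", "choices": [{"text": " 4"}]}
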
def __call__(self, prompt, only_completed=True, return_sorted=False, **kwargs):
assert only_completed, "for now"
assert return_sorted is False, "for now"
        # Greedy decoding cannot return multiple distinct sequences, so enable sampling
        # when more than one completion is requested or the temperature is non-trivial.
        if kwargs.get("n", 1) > 1 or kwargs.get("temperature", 0.0) > 0.1:
            kwargs["do_sample"] = True
response = self.request(prompt, **kwargs)
return [c["text"] for c in response["choices"]]
# @functools.lru_cache(maxsize=None if cache_turn_on else 0)
# @NotebookCacheMemory.cache
# def cached_generate(self, prompt, **kwargs):
# return self._generate(prompt, **kwargs)
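

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's API. The model id below is only an
    # example; any seq2seq or causal LM hosted on the Hugging Face Hub should work, and
    # generation kwargs follow the OpenAI-style names mapped by openai_to_hf above.
    lm = HFModel("google/flan-t5-small")
    completions = lm("Translate English to German: Hello, world!", max_tokens=32)
    print(completions[0])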