AttnTrace / src /models /Llama.py
SecureLLMSys's picture
init
f214f36
raw
history blame
2.84 kB
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from .Model import Model
import os
import signal
def handle_timeout(sig, frame):
    """Signal handler that aborts whatever code the signal interrupted.

    Args:
        sig: Signal number passed in by the ``signal`` machinery (unused).
        frame: Stack frame that was executing when the signal arrived (unused).

    Raises:
        TimeoutError: Always, with the message ``'took too long'``.
    """
    timeout_error = TimeoutError('took too long')
    raise timeout_error
# Install the timeout handler for SIGALRM at import time. SIGALRM only exists
# on Unix-like platforms, so skip the registration elsewhere instead of
# failing the import with AttributeError.
if hasattr(signal, "SIGALRM"):
    signal.signal(signal.SIGALRM, handle_timeout)
class Llama(Model):
    """Wrapper around a Hugging Face causal language model for Llama-style chat.

    The tokenizer and model are loaded from ``self.name`` (expected to be set
    by the ``Model`` base class - verify there). The class offers helpers to
    generate a reply, measure a prompt's token length, and truncate text to a
    token budget.

    NOTE(review): ``query`` and ``get_prompt_length`` write the user message
    into ``self.messages[1]["content"]``; ``self.messages`` is presumably
    created by the base class as a ``[system, user]`` message list - confirm.
    """

    def __init__(self, config, device="cuda:0"):
        """Load the tokenizer and the model weights.

        Args:
            config: Nested mapping. Reads ``config["params"]["max_output_tokens"]``
                and ``config["api_key_info"]`` (``"api_key_use"`` is the index
                of the entry to take from ``"api_keys"``).
            device: Value forwarded as ``device_map`` when loading the model.
        """
        super().__init__(config)
        # Stored for callers; the methods visible in this class do not read it.
        self.max_output_tokens = int(config["params"]["max_output_tokens"])

        api_pos = int(config["api_key_info"]["api_key_use"])
        hf_token = config["api_key_info"]["api_keys"][api_pos]
        if not hf_token:
            # Missing/empty config entry: fall back to the environment.
            hf_token = os.getenv("HF_TOKEN")

        # Both loaders receive the credential through ``token``; the older
        # ``use_auth_token`` spelling is deprecated in transformers.
        self.tokenizer = AutoTokenizer.from_pretrained(self.name, token=hf_token)
        self.model = AutoModelForCausalLM.from_pretrained(
            self.name,
            torch_dtype=torch.bfloat16,
            device_map=device,
            token=hf_token,
        )
        # Generation stops on either the tokenizer's EOS id or "<|eot_id|>".
        self.terminators = [
            self.tokenizer.eos_token_id,
            self.tokenizer.convert_tokens_to_ids("<|eot_id|>"),
        ]
        # NOTE(review): process-wide side effect (and deprecated in recent
        # PyTorch releases). Kept unchanged because code outside this class
        # may rely on the CUDA half-precision default it installs.
        torch.set_default_tensor_type(torch.cuda.HalfTensor)

    def _encode_chat(self, msg):
        """Render ``msg`` through the chat template and return its token ids.

        ``self.messages`` is updated in place (slot 1 receives ``msg``), which
        matches the behaviour callers have seen so far.

        Args:
            msg: Text placed in ``self.messages[1]["content"]``.

        Returns:
            The ``return_tensors="pt"`` result of ``apply_chat_template`` with
            the generation prompt appended; not moved to the model device.
        """
        messages = self.messages
        messages[1]["content"] = msg
        return self.tokenizer.apply_chat_template(
            messages,
            add_generation_prompt=True,
            return_tensors="pt",
        )

    def query(self, msg, max_tokens=128000, timeout=60):
        """Generate a reply to ``msg`` with greedy decoding.

        Args:
            msg: User message to send to the model.
            max_tokens: Passed to ``generate`` as ``max_length``.
                NOTE(review): ``max_length`` bounds prompt plus generated
                tokens, and ``self.max_output_tokens`` is not consulted here -
                confirm this is intended.
            timeout: Whole seconds before generation is interrupted through
                ``SIGALRM``. ``0`` disables the alarm. On platforms without
                ``signal.alarm`` no timeout is enforced.

        Returns:
            The decoded completion (prompt tokens stripped, special tokens
            skipped), or the literal string ``"time out"`` when the alarm
            fired during generation.
        """
        input_ids = self._encode_chat(msg).to(self.model.device)
        # Same shape, device and integer dtype as the ids, independent of the
        # process-wide default tensor type.
        attention_mask = torch.ones_like(input_ids)

        can_alarm = hasattr(signal, "alarm")
        try:
            if can_alarm:
                signal.alarm(timeout)
            output_tokens = self.model.generate(
                input_ids,
                max_length=max_tokens,
                attention_mask=attention_mask,
                eos_token_id=self.terminators,
                top_k=50,
                do_sample=False,
            )
        except TimeoutError:
            print("time out")
            return "time out"
        finally:
            # Always cancel the pending alarm, including when generate() fails
            # with some other exception; otherwise it would fire later inside
            # unrelated code.
            if can_alarm:
                signal.alarm(0)

        # Decode only the tokens generated after the prompt.
        prompt_len = input_ids.shape[-1]
        return self.tokenizer.decode(
            output_tokens[0][prompt_len:], skip_special_tokens=True
        )

    def get_prompt_length(self, msg):
        """Return the number of tokens in the chat-templated prompt for ``msg``.

        Like ``query``, this writes ``msg`` into ``self.messages``.
        """
        # Counting needs no device transfer, so the ids stay where the
        # tokenizer created them.
        return self._encode_chat(msg).shape[-1]

    def cut_context(self, msg, max_length):
        """Truncate ``msg`` to at most ``max_length`` tokens and return the text.

        Tokens are produced with ``add_special_tokens=True``, so any special
        tokens the tokenizer adds count toward ``max_length``; they are
        dropped again when decoding.

        Args:
            msg: Text to truncate.
            max_length: Number of leading token ids to keep.

        Returns:
            The decoded text of the kept tokens.
        """
        token_ids = self.tokenizer.encode(msg, add_special_tokens=True)
        return self.tokenizer.decode(
            token_ids[:max_length], skip_special_tokens=True
        )