import json

import torch
import torch.nn as nn
import torch.nn.functional as F
from huggingface_hub import hf_hub_download
from safetensors.torch import load_file
from transformers import AutoModelForCausalLM, AutoTokenizer


class EarlyExitClassifier(nn.Module):
    """Per-layer exit head: a single linear projection from hidden size to vocab size."""

    def __init__(self, hidden_size, vocab_size, dtype=torch.float16, device=None):
        super().__init__()
        self.linear = nn.Linear(hidden_size, vocab_size, dtype=dtype)
        if device is not None:
            self.to(device)

    def forward(self, hidden_states):
        return self.linear(hidden_states)


class EarlyExitModelWrapper(nn.Module):
    """Wraps a causal LM and attaches one exit classifier per checked layer.

    forward() returns from the first layer whose classifier assigns the last
    token a max softmax probability of at least confidence_threshold.
    """

    def __init__(self, model, confidence_threshold=0.9, num_layers_to_check=None, device=None, dtype=torch.float16):
        super().__init__()
        self.model = model
        self.config = model.config
        self.confidence_threshold = confidence_threshold
        # Assumes a Llama-style module layout with decoder layers at model.model.layers.
        nl = num_layers_to_check or len(model.model.layers)
        self.classifiers = nn.ModuleList([
            EarlyExitClassifier(self.config.hidden_size, self.config.vocab_size, dtype=dtype, device=device)
            for _ in range(nl)
        ])
        self._device = device or next(model.parameters()).device
        self._dtype = dtype

    def forward(self, input_ids, attention_mask=None, **kwargs):
        outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, output_hidden_states=True, **kwargs)
        # hidden_states[0] is the embedding output, so layer i's output is at index i.
        hidden_states = outputs.hidden_states
        for layer_idx, classifier in enumerate(self.classifiers, start=1):
            h_state = hidden_states[layer_idx]
            logits = classifier(h_state[:, -1, :].to(self._dtype))
            probs = F.softmax(logits, dim=-1)
            max_prob = probs.max(dim=-1)[0]
            # .item() assumes batch size 1; exit at the first sufficiently confident layer.
            if bool((max_prob >= self.confidence_threshold).item()):
                return {"logits": logits, "exit_layer": layer_idx, "hidden_states": h_state}

        # No layer was confident enough: fall back to the base model's own logits.
        # Note: outputs.hidden_states[-1] has already been passed through the final
        # norm in HF causal LMs, so re-applying model.model.norm would normalize twice.
        final_logits = outputs.logits[:, -1, :]
        return {"logits": final_logits, "exit_layer": len(hidden_states) - 1, "hidden_states": hidden_states[-1]}


@torch.no_grad()
def generate_with_early_exit(prompt, model, tokenizer, max_new_tokens=64, temperature=0.7, top_p=0.9, device=None):
    device = device or next(model.parameters()).device
    inputs = tokenizer(prompt, return_tensors="pt").to(device)
    input_ids = inputs["input_ids"]
    attention_mask = inputs.get("attention_mask", None)
    generated_ids = input_ids.clone()

    for _ in range(max_new_tokens):
        # The wrapper keeps no KV cache, so each step re-encodes the full sequence.
        outputs = model(input_ids=generated_ids, attention_mask=attention_mask)
        logits = outputs["logits"] / temperature

        if top_p < 1.0:
            # Nucleus sampling: mask out tokens outside the smallest set whose
            # cumulative probability exceeds top_p, keeping at least one token.
            sorted_logits, sorted_indices = torch.sort(logits, dim=-1, descending=True)
            cumprobs = torch.cumsum(torch.softmax(sorted_logits, dim=-1), dim=-1)
            to_remove = cumprobs > top_p
            to_remove[:, 1:] = to_remove[:, :-1].clone()  # shift right so the boundary token survives
            to_remove[:, 0] = 0
            indices_to_remove = to_remove.scatter(1, sorted_indices, to_remove)
            logits[indices_to_remove] = float("-inf")

        probs = torch.softmax(logits, dim=-1)
        next_token_id = torch.multinomial(probs, num_samples=1)
        generated_ids = torch.cat([generated_ids, next_token_id], dim=-1)
        if attention_mask is not None:
            attention_mask = torch.cat([attention_mask, torch.ones_like(next_token_id)], dim=-1)
        # .item() assumes batch size 1; some tokenizers define no EOS token, so guard for None.
        if tokenizer.eos_token_id is not None and next_token_id.item() == tokenizer.eos_token_id:
            break
    return generated_ids
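

# Illustrative helper (an addition, not part of the original API): runs a single
# forward pass and prints each exit head's confidence for the last token, so you
# can see which layer forward() would return from first. Assumes batch size 1.
def inspect_exit_confidences(wrapped, tokenizer, prompt):
    device = next(wrapped.parameters()).device
    input_ids = tokenizer(prompt, return_tensors="pt").input_ids.to(device)
    with torch.no_grad():
        outputs = wrapped.model(input_ids=input_ids, output_hidden_states=True)
        for layer_idx, classifier in enumerate(wrapped.classifiers, start=1):
            h = outputs.hidden_states[layer_idx][:, -1, :].to(wrapped._dtype)
            conf = F.softmax(classifier(h), dim=-1).max().item()
            flag = "  <- meets threshold" if conf >= wrapped.confidence_threshold else ""
            print(f"layer {layer_idx:2d}: max prob {conf:.3f}{flag}")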


def load_early_exit_from_hub(repo_id: str, device: str = None):
    """
    Loads:
      - early_exit_config.json
      - early_exit_heads.safetensors
    and returns (wrapped_model, tokenizer).
    """
    cfg_path = hf_hub_download(repo_id=repo_id, filename="early_exit_config.json")
    with open(cfg_path, "r") as f:
        cfg = json.load(f)

    base_id = cfg["base_model"]
    dtype = torch.float16 if cfg.get("dtype", "float16") == "float16" else torch.float32
    # Prefer MPS, then CUDA, then CPU when no device is given.
    device = device or ("mps" if torch.backends.mps.is_available() else ("cuda" if torch.cuda.is_available() else "cpu"))

    tokenizer = AutoTokenizer.from_pretrained(base_id)
    base_model = AutoModelForCausalLM.from_pretrained(
        base_id,
        torch_dtype=dtype,
        device_map={"": device} if device != "cpu" else None,
    )

    wrapped = EarlyExitModelWrapper(
        base_model,
        confidence_threshold=float(cfg["confidence_threshold"]),
        num_layers_to_check=int(cfg["num_layers_to_check"]),
        device=device,
        dtype=dtype,
    )

    heads_path = hf_hub_download(repo_id=repo_id, filename="early_exit_heads.safetensors")
    state = load_file(heads_path)
    wrapped.classifiers.load_state_dict(state, strict=True)
    wrapped.eval()  # inference only

    return wrapped, tokenizer
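

# Minimal usage sketch. The repo id is a placeholder: substitute a repo that
# actually contains early_exit_config.json and early_exit_heads.safetensors.
if __name__ == "__main__":
    wrapped, tokenizer = load_early_exit_from_hub("your-username/your-early-exit-model")
    out_ids = generate_with_early_exit("The capital of France is", wrapped, tokenizer, max_new_tokens=32)
    print(tokenizer.decode(out_ids[0], skip_special_tokens=True))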