| import torch |
| import sys |
| import time |
| from architecture import SmallGPT |
| from tokenizers import Tokenizer |
|
|
def load_tokenizer(path="smptokenizer/tokenizer.json"):
    """Load a serialized `tokenizers.Tokenizer` from a JSON file.

    Args:
        path: Filesystem path to the tokenizer JSON (defaults to the
              project's bundled tokenizer).

    Returns:
        The deserialized Tokenizer instance.
    """
    return Tokenizer.from_file(path)
|
|
| def generate_text_streaming(model, tokenizer, start_text, device, max_length=64, temperature=1.0, max_new_tokens=20, repetition_penalty=1.2): |
| """ |
| Generates text token by token, yielding each new token. |
| """ |
| model.eval() |
| |
| |
| input_ids = tokenizer.encode(start_text).ids |
| generated_ids = [] |
|
|
| |
| print("Generated Sentence:") |
| print(start_text, end="", flush=True) |
|
|
| current_ids = input_ids |
| |
| with torch.no_grad(): |
| for _ in range(max_new_tokens): |
| |
| current_input = current_ids[-max_length+1:] if len(current_ids) >= max_length else current_ids |
| input_tensor = torch.tensor([current_input], dtype=torch.long, device=device) |
| |
| |
| logits = model(input_tensor) |
| |
| |
| next_token_logits = logits[0, -1, :] / temperature |
|
|
| |
| if repetition_penalty > 1.0: |
| for token_id in set(current_ids): |
| next_token_logits[token_id] /= repetition_penalty |
| |
| |
| probs = torch.softmax(next_token_logits, dim=-1) |
| next_token_id = torch.multinomial(probs, 1).item() |
| |
| |
| if next_token_id == tokenizer.token_to_id("<eos>"): |
| break |
| |
| generated_ids.append(next_token_id) |
| current_ids.append(next_token_id) |
| |
| |
| new_token = tokenizer.decode([next_token_id]) |
| yield new_token |
|
|
def main(seed):
    """Interactive generation loop.

    Loads the tokenizer and model checkpoint, then repeatedly prompts the
    user for a starting phrase and streams generated text until the user
    types 'quit' (or sends EOF / Ctrl-C).

    Args:
        seed: RNG seed re-applied before every generation so the same
              prompt reproduces the same sample across iterations.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    tokenizer_path = "smptokenizer/tokenizer.json"
    tokenizer = load_tokenizer(tokenizer_path)
    vocab_size = tokenizer.get_vocab_size()
    # token_to_id returns None for a missing token; `or 0` would also have
    # fired for a legitimate pad id of 0, so test for None explicitly.
    pad_id = tokenizer.token_to_id("<pad>")
    if pad_id is None:
        pad_id = 0

    # Architecture hyperparameters — must match the trained checkpoint below.
    d_model = 256
    n_heads = 8
    n_layers = 6
    max_length = 172

    model = SmallGPT(
        vocab_size=vocab_size,
        d_model=d_model,
        n_heads=n_heads,
        n_layers=n_layers,
        max_length=max_length,
        pad_idx=pad_id,
    ).to(device)

    model_path = "models/pytorch_model.bin"
    try:
        # Keep the try body minimal: only torch.load can raise FileNotFoundError.
        state_dict = torch.load(model_path, map_location=device)
    except FileNotFoundError:
        print(f"Error: Model file not found at {model_path}")
        print("Please ensure the model is trained and the path is correct.")
        return
    model.load_state_dict(state_dict)
    model.eval()
    print(f"Model loaded from {model_path}")

    while True:
        # Re-seed before every generation for reproducible sampling.
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)  # safe no-op when CUDA is unavailable

        try:
            start_text = input("Enter a starting word or phrase (or 'quit' to exit): ")
        except (EOFError, KeyboardInterrupt):
            # Exit cleanly on Ctrl-D / Ctrl-C instead of dumping a traceback.
            print()
            break
        if start_text.lower() == 'quit':
            break

        if not start_text.strip():
            print("Please enter some text. We are using a random character as a starting point.")
            # Fall back to an arbitrary time-derived prompt string.
            start_text = str(time.time())

        print("Generating...")

        token_count = 0
        start_time = time.time()

        for token in generate_text_streaming(
            model=model,
            tokenizer=tokenizer,
            start_text=start_text,
            device=device,
            max_new_tokens=1000,
            temperature=0.7,
            max_length=max_length,
            repetition_penalty=1.2,
        ):
            print(token, end="", flush=True)
            token_count += 1

        elapsed_time = time.time() - start_time
        # Guard against division by zero on a pathologically fast run.
        tokens_per_sec = token_count / elapsed_time if elapsed_time > 0 else 0

        print(f"\n\nPerformance: {tokens_per_sec:.2f} tokens/sec")
        print("-" * 30)
|
|
if __name__ == "__main__":
    # Seed both CPU and CUDA RNGs up front, then hand the seed to main()
    # so it can re-seed before each interactive generation.
    SEED = 42
    torch.manual_seed(SEED)
    torch.cuda.manual_seed(SEED)
    main(SEED)