In [None]:
! pip install transformers



In [2]:
! pip install profiling-decorator

Collecting profiling-decorator
 Downloading profiling_decorator-0.0.6-py3-none-any.whl.metadata (6.2 kB)
Downloading profiling_decorator-0.0.6-py3-none-any.whl (9.2 kB)
Installing collected packages: profiling-decorator
Successfully installed profiling-decorator-0.0.6


In [9]:
def test_updated_retool_implementation():
 # 1. Setup model, tokenizer, and device
 from transformers import AutoModelForCausalLM, AutoTokenizer
 import torch
 import transformers
 import re
 from transformers import AutoTokenizer, AutoModelForCausalLM, DynamicCache

 # Use a model that fits in memory
 model_name = "gpt2-medium"
 tokenizer = AutoTokenizer.from_pretrained(model_name)

 # Ensure padding token is set
 if tokenizer.pad_token is None:
 tokenizer.pad_token = tokenizer.eos_token

 # Check device
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 print(f"Using device: {device}")

 # Load model to device
 model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

 # 2. Add special tokens
 special_tokens = {
 'additional_special_tokens': ['', '', '', '']
 }
 tokenizer.add_special_tokens(special_tokens)
 model.resize_token_embeddings(len(tokenizer))

 # Get token IDs
 code_start_id = tokenizer.convert_tokens_to_ids('')
 code_end_id = tokenizer.convert_tokens_to_ids('')
 interpreter_start_id = tokenizer.convert_tokens_to_ids('')
 interpreter_end_id = tokenizer.convert_tokens_to_ids('')

 print(f"EOS token ID: {tokenizer.eos_token_id}")
 print(f"Pad token ID: {tokenizer.pad_token_id}")
 print(f"Code tokens: {code_start_id}, {code_end_id}")
 print(f"Interpreter tokens: {interpreter_start_id}, {interpreter_end_id}")

 # 3. Create a test version of your ReToolTrainer with custom generation
 class TestReToolTrainer:
 def __init__(self, model, tokenizer, device):
 self.model = model
 self.processing_class = tokenizer
 self.device = device
 self.temperature = 0.7
 self.top_p = 0.9
 self.top_k = 50

 # Ensure pad token is set
 if self.processing_class.pad_token is None:
 self.processing_class.pad_token = self.processing_class.eos_token

 def _execute_code(self, code_block):
 """Mock code execution"""
 print(f"\n==== EXECUTING CODE ====")
 print(f"{code_block}")
 print(f"========================\n")
 return "0 1 1 2 3"

 def _custom_generate(self, input_ids, attention_mask=None, past_key_values=None, max_new_tokens=50, eos_token_ids=None):
 """Custom generation function that avoids KV cache issues"""
 if attention_mask is None:
 attention_mask = torch.ones_like(input_ids)

 if eos_token_ids is None:
 eos_token_ids = [self.processing_class.eos_token_id]

 # Initialize
 current_ids = input_ids.clone()
 current_mask = attention_mask.clone()
 current_kv = past_key_values

 # Generate tokens in batches for efficiency
 all_tokens = []
 batch_size = 10 # Process this many tokens at once

 for start_idx in range(0, max_new_tokens, batch_size):
 # How many tokens to generate in this batch
 batch_tokens = min(batch_size, max_new_tokens - start_idx)

 # Accumulate new tokens
 new_tokens = []

 for _ in range(batch_tokens):
 # Forward pass with proper cache handling
 with torch.no_grad():
 outputs = self.model(
 input_ids=current_ids if current_kv is None else current_ids[:, -1:],
 attention_mask=current_mask if current_kv is None else current_mask[:, -1:],
 past_key_values=DynamicCache.from_legacy_cache(current_kv) if current_kv is not None else None,
 use_cache=True
 )

 # Sample next token
 next_token_logits = outputs.logits[:, -1, :] / self.temperature
 filtered_logits = self._filter_logits(next_token_logits)
 probs = torch.nn.functional.softmax(filtered_logits, dim=-1)
 next_token = torch.multinomial(probs, num_samples=1)

 # Add to accumulated tokens
 token_id = next_token.item()
 new_tokens.append(token_id)

 # Update for next iteration
 current_ids = torch.cat([current_ids, next_token], dim=1)
 token_mask = torch.ones((1, 1), device=current_mask.device, dtype=current_mask.dtype)
 current_mask = torch.cat([current_mask, token_mask], dim=1)
 current_kv = outputs.past_key_values

 # Check for stop tokens - include both EOS and code_end
 if token_id in eos_token_ids:
 break

 # Add batch tokens to overall result
 all_tokens.extend(new_tokens)

 # Check if we hit a stop token
 if len(new_tokens) < batch_tokens:
 break

 # Convert to tensor
 result = torch.tensor([all_tokens], device=input_ids.device)
 return result, current_kv

 def _filter_logits(self, logits):
 """Apply top-k and top-p filtering"""
 if self.top_k > 0:
 top_k_logits, top_k_indices = torch.topk(logits, self.top_k, dim=-1)
 logits[0, :] = torch.full_like(logits[0, :], float('-inf'))
 logits[0, top_k_indices[0]] = top_k_logits[0]

 if self.top_p < 1.0:
 sorted_logits, sorted_indices = torch.sort(logits, descending=True, dim=-1)
 cumulative_probs = torch.cumsum(torch.nn.functional.softmax(sorted_logits, dim=-1), dim=-1)

 # Remove tokens with cumulative probability above threshold
 sorted_indices_to_remove = cumulative_probs > self.top_p
 # Shift the indices to the right to keep the first token above threshold
 sorted_indices_to_remove[:, 1:] = sorted_indices_to_remove[:, :-1].clone()
 sorted_indices_to_remove[:, 0] = 0

 # Scatter sorted tensors to original indexing
 indices_to_remove = sorted_indices_to_remove.scatter(1, sorted_indices, sorted_indices_to_remove)
 logits[indices_to_remove] = float('-inf')

 return logits

 def _retool_generate_with_interpreter(self, prompt_ids_batch, attention_mask_batch, eos_id, interpreter_id, code_id, max_turns=10):
 """Your updated implementation with custom generation"""
 batch_size = prompt_ids_batch.size(0)
 batch_completion = []
 batch_interpreter_positions = []

 for i in range(batch_size):
 print(f"Processing batch item {i+1}/{batch_size}")

 # Initialize
 current_input_id = prompt_ids_batch[i:i+1]
 current_attention_mask = attention_mask_batch[i:i+1]
 current_kv = None

 # Track the completion part (no prompt)
 cumulative_completion_ids = torch.empty((1, 0), dtype=torch.long, device=prompt_ids_batch.device)
 interpreter_positions = []

 for turn_idx in range(max_turns):
 # Check if input is empty
 if current_input_id.size(1) == 0:
 print(f"Turn {turn_idx + 1}: Input is empty, breaking loop")
 break

 print(f"\n--- Turn {turn_idx + 1} ---")
 print(f"Current input: {self.processing_class.decode(current_input_id[0])}")
 print(f"KV cache present: {current_kv is not None}")

 # Generate with custom function
 newly_generated_tokens, current_kv = self._custom_generate(
 input_ids=current_input_id,
 attention_mask=current_attention_mask,
 past_key_values=current_kv,
 max_new_tokens=30,
 eos_token_ids=[eos_id, code_id[1]]
 )

 # Display generated text
 print(f"Generated: {self.processing_class.decode(newly_generated_tokens[0])}")

 # Add to cumulative completion
 cumulative_completion_ids = torch.cat([cumulative_completion_ids, newly_generated_tokens], dim=1)

 # Check last token
 last_token_id = newly_generated_tokens[0, -1].item() if newly_generated_tokens.size(1) > 0 else None
 print(f"Last token ID: {last_token_id}")

 # Check for end conditions
 if last_token_id == eos_id:
 print("Found EOS token, ending generation")
 break

 # Check for code end token
 if last_token_id == code_id[1]:
 print("Found token, executing code")

 # Extract code from the full text
 full_text = self.processing_class.decode(
 torch.cat([prompt_ids_batch[i], cumulative_completion_ids[0]], dim=0)
 )
 code_match = re.search(r'(.*?)', full_text, re.DOTALL)

 if code_match:
 code_block = code_match.group(1).strip()

 # Execute code
 interpreter_text = self._execute_code(code_block)

 # Format and add interpreter output
 formatted_feedback = f"{self.processing_class.decode(interpreter_id[0])}{interpreter_text}{self.processing_class.decode(interpreter_id[1])}"
 interpreter_ids = self.processing_class(
 formatted_feedback,
 return_tensors="pt",
 add_special_tokens=False
 ).input_ids.to(prompt_ids_batch.device)

 # Record positions
 interpreter_start_idx = cumulative_completion_ids.size(1)
 cumulative_completion_ids = torch.cat([cumulative_completion_ids, interpreter_ids], dim=1)
 interpreter_end_idx = cumulative_completion_ids.size(1) - 1
 interpreter_positions.append((interpreter_start_idx, interpreter_end_idx))

 print(f"Added interpreter output: {formatted_feedback}")

 # Set up for next turn
 current_input_id = interpreter_ids
 current_attention_mask = torch.ones_like(current_input_id)
 # Keep current_kv from previous generation
 else:
 print("No code block found despite token")
 break
 else:
 # Continue with the newly generated tokens
 current_input_id = newly_generated_tokens
 current_attention_mask = torch.ones_like(current_input_id)

 # Add to batch results
 batch_completion.append(cumulative_completion_ids.squeeze(0))
 batch_interpreter_positions.append(interpreter_positions)

 # Pad sequences
 if len(batch_completion) > 0:
 # Ensure padding_value is a valid integer
 padding_value = self.processing_class.pad_token_id
 if padding_value is None:
 padding_value = 0 # Use 0 as a default if pad_token_id is None

 padded_sequences = torch.nn.utils.rnn.pad_sequence(
 batch_completion,
 batch_first=True,
 padding_value=padding_value
 )
 else:
 padded_sequences = torch.empty((0, 0), dtype=torch.long, device=prompt_ids_batch.device)

 return padded_sequences, batch_interpreter_positions

 # 4. Create test instance
 tester = TestReToolTrainer(model, tokenizer, device)

 # 5. Create a test prompt with a complete code block
 prompt = """Let me solve this problem with code:


def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))
"""

 # 6. Run the test
 try:
 print("\n=== Testing Updated ReTool Implementation ===\n")

 # Encode the prompt
 prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
 attention_mask = torch.ones_like(prompt_ids)

 # Run the generation
 completions, positions = tester._retool_generate_with_interpreter(
 prompt_ids_batch=prompt_ids,
 attention_mask_batch=attention_mask,
 eos_id=tokenizer.eos_token_id,
 interpreter_id=[interpreter_start_id, interpreter_end_id],
 code_id=[code_start_id, code_end_id],
 max_turns=3
 )

 # Display results
 print("\n=== Final Results ===\n")
 print("Generated completion:")
 print(tokenizer.decode(completions[0]))

 print("\nFull text:")
 print(tokenizer.decode(torch.cat([prompt_ids[0], completions[0]])))

 print("\nInterpreter positions:", positions)

 except Exception as e:
 import traceback
 print(f"Error during testing: {e}")
 traceback.print_exc()

# Run the test
test_updated_retool_implementation()

Using device: cpu
EOS token ID: 50256
Pad token ID: 50256
Code tokens: 50257, 50258
Interpreter tokens: 50259, 50260

=== Testing Updated ReTool Implementation ===

Processing batch item 1/1

--- Turn 1 ---
Current input: Let me solve this problem with code:


def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))

KV cache present: False
Generated: 

def fibonacci(n):

 a, b = 0, 1

 result = []

Last token ID: 198

--- Turn 2 ---
Current input: 

def fibonacci(n):

 a, b = 0, 1

 result = []

KV cache present: True
Generated: 
 a, b = b, a + b

 return result

print(fibon
Last token ID: 261

--- Turn 3 ---
Current input: 
 a, b = b, a + b

 return result

print(fibon
KV cache present: True
Generated: acci(5))

So the first two methods are all the same, the last one is a little different, and the second one is the
Last token ID: 262

=== Final Results ===

Generated completion:


def fibonacci(n):

 a, b =

In [25]:
def test_retool_core_functionality():
 # 1. Create minimal model and tokenizer
 from transformers import AutoModelForCausalLM, AutoTokenizer
 import torch
 import transformers
 import re

 # Use a small model for testing
 model_name = "gpt2-medium"
 tokenizer = AutoTokenizer.from_pretrained(model_name)

 # Check if CUDA is available
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 print(f"Using device: {device}")

 # Load model directly to the selected device
 model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

 # 2. Add special tokens to the tokenizer
 special_tokens = {
 'additional_special_tokens': ['', '', '', '']
 }
 tokenizer.add_special_tokens(special_tokens)
 model.resize_token_embeddings(len(tokenizer))

 # Get token IDs for special tokens
 code_start_id = tokenizer.convert_tokens_to_ids('')
 code_end_id = tokenizer.convert_tokens_to_ids('')
 interpreter_start_id = tokenizer.convert_tokens_to_ids('')
 interpreter_end_id = tokenizer.convert_tokens_to_ids('')

 print(f"Code tokens: {code_start_id}, {code_end_id}")
 print(f"Interpreter tokens: {interpreter_start_id}, {interpreter_end_id}")

 # 3. Create a simplified implementation of _retool_generate_with_interpreter
 def simplified_generate_with_interpreter(model, tokenizer, prompt_text, device):
 """Simplified version focusing just on the core functionality"""
 # Step 1: Tokenize the prompt
 prompt_ids = tokenizer.encode(prompt_text, return_tensors="pt").to(device)

 # Initialize tracking variables
 cumulative_completion_ids = torch.empty((1, 0), dtype=torch.long, device=device)
 interpreter_positions = []

 # Step 2: Extract a code block and execute it
 full_text = prompt_text
 code_match = re.search(r'(.*?)', full_text, re.DOTALL)

 if code_match:
 code_block = code_match.group(1).strip()
 print(f"Found code block: {code_block}")

 # Mock code execution
 interpreter_output = "0 1 1 2 3"
 print(f"Code execution result: {interpreter_output}")

 # Format interpreter feedback
 interpreter_text = f"{interpreter_output}"
 interpreter_ids = tokenizer.encode(
 interpreter_text,
 return_tensors="pt",
 add_special_tokens=False
 ).to(device)

 # Step 3: Generate a continuation after the interpreter output
 # First, create a sequence with prompt + interpreter output
 combined_input = prompt_text + interpreter_text
 combined_ids = tokenizer.encode(combined_input, return_tensors="pt").to(device)

 # Generate continuation
 with torch.no_grad():
 continuation_outputs = model.generate(
 input_ids=combined_ids,
 max_new_tokens=50,
 do_sample=True,
 temperature=0.7,
 top_p=0.9,
 pad_token_id=tokenizer.pad_token_id,
 eos_token_id=tokenizer.eos_token_id,
 return_dict_in_generate=True,
 cache_implementation= 'offloaded',
 )

 # Extract only the newly generated tokens
 continuation_tokens = continuation_outputs.sequences[:, combined_ids.size(1):]

 # Combine everything for the final result
 # The completion consists of: interpreter output + continuation
 cumulative_completion_ids = torch.cat([interpreter_ids, continuation_tokens], dim=1)

 # Record the interpreter position
 interpreter_positions.append((0, interpreter_ids.size(1) - 1))

 print(f"Generated continuation: {tokenizer.decode(continuation_tokens[0])}")
 else:
 print("No code block found in the prompt.")

 return cumulative_completion_ids, interpreter_positions

 # 4. Test with a prompt that has a complete code block
 prompt = """Let's calculate Fibonacci numbers in Python:


def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))
"""

 # 5. Run our simplified test
 try:
 print("\n--- Testing Core Functionality ---")
 completion, positions = simplified_generate_with_interpreter(model, tokenizer, prompt, device)

 print("\n--- Final Results ---")
 print("Completion:")
 print(tokenizer.decode(completion[0]))
 print("\nInterpreter positions:", positions)

 # 6. Now also test ReToolTrainer to verify core code execution functionality
 print("\n--- Testing ReToolTrainer with Direct Injection ---")

 # Setup trainer
 trainer = ReToolTrainer(
 model=model,
 processing_class=tokenizer,
 args=transformers.TrainingArguments(
 output_dir="./test_output",
 per_device_train_batch_size=1,
 ),
 train_dataset=None,
 eval_dataset=None,
 max_turns=3,
 interpreter_id=[interpreter_start_id, interpreter_end_id],
 code_id=[code_start_id, code_end_id],
 eos_id=tokenizer.eos_token_id
 )

 # Override the _execute_code method
 def mock_execute_code(self, code_block):
 print(f"Mock executing code: {code_block}")
 return "0 1 1 2 3"

 original_execute_code = trainer._execute_code
 trainer._execute_code = mock_execute_code.__get__(trainer, ReToolTrainer)

 # Create a sequence that has a prompt and ends with 
 # This is to simulate that the model has generated a complete code block
 prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)
 attention_mask = torch.ones_like(prompt_ids)

 # Directly inject a simulated generation with a code block
 # Create a custom testing function
 def test_execute_code_and_continue(self, prompt_ids, attention_mask):
 """Test just the code execution and continuation part"""
 print("Testing code execution and continuation...")
 device = next(self.model.parameters()).device
 prompt_ids = prompt_ids.to(device)
 attention_mask = attention_mask.to(device)

 # Extract code from the prompt
 full_text = self.processing_class.decode(prompt_ids[0])
 code_match = re.search(r'(.*?)', full_text, re.DOTALL)

 if not code_match:
 print("No code block found in the prompt!")
 return None, []

 code_block = code_match.group(1).strip()
 print(f"Executing code block: {code_block}")

 # Execute the code
 interpreter_text = self._execute_code(code_block)

 # Format and tokenize the interpreter output
 formatted_feedback = f"{self.processing_class.decode(self.interpreter_id[0])}{interpreter_text}{self.processing_class.decode(self.interpreter_id[1])}"
 interpreter_ids = self.processing_class(
 formatted_feedback,
 return_tensors="pt",
 add_special_tokens=False
 ).input_ids.to(device)

 # Record position (relative to completion only)
 interpreter_positions = [(0, interpreter_ids.size(1) - 1)]

 # Combine prompt with interpreter output for continuation
 combined_ids = torch.cat([prompt_ids, interpreter_ids], dim=1)
 combined_mask = torch.ones_like(combined_ids)

 # Generate continuation
 continuation_outputs = self.model.generate(
 input_ids=combined_ids,
 attention_mask=combined_mask,
 max_new_tokens=50,
 do_sample=True,
 temperature=0.7,
 top_p=0.9,
 pad_token_id=self.processing_class.pad_token_id,
 eos_token_id=self.eos_id,
 return_dict_in_generate=True,
 cache_implementation= 'offloaded',
 )

 # Extract only the newly generated continuation
 continuation_tokens = continuation_outputs.sequences[:, combined_ids.size(1):]

 # Full completion is: interpreter output + continuation
 completion = torch.cat([interpreter_ids, continuation_tokens], dim=1)

 return completion, interpreter_positions

 # Add the test method to the trainer
 trainer.test_execute_code_and_continue = test_execute_code_and_continue.__get__(trainer, ReToolTrainer)

 # Run the test
 completion, positions = trainer.test_execute_code_and_continue(prompt_ids, attention_mask)

 print("\n--- Trainer Test Results ---")
 if completion is not None:
 print("Completion:")
 print(tokenizer.decode(completion[0]))
 print("\nInterpreter positions:", positions)

 except Exception as e:
 import traceback
 print(f"Error during testing: {e}")
 traceback.print_exc()
 finally:
 # Restore original method if needed
 if 'trainer' in locals() and 'original_execute_code' in locals():
 trainer._execute_code = original_execute_code

# Run the test
test_retool_core_functionality()

Using device: cuda


The attention mask and the pad token id were not set. As a consequence, you may observe unexpected behavior. Please pass your input's `attention_mask` to obtain reliable results.
Setting `pad_token_id` to `eos_token_id`:50256 for open-end generation.


Code tokens: 50257, 50258
Interpreter tokens: 50259, 50260

--- Testing Core Functionality ---
Found code block: def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))
Code execution result: 0 1 1 2 3
Generated continuation: 0 1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

--- Final Results ---
Completion:
0 1 1 2 30 1 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48

Interpreter positions: [(0, 6)]

--- Testing ReToolTrainer with Direct Injection ---
Testing code execution and continuation...
Executing code block: def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))
Mock executing code: def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 r

 super().__init__(



--- Trainer Test Results ---
Completion:
0 1 1 2 31 1 1 2 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3

Interpreter positions: [(0, 6)]


In [3]:
from transformers import AutoTokenizer, AutoModelForCausalLM, DynamicCache

def test_direct_kv_cache_usage():
 # 1. Setup model, tokenizer, and device
 from transformers import AutoModelForCausalLM, AutoTokenizer
 import torch

 # Use a model that fits in memory
 model_name = "gpt2-medium"
 tokenizer = AutoTokenizer.from_pretrained(model_name)

 # Check device
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 print(f"Using device: {device}")

 # Load model to device
 model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

 # 2. Manual token-by-token generation with KV caching
 def generate_with_manual_kv_cache(input_ids, num_tokens=20):
 """Generate tokens one by one with manual KV cache management"""
 current_ids = input_ids.clone()
 past_key_values = None

 generated_tokens = []

 for _ in range(num_tokens):
 # Forward pass with past_key_values
 with torch.no_grad():
 outputs = model(
 input_ids=current_ids if past_key_values is None else current_ids[:, -1:],
 #past_key_values=past_key_values,
 past_key_values= DynamicCache.from_legacy_cache(past_key_values),
 use_cache=True
 )

 # Get logits for the next token (last position)
 next_token_logits = outputs.logits[:, -1, :]

 # Sample from the distribution
 probs = torch.nn.functional.softmax(next_token_logits / 0.7, dim=-1)
 next_token = torch.multinomial(probs, num_samples=1)

 # Add to generated tokens
 generated_tokens.append(next_token.item())

 # Update current_ids for next iteration
 current_ids = torch.cat([current_ids, next_token], dim=1)

 # Update past_key_values
 past_key_values = outputs.past_key_values
 #print('after generation, past_key_values ', past_key_values)

 return generated_tokens, past_key_values

 # 3. Run multi-turn generation with manual KV cache
 def run_manual_multi_turn_generation():
 # Start with a prompt
 prompt = "Once upon a time, in a magical forest, there lived a"

 # Tokenize the prompt
 prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)

 # Initialize tracking
 full_text = prompt
 current_ids = prompt_ids
 past_kv = None

 # Generate in multiple turns
 for turn_idx in range(3): # 3 turns
 print(f"\n==== Turn {turn_idx + 1} ====")
 print(f"Current input: {tokenizer.decode(current_ids[0])}")
 print(f"KV cache present: {past_kv is not None}")

 # Pause for inspection
 input("Press Enter to generate next part...")

 # Generate new tokens manually
 new_token_ids, past_kv = generate_with_manual_kv_cache(current_ids, num_tokens=20)

 # Decode and display new tokens
 new_text = tokenizer.decode(new_token_ids)
 print(f"Generated: {new_text}")

 # Accumulate
 full_text += new_text

 # Now inject a custom continuation
 custom_text = " Suddenly, a rainbow appeared in the sky!"
 custom_ids = tokenizer.encode(custom_text, return_tensors="pt").to(device)

 print(f"\n==== Injecting custom text: {custom_text} ====")

 # Update tracking
 full_text += custom_text

 # Prepare for next turn - start with the custom text
 current_ids = custom_ids
 # Keep past_kv from previous generation

 print(f"Full text so far: {full_text}")
 print("-" * 50)

 print("\n==== Final Story ====")
 print(full_text)

 return full_text

 # 4. Run the test
 try:
 print("\n=== Testing Manual KV Cache Usage ===\n")

 story = run_manual_multi_turn_generation()

 print("\n=== Test Complete ===")

 except Exception as e:
 import traceback
 print(f"Error during testing: {e}")
 traceback.print_exc()

# Run the test
test_direct_kv_cache_usage()

Using device: cuda

=== Testing Manual KV Cache Usage ===


==== Turn 1 ====
Current input: Once upon a time, in a magical forest, there lived a
KV cache present: False
Press Enter to generate next part...
Generated: wizard and a witch. One day, they were attacked by a flying beast and the wizard fled to

==== Injecting custom text: Suddenly, a rainbow appeared in the sky! ====
Full text so far: Once upon a time, in a magical forest, there lived a wizard and a witch. One day, they were attacked by a flying beast and the wizard fled to Suddenly, a rainbow appeared in the sky!
--------------------------------------------------

==== Turn 2 ====
Current input: Suddenly, a rainbow appeared in the sky!
KV cache present: True
Press Enter to generate next part...
Generated: Although it was a little less than a hundred meters wide, it was enough to cover the entire sky

==== Injecting custom text: Suddenly, a rainbow appeared in the sky! ====
Full text so far: Once upon a time, in a magical fo

In [4]:
def test_retool_with_working_kv_cache():
 # 1. Setup model, tokenizer, and device
 from transformers import AutoModelForCausalLM, AutoTokenizer
 import torch
 import re

 # Use a model that fits in memory
 model_name = "gpt2-medium"
 tokenizer = AutoTokenizer.from_pretrained(model_name)

 # Check device
 device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
 print(f"Using device: {device}")

 # Load model to device
 model = AutoModelForCausalLM.from_pretrained(model_name).to(device)

 # 2. Add special tokens
 special_tokens = {
 'additional_special_tokens': ['', '', '', '']
 }
 tokenizer.add_special_tokens(special_tokens)
 model.resize_token_embeddings(len(tokenizer))

 # Get token IDs
 code_start_id = tokenizer.convert_tokens_to_ids('')
 code_end_id = tokenizer.convert_tokens_to_ids('')
 interpreter_start_id = tokenizer.convert_tokens_to_ids('')
 interpreter_end_id = tokenizer.convert_tokens_to_ids('')

 print(f"EOS token ID: {tokenizer.eos_token_id}")
 print(f"Code tokens: {code_start_id}, {code_end_id}")
 print(f"Interpreter tokens: {interpreter_start_id}, {interpreter_end_id}")

 # 3. Manual token generation with KV caching
 def generate_with_manual_kv_cache(input_ids, past_key_values=None, max_tokens=20, stop_ids=None):
 """Generate tokens with KV cache until a stop token or max_tokens is reached"""
 if stop_ids is None:
 stop_ids = [tokenizer.eos_token_id]

 current_ids = input_ids.clone()
 generated_tokens = []

 for _ in range(max_tokens):
 # Forward pass with past_key_values
 with torch.no_grad():
 outputs = model(
 input_ids=current_ids if past_key_values is None else current_ids[:, -1:],
 past_key_values=past_key_values,
 use_cache=True
 )

 # Get logits for the next token
 next_token_logits = outputs.logits[:, -1, :]

 # Sample from the distribution
 probs = torch.nn.functional.softmax(next_token_logits / 0.7, dim=-1)
 next_token = torch.multinomial(probs, num_samples=1)

 # Get the token ID
 token_id = next_token.item()

 # Add to generated tokens
 generated_tokens.append(token_id)

 # Update current_ids for next iteration
 current_ids = torch.cat([current_ids, next_token], dim=1)

 # Update past_key_values
 past_key_values = outputs.past_key_values

 # Check if we hit a stop token
 if token_id in stop_ids:
 break

 # Convert list of token IDs to tensor
 result_tensor = torch.tensor([generated_tokens], device=device)
 return result_tensor, past_key_values

 # 4. ReTool simulation with working KV cache
 def simulate_retool_with_working_kv_cache(prompt, max_turns=3):
 """Simulate the ReTool process with working KV cache"""
 # Tokenize the prompt
 prompt_ids = tokenizer.encode(prompt, return_tensors="pt").to(device)

 # Initialize tracking
 full_sequence = prompt_ids.clone()
 completion = torch.empty((1, 0), dtype=torch.long, device=device)
 interpreter_positions = []

 # Keep the KV cache from previous turns
 past_kv = None

 for turn_idx in range(max_turns):
 print(f"\n==== Turn {turn_idx + 1} ====")

 # Determine what to generate from
 if turn_idx == 0:
 # First turn - generate from the prompt
 current_input = full_sequence
 print(f"Generating from prompt: {tokenizer.decode(current_input[0])}")
 else:
 # Later turns - might be generating from interpreter output
 current_input = full_sequence[:, -20:] if full_sequence.size(1) > 20 else full_sequence
 print(f"Generating from: {tokenizer.decode(current_input[0])}")

 # Generate with manual KV cache
 new_tokens, past_kv = generate_with_manual_kv_cache(
 current_input,
 past_key_values=past_kv,
 max_tokens=30,
 stop_ids=[tokenizer.eos_token_id, code_end_id]
 )

 # Decode and display
 new_text = tokenizer.decode(new_tokens[0])
 print(f"Generated: {new_text}")

 # Update tracking
 full_sequence = torch.cat([full_sequence, new_tokens], dim=1)
 completion = torch.cat([completion, new_tokens], dim=1)

 # Check for code blocks
 full_text = tokenizer.decode(full_sequence[0])
 code_blocks = re.findall(r'(.*?)', full_text, re.DOTALL)

 # Pause for inspection
 input("Press Enter to continue...")

 if code_blocks and code_end_id in new_tokens[0]:
 print("\n==== Found code block! ====")
 # Get the last code block
 code_block = code_blocks[-1].strip()
 print(f"Code block: {code_block}")

 # Mock code execution
 print("\n==== Executing code ====")
 interpreter_output = "0 1 1 2 3"
 print(f"Execution result: {interpreter_output}")

 # Format interpreter feedback
 interpreter_text = f"{interpreter_output}"
 interpreter_ids = tokenizer.encode(
 interpreter_text,
 return_tensors="pt",
 add_special_tokens=False
 ).to(device)

 # Record positions
 start_idx = completion.size(1)
 completion = torch.cat([completion, interpreter_ids], dim=1)
 end_idx = completion.size(1) - 1
 interpreter_positions.append((start_idx, end_idx))

 # Add to full sequence
 full_sequence = torch.cat([full_sequence, interpreter_ids], dim=1)
 print(f"Added interpreter output: {interpreter_text}")

 # We're still using the same past_kv for the next turn
 # The next input will be the interpreter output
 elif tokenizer.eos_token_id in new_tokens[0]:
 print("Found EOS token, ending generation")
 break

 return completion, interpreter_positions

 # 5. Test with a prompt containing a code block
 prompt = """Let me solve this problem with code:


def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))
"""

 # 6. Run the test
 try:
 print("\n=== Testing ReTool with Working KV Cache ===\n")

 completion, positions = simulate_retool_with_working_kv_cache(prompt)

 print("\n=== Final Results ===\n")
 print("Generated completion:")
 print(tokenizer.decode(completion[0]))

 print("\nFull text:")
 print(tokenizer.decode(torch.cat([tokenizer.encode(prompt, return_tensors="pt")[0].to(device), completion[0]])))

 print("\nInterpreter positions:", positions)

 except Exception as e:
 import traceback
 print(f"Error during testing: {e}")
 traceback.print_exc()

# Run the test
test_retool_with_working_kv_cache()

Using device: cuda


The new embeddings will be initialized from a multivariate normal distribution that has old embeddings' mean and covariance. As described in this article: https://nlp.stanford.edu/~johnhew/vocab-expansion.html. To disable this, use `mean_resizing=False`


EOS token ID: 50256
Code tokens: 50257, 50258
Interpreter tokens: 50259, 50260

=== Testing ReTool with Working KV Cache ===


==== Turn 1 ====
Generating from prompt: Let me solve this problem with code:


def fibonacci(n):
 a, b = 0, 1
 result = []
 for _ in range(n):
 result.append(a)
 a, b = b, a + b
 return result

print(fibonacci(5))

Generated: 
def fibonacci(n):

 a, b = 0, 1

 result = [0,
Press Enter to continue...

==== Turn 2 ====
Generating from: a, b = 0, 1

 result = [0,
Generated: 0, 0, 1]

 a, b = b, a + b


ret = [0,
Press Enter to continue...

==== Turn 3 ====
Generating from: a, b = b, a + b


ret = [0,
Generated: 1, 1, 1]

for i,j in enumerate(n, fibonacci(n-1, 1-f
Press Enter to continue...

=== Final Results ===

Generated completion:

def fibonacci(n):

 a, b = 0, 1

 result = [0, 0, 0, 1]

 a, b = b, a + b


ret = [0, 1, 1, 1]

for i,j in enumerate(n, fibonacci(n-1, 1-f

Full text:
Let me solve this problem with code:


def fibonacci(n):
 a, b = 0, 1
 result = 

**1. Clear CUDA Cache:**

This is often the first thing to try when you get a CUDA OOM error.

In [18]:
import torch

if torch.cuda.is_available():
 torch.cuda.empty_cache()
 print("CUDA cache cleared!")
else:
 print("CUDA not available, no cache to clear.")

CUDA cache cleared!


**2. Delete Large Variables and Run Garbage Collection:**

Identify variables holding large objects (like models, tensors, dataframes) that you don't need anymore and delete them. Then explicitly run garbage collection.

In [19]:
# Example: if you have a large model or tensor named 'model' or 'data'
# del model
# del data

import gc
gc.collect()

print("Garbage collection complete.")

Garbage collection complete.


**3. Restart Runtime:**

If the above steps don't work, restarting the runtime is the most drastic but often most effective way to clear all memory. Go to the Colab menu: `Runtime` -> `Restart runtime`.