mjschock's picture
Enhance AgentRunner and graph functionality by adding answer extraction logic and improving logging throughout the processing flow. Update the handling of interrupts and state management to ensure clarity in debug output. Refactor the should_continue function in graph.py to better manage completion states and improve user interaction.
218633c unverified
raw
history blame
5.38 kB
import logging
import os
import uuid
from langgraph.types import Command
from graph import agent_graph
# Configure logging
logging.basicConfig(level=logging.INFO) # Default to INFO level
logger = logging.getLogger(__name__)
# Enable LiteLLM debug logging only if environment variable is set
import litellm
if os.getenv("LITELLM_DEBUG", "false").lower() == "true":
litellm.set_verbose = True
logger.setLevel(logging.DEBUG)
else:
litellm.set_verbose = False
logger.setLevel(logging.INFO)
class AgentRunner:
"""Runner class for the code agent."""
def __init__(self):
"""Initialize the agent runner with graph and tools."""
logger.info("Initializing AgentRunner")
self.graph = agent_graph
self.last_state = None # Store the last state for testing/debugging
self.thread_id = str(
uuid.uuid4()
) # Generate a unique thread_id for this runner
logger.info(f"Created AgentRunner with thread_id: {self.thread_id}")
def _extract_answer(self, state: dict) -> str:
"""Extract the answer from the state."""
if not state:
return None
# First try to get answer from direct answer field
if "answer" in state and state["answer"]:
logger.info(f"Found answer in direct field: {state['answer']}")
return state["answer"]
# Then try to get answer from messages
if "messages" in state and state["messages"]:
for msg in reversed(state["messages"]):
if hasattr(msg, "content") and msg.content:
logger.info(f"Found answer in message: {msg.content}")
return msg.content
return None
def __call__(self, input_data) -> str:
"""Process a question through the agent graph and return the answer.
Args:
input_data: Either a question string or a Command object for resuming
Returns:
str: The agent's response
"""
try:
config = {"configurable": {"thread_id": self.thread_id}}
logger.info(f"Using config: {config}")
if isinstance(input_data, str):
# Initial question
logger.info(f"Processing initial question: {input_data}")
initial_state = {
"question": input_data,
"messages": [],
"answer": None,
"step_logs": [],
"is_complete": False,
"step_count": 0,
# Initialize new memory fields
"context": {},
"memory_buffer": [],
"last_action": None,
"action_history": [],
"error_count": 0,
"success_count": 0,
}
logger.info(f"Initial state: {initial_state}")
# Use stream to get interrupt information
logger.info("Starting graph stream for initial question")
for chunk in self.graph.stream(initial_state, config):
logger.debug(f"Received chunk: {chunk}")
if isinstance(chunk, dict):
if "__interrupt__" in chunk:
logger.info("Detected interrupt in stream")
logger.info(f"Interrupt details: {chunk['__interrupt__']}")
# If we hit an interrupt, resume with 'c'
logger.info("Resuming with 'c' command")
for result in self.graph.stream(
Command(resume="c"), config
):
logger.debug(f"Received resume result: {result}")
if isinstance(result, dict):
answer = self._extract_answer(result)
if answer:
self.last_state = result
return answer
else:
answer = self._extract_answer(chunk)
if answer:
self.last_state = chunk
return answer
else:
logger.debug(f"Skipping chunk without answer: {chunk}")
else:
# Resuming from interrupt
logger.info(f"Resuming from interrupt with input: {input_data}")
for result in self.graph.stream(input_data, config):
logger.debug(f"Received resume result: {result}")
if isinstance(result, dict):
answer = self._extract_answer(result)
if answer:
self.last_state = result
return answer
else:
logger.debug(f"Skipping result without answer: {result}")
# If we get here, we didn't find an answer
logger.warning("No answer generated from stream")
return "No answer generated"
except Exception as e:
logger.error(f"Error processing input: {str(e)}")
raise