import logging
import os
import uuid

import litellm
from langgraph.types import Command
from services.graph import agent_graph

# Configure logging
logging.basicConfig(level=logging.INFO)  # Default to INFO level
logger = logging.getLogger(__name__)

# Enable LiteLLM debug logging only if the environment variable is set
if os.getenv("LITELLM_DEBUG", "false").lower() == "true":
    litellm.set_verbose = True
    logger.setLevel(logging.DEBUG)
else:
    litellm.set_verbose = False
    logger.setLevel(logging.INFO)
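# Example (illustrative; assumes this module is saved as app.py):
#   LITELLM_DEBUG=true python app.py   -> verbose LiteLLM output + DEBUG logs
#   python app.py                      -> INFO logs only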

class AgentRunner:
    """Runner class for the code agent."""

    def __init__(self):
        """Initialize the agent runner with its graph and a unique thread id."""
        logger.info("Initializing AgentRunner")
        self.graph = agent_graph
        self.last_state = None  # Store the last state for testing/debugging
        # Generate a unique thread_id so each runner gets its own checkpointed thread
        self.thread_id = str(uuid.uuid4())
        logger.info(f"Created AgentRunner with thread_id: {self.thread_id}")

    def _extract_answer(self, state: dict) -> str | None:
        """Extract the answer from a graph state, if one is present."""
        if not state:
            return None
        # First try the direct answer field
        if "answer" in state and state["answer"]:
            logger.info(f"Found answer in direct field: {state['answer']}")
            return state["answer"]
        # Then fall back to the most recent message that has content
        if "messages" in state and state["messages"]:
            for msg in reversed(state["messages"]):
                if hasattr(msg, "content") and msg.content:
                    logger.info(f"Found answer in message: {msg.content}")
                    return msg.content
        return None
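    # Illustrative state shapes for _extract_answer (assumption: messages are
    # LangChain-style objects exposing a `.content` attribute, as the graph in
    # services.graph is expected to produce):
    #   {"answer": "Paris", "messages": []}              -> "Paris"
    #   {"answer": None, "messages": [AIMessage("Hi")]}  -> "Hi"
    #   {}                                               -> None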
    def __call__(self, input_data) -> str:
        """Process a question through the agent graph and return the answer.

        Args:
            input_data: Either a question string or a Command object for
                resuming after an interrupt.

        Returns:
            str: The agent's response.
        """
        try:
            config = {"configurable": {"thread_id": self.thread_id}}
            logger.info(f"Using config: {config}")

            if isinstance(input_data, str):
                # Initial question
                logger.info(f"Processing initial question: {input_data}")
                initial_state = {
                    "question": input_data,
                    "messages": [],
                    "answer": None,
                    "step_logs": [],
                    "is_complete": False,
                    "step_count": 0,
                    # Initialize memory fields
                    "context": {},
                    "memory_buffer": [],
                    "last_action": None,
                    "action_history": [],
                    "error_count": 0,
                    "success_count": 0,
                }
                logger.info(f"Initial state: {initial_state}")

                # Stream the graph and return the first chunk that yields an answer
                logger.info("Starting graph stream for initial question")
                for chunk in self.graph.stream(initial_state, config):
                    logger.debug(f"Received chunk: {chunk}")
                    if isinstance(chunk, dict):
                        if "__interrupt__" in chunk:
                            logger.info("Detected interrupt in stream")
                            logger.info(f"Interrupt details: {chunk['__interrupt__']}")
                            # Let the graph handle the interrupt naturally
                            continue
                        answer = self._extract_answer(chunk)
                        if answer:
                            self.last_state = chunk
                            return answer
                        else:
                            logger.debug(f"Skipping chunk without answer: {chunk}")
            else:
                # Resuming from an interrupt (input_data is a Command)
                logger.info(f"Resuming from interrupt with input: {input_data}")
                for result in self.graph.stream(input_data, config):
                    logger.debug(f"Received resume result: {result}")
                    if isinstance(result, dict):
                        answer = self._extract_answer(result)
                        if answer:
                            self.last_state = result
                            return answer
                        else:
                            logger.debug(f"Skipping result without answer: {result}")

            # If we get here, no chunk produced an answer
            logger.warning("No answer generated from stream")
            return "No answer generated"
        except Exception as e:
            logger.error(f"Error processing input: {str(e)}")
            raise
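

# Usage sketch (illustrative, not part of the original module's documented
# interface; assumes services.graph.agent_graph is importable and any model
# credentials it needs are configured in the environment).
if __name__ == "__main__":
    runner = AgentRunner()
    # Ask an initial question; the runner streams the graph until an answer appears.
    print(runner("What is the capital of France?"))
    # If the graph pauses on an interrupt (e.g. a human-in-the-loop node), resume
    # it by passing a Command carrying the value the paused node is waiting for:
    # print(runner(Command(resume="approved")))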