Spaces:
Build error
Build error
import logging | |
import os | |
import uuid | |
from langgraph.types import Command | |
from services.graph import agent_graph | |
# Logging setup: the process-wide default level is INFO.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# LiteLLM's verbose output and this module's DEBUG level are opt-in: both are
# switched on only when the LITELLM_DEBUG environment variable equals "true"
# (compared case-insensitively); otherwise verbose is off and the level is INFO.
import litellm

litellm.set_verbose = os.getenv("LITELLM_DEBUG", "false").lower() == "true"
logger.setLevel(logging.DEBUG if litellm.set_verbose else logging.INFO)
class AgentRunner:
    """Runner class for the code agent.

    An instance is callable: it streams an input through the agent graph and
    returns the first answer that can be extracted from a streamed chunk.
    Each runner uses its own ``thread_id``, which is passed to the graph in
    the ``configurable`` section of the config on every call.
    """

    def __init__(self, graph=None):
        """Initialize the agent runner.

        Args:
            graph: Optional object exposing ``stream(input, config)``. When
                omitted, the module-level ``agent_graph`` is used.
        """
        logger.info("Initializing AgentRunner")
        self.graph = agent_graph if graph is None else graph
        # Chunk that produced the most recent answer (kept for testing/debugging).
        self.last_state = None
        # Unique thread_id for this runner.
        self.thread_id = str(uuid.uuid4())
        logger.info(f"Created AgentRunner with thread_id: {self.thread_id}")

    def _extract_answer(self, state: dict) -> "str | None":
        """Extract the answer from a state/chunk dict.

        A truthy ``"answer"`` entry wins. Otherwise ``"messages"`` is scanned
        from newest to oldest and the first truthy ``content`` attribute is
        returned.

        Args:
            state: Mapping that may contain ``"answer"`` and/or ``"messages"``.

        Returns:
            The answer, or ``None`` when ``state`` is empty or holds none.
        """
        if not state:
            return None

        # NOTE(review): this expects "answer"/"messages" at the top level of
        # each streamed chunk; confirm that matches the stream mode used by
        # agent_graph (LangGraph's "updates" mode keys chunks by node name).
        direct_answer = state.get("answer")
        if direct_answer:
            logger.info(f"Found answer in direct field: {direct_answer}")
            return direct_answer

        for msg in reversed(state.get("messages") or []):
            content = getattr(msg, "content", None)
            if content:
                logger.info(f"Found answer in message: {content}")
                return content

        return None

    @staticmethod
    def _build_initial_state(question: str) -> dict:
        """Return the fresh graph state used to start processing a question."""
        return {
            "question": question,
            "messages": [],
            "answer": None,
            "step_logs": [],
            "is_complete": False,
            "step_count": 0,
            # Memory fields
            "context": {},
            "memory_buffer": [],
            "last_action": None,
            "action_history": [],
            "error_count": 0,
            "success_count": 0,
        }

    def _stream_for_answer(self, graph_input, config: dict) -> "str | None":
        """Stream the graph and return the first extractable answer, else None.

        Non-dict chunks are ignored. Chunks containing ``"__interrupt__"`` are
        logged and skipped. The chunk that yields the answer is stored in
        ``self.last_state``.
        """
        for chunk in self.graph.stream(graph_input, config):
            logger.debug(f"Received chunk: {chunk}")
            if not isinstance(chunk, dict):
                continue
            if "__interrupt__" in chunk:
                logger.info("Detected interrupt in stream")
                logger.info(f"Interrupt details: {chunk['__interrupt__']}")
                # Let the graph handle the interrupt naturally.
                continue
            answer = self._extract_answer(chunk)
            if answer:
                self.last_state = chunk
                return answer
            logger.debug(f"Skipping chunk without answer: {chunk}")
        return None

    def __call__(self, input_data) -> str:
        """Process a question through the agent graph and return the answer.

        Args:
            input_data: Either a question string or a Command object for
                resuming. A string starts from a fresh initial state; any
                other value is passed to the graph's ``stream`` unchanged.

        Returns:
            str: The agent's response, or ``"No answer generated"`` when the
            stream ends without an extractable answer.

        Raises:
            Exception: Any error raised while streaming is logged with its
                traceback and re-raised unchanged.
        """
        try:
            config = {"configurable": {"thread_id": self.thread_id}}
            logger.info(f"Using config: {config}")

            if isinstance(input_data, str):
                # Initial question
                logger.info(f"Processing initial question: {input_data}")
                graph_input = self._build_initial_state(input_data)
                logger.info(f"Initial state: {graph_input}")
                logger.info("Starting graph stream for initial question")
            else:
                # Resuming from interrupt
                logger.info(f"Resuming from interrupt with input: {input_data}")
                graph_input = input_data

            answer = self._stream_for_answer(graph_input, config)
            if answer is not None:
                return answer

            # If we get here, we didn't find an answer
            logger.warning("No answer generated from stream")
            return "No answer generated"
        except Exception as e:
            # logger.exception keeps the traceback that logger.error dropped.
            logger.exception("Error processing input: %s", e)
            raise