Spaces:
Running
Running
async def run_query(query: str): | |
trace_id = f"agent-run-{uuid.uuid4().hex}" | |
try: | |
with instrumentor.observe( | |
trace_id=trace_id, | |
session_id="web-agent-session", | |
user_id=ANON_USER_ID, | |
): | |
# Clear the queue before starting | |
while not stream_queue.empty(): | |
try: | |
stream_queue.get_nowait() | |
except: | |
pass | |
# Add initial messages to the queue | |
await stream_queue.put("🤔 Thinking about your question...\n\n") | |
# The key is to patch each individual tool function to capture its usage | |
original_functions = {} | |
# Store original functions and patch each tool | |
for tool in tools: | |
tool_name = tool.metadata.name | |
original_fn = tool.fn | |
original_functions[tool_name] = original_fn | |
# Create a wrapper function that will log the tool usage | |
def create_wrapper(orig_fn, tool_name): | |
async def wrapper(*args, **kwargs): | |
# Log tool usage | |
await stream_queue.put(f"🔧 Using tool: {tool_name}...\n") | |
# Call original function | |
if asyncio.iscoroutinefunction(orig_fn): | |
result = await orig_fn(*args, **kwargs) | |
else: | |
result = orig_fn(*args, **kwargs) | |
# Log result | |
await stream_queue.put(f"📊 Got result from {tool_name}\n") | |
return result | |
return wrapper | |
# Replace the function with our wrapped version | |
tool.fn = create_wrapper(original_fn, tool_name) | |
# Start the agent run | |
await stream_queue.put("🧠 Planning approach...\n\n") | |
task = asyncio.create_task(web_agent.run(query, ctx=ctx)) | |
# Stream updates while waiting for completion | |
while not task.done(): | |
try: | |
# Check if there's anything in the queue to yield | |
if not stream_queue.empty(): | |
chunk = await stream_queue.get() | |
yield chunk | |
else: | |
# Wait a bit and check again | |
await asyncio.sleep(0.1) | |
except Exception as e: | |
yield f"\n⚠️ Error during streaming: {str(e)}\n" | |
# Get the final result | |
try: | |
result = await task | |
final_response = result.response if isinstance(result.response, str) else str(result.response) | |
# Yield the final answer | |
yield f"\n\n✅ Final answer: {final_response}" | |
except Exception as e: | |
yield f"\n\n❌ Error getting final result: {str(e)}" | |
# Restore original functions | |
for tool in tools: | |
tool_name = tool.metadata.name | |
if tool_name in original_functions: | |
tool.fn = original_functions[tool_name] | |
except Exception as e: | |
yield f"❌ Error: {str(e)}" | |
finally: | |
instrumentor.flush() |