perspicacity / app.py
fdaudens's picture
fdaudens HF Staff
Update app.py
c266c49 verified
raw
history blame
3.54 kB
async def run_query(query: str):
    """Run the web agent on *query* and stream progress messages as they occur.

    This is an async generator. While the agent task is running it yields the
    status strings that are pushed onto the module-level ``stream_queue``
    (an initial "thinking" notice, one notice before and after every tool
    call, and a "planning" notice). Once the task has finished it yields
    either the final answer or an error message.

    Every tool in the module-level ``tools`` list has its ``fn`` attribute
    temporarily replaced by a wrapper that reports usage on the queue. The
    original functions are always put back, even when the run fails or the
    consumer stops iterating early.

    Args:
        query: The user question passed to ``web_agent.run``.

    Yields:
        str: Progress chunks, then the final answer or an error message.
        Exceptions are reported as yielded text rather than raised.
    """
    trace_id = f"agent-run-{uuid.uuid4().hex}"

    def _wrap_tool(orig_fn, tool_name):
        """Build an async wrapper that announces calls to *orig_fn* on the queue."""

        async def wrapper(*args, **kwargs):
            await stream_queue.put(f"🔧 Using tool: {tool_name}...\n")
            # Tools may be sync or async; only await the async ones.
            if asyncio.iscoroutinefunction(orig_fn):
                result = await orig_fn(*args, **kwargs)
            else:
                result = orig_fn(*args, **kwargs)
            await stream_queue.put(f"📊 Got result from {tool_name}\n")
            return result

        return wrapper

    def _drain_nowait():
        """Pop and return everything currently in the queue without blocking."""
        # NOTE(review): relies on stream_queue being an asyncio.Queue (it is
        # used with `await put`/`await get` in the original) -- confirm.
        pending = []
        while True:
            try:
                pending.append(stream_queue.get_nowait())
            except asyncio.QueueEmpty:
                return pending

    try:
        with instrumentor.observe(
            trace_id=trace_id,
            session_id="web-agent-session",
            user_id=ANON_USER_ID,
        ):
            # Discard anything left over from a previous run.
            _drain_nowait()
            await stream_queue.put("🤔 Thinking about your question...\n\n")

            # (tool, original_fn) pairs, recorded only after a successful
            # patch so the cleanup below restores exactly what was changed.
            patched = []
            task = None
            try:
                for tool in tools:
                    original_fn = tool.fn
                    tool.fn = _wrap_tool(original_fn, tool.metadata.name)
                    patched.append((tool, original_fn))

                await stream_queue.put("🧠 Planning approach...\n\n")
                task = asyncio.create_task(web_agent.run(query, ctx=ctx))

                # Forward queued messages while the agent is still working.
                while not task.done():
                    try:
                        chunk = stream_queue.get_nowait()
                    except asyncio.QueueEmpty:
                        # Nothing to forward yet; let the agent task progress.
                        await asyncio.sleep(0.1)
                    else:
                        yield chunk

                # Messages enqueued just before the task finished would
                # otherwise be lost, so flush them before the final answer.
                for chunk in _drain_nowait():
                    yield chunk

                try:
                    result = await task
                    response = result.response
                    final_response = (
                        response if isinstance(response, str) else str(response)
                    )
                except Exception as e:
                    yield f"\n\n❌ Error getting final result: {str(e)}"
                else:
                    yield f"\n\n✅ Final answer: {final_response}"
            finally:
                # Runs on success, on error, and when the consumer closes the
                # generator mid-stream: stop a still-running agent task and
                # put the original tool functions back.
                if task is not None and not task.done():
                    task.cancel()
                for tool, original_fn in patched:
                    tool.fn = original_fn
    except Exception as e:
        yield f"❌ Error: {str(e)}"
    finally:
        instrumentor.flush()