File size: 3,543 Bytes
d8debf8
 
 
 
 
 
 
 
f487d08
 
 
 
 
 
 
c266c49
f487d08
 
c266c49
 
f487d08
c266c49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
fd36d8c
c266c49
 
 
fd36d8c
c266c49
 
f487d08
c266c49
 
 
 
 
 
 
 
 
f487d08
 
c266c49
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d8debf8
c266c49
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
async def run_query(query: str):
    trace_id = f"agent-run-{uuid.uuid4().hex}"
    try:
        with instrumentor.observe(
            trace_id=trace_id,
            session_id="web-agent-session",
            user_id=ANON_USER_ID,
        ):
            # Clear the queue before starting
            while not stream_queue.empty():
                try:
                    stream_queue.get_nowait()
                except:
                    pass
            
            # Add initial messages to the queue
            await stream_queue.put("🤔 Thinking about your question...\n\n")
            
            # The key is to patch each individual tool function to capture its usage
            original_functions = {}
            
            # Store original functions and patch each tool
            for tool in tools:
                tool_name = tool.metadata.name
                original_fn = tool.fn
                original_functions[tool_name] = original_fn
                
                # Create a wrapper function that will log the tool usage
                def create_wrapper(orig_fn, tool_name):
                    async def wrapper(*args, **kwargs):
                        # Log tool usage
                        await stream_queue.put(f"🔧 Using tool: {tool_name}...\n")
                        
                        # Call original function
                        if asyncio.iscoroutinefunction(orig_fn):
                            result = await orig_fn(*args, **kwargs)
                        else:
                            result = orig_fn(*args, **kwargs)
                        
                        # Log result
                        await stream_queue.put(f"📊 Got result from {tool_name}\n")
                        return result
                    
                    return wrapper
                
                # Replace the function with our wrapped version
                tool.fn = create_wrapper(original_fn, tool_name)
            
            # Start the agent run
            await stream_queue.put("🧠 Planning approach...\n\n")
            task = asyncio.create_task(web_agent.run(query, ctx=ctx))
            
            # Stream updates while waiting for completion
            while not task.done():
                try:
                    # Check if there's anything in the queue to yield
                    if not stream_queue.empty():
                        chunk = await stream_queue.get()
                        yield chunk
                    else:
                        # Wait a bit and check again
                        await asyncio.sleep(0.1)
                except Exception as e:
                    yield f"\n⚠️ Error during streaming: {str(e)}\n"
            
            # Get the final result
            try:
                result = await task
                final_response = result.response if isinstance(result.response, str) else str(result.response)
                
                # Yield the final answer
                yield f"\n\n✅ Final answer: {final_response}"
            except Exception as e:
                yield f"\n\n❌ Error getting final result: {str(e)}"
                
            # Restore original functions
            for tool in tools:
                tool_name = tool.metadata.name
                if tool_name in original_functions:
                    tool.fn = original_functions[tool_name]
    except Exception as e:
        yield f"❌ Error: {str(e)}"
    finally:
        instrumentor.flush()