import asyncio
from openai import AsyncOpenAI
from collections import defaultdict
import weave
from pydantic import BaseModel
from abc import ABC, abstractmethod
import json

class StepAnalysis(BaseModel):
    description: str
    action_type: str
    assessment: str
    success: bool
    headline: str

class TaskSummary(BaseModel):
    overview: str
    key_successes: str
    main_challenges: str
    overall_assessment: str
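
# Illustrative note (not in the original file): these Pydantic models double as
# structured-output schemas for the OpenAI parse call below, and instances
# flatten to plain dicts for JSON export, e.g.:
#
#   StepAnalysis(
#       description="Parsed the input grid",
#       action_type="tool",
#       assessment="Reasonable first step",
#       success=True,
#       headline="Grid parsing",
#   ).model_dump()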


def get_weave_calls(client):
    """Fetch every call logged to the weave client and flatten each into a plain dict."""
    calls = client.calls()

    processed_calls = []
    for call in calls:
        # Resolve the logged output reference back into a ChatCompletion object.
        chat_completion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in chat_completion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': chat_completion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices' : choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            "_children": call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls

class AsyncLLMClient(ABC):
    """Minimal async interface so the analysis pipeline can swap LLM backends."""

    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass

class AsyncOpenAIClient(AsyncLLMClient):
    def __init__(self, model="gpt-4o-mini"):
        self.model = model
        self.client = AsyncOpenAI()

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt}
        ]
        # Structured-output parse call; .content holds the raw JSON string.
        response = await self.client.beta.chat.completions.parse(
            model=self.model,
            messages=messages,
            response_format=response_format,
        )
        return response.choices[0].message.content
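
# A minimal stub backend (hypothetical, not in the original file): it satisfies
# the AsyncLLMClient interface without network calls, which is handy for
# exercising the pipeline offline. It returns JSON strings shaped like the
# requested response_format, matching what the parse call's .content yields.
class StubLLMClient(AsyncLLMClient):
    async def generate_text(self, prompt, system_message=None, response_format=None):
        if response_format is StepAnalysis:
            return StepAnalysis(
                description="stubbed step",
                action_type="other",
                assessment="stubbed",
                success=False,
                headline="stubbed"
            ).model_dump_json()
        return TaskSummary(
            overview="stubbed",
            key_successes="stubbed",
            main_challenges="stubbed",
            overall_assessment="stubbed"
        ).model_dump_json()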

async def analyze_agent_steps(processed_calls, llm_client, llm_eval=False):
    """Group calls by task id, order them chronologically, then analyze every task concurrently."""
    task_calls = defaultdict(list)
    for call in processed_calls:
        task_calls[call['weave_task_id']].append(call)

    # Steps within a task are analyzed in creation order.
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])
    
    tasks = [analyze_task(calls, llm_client, llm_eval) for calls in task_calls.values()]
    task_analyses = await asyncio.gather(*tasks)
    
    return dict(zip(task_calls.keys(), task_analyses))
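
# Usage sketch (project name and entity are placeholders, not from the
# original file):
#
#   client = weave.init("entity/project")
#   calls = get_weave_calls(client)
#   analyses = asyncio.run(analyze_agent_steps(calls, AsyncOpenAIClient(), llm_eval=True))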

async def analyze_task(calls, llm_client, llm_eval=False):
    """Analyze each step of a single task, then produce a task-level summary."""
    if llm_eval:
        step_tasks = [analyze_step(call, i + 1, len(calls), llm_client) for i, call in enumerate(calls)]
        steps = await asyncio.gather(*step_tasks)
    else:
        # Without LLM evaluation, attach placeholder analyses to every call.
        steps = [{
            'call_data': call,
            'analysis': StepAnalysis(
                description="Not available",
                action_type='other',
                success=False,
                assessment="Not available",
                headline="Not available"
            ).model_dump()
        } for call in calls]
    
    try:
        if llm_eval:
            task_analysis = await summarize_task(steps, llm_client)
            return {
                'steps': steps,
                'task_analysis': task_analysis
            }
        else:
            return {
                'steps': steps,
                'task_analysis': TaskSummary(
                    overview="Not available",
                    key_successes="Not available",
                    main_challenges="Not available",
                    overall_assessment="Not available"
                ).model_dump()
            }
    except Exception as e:
        print(f"Error in task summarization: {str(e)}")
        # Keep the return shape consistent with the success path.
        return {
            'steps': steps,
            'task_analysis': TaskSummary(
                overview="Not available",
                key_successes="Not available",
                main_challenges="Not available",
                overall_assessment="Not available"
            ).model_dump()
        }

async def analyze_step(call, step_number, total_steps, llm_client):
    """Ask the LLM for a structured StepAnalysis of a single agent step."""
    prompt = f"""
    Analyze Step {step_number}/{total_steps} of the AI agent's USACO task solution:
    Input: {call['inputs']}
    Output: {call['outputs']}
    Exception: {call['exception']}
    Summary: {call['summary']}

    Provide a detailed, technical analysis with the following:
    1. Specific Description: Describe precisely what the agent did in this step, including any algorithms, data structures, or problem-solving techniques employed.
    2. Action Classification: Categorize the action as one of:
       - 'plan': Strategizing or outlining an approach
       - 'tool': Using a specific programming construct or algorithm
       - 'retrieve': Accessing or utilizing external information
       - 'other': Any action that doesn't fit the above categories
    3. Technical Evaluation: Assess the technical merit of the agent's approach. Comment on efficiency, correctness, and adherence to USACO problem-solving best practices.
    4. Success: Determine if the agent successfully completed its intended action.
    5. Concise Headline: Write a technically precise headline (max 7 words) that captures the essence of this step.

    Your analysis should be highly specific to this task. Avoid generalities and focus on the technical details of the agent's approach to this particular problem.
    """

    system_message = "You are an expert in AI agent design and evaluation. Analyze the AI agent's actions with the depth and specificity expected in a detailed expert review. Focus on providing insights that would be valuable to an AI researcher specializing in AI agent development."

    analysis = await llm_client.generate_text(prompt, system_message, response_format=StepAnalysis)

    try:
        analysis = json.loads(analysis)
    except json.JSONDecodeError:
        print(f"Error parsing analysis for step {step_number} of {total_steps} in task {call['weave_task_id']}. Using default values.")
        # Fall back to a placeholder analysis with all required fields set.
        analysis = StepAnalysis(
            description="Analysis failed",
            action_type='other',
            success=False,
            assessment="Unable to assess due to error",
            headline="Analysis failed"
        ).model_dump()
    
    return {
        'call_data': call,
        'analysis': analysis
    }

async def summarize_task(steps, llm_client):
    """Condense the per-step analyses into a single TaskSummary via the LLM."""
    steps_summary = "\n".join([f"Step {i+1}: {step['analysis']}" for i, step in enumerate(steps)])
    
    prompt = f"""
    Provide a comprehensive analysis of the AI agent's approach to solving this USACO task:

    {steps_summary}

    Your analysis should include:
    1. Technical Overview: Describe the agent's overall problem-solving strategy, highlighting specific actions and techniques used throughout the task.
    2. Key Achievements: Identify and explain the most significant breakthroughs or efficient implementations demonstrated by the agent.
    3. Technical Challenges: Analyze the primary obstacles encountered, focusing on difficulties or conceptual misunderstandings in the context of the task.
    4. Performance Evaluation: Assess the agent's overall performance, considering factors such as time complexity, space efficiency, code quality, and adherence to competitive programming best practices.

    Your summary should be highly technical and specific to this task. Assume the reader is an expert as well and familiar with the task context. Focus on providing insights that would be valuable to an AI researcher specializing in AI agent development.
    """

    system_message = "You are an expert AI performance analyst, skilled in evaluating and summarizing AI agent task execution. You are specialized in providing analyses to support AI researchers to develop AI agents."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(analysis)

# async def main():
#     client = weave.init("citp_agent_eval/usaco_1723148990")
#     processed_calls = get_weave_calls(client)

#     weave.finish()
#     openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
#     task_analyses_openai = await analyze_agent_steps(processed_calls, openai_client)

#     with open("task_analyses.json", "w") as f:
#         json.dump(task_analyses_openai, f, indent=4)

# if __name__ == "__main__":
#     asyncio.run(main())
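
# Smoke test without OpenAI or weave (uses the hypothetical StubLLMClient
# sketched above; the fake call record carries only the fields analyze_step
# reads):
#
#   fake_calls = [{'weave_task_id': 't1', 'created_timestamp': 0, 'inputs': {},
#                  'outputs': {}, 'exception': None, 'summary': {}}]
#   print(asyncio.run(analyze_agent_steps(fake_calls, StubLLMClient(), llm_eval=True)))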