Commit caec940 by benediktstroebl (1 parent: cb163b3)
added failure report
Files changed: agent_monitor/failure_report.py (+234, -0)

agent_monitor/failure_report.py (ADDED)
@@ -0,0 +1,234 @@
import asyncio
import json
from abc import ABC, abstractmethod
from collections import defaultdict
from typing import Dict, List

import weave
from openai import AsyncOpenAI
from pydantic import BaseModel

# Structured-output schemas returned by the LLM at each stage of the report.
class FailureCategory(BaseModel):
    category_id: int
    category_name: str
    description: str

class FailureCategories(BaseModel):
    failure_categories: list[FailureCategory]

class TaskSummary(BaseModel):
    task_id: str
    summary: str

class TaskClassification(BaseModel):
    task_id: str
    category_id: str
    category_name: str
    explanation: str

class OverallAnalysis(BaseModel):
    failure_categories: List[Dict]
    task_classifications: Dict[str, Dict]
    summary: str

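# Illustrative sketch (not used by the pipeline): these schemas drive the
# structured-output parsing below, so a parsed category looks roughly like
# the following, with invented values:
#
#   FailureCategory(category_id=1,
#                   category_name="Gave Up Prematurely",
#                   description="The agent stops after hitting a difficulty.")
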
class AsyncLLMClient(ABC):
    """Minimal interface that keeps the analysis pipeline provider-agnostic."""
    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        pass

class AsyncOpenAIClient(AsyncLLMClient):
    def __init__(self, model="gpt-4o-mini"):
        self.model = model
        self.client = AsyncOpenAI()

    async def generate_text(self, prompt, system_message=None, response_format=None):
        messages = [
            {"role": "system", "content": system_message or "You are a helpful AI assistant."},
            {"role": "user", "content": prompt}
        ]
        if response_format:
            # Structured output: validate the response against a pydantic schema.
            response = await self.client.beta.chat.completions.parse(model=self.model, messages=messages, response_format=response_format)
        else:
            response = await self.client.chat.completions.create(model=self.model, messages=messages)
        return response.choices[0].message.content

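# A minimal stub client, offered as an illustrative sketch: because the
# pipeline only depends on the AsyncLLMClient interface, a canned-response
# stub can exercise the analysis offline. This class is an assumption for
# demonstration and is not referenced anywhere else in this module.
class StubLLMClient(AsyncLLMClient):
    def __init__(self, canned_response="{}"):
        # canned_response should be a JSON string matching the expected schema.
        self.canned_response = canned_response

    async def generate_text(self, prompt, system_message=None, response_format=None):
        # Ignores the prompt entirely and returns the canned payload.
        return self.canned_response
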
def get_weave_calls(client):
    """Flatten every logged weave call into a plain dict for downstream analysis."""
    calls = client.calls()
    processed_calls = []
    for call in calls:
        ChatCompletion = weave.ref(call.output).get()
        choices = [choice.message.content for choice in ChatCompletion.choices]
        output = {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': ChatCompletion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': choices},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            '_children': call._children,
            '_feedback': call._feedback,
        }
        processed_calls.append(output)
    return processed_calls

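# For orientation, one processed record looks roughly like this (field values
# are invented; the real shapes depend on what the agent logged to weave):
#
#   {
#       "weave_task_id": "some_task_id",
#       "trace_id": "...",
#       "created_timestamp": 1723148990,
#       "inputs": {...},
#       "outputs": {"choices": ["<model response text>"]},
#       ...
#   }
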
async def analyze_agent_performance(processed_calls, failed_tasks: list, llm_client):
    """Summarize each failed task, derive failure categories, classify tasks, and produce an overall report."""
    # Group the logged calls by task, keeping only tasks that failed.
    task_calls = defaultdict(list)
    for call in processed_calls:
        if call['weave_task_id'] in failed_tasks:
            task_calls[call['weave_task_id']].append(call)

    # Order each task's calls chronologically before summarizing.
    for task_id in task_calls:
        task_calls[task_id].sort(key=lambda x: x['created_timestamp'])

    task_summaries = await asyncio.gather(*[summarize_task(task_id, calls, llm_client) for task_id, calls in task_calls.items()])

    failure_categories = await identify_failure_categories(task_summaries, llm_client)
    task_classifications = await classify_tasks(task_summaries, failure_categories, llm_client)
    overall_summary = await generate_overall_summary(failure_categories, task_classifications, llm_client)

    # Re-key the classifications by task id for easy lookup.
    task_classifications = {tc["task_id"]: tc for tc in task_classifications}

    # Return the pydantic model itself so callers can use .model_dump().
    return OverallAnalysis(
        failure_categories=failure_categories,
        task_classifications=task_classifications,
        summary=overall_summary
    )

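# Usage sketch with hypothetical task ids (the stages run in sequence:
# summarize -> identify categories -> classify -> overall summary, each
# consuming the previous stage's structured output):
#
#   analysis = asyncio.run(
#       analyze_agent_performance(processed_calls, ["task_1", "task_2"],
#                                 AsyncOpenAIClient())
#   )
#   print(analysis.summary)
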
async def summarize_task(task_id, calls, llm_client):
    """Produce a structured TaskSummary for one failed task from its ordered calls."""
    calls_summary = ""
    for i, call in enumerate(calls, 1):
        calls_summary += f"""
Step {i}:
Input: {call['inputs']}
Output: {call['outputs']}
"""

    prompt = f"""
Summarize the AI agent's performance on the following task:
Task ID: {task_id}
Number of steps: {len(calls)}

Detailed steps:
{calls_summary}

Provide a brief summary of:
1. The main goal of the task (inferred from the inputs and outputs)
2. The agent's approach, including key steps and decisions made
3. Any significant challenges or errors encountered during the task
4. The final outcome and why the task failed. Be detailed about the reason for failure.

Keep the summary concise (around 200 words) but include specific details about the agent's performance and any notable aspects of its problem-solving process.
"""

    system_message = "You are an AI performance analyst tasked with summarizing an AI agent's performance on individual tasks. Focus on the most important aspects of the agent's approach and performance."
    summary = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(summary)

async def identify_failure_categories(task_summaries, llm_client):
    summaries_text = "\n\n".join([f"Task {s['task_id']}:\n{s['summary']}" for s in task_summaries])
    prompt = f"""
Analyze the following summaries of an AI agent's performance across multiple tasks:

{summaries_text}

Identify recurring categories of failures that the agent faces across these tasks. For each category:
1. Provide a short, descriptive name (max 5 words)
2. Write a brief description explaining the nature of this failure or challenge category

Focus on patterns that appear across multiple tasks and represent specific errors that impacted the agent's performance. Make sure that your categories are distinct and cover a range of recurring issues. The categories should not be too general.

Example categories could include:
Incorrect Implementation - The agent made a change to a reasonable area but its solution didn't correctly address the issue.
Gave Up Prematurely - The agent decides to stop solving the task after encountering some difficulty.
Failed Edit Recovery - The agent went into a loop, making recurrent failing edits without recovering.
"""

    system_message = "You are an expert in AI agent analysis, tasked with identifying recurring patterns in agent performance across multiple tasks."
    categories = await llm_client.generate_text(prompt, system_message, response_format=FailureCategories)
    return [dict(category) for category in json.loads(categories)['failure_categories']]

async def classify_tasks(task_summaries, failure_categories, llm_client):
    categories_text = "\n".join([f"{cat['category_id']}. {cat['category_name']}: {cat['description']}" for cat in failure_categories])
    classifications = []

    for task in task_summaries:
        prompt = f"""
Failure Categories:
{categories_text}

Task Summary:
{task['summary']}

Classify this task into one of the failure categories listed above. Provide:
1. The number of the chosen category
2. A brief explanation of why this category best fits the task's outcome

If the task doesn't clearly fit any category, you may classify it as "0. Other" and explain why.
"""

        system_message = "You are an AI performance analyst tasked with classifying task outcomes into predefined categories."
        classification = await llm_client.generate_text(prompt, system_message, response_format=TaskClassification)
        classification = json.loads(classification)

        # Resolve the category name, falling back to "Other" for id 0 or unknown ids.
        category_number = classification['category_id']
        if str(category_number) == "0":
            category_name = "Other"
        else:
            for cat in failure_categories:
                if str(cat['category_id']) == str(category_number):
                    category_name = cat['category_name']
                    break
            else:
                # for-else: no category matched, so treat it as "Other".
                category_name = "Other"

        explanation = classification['explanation']

        classifications.append(dict(TaskClassification(
            task_id=task['task_id'],
            category_id=category_number,
            category_name=category_name,
            explanation=explanation
        )))

    return classifications

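# One resulting classification entry, with invented values for illustration:
#
#   {"task_id": "task_1", "category_id": "2",
#    "category_name": "Gave Up Prematurely",
#    "explanation": "The agent stopped after the first failed attempt."}
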
async def generate_overall_summary(failure_categories, task_classifications, llm_client):
    categories_text = "\n".join([f"{cat['category_name']}: {cat['description']}" for cat in failure_categories])

    classifications_text = "\n".join([f"Task {tc['task_id']}: {tc['category_name']}" for tc in task_classifications])

    prompt = f"""
Failure Categories:
{categories_text}

Task Classifications:
{classifications_text}

Based on the failure categories identified and the classification of tasks, provide an overall summary of the AI agent's performance across all tasks. Include:
1. The most common types of failures or challenges
2. Any patterns in the agent's performance across different tasks
3. Suggestions for areas of improvement in the agent's design or training

Keep the summary concise but insightful, focusing on the most significant findings and their implications for AI agent development. Return only the summary itself, without any preceding context.
"""

    system_message = "You are a senior AI researcher tasked with providing a high-level analysis of an AI agent's performance across multiple tasks."
    return await llm_client.generate_text(prompt, system_message)

async def main():
    client = weave.init("citp_agent_eval/usaco_1723148990")
    processed_calls = get_weave_calls(client)

    weave.finish()
    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
    # NOTE: analyze_agent_performance also needs the ids of the failed tasks;
    # populate this list from your evaluation results before running.
    failed_tasks = []
    overall_analysis = await analyze_agent_performance(processed_calls, failed_tasks, openai_client)

    with open("agent_performance_analysis.json", "w") as f:
        json.dump(overall_analysis.model_dump(), f, indent=4)

if __name__ == "__main__":
    asyncio.run(main())