benediktstroebl commited on
Commit
cd69490
·
1 Parent(s): 3e874db

format update and added monitor llm client backend

Browse files
Files changed (4) hide show
  1. agent_monitor/monitor.py +170 -0
  2. utils/data.py +12 -1
  3. utils/processing.py +25 -10
  4. utils/viz.py +34 -6
agent_monitor/monitor.py ADDED
@@ -0,0 +1,170 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import asyncio
2
+ from openai import AsyncOpenAI
3
+ from collections import defaultdict
4
+ import weave
5
+ from pydantic import BaseModel
6
+ from abc import ABC, abstractmethod
7
+ import json
8
+
9
class StepAnalysis(BaseModel):
    """Structured verdict the monitor LLM returns for one agent step.

    Fields (filled via structured output):
      description: brief account of what the agent did in this step.
      action_type: one of 'plan', 'tool', 'retrieve', or 'other'.
      assessment: short evaluation of progress, obstacles, or errors.
      success: whether the step achieved its intended action.
      headline: concise (ideally < 7 words) summary of the step.
    """
    description: str
    action_type: str
    assessment: str
    success: bool
    headline: str
15
+
16
class TaskSummary(BaseModel):
    """Structured whole-task verdict the monitor LLM returns.

    Fields (filled via structured output):
      overview: concise description of the agent's overall approach.
      key_successes: main achievements or breakthroughs.
      main_challenges: primary obstacles or errors encountered.
      overall_assessment: brief evaluation of overall performance.
    """
    overview: str
    key_successes: str
    main_challenges: str
    overall_assessment: str
21
+
22
+
23
def get_weave_calls(client):
    """Fetch all logged LLM calls from a weave client and flatten each into
    a plain dict, preserving the order the API returns them in.

    Each entry keeps the task id, trace/project ids, the completion's
    creation timestamp, the raw inputs, and the choices' message texts.
    """

    def _flatten(call):
        # Resolve the stored output ref into the actual ChatCompletion object.
        completion = weave.ref(call.output).get()
        message_texts = [choice.message.content for choice in completion.choices]
        return {
            'weave_task_id': call.attributes['weave_task_id'],
            'trace_id': call.trace_id,
            'project_id': call.project_id,
            'created_timestamp': completion.created,
            'inputs': dict(call.inputs),
            'id': call.id,
            'outputs': {'choices': message_texts},
            'exception': call.exception,
            'summary': call.summary,
            'display_name': call.display_name,
            'attributes': dict(call.attributes),
            "_children": call._children,
            '_feedback': call._feedback,
        }

    return [_flatten(call) for call in client.calls()]
47
+
48
class AsyncLLMClient(ABC):
    """Minimal async interface for the LLM backends used by the monitor."""

    @abstractmethod
    async def generate_text(self, prompt, system_message=None, response_format=None):
        """Return the model's text completion for *prompt*.

        system_message overrides the default system-role content;
        response_format optionally requests structured output.
        """
52
+
53
class AsyncOpenAIClient(AsyncLLMClient):
    """AsyncLLMClient backed by the OpenAI chat completions API."""

    def __init__(self, model="gpt-4o-mini"):
        self.model = model
        self.client = AsyncOpenAI()

    async def generate_text(self, prompt, system_message=None, response_format=None):
        """Send one chat turn and return the first choice's message content."""
        # Fall back to a generic system prompt when none is supplied.
        system_content = system_message or "You are a helpful AI assistant."
        chat_messages = [
            {"role": "system", "content": system_content},
            {"role": "user", "content": prompt},
        ]
        # beta.chat.completions.parse enables structured output when
        # response_format is a pydantic model class.
        completion = await self.client.beta.chat.completions.parse(
            model=self.model,
            messages=chat_messages,
            response_format=response_format,
        )
        return completion.choices[0].message.content
66
+
67
async def analyze_agent_steps(processed_calls, llm_client):
    """Group processed calls by weave_task_id, order each group
    chronologically, and analyze all tasks concurrently.

    Returns a dict mapping task_id -> result of analyze_task for that task.
    """
    grouped = defaultdict(list)
    for record in processed_calls:
        grouped[record['weave_task_id']].append(record)

    # Steps must be analyzed in the order they actually happened.
    for records in grouped.values():
        records.sort(key=lambda r: r['created_timestamp'])

    analyses = await asyncio.gather(
        *(analyze_task(records, llm_client) for records in grouped.values())
    )
    # dict preserves insertion order, and gather preserves argument order,
    # so keys and analyses line up one-to-one.
    return dict(zip(grouped.keys(), analyses))
79
+
80
async def analyze_task(calls, llm_client):
    """Analyze every step of one task concurrently, then summarize the task.

    Parameters:
        calls: chronologically ordered processed-call dicts for one task.
        llm_client: AsyncLLMClient used for both step and task analysis.

    Returns:
        {'steps': [per-step results], 'task_analysis': summary dict} —
        the same shape whether or not summarization succeeds.
    """
    step_tasks = [analyze_step(call, i + 1, len(calls), llm_client) for i, call in enumerate(calls)]
    steps = await asyncio.gather(*step_tasks)

    try:
        task_analysis = await summarize_task(steps, llm_client)
    except Exception as e:
        print(f"Error in task summarization: {str(e)}")
        # BUG FIX: the previous fallback returned a bare TaskSummary — a
        # different shape than the success path — and passed lists to the
        # str-typed key_successes/main_challenges fields, which raises a
        # pydantic ValidationError inside the handler. Use a plain dict with
        # string fields so callers always get the same, JSON-serializable shape.
        task_analysis = {
            'overview': "Summary generation failed",
            'key_successes': "",
            'main_challenges': "",
            'overall_assessment': "Unable to assess due to error",
        }

    return {
        'steps': steps,
        'task_analysis': task_analysis
    }
99
+
100
async def analyze_step(call, step_number, total_steps, llm_client):
    """Ask the monitor LLM to analyze a single agent step.

    Parameters:
        call: processed weave call dict (inputs/outputs/exception/summary).
        step_number, total_steps: 1-based position of this step in the task.
        llm_client: AsyncLLMClient used to generate the analysis.

    Returns:
        {'call_data': call, 'analysis': <dict of StepAnalysis fields>};
        on a malformed LLM response, 'analysis' holds default fallback values.
    """
    prompt = f"""
    Analyze Step {step_number}/{total_steps} of AI agent task:
    Input: {call['inputs']}
    Output: {call['outputs']}
    Exception: {call['exception']}
    Summary: {call['summary']}

    Provide an analysis with:
    1. A brief description of the agent's action.
    2. Classify the action as one of: 'plan', 'tool', 'retrieve', or 'other'.
    3. Give a brief evaluation of progress, obstacles, or errors.
    4. Indicate if the agent successfully completed its intended action.
    5. Write a concise headline summarizing the agent's action that is ideally less than 7 words long.

    Ensure accuracy and conciseness. Be specific and avoid too high-level descriptions.
    """

    system_message = "You are an expert AI system analyst, skilled in categorizing and evaluating AI agent actions."
    analysis = await llm_client.generate_text(prompt, system_message, response_format=StepAnalysis)

    try:
        analysis = json.loads(analysis)
    except json.JSONDecodeError as e:
        # BUG FIX: the old handler referenced an undefined name `e`, then
        # overwrote `analysis` with the None returned by print(), and finally
        # built a StepAnalysis with a nonexistent `category` field and no
        # required `headline`. Use a plain dict so the fallback matches the
        # success path (json.loads output) and stays JSON-serializable.
        print(f"Error parsing analysis for step {step_number} of {total_steps} in task {call['weave_task_id']}: {str(e)}. Using default values.")
        analysis = {
            'description': "Analysis failed",
            'action_type': 'other',
            'assessment': "Unable to assess due to error",
            'success': False,
            'headline': "Analysis failed",
        }

    return {
        'call_data': call,
        'analysis': analysis
    }
137
async def summarize_task(steps, llm_client):
    """Generate a whole-task summary from the per-step analyses.

    Returns the TaskSummary fields parsed from the LLM's JSON response;
    propagates json.JSONDecodeError if the response is not valid JSON.
    """
    numbered = (f"Step {idx}: {step['analysis']}" for idx, step in enumerate(steps, start=1))
    steps_summary = "\n".join(numbered)

    prompt = f"""
    Analyze the following AI agent task steps:

    {steps_summary}

    Provide a summary with:
    1. A concise overview of the agent's approach.
    2. Main achievements or breakthroughs.
    3. Primary obstacles or errors encountered.
    4. A brief evaluation of the agent's overall performance.

    Focus on patterns in the agent's approach and effectiveness. Be concise and insightful.
    """

    system_message = "You are an expert AI performance analyst, skilled in evaluating and summarizing AI agent task execution."
    raw_summary = await llm_client.generate_text(prompt, system_message, response_format=TaskSummary)
    return json.loads(raw_summary)
157
+
158
+ # async def main():
159
+ # client = weave.init("citp_agent_eval/usaco_1723148990")
160
+ # processed_calls = get_weave_calls(client)
161
+
162
+ # weave.finish()
163
+ # openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
164
+ # task_analyses_openai = await analyze_agent_steps(processed_calls, openai_client)
165
+
166
+ # with open("task_analyses.json", "w") as f:
167
+ # json.dump(task_analyses_openai, f, indent=4)
168
+
169
+ # if __name__ == "__main__":
170
+ # asyncio.run(main())
utils/data.py CHANGED
@@ -47,7 +47,18 @@ def parse_json_files(folder_path, benchmark_name):
47
 
48
  # round all float columns to 2 decimal places
49
  for column in df.select_dtypes(include='float').columns:
50
- df[column] = df[column].round(2)
 
 
 
 
 
 
 
 
 
 
 
51
 
52
  return df
53
 
 
47
 
48
  # round all float columns to 2 decimal places
49
  for column in df.select_dtypes(include='float').columns:
50
+ df[column] = df[column].round(3)
51
+
52
+ # Rename columns
53
+ df = df.rename(columns={
54
+ 'agent_name': 'Agent Name',
55
+ 'results_total_cost': 'Total Cost',
56
+ 'results_accuracy': 'Accuracy',
57
+ 'results_precision': 'Precision',
58
+ 'results_recall': 'Recall',
59
+ 'results_f1_score': 'F1 Score',
60
+ 'results_auc': 'AUC',
61
+ })
62
 
63
  return df
64
 
utils/processing.py CHANGED
@@ -7,6 +7,7 @@ from email.mime.multipart import MIMEMultipart
7
  import asyncio
8
  import aiofiles
9
  import aiosmtplib
 
10
 
11
  async def check_and_process_uploads():
12
  upload_dir = "evals_upload"
@@ -36,8 +37,9 @@ async def check_and_process_uploads():
36
  with open(processed_path, 'r') as f:
37
  processed_data = json.load(f)
38
 
39
- if new_data != processed_data:
40
- unprocessed_uploads.append(upload)
 
41
  print(f"Upload {upload} is already in processed directory.")
42
  elif os.path.exists(live_path):
43
  with open(upload_path, 'r') as f:
@@ -46,8 +48,8 @@ async def check_and_process_uploads():
46
  with open(live_path, 'r') as f:
47
  live_data = json.load(f)
48
 
49
- if new_data != live_data:
50
- unprocessed_uploads.append(upload)
51
  print(f"Upload {upload} is already in live directory.")
52
  else:
53
  unprocessed_uploads.append(upload)
@@ -71,12 +73,12 @@ async def process_single_upload(upload_path, processed_path):
71
  await process_upload(upload_path, processed_path)
72
 
73
  # Move the file to processed directory
74
- await asyncio.to_thread(shutil.move, upload_path, processed_path)
75
 
76
  # Send email notification
77
  # await send_email_notification(upload_path.name, check_result, "Processing successful")
78
  else:
79
- pass
80
  # Send email notification about the failed check
81
  # await send_email_notification(upload_path.name, check_result, "Upload check failed")
82
 
@@ -110,10 +112,23 @@ async def check_upload_structure(file_path):
110
 
111
 
112
  async def process_upload(input_path, output_path):
113
- # Placeholder for processing logic
114
- # For now, we'll just copy the file
115
- await asyncio.to_thread(shutil.copy, input_path, output_path)
116
- print(f"Processed {input_path} to {output_path}")
 
 
 
 
 
 
 
 
 
 
 
 
 
117
 
118
  async def send_email_notification(filename, check_result, status):
119
  sender_email = "[email protected]"
 
7
  import asyncio
8
  import aiofiles
9
  import aiosmtplib
10
+ from agent_monitor.monitor import analyze_agent_steps, AsyncOpenAIClient
11
 
12
  async def check_and_process_uploads():
13
  upload_dir = "evals_upload"
 
37
  with open(processed_path, 'r') as f:
38
  processed_data = json.load(f)
39
 
40
+ # TODO we can use a better comparison method with exact comparison
41
+ # if new_data != processed_data:
42
+ # unprocessed_uploads.append(upload)
43
  print(f"Upload {upload} is already in processed directory.")
44
  elif os.path.exists(live_path):
45
  with open(upload_path, 'r') as f:
 
48
  with open(live_path, 'r') as f:
49
  live_data = json.load(f)
50
 
51
+ # if new_data != live_data:
52
+ # unprocessed_uploads.append(upload)
53
  print(f"Upload {upload} is already in live directory.")
54
  else:
55
  unprocessed_uploads.append(upload)
 
73
  await process_upload(upload_path, processed_path)
74
 
75
  # Move the file to processed directory
76
+ # await asyncio.to_thread(shutil.move, upload_path, processed_path)
77
 
78
  # Send email notification
79
  # await send_email_notification(upload_path.name, check_result, "Processing successful")
80
  else:
81
+ print(f"Upload check failed for {upload_path}: {check_result['message']}")
82
  # Send email notification about the failed check
83
  # await send_email_notification(upload_path.name, check_result, "Upload check failed")
84
 
 
112
 
113
 
114
  async def process_upload(input_path, output_path):
115
+ print(f"Processing {input_path}...")
116
+ # load the file
117
+ with open(input_path, 'r') as f:
118
+ data = json.loads(f.read())
119
+
120
+ assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"
121
+ openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
122
+
123
+ processed_calls = await analyze_agent_steps(data['raw_logging_results'][:2], openai_client)
124
+
125
+ # Save the processed data
126
+ data['raw_logging_results'] = processed_calls
127
+
128
+ with open(output_path, 'w') as f:
129
+ json.dump(data, f, indent=4)
130
+
131
+ print(f"Processing of {input_path} successful. Results saved to {output_path}")
132
 
133
  async def send_email_notification(filename, check_result, status):
134
  sender_email = "[email protected]"
utils/viz.py CHANGED
@@ -5,15 +5,23 @@ import plotly.graph_objects as go
5
  import textwrap
6
 
7
  def create_scatter_plot(df, x: str, y: str, x_label: str = None, y_label: str = None, hover_data: list = None):
8
- agents = [Agent(row.results_total_cost, row.results_accuracy) for row in df.itertuples()]
9
  pareto_frontier = compute_pareto_frontier(agents)
10
 
11
-
12
  fig = px.scatter(df,
13
  x=x,
14
  y=y,
15
- hover_data=hover_data,
16
- )
 
 
 
 
 
 
 
 
 
17
 
18
 
19
  # Sort the Pareto frontier points by x-coordinate
@@ -32,7 +40,7 @@ def create_scatter_plot(df, x: str, y: str, x_label: str = None, y_label: str =
32
  fig.update_xaxes(rangemode="tozero")
33
 
34
  fig.update_layout(
35
- width = 600,
36
  height = 500,
37
  xaxis_title = x_label,
38
  yaxis_title = y_label,
@@ -52,7 +60,27 @@ def create_scatter_plot(df, x: str, y: str, x_label: str = None, y_label: str =
52
  xanchor="right",
53
  x=0.98,
54
  bgcolor="rgba(255, 255, 255, 0.5)" # semi-transparent white background
55
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
56
  )
57
  return fig
58
 
 
5
  import textwrap
6
 
7
  def create_scatter_plot(df, x: str, y: str, x_label: str = None, y_label: str = None, hover_data: list = None):
8
+ agents = [Agent(row['Total Cost'], row['Accuracy']) for i, row in df.iterrows()]
9
  pareto_frontier = compute_pareto_frontier(agents)
10
 
 
11
  fig = px.scatter(df,
12
  x=x,
13
  y=y,
14
+ custom_data=hover_data)
15
+ fig.update_traces(
16
+ hovertemplate="<br>".join([
17
+ "<b>Agent</b>: %{customdata[0]}",
18
+ "<b>Total Cost</b>: $%{x:.1f}",
19
+ "<b>Accuracy</b>: %{y:.1%}",
20
+ ])
21
+ )
22
+
23
+ fig.update_traces(marker=dict(size=10, color='#1b9e77'),
24
+ hoverlabel=dict(bgcolor="white", font_size=12, font_family="Arial"),)
25
 
26
 
27
  # Sort the Pareto frontier points by x-coordinate
 
40
  fig.update_xaxes(rangemode="tozero")
41
 
42
  fig.update_layout(
43
+ width = 1400,
44
  height = 500,
45
  xaxis_title = x_label,
46
  yaxis_title = y_label,
 
60
  xanchor="right",
61
  x=0.98,
62
  bgcolor="rgba(255, 255, 255, 0.5)" # semi-transparent white background
63
+ ),
64
+ modebar=dict(
65
+ activecolor='#1f77b4', # Color of active tool
66
+ orientation='h', # Vertical orientation
67
+ bgcolor='rgba(255,255,255,0.8)', # Slightly transparent white background
68
+ color='#777', # Color of inactive tools
69
+ add = ['pan2d'],
70
+ remove = [
71
+ 'zoom2d',
72
+ 'zoomIn2d',
73
+ 'zoomOut2d',
74
+ 'resetScale2d',
75
+ 'hoverClosestCartesian',
76
+ 'hoverCompareCartesian',
77
+ 'toggleSpikelines',
78
+ 'lasso2d',
79
+ 'lasso',
80
+ 'select2d',
81
+ 'select']
82
+ ),
83
+ dragmode='pan'
84
  )
85
  return fig
86