# leaderboard/utils/processing.py
import asyncio
import json
import os
import traceback

import aiofiles
from tqdm import tqdm

from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient


async def check_and_process_uploads():
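    """Scan evals_upload for new JSON uploads and process any that are not
    already present in evals_processed or evals_live."""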
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    # Check for each new upload whether it is already in the live or processed
    # directory. (Verifying that same-named files are actually identical is
    # still a TODO.)
    unprocessed_uploads = []
    for upload in new_uploads:
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        if os.path.exists(processed_path):
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(live_path):
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")

    tasks = []
    for upload in tqdm(unprocessed_uploads):
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        # tasks.append(process_single_upload(upload_path, processed_path))  # for async processing
        await process_single_upload(upload_path, processed_path)
    # await asyncio.gather(*tasks)  # for async processing
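

# A minimal sketch of the concurrent path hinted at by the commented-out
# tasks/asyncio.gather lines above; the helper name and the concurrency
# limit are assumptions, not part of the original module.
async def _process_uploads_concurrently(pairs, limit=4):
    # Bound concurrency so a large batch doesn't open too many files or
    # LLM connections at once.
    semaphore = asyncio.Semaphore(limit)

    async def _bounded(upload_path, processed_path):
        async with semaphore:
            await process_single_upload(upload_path, processed_path)

    await asyncio.gather(*(_bounded(u, p) for u, p in pairs))
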
async def process_single_upload(upload_path, processed_path):
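    """Validate the structure of one uploaded eval file and, if it checks out,
    run the processing step on it."""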
    # Check the structure of the upload
    check_result = await check_upload_structure(upload_path)

    if check_result['is_valid']:
        # Process the file
        await process_upload(upload_path, processed_path)

        # Move the file to the processed directory (requires `import shutil`)
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
    else:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")

async def check_upload_structure(file_path):
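    """Check that an uploaded JSON file has the keys and shape the leaderboard
    pipeline expects. Returns a dict with 'is_valid' and 'message'."""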
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())

        # Check for required keys
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}

        # Check for specific structure in raw_logging_results; inspect-based
        # benchmarks are exempt because their logs are stored in a different shape
        if not isinstance(data['raw_logging_results'], list) and "inspect" not in data['config']['benchmark_name']:
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}

        if "inspect" not in data['config']['benchmark_name']:
            for item in data['raw_logging_results']:
                if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                    return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}

    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}
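

# For reference, a minimal upload that passes the check above looks roughly
# like this (values are illustrative placeholders, not real eval data):
#
# {
#     "config": {"benchmark_name": "..."},
#     "results": {...},
#     "raw_eval_results": {...},
#     "raw_logging_results": [
#         {"weave_task_id": "...", "inputs": {...}, "outputs": {...}}
#     ]
# }
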
async def process_upload(input_path, output_path):
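    """Run agent_monitor's step analysis over an upload's raw logs and write
    the enriched data to output_path."""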
print(f"Processing {input_path}...")
# load the file
with open(input_path, 'r') as f:
data = json.loads(f.read())
assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"
try:
if isinstance(data['raw_logging_results'], list):
openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=False)
else:
processed_calls = data['raw_logging_results']
# # experimental
# failure_report = await analyze_agent_performance(data['raw_logging_results'], data['results']['failed_tasks'], openai_client)
data['raw_logging_results'] = processed_calls
data['failure_report'] = None
except Exception as e:
traceback.print_exc()
print(f"Error in processing: {str(e)}")
return
with open(output_path, 'w') as f:
json.dump(data, f, indent=4)
print(f"Processing of {input_path} successful. Results saved to {output_path}")
if __name__ == "__main__":
    # Process a single upload for testing
    asyncio.run(process_single_upload(
        "evals_upload/inspect_evalsswe_bench_1729538131_UPLOAD.json",
        "evals_processed/inspect_evalsswe_bench_1729538131_UPLOAD.json",
    ))
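    # To scan and process everything in evals_upload instead of a single file:
    # asyncio.run(check_and_process_uploads())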