"""Scan evals_upload for new eval JSON files, validate their structure,
run agent-step analysis on them, and write results to evals_processed."""
import os
import json
import asyncio
import traceback

import aiofiles
from tqdm import tqdm

from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient


async def check_and_process_uploads():
    """Find new *.json uploads not yet present (by filename) in the live or
    processed directories and run each through process_single_upload."""
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    # An upload is skipped when a file with the same name already exists in
    # either the processed or the live directory.
    unprocessed_uploads = []
    for upload in new_uploads:
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        if os.path.exists(processed_path):
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(live_path):
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")

    # Uploads are processed sequentially; to process concurrently, collect
    # the coroutines in a list and `await asyncio.gather(*tasks)` instead.
    for upload in tqdm(unprocessed_uploads):
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        await process_single_upload(upload_path, processed_path)


async def process_single_upload(upload_path, processed_path):
    """Validate the structure of one upload and, if valid, process it.

    Invalid uploads are reported on stdout and left in place.
    """
    check_result = await check_upload_structure(upload_path)

    if check_result['is_valid']:
        await process_upload(upload_path, processed_path)
        # Move the file to processed directory (currently disabled):
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
    else:
        print(f"Upload check failed for {upload_path}: \n{check_result['message']}")


async def check_upload_structure(file_path):
    """Check that file_path is a structurally valid eval upload.

    Returns a dict {'is_valid': bool, 'message': str}; never raises.
    """
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())

        # All four top-level keys must be present.
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        if missing_keys:
            return {'is_valid': False,
                    'message': f"Missing required keys: {', '.join(missing_keys)}"}

        # Inspect-based benchmarks store raw_logging_results in a different
        # shape, so the per-item checks apply only to non-inspect uploads.
        is_inspect = "inspect" in data['config']['benchmark_name']
        if not is_inspect:
            if not isinstance(data['raw_logging_results'], list):
                return {'is_valid': False,
                        'message': "raw_logging_results should be a list"}
            for item in data['raw_logging_results']:
                if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                    return {'is_valid': False,
                            'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}

        return {'is_valid': True, 'message': "File structure is valid"}

    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}


async def process_upload(input_path, output_path):
    """Run agent-step analysis on one validated upload and write the enriched
    JSON to output_path.

    Analysis errors are logged and swallowed so a single bad upload does not
    abort the whole batch; in that case nothing is written.

    Raises:
        ValueError: if the file lacks the raw_logging_results key.
    """
    print(f"Processing {input_path}...")

    # Read asynchronously to avoid blocking the event loop.
    async with aiofiles.open(input_path, 'r') as f:
        data = json.loads(await f.read())

    # Explicit raise instead of `assert`, which is stripped under `python -O`.
    if 'raw_logging_results' not in data:
        raise ValueError("raw_logging_results key not found in the file")

    try:
        if isinstance(data['raw_logging_results'], list):
            openai_client = AsyncOpenAIClient(model="gpt-4o-mini")
            processed_calls = await analyze_agent_steps(
                data['raw_logging_results'], openai_client, llm_eval=False)
        else:
            # Inspect-style uploads are already in their processed form.
            processed_calls = data['raw_logging_results']

        # # experimental
        # failure_report = await analyze_agent_performance(
        #     data['raw_logging_results'], data['results']['failed_tasks'], openai_client)

        data['raw_logging_results'] = processed_calls
        data['failure_report'] = None
    except Exception as e:
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    with open(output_path, 'w') as f:
        json.dump(data, f, indent=4)

    print(f"Processing of {input_path} successful. Results saved to {output_path}")


if __name__ == "__main__":
    # Process a single upload for testing.
    asyncio.run(process_single_upload(
        "evals_upload/inspect_evalsswe_bench_1729538131_UPLOAD.json",
        "evals_processed/inspect_evalsswe_bench_1729538131_UPLOAD.json"))