File size: 6,261 Bytes
ca89148
 
 
 
 
 
 
 
 
5a7e21a
 
 
ca89148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5a7e21a
 
ca89148
5a7e21a
 
ca89148
cd69490
 
 
ca89148
 
 
 
 
 
 
 
cd69490
 
ca89148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5a7e21a
ca89148
 
5a7e21a
ca89148
 
 
 
cd69490
ca89148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cd69490
 
 
 
 
 
 
 
f98f521
ff06039
5a7e21a
ff06039
 
 
f98f521
5a7e21a
f98f521
 
cd69490
 
 
 
 
ca89148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import os
import json
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import asyncio
import aiofiles
import aiosmtplib
from agent_monitor.monitor import analyze_agent_steps
from agent_monitor.failure_report import analyze_agent_performance, AsyncOpenAIClient
import traceback

async def check_and_process_uploads():
    upload_dir =  "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"
    
    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    
    if not new_uploads:
        print("No new uploads found.")
        return
    
    # check for all new uploads whether they are already in live or processed directory
    # Also check whether the files are actually identical
    unprocessed_uploads = []
    for upload in new_uploads:
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)
        
        if not os.path.exists(live_path) and not os.path.exists(processed_path):
            unprocessed_uploads.append(upload)
        elif os.path.exists(processed_path):
            # with open(upload_path, 'r') as f:
            #     new_data = json.load(f)
            
            # with open(processed_path, 'r') as f:
            #     processed_data = json.load(f)
            
            # TODO we can use a better comparison method with exact comparison
            # if new_data != processed_data:
            #     unprocessed_uploads.append(upload)
            print(f"Upload {upload} is already in processed directory.")
        elif os.path.exists(live_path):
            with open(upload_path, 'r') as f:
                new_data = json.load(f)
            
            with open(live_path, 'r') as f:
                live_data = json.load(f)
            
            # if new_data != live_data:
            #     unprocessed_uploads.append(upload)
            print(f"Upload {upload} is already in live directory.")
        else:
            unprocessed_uploads.append(upload)

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    tasks = []
    for upload in unprocessed_uploads:
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        tasks.append(process_single_upload(upload_path, processed_path))
    
    await asyncio.gather(*tasks)


async def process_single_upload(upload_path, processed_path):
    # Check the structure of the upload
    check_result = await check_upload_structure(upload_path)
    
    if check_result['is_valid']:
        # Process the file
        await process_upload(upload_path, processed_path)
        
        # Move the file to processed directory
        # await asyncio.to_thread(shutil.move, upload_path, processed_path)
        
        # Send email notification
        # await send_email_notification(upload_path.name, check_result, "Processing successful")
    else:
        print(f"Upload check failed for {upload_path}: {check_result['message']}")
        # Send email notification about the failed check
        # await send_email_notification(upload_path.name, check_result, "Upload check failed")
        
        
async def check_upload_structure(file_path):
    try:
        async with aiofiles.open(file_path, 'r') as f:
            data = json.loads(await f.read())
        
        # Check for required keys
        required_keys = ['config', 'results', 'raw_eval_results', 'raw_logging_results']
        missing_keys = [key for key in required_keys if key not in data]
        
        if missing_keys:
            return {'is_valid': False, 'message': f"Missing required keys: {', '.join(missing_keys)}"}
        
        # Check for specific structure in raw_logging_results
        if not isinstance(data['raw_logging_results'], list):
            return {'is_valid': False, 'message': "raw_logging_results should be a list"}
        
        for item in data['raw_logging_results']:
            if not all(key in item for key in ['weave_task_id', 'inputs', 'outputs']):
                return {'is_valid': False, 'message': "Each item in raw_logging_results should have weave_task_id, inputs, and outputs"}
        
        return {'is_valid': True, 'message': "File structure is valid"}

    except json.JSONDecodeError:
        return {'is_valid': False, 'message': "Invalid JSON format"}
    except Exception as e:
        return {'is_valid': False, 'message': f"Unexpected error: {str(e)}"}
    

async def process_upload(input_path, output_path):
    print(f"Processing {input_path}...")
    # load the file
    with open(input_path, 'r') as f:
        data = json.loads(f.read())

    assert 'raw_logging_results' in data, "raw_logging_results key not found in the file"
    openai_client = AsyncOpenAIClient(model="gpt-4o-mini")

    try:
        processed_calls = await analyze_agent_steps(data['raw_logging_results'], openai_client, llm_eval=False)
        data['raw_logging_results'] = processed_calls

        # failure_report = await analyze_agent_performance(data['raw_logging_results'], data['results']['failed_tasks'], openai_client)
        # data['failure_report'] = None
    except Exception as e:
        traceback.print_exc()
        print(f"Error in processing: {str(e)}")
        return

    with open(output_path, 'w') as f:
        json.dump(data, f, indent=4)

    print(f"Processing of {input_path} successful. Results saved to {output_path}")

async def send_email_notification(filename, check_result, status):
    sender_email = "[email protected]"
    receiver_email = "[email protected]"
    password = "your_password"

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = f"""
    File: {filename}
    Status: {status}
    Check Result: {check_result['message']}
    """

    message.attach(MIMEText(body, "plain"))

    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password
    )