File size: 5,218 Bytes
ca89148
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import os
import json
import shutil
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import asyncio
import aiofiles
import aiosmtplib

async def check_and_process_uploads():
    """Scan the upload directory for JSON eval files and process any that
    are not already present — with identical content — in the live or
    processed directories.

    Files are compared by parsed JSON content, not just by name, so a
    re-upload of a changed file is processed again. Valid uploads are
    handed to process_single_upload concurrently via asyncio.gather.
    """
    upload_dir = "evals_upload"
    processed_dir = "evals_processed"
    live_dir = "evals_live"

    new_uploads = [f for f in os.listdir(upload_dir) if f.endswith('.json')]
    if not new_uploads:
        print("No new uploads found.")
        return

    unprocessed_uploads = []
    for upload in new_uploads:
        upload_path = os.path.join(upload_dir, upload)
        processed_path = os.path.join(processed_dir, upload)
        live_path = os.path.join(live_dir, upload)

        # A file counts as "already handled" only when a same-named file
        # with identical JSON content exists in processed/ or live/.
        # processed/ takes precedence when both exist, matching the
        # original elif ordering.
        if os.path.exists(processed_path):
            existing_path, location = processed_path, "processed"
        elif os.path.exists(live_path):
            existing_path, location = live_path, "live"
        else:
            existing_path = None

        if existing_path is None:
            unprocessed_uploads.append(upload)
        elif _json_files_differ(upload_path, existing_path):
            # Content changed since the earlier copy — reprocess it.
            print(f"Upload {upload} exists in {location} directory but content differs; reprocessing.")
            unprocessed_uploads.append(upload)
        else:
            print(f"Upload {upload} is already in {location} directory.")

    print(f"Processing {len(unprocessed_uploads)} new uploads.")
    tasks = [
        process_single_upload(
            os.path.join(upload_dir, upload),
            os.path.join(processed_dir, upload),
        )
        for upload in unprocessed_uploads
    ]
    await asyncio.gather(*tasks)


def _json_files_differ(path_a, path_b):
    """Return True when the two files hold different parsed JSON documents."""
    with open(path_a, 'r') as f:
        data_a = json.load(f)
    with open(path_b, 'r') as f:
        data_b = json.load(f)
    return data_a != data_b


async def process_single_upload(upload_path, processed_path):
    """Validate one upload and, when valid, process it and move it into
    the processed directory.

    Args:
        upload_path: Path of the uploaded JSON file (string).
        processed_path: Destination path in the processed directory.
    """
    # Validate the file's structure before doing any work on it.
    check_result = await check_upload_structure(upload_path)

    if check_result['is_valid']:
        # Process the file.
        await process_upload(upload_path, processed_path)

        # Move (not copy) so the upload directory only holds pending files;
        # run in a thread to keep the event loop responsive.
        await asyncio.to_thread(shutil.move, upload_path, processed_path)

        # Send email notification (disabled for now). Note: paths here are
        # plain strings, so use os.path.basename, not .name.
        # await send_email_notification(os.path.basename(upload_path), check_result, "Processing successful")
    else:
        # Surface validation failures instead of silently dropping them.
        print(f"Upload check failed for {upload_path}: {check_result['message']}")
        # Send email notification about the failed check (disabled for now).
        # await send_email_notification(os.path.basename(upload_path), check_result, "Upload check failed")
        
        
async def check_upload_structure(file_path):
    """Validate that an uploaded eval file is well-formed JSON with the
    expected top-level keys and logging-record structure.

    Args:
        file_path: Path of the JSON file to validate.

    Returns:
        A dict with 'is_valid' (bool) and a human-readable 'message'.
        Never raises: I/O and parse errors are reported in the result.
    """
    def invalid(message):
        # Uniform shape for every failure result.
        return {'is_valid': False, 'message': message}

    try:
        async with aiofiles.open(file_path, 'r') as handle:
            content = json.loads(await handle.read())

        # All four top-level sections must be present.
        required = ('config', 'results', 'raw_eval_results', 'raw_logging_results')
        absent = [key for key in required if key not in content]
        if absent:
            return invalid(f"Missing required keys: {', '.join(absent)}")

        logging_results = content['raw_logging_results']
        if not isinstance(logging_results, list):
            return invalid("raw_logging_results should be a list")

        # Every logging record needs the full id/inputs/outputs triple.
        record_keys = ('weave_task_id', 'inputs', 'outputs')
        for record in logging_results:
            if any(key not in record for key in record_keys):
                return invalid("Each item in raw_logging_results should have weave_task_id, inputs, and outputs")

        return {'is_valid': True, 'message': "File structure is valid"}

    except json.JSONDecodeError:
        return invalid("Invalid JSON format")
    except Exception as e:
        return invalid(f"Unexpected error: {str(e)}")
    

async def process_upload(input_path, output_path):
    """Process a single upload from *input_path* into *output_path*.

    Placeholder implementation: real processing logic will replace the
    plain file copy. The copy runs in a worker thread so the event loop
    is not blocked by disk I/O.
    """
    await asyncio.to_thread(shutil.copy, input_path, output_path)
    print(f"Processed {input_path} to {output_path}")

async def send_email_notification(filename, check_result, status):
    """Email a status report for one processed (or rejected) upload.

    Args:
        filename: Name of the upload file the notification concerns.
        check_result: Result dict from check_upload_structure; its
            'message' field is included in the body.
        status: Short status line, e.g. "Processing successful".
    """
    sender_email = "[email protected]"
    receiver_email = "[email protected]"
    # SECURITY: credentials must not be hard-coded in source — load from
    # the environment or a secrets manager before enabling this path.
    password = "your_password"

    message = MIMEMultipart()
    message["From"] = sender_email
    message["To"] = receiver_email
    # Bug fix: the filename parameter was previously never interpolated
    # into the subject or body (both contained a stray literal).
    message["Subject"] = f"Upload Processing Notification: {filename}"

    body = f"""
    File: {filename}
    Status: {status}
    Check Result: {check_result['message']}
    """

    message.attach(MIMEText(body, "plain"))

    # Port 465 is implicit-TLS SMTP, hence use_tls=True.
    await aiosmtplib.send(
        message,
        hostname="smtp.gmail.com",
        port=465,
        use_tls=True,
        username=sender_email,
        password=password,
    )