import json
import os
import subprocess
import sys
import tempfile

import click
from pathlib import Path

from datasets import load_dataset

GT_DATASET_NAME = "kostis-init/CP-Bench"
GT_PROBLEM_NAME_COLUMN = "id"
GT_MODEL_CODE_COLUMN = "model"
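
# For reference (illustrative): each CP-Bench row is expected to carry these two
# columns, e.g. {"id": "some_problem", "model": "<python source of a cpmpy model>"}.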


def exec_code(code: str, timeout=10, modelling_language='cpmpy'):
    """
    Execute the given code and return the output

    :param code: The code to execute as a string
    :param timeout: The maximum time to wait for the code to execute in seconds
    :param modelling_language: The language to use for execution (cpmpy, minizinc, or-tools)
    :return: A tuple of (success, output, timeout_occured)
    """

    # create a temp directory to store the temporary file
    temp_dir_name = "_temp_dir_for_exec_code"
    temp_dir = os.path.join(os.getcwd(), temp_dir_name)
    os.makedirs(temp_dir, exist_ok=True)

    # write the code to a temporary file
    suffix = '.__hidden_py__' if modelling_language in ("cpmpy", "or-tools") else '.mzn'
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=suffix, dir=temp_dir, encoding='utf-8') as temp_file:
        temp_instance_path = temp_file.name
        temp_file.write(code)

    try:
        # execute the code
        if modelling_language in ("cpmpy", "or-tools"):
            command = [sys.executable, temp_instance_path]
            result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, encoding='utf-8')

            successfully_executed = (result.returncode == 0)
            output = result.stdout if successfully_executed else result.stderr
            timeout_occurred = False
        # elif modelling_language == "minizinc":
        #     successfully_executed, output, timeout_occurred = exec_code_minizinc(code, timeout)
        else:
            raise ValueError(f"MODELLING_LANGUAGE not supported: {modelling_language}")

    except subprocess.TimeoutExpired:
        successfully_executed = False
        output = f"Timeout Error: Execution time exceeded {timeout} seconds"
        timeout_occurred = True
    except Exception as e:
        successfully_executed = False
        output = f"Error: {e}"
        timeout_occurred = False

    os.remove(temp_instance_path)

    return successfully_executed, output, timeout_occurred
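
# Example usage (illustrative sketch; assumes cpmpy is installed in this environment):
#
#     code = (
#         "import json\n"
#         "from cpmpy import Model, intvar\n"
#         "x = intvar(0, 10, name='x')\n"
#         "m = Model([x > 5])\n"
#         "m.solve()\n"
#         "print(json.dumps({'x': int(x.value())}))\n"
#     )
#     ok, out, timed_out = exec_code(code, timeout=10)
#     # ok is True, out holds the printed JSON solution, timed_out is False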


def validate_submission_file(file_path: Path) -> tuple[bool, str]:
    """Validate the submission file format and content.
    
    Args:
        file_path: Path to the submission file
    
    Returns:
        Tuple of (is_valid, error_message)
    """
    if not file_path.exists():
        return False, f"File {file_path} does not exist"

    if not file_path.name.endswith('.jsonl'):
        return False, "Invalid file format. Please provide a .jsonl file"

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            found_one = False
            for line_num, line in enumerate(file, 1):
                if not line.strip():
                    continue  # tolerate blank lines (e.g. a trailing newline)
                found_one = True
                try:
                    json_object = json.loads(line)
                    if not all(key in json_object for key in ["id", "model"]):
                        return False, f"Line {line_num}: Missing required keys 'id' and/or 'model'"
                except json.JSONDecodeError:
                    return False, f"Line {line_num}: Invalid JSON format"

            if not found_one:
                return False, "Empty file. Please provide a valid JSONL file"

    except Exception as e:
        return False, f"Error reading file: {str(e)}"

    return True, "File is valid"
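
# Example (illustrative): a single well-formed submission line that passes validation:
#
#     {"id": "some_problem", "model": "from cpmpy import ...\nprint(...)"}
#
# The id here is a placeholder; real ids must match problem names in the CP-Bench dataset.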


def extract_json_from_code_output(output: str):
    """Extract the JSON object between the first '{' and the last '}' in the output, or return None."""
    try:
        start_index = output.find('{')
        end_index = output.rfind('}') + 1
        # Extract the JSON part
        json_part = output[start_index:end_index]
        return json.loads(json_part)
    except json.JSONDecodeError:
        return None
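
# Example (illustrative): extraction tolerates extra solver output around the JSON:
#
#     extract_json_from_code_output('solver log...\n{"x": 6}\ndone')  # -> {'x': 6}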


def add_constraints_as_string(solution):
    """Generate constraints as a string to be added to the original script."""
    constraints = ""
    if solution:  # Ensure solution is not None
        for key, value in solution.items():
            # Basic escaping for string values if they occur, though typically solutions are numeric/boolean
            if isinstance(value, str):
                constraints += f"\nmodel += ({key} == \"{value}\")"
            else:
                constraints += f"\nmodel += ({key} == {value})"
    return constraints
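
# Example (illustrative): for the solution {'x': 6, 'name': 'a'} this returns
#
#     '\nmodel += (x == 6)\nmodel += (name == "a")'
#
# which assumes, as this evaluator does throughout, that the ground-truth script
# defines `model` and names its decision variables after the solution keys.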


def get_modified_script(script_content, solution):
    """Add constraints to the script content and self-consistency checks."""
    constraints_str = add_constraints_as_string(solution)
    modified_script = f"{script_content}\n{constraints_str}"
    modified_script += """
# Print the absolute path of this script (useful for locating the temp file when debugging)
import os
print(os.path.abspath(__file__))

# Keep old objective
old_objective = None
if hasattr(model, 'objective_is_min') and model.objective_is_min is not None:
    old_objective = model.objective_value()

# Check self-consistency
if not model.solve():
    print('ERROR: The model is unsatisfiable with the self-consistency constraints')
else:
    print('SUCCESS: Model is consistent')

# Check if the objective value is the same
if old_objective is None:
    print('SUCCESS: No objective defined')
elif model.objective_value() != old_objective:
    print('ERROR: The objective value has changed')
else:
    print('SUCCESS: Objective value is consistent')
"""
    return modified_script
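
# Example (illustrative): the modified script is plain Python, so a failing check
# can be debugged by writing it out and running it directly:
#
#     script = get_modified_script(gt_code, {'x': 6})  # gt_code: a ground-truth model source
#     Path('debug_check.py').write_text(script, encoding='utf-8')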

@click.command()
@click.option('--submission_file', required=True, type=click.Path(exists=True, path_type=Path),
              help='Path to the submission JSONL file')
def main(submission_file: Path):
    """Evaluate a submission file for the CP-Bench competition."""
    is_valid, message = validate_submission_file(submission_file)
    if not is_valid:
        click.echo(f"Error: {message}")
        return

    click.echo("Starting evaluation...")

    # load generated models from jsonl to memory
    print(f"  Loading models from file...", flush=True)
    submitted_models = []
    with open(submission_file, "r", encoding="utf-8") as f:
        for line in f:
            try:
                json_obj = json.loads(line)
                submitted_models.append(json_obj)
            except json.JSONDecodeError as e:
                print(f"  ERROR: Failed to parse JSON object from line: {line}. Error: {e}", flush=True)
    print(f"  Loaded {len(submitted_models)} generated models.", flush=True)
    # eval
    total_submitted_models = 0
    models_ran_successfully = 0
    consistency_checks_passed = 0
    objective_checks_passed = 0
    all_checks_passed = 0
    gt_models_found = 0
    
    # Load ground-truth models
    print(f"  Loading ground-truth dataset '{GT_DATASET_NAME}'...", flush=True)
    try:
        gt_dataset = load_dataset(GT_DATASET_NAME, split="train", trust_remote_code=True)
        ground_truth_models = {
            item[GT_PROBLEM_NAME_COLUMN]: item[GT_MODEL_CODE_COLUMN]
            for item in gt_dataset if
            GT_PROBLEM_NAME_COLUMN in item and GT_MODEL_CODE_COLUMN in item and item[GT_MODEL_CODE_COLUMN]
        }
        if not ground_truth_models:
            raise ValueError("No models in GT dataset.")
        print(f"  Loaded {len(ground_truth_models)} ground-truth models.", flush=True)
    except Exception as e_gt:
        print(f"  CRITICAL ERROR - Failed to load ground-truth dataset: {e_gt}", flush=True)
        return

    # Iterate through downloaded submitted models
    for submitted_model in submitted_models:
        curr_model = submitted_model[GT_MODEL_CODE_COLUMN]

        total_submitted_models += 1
        problem_name = submitted_model[GT_PROBLEM_NAME_COLUMN]
        print(f"\n  Processing model: {problem_name}", flush=True)
        print(f"\n--- Model: {problem_name} ---\n")

        print("    1. Running submitted model...\n")

        succ_exec, output, timeout_occurred = exec_code(curr_model, timeout=60)

        if timeout_occurred:
            print(f"      - TIMEOUT: Execution time exceeded 60 seconds.\n")
            continue
        if not succ_exec:
            print(f"      - FAILED: Execution failed with error: {output}\n")
            continue
        if output is None or not output.strip():
            print(f"      - FAILED: No output from execution.\n")
            continue
        # Attempt to extract JSON from stdout
        generated_solution = extract_json_from_code_output(output)
        if generated_solution is None:
            print(f"      - FAILED: Could not extract JSON solution from output: {output}\n")
            continue

        models_ran_successfully += 1
        print(f"      - SUCCESS: Got solution: {generated_solution}\n")

        print(f"    2. Checking against ground-truth for '{problem_name}'...\n")
        if problem_name not in ground_truth_models:
            print(f"      - FAILED: Ground-truth model for '{problem_name}' not found in dataset.\n")
            continue
        gt_models_found += 1
        ground_truth_script_content = ground_truth_models[problem_name]
        print("      - SUCCESS: Found ground-truth model.\n")

        print("    3. Performing self-consistency check on ground-truth model...\n")
        modified_gt_script = get_modified_script(ground_truth_script_content, generated_solution)

        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as tmp_file:
                tmp_file.write(modified_gt_script)
                tmp_file_path_str = tmp_file.name

            try:
                gt_check_result = subprocess.run(
                    [sys.executable, tmp_file_path_str],
                    capture_output=True, text=True, timeout=60, encoding='utf-8',
                )
            finally:
                os.unlink(tmp_file_path_str)  # clean up even if the run raises or times out

            gt_stdout = gt_check_result.stdout
            if "SUCCESS: Model is consistent" in gt_stdout:
                print("      - CONSISTENCY: PASSED\n")
                consistency_checks_passed += 1
            else:
                print("      - CONSISTENCY: FAILED (Details in logs or stdout)\n")

            if "SUCCESS: No objective defined" in gt_stdout or "SUCCESS: Objective value is consistent" in gt_stdout:
                print("      - OBJECTIVE: PASSED\n")
                objective_checks_passed += 1
            else:
                print("      - OBJECTIVE: FAILED (Details in logs or stdout)\n")

            if "SUCCESS: Model is consistent" in gt_stdout and (
                    "SUCCESS: No objective defined" in gt_stdout or "SUCCESS: Objective value is consistent" in gt_stdout):
                print("      - SELF-CONSISTENCY CHECK: PASSED fully\n")
                all_checks_passed += 1

        except Exception as e_gt_run:
            print(f"      - SELF-CONSISTENCY CHECK: FAILED (Error: {e_gt_run})\n")

    # Final statistics (percentages are relative to the full ground-truth benchmark size)
    print("\n" + "=" * 30 + "\n")
    print("Overall Evaluation:\n")
    print(f"  Total Submitted Models Parsed: {total_submitted_models}\n")
    print(f"  Execution perc: {models_ran_successfully / len(ground_truth_models) * 100:.2f}%\n")
    print(f"  Consistency perc: {consistency_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print(f"  Objective perc: {objective_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print(f"  Final Solution Accuracy perc: {all_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print("-" * 30 + "\n")

    click.echo("Evaluation complete!")


if __name__ == "__main__":
    main()