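"""Evaluate a submission file for the CP-Bench competition.

Illustrative usage (the script filename here is an assumption):

    python evaluate_submission.py --submission_file submission.jsonl

Each JSONL line must be an object with an "id" and a "model" field containing
executable model code.
"""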
import json
import os
import subprocess
import sys
import tempfile

import click
from pathlib import Path

from datasets import load_dataset

GT_DATASET_NAME = "kostis-init/CP-Bench"
GT_PROBLEM_NAME_COLUMN = "id"
GT_MODEL_CODE_COLUMN = "model"


def exec_code(code: str, timeout=10, modelling_language='cpmpy'):
    """
    Execute the given code in a subprocess and return the output.

    :param code: The code to execute, as a string
    :param timeout: The maximum time in seconds to wait for the code to execute
    :param modelling_language: The modelling language of the code ('cpmpy' and
        'or-tools' models are run with the current Python interpreter; any other
        value, including 'minizinc', currently raises a ValueError)
    :return: A tuple of (success, output, timeout_occurred)
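
    Example (illustrative; the output ends with a newline, hence .strip()):
        >>> ok, out, timed_out = exec_code("print('hello')")
        >>> (ok, out.strip(), timed_out)
        (True, 'hello', False)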
    """

    # Write the code to a temp file inside a dedicated directory in the CWD.
    # Note: only the temp file is removed afterwards, not the directory.
    temp_dir_name = "_temp_dir_for_exec_code"
    temp_dir = os.path.join(os.getcwd(), temp_dir_name)
    os.makedirs(temp_dir, exist_ok=True)

    # A non-.py suffix avoids accidental imports; the interpreter runs the
    # file by path, so the extension does not matter for execution.
    suffix = '.__hidden_py__' if modelling_language in ("cpmpy", "or-tools") else '.mzn'
    with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=suffix, dir=temp_dir, encoding='utf-8') as temp_file:
        temp_instance_path = temp_file.name
        temp_file.write(code)

    try:
        if modelling_language in ("cpmpy", "or-tools"):
            command = [sys.executable, temp_instance_path]
            result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, encoding='utf-8')

            successfully_executed = (result.returncode == 0)
            output = result.stdout if successfully_executed else result.stderr
            timeout_occurred = False
        else:
            raise ValueError(f"MODELLING_LANGUAGE not supported: {modelling_language}")
    except subprocess.TimeoutExpired:
        successfully_executed = False
        output = f"Timeout Error: Execution time exceeded {timeout} seconds"
        timeout_occurred = True
    except Exception as e:
        successfully_executed = False
        output = f"Error: {e}"
        timeout_occurred = False
    finally:
        # Always clean up the temp file, even on unexpected errors.
        os.remove(temp_instance_path)

    return successfully_executed, output, timeout_occurred


def validate_submission_file(file_path: Path) -> tuple[bool, str]:
    """Validate the submission file format and content.

    Args:
        file_path: Path to the submission file

    Returns:
        Tuple of (is_valid, error_message)
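
    Example (illustrative):
        >>> validate_submission_file(Path('missing.jsonl'))
        (False, 'File missing.jsonl does not exist')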
    """
    if not file_path.exists():
        return False, f"File {file_path} does not exist"

    if not file_path.name.endswith('.jsonl'):
        return False, "Invalid file format. Please provide a .jsonl file"

    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            found_one = False
            for line_num, line in enumerate(file, 1):
                if not line.strip():
                    continue  # tolerate blank lines, e.g. a trailing newline
                found_one = True
                try:
                    json_object = json.loads(line)
                    if not all(key in json_object for key in ("id", "model")):
                        return False, f"Line {line_num}: Missing required keys 'id' and/or 'model'"
                except json.JSONDecodeError:
                    return False, f"Line {line_num}: Invalid JSON format"

        if not found_one:
            return False, "Empty file. Please provide a valid JSONL file"

    except Exception as e:
        return False, f"Error reading file: {str(e)}"

    return True, "File is valid"


def extract_json_from_code_output(output: str):
    """Extract the first '{' .. last '}' span of the output and parse it as JSON.

    Returns the parsed object, or None if no valid JSON could be extracted.
    """
    try:
        start_index = output.find('{')
        end_index = output.rfind('}') + 1

        # If no braces are found, the slice is empty and json.loads raises,
        # so we still fall through to returning None.
        json_part = output[start_index:end_index]
        return json.loads(json_part)
    except json.JSONDecodeError:
        return None
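
# Example (illustrative):
#   extract_json_from_code_output('solution: {"x": 1}')  -> {'x': 1}
#   extract_json_from_code_output('no json here')        -> None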


def add_constraints_as_string(solution):
    """Generate constraints as a string to be added to the original script."""
    constraints = ""
    if solution:
        for key, value in solution.items():
            # Quote string values so the generated constraint is valid Python.
            if isinstance(value, str):
                constraints += f"\nmodel += ({key} == \"{value}\")"
            else:
                constraints += f"\nmodel += ({key} == {value})"
    return constraints
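
# Example (illustrative):
#   add_constraints_as_string({"x": 3, "color": "red"})
#   -> '\nmodel += (x == 3)\nmodel += (color == "red")'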


def get_modified_script(script_content, solution):
    """Append the solution constraints and self-consistency checks to the script."""
    constraints_str = add_constraints_as_string(solution)
    modified_script = f"{script_content}\n{constraints_str}"
    modified_script += """
# Print the absolute path of the executed script
import os
print(os.path.abspath(__file__))

# Keep the old objective value (assumes the original script already solved the model)
old_objective = None
if hasattr(model, 'objective_is_min') and model.objective_is_min is not None:
    old_objective = model.objective_value()

# Check self-consistency: the model must stay satisfiable with the added constraints
if not model.solve():
    print('ERROR: The model is unsatisfiable with the self-consistency constraints')
else:
    print('SUCCESS: Model is consistent')

# Check that the objective value is unchanged
if old_objective is None:
    print('SUCCESS: No objective defined')
elif model.objective_value() != old_objective:
    print('ERROR: The objective value has changed')
else:
    print('SUCCESS: Objective value is consistent')
"""
    return modified_script
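
# Example (illustrative): get_modified_script(script, {"x": 3}) returns `script`
# followed by "model += (x == 3)" and the self-consistency epilogue above.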


@click.command()
@click.option('--submission_file', required=True, type=click.Path(exists=True, path_type=Path),
              help='Path to the submission JSONL file')
def main(submission_file: Path):
    """Evaluate a submission file for the CP-Bench competition."""
    is_valid, message = validate_submission_file(submission_file)
    if not is_valid:
        click.echo(f"Error: {message}")
        return

    click.echo("Starting evaluation...")

    print(" Loading models from file...", flush=True)
    submitted_models = []
    with open(submission_file, "r", encoding="utf-8") as f:
        for line in f:
            if not line.strip():
                continue  # skip blank lines
            try:
                json_obj = json.loads(line)
                submitted_models.append(json_obj)
            except json.JSONDecodeError as e:
                print(f" ERROR: Failed to parse JSON object from line: {line}. Error: {e}", flush=True)
    print(f" Loaded {len(submitted_models)} generated models.", flush=True)

    # Evaluation counters
    total_submitted_models = 0
    models_ran_successfully = 0
    consistency_checks_passed = 0
    objective_checks_passed = 0
    all_checks_passed = 0
    gt_models_found = 0

    print(f" Loading ground-truth dataset '{GT_DATASET_NAME}'...", flush=True)
    try:
        gt_dataset = load_dataset(GT_DATASET_NAME, split="train", trust_remote_code=True)
        ground_truth_models = {
            item[GT_PROBLEM_NAME_COLUMN]: item[GT_MODEL_CODE_COLUMN]
            for item in gt_dataset
            if GT_PROBLEM_NAME_COLUMN in item and GT_MODEL_CODE_COLUMN in item and item[GT_MODEL_CODE_COLUMN]
        }
        if not ground_truth_models:
            raise ValueError("No models in GT dataset.")
        print(f" Loaded {len(ground_truth_models)} ground-truth models.", flush=True)
    except Exception as e_gt:
        print(f" CRITICAL ERROR - Failed to load ground-truth dataset: {e_gt}", flush=True)
        return

    for submitted_model in submitted_models:
        curr_model = submitted_model[GT_MODEL_CODE_COLUMN]

        total_submitted_models += 1
        problem_name = submitted_model[GT_PROBLEM_NAME_COLUMN]
        print(f"\n--- Model: {problem_name} ---\n")

        print(" 1. Running submitted model...\n")

        succ_exec, output, timeout_occurred = exec_code(curr_model, timeout=60)

        if timeout_occurred:
            print(" - TIMEOUT: Execution time exceeded 60 seconds.\n")
            continue
        if not succ_exec:
            print(f" - FAILED: Execution failed with error: {output}\n")
            continue
        if output is None or not output.strip():
            print(" - FAILED: No output from execution.\n")
            continue

        generated_solution = extract_json_from_code_output(output)
        if generated_solution is None:
            print(f" - FAILED: Could not extract JSON solution from output: {output}\n")
            continue

        models_ran_successfully += 1
        print(f" - SUCCESS: Got solution: {generated_solution}\n")

        print(f" 2. Checking against ground-truth for '{problem_name}'...\n")
        if problem_name not in ground_truth_models:
            print(f" - FAILED: Ground-truth model for '{problem_name}' not found in dataset.\n")
            continue
        gt_models_found += 1
        ground_truth_script_content = ground_truth_models[problem_name]
        print(" - SUCCESS: Found ground-truth model.\n")

        print(" 3. Performing self-consistency check on ground-truth model...\n")
        modified_gt_script = get_modified_script(ground_truth_script_content, generated_solution)

        try:
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as tmp_file:
                tmp_file.write(modified_gt_script)
                tmp_file_path_str = tmp_file.name

            try:
                gt_check_result = subprocess.run(
                    [sys.executable, tmp_file_path_str],
                    capture_output=True, text=True, timeout=60, encoding='utf-8',
                )
            finally:
                # Remove the temp file even if the run times out.
                os.unlink(tmp_file_path_str)

            gt_stdout = gt_check_result.stdout
            consistent = "SUCCESS: Model is consistent" in gt_stdout
            objective_ok = ("SUCCESS: No objective defined" in gt_stdout
                            or "SUCCESS: Objective value is consistent" in gt_stdout)

            if consistent:
                print(" - CONSISTENCY: PASSED\n")
                consistency_checks_passed += 1
            else:
                print(" - CONSISTENCY: FAILED (Details in logs or stdout)\n")

            if objective_ok:
                print(" - OBJECTIVE: PASSED\n")
                objective_checks_passed += 1
            else:
                print(" - OBJECTIVE: FAILED (Details in logs or stdout)\n")

            if consistent and objective_ok:
                print(" - SELF-CONSISTENCY CHECK: PASSED fully\n")
                all_checks_passed += 1

        except Exception as e_gt_run:
            print(f" - SELF-CONSISTENCY CHECK: FAILED (Error: {e_gt_run})\n")

    print("\n" + "=" * 30 + "\n")
    print("Overall Evaluation:\n")
    print(f" Total Submitted Models Parsed: {total_submitted_models}\n")
    print(f" Ground-Truth Models Matched: {gt_models_found}\n")
    print(f" Execution perc: {models_ran_successfully / len(ground_truth_models) * 100:.2f}%\n")
    print(f" Consistency perc: {consistency_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print(f" Objective perc: {objective_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print(f" Final Solution Accuracy perc: {all_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
    print("-" * 30 + "\n")

    click.echo("Evaluation complete!")


if __name__ == "__main__":
    main()