File size: 16,608 Bytes
70cc330 0030c6c 70cc330 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 |
import time
import json
import os
import subprocess
import sys
import tempfile
import click
from pathlib import Path
import minizinc
import datetime
from datasets import load_dataset
from tqdm import tqdm
CPMPY_FRAMEWORK = "CPMpy"
MINIZINC_FRAMEWORK = "MiniZinc"
ORTOOLS_FRAMEWORK = "OR-Tools"
GT_DATASET_NAME = "kostis-init/CP-Bench"
GT_PROBLEM_NAME_COLUMN = "id"
GT_MODEL_CODE_COLUMN = "model"
SCRIPT_EXECUTION_TIMEOUT = 60 # seconds
def exec_code_minizinc(code: str, timeout_sec):
"""
Executes a MiniZinc model string using the minizinc-python library.
:param code: The MiniZinc model code as a string.
:param timeout_sec: The maximum time to wait for the solver in seconds.
:return: A tuple of (success, output, timeout_occured)
"""
successfully_executed = False
output = ""
timeout_occurred = False
timeout_duration = datetime.timedelta(seconds=timeout_sec)
try:
# 1. Create a MiniZinc model instance
model = minizinc.Model()
model.add_string(code)
# 2. Find a default solver configured with MiniZinc
# You can be more specific, e.g., solver = minizinc.Solver.lookup("gecode")
# If the default solver isn't found or suitable, this will raise an error.
gecode = minizinc.Solver.lookup("gecode")
if gecode is None:
raise RuntimeError("No suitable solver found. Please install a MiniZinc solver.")
# 3. Create an Instance to solve
instance = minizinc.Instance(gecode, model)
# 4. Solve the instance with the specified timeout
# The solve() method handles the timeout internally.
result = instance.solve(timeout=timeout_duration)
# 5. Process the result
if result.status in {minizinc.Status.SATISFIED, minizinc.Status.OPTIMAL_SOLUTION}:
successfully_executed = True
output = str(result.solution) if result.solution is not None else ""
timeout_occurred = False
elif result.status == minizinc.Status.UNKNOWN:
successfully_executed = False
output = f"Timeout Error: Solver stopped after {timeout_sec} seconds (Status: UNKNOWN)."
timeout_occurred = True
else:
# Handle other non-success statuses (UNSAT, ERROR, etc.)
successfully_executed = False
output = f"Solving failed. Status: {result.status}"
timeout_occurred = False
except minizinc.MiniZincError as e:
# Catch MiniZinc specific errors (e.g., syntax errors, solver not found)
successfully_executed = False
output = f"MiniZinc Error: {e}"
timeout_occurred = False
except Exception as e:
# Catch other unexpected errors
successfully_executed = False
output = f"Unexpected Error during MiniZinc execution: {e}"
timeout_occurred = False
return successfully_executed, output, timeout_occurred
def exec_code(code: str, timeout=10, modelling_language='cpmpy'):
"""
Execute the given code and return the output
:param code: The code to execute as a string
:param timeout: The maximum time to wait for the code to execute in seconds
:param modelling_language: The language to use for execution (cpmpy, minizinc, or-tools)
:return: A tuple of (success, output, timeout_occured)
"""
# create a temp directory to store the temporary file
temp_dir_name = "temp_dir_for_exec_code"
temp_dir = os.path.join(os.getcwd(), temp_dir_name)
os.makedirs(temp_dir, exist_ok=True)
# write the code to a temporary file
suffix = '.__hidden_py__' if modelling_language == CPMPY_FRAMEWORK or modelling_language == ORTOOLS_FRAMEWORK else '.mzn'
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=suffix, dir=temp_dir,
encoding='utf-8') as temp_file:
temp_instance_path = temp_file.name
temp_file.write(code)
try:
# execute the code
if modelling_language == CPMPY_FRAMEWORK or modelling_language == ORTOOLS_FRAMEWORK:
command = [sys.executable, temp_instance_path]
result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, encoding='utf-8')
successfully_executed = (result.returncode == 0)
output = result.stdout if successfully_executed else result.stderr
timeout_occurred = False
elif modelling_language == MINIZINC_FRAMEWORK:
successfully_executed, output, timeout_occurred = exec_code_minizinc(code, timeout)
else:
raise ValueError(f"MODELLING_LANGUAGE not supported: {modelling_language}")
except subprocess.TimeoutExpired as e:
successfully_executed = False
output = f"Timeout Error: Execution time exceeded {timeout} seconds"
timeout_occurred = True
except Exception as e:
successfully_executed = False
output = f"Error: {e}"
timeout_occurred = False
os.remove(temp_instance_path)
return successfully_executed, output, timeout_occurred
def validate_submission_file(file_path: Path) -> tuple[bool, str]:
"""Validate the submission file format and content.
Args:
file_path: Path to the submission file
Returns:
Tuple of (is_valid, error_message)
"""
if not file_path.exists():
return False, f"File {file_path} does not exist"
if not file_path.name.endswith('.jsonl'):
return False, "Invalid file format. Please provide a .jsonl file"
try:
with open(file_path, 'r', encoding='utf-8') as file:
found_one = False
for line_num, line in enumerate(file, 1):
found_one = True
try:
json_object = json.loads(line)
if not all(key in json_object for key in ["id", "model"]):
return False, f"Line {line_num}: Missing required keys 'id' and/or 'model'"
except json.JSONDecodeError:
return False, f"Line {line_num}: Invalid JSON format"
if not found_one:
return False, "Empty file. Please provide a valid JSONL file"
except Exception as e:
return False, f"Error reading file: {str(e)}"
return True, "File is valid"
def extract_json_from_code_output(output: str):
try:
start_index = output.find('{')
end_index = output.rfind('}') + 1
# Extract the JSON part
json_part = output[start_index:end_index]
return json.loads(json_part)
except json.JSONDecodeError:
return None
def add_constraints_as_string(solution):
"""Generate constraints as a string to be added to the original script."""
constraints = ""
if solution: # Ensure solution is not None
for key, value in solution.items():
# Basic escaping for string values if they occur, though typically solutions are numeric/boolean
if isinstance(value, str):
constraints += f"\nmodel += ({key} == \"{value}\")"
else:
constraints += f"\nmodel += ({key} == {value})"
return constraints
def get_modified_script(script_content, solution):
"""Add constraints to the script content and self-consistency checks."""
constraints_str = add_constraints_as_string(solution)
modified_script = f"{script_content}\n{constraints_str}"
modified_script += """
# Print the absolute path of the current directory along with the script name
import os
print(os.path.abspath(__file__))
# Keep old objective
old_objective = None
if hasattr(model, 'objective_is_min') and model.objective_is_min is not None:
old_objective = model.objective_value()
# Check self-consistency
if not model.solve():
print('ERROR: The model is unsatisfiable with the self-consistency constraints')
else:
print('SUCCESS: Model is consistent')
# Check if the objective value is the same
if old_objective is None:
print('SUCCESS: No objective defined')
elif model.objective_value() != old_objective:
print('ERROR: The objective value has changed')
else:
print('SUCCESS: Objective value is consistent')
"""
return modified_script
def evaluate_submission(submitted_models, summary_file_path, modelling_framw, top_lvl_temp_dir):
# Load ground-truth dataset
print(f" Loading ground-truth dataset '{GT_DATASET_NAME}'...", flush=True)
try:
gt_dataset = load_dataset(GT_DATASET_NAME, split="train", trust_remote_code=True)
ground_truth_models = {
item[GT_PROBLEM_NAME_COLUMN]: item[GT_MODEL_CODE_COLUMN]
for item in gt_dataset if
GT_PROBLEM_NAME_COLUMN in item and GT_MODEL_CODE_COLUMN in item and item[GT_MODEL_CODE_COLUMN]
}
if not ground_truth_models: raise ValueError("No models in GT dataset.")
print(f" Loaded {len(ground_truth_models)} ground-truth models.", flush=True)
except Exception as e_gt:
print(f" CRITICAL ERROR - Failed to load ground-truth dataset: {e_gt}", flush=True)
with open(summary_file_path, "w") as f:
f.write(f"CRITICAL ERROR: Failed to load ground-truth dataset '{GT_DATASET_NAME}'.\nError: {e_gt}\n")
return 1
# Statistics
total_submitted_models = 0
models_ran_successfully = 0
consistency_checks_passed = 0
all_checks_passed = 0
with (open(summary_file_path, "w", encoding="utf-8") as summary_f):
summary_f.write(f"Ground-Truth Dataset: {GT_DATASET_NAME}\n")
summary_f.write("-" * 30 + "\n")
# Iterate through downloaded submitted models
for submitted_model in tqdm(submitted_models):
curr_model = submitted_model[GT_MODEL_CODE_COLUMN]
total_submitted_models += 1
problem_name = submitted_model[GT_PROBLEM_NAME_COLUMN]
print(f"\n Processing model: {problem_name}", flush=True)
summary_f.write(f"\n--- Model: {problem_name} ---\n")
summary_f.write(" 1. Running submitted model...\n")
succ_exec, output, timeout_occurred = exec_code(curr_model, timeout=SCRIPT_EXECUTION_TIMEOUT,
modelling_language=modelling_framw)
if succ_exec:
models_ran_successfully += 1
summary_f.write(" - SUCCESS: Model executed successfully.\n")
if timeout_occurred:
summary_f.write(f" - TIMEOUT: Execution time exceeded {SCRIPT_EXECUTION_TIMEOUT} seconds.\n")
continue
if not succ_exec:
summary_f.write(f" - FAILED: Execution failed with error: {output}\n")
continue
if output is None or not output.strip():
summary_f.write(f" - FAILED: No output from execution.\n")
continue
# Attempt to extract JSON from stdout
generated_solution = extract_json_from_code_output(output)
if generated_solution is None:
summary_f.write(f" - FAILED: Could not extract JSON solution from output: {output}\n")
continue
summary_f.write(f" - SUCCESS: Got solution: {generated_solution}\n")
summary_f.write(f" 2. Checking against ground-truth for '{problem_name}'...\n")
if problem_name not in ground_truth_models:
summary_f.write(f" - FAILED: Ground-truth model for '{problem_name}' not found in dataset.\n")
continue
ground_truth_script_content = ground_truth_models[problem_name]
summary_f.write(" - SUCCESS: Found ground-truth model.\n")
summary_f.write(" 3. Performing self-consistency check on ground-truth model...\n")
modified_gt_script = get_modified_script(ground_truth_script_content, generated_solution)
try:
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8',
dir=top_lvl_temp_dir) as tmp_file:
tmp_file.write(modified_gt_script)
tmp_file_path_str = tmp_file.name
gt_check_result = subprocess.run(
[sys.executable, tmp_file_path_str],
capture_output=True, text=True, timeout=SCRIPT_EXECUTION_TIMEOUT, encoding='utf-8',
)
os.unlink(tmp_file_path_str)
gt_stdout = gt_check_result.stdout
if "SUCCESS: Model is consistent" in gt_stdout:
summary_f.write(" - CONSISTENCY: PASSED\n")
consistency_checks_passed += 1
else:
summary_f.write(" - CONSISTENCY: FAILED (Details in logs or stdout)\n")
if "SUCCESS: Model is consistent" in gt_stdout and (
"SUCCESS: No objective defined" in gt_stdout or "SUCCESS: Objective value is consistent" in gt_stdout):
summary_f.write(" - SELF-CONSISTENCY CHECK: PASSED fully\n")
all_checks_passed += 1
except Exception as e_gt_run:
summary_f.write(f" - SELF-CONSISTENCY CHECK: FAILED (Error: {e_gt_run})\n")
# Final statistics (write to summary_f)
summary_f.write("\n" + "=" * 30 + "\n")
summary_f.write("Overall Evaluation Statistics:\n")
summary_f.write(f" Total Submitted Models Parsed: {total_submitted_models}\n")
summary_f.write(f" Models That Ran Successfully (out of the submitted models): {models_ran_successfully}/{total_submitted_models}\n")
summary_f.write(f" Submission coverage perc: {float(total_submitted_models) / len(ground_truth_models) * 100:.2f}%\n")
summary_f.write(f" Error perc: {float(total_submitted_models - models_ran_successfully) / float(total_submitted_models) * 100:.2f}%\n")
summary_f.write(f" Consistency perc: {consistency_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
summary_f.write(f" Final Solution Accuracy perc: {all_checks_passed / len(ground_truth_models) * 100:.2f}%\n")
summary_f.write("-" * 30 + "\n")
@click.command()
@click.option('--submission_file', required=True, type=click.Path(exists=True, path_type=Path),
help='Path to the submission JSONL file')
@click.option('--modelling_framework', required=True,
type=click.Choice([CPMPY_FRAMEWORK, ORTOOLS_FRAMEWORK, MINIZINC_FRAMEWORK]),
help='Modelling framework used in the submission')
def main(submission_file: Path, modelling_framework: str):
"""Evaluate a submission file for the CP-Bench competition."""
is_valid, message = validate_submission_file(submission_file)
if not is_valid:
click.echo(f"Error: {message}")
return
click.echo("Starting evaluation...")
# load generated models from jsonl to memory
print(f" Loading models from file...", flush=True)
submitted_models = []
with open(submission_file, "r", encoding="utf-8") as f:
for line in f:
try:
json_obj = json.loads(line)
submitted_models.append(json_obj)
except json.JSONDecodeError as e:
print(f" ERROR: Failed to parse JSON object from line: {line}. Error: {e}", flush=True)
print(f" Loaded {len(submitted_models)} generated models.", flush=True)
summary_file_path = Path("summary.txt")
top_level_temp_dir = tempfile.mkdtemp(prefix="cp_bench_eval_")
try:
start_time = time.time()
evaluate_submission(submitted_models, summary_file_path, modelling_framework, top_level_temp_dir)
elapsed_time = time.time() - start_time
except Exception as e:
click.echo(f"Error during evaluation: {e}")
return
click.echo("Evaluation complete!")
click.echo(f"Results written to {summary_file_path}")
click.echo(f"Total evaluation time: {elapsed_time:.2f} seconds")
# Clean up temporary directory
if os.path.exists(top_level_temp_dir):
try:
os.rmdir(top_level_temp_dir)
except OSError as e:
click.echo(f"Warning: Could not remove temporary directory {top_level_temp_dir}: {e}")
else:
click.echo(f"Temporary directory {top_level_temp_dir} does not exist, nothing to clean up.")
click.echo("Thank you for your submission!")
if __name__ == "__main__":
main()
|