"""Evaluation script for CP-Bench submissions.

Runs each submitted model, extracts its JSON solution from stdout, and checks that
solution against the ground-truth model by pinning it as constraints and re-solving
(a self-consistency check). Results are written to a per-submission summary.txt.
"""
import sys
import os
import time
import json
import subprocess
import tempfile
from pathlib import Path

from datasets import load_dataset

# Hugging Face dataset containing the ground-truth constraint models.
DATASET_NAME = "kostis-init/CP-Bench"

# Dataset columns holding the problem identifier and the ground-truth model source code.
PROBLEM_NAME_COLUMN = "id"
MODEL_CODE_COLUMN = "model"

# Per-script timeout (seconds) for running submitted and modified ground-truth models.
SCRIPT_EXECUTION_TIMEOUT = 60
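# Note: each dataset row is assumed (based on the column usage below) to provide at
# least a problem identifier and the ground-truth model source, e.g.
#   {"id": "<problem_name>", "model": "<Python source of the ground-truth model>", ...}
# Rows missing either column, or with an empty model, are skipped when the
# ground-truth dictionary is built in main().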


def extract_json_from_string(text_output: str):
    """
    Attempt to find and parse the first valid JSON object or array in a string.

    Handles output where the JSON is preceded or followed by non-JSON text
    (e.g. solver logs or status messages).
    """
    idx = 0
    while idx < len(text_output):
        # Locate the next candidate start of a JSON value ('{' or '[').
        start_brace = text_output.find('{', idx)
        start_bracket = text_output.find('[', idx)

        if start_brace == -1 and start_bracket == -1:
            # No further candidates in the string.
            return None

        if start_brace != -1 and (start_bracket == -1 or start_brace < start_bracket):
            json_start_index = start_brace
        else:
            json_start_index = start_bracket

        potential_json_segment = text_output[json_start_index:]

        try:
            # raw_decode parses the first JSON value and ignores any trailing text.
            decoder = json.JSONDecoder()
            json_obj, _ = decoder.raw_decode(potential_json_segment)
            return json_obj
        except json.JSONDecodeError:
            # Not valid JSON at this position; resume scanning just after it.
            idx = json_start_index + 1

    return None
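# Illustrative example (hypothetical solver output):
#   >>> extract_json_from_string('CPMpy log line...\n{"x": 1, "nums": [2, 3]} done')
#   {'x': 1, 'nums': [2, 3]}
# The scan skips the leading log text and returns the first decodable JSON value.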


def run_instance(instance_path_str: str,
                 timeout: int = SCRIPT_EXECUTION_TIMEOUT):
    """Run the instance file and robustly capture the JSON output."""
    command = [sys.executable, instance_path_str]
    instance_name = Path(instance_path_str).name
    try:
        result = subprocess.run(command, capture_output=True, text=True, timeout=timeout,
                                encoding='utf-8', errors='replace')

        if result.returncode != 0:
            error_message = result.stderr[:500].strip() if result.stderr else "<No stderr>"
            print(f" ERROR: Running {instance_name} (Return Code: {result.returncode}): {error_message}", flush=True)
            return None

        stdout_text = result.stdout
        if not stdout_text or not stdout_text.strip():
            print(f" ERROR: No stdout from {instance_name}.", flush=True)
            return None

        solution = extract_json_from_string(stdout_text)

        if solution is None:
            abbreviated_stdout = stdout_text.replace('\n', '\\n')[:300]
            print(
                f" ERROR: Could not extract valid JSON from {instance_name}. Raw stdout (abbreviated): '{abbreviated_stdout}...'",
                flush=True)
            return None

        return solution

    except subprocess.TimeoutExpired:
        print(f" ERROR: Timeout running {instance_name} (>{timeout}s)", flush=True)
        return None
    except Exception as e:
        print(f" ERROR: Unexpected error running {instance_name}: {e}", flush=True)
        return None
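# Usage sketch (the path below is hypothetical; real paths come from the submission
# directory scanned in main()):
#   solution = run_instance("submissions/my_run/knapsack.py")
# Returns the parsed JSON solution (usually a dict of variable assignments), or None
# on any failure: non-zero exit code, empty stdout, unparseable output, or timeout.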


def add_constraints_as_string(solution):
    """Generate constraints as a string to be appended to the original script."""
    constraints = ""
    if solution:
        for key, value in solution.items():
            # String values are quoted; all other values (ints, lists, ...) are
            # inserted using their str() formatting.
            if isinstance(value, str):
                constraints += f"\nmodel += ({key} == \"{value}\")"
            else:
                constraints += f"\nmodel += ({key} == {value})"
    return constraints
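# Example of the generated constraint string for a hypothetical solution dict:
#   add_constraints_as_string({"x": 3, "colour": "red", "xs": [1, 0, 1]})
# returns (newlines shown as \n):
#   \nmodel += (x == 3)\nmodel += (colour == "red")\nmodel += (xs == [1, 0, 1])
# i.e. one pinning constraint per decision variable reported in the solution.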


def get_modified_script(script_content, solution):
    """Append solution-pinning constraints and a self-consistency check to the script content."""
    constraints_str = add_constraints_as_string(solution)
    modified_script = f"{script_content}\n{constraints_str}"
    modified_script += """

# --- Self-consistency check appended by eval.py ---
# Optional debug: print which modified script is being executed.
import os
# print(f"DEBUG: Running modified script: {os.path.abspath(__file__)}")

# Keep the objective value from the original script's own solve (before re-solving).
old_objective_value = None
objective_defined = False
if 'model' in locals() and hasattr(model, 'objective_value') and callable(model.objective_value):
    try:
        # This block assumes 'model' is a CPMpy model object or similar.
        # Some libraries do not expose whether an objective is set, or may raise if
        # objective_value() is called before the model has been solved, so this part
        # might need adjustment based on the specific modeling library used in CP-Bench.
        if hasattr(model, '_objective_value'):  # CPMpy-specific check that an objective was set
            if model._objective_value is not None:  # CPMpy does not expose objective_is_min
                objective_defined = True
                old_objective_value = model.objective_value()
    except Exception as e_obj_check:
        # print(f"DEBUG: Could not retrieve initial objective value: {e_obj_check}")
        pass  # Objective might not be set or model not solved yet.

# Check self-consistency: re-solve the model with the solution pinned in.
solved_ok = False
try:
    if 'model' in locals() and hasattr(model, 'solve') and callable(model.solve):
        solved_ok = model.solve()
    else:
        print('ERROR: Model object not found or does not have a solve() method.')
except Exception as e_solve:
    print(f'ERROR: Exception during model.solve(): {e_solve}')
    solved_ok = False  # ensure it is False on exception

if not solved_ok:
    print('EVAL_OUTPUT: CONSISTENCY_CHECK_RESULT=UNSATISFIABLE')
else:
    print('EVAL_OUTPUT: CONSISTENCY_CHECK_RESULT=SUCCESS')

    # Check whether the objective value is unchanged after pinning the solution.
    if not objective_defined:
        print('EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=NO_OBJECTIVE_DEFINED')
    else:
        try:
            current_objective_value = model.objective_value()
            # Handle potential floating-point inaccuracies if objectives can be floats.
            if isinstance(old_objective_value, float) or isinstance(current_objective_value, float):
                if abs(current_objective_value - old_objective_value) < 1e-6:  # tolerance for float comparison
                    print('EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CONSISTENT')
                else:
                    print(f'EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CHANGED (Old: {old_objective_value}, New: {current_objective_value})')
            elif current_objective_value != old_objective_value:  # exact comparison for integers
                print(f'EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CHANGED (Old: {old_objective_value}, New: {current_objective_value})')
            else:
                print('EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CONSISTENT')
        except Exception as e_obj_final:
            print(f'EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=ERROR_ACCESSING_FINAL_OBJECTIVE ({e_obj_final})')
"""
    return modified_script
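# The appended check reports its results on stdout via markers that main() scans
# for verbatim:
#   EVAL_OUTPUT: CONSISTENCY_CHECK_RESULT=SUCCESS | UNSATISFIABLE
#   EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CONSISTENT | NO_OBJECTIVE_DEFINED |
#                CHANGED (Old: ..., New: ...) | ERROR_ACCESSING_FINAL_OBJECTIVE (...)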


def main(submission_path_str: str, results_base_dir_str: str):
    start_time = time.time()
    print(f"eval.py: Starting evaluation for submission at '{submission_path_str}'", flush=True)
    print(f"eval.py: Results will be saved relative to '{results_base_dir_str}'", flush=True)
    print(f"eval.py: Loading ground-truth dataset '{DATASET_NAME}' from Hugging Face.", flush=True)

    submission_path = Path(submission_path_str)
    submission_name = submission_path.name
    result_dir_for_submission = Path(results_base_dir_str) / f"{submission_name}_result"
    os.makedirs(result_dir_for_submission, exist_ok=True)
    summary_file_path = result_dir_for_submission / "summary.txt"

    try:
        gt_dataset = load_dataset(DATASET_NAME, split="train")
        ground_truth_models = {
            item[PROBLEM_NAME_COLUMN]: item[MODEL_CODE_COLUMN]
            for item in gt_dataset
            if PROBLEM_NAME_COLUMN in item and MODEL_CODE_COLUMN in item and item[MODEL_CODE_COLUMN]
        }
        if not ground_truth_models:
            raise ValueError(
                f"No models found in dataset. Check PROBLEM_NAME_COLUMN ('{PROBLEM_NAME_COLUMN}') and MODEL_CODE_COLUMN ('{MODEL_CODE_COLUMN}').")
        print(f"eval.py: Loaded {len(ground_truth_models)} ground-truth models from Hugging Face.", flush=True)
    except Exception as e:
        print(f"eval.py: CRITICAL ERROR - Failed to load ground-truth dataset: {e}", flush=True)
        with open(summary_file_path, "w") as f:
            f.write(f"CRITICAL ERROR: Failed to load ground-truth dataset '{DATASET_NAME}'.\nError: {e}\n")
        return 1

    # Per-run counters for the summary statistics.
    total_submitted_models = 0
    models_ran_successfully = 0
    gt_models_found = 0
    consistency_checks_passed = 0
    objective_checks_passed = 0
    fully_passed_models = 0

    with open(summary_file_path, "w") as summary_f:
        summary_f.write(f"Evaluation Summary for Submission: {submission_name}\n")
        summary_f.write(f"Ground-Truth Dataset: {DATASET_NAME}\n")
        summary_f.write("-" * 30 + "\n")

        submitted_model_files = list(submission_path.glob('*.py'))
        if not submitted_model_files:
            summary_f.write("No .py model files found in submission.\n")
            print("eval.py: No .py model files found in submission.", flush=True)
            return 0

        for model_file_path in submitted_model_files:
            total_submitted_models += 1
            problem_name = model_file_path.stem
            print(f"\nProcessing submitted model: {model_file_path.name}", flush=True)
            summary_f.write(f"\n--- Model: {model_file_path.name} ---\n")

            # Step 1: run the submitted model and parse its JSON solution.
            summary_f.write(" 1. Running submitted model...\n")
            generated_solution = run_instance(str(model_file_path))
            if generated_solution is None:
                summary_f.write(" - FAILED to run or get valid JSON solution from submitted model.\n")
                continue
            models_ran_successfully += 1
            summary_f.write(f" - SUCCESS: Got solution. (e.g., {str(list(generated_solution.items())[:2])}...)\n")

            # Step 2: look up the corresponding ground-truth model.
            summary_f.write(f" 2. Checking against ground-truth for '{problem_name}'...\n")
            if problem_name not in ground_truth_models:
                summary_f.write(f" - FAILED: Ground-truth model for '{problem_name}' not found in dataset.\n")
                print(f" WARNING: Ground-truth for '{problem_name}' not found in dataset.", flush=True)
                continue
            gt_models_found += 1
            ground_truth_script_content = ground_truth_models[problem_name]
            summary_f.write(" - SUCCESS: Found ground-truth model.\n")

            # Step 3: pin the submitted solution into the ground-truth model and re-solve it.
            summary_f.write(" 3. Performing self-consistency check on ground-truth model...\n")
            modified_gt_script = get_modified_script(ground_truth_script_content, generated_solution)

            consistency_passed_this_model = False
            objective_passed_this_model = False

            tmp_file_path_str = None
            try:
                with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as tmp_file:
                    tmp_file.write(modified_gt_script)
                    tmp_file_path_str = tmp_file.name

                gt_check_result = subprocess.run(
                    [sys.executable, tmp_file_path_str],
                    capture_output=True, text=True, timeout=SCRIPT_EXECUTION_TIMEOUT,
                    encoding='utf-8', errors='replace'
                )

                gt_stdout = gt_check_result.stdout
                gt_stderr = gt_check_result.stderr

                if gt_stderr:
                    summary_f.write(f" Modified GT STDERR: {gt_stderr[:500]}...\n")

                # Parse the EVAL_OUTPUT markers emitted by the appended self-consistency check.
                if "EVAL_OUTPUT: CONSISTENCY_CHECK_RESULT=SUCCESS" in gt_stdout:
                    summary_f.write(" - CONSISTENCY: PASSED\n")
                    consistency_checks_passed += 1
                    consistency_passed_this_model = True
                elif "EVAL_OUTPUT: CONSISTENCY_CHECK_RESULT=UNSATISFIABLE" in gt_stdout:
                    summary_f.write(" - CONSISTENCY: FAILED (Model became unsatisfiable)\n")
                else:
                    summary_f.write(" - CONSISTENCY: FAILED (Could not determine consistency from output)\n")

                if "EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CONSISTENT" in gt_stdout or \
                        "EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=NO_OBJECTIVE_DEFINED" in gt_stdout:
                    summary_f.write(" - OBJECTIVE: PASSED (Consistent or no objective)\n")
                    objective_checks_passed += 1
                    objective_passed_this_model = True
                elif "EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=CHANGED" in gt_stdout:
                    summary_f.write(" - OBJECTIVE: FAILED (Value changed)\n")
                elif "EVAL_OUTPUT: OBJECTIVE_CHECK_RESULT=ERROR_ACCESSING_FINAL_OBJECTIVE" in gt_stdout:
                    summary_f.write(" - OBJECTIVE: FAILED (Error accessing final objective)\n")
                else:
                    summary_f.write(" - OBJECTIVE: FAILED (Could not determine objective consistency from output)\n")

            except subprocess.TimeoutExpired:
                summary_f.write(
                    f" - SELF-CONSISTENCY CHECK: FAILED (Timeout >{SCRIPT_EXECUTION_TIMEOUT}s running modified ground-truth)\n")
                print(f" ERROR: Timeout running modified GT for {problem_name}", flush=True)
            except Exception as e_gt_run:
                summary_f.write(
                    f" - SELF-CONSISTENCY CHECK: FAILED (Error running modified ground-truth: {e_gt_run})\n")
                print(f" ERROR: Running modified GT for {problem_name}: {e_gt_run}", flush=True)
            finally:
                # Clean up the temporary script even if the run timed out or raised.
                if tmp_file_path_str and os.path.exists(tmp_file_path_str):
                    os.unlink(tmp_file_path_str)

            if consistency_passed_this_model and objective_passed_this_model:
                fully_passed_models += 1

        summary_f.write("\n" + "=" * 30 + "\n")
        summary_f.write("Overall Evaluation Statistics:\n")
        summary_f.write(f" Total Submitted Models Parsed: {total_submitted_models}\n")
        summary_f.write(
            f" Models That Ran Successfully (produced solution): {models_ran_successfully}/{total_submitted_models}\n")
        summary_f.write(
            f" Corresponding Ground-Truth Models Found: {gt_models_found}/{models_ran_successfully} (of those that ran)\n")
        summary_f.write(f" Consistency Checks Passed: {consistency_checks_passed}/{gt_models_found}\n")
        summary_f.write(f" Objective Value Checks Passed: {objective_checks_passed}/{gt_models_found}\n")
        summary_f.write(f" Models Passing Both Checks: {fully_passed_models}/{gt_models_found}\n")

        overall_score = consistency_checks_passed + objective_checks_passed
        summary_f.write(f"\nScore: {overall_score} (raw sum of passed consistency and objective checks)\n")

    elapsed_time = time.time() - start_time
    print(f"eval.py: Evaluation finished in {elapsed_time:.2f} seconds.", flush=True)
    print(f"eval.py: Summary written to {summary_file_path}", flush=True)
    return 0


if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("Usage: python eval.py <path_to_submitted_directory> <path_to_results_base_directory>")
        print("Example: python eval.py ./submissions/my_run ./results")
        sys.exit(1)

    submission_dir = sys.argv[1]
    results_base_dir = sys.argv[2]

    if not Path(submission_dir).is_dir():
        print(f"Error: Submission directory '{submission_dir}' not found or not a directory.")
        sys.exit(1)

    exit_code = main(submission_dir, results_base_dir)
    sys.exit(exit_code)