kostis-init's picture
remove unused constants and redundant imports; update Dockerfile dependencies
5e53d23
raw
history blame
20.3 kB
import datetime
import time
import json
import tempfile
import minizinc
from datasets import load_dataset
from huggingface_hub import HfApi, hf_hub_download
import os
import sys
import subprocess
import threading
from pathlib import Path
from src.config import DATASET_REPO_ID, DS_RESULTS_PATH, CPMPY_FRAMEWORK, ORTOOLS_FRAMEWORK, \
MINIZINC_FRAMEWORK
# --- Configuration ---
# Hugging Face dataset repository that main_eval loads as the ground-truth models.
GT_DATASET_NAME = "kostis-init/CP-Bench"
# Column names in the Hugging Face dataset for problem identifier and model script.
# main_eval reads the same two keys from every entry of the submitted JSONL file.
GT_PROBLEM_NAME_COLUMN = "id"
GT_MODEL_CODE_COLUMN = "model"
# Timeout for running individual model scripts (both generated and modified ground-truth)
SCRIPT_EXECUTION_TIMEOUT = 60 # seconds
def run_evaluation(submission_path):
    """
    Run the full evaluation for one submission, blocking until it finishes.

    :param submission_path: Path of the submission inside the dataset repository;
        forwarded to main_eval as ``submission_path_in_dataset``.
    """
    print(f"Starting evaluation for: {submission_path}")
    main_eval(
        user_dataset_repo_id=DATASET_REPO_ID,
        submission_path_in_dataset=submission_path,
        results_base_path_in_dataset=DS_RESULTS_PATH,
    )
    print(f"Evaluation process complete for: {submission_path}", flush=True)
def start_background_evaluation(submission_path):
    """
    Launch run_evaluation for the given submission on a daemon thread.

    :param submission_path: Path of the submission inside the dataset repository.
    :return: Always True once the thread has been started.
    """
    worker = threading.Thread(
        target=run_evaluation,
        args=(submission_path,),
        daemon=True,  # do not keep the process alive just for an evaluation
    )
    worker.start()
    return True
def extract_json_from_code_output(output: str):
    """
    Extract a JSON object from the textual output of an executed model.

    The text between the first '{' and the last '}' is parsed first. If that span
    is not valid JSON (for example because log lines containing braces surround
    the solution), every '{' is tried as the start of a JSON object and the last
    object that decodes successfully is returned.

    :param output: The captured output of the executed code.
    :return: The decoded JSON object, or None if no JSON object can be decoded.
    """
    if not isinstance(output, str):
        return None

    start_index = output.find('{')
    end_index = output.rfind('}') + 1
    if start_index == -1 or end_index <= start_index:
        # No brace pair at all: nothing that could be a JSON object.
        return None

    try:
        return json.loads(output[start_index:end_index])
    except json.JSONDecodeError:
        pass

    # Fallback: scan for self-contained objects embedded in noisy output.
    decoder = json.JSONDecoder()
    last_decoded = None
    position = start_index
    while position != -1:
        try:
            candidate, consumed_to = decoder.raw_decode(output, position)
        except json.JSONDecodeError:
            position = output.find('{', position + 1)
            continue
        last_decoded = candidate
        # Continue after the decoded object so nested objects are not re-reported.
        position = output.find('{', consumed_to)
    return last_decoded
def exec_code_minizinc(code: str, timeout_sec):
    """
    Solve a MiniZinc model given as a string through the minizinc-python library.

    The model is solved with the "gecode" solver. A SATISFIED or OPTIMAL_SOLUTION
    status counts as success; an UNKNOWN status is reported as a timeout; every
    other status, and any raised error, is reported as a plain failure.

    :param code: The MiniZinc model code as a string.
    :param timeout_sec: The maximum time to wait for the solver in seconds.
    :return: A tuple of (success, output, timeout_occured)
    """
    solve_limit = datetime.timedelta(seconds=timeout_sec)

    try:
        # Build the model from the source string.
        mzn_model = minizinc.Model()
        mzn_model.add_string(code)

        # Look up the solver to run the model with.
        solver = minizinc.Solver.lookup("gecode")
        if solver is None:
            raise RuntimeError("No suitable solver found. Please install a MiniZinc solver.")

        # Solve; the timeout is passed to solve() itself.
        outcome = minizinc.Instance(solver, mzn_model).solve(timeout=solve_limit)

        if outcome.status in {minizinc.Status.SATISFIED, minizinc.Status.OPTIMAL_SOLUTION}:
            solution_text = str(outcome.solution) if outcome.solution is not None else ""
            return True, solution_text, False

        if outcome.status == minizinc.Status.UNKNOWN:
            return (
                False,
                f"Timeout Error: Solver stopped after {timeout_sec} seconds (Status: UNKNOWN).",
                True,
            )

        # Any other non-success status (UNSAT, ERROR, etc.)
        return False, f"Solving failed. Status: {outcome.status}", False

    except minizinc.MiniZincError as e:
        # MiniZinc specific errors (e.g., syntax errors, solver not found)
        return False, f"MiniZinc Error: {e}", False
    except Exception as e:
        # Anything else that went wrong while building or solving the model
        return False, f"Unexpected Error during MiniZinc execution: {e}", False
def exec_code(code: str, timeout=10, modelling_language='cpmpy'):
    """
    Execute the given code and return the output

    CPMpy and OR-Tools code is written to a temporary file and run with the current
    Python interpreter in a subprocess; MiniZinc code is handed to
    exec_code_minizinc. Any error (including an unsupported modelling language or a
    failure to write the temporary file) is reported through the returned tuple
    rather than raised, and the temporary file is always removed afterwards.

    :param code: The code to execute as a string
    :param timeout: The maximum time to wait for the code to execute in seconds
    :param modelling_language: The language to use for execution (cpmpy, minizinc, or-tools)
    :return: A tuple of (success, output, timeout_occured)
    """
    runs_as_python = modelling_language in (CPMPY_FRAMEWORK, ORTOOLS_FRAMEWORK)

    # create a temp directory to store the temporary file
    temp_dir_name = "temp_dir_for_exec_code"
    temp_dir = os.path.join(os.getcwd(), temp_dir_name)
    os.makedirs(temp_dir, exist_ok=True)

    suffix = '.__hidden_py__' if runs_as_python else '.mzn'
    temp_instance_path = None

    try:
        # write the code to a temporary file (inside the try so that a failed write
        # is reported as an execution failure and the file is still cleaned up)
        with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix=suffix, dir=temp_dir,
                                         encoding='utf-8') as temp_file:
            temp_instance_path = temp_file.name
            temp_file.write(code)

        # execute the code
        if runs_as_python:
            command = [sys.executable, temp_instance_path]
            result = subprocess.run(command, capture_output=True, text=True, timeout=timeout, encoding='utf-8')
            successfully_executed = (result.returncode == 0)
            output = result.stdout if successfully_executed else result.stderr
            timeout_occurred = False
        elif modelling_language == MINIZINC_FRAMEWORK:
            successfully_executed, output, timeout_occurred = exec_code_minizinc(code, timeout)
        else:
            raise ValueError(f"MODELLING_LANGUAGE not supported: {modelling_language}")
    except subprocess.TimeoutExpired:
        successfully_executed = False
        output = f"Timeout Error: Execution time exceeded {timeout} seconds"
        timeout_occurred = True
    except Exception as e:
        successfully_executed = False
        output = f"Error: {e}"
        timeout_occurred = False
    finally:
        # Best-effort cleanup: runs on every exit path, and a failure to delete
        # the scratch file must not mask the execution result.
        if temp_instance_path is not None:
            try:
                os.remove(temp_instance_path)
            except OSError:
                pass

    return successfully_executed, output, timeout_occurred
def add_constraints_as_string(solution):
    """
    Generate constraints as a string to be added to the original script.

    Each key/value pair of the solution becomes one ``model += (key == value)``
    line, prefixed with a newline. String values are emitted as double-quoted
    literals with quotes, backslashes and control characters escaped, so the
    generated line stays valid Python; all other values are formatted as-is.

    :param solution: Mapping of variable name to value, or None.
    :return: The concatenated constraint lines, or "" for an empty/None solution.
    """
    if not solution:  # None or empty: nothing to constrain
        return ""

    constraint_lines = []
    for key, value in solution.items():
        if isinstance(value, str):
            # json.dumps yields a double-quoted, escaped literal; ensure_ascii=False
            # keeps non-ASCII characters verbatim instead of \\uXXXX escapes.
            rendered = json.dumps(value, ensure_ascii=False)
        else:
            rendered = value
        constraint_lines.append(f"\nmodel += ({key} == {rendered})")
    return "".join(constraint_lines)
def get_modified_script(script_content, solution):
    """
    Add constraints to the script content and self-consistency checks.

    The returned script is the original script, followed by one equality
    constraint per solution entry, followed by a fixed check section. The check
    section expects the script to define a global ``model`` offering ``solve()``
    and ``objective_value()`` (and optionally ``objective_is_min``). It prints
    'SUCCESS: ...' / 'ERROR: ...' marker lines; main_eval searches the script's
    stdout for these exact markers, so their text must not be changed.

    :param script_content: Source code of the ground-truth model script.
    :param solution: Mapping of variable name to value to pin in the model.
    :return: The combined script as a string.
    """
    constraints_str = add_constraints_as_string(solution)
    modified_script = f"{script_content}\n{constraints_str}"
    # NOTE: this literal is executed as Python, so the indentation of the bodies
    # below is significant.
    modified_script += """
# Print the absolute path of the current directory along with the script name
import os
print(os.path.abspath(__file__))
# Keep old objective
old_objective = None
if hasattr(model, 'objective_is_min') and model.objective_is_min is not None:
    old_objective = model.objective_value()
# Check self-consistency
if not model.solve():
    print('ERROR: The model is unsatisfiable with the self-consistency constraints')
else:
    print('SUCCESS: Model is consistent')
# Check if the objective value is the same
if old_objective is None:
    print('SUCCESS: No objective defined')
elif model.objective_value() != old_objective:
    print('ERROR: The objective value has changed')
else:
    print('SUCCESS: Objective value is consistent')
"""
    return modified_script
# --- Main Evaluation Logic ---
def main_eval(
        user_dataset_repo_id: str,
        submission_path_in_dataset: str,  # e.g., "submissions/uploaded_dir_name"
        results_base_path_in_dataset: str  # e.g., "results"
):
    """
    Evaluate one submission against the ground-truth dataset and upload a summary.

    Steps: download ``submission.jsonl`` and ``metadata.json`` from the user
    dataset repository, load the ground-truth models, run every submitted model,
    pin its solution into the matching ground-truth script and run that script as
    a self-consistency check, write ``summary.txt`` and upload the result folder
    to ``<results_base_path_in_dataset>/<submission name>``. The final percentages
    in the summary are relative to the number of ground-truth models.

    :param user_dataset_repo_id: Dataset repository holding submissions and results.
    :param submission_path_in_dataset: Path of the submission directory in that repository.
    :param results_base_path_in_dataset: Base path in the repository for uploaded results.
    :return: 1 if the submission files or the ground-truth dataset could not be
        loaded, 0 otherwise (a failed result upload is logged but still returns 0).
    """
    start_time = time.time()

    # Infer submission name for logging and result path generation
    submission_name_for_files = Path(submission_path_in_dataset).name

    print(f"eval.py: Starting evaluation for submission: '{submission_name_for_files}'", flush=True)
    print(f" User Data Repo: {user_dataset_repo_id}", flush=True)
    print(f" Submission to download from: {submission_path_in_dataset}", flush=True)
    print(f" Results to upload to: {results_base_path_in_dataset}/{submission_name_for_files}", flush=True)

    hf_api = HfApi()  # Will use HF_TOKEN from environment

    # Create a top-level temporary directory for all operations for this eval run
    with tempfile.TemporaryDirectory(prefix="eval_run_") as top_level_temp_dir_str:
        top_level_temp_dir = Path(top_level_temp_dir_str)
        local_submission_dir = top_level_temp_dir / "submissions"
        local_result_dir_for_upload = top_level_temp_dir / "results"

        os.makedirs(local_submission_dir, exist_ok=True)
        os.makedirs(local_result_dir_for_upload, exist_ok=True)

        # Path for the summary file within the local temporary result directory
        summary_file_path = local_result_dir_for_upload / "summary.txt"

        # 1. Download submitted files from HF Dataset
        print(f" Downloading submission files from '{submission_path_in_dataset}' to '{local_submission_dir}'...",
              flush=True)
        try:
            # Download the relevant submission file
            hf_hub_download(
                repo_id=user_dataset_repo_id,
                repo_type="dataset",
                local_dir=local_submission_dir,
                filename=f"{submission_path_in_dataset}/submission.jsonl",
            )
            print(" Downloaded submission file successfully.", flush=True)

            # Download the metadata file
            hf_hub_download(
                repo_id=user_dataset_repo_id,
                repo_type="dataset",
                local_dir=local_submission_dir,
                filename=f"{submission_path_in_dataset}/metadata.json",
            )
            print(" Downloaded metadata file successfully.", flush=True)
        except Exception as e_download:
            print(f" CRITICAL ERROR - Failed to download submission files: {e_download}", flush=True)
            return 1

        # 2. Load ground-truth dataset
        print(f" Loading ground-truth dataset '{GT_DATASET_NAME}'...", flush=True)
        try:
            gt_dataset = load_dataset(GT_DATASET_NAME, split="train", trust_remote_code=True)
            ground_truth_models = {
                item[GT_PROBLEM_NAME_COLUMN]: item[GT_MODEL_CODE_COLUMN]
                for item in gt_dataset if
                GT_PROBLEM_NAME_COLUMN in item and GT_MODEL_CODE_COLUMN in item and item[GT_MODEL_CODE_COLUMN]
            }
            if not ground_truth_models:
                raise ValueError("No models in GT dataset.")
            print(f" Loaded {len(ground_truth_models)} ground-truth models.", flush=True)
        except Exception as e_gt:
            print(f" CRITICAL ERROR - Failed to load ground-truth dataset: {e_gt}", flush=True)
            with open(summary_file_path, "w", encoding="utf-8") as f:
                f.write(f"CRITICAL ERROR: Failed to load ground-truth dataset '{GT_DATASET_NAME}'.\nError: {e_gt}\n")
            # NOTE: this error summary is only written to the temporary directory;
            # it is not uploaded.
            return 1

        # load generated models from jsonl to memory
        print(f" Loading generated models from '{local_submission_dir}'...", flush=True)
        submitted_models = []
        with open(os.path.join(local_submission_dir, submission_path_in_dataset, "submission.jsonl"), "r",
                  encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    # Blank lines (e.g. a trailing newline) are not entries.
                    continue
                try:
                    json_obj = json.loads(line)
                    submitted_models.append(json_obj)
                except json.JSONDecodeError as e:
                    print(f" ERROR: Failed to parse JSON object from line: {line}. Error: {e}", flush=True)

        # load metadata file
        with open(os.path.join(local_submission_dir, submission_path_in_dataset, "metadata.json"), "r",
                  encoding="utf-8") as f:
            metadata = json.load(f)

        print(f" Loaded {len(submitted_models)} generated models.", flush=True)

        # Statistics
        total_submitted_models = 0
        models_ran_successfully = 0
        consistency_checks_passed = 0
        objective_checks_passed = 0
        all_checks_passed = 0
        gt_models_found = 0

        with open(summary_file_path, "w", encoding="utf-8") as summary_f:
            summary_f.write(f"Evaluation Summary for Submission: {submission_name_for_files}\n")
            summary_f.write(f"User Data Repo: {user_dataset_repo_id}\n")
            summary_f.write(f"Submission Path in Dataset: {submission_path_in_dataset}\n")
            summary_f.write(f"Ground-Truth Dataset: {GT_DATASET_NAME}\n")
            summary_f.write("-" * 30 + "\n")

            # Iterate through downloaded submitted models
            for submitted_model in submitted_models:
                total_submitted_models += 1

                # A malformed entry must fail on its own instead of aborting the
                # whole run before any result is uploaded.
                if (
                        not isinstance(submitted_model, dict)
                        or GT_PROBLEM_NAME_COLUMN not in submitted_model
                        or not isinstance(submitted_model.get(GT_MODEL_CODE_COLUMN), str)
                ):
                    print(f"\n Skipping malformed submission entry #{total_submitted_models}", flush=True)
                    summary_f.write(f"\n--- Model: <malformed entry #{total_submitted_models}> ---\n")
                    summary_f.write(
                        f" - FAILED: Entry must be a JSON object with '{GT_PROBLEM_NAME_COLUMN}' "
                        f"and a string '{GT_MODEL_CODE_COLUMN}' field.\n")
                    continue

                curr_model = submitted_model[GT_MODEL_CODE_COLUMN]
                problem_name = submitted_model[GT_PROBLEM_NAME_COLUMN]

                print(f"\n Processing downloaded model: {problem_name}", flush=True)
                summary_f.write(f"\n--- Model: {problem_name} ---\n")

                summary_f.write(" 1. Running submitted model...\n")

                succ_exec, output, timeout_occurred = exec_code(
                    curr_model,
                    timeout=SCRIPT_EXECUTION_TIMEOUT,
                    modelling_language=metadata["modelling_framework"],
                )

                if timeout_occurred:
                    summary_f.write(f" - TIMEOUT: Execution time exceeded {SCRIPT_EXECUTION_TIMEOUT} seconds.\n")
                    continue
                if not succ_exec:
                    summary_f.write(f" - FAILED: Execution failed with error: {output}\n")
                    continue
                if output is None or not output.strip():
                    summary_f.write(" - FAILED: No output from execution.\n")
                    continue

                # Attempt to extract JSON from stdout
                generated_solution = extract_json_from_code_output(output)
                if generated_solution is None:
                    summary_f.write(f" - FAILED: Could not extract JSON solution from output: {output}\n")
                    continue

                models_ran_successfully += 1
                summary_f.write(f" - SUCCESS: Got solution: {generated_solution}\n")

                summary_f.write(f" 2. Checking against ground-truth for '{problem_name}'...\n")
                if problem_name not in ground_truth_models:
                    summary_f.write(f" - FAILED: Ground-truth model for '{problem_name}' not found in dataset.\n")
                    continue
                gt_models_found += 1
                ground_truth_script_content = ground_truth_models[problem_name]
                summary_f.write(" - SUCCESS: Found ground-truth model.\n")

                summary_f.write(" 3. Performing self-consistency check on ground-truth model...\n")
                modified_gt_script = get_modified_script(ground_truth_script_content, generated_solution)

                tmp_file_path_str = None
                try:
                    with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8',
                                                     dir=top_level_temp_dir) as tmp_file:
                        tmp_file_path_str = tmp_file.name
                        tmp_file.write(modified_gt_script)

                    gt_check_result = subprocess.run(
                        [sys.executable, tmp_file_path_str],
                        capture_output=True, text=True, timeout=SCRIPT_EXECUTION_TIMEOUT, encoding='utf-8',
                    )

                    # The check script reports its verdicts as marker lines on stdout.
                    gt_stdout = gt_check_result.stdout
                    model_is_consistent = "SUCCESS: Model is consistent" in gt_stdout
                    objective_is_ok = (
                            "SUCCESS: No objective defined" in gt_stdout
                            or "SUCCESS: Objective value is consistent" in gt_stdout
                    )

                    if model_is_consistent:
                        summary_f.write(" - CONSISTENCY: PASSED\n")
                        consistency_checks_passed += 1
                    else:
                        summary_f.write(
                            " - CONSISTENCY: FAILED (Details in logs or stdout)\n")

                    if objective_is_ok:
                        summary_f.write(" - OBJECTIVE: PASSED\n")
                        objective_checks_passed += 1
                    else:
                        summary_f.write(" - OBJECTIVE: FAILED (Details in logs or stdout)\n")

                    if model_is_consistent and objective_is_ok:
                        summary_f.write(" - SELF-CONSISTENCY CHECK: PASSED fully\n")
                        all_checks_passed += 1

                except Exception as e_gt_run:
                    # Includes subprocess.TimeoutExpired when the check exceeds the timeout.
                    summary_f.write(f" - SELF-CONSISTENCY CHECK: FAILED (Error: {e_gt_run})\n")
                finally:
                    # Remove the check script on every path, including timeouts.
                    if tmp_file_path_str is not None:
                        try:
                            os.unlink(tmp_file_path_str)
                        except OSError:
                            pass

            # Final statistics (write to summary_f)
            summary_f.write("\n" + "=" * 30 + "\n")
            summary_f.write("Overall Evaluation Statistics:\n")
            summary_f.write(f" Total Submitted Models Parsed: {total_submitted_models}\n")
            summary_f.write(f" Models That Ran Successfully: {models_ran_successfully}/{total_submitted_models}\n")
            summary_f.write(f" Ground-Truth Models Found: {gt_models_found}/{models_ran_successfully}\n")
            summary_f.write(f" Consistency Checks Passed: {consistency_checks_passed}/{gt_models_found}\n")
            summary_f.write(f" Objective Value Checks Passed: {objective_checks_passed}/{gt_models_found}\n")
            summary_f.write("=" * 30 + "\n")
            summary_f.write("Final Evaluation Summary:\n")
            # ground_truth_models is guaranteed non-empty here (checked when loading).
            gt_total = len(ground_truth_models)
            summary_f.write(f" Execution perc: {models_ran_successfully / gt_total * 100:.2f}%\n")
            summary_f.write(f" Consistency perc: {consistency_checks_passed / gt_total * 100:.2f}%\n")
            summary_f.write(f" Objective perc: {objective_checks_passed / gt_total * 100:.2f}%\n")
            summary_f.write(f" Final Solution Accuracy perc: {all_checks_passed / gt_total * 100:.2f}%\n")
            summary_f.write("-" * 30 + "\n")

        # 4. Upload the entire local_result_dir_for_upload to HF Dataset
        # This directory contains summary.txt and could contain other result files.
        # Done after the summary file is closed and while the temp directory still exists.
        result_path_on_hub = f"{results_base_path_in_dataset}/{submission_name_for_files}"
        print(f" Uploading results from '{local_result_dir_for_upload}' to '{result_path_on_hub}' on dataset...",
              flush=True)
        try:
            hf_api.upload_folder(
                folder_path=str(local_result_dir_for_upload),
                path_in_repo=result_path_on_hub,
                repo_id=user_dataset_repo_id,
                repo_type="dataset",
                commit_message=f"Evaluation results for {submission_name_for_files}"
            )
            print(" Results uploaded successfully.", flush=True)
        except Exception as e_upload:
            print(f" CRITICAL ERROR: Failed to upload results: {e_upload}", flush=True)
            # The summary.txt was written locally, but upload failed.

    elapsed_time = time.time() - start_time
    print(f"eval.py: Evaluation finished in {elapsed_time:.2f} seconds.", flush=True)
    return 0
# if __name__ == "__main__":
# if len(sys.argv) < 4:
# print(
# "Usage: python eval.py <user_dataset_repo_id> <submission_path_in_dataset> <results_base_path_in_dataset>")
# print("Example: python eval.py your-username/my-storage submissions/run123 results")
# sys.exit(1)
#
# arg_user_dataset_repo_id = sys.argv[1]
# arg_submission_path_in_dataset = sys.argv[2]
# arg_results_base_path_in_dataset = sys.argv[3]
#
# exit_code = main(arg_user_dataset_repo_id, arg_submission_path_in_dataset, arg_results_base_path_in_dataset)
# sys.exit(exit_code)