Spaces:
Running
Running
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import threading | |
| import time | |
| from pathlib import Path | |
| import pipe | |
| from app_env import ( | |
| HF_GSK_HUB_HF_TOKEN, | |
| HF_GSK_HUB_KEY, | |
| HF_GSK_HUB_PROJECT_KEY, | |
| HF_GSK_HUB_UNLOCK_TOKEN, | |
| HF_GSK_HUB_URL, | |
| HF_REPO_ID, | |
| HF_SPACE_ID, | |
| HF_WRITE_TOKEN, | |
| ) | |
| from io_utils import LOG_FILE, get_yaml_path, write_log_to_user_file | |
| from isolated_env import prepare_venv | |
| from leaderboard import LEADERBOARD | |
# Flag polled by run_job(); set by start_process_run_job() and cleared by stop_thread().
is_running = False
# Module-level logger named after the module (the conventional logging idiom); using
# __file__ would name it after a filesystem path whose dots create a bogus hierarchy.
logger = logging.getLogger(__name__)
def start_process_run_job():
    """Start the daemon worker thread that executes ``run_job``.

    The thread object is stored in the module-level ``thread`` global and the
    ``is_running`` flag is set to True. If the thread cannot be created or started,
    the error is logged and ``is_running`` is reset to False, so callers are not told
    a worker is active when none is.
    """
    global thread, is_running
    try:
        logger.debug("Running jobs in thread")
        thread = threading.Thread(target=run_job, daemon=True)
        # The flag must be True before start(): run_job's loop exits at once if it reads False.
        is_running = True
        thread.start()
    except Exception as e:
        is_running = False
        logger.exception("Failed to start thread: %s", e)
def stop_thread():
    """Ask the background worker loop to stop.

    Only the module-level ``is_running`` flag is cleared. ``run_job`` leaves its loop
    the next time it checks the flag; the thread is not joined here.
    """
    global is_running
    logging.debug("Stop thread")
    is_running = False
def _env_flag_args(env_flags):
    """Return ``[flag, value, ...]`` for each ``(env_var_name, flag)`` whose env var is set and non-empty."""
    args = []
    for env_name, flag in env_flags:
        value = os.environ.get(env_name)
        if value:
            args.extend([flag, value])
    return args


def prepare_env_and_get_command(
    m_id,
    d_id,
    config,
    split,
    inference,
    inference_token,
    uid,
    label_mapping,
    feature_mapping,
):
    """Build the ``giskard_scanner`` command line for one evaluation job.

    Args:
        m_id: Model id, passed as ``--model``.
        d_id: Dataset id, passed as ``--dataset``.
        config: Dataset config, passed as ``--dataset_config``.
        split: Dataset split, passed as ``--dataset_split``.
        inference: Together with a truthy ``inference_token``, selects the
            ``hf_inference_api`` inference type; otherwise ``hf_pipeline`` is used.
        inference_token: Passed as ``--inference_api_token`` (``None`` becomes ``""``).
        uid: Job id, used for the venv, the scan-config path and the user log.
        label_mapping: JSON-serialized into ``--label_mapping``.
        feature_mapping: JSON-serialized into ``--feature_mapping``.

    Returns:
        The argv list, whose first element is the executable returned by
        ``prepare_venv`` (or ``"giskard_scanner"`` if that fails).

    Side effects:
        Calls ``prepare_venv`` and appends a "Start local evaluation" line to the
        user's log via ``write_log_to_user_file``.
    """
    leaderboard_dataset = None
    if os.environ.get("SPACE_ID") == "giskardai/giskard-evaluator":
        leaderboard_dataset = LEADERBOARD

    inference_type = "hf_inference_api" if inference and inference_token else "hf_pipeline"

    executable = "giskard_scanner"
    try:
        # Copy the current requirements (might be changed). f.read() keeps the file's
        # own newlines; joining readlines() with "\n" would double every line break.
        with open("requirements.txt", "r", encoding="utf-8") as f:
            executable = prepare_venv(uid, f.read())
        logger.info("Using %s as executable", executable)
    except Exception as e:
        logger.warning("Create env failed due to %s, using the current env as fallback.", e)
        executable = "giskard_scanner"

    command = [
        executable,
        "--loader", "huggingface",
        "--model", m_id,
        "--dataset", d_id,
        "--dataset_config", config,
        "--dataset_split", split,
        "--output_format", "markdown",
        "--output_portal", "huggingface",
        "--feature_mapping", json.dumps(feature_mapping),
        "--label_mapping", json.dumps(label_mapping),
        "--scan_config", get_yaml_path(uid),
        "--inference_type", inference_type,
        # subprocess rejects None in argv, so a missing token is sent as an empty string.
        "--inference_api_token", inference_token or "",
    ]

    # NOTE(review): the tokens below (and the inference token above) are passed as CLI
    # arguments, which other processes on the host can read (e.g. via `ps`). Consider
    # handing them to the scanner through its environment instead.

    # The token to publish post
    command += _env_flag_args([(HF_WRITE_TOKEN, "--hf_token")])

    # The repo to publish post
    discussion_repo = os.environ.get(HF_REPO_ID) or os.environ.get(HF_SPACE_ID)
    if discussion_repo:
        # TODO: Replace by the model id
        command += ["--discussion_repo", discussion_repo]

    # The repo to publish for ranking
    if leaderboard_dataset:
        command += ["--leaderboard_dataset", leaderboard_dataset]

    # The info to upload to Giskard hub
    command += _env_flag_args(
        [
            (HF_GSK_HUB_KEY, "--giskard_hub_api_key"),
            (HF_GSK_HUB_URL, "--giskard_hub_url"),
            (HF_GSK_HUB_PROJECT_KEY, "--giskard_hub_project_key"),
            (HF_GSK_HUB_HF_TOKEN, "--giskard_hub_hf_token"),
            (HF_GSK_HUB_UNLOCK_TOKEN, "--giskard_hub_unlock_token"),
        ]
    )

    eval_str = f"[{m_id}]<{d_id}({config}, {split} set)>"
    write_log_to_user_file(
        uid,
        f"Start local evaluation on {eval_str}. Please wait for your job to start...\n",
    )
    return command
def save_job_to_pipe(task_id, job, description, lock):
    """Enqueue a job for the background worker.

    The ``(task_id, job, description)`` tuple is appended to the end of
    ``pipe.jobs`` while ``lock`` is held.
    """
    entry = (task_id, job, description)
    with lock:
        pipe.jobs.append(entry)
def _link_log_file(task_id):
    """Point the shared ``LOG_FILE`` symlink at ``./tmp/<task_id>.log``."""
    log_file_path = Path(LOG_FILE)
    # Path.exists() follows symlinks and returns False for a dangling link (target
    # deleted). Checking is_symlink() as well ensures the stale link is removed, so
    # os.symlink below cannot fail with FileExistsError.
    if log_file_path.is_symlink() or log_file_path.exists():
        log_file_path.unlink()
    os.symlink(f"./tmp/{task_id}.log", LOG_FILE)


def pop_job_from_pipe():
    """Run the oldest queued job, if any, and block until its scanner process exits.

    ``pipe.current`` holds the job's description while it runs and is reset to
    ``None`` afterwards, even if building or launching the command raised. The
    scanner's stdout and stderr are appended to ``./tmp/<task_id>.log``.
    """
    if not pipe.jobs:
        return
    # save_job_to_pipe appends to the end, so take from the front to process jobs
    # in submission order (FIFO) rather than newest-first.
    job_info = pipe.jobs.pop(0)
    task_id = job_info[0]
    job_args = job_info[1]
    pipe.current = job_info[2]
    try:
        # Link to LOG_FILE
        _link_log_file(task_id)
        write_log_to_user_file(task_id, f"Running job id {task_id}\n")
        command = prepare_env_and_get_command(*job_args)
        with open(f"./tmp/{task_id}.log", "a") as log_file:
            process = subprocess.Popen(command, stdout=log_file, stderr=subprocess.STDOUT)
            process.wait()
    finally:
        # Never leave a failed job reported as the current one.
        pipe.current = None
def run_job(poll_interval=10):
    """Worker loop: process queued jobs until ``is_running`` is cleared.

    Each iteration runs at most one job via ``pop_job_from_pipe`` and then sleeps for
    ``poll_interval`` seconds. A job that raises is logged and skipped, so a single
    bad job cannot kill the worker thread while ``is_running`` is still True.

    Args:
        poll_interval: Seconds to sleep between iterations (default 10).
    """
    global is_running
    while is_running:
        try:
            try:
                pop_job_from_pipe()
            except Exception:
                logger.exception("Background job failed; continuing with the next one")
            time.sleep(poll_interval)
        except KeyboardInterrupt:
            # NOTE(review): Python raises KeyboardInterrupt only in the main thread, so
            # this branch is presumably reached only when run_job runs on the main thread.
            logging.debug("KeyboardInterrupt stop background thread")
            is_running = False
            break