Spaces: Running on CPU Upgrade
| import io | |
| import os | |
| import re | |
| import pathlib | |
| import shutil | |
| from loguru import logger | |
| import subprocess | |
| from typing import List | |
# Destination directory that save_files() moves uploaded files into.
UPLOAD_DIRECTORY = pathlib.Path("/app/uploaded_files")
# Path of the yourbench_config.yml configuration file.
CONFIG_PATH = pathlib.Path("/app/yourbench_config.yml")

# Names of the pipeline stages. Presumably matched against the
# "Successfully completed stage: <name>" log lines -- verify against callers.
STAGES = [
    "ingestion",
    "upload_ingest_to_hub",
    "summarization",
    "chunking",
    "single_shot_question_generation",
    "answer_generation",
    # TODO: re-add "judge_answers" once that stage is fixed.
]

# Created at import time so save_files() always has a directory to move into.
UPLOAD_DIRECTORY.mkdir(exist_ok=True, parents=True)
| def save_files(files: List[pathlib.Path]) -> str: | |
| """Save uploaded files to the UPLOAD_DIRECTORY safely""" | |
| saved_paths = [] | |
| for file in files: | |
| try: | |
| source_path = pathlib.Path(file) | |
| destination_path = UPLOAD_DIRECTORY / source_path.name | |
| if not source_path.exists(): | |
| print(f"File not found: {source_path}") | |
| continue # Skip missing files | |
| shutil.move(str(source_path), str(destination_path)) | |
| saved_paths.append(str(destination_path)) | |
| except Exception as e: | |
| print(f"Error moving file {file}: {e}") | |
| return ( | |
| f"Files saved to: {', '.join(saved_paths)}" | |
| if saved_paths | |
| else "No files were saved" | |
| ) | |
| class SubprocessManager: | |
| def __init__(self, command): | |
| self.command = command | |
| self.process = None | |
| self.output_stream = io.StringIO() | |
| self.exit_code = None | |
| def start_process(self, custom_env: dict | None): | |
| """Start the subprocess.""" | |
| if self.is_running(): | |
| logger.info("Process is already running") | |
| return | |
| self.output_stream = io.StringIO() | |
| self.exit_code = None | |
| try: | |
| logger.info(f"Starting process with command: {' '.join(self.command)}") | |
| self.process = subprocess.Popen( | |
| self.command, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, # Combine stderr with stdout | |
| text=True, | |
| bufsize=1, | |
| start_new_session=True, | |
| env=custom_env, | |
| ) | |
| os.set_blocking(self.process.stdout.fileno(), False) | |
| logger.info(f"Started process with PID: {self.process.pid}") | |
| except Exception as e: | |
| logger.error(f"Failed to start process: {str(e)}") | |
| return | |
| def read_and_get_output(self): | |
| """Read subprocess output, capture it, and return log and completed stages.""" | |
| current_output = "" | |
| completed_stages = [] | |
| if self.process and self.process.stdout: | |
| try: | |
| while True: | |
| line = self.process.stdout.readline() | |
| if line: | |
| self.output_stream.write(line) | |
| else: | |
| break | |
| except BlockingIOError: | |
| pass | |
| current_output = self.output_stream.getvalue() | |
| completed_stages = list(set(re.findall(r"Successfully completed stage: (\w+)", current_output))) | |
| return current_output, completed_stages | |
| def stop_process(self): | |
| """Terminate the subprocess.""" | |
| if not self.is_running(): | |
| logger.info("Process is not running") | |
| return | |
| logger.info("Sending SIGTERM to the Process") | |
| try: | |
| self.process.terminate() | |
| self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to terminate | |
| logger.info(f"Process terminated by user with exit code {self.exit_code}") | |
| except subprocess.TimeoutExpired: | |
| logger.warning("Process did not terminate within timeout, sending SIGKILL") | |
| self.kill_process() | |
| def kill_process(self): | |
| """Forcefully kill the subprocess""" | |
| if not self.is_running(): | |
| logger.info("Process is not running") | |
| return | |
| logger.info("Sending SIGKILL to the Process") | |
| try: | |
| self.process.kill() | |
| self.exit_code = self.process.wait(timeout=5) # Wait up to 5 seconds for process to be killed | |
| logger.info(f"Process killed by user with exit code {self.exit_code}") | |
| except subprocess.TimeoutExpired: | |
| logger.error("Process could not be killed within timeout") | |
| def is_running(self): | |
| """Check if the subprocess is still running""" | |
| if self.process is None: | |
| return False | |
| return self.process.poll() is None | |
| def get_exit_details(self): | |
| """Return exit code and reason if process has terminated""" | |
| if self.process is None: | |
| return None, "Process was never started" | |
| if self.is_running(): | |
| return None, "Process is still running" | |
| if not self.exit_code is None and self.exit_code != 0 : | |
| return self.exit_code, "Process exited abnormaly" | |
| return self.exit_code, "Process exited normaly" | |