import os
import subprocess

import yaml

YAML_PATH = "./config.yaml"
PIPE_PATH = "./tmp/pipe"
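
# The helpers below assume a config.yaml shaped roughly as follows; the key
# names come from the code in this file, but the values are illustrative only:
#
#   detectors:
#     - <scanner name>
#   inference_type: hf_pipeline   # or hf_inference_api
#   column_mapping:
#     <label>:
#       - [<row values>]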

class Dumper(yaml.Dumper):
    def increase_indent(self, flow=False, *args, **kwargs):
        return super().increase_indent(flow=flow, indentless=False)
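
# The override above is the usual PyYAML tweak for indented block sequences.
# With the stock yaml.Dumper a dumped list sits flush with its key:
#
#   detectors:
#   - a
#
# whereas this subclass indents the items ("a" is an illustrative value):
#
#   detectors:
#     - a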

# Read the list of scanners from the yaml file.
def read_scanners(path):
    scanners = []
    with open(path, "r") as f:
        # An empty file makes yaml.load return None, hence the fallback.
        config = yaml.load(f, Loader=yaml.FullLoader) or {}
        scanners = config.get("detectors", [])
    return scanners

# Write the list of scanners to the "detectors" key of the yaml file.
def write_scanners(scanners):
    print(scanners)
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config:
        config["detectors"] = scanners
        # Rewrite the file instead of dumping at the end of an "r+" handle,
        # which would append a second YAML document.
        with open(YAML_PATH, "w") as f:
            yaml.dump(config, f, Dumper=Dumper)

# Read the inference type from the yaml file.
def read_inference_type(path):
    inference_type = ""
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader) or {}
        inference_type = config.get("inference_type", "")
    return inference_type

# Write the inference type to the yaml file.
def write_inference_type(use_inference):
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if use_inference:
        config["inference_type"] = "hf_inference_api"
    else:
        config["inference_type"] = "hf_pipeline"
    with open(YAML_PATH, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)

# Read the column mapping from the yaml file.
def read_column_mapping(path):
    column_mapping = {}
    with open(path, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader) or {}
        column_mapping = config.get("column_mapping", {})
    return column_mapping

# Write the column mapping to the yaml file; a None mapping removes the key.
def write_column_mapping(mapping):
    with open(YAML_PATH, "r") as f:
        config = yaml.load(f, Loader=yaml.FullLoader)
    if config is None:
        return
    if mapping is None and "column_mapping" in config:
        del config["column_mapping"]
    else:
        config["column_mapping"] = mapping
    with open(YAML_PATH, "w") as f:
        yaml.dump(config, f, Dumper=Dumper)

# Convert a column-mapping dataframe to a json-style dict keyed by label.
def convert_column_mapping_to_json(df, label=""):
    column_mapping = {label: []}
    for _, row in df.iterrows():
        column_mapping[label].append(row.tolist())
    return column_mapping
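
# Illustrative use of the converter above; the dataframe contents and the
# "mapping" label are hypothetical:
#
#   df = pd.DataFrame([["text", "input"], ["sentiment", "target"]])
#   convert_column_mapping_to_json(df, label="mapping")
#   # -> {"mapping": [["text", "input"], ["sentiment", "target"]]}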

# Append a log string to the per-job log file under ./tmp.
def write_log_to_user_file(id, log):
    with open(f"./tmp/{id}_log", "a") as f:
        f.write(log)

# Serialize a job as "<id>@<comma-separated command>" and append it to the
# pipe file under the caller's lock.
def save_job_to_pipe(id, job, lock):
    os.makedirs("./tmp", exist_ok=True)
    job = ",".join(str(i) for i in job)
    print(job)
    with lock:
        with open(PIPE_PATH, "a") as f:
            f.write(f"{id}@{job}\n")
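
# Each line in the pipe file is one queued job, e.g. (hypothetical values):
#
#   42@python,run_scan.py,--verbose
#
# pop_job_from_pipe below consumes these lines first-in, first-out.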

# Pop the first job from the pipe file and launch it as a subprocess,
# streaming its stdout/stderr into the job's log file.
def pop_job_from_pipe():
    if not os.path.exists(PIPE_PATH):
        return
    with open(PIPE_PATH, "r") as f:
        job = f.readline().strip()
        remaining = f.readlines()
    print(job, remaining, ">>>>")
    # Rewrite the pipe with the remaining jobs; readlines() keeps the
    # trailing newlines, so writelines() must not add its own.
    with open(PIPE_PATH, "w") as f:
        f.writelines(remaining)
    if len(job) == 0:
        return
    job_info = job.split("@")
    if len(job_info) != 2:
        raise ValueError(f"Invalid job info: {job_info}")
    write_log_to_user_file(job_info[0], f"Running job {job_info}")
    command = job_info[1].split(",")
    write_log_to_user_file(job_info[0], f"Running command {command}")
    # The subprocess inherits the log file descriptor and keeps it open.
    log_file = open(f"./tmp/{job_info[0]}_log", "a")
    subprocess.Popen(
        command,
        cwd=os.path.join(os.path.dirname(os.path.realpath(__file__)), "cicd"),
        stdout=log_file,
        stderr=log_file,
    )
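
# Minimal smoke test, left as a sketch: queue one dummy job and pop it.
# Assumptions not guaranteed by this file: a "cicd" directory exists next to
# it (pop_job_from_pipe uses that as the working directory), "echo" is on
# PATH, and the job id "demo" is illustrative.
if __name__ == "__main__":
    import threading

    save_job_to_pipe("demo", ["echo", "hello"], threading.Lock())
    pop_job_from_pipe()  # output lands in ./tmp/demo_log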