Spaces:
Runtime error
Runtime error
import glob | |
import json | |
import math | |
import os | |
from dataclasses import dataclass | |
import dateutil | |
import numpy as np | |
from src.display.formatting import make_hyperlink | |
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision | |
from src.about import Model_Backbone, Training_Dataset, Testing_Type | |
@dataclass
class EvalResult:
    """
    Represents one full evaluation. Built from a combination of the result and request file for a given run.

    Instances are normally created with `init_from_json_file` (result file) and
    then enriched in place with `update_with_request_file` (request file).
    """

    # Identifier for evaluations: model name, precision value, training dataset
    # and testing type value concatenated without separators.
    eval_name: str
    model_name: str
    training_dataset_type: Training_Dataset
    training_dataset: str
    testing_type: Testing_Type
    results: dict  # metric name -> score, one entry per member of Tasks
    paper_name: str = ""
    model_link: str = ""
    paper_link: str = ""
    model_backbone_type: Model_Backbone = Model_Backbone.Other
    model_backbone: str = ""
    precision: Precision = Precision.Other
    model_parameters: float = 0
    model_type: ModelType = ModelType.Other  # Pretrained, fine tuned, ...
    date: str = ""  # submission date of request file

    @classmethod
    def init_from_json_file(cls, json_filepath) -> "EvalResult":
        """Inits the result from the specific model result file.

        Args:
            json_filepath: Path of a result JSON file holding a "config" object
                (with "model_name", "training_dataset", "testing_type" and
                optionally "model_dtype") and a "results" object.

        Returns:
            A new EvalResult. Fields that only exist in the request file keep
            their defaults until `update_with_request_file` is called.
        """
        with open(json_filepath) as fp:
            data = json.load(fp)

        config = data.get("config")

        # Extract evaluation config
        model_name = config["model_name"]
        training_dataset_type = Training_Dataset.from_str(config["training_dataset"])
        if training_dataset_type.name != Training_Dataset.Other.name:
            # Recognised dataset: use the enum's value as the dataset name.
            training_dataset = training_dataset_type.value
        else:
            # Unrecognised dataset: keep the raw string from the config.
            training_dataset = config["training_dataset"]
        testing_type = Testing_Type(config["testing_type"])
        precision = Precision.from_str(config.get("model_dtype"))
        eval_name = model_name + precision.value + training_dataset + testing_type.value

        # Extract results; a metric absent from the file is recorded as -1.
        raw_results = data["results"]
        results = {task.value.metric: raw_results.get(task.value.metric, -1) for task in Tasks}

        return cls(
            eval_name=eval_name,
            model_name=model_name,
            training_dataset_type=training_dataset_type,
            training_dataset=training_dataset,
            testing_type=testing_type,
            precision=precision,
            results=results,
        )

    def update_with_request_file(self, requests_path) -> None:
        """Finds the relevant request file for the current model and updates info with it.

        This is best effort: when no request file is found, or it cannot be read
        or parsed, a message is printed and the instance is left as it is.

        Args:
            requests_path: Root folder of the request files.
        """
        if self.training_dataset_type.name != Training_Dataset.Other.name:
            training_dataset_request = self.training_dataset_type.name
        else:
            training_dataset_request = self.training_dataset
        # Collapse any whitespace into underscores for the request file name.
        training_dataset_request = "_".join(training_dataset_request.split())

        request_file = get_request_file_for_model(
            requests_path,
            self.model_name,
            self.precision.value,
            training_dataset_request,
            self.testing_type.value,
        )

        if request_file is None:
            # Handle the "not found" case explicitly rather than relying on
            # open(None) raising inside the try block below.
            print(f"Could not find request file for {self.model_name} with precision {self.precision.value}, training dataset {self.training_dataset} and testing type {self.testing_type.value}")
            return

        try:
            with open(request_file, "r") as f:
                request = json.load(f)
            self.model_parameters = request.get("model_parameters", 0)
            self.model_link = request.get("model_link", "None")
            self.model_backbone = request.get("model_backbone", "Unknown")
            self.model_backbone_type = Model_Backbone.from_str(self.model_backbone)
            self.paper_name = request.get("paper_name", "None")
            self.paper_link = request.get("paper_link", "None")
            self.model_type = ModelType.from_str(request.get("model_type", ""))
            self.date = request.get("submitted_time", "")
        except Exception as err:
            # Deliberately broad: a broken request file must not abort loading
            # of the remaining results. Report the real cause instead of
            # claiming the file was not found.
            print(f"Could not read request file {request_file} for {self.model_name}: {err!r}")

    def to_dict(self) -> dict:
        """Converts the Eval Result to a dict compatible with our dataframe display.

        Returns:
            A dict with the "eval_name" save name, one entry per descriptive
            AutoEvalColumn, and one entry per task keyed by the task column name.

        Raises:
            KeyError: If `self.results` lacks the metric of one of the Tasks.
        """
        data_dict = {
            "eval_name": self.eval_name,  # not a column, just a save name,
            AutoEvalColumn.precision.name: self.precision.value,
            AutoEvalColumn.model_parameters.name: self.model_parameters,
            AutoEvalColumn.model_name.name: self.model_name,
            # Only render a hyperlink when the paper link looks like a URL.
            AutoEvalColumn.paper.name: make_hyperlink(self.paper_link, self.paper_name) if self.paper_link.startswith("http") else self.paper_name,
            AutoEvalColumn.model_backbone_type.name: self.model_backbone_type.value,
            AutoEvalColumn.model_backbone.name: self.model_backbone,
            AutoEvalColumn.training_dataset_type.name: self.training_dataset_type.value,
            AutoEvalColumn.training_dataset.name: self.training_dataset,
            AutoEvalColumn.testing_type.name: self.testing_type.name,
            AutoEvalColumn.model_link.name: self.model_link,
        }

        for task in Tasks:
            data_dict[task.value.col_name] = self.results[task.value.metric]

        return data_dict
def get_request_file_for_model(requests_path, model_name, precision, training_dataset, testing_type):
    """Selects the correct request file for a given model if it's marked as FINISHED.

    The file is looked up at
    `<requests_path>/<model_name>/<model_name>_eval_request_<precision>_<training_dataset>_<testing_type>.json`.

    Args:
        requests_path: Root folder of the request files.
        model_name: Model name; used both as sub-folder and as file name prefix.
        precision: Precision string used in the file name.
        training_dataset: Training dataset string used in the file name.
        testing_type: Testing type string used in the file name.

    Returns:
        The request file path when the file exists, holds a JSON object and its
        "status" is "FINISHED"; otherwise None.
    """
    request_filename = os.path.join(
        requests_path,
        model_name,
        f"{model_name}_eval_request_{precision}_{training_dataset}_{testing_type}.json",
    )

    # check for request file
    try:
        with open(request_filename, "r") as file:
            req_content = json.load(file)
    except (OSError, ValueError):
        # OSError: file missing or unreadable. ValueError: content is not valid
        # JSON (json.JSONDecodeError is a ValueError subclass). Either way there
        # is no usable request file, so report "none" instead of crashing.
        return None

    # A payload that is not an object, or has no "status", is not FINISHED.
    if not isinstance(req_content, dict) or req_content.get("status") != "FINISHED":
        return None

    return request_filename
def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
    """Collect every complete evaluation found under the results folder root.

    Args:
        results_path: Root folder that is walked recursively for result files.
        requests_path: Root folder of the request files used to enrich results.

    Returns:
        One EvalResult per distinct eval name, restricted to those whose
        `to_dict()` succeeds. When several files share an eval name, the one
        read last wins.
    """
    result_files = []
    for directory, _, filenames in os.walk(results_path):
        # We should only have json files in model results: a folder is used
        # only when it is non-empty and holds nothing but json files.
        if filenames and all(name.endswith(".json") for name in filenames):
            result_files.extend(os.path.join(directory, name) for name in filenames)

    by_eval_name = {}
    for path in result_files:
        # Creation of result, then enrichment from its request file
        parsed = EvalResult.init_from_json_file(path)
        parsed.update_with_request_file(requests_path)
        by_eval_name[parsed.eval_name] = parsed

    complete = []
    for candidate in by_eval_name.values():
        try:
            candidate.to_dict()  # we test if the dict version is complete
        except KeyError:  # not all eval values present
            continue
        complete.append(candidate)

    return complete