import glob
import json
import os
from dataclasses import dataclass
import dateutil.parser
from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, EvalDimensions
from src.submission.check_validity import is_model_on_hub
@dataclass
class EvalResult:
"""Represents one full evaluation. Built from a combination of the result and request file for a given run.
"""
eval_name: str # org_model_precision (uid)
full_model: str # org/model (path on hub)
org: str
model: str
results: dict
model_source: str = "" # HF, API, ...
    model_category: str = "" # Nano, Small, Medium, Large
date: str = "" # submission date of request file
still_on_hub: bool = False
@classmethod
    def init_from_json_file(cls, json_filepath):
"""Inits the result from the specific model result file"""
with open(json_filepath) as fp:
data = json.load(fp)
config = data.get("config")
# Get model and org
org_and_model = config.get("model", config.get("model_args", None))
org_and_model = org_and_model.split("/", 1)
if len(org_and_model) == 1:
org = None
model = org_and_model[0]
            result_key = model
else:
org = org_and_model[0]
model = org_and_model[1]
result_key = f"{org}_{model}"
full_model = "/".join(org_and_model)
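        # Check whether the model repo is still reachable on the Hub (tokenizer loading is skipped)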
still_on_hub, _, _ = is_model_on_hub(
full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
)
# Extract results available in this file (some results are split in several files)
results = {}
results_obj = data.get("results")
results["average_score"] = results_obj.get("average_score")
results["speed"] = results_obj.get("speed")
results["contamination_score"] = results_obj.get("contamination_score")
scores_by_category = results_obj.get("scores_by_category")
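        # Store each per-category average under the lowercased category name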
for category_obj in scores_by_category:
category = category_obj["category"]
average_score = category_obj["average_score"]
results[category.lower()] = average_score
        return cls(
eval_name=result_key,
full_model=full_model,
org=org,
model=model,
model_source=config.get("model_source", ""),
model_category=config.get("model_category", ""),
results=results,
still_on_hub=still_on_hub,
)
def update_with_request_file(self, requests_path):
"""Finds the relevant request file for the current model and updates info with it"""
request_file = get_request_file_for_model(requests_path, self.full_model)
try:
with open(request_file, "r") as f:
request = json.load(f)
self.date = request.get("submitted_time", "")
except Exception:
print(f"Could not find request file for {self.org}/{self.model}")
def to_dict(self):
"""Converts the Eval Result to a dict compatible with our dataframe display"""
average_score = self.results["average_score"]
data_dict = {
"eval_name": self.eval_name, # not a column, just a save name,
AutoEvalColumn.model_source.name: self.model_source,
AutoEvalColumn.model_category.name: self.model_category,
AutoEvalColumn.model.name: make_clickable_model(self.full_model),
AutoEvalColumn.average_score.name: average_score,
}
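        # Each EvalDimensions entry maps a metric key in self.results to a leaderboard column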
for eval_dim in EvalDimensions:
dimension_name = eval_dim.value.col_name
try:
dimension_value = self.results[eval_dim.value.metric]
except KeyError:
dimension_value = 0
if dimension_name == "Contamination Score":
                dimension_value = 0 if dimension_value < 0 else round(dimension_value, 2)
data_dict[dimension_name] = dimension_value
return data_dict
def get_request_file_for_model(requests_path, model_name):
"""Selects the correct request file for a given model. Only keeps runs tagged as FINISHED"""
request_files = os.path.join(
requests_path,
f"{model_name}_eval_request.json",
)
request_files = glob.glob(request_files)
    # Select the request file for a FINISHED run
request_file = ""
request_files = sorted(request_files, reverse=True)
for tmp_request_file in request_files:
with open(tmp_request_file, "r") as f:
req_content = json.load(f)
            if req_content["status"] == "FINISHED":
request_file = tmp_request_file
return request_file
def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
"""From the path of the results folder root, extract all needed info for results"""
model_result_filepaths = []
for root, _, files in os.walk(results_path):
## we allow HTML files now
#if len(files) == 0 or any([not f.endswith(".json") for f in files]):
# continue
files = [f for f in files if f.endswith(".json")]
# Sort the files by date
try:
files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7])
        except dateutil.parser.ParserError as e:
            print("Error", e)
files = [files[-1]]
for file in files:
model_result_filepaths.append(os.path.join(root, file))
eval_results = {}
for model_result_filepath in model_result_filepaths:
# Creation of result
eval_result = EvalResult.init_from_json_file(model_result_filepath)
#eval_result.update_with_request_file(requests_path) ##not needed, save processing time
# Store results of same eval together
eval_name = eval_result.eval_name
        if eval_name in eval_results:
eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None})
else:
eval_results[eval_name] = eval_result
results = []
for v in eval_results.values():
try:
v.to_dict() # we test if the dict version is complete
results.append(v)
except KeyError: # not all eval values present
print("Key error in eval result, skipping")
continue
return results
def get_model_answers_html_file(results_path, model_name):
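    """Returns the saved benchmark-answers HTML for a model together with a link to the raw file on the Space repo"""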
    model_org, model_name_only = model_name.split("/")
model_answers_prefix = f"{results_path}/{model_org}/"
html_file_content = "EMPTY"
download_file_path = "https://huggingface.co/spaces/silma-ai/Arabic-LLM-Broad-Leaderboard/raw/main/"
for root, _, files in os.walk(model_answers_prefix):
for file_name in files:
if file_name.startswith(f"{model_name_only}_abb_benchmark_answers_"):
file_path = os.path.join(root, file_name)
with open(file_path, "r") as f:
html_file_content = f.read()
download_file_path = download_file_path + file_path.replace("./", "")
break
    return html_file_content, download_file_path
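

# Minimal usage sketch, not part of the leaderboard app itself: the "./results" and
# "./requests" paths below are illustrative assumptions, not paths defined in this module.
if __name__ == "__main__":
    raw_results = get_raw_eval_results(results_path="./results", requests_path="./requests")
    rows = [result.to_dict() for result in raw_results]
    print(f"Loaded {len(rows)} leaderboard rows")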