import json
import math
import os
from dataclasses import dataclass
from typing import Dict, Union

import dateutil.parser  # import the submodule so dateutil.parser.ParserError resolves below

from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType, FewShotType
from src.submission.check_validity import is_model_on_hub

@dataclass
class EvalResult:
"""Represents one full evaluation. Built from a combination of the result and request file for a given run.
"""
    eval_name: str  # org_model_fewshot (uid)
full_model: str # org/model (path on hub)
org: str
model: str
revision: str # commit hash, "" if main
    results: Dict[str, Union[float, int]]  # float or int
average_CPS: float
is_5fewshot: bool
fewshot_symbol: FewShotType = FewShotType.Unknown
weight_type: WeightType = WeightType.Original # Original or Adapter
architecture: str = "Unknown"
license: str = "?"
likes: int = 0
num_params: int = 0
date: str = "" # submission date of request file
still_on_hub: bool = False
    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Builds an EvalResult from a single model result file"""
with open(json_filepath) as fp:
data = json.load(fp)
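        # Expected result-file layout (inferred from the reads below):
        # {
        #   "config": {"num_fewshot": ..., "num_params_billion": ..., "model_name": ..., "model_sha": ...},
        #   "average_CPS": <float>,
        #   "tasks": {<task key>: {<metric type>: <value>, ...}, ...}
        # }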
        config = data.get("config")

        # Get the average CPS score, defaulting to 0.0 when the key is missing
        average_CPS = float(data.get("average_CPS", 0.0))
        # Get the number of few-shot examples; the config may store it as a
        # string or an int, so compare its string form (the old try/except
        # around an equality test was dead code: `==` never raises ValueError)
        fewshot = config.get("num_fewshot", False)
        is_5fewshot = str(fewshot) == "5"

        # Determine the few-shot type (ZS or FS) based on num_fewshot
        fewshot_symbol = FewShotType.from_num_fewshot(is_5fewshot)
        # Number of parameters of the model, in billions rounded up (0 if unknown)
        num_params = 0
        num_params_billion = config.get("num_params_billion")
        if num_params_billion is not None:
            num_params = math.ceil(num_params_billion)
        # Get model and org
        org_and_model = config.get("model_name", config.get("model_args", None))
        org_and_model = org_and_model.split("/", 1)

        if len(org_and_model) == 1:
            org = None
            model = org_and_model[0]
            result_key = f"{model}_{is_5fewshot}"
        else:
            org = org_and_model[0]
            model = org_and_model[1]
            result_key = f"{org}_{model}_{is_5fewshot}"
        full_model = "/".join(org_and_model)
still_on_hub, _, model_config = is_model_on_hub(
full_model, config.get("model_sha", "main"), trust_remote_code=True, test_tokenizer=False
)
architecture = "?"
if model_config is not None:
architectures = getattr(model_config, "architectures", None)
if architectures:
architecture = ";".join(architectures)
        # Extract the per-task results of the model
        results = {}
        for task in Tasks:
            task = task.value
            for k, v in data["tasks"].items():
                # task.benchmark carries a two-character suffix that the result
                # file's task key does not, hence the [:-2] slice
                if task.benchmark[:-2] == k:
                    if "Best Prompt Id" in task.col_name:
                        # the prompt id is taken from the last character of the metric value
                        results[task.benchmark] = int(v[task.metric_type][-1:])
                    else:
                        results[task.benchmark] = float(v[task.metric_type])
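        # `results` now maps each benchmark name to its score (float) or, for
        # "Best Prompt Id" columns, to the winning prompt's integer id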
        return cls(
            eval_name=result_key,
            full_model=full_model,
            org=org,
            model=model,
            results=results,
            average_CPS=average_CPS,
            fewshot_symbol=fewshot_symbol,
            is_5fewshot=is_5fewshot,
            revision=config.get("model_sha", ""),
            still_on_hub=still_on_hub,
            architecture=architecture,
            num_params=num_params,
        )
'''
def update_with_request_file(self, requests_path):
"""Finds the relevant request file for the current model and updates info with it"""
request_file = get_request_file_for_model(requests_path, self.full_model, self.precision.value.name)
try:
with open(request_file, "r") as f:
request = json.load(f)
self.model_type = ModelType.from_str(request.get("model_type", ""))
self.weight_type = WeightType[request.get("weight_type", "Original")]
self.license = request.get("license", "?")
self.likes = request.get("likes", 0)
self.num_params = request.get("params", 0)
self.date = request.get("submitted_time", "")
except Exception:
print(f"Could not find request file for {self.org}/{self.model} with precision
'''
def to_dict(self):
"""Converts the Eval Result to a dict compatible with our dataframe display"""
average = self.average_CPS
fewshot_symbol = (
self.fewshot_symbol.value.symbol if isinstance(self.fewshot_symbol, FewShotType) else "❓"
)
        data_dict = {
            "eval_name": self.eval_name,  # not a column, just a save name
            AutoEvalColumn.fewshot_symbol.name: fewshot_symbol,
AutoEvalColumn.weight_type.name: self.weight_type.value.name,
AutoEvalColumn.architecture.name: self.architecture,
AutoEvalColumn.model.name: make_clickable_model(self.full_model),
AutoEvalColumn.revision.name: self.revision,
AutoEvalColumn.average.name: average,
AutoEvalColumn.is_5fewshot.name: self.is_5fewshot,
AutoEvalColumn.license.name: self.license,
AutoEvalColumn.likes.name: self.likes,
AutoEvalColumn.params.name: self.num_params,
AutoEvalColumn.still_on_hub.name: self.still_on_hub,
}
for task in Tasks:
data_dict[task.value.col_name] = self.results[task.value.benchmark]
return data_dict
def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
"""From the path of the results folder root, extract all needed info for results"""
model_result_filepaths = []
for root, _, files in os.walk(results_path):
# We should only have json files in model results
        if len(files) == 0 or any(not f.endswith(".json") for f in files):
continue
        # Sort the files by date (the key drops the "results_" prefix, the
        # ".json" suffix, and the last seven characters of the file name)
        try:
            files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7])
        except dateutil.parser.ParserError:
files = [files[-1]]
for file in files:
model_result_filepaths.append(os.path.join(root, file))
eval_results = {}
for model_result_filepath in model_result_filepaths:
# Creation of result
eval_result = EvalResult.init_from_json_file(model_result_filepath)
#eval_result.update_with_request_file(requests_path)
# Store results of same eval together
eval_name = eval_result.eval_name
        if eval_name in eval_results:
eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None})
else:
eval_results[eval_name] = eval_result
results = []
for v in eval_results.values():
try:
v.to_dict() # we test if the dict version is complete
results.append(v)
except KeyError: # not all eval values present
continue
return results
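

# Minimal usage sketch (hypothetical paths, not part of this module): builds the
# raw EvalResult list and converts each entry to a dataframe-ready dict.
if __name__ == "__main__":
    raw_results = get_raw_eval_results("eval-results", "eval-queue")
    for res in raw_results:
        row = res.to_dict()
        print(row["eval_name"], row[AutoEvalColumn.average.name])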