import functools
import json
import os
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import bittensor as bt
import numpy as np
import pandas as pd
import wandb

# get_metadata reads a hotkey's raw chain commitment. This import path is
# correct for bittensor 6.x; newer releases may expose it elsewhere.
from bittensor.extrinsics.serving import get_metadata

WANDB_TOKEN = os.environ.get("WANDB_TOKEN")
NETUID = 80
DELAY_SECS = 3
RETRIES = 3

@dataclass
class ModelData:
    uid: int
    hotkey: str
    competition_id: int
    namespace: str
    name: str
    commit: str
    # Hash of (hash(model) + hotkey).
    secure_hash: str
    block: int
    incentive: float
    emission: float
    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation."""
        tokens = cs.split(":")
        return cls(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )
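
# A quick sketch of the compressed-string format consumed above (hypothetical
# values, not a real on-chain commitment):
#   ModelData.from_compressed_str(
#       uid=1, hotkey="hk", cs="my-namespace:my-model:abc123:0xdeadbeef:2",
#       block=100, incentive=0.0, emission=0.0,
#   )
# yields namespace="my-namespace", name="my-model", commit="abc123",
# secure_hash="0xdeadbeef", competition_id=2.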

def run_with_retries(func, *args, **kwargs):
    """Runs the provided function, retrying on failure up to RETRIES times."""
    for i in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if i == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    raise RuntimeError("Should never happen.")

def get_wandb_runs(
    project: str, filters: Dict[str, Any], order: str = "-created_at"
) -> List:
    """Gets the latest runs from Wandb, retrying until at least one run is returned.

    Args:
        project (str): The Wandb project to get runs from.
        filters (Dict[str, Any]): Filters to apply to the runs.
        order (str): Order to sort the runs by. Defaults to "-created_at" (newest first).

    Returns:
        List: List of runs matching the provided filters.
    """
    while True:
        api = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
        runs = list(
            api.runs(
                project,
                filters=filters,
                order=order,
            )
        )
        if len(runs) > 0:
            return runs
        # The Wandb API is quite unreliable. Wait a minute and try again.
        print("Failed to get runs from Wandb. Trying again in 60 seconds.")
        time.sleep(60)
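
# Example usage (hypothetical project and filter; the real keys depend on how
# the subnet's validators tag their Wandb runs). Wandb filters use MongoDB-style
# query syntax against run attributes such as config.*:
#   runs = get_wandb_runs(
#       project="my-entity/my-validator-project",
#       filters={"config.type": "validator"},
#   )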

def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Returns the most recent scores for the provided UIDs.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from Wandb. Requires the runs
            are provided in descending order (newest first).
    """
    result = {}
    previous_timestamp = None
    seen_competitions = set()
    # Iterate through the runs until we've processed all the uids.
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]
        # Make sure runs are indeed in descending time order.
        # assert (
        #     previous_timestamp is None or timestamp < previous_timestamp
        # ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
        previous_timestamp = timestamp
        comp_id = data.get("competition_id", None)
        for uid in uids:
            if uid in result:
                continue
            if str(uid) in all_uid_data:
                uid_data = all_uid_data[str(uid)]
                # Only the most recent run per competition is fresh.
                is_fresh = comp_id not in seen_competitions
                result[uid] = {
                    "avg_loss": uid_data.get("average_loss", None),
                    "win_rate": uid_data.get("win_rate", None),
                    "win_total": uid_data.get("win_total", None),
                    "weight": uid_data.get("weight", None),
                    "competition_id": uid_data.get("competition_id", None),
                    "fresh": is_fresh,
                }
        seen_competitions.add(comp_id)
        # Stop once every requested uid has a score.
        if len(result) == len(uids):
            break
    return result
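
# The returned mapping has this shape (illustrative values only):
#   {123: {"avg_loss": 2.31, "win_rate": 0.55, "win_total": 110,
#          "weight": 0.01, "competition_id": 2, "fresh": True}}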

def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Returns the chain-committed model metadata for every UID in the metagraph."""
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
        if not metadata:
            continue
        commitment = metadata["info"]["fields"][0]
        # The commitment value is hex-encoded with a "0x" prefix; strip it and decode.
        hex_data = commitment[list(commitment.keys())[0]][2:]
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]
        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert per-tempo emission to daily TAO (~20 tempos per day)
        model_data = None
        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # Skip UIDs whose on-chain commitment doesn't parse as a model string.
            continue
        result.append(model_data)
    return result

def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Returns a subtensor and metagraph for the finetuning subnet."""

    def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
        subtensor = bt.subtensor("finney")
        metagraph = subtensor.metagraph(NETUID, lite=False)
        return subtensor, metagraph

    return run_with_retries(_internal)
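
if __name__ == "__main__":
    # A minimal end-to-end sketch of how these helpers compose. The Wandb
    # project name and filter below are placeholders, not the real ones, and
    # WANDB_TOKEN must be set in the environment.
    subtensor, metagraph = get_subtensor_and_metagraph()
    models = get_subnet_data(subtensor, metagraph)
    runs = get_wandb_runs(
        project="my-entity/my-validator-project",  # placeholder
        filters={"config.type": "validator"},  # placeholder
    )
    scores = get_scores([m.uid for m in models], runs)
    for model in models:
        print(model.uid, model.namespace, model.name, scores.get(model.uid))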