vis data
Upload 3 files
58ef5b9 verified
import numpy as np
import pandas as pd
import wandb
from typing import Any, Dict, List, Optional, Tuple
import json
import bittensor as bt
from dataclasses import dataclass
import os
WANDB_TOKEN = os.environ.get("WANDB_TOKEN")
NETUID = 80
DELAY_SECS = 3
RETRIES = 3
@dataclass(frozen=True)
class ModelData:
uid: int
hotkey: str
competition_id: int
namespace: str
name: str
commit: str
# Hash of (hash(model) + hotkey)
secure_hash: str
block: int
incentive: float
emission: float
@classmethod
def from_compressed_str(
cls,
uid: int,
hotkey: str,
cs: str,
block: int,
incentive: float,
emission: float,
):
"""Returns an instance of this class from a compressed string representation"""
tokens = cs.split(":")
return ModelData(
uid=uid,
hotkey=hotkey,
namespace=tokens[0],
name=tokens[1],
commit=tokens[2],
secure_hash=tokens[3],
competition_id=int(tokens[4]),
block=block,
incentive=incentive,
emission=emission,
)
def run_with_retries(func, *args, **kwargs):
"""Runs a provided function with retries in the event of a failure."""
for i in range(0, RETRIES):
try:
return func(*args, **kwargs)
except (Exception, RuntimeError):
print(f"Failed to run function: {traceback.format_exc()}")
if i == RETRIES - 1:
raise
time.sleep(DELAY_SECS)
raise RuntimeError("Should never happen")
def get_wandb_runs(
project: str, filters: Dict[str, Any], order: str = "-created_at"
) -> List:
"""Get the latest runs from Wandb, retrying infinitely until we get them.
Args:
project (str): The Wandb project to get runs from.
filters (Dict[str, Any]): Filters to apply to the runs.
order (str): Order to sort the runs by. Defaults to "-created_at" (newest first)
Returns:
List: List of runs matching the provided filters
"""
while True:
api = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
runs = list(
api.runs(
project,
filters=filters,
order=order,
)
)
if len(runs) > 0:
return runs
# WandDB API is quite unreliable. Wait another minute and try again.
print("Failed to get runs from Wandb. Trying again in 60 seconds.")
time.sleep(60)
def get_scores(
uids: List[int],
wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
"""Returns the most recent scores for the provided UIDs.
Args:
uids (List[int]): List of UIDs to get scores for.
wandb_runs (List): List of validator runs from Wandb. Requires the runs are provided in descending order.
"""
result = {}
previous_timestamp = None
seen_competitions = set()
# Iterate through the runs until we've processed all the uids.
for i, run in enumerate(wandb_runs):
if not "original_format_json" in run.summary:
continue
data = json.loads(run.summary["original_format_json"])
all_uid_data = data["uid_data"]
timestamp = data["timestamp"]
# Make sure runs are indeed in descending time order.
# assert (
# previous_timestamp is None or timestamp < previous_timestamp
# ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
previous_timestamp = timestamp
comp_id = data.get("competition_id", None)
for uid in uids:
if uid in result:
continue
if str(uid) in all_uid_data:
uid_data = all_uid_data[str(uid)]
# Only the most recent run per competition is fresh.
is_fresh = comp_id not in seen_competitions
result[uid] = {
"avg_loss": uid_data.get("average_loss", None),
"win_rate": uid_data.get("win_rate", None),
"win_total": uid_data.get("win_total", None),
"weight": uid_data.get("weight", None),
"competition_id": uid_data.get("competition_id", None),
"fresh": is_fresh,
}
seen_competitions.add(comp_id)
break
return result
def get_subnet_data(
subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
result = []
for uid in metagraph.uids.tolist():
hotkey = metagraph.hotkeys[uid]
metadata = None
try:
metadata = run_with_retries(
functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
)
except:
print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
if not metadata:
continue
commitment = metadata["info"]["fields"][0]
hex_data = commitment[list(commitment.keys())[0]][2:]
chain_str = bytes.fromhex(hex_data).decode()
block = metadata["block"]
incentive = np.nan_to_num(metagraph.incentive[uid]).item()
emission = (
np.nan_to_num(metagraph.emission[uid]).item() * 20
) # convert to daily TAO
model_data = None
try:
model_data = ModelData.from_compressed_str(
uid, hotkey, chain_str, block, incentive, emission
)
except:
continue
result.append(model_data)
return result
def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
"""Returns a subtensor and metagraph for the finetuning subnet."""
def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
subtensor = bt.subtensor("finney")
metagraph = subtensor.metagraph(NETUID, lite=False)
return subtensor, metagraph
return run_with_retries(_internal)