import functools
import json
import os
import time
import traceback
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

import bittensor as bt
import numpy as np
import pandas as pd
import wandb

# NOTE: the import path for get_metadata has moved across bittensor releases;
# this path matches the bittensor 6.x line and may need adjusting elsewhere.
from bittensor.extrinsics.serving import get_metadata

WANDB_TOKEN = os.environ.get("WANDB_TOKEN")

NETUID = 80  # Subnet to pull data from.
DELAY_SECS = 3  # Seconds to wait between retries.
RETRIES = 3  # Number of attempts before giving up.

@dataclass(frozen=True)
class ModelData:
    uid: int
    hotkey: str
    competition_id: int
    namespace: str
    name: str
    commit: str
    # Hash of (hash(model) + hotkey).
    secure_hash: str
    block: int
    incentive: float
    emission: float
    @classmethod
    def from_compressed_str(
        cls,
        uid: int,
        hotkey: str,
        cs: str,
        block: int,
        incentive: float,
        emission: float,
    ) -> "ModelData":
        """Returns an instance of this class from a compressed string representation."""
        tokens = cs.split(":")
        return ModelData(
            uid=uid,
            hotkey=hotkey,
            namespace=tokens[0],
            name=tokens[1],
            commit=tokens[2],
            secure_hash=tokens[3],
            competition_id=int(tokens[4]),
            block=block,
            incentive=incentive,
            emission=emission,
        )
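
# Usage sketch (illustrative values only; real commitment strings are read from
# the chain and have the form "namespace:name:commit:secure_hash:competition_id"):
#   md = ModelData.from_compressed_str(
#       uid=42,
#       hotkey="5F3s...",
#       cs="my-namespace:my-model:abc123:0xdeadbeef:1",
#       block=3_000_000,
#       incentive=0.01,
#       emission=0.5,
#   )
#   md.competition_id  # -> 1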

def run_with_retries(func, *args, **kwargs):
    """Runs a provided function with retries in the event of a failure."""
    for i in range(RETRIES):
        try:
            return func(*args, **kwargs)
        except Exception:
            print(f"Failed to run function: {traceback.format_exc()}")
            if i == RETRIES - 1:
                raise
            time.sleep(DELAY_SECS)
    raise RuntimeError("Should never happen")
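
# Example: wrap a flaky call so transient failures are retried. `fetch_data`
# is a hypothetical callable, not part of this module:
#   data = run_with_retries(functools.partial(fetch_data, "some-arg"))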

def get_wandb_runs(
    project: str, filters: Dict[str, Any], order: str = "-created_at"
) -> List:
    """Gets the latest runs from WandB, retrying indefinitely until we get them.

    Args:
        project (str): The WandB project to get runs from.
        filters (Dict[str, Any]): Filters to apply to the runs.
        order (str): Order to sort the runs by. Defaults to "-created_at" (newest first).

    Returns:
        List: List of runs matching the provided filters.
    """
    while True:
        api = wandb.Api(api_key=WANDB_TOKEN, timeout=100)
        runs = list(
            api.runs(
                project,
                filters=filters,
                order=order,
            )
        )
        if len(runs) > 0:
            return runs

        # The WandB API is quite unreliable. Wait another minute and try again.
        print("Failed to get runs from WandB. Trying again in 60 seconds.")
        time.sleep(60)
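
# Example call (project name and filter are placeholders; the real values used
# by the dashboard are not defined in this file):
#   validator_runs = get_wandb_runs(
#       project="<entity>/<project>",
#       filters={"config.type": "validator"},
#   )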

def get_scores(
    uids: List[int],
    wandb_runs: List,
) -> Dict[int, Dict[str, Optional[float]]]:
    """Returns the most recent scores for the provided UIDs.

    Args:
        uids (List[int]): List of UIDs to get scores for.
        wandb_runs (List): List of validator runs from WandB. Requires that the
            runs are provided in descending order (newest first).
    """
    result = {}
    previous_timestamp = None
    seen_competitions = set()

    # Iterate through the runs until we've processed all the uids.
    for run in wandb_runs:
        if "original_format_json" not in run.summary:
            continue
        data = json.loads(run.summary["original_format_json"])
        all_uid_data = data["uid_data"]
        timestamp = data["timestamp"]

        # Make sure runs are indeed in descending time order.
        # assert (
        #     previous_timestamp is None or timestamp < previous_timestamp
        # ), f"Timestamps are not in descending order: {timestamp} >= {previous_timestamp}"
        previous_timestamp = timestamp
        comp_id = data.get("competition_id", None)

        for uid in uids:
            if uid in result:
                continue
            if str(uid) in all_uid_data:
                uid_data = all_uid_data[str(uid)]
                # Only the most recent run per competition is fresh.
                is_fresh = comp_id not in seen_competitions
                result[uid] = {
                    "avg_loss": uid_data.get("average_loss", None),
                    "win_rate": uid_data.get("win_rate", None),
                    "win_total": uid_data.get("win_total", None),
                    "weight": uid_data.get("weight", None),
                    "competition_id": uid_data.get("competition_id", None),
                    "fresh": is_fresh,
                }
        seen_competitions.add(comp_id)

        # Stop once every requested uid has a score.
        if len(result) == len(uids):
            break
    return result
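
# Shape of the mapping returned by get_scores (values illustrative):
#   {
#       42: {"avg_loss": 2.31, "win_rate": 0.87, "win_total": 120,
#            "weight": 0.05, "competition_id": 1, "fresh": True},
#   }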

def get_subnet_data(
    subtensor: bt.subtensor, metagraph: bt.metagraph
) -> List[ModelData]:
    """Returns the model committed to the chain by each hotkey in the metagraph."""
    result = []
    for uid in metagraph.uids.tolist():
        hotkey = metagraph.hotkeys[uid]
        metadata = None
        try:
            metadata = run_with_retries(
                functools.partial(get_metadata, subtensor, metagraph.netuid, hotkey)
            )
        except Exception:
            print(f"Failed to get metadata for UID {uid}: {traceback.format_exc()}")
        if not metadata:
            continue

        # The commitment is stored as hex; strip the "0x" prefix and decode.
        commitment = metadata["info"]["fields"][0]
        hex_data = commitment[list(commitment.keys())[0]][2:]
        chain_str = bytes.fromhex(hex_data).decode()
        block = metadata["block"]
        incentive = np.nan_to_num(metagraph.incentive[uid]).item()
        emission = (
            np.nan_to_num(metagraph.emission[uid]).item() * 20
        )  # convert to daily TAO

        model_data = None
        try:
            model_data = ModelData.from_compressed_str(
                uid, hotkey, chain_str, block, incentive, emission
            )
        except Exception:
            # Skip hotkeys with malformed commitments.
            continue
        result.append(model_data)
    return result

def get_subtensor_and_metagraph() -> Tuple[bt.subtensor, bt.metagraph]:
    """Returns a subtensor and metagraph for the finetuning subnet."""

    def _internal() -> Tuple[bt.subtensor, bt.metagraph]:
        subtensor = bt.subtensor("finney")
        metagraph = subtensor.metagraph(NETUID, lite=False)
        return subtensor, metagraph

    return run_with_retries(_internal)
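
# End-to-end sketch of how these helpers compose (assumes WANDB_TOKEN is set
# and a WandB project name, neither of which is defined in this file):
#   subtensor, metagraph = get_subtensor_and_metagraph()
#   models = get_subnet_data(subtensor, metagraph)
#   runs = get_wandb_runs(project="<entity>/<project>", filters={})
#   scores = get_scores([m.uid for m in models], runs)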