smtp_tester / modules /restart.py
admin
restart avoid disabled
293a93b
raw
history blame
2.07 kB
import requests
from huggingface_hub import HfApi
from modules.smtp import send_email
from utils import HF_DOMAIN, MS_DOMAIN, HF_TK, MS_HEADER, TIMEOUT, REPOS
HF_API = HfApi(token=HF_TK)
def restart_private_studio(repo: str):
response = requests.put(
f"{MS_DOMAIN}/api/v1/studio/{repo}/reset_restart",
headers=MS_HEADER,
timeout=TIMEOUT,
)
response.raise_for_status()
return f"\n{response.text}\n"
def restart_repos():
if REPOS:
repos = REPOS.split(";")
for repo in repos:
private_repo = repo.strip()
print(f"Restarting {private_repo} space & studio...")
restart_private_studio(private_repo)
HF_API.restart_space(private_repo)
def self_restart(me="kakamond/keeps_alive"):
try:
spaces = HF_API.list_spaces(author=me.split("/")[0])
for space in spaces:
if space.id != me and not space.disabled:
HF_API.restart_space(space.id)
HF_API.restart_space(me)
except Exception as e:
send_email(f"Failed to self-restart: {e}")
# UI func
def test_hf_restart(repo: str):
status = "Success"
logs = None
try:
if not repo:
raise ValueError("Empty repo!")
if f"{HF_DOMAIN}/spaces/" in repo:
repo = repo.split("/spaces/")[1].split("/")[:2]
repo = "/".join(repo)
logs = f"\n{HF_API.restart_space(repo)}\n"
logs += f"\nSuccessfully restart {HF_DOMAIN}/spaces/{repo}\n"
except Exception as e:
status = f"{e}"
return status, logs
def test_ms_restart(repo: str):
status = "Success"
logs = None
try:
if not repo:
raise ValueError("Empty repo!")
if f"{MS_DOMAIN}/studios/" in repo:
repo = repo.split("/studios/")[1].split("/")[:2]
repo = "/".join(repo)
logs = restart_private_studio(repo)
logs += f"\nSuccessfully restart {MS_DOMAIN}/studios/{repo}\n"
except Exception as e:
status = f"{e}"
return status, logs