Spaces:
Running
Running
import requests | |
from modules.smtp import send_email | |
from utils import HF_DOMAIN, MS_DOMAIN, HF_HEADER, MS_HEADER, TIMEOUT | |
def restart_private_studio(repo: str):
    """Send the reset-restart request for a studio and return the reply text.

    Issues an HTTP PUT to ``{MS_DOMAIN}/api/v1/studio/{repo}/reset_restart``
    with ``MS_HEADER`` as the request headers and ``TIMEOUT`` as the timeout.

    Args:
        repo: Studio identifier interpolated into the endpoint path
            (presumably ``owner/name`` -- confirm against callers).

    Returns:
        The response body wrapped in one leading and one trailing newline.

    Raises:
        requests.HTTPError: If the server answers with an error status.
            Other exceptions raised by ``requests.put`` propagate unchanged.
    """
    endpoint = f"{MS_DOMAIN}/api/v1/studio/{repo}/reset_restart"
    reply = requests.put(endpoint, headers=MS_HEADER, timeout=TIMEOUT)
    # Turn 4xx/5xx replies into exceptions instead of returning their body.
    reply.raise_for_status()
    return "\n" + reply.text + "\n"
def restart_private_space(repo: str):
    """Send the restart request for a space and return the reply text.

    Issues an HTTP POST to ``{HF_DOMAIN}/api/spaces/{repo}/restart`` with
    ``HF_HEADER`` as the request headers and ``TIMEOUT`` as the timeout.

    Args:
        repo: Space identifier interpolated into the endpoint path
            (presumably ``owner/name`` -- confirm against callers).

    Returns:
        The response body wrapped in one leading and one trailing newline.

    Raises:
        requests.HTTPError: If the server answers with an error status.
            Other exceptions raised by ``requests.post`` propagate unchanged.
    """
    endpoint = f"{HF_DOMAIN}/api/spaces/{repo}/restart"
    reply = requests.post(endpoint, headers=HF_HEADER, timeout=TIMEOUT)
    # Turn 4xx/5xx replies into exceptions instead of returning their body.
    reply.raise_for_status()
    return "\n" + reply.text + "\n"
def self_restart():
    """Restart the ``kakamond/keeps_alive`` space, emailing on failure.

    Any exception raised by the restart call is caught and reported through
    ``send_email`` instead of being propagated, so this function only raises
    if ``send_email`` itself does.
    """
    own_space = "kakamond/keeps_alive"
    try:
        restart_private_space(own_space)
    except Exception as err:
        # Best-effort: report the failure by mail rather than raising.
        send_email(f"Failed to self-restart: {err}")
# UI functions
def test_hf_restart(repo: str):
    """Restart a space on request from the UI and report the outcome.

    ``repo`` may be a bare repo id or a page URL containing
    ``{HF_DOMAIN}/spaces/``; for a URL, the first two path segments after
    ``/spaces/`` are used as the repo id. Surrounding whitespace is ignored
    and a trailing ``?query`` or ``#fragment`` on a URL is discarded, so a
    link pasted straight from the browser resolves to the same repo id.

    Args:
        repo: Repo id or space page URL entered by the user.

    Returns:
        A ``(status, logs)`` tuple. ``status`` is ``"Success"`` or the text
        of the exception that occurred. ``logs`` is the restart response
        followed by a confirmation line, or ``None`` if the restart failed.
    """
    status = "Success"
    logs = None
    try:
        # Whitespace-only input is as unusable as an empty string.
        repo = repo.strip() if repo else ""
        if not repo:
            raise ValueError("Empty repo!")

        if f"{HF_DOMAIN}/spaces/" in repo:
            tail = repo.split("/spaces/")[1]
            # Keep query strings and fragments out of the repo id.
            tail = tail.split("?", 1)[0].split("#", 1)[0]
            repo = "/".join(tail.split("/")[:2])

        logs = restart_private_space(repo)
        logs += f"\nSuccessfully restart {HF_DOMAIN}/spaces/{repo}\n"

    except Exception as e:
        # UI boundary: show the error text to the user instead of raising.
        status = f"{e}"

    return status, logs
def test_ms_restart(repo: str):
    """Restart a studio on request from the UI and report the outcome.

    ``repo`` may be a bare repo id or a page URL containing
    ``{MS_DOMAIN}/studios/``; for a URL, the first two path segments after
    ``/studios/`` are used as the repo id. Surrounding whitespace is ignored
    and a trailing ``?query`` or ``#fragment`` on a URL is discarded, so a
    link pasted straight from the browser resolves to the same repo id.

    Args:
        repo: Repo id or studio page URL entered by the user.

    Returns:
        A ``(status, logs)`` tuple. ``status`` is ``"Success"`` or the text
        of the exception that occurred. ``logs`` is the restart response
        followed by a confirmation line, or ``None`` if the restart failed.
    """
    status = "Success"
    logs = None
    try:
        # Whitespace-only input is as unusable as an empty string.
        repo = repo.strip() if repo else ""
        if not repo:
            raise ValueError("Empty repo!")

        if f"{MS_DOMAIN}/studios/" in repo:
            tail = repo.split("/studios/")[1]
            # Keep query strings and fragments out of the repo id.
            tail = tail.split("?", 1)[0].split("#", 1)[0]
            repo = "/".join(tail.split("/")[:2])

        logs = restart_private_studio(repo)
        logs += f"\nSuccessfully restart {MS_DOMAIN}/studios/{repo}\n"

    except Exception as e:
        # UI boundary: show the error text to the user instead of raising.
        status = f"{e}"

    return status, logs