Spaces:
Running
Running
File size: 3,520 Bytes
751f3b1 10b479b a98d805 609a724 10b479b 751f3b1 10b479b 751f3b1 a98d805 751f3b1 b50dddd a98d805 b50dddd de43d1a b50dddd de43d1a f8fb685 de43d1a f8fb685 751f3b1 de43d1a 42452f8 de43d1a b50dddd 10b479b 751f3b1 e8e1626 609a724 b50dddd 0a7ac7a 609a724 b9da84c 609a724 0a7ac7a 609a724 b9da84c 609a724 de43d1a b50dddd e8e1626 b50dddd 609a724 b50dddd 2fbc1c8 609a724 a98d805 e8e1626 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
import time
import schedule
import threading
import gradio as gr
from datetime import datetime
from modules.smtp import send_email
from modules.activate import trigger
from modules.times import fix_datetime, time_diff
from modules.restart import self_restart, test_hf_restart, test_ms_restart
from utils import START_TIME, PERIOD, DELAY
def run_schedule():
    """Record the start time, fire one trigger, then poll the scheduler forever.

    Meant to run in a background thread: it never returns. It resets the
    module-level ``START_TIME`` to the moment the loop starts, calls
    ``trigger()`` once, and then repeatedly runs any pending ``schedule``
    jobs, sleeping ``DELAY`` seconds between polls.

    Exceptions raised by ``trigger()`` or by a scheduled job are reported
    with a traceback instead of being allowed to propagate, because an
    uncaught exception here would end the loop and stop all future runs.
    """
    global START_TIME
    import traceback  # local import: only needed for failure reporting

    START_TIME = datetime.now()

    try:
        trigger()
    except Exception:
        # A failed first run must not prevent the periodic jobs from starting.
        traceback.print_exc()

    while True:
        try:
            schedule.run_pending()
        except Exception:
            # Keep polling so the remaining jobs still run.
            # NOTE(review): a job that raised may be retried on the next
            # poll (every DELAY seconds) -- confirm this is acceptable.
            traceback.print_exc()
        time.sleep(DELAY)
def monitor(period=PERIOD):
    """Register the recurring jobs and start the scheduler loop in a thread.

    Args:
        period: Number of hours between ``trigger`` runs; it is converted
            with ``int()`` before being handed to ``schedule``.
    """
    print(f"Monitor is on and triggered every {period}h...")

    # The self-restart job is registered first, on a fixed 47-hour interval.
    restart_job = schedule.every(47).hours
    restart_job.do(self_restart)

    trigger_hours = int(period)
    trigger_job = schedule.every(trigger_hours).hours
    trigger_job.do(trigger)

    # Daemon thread: the polling loop must not keep the process alive.
    worker = threading.Thread(target=run_schedule, daemon=True)
    worker.start()
# UI func
def tasklst():
    """Build a report of the uptime and every scheduled job for the UI.

    Returns:
        A ``(status, logs)`` pair. ``status`` is ``"Success"``, or the text
        of the exception that interrupted the report. ``logs`` is the report
        text built so far, or ``None`` if nothing could be produced.
    """
    status = "Success"
    report = []
    try:
        uptime = time_diff(START_TIME, datetime.now())
        report.append(f"\nHas been running for {uptime}\n")
        for job in schedule.get_jobs():
            last_run = fix_datetime(job.last_run)
            # Only mention the previous run when there is one to show.
            prev_run = f"last run: {last_run}, " if last_run else ""
            next_run = fix_datetime(job.next_run)
            report.append(
                f"\nEvery {job.interval}h do {job.job_func.__name__} "
                f"({prev_run}next run: {next_run})\n"
            )
    except Exception as e:
        status = f"{e}"

    # Whatever was collected before a failure is still returned.
    logs = "".join(report) if report else None
    return status, logs
if __name__ == "__main__":
    # Start the background scheduler before building the UI.
    monitor()

    # NOTE(review): the layout nesting below is reconstructed from statement
    # order -- confirm it against the rendered page.
    with gr.Blocks() as demo:
        gr.Markdown("# Keep Spaces/Studios Alive")

        with gr.Row():
            # Left column: manual actions and credential checks.
            with gr.Column():
                task_btn = gr.Button("See current task status")
                once_btn = gr.Button("Trigger once manually")
                smtp_btn = gr.Button("SMTP test")

                with gr.Tab("HF token test"):
                    hf_txt = gr.Textbox(
                        label="Restart a space with permissions",
                        placeholder="username/repo",
                        value="Genius-Society/online_tools",
                    )
                    with gr.Row():
                        clhf_btn = gr.Button("Clear")
                        hf_btn = gr.Button("Restart to test token validity")

                with gr.Tab("MS cookie test"):
                    ms_txt = gr.Textbox(
                        label="Restart a studio with permissions",
                        placeholder="username/repo",
                        value="Genius-Society/online_tools",
                    )
                    with gr.Row():
                        clms_btn = gr.Button("Clear")
                        ms_btn = gr.Button("Restart to test cookie validity")

            # Right column: result of the last action.
            with gr.Column():
                status_bar = gr.Textbox(label="Status", show_copy_button=True)
                gr.Markdown("Logs")
                logs_bar = gr.Markdown(container=True, show_copy_button=True)

        # Every action reports into the same status/log pair.
        result_outputs = [status_bar, logs_bar]

        # Handlers that take no input from the page.
        for button, handler in (
            (task_btn, tasklst),
            (once_btn, trigger),
            (smtp_btn, send_email),
        ):
            button.click(fn=handler, outputs=result_outputs)

        # Restart tests read the repo id from their textbox; "Clear" empties it.
        hf_btn.click(fn=test_hf_restart, inputs=[hf_txt], outputs=result_outputs)
        clhf_btn.click(fn=lambda: None, outputs=hf_txt)
        ms_btn.click(fn=test_ms_restart, inputs=[ms_txt], outputs=result_outputs)
        clms_btn.click(fn=lambda: None, outputs=ms_txt)

    demo.launch()
|