Spaces:
Running
Running
import time | |
import schedule | |
import threading | |
import gradio as gr | |
from datetime import datetime | |
from modules.smtp import send_email | |
from modules.activate import trigger | |
from modules.times import fix_datetime, time_diff | |
from modules.restart import self_restart, test_hf_restart, test_ms_restart | |
from utils import START_TIME, PERIOD, DELAY | |
def run_schedule():
    """Run the scheduler loop forever; used as a background-thread target.

    Resets the module-level START_TIME to the current time, calls
    trigger() once immediately, then calls schedule.run_pending() in an
    endless loop, sleeping DELAY (passed to time.sleep) between polls.

    The schedule library lets exceptions raised by a job propagate out of
    run_pending(). Without the handlers below, a single failing job would
    terminate this thread and silently stop every future run, so failures
    are reported and the loop carries on.
    """
    global START_TIME

    # Local import: only needed on the error path, keeps file imports as-is.
    import traceback

    START_TIME = datetime.now()

    try:
        trigger()
    except Exception:
        # A failed first run must not prevent the periodic runs.
        traceback.print_exc()

    while True:
        try:
            schedule.run_pending()
        except Exception:
            # Report the failing job and keep the scheduler alive.
            traceback.print_exc()
        time.sleep(DELAY)
def monitor(period=PERIOD):
    """Register the recurring jobs and start the scheduler thread.

    Args:
        period: Interval in hours between trigger() runs. It is printed
            as given and converted with int() before being scheduled.
    """
    print(f"Monitor is on and triggered every {period}h...")

    # NOTE(review): 47h is presumably chosen to stay just under a
    # platform time limit -- confirm before changing.
    restart_job = schedule.every(47).hours
    restart_job.do(self_restart)

    trigger_job = schedule.every(int(period)).hours
    trigger_job.do(trigger)

    # Daemon thread, so it does not keep the process alive on exit.
    worker = threading.Thread(target=run_schedule, daemon=True)
    worker.start()
# UI callback
def tasklst():
    """Build a status report of the running scheduler for the UI.

    Returns:
        A (status, logs) tuple. status is "Success", or the text of the
        exception if anything failed. logs is the report assembled so
        far: the uptime line followed by one entry per scheduled job. It
        is None when the failure happened before any text was produced.
    """
    status = "Success"
    chunks = []
    try:
        uptime = time_diff(START_TIME, datetime.now())
        chunks.append(f"\nHas been running for {uptime}\n")

        for job in schedule.get_jobs():
            last_run = fix_datetime(job.last_run)
            # Only mention the last run when fix_datetime returned a truthy value.
            prev_run = f"last run: {last_run}, " if last_run else ""
            next_run = fix_datetime(job.next_run)
            chunks.append(
                f"\nEvery {job.interval}h do {job.job_func.__name__} "
                f"({prev_run}next run: {next_run})\n"
            )

    except Exception as e:
        status = f"{e}"

    # Keep whatever was collected before a failure; None if nothing was.
    logs = "".join(chunks) if chunks else None
    return status, logs
if __name__ == "__main__":
    # Start the background scheduler before the UI is built.
    monitor()

    # Pre-filled repo id shown in both test tabs.
    default_repo = "Genius-Society/online_tools"

    with gr.Blocks() as demo:
        gr.Markdown("# Keep Spaces/Studios Alive")
        with gr.Row():
            # Left column: action buttons and the two credential-test tabs.
            with gr.Column():
                task_btn = gr.Button("See current task status")
                once_btn = gr.Button("Trigger once manually")
                smtp_btn = gr.Button("SMTP test")

                with gr.Tab("HF token test"):
                    hf_txt = gr.Textbox(
                        value=default_repo,
                        label="Restart a space with permissions",
                        placeholder="username/repo",
                    )
                    with gr.Row():
                        clhf_btn = gr.Button("Clear")
                        hf_btn = gr.Button("Restart to test token validity")

                with gr.Tab("MS cookie test"):
                    ms_txt = gr.Textbox(
                        value=default_repo,
                        label="Restart a studio with permissions",
                        placeholder="username/repo",
                    )
                    with gr.Row():
                        clms_btn = gr.Button("Clear")
                        ms_btn = gr.Button("Restart to test cookie validity")

            # Right column: outputs shared by every action.
            with gr.Column():
                status_bar = gr.Textbox(label="Status", show_copy_button=True)
                gr.Markdown("Logs")
                logs_bar = gr.Markdown(container=True, show_copy_button=True)

        # Buttons that take no input and write to (status, logs).
        for button, handler in (
            (task_btn, tasklst),
            (once_btn, trigger),
            (smtp_btn, send_email),
        ):
            button.click(fn=handler, outputs=[status_bar, logs_bar])

        # Per tab: the test button sends the repo id to its checker, and
        # the clear button empties the textbox by returning None.
        for run_btn, clear_btn, repo_box, checker in (
            (hf_btn, clhf_btn, hf_txt, test_hf_restart),
            (ms_btn, clms_btn, ms_txt, test_ms_restart),
        ):
            run_btn.click(
                fn=checker,
                inputs=[repo_box],
                outputs=[status_bar, logs_bar],
            )
            clear_btn.click(fn=lambda: None, outputs=repo_box)

    demo.launch()