Spaces:
Running
Running
import os, shutil, zipfile, threading, time | |
from flask import Flask, request, render_template_string | |
import gdown | |
from huggingface_hub import HfApi, login, upload_folder | |
# Environment variables (set them in HF Spaces settings) | |
FOLDER_URL = os.getenv("FOLDER_URL") | |
REPO_ID = os.getenv("REPO_ID") | |
TOKEN = os.getenv("HF_TOKEN") | |
DOWNLOAD_DIR = "/tmp/backups" | |
EXTRACT_DIR = "/tmp/extracted_backups" | |
# Global state | |
last_backup_time = "Never" | |
schedule_interval = 0 | |
app = Flask(__name__) | |
# Backup logic | |
def run_backup(): | |
global last_backup_time | |
try: | |
shutil.rmtree(DOWNLOAD_DIR, ignore_errors=True) | |
shutil.rmtree(EXTRACT_DIR, ignore_errors=True) | |
os.makedirs(DOWNLOAD_DIR, exist_ok=True) | |
os.makedirs(EXTRACT_DIR, exist_ok=True) | |
gdown.download_folder( | |
url=FOLDER_URL, | |
output=DOWNLOAD_DIR, | |
use_cookies=False, | |
quiet=False | |
) | |
for root, _, files in os.walk(DOWNLOAD_DIR): | |
for f in files: | |
if f.endswith(".zip"): | |
zp = os.path.join(root, f) | |
with zipfile.ZipFile(zp) as z: | |
z.extractall(EXTRACT_DIR) | |
bad = os.path.join(EXTRACT_DIR, "world_nither") | |
good = os.path.join(EXTRACT_DIR, "world_nether") | |
if os.path.exists(bad) and not os.path.exists(good): | |
os.rename(bad, good) | |
login(token=TOKEN) | |
api = HfApi() | |
try: | |
api.delete_repo(repo_id=REPO_ID, repo_type="dataset") | |
except Exception as err: | |
print("delete skipped:", err) | |
api.create_repo(repo_id=REPO_ID, repo_type="dataset", private=False, exist_ok=True) | |
subfolders = { | |
"world": os.path.join(EXTRACT_DIR, "world"), | |
"world_nether": os.path.join(EXTRACT_DIR, "world_nether"), | |
"world_the_end": os.path.join(EXTRACT_DIR, "world_the_end"), | |
"plugins": os.path.join(EXTRACT_DIR, "plugins") | |
} | |
for name, path in subfolders.items(): | |
if os.path.exists(path): | |
upload_folder( | |
repo_id=REPO_ID, | |
folder_path=path, | |
repo_type="dataset", | |
token=TOKEN, | |
path_in_repo=name, | |
commit_message="add " + name | |
) | |
last_backup_time = time.ctime() | |
return "Backup completed" | |
except Exception as e: | |
return f"Error: {str(e)}" | |
# Background timer | |
def schedule_loop(): | |
while True: | |
if schedule_interval > 0: | |
run_backup() | |
time.sleep(schedule_interval * 60) | |
else: | |
time.sleep(10) | |
# Start scheduler thread | |
threading.Thread(target=schedule_loop, daemon=True).start() | |
# HTML template | |
HTML = """ | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>Minecraft Backup Panel</title> | |
<meta name="viewport" content="width=device-width, initial-scale=1"> | |
<style> | |
body { font-family: sans-serif; padding: 20px; max-width: 600px; margin: auto; } | |
h2 { font-size: 24px; } | |
input, button { width: 100%; padding: 10px; margin: 8px 0; font-size: 16px; } | |
.box { background: #f5f5f5; padding: 15px; border-radius: 8px; margin-top: 20px; } | |
</style> | |
</head> | |
<body> | |
<h2>Minecraft Backup Controller</h2> | |
<form method="post"> | |
<label>Set interval (minutes)</label> | |
<input type="number" name="interval" value="{{ interval }}" min="1"> | |
<button type="submit">Set Timer</button> | |
</form> | |
<form method="post"> | |
<input type="hidden" name="manual_run" value="1"> | |
<button type="submit">Run Backup Now</button> | |
</form> | |
<div class="box"> | |
<p><strong>Last Backup:</strong> {{ last_run }}</p> | |
<p><strong>Status:</strong> {{ status }}</p> | |
</div> | |
</body> | |
</html> | |
""" | |
def index(): | |
global schedule_interval | |
status = "" | |
if request.method == "POST": | |
if "manual_run" in request.form: | |
status = run_backup() | |
else: | |
try: | |
schedule_interval = int(request.form.get("interval", "0")) | |
status = f"Timer set for every {schedule_interval} minutes" | |
except: | |
status = "Invalid interval" | |
return render_template_string(HTML, last_run=last_backup_time, interval=schedule_interval, status=status) | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860) | |