# app.py import os # Ensure Hugging Face cache writes to tmp os.environ["HF_HOME"] = "/tmp/hf_home" import shutil, zipfile, threading, time, json, uuid from datetime import datetime, timedelta from flask import Flask, request, render_template_string, jsonify import gdown from huggingface_hub import HfApi, login, upload_folder, list_repo_files from googleapiclient.discovery import build from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from google.auth.transport.requests import Request import pickle # Environment variables FOLDER_URL = os.getenv("FOLDER_URL") REPO_ID = os.getenv("REPO_ID") TOKEN = os.getenv("HF_TOKEN") GOOGLE_CREDENTIALS = os.getenv("GOOGLE_CREDENTIALS") # JSON string of credentials # Directories DOWNLOAD_DIR = "/tmp/backups" EXTRACT_DIR = "/tmp/extracted_backups" GDRIVE_DIR = "/tmp/gdrive_files" # Global state app_state = { "last_backup_time": "Never", "schedule_interval": 0, "status": "Ready", "backup_history": [], "gdrive_connected": False, "auto_cleanup": True, "max_backups": 10, "notification_enabled": True, "backup_running": False, "total_backups": 0, "last_error": None, "gdrive_files": [] } app = Flask(__name__) # Google Drive Integration class GDriveManager: def __init__(self): self.service = None self.creds = None def authenticate(self): try: if GOOGLE_CREDENTIALS: creds_data = json.loads(GOOGLE_CREDENTIALS) self.creds = Credentials.from_authorized_user_info(creds_data) self.service = build('drive', 'v3', credentials=self.creds) app_state["gdrive_connected"] = True return True except Exception as e: app_state["last_error"] = f"GDrive auth failed: {str(e)}" return False def list_files(self, folder_id=None): if not self.service: return [] try: query = f"'{folder_id}' in parents" if folder_id else "mimeType='application/zip' or mimeType='application/x-zip-compressed'" results = self.service.files().list( q=query, pageSize=50, fields="files(id, name, size, modifiedTime, mimeType)" ).execute() return results.get('files', []) except Exception as e: app_state["last_error"] = f"GDrive list error: {str(e)}" return [] def download_file(self, file_id, filename): if not self.service: return False try: os.makedirs(GDRIVE_DIR, exist_ok=True) request = self.service.files().get_media(fileId=file_id) with open(os.path.join(GDRIVE_DIR, filename), 'wb') as f: downloader = MediaIoBaseDownload(f, request) done = False while done is False: status, done = downloader.next_chunk() return True except Exception as e: app_state["last_error"] = f"GDrive download error: {str(e)}" return False gdrive = GDriveManager() # Enhanced backup logic def run_backup(source="gdrive"): global app_state if app_state["backup_running"]: return {"status": "error", "message": "Backup already running"} app_state["backup_running"] = True log_entries = [] backup_id = str(uuid.uuid4())[:8] start_time = datetime.now() try: log_entries.append(f"[{start_time.strftime('%H:%M:%S')}] Starting backup #{backup_id}") # Clean directories shutil.rmtree(DOWNLOAD_DIR, ignore_errors=True) shutil.rmtree(EXTRACT_DIR, ignore_errors=True) os.makedirs(DOWNLOAD_DIR, exist_ok=True) os.makedirs(EXTRACT_DIR, exist_ok=True) log_entries.append("Directories prepared") # Download based on source if source == "gdrive" and app_state["gdrive_connected"]: log_entries.append("Downloading from Google Drive...") gdrive_files = gdrive.list_files() for file in gdrive_files[:5]: # Limit to 5 recent files if gdrive.download_file(file['id'], file['name']): log_entries.append(f"Downloaded: {file['name']}") # Move gdrive files to download dir if os.path.exists(GDRIVE_DIR): for f in os.listdir(GDRIVE_DIR): shutil.move(os.path.join(GDRIVE_DIR, f), os.path.join(DOWNLOAD_DIR, f)) else: log_entries.append(f"Downloading from URL: {FOLDER_URL}") gdown.download_folder(url=FOLDER_URL, output=DOWNLOAD_DIR, use_cookies=False, quiet=True) log_entries.append("Download completed") # Extract archives extracted_count = 0 for root, _, files in os.walk(DOWNLOAD_DIR): for f in files: if f.endswith(('.zip', '.rar', '.7z')): zp = os.path.join(root, f) try: with zipfile.ZipFile(zp) as z: z.extractall(EXTRACT_DIR) extracted_count += 1 log_entries.append(f"Extracted: {f}") except Exception as e: log_entries.append(f"Failed to extract {f}: {str(e)}") # Fix common folder naming issues fixes = [ ("world_nither", "world_nether"), ("world_end", "world_the_end"), ("plugin", "plugins") ] for bad, good in fixes: bad_path = os.path.join(EXTRACT_DIR, bad) good_path = os.path.join(EXTRACT_DIR, good) if os.path.exists(bad_path) and not os.path.exists(good_path): os.rename(bad_path, good_path) log_entries.append(f"Fixed folder: {bad} → {good}") # Upload to Hugging Face login(token=TOKEN) api = HfApi() log_entries.append("Connected to Hugging Face") api.create_repo(repo_id=REPO_ID, repo_type="dataset", private=False, exist_ok=True, token=TOKEN) subfolders = { "world": os.path.join(EXTRACT_DIR, "world"), "world_nether": os.path.join(EXTRACT_DIR, "world_nether"), "world_the_end": os.path.join(EXTRACT_DIR, "world_the_end"), "plugins": os.path.join(EXTRACT_DIR, "plugins"), "logs": os.path.join(EXTRACT_DIR, "logs"), "config": os.path.join(EXTRACT_DIR, "config") } uploaded_folders = [] for name, path in subfolders.items(): if os.path.exists(path): try: upload_folder( repo_id=REPO_ID, folder_path=path, repo_type="dataset", token=TOKEN, path_in_repo=name, commit_message=f"Backup #{backup_id} - {name} - {datetime.now().strftime('%Y-%m-%d %H:%M')}" ) uploaded_folders.append(name) log_entries.append(f"✓ Uploaded: {name}") except Exception as e: log_entries.append(f"✗ Failed to upload {name}: {str(e)}") # Update state end_time = datetime.now() duration = (end_time - start_time).total_seconds() backup_record = { "id": backup_id, "timestamp": end_time.isoformat(), "duration": f"{duration:.1f}s", "source": source, "folders": uploaded_folders, "files_extracted": extracted_count, "status": "success" } app_state["backup_history"].insert(0, backup_record) app_state["last_backup_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S") app_state["total_backups"] += 1 app_state["last_error"] = None # Auto-cleanup old backups if app_state["auto_cleanup"] and len(app_state["backup_history"]) > app_state["max_backups"]: app_state["backup_history"] = app_state["backup_history"][:app_state["max_backups"]] log_entries.append(f"✓ Backup completed in {duration:.1f}s") except Exception as e: error_msg = str(e) log_entries.append(f"✗ Error: {error_msg}") app_state["last_error"] = error_msg backup_record = { "id": backup_id, "timestamp": datetime.now().isoformat(), "duration": f"{(datetime.now() - start_time).total_seconds():.1f}s", "source": source, "status": "failed", "error": error_msg } app_state["backup_history"].insert(0, backup_record) finally: app_state["backup_running"] = False return { "status": "success" if not app_state["last_error"] else "error", "log": log_entries, "backup_id": backup_id } # Scheduler def schedule_loop(): while True: if app_state["schedule_interval"] > 0 and not app_state["backup_running"]: run_backup() time.sleep(60 if app_state["schedule_interval"] > 0 else 30) # Start scheduler thread threading.Thread(target=schedule_loop, daemon=True).start() # Initialize Google Drive gdrive.authenticate() if app_state["gdrive_connected"]: app_state["gdrive_files"] = gdrive.list_files() # HTML Template HTML_TEMPLATE = ''' Minecraft Backup Manager Pro

🎮 Minecraft Backup Manager Pro

Advanced backup automation with Google Drive integration

System Status

Ready

Total Backups

{{ total_backups }}

Last Backup

{{ last_backup_time }}

Google Drive

{{ 'Connected' if gdrive_connected else 'Offline' }}

🚀 Quick Actions

⚙️ Automation Settings

📊 Backup History

{% for backup in backup_history %}

Backup #{{ backup.id }}

{{ backup.timestamp }} • {{ backup.duration }} {% if backup.folders %} • {{ backup.folders|length }} folders {% endif %}
{{ backup.status.title() }}
{% endfor %}

☁️ Google Drive Files

{% for file in gdrive_files %}

{{ file.name }}

{{ file.get('size', 'Unknown size') }} • {{ file.get('modifiedTime', 'Unknown date') }}
{% endfor %}

📝 Activity Logs

System initialized and ready for backups...
Google Drive connection: {{ 'Active' if gdrive_connected else 'Inactive' }}
Hugging Face repository: {{ repo_id }}
''' # API Routes @app.route("/") def index(): return render_template_string(HTML_TEMPLATE, last_backup_time=app_state["last_backup_time"], schedule_interval=app_state["schedule_interval"], backup_history=app_state["backup_history"][:10], gdrive_connected=app_state["gdrive_connected"], gdrive_files=app_state["gdrive_files"][:10], auto_cleanup=app_state["auto_cleanup"], max_backups=app_state["max_backups"], total_backups=app_state["total_backups"], repo_id=REPO_ID or "Not configured" ) @app.route("/api/backup", methods=["POST"]) def api_backup(): if app_state["backup_running"]: return jsonify({"status": "error", "message": "Backup already running"}) data = request.get_json() or {} source = data.get("source", "gdrive") # Run backup in background thread def backup_thread(): result = run_backup(source) return result thread = threading.Thread(target=backup_thread) thread.start() thread.join(timeout=300) # 5 minute timeout if thread.is_alive(): return jsonify({"status": "error", "message": "Backup timeout"}) return jsonify({"status": "success", "message": "Backup initiated"}) @app.route("/api/settings", methods=["POST"]) def api_settings(): try: data = request.get_json() app_state["schedule_interval"] = data.get("interval", 0) app_state["auto_cleanup"] = data.get("auto_cleanup", True) app_state["max_backups"] = data.get("max_backups", 10) return jsonify({"status": "success"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}) @app.route("/api/gdrive/refresh", methods=["POST"]) def api_gdrive_refresh(): try: if gdrive.authenticate(): app_state["gdrive_files"] = gdrive.list_files() return jsonify({"status": "success", "files": len(app_state["gdrive_files"])}) else: return jsonify({"status": "error", "message": "Authentication failed"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}) @app.route("/api/gdrive/download", methods=["POST"]) def api_gdrive_download(): try: data = request.get_json() file_id = data.get("file_id") filename = data.get("filename") if gdrive.download_file(file_id, filename): return jsonify({"status": "success"}) else: return jsonify({"status": "error", "message": "Download failed"}) except Exception as e: return jsonify({"status": "error", "message": str(e)}) @app.route("/api/history") def api_history(): return jsonify({ "status": "success", "history": app_state["backup_history"], "total_backups": app_state["total_backups"], "last_backup_time": app_state["last_backup_time"] }) @app.route("/api/status") def api_status(): try: # Get HF repo info hf_status = "Connected" repo_files = 0 try: if TOKEN and REPO_ID: api = HfApi() files = list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=TOKEN) repo_files = len(list(files)) except: hf_status = "Error" return jsonify({ "status": "success", "system_status": "running" if app_state["backup_running"] else "ready", "gdrive_status": "connected" if app_state["gdrive_connected"] else "disconnected", "hf_status": hf_status, "repo_files": repo_files, "last_backup": app_state["last_backup_time"], "total_backups": app_state["total_backups"], "schedule_interval": app_state["schedule_interval"], "last_error": app_state["last_error"] }) except Exception as e: return jsonify({"status": "error", "message": str(e)}) if __name__ == "__main__": print("🚀 Minecraft Backup Manager Pro starting...") print(f"📁 Repository: {REPO_ID}") print(f"☁️ Google Drive: {'Connected' if app_state['gdrive_connected'] else 'Not configured'}") print(f"🔧 Server running on http://0.0.0.0:7860") app.run(host="0.0.0.0", port=7860, debug=False)