# app.py
import os

# Ensure Hugging Face cache writes to tmp
os.environ["HF_HOME"] = "/tmp/hf_home"

import shutil, zipfile, threading, time, json, uuid
from datetime import datetime, timedelta
from flask import Flask, request, render_template_string, jsonify
import gdown
from huggingface_hub import HfApi, login, upload_folder, list_repo_files
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload  # needed for Drive file downloads
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
import pickle

# Environment variables
FOLDER_URL = os.getenv("FOLDER_URL")
REPO_ID = os.getenv("REPO_ID")
TOKEN = os.getenv("HF_TOKEN")
GOOGLE_CREDENTIALS = os.getenv("GOOGLE_CREDENTIALS")  # JSON string of credentials

# Directories
DOWNLOAD_DIR = "/tmp/backups"
EXTRACT_DIR = "/tmp/extracted_backups"
GDRIVE_DIR = "/tmp/gdrive_files"

# Global state
app_state = {
    "last_backup_time": "Never",
    "schedule_interval": 0,
    "status": "Ready",
    "backup_history": [],
    "gdrive_connected": False,
    "auto_cleanup": True,
    "max_backups": 10,
    "notification_enabled": True,
    "backup_running": False,
    "total_backups": 0,
    "last_error": None,
    "gdrive_files": []
}

app = Flask(__name__)


# Google Drive Integration
class GDriveManager:
    def __init__(self):
        self.service = None
        self.creds = None

    def authenticate(self):
        try:
            if GOOGLE_CREDENTIALS:
                creds_data = json.loads(GOOGLE_CREDENTIALS)
                self.creds = Credentials.from_authorized_user_info(creds_data)
                self.service = build('drive', 'v3', credentials=self.creds)
                app_state["gdrive_connected"] = True
                return True
            return False
        except Exception as e:
            app_state["last_error"] = f"GDrive auth failed: {str(e)}"
            return False

    def list_files(self, folder_id=None):
        if not self.service:
            return []
        try:
            query = (
                f"'{folder_id}' in parents"
                if folder_id
                else "mimeType='application/zip' or mimeType='application/x-zip-compressed'"
            )
            results = self.service.files().list(
                q=query,
                pageSize=50,
                fields="files(id, name, size, modifiedTime, mimeType)"
            ).execute()
            return results.get('files', [])
        except Exception as e:
            app_state["last_error"] = f"GDrive list error: {str(e)}"
            return []

    def download_file(self, file_id, filename):
        if not self.service:
            return False
        try:
            os.makedirs(GDRIVE_DIR, exist_ok=True)
            # Local name avoids shadowing flask.request
            drive_request = self.service.files().get_media(fileId=file_id)
            with open(os.path.join(GDRIVE_DIR, filename), 'wb') as f:
                downloader = MediaIoBaseDownload(f, drive_request)
                done = False
                while not done:
                    status, done = downloader.next_chunk()
            return True
        except Exception as e:
            app_state["last_error"] = f"GDrive download error: {str(e)}"
            return False


gdrive = GDriveManager()


# Enhanced backup logic
def run_backup(source="gdrive"):
    global app_state
    if app_state["backup_running"]:
        return {"status": "error", "message": "Backup already running"}

    app_state["backup_running"] = True
    log_entries = []
    backup_id = str(uuid.uuid4())[:8]
    start_time = datetime.now()

    try:
        log_entries.append(f"[{start_time.strftime('%H:%M:%S')}] Starting backup #{backup_id}")

        # Clean directories
        shutil.rmtree(DOWNLOAD_DIR, ignore_errors=True)
        shutil.rmtree(EXTRACT_DIR, ignore_errors=True)
        os.makedirs(DOWNLOAD_DIR, exist_ok=True)
        os.makedirs(EXTRACT_DIR, exist_ok=True)
        log_entries.append("Directories prepared")

        # Download based on source
        if source == "gdrive" and app_state["gdrive_connected"]:
            log_entries.append("Downloading from Google Drive...")
            gdrive_files = gdrive.list_files()
            for file in gdrive_files[:5]:  # Limit to 5 recent files
                if gdrive.download_file(file['id'], file['name']):
                    log_entries.append(f"Downloaded: {file['name']}")
            # Move gdrive files to download dir
            if os.path.exists(GDRIVE_DIR):
                for f in os.listdir(GDRIVE_DIR):
                    shutil.move(os.path.join(GDRIVE_DIR, f), os.path.join(DOWNLOAD_DIR, f))
        else:
            log_entries.append(f"Downloading from URL: {FOLDER_URL}")
            gdown.download_folder(url=FOLDER_URL, output=DOWNLOAD_DIR, use_cookies=False, quiet=True)
        log_entries.append("Download completed")

        # Extract archives (zipfile only handles .zip; .rar/.7z attempts fail and are logged)
        extracted_count = 0
        for root, _, files in os.walk(DOWNLOAD_DIR):
            for f in files:
                if f.endswith(('.zip', '.rar', '.7z')):
                    zp = os.path.join(root, f)
                    try:
                        with zipfile.ZipFile(zp) as z:
                            z.extractall(EXTRACT_DIR)
                        extracted_count += 1
                        log_entries.append(f"Extracted: {f}")
                    except Exception as e:
                        log_entries.append(f"Failed to extract {f}: {str(e)}")

        # Fix common folder naming issues
        fixes = [
            ("world_nither", "world_nether"),
            ("world_end", "world_the_end"),
            ("plugin", "plugins")
        ]
        for bad, good in fixes:
            bad_path = os.path.join(EXTRACT_DIR, bad)
            good_path = os.path.join(EXTRACT_DIR, good)
            if os.path.exists(bad_path) and not os.path.exists(good_path):
                os.rename(bad_path, good_path)
                log_entries.append(f"Fixed folder: {bad} → {good}")

        # Upload to Hugging Face
        login(token=TOKEN)
        api = HfApi()
        log_entries.append("Connected to Hugging Face")
        api.create_repo(repo_id=REPO_ID, repo_type="dataset", private=False, exist_ok=True, token=TOKEN)

        subfolders = {
            "world": os.path.join(EXTRACT_DIR, "world"),
            "world_nether": os.path.join(EXTRACT_DIR, "world_nether"),
            "world_the_end": os.path.join(EXTRACT_DIR, "world_the_end"),
            "plugins": os.path.join(EXTRACT_DIR, "plugins"),
            "logs": os.path.join(EXTRACT_DIR, "logs"),
            "config": os.path.join(EXTRACT_DIR, "config")
        }

        uploaded_folders = []
        for name, path in subfolders.items():
            if os.path.exists(path):
                try:
                    upload_folder(
                        repo_id=REPO_ID,
                        folder_path=path,
                        repo_type="dataset",
                        token=TOKEN,
                        path_in_repo=name,
                        commit_message=f"Backup #{backup_id} - {name} - {datetime.now().strftime('%Y-%m-%d %H:%M')}"
                    )
                    uploaded_folders.append(name)
                    log_entries.append(f"✓ Uploaded: {name}")
                except Exception as e:
                    log_entries.append(f"✗ Failed to upload {name}: {str(e)}")

        # Update state
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        backup_record = {
            "id": backup_id,
            "timestamp": end_time.isoformat(),
            "duration": f"{duration:.1f}s",
            "source": source,
            "folders": uploaded_folders,
            "files_extracted": extracted_count,
            "status": "success"
        }
        app_state["backup_history"].insert(0, backup_record)
        app_state["last_backup_time"] = end_time.strftime("%Y-%m-%d %H:%M:%S")
        app_state["total_backups"] += 1
        app_state["last_error"] = None

        # Auto-cleanup old backups
        if app_state["auto_cleanup"] and len(app_state["backup_history"]) > app_state["max_backups"]:
            app_state["backup_history"] = app_state["backup_history"][:app_state["max_backups"]]

        log_entries.append(f"✓ Backup completed in {duration:.1f}s")

    except Exception as e:
        error_msg = str(e)
        log_entries.append(f"✗ Error: {error_msg}")
        app_state["last_error"] = error_msg
        backup_record = {
            "id": backup_id,
            "timestamp": datetime.now().isoformat(),
            "duration": f"{(datetime.now() - start_time).total_seconds():.1f}s",
            "source": source,
            "status": "failed",
            "error": error_msg
        }
        app_state["backup_history"].insert(0, backup_record)
    finally:
        app_state["backup_running"] = False

    return {
        "status": "success" if not app_state["last_error"] else "error",
        "log": log_entries,
        "backup_id": backup_id
    }


# Scheduler: schedule_interval is assumed to be in minutes
def schedule_loop():
    last_run = datetime.now()
    while True:
        interval = app_state["schedule_interval"]
        if interval > 0 and not app_state["backup_running"]:
            if datetime.now() - last_run >= timedelta(minutes=interval):
                run_backup()
                last_run = datetime.now()
        # Poll every 60s while scheduling is enabled, every 30s otherwise
        time.sleep(60 if interval > 0 else 30)


# Start scheduler thread
threading.Thread(target=schedule_loop, daemon=True).start()

# Initialize Google Drive
gdrive.authenticate()
if app_state["gdrive_connected"]:
    app_state["gdrive_files"] = gdrive.list_files()

# HTML Template
HTML_TEMPLATE = '''
Advanced backup automation with Google Drive integration