import os import tempfile import requests from urllib.parse import urlparse, unquote from flask import Flask, render_template_string, request, redirect, send_file, jsonify from huggingface_hub import HfApi, hf_hub_download, upload_file, delete_file import time app = Flask(__name__) app.secret_key = os.urandom(24) REPO_ID = None HF_TOKEN = None api = HfApi() TEMPLATE = """ SUS Drive - {{ path or 'Root' }}

Processing...

SUS Drive

Storage Used: {{ usage_percentage|round(2) }}% of 100GB

{% if items %}
{% for item in items %} {% endfor %}
{% else %}

This folder is empty

Upload a file or create a new folder to get started.

{% endif %}
""" def format_size(size): if size is None: return "Directory" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024: return f"{size:.2f} {unit}" size /= 1024 return f"{size:.2f} PB" def list_folder(path=""): if not REPO_ID or not HF_TOKEN: return [] prefix = path.strip("/") + ("/" if path else "") try: files_info = api.get_paths_info(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN, path=prefix, recursive=False) except Exception as e: print(f"Error listing files: {e}") return [] items = [] seen_dirs = set() for file_info in files_info: rel_path = file_info.path[len(prefix):] if prefix else file_info.path if not rel_path: continue if "/" in rel_path: dir_name = rel_path.split("/")[0] dir_path = (prefix + dir_name).strip("/") if dir_path not in seen_dirs: seen_dirs.add(dir_path) items.append({ "type": "dir", "name": dir_name, "path": dir_path, "size": None, "size_str": "Directory" }) else: items.append({ "type": "file", "name": rel_path, "path": file_info.path, "size": file_info.size, "size_str": format_size(file_info.size) }) items.sort(key=lambda x: (x["type"] != "dir", x["name"].lower())) return items def get_total_size(): if not REPO_ID or not HF_TOKEN: return 0 try: all_files_info = api.get_paths_info(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN, path="/", recursive=True) total_size = sum(file_info.size for file_info in all_files_info if file_info.type == "file") return total_size except Exception as e: print(f"Error getting total size: {e}") return 0 @app.route("/set_credentials", methods=["GET", "POST"]) def set_credentials(): global REPO_ID, HF_TOKEN if request.method == "POST": REPO_ID = request.form.get("repo_id") HF_TOKEN = request.form.get("hf_token") if not REPO_ID or not HF_TOKEN: return "Missing repo_id or hf_token", 400 return redirect("/") return """


""" @app.route("/", methods=["GET"]) def index(): if not REPO_ID or not HF_TOKEN: return redirect("/set_credentials") path = request.args.get("path", "").strip("/") items = list_folder(path) total_used = get_total_size() total_available = 100 * 1024 * 1024 * 1024 usage_percentage = (total_used / total_available) * 100 if total_available > 0 else 0 return render_template_string(TEMPLATE, items=items, path=path, usage_percentage=usage_percentage) @app.route("/download", methods=["GET"]) def download(): if not REPO_ID or not HF_TOKEN: return "Credentials not set", 403 file_path = request.args.get("path", "") if not file_path: return "No file path provided", 400 try: local_path = hf_hub_download( repo_id=REPO_ID, filename=file_path, repo_type="dataset", token=HF_TOKEN, cache_dir=tempfile.gettempdir() ) return send_file( local_path, as_attachment=True, download_name=os.path.basename(file_path) ) except Exception as e: return f"Error downloading file: {str(e)}", 500 @app.route("/upload", methods=["POST"]) def upload(): if not REPO_ID or not HF_TOKEN: return "Credentials not set", 403 if 'file' not in request.files: return "No file provided", 400 file = request.files["file"] if file.filename == '': return "No file selected", 400 path = request.form.get("path", "").strip("/") dest_path = f"{path}/{file.filename}".strip("/") with tempfile.NamedTemporaryFile(delete=False) as tmp_file: file.save(tmp_file.name) try: upload_file( path_or_fileobj=tmp_file.name, path_in_repo=dest_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) except Exception as e: os.unlink(tmp_file.name) return f"Error uploading file: {str(e)}", 500 finally: try: os.unlink(tmp_file.name) except: pass return redirect(f"/?path={path}") @app.route("/download_url", methods=["POST"]) def download_url(): if not REPO_ID or not HF_TOKEN: return jsonify({"error": "Credentials not set"}), 403 data = request.get_json() url = data.get("url", "").strip() path = data.get("path", "").strip("/") custom_filename = data.get("filename", "").strip() if not url: return jsonify({"error": "No URL provided"}), 400 if not url.startswith(('http://', 'https://')): return jsonify({"error": "Invalid URL. Must start with http:// or https://"}), 400 try: filename = get_filename_from_url(url, custom_filename) dest_path = f"{path}/{filename}".strip("/") response = requests.get(url, stream=True, timeout=30) response.raise_for_status() content_length = response.headers.get('content-length') if content_length and int(content_length) > 5 * 1024 * 1024 * 1024: return jsonify({"error": "File too large. Maximum size is 5GB"}), 400 with tempfile.NamedTemporaryFile(delete=False) as tmp_file: for chunk in response.iter_content(chunk_size=8192): tmp_file.write(chunk) tmp_file.flush() upload_file( path_or_fileobj=tmp_file.name, path_in_repo=dest_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) os.unlink(tmp_file.name) return jsonify({"status": "success", "message": "File downloaded and uploaded successfully"}) except requests.exceptions.RequestException as e: return jsonify({"error": f"Failed to download from URL: {str(e)}"}), 400 except Exception as e: return jsonify({"error": f"Upload failed: {str(e)}"}), 500 @app.route("/delete", methods=["POST"]) def delete(): if not REPO_ID or not HF_TOKEN: return jsonify({"error": "Credentials not set"}), 403 data = request.get_json() delete_path = data.get("path", "").strip("/") if not delete_path: return jsonify({"error": "No path provided"}), 400 try: all_files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN) deleted_count = 0 for file_path in all_files: if file_path == delete_path or file_path.startswith(delete_path.rstrip("/") + "/"): delete_file( repo_id=REPO_ID, path_in_repo=file_path, repo_type="dataset", token=HF_TOKEN ) deleted_count += 1 if deleted_count == 0: return jsonify({"error": "No files found to delete"}), 404 return jsonify({"status": "success", "message": f"Deleted {deleted_count} file(s)"}) except Exception as e: return jsonify({"error": f"Delete failed: {str(e)}"}), 500 @app.route("/create_folder", methods=["POST"]) def create_folder(): if not REPO_ID or not HF_TOKEN: return jsonify({"error": "Credentials not set"}), 403 data = request.get_json() folder_path = data.get("path", "").strip("/") if not folder_path: return jsonify({"error": "No folder path provided"}), 400 keep_file_path = f"{folder_path}/.keep" try: with tempfile.NamedTemporaryFile(delete=False) as tmp_file: tmp_file.write(b"# This file keeps the folder in git\n") tmp_file.flush() upload_file( path_or_fileobj=tmp_file.name, path_in_repo=keep_file_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) os.unlink(tmp_file.name) return jsonify({"status": "success", "message": "Folder created successfully"}) except Exception as e: return jsonify({"error": f"Failed to create folder: {str(e)}"}), 500 @app.route("/rename", methods=["POST"]) def rename(): if not REPO_ID or not HF_TOKEN: return jsonify({"error": "Credentials not set"}), 403 data = request.get_json() old_path = data.get("old_path", "").strip("/") new_name = data.get("new_path", "").strip() if not old_path or not new_name: return jsonify({"error": "Missing old path or new name"}), 400 try: parent_dir = "/".join(old_path.split("/")[:-1]) if "/" in old_path else "" new_path = f"{parent_dir}/{new_name}".strip("/") all_files = api.list_repo_files(repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN) renamed_count = 0 for file_path in all_files: if file_path == old_path or file_path.startswith(old_path + "/"): relative_path = file_path[len(old_path):].lstrip("/") new_file_path = (new_path + "/" + relative_path).strip("/") local_path = hf_hub_download( repo_id=REPO_ID, filename=file_path, repo_type="dataset", token=HF_TOKEN, cache_dir=tempfile.gettempdir() ) upload_file( path_or_fileobj=local_path, path_in_repo=new_file_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN ) delete_file( repo_id=REPO_ID, path_in_repo=file_path, repo_type="dataset", token=HF_TOKEN ) renamed_count += 1 if renamed_count == 0: return jsonify({"error": "No files found to rename"}), 404 return jsonify({"status": "success", "message": f"Renamed {renamed_count} file(s)"}) except Exception as e: return jsonify({"error": f"Rename failed: {str(e)}"}), 500 def get_filename_from_url(url, custom_filename=None): if custom_filename: return custom_filename parsed_url = urlparse(url) path = unquote(parsed_url.path) filename = os.path.basename(path) if not filename or '.' not in filename: filename = f"downloaded_file_{int(time.time())}" return filename if __name__ == "__main__": app.run(debug=True, host="0.0.0.0")