Spaces:
Running
Running
import os | |
import io | |
import shutil | |
import tempfile | |
from flask import Flask, render_template_string, request, redirect, send_file, jsonify | |
from huggingface_hub import HfApi, HfFileSystem, login, upload_file, delete_file, move_file | |
# Set env vars | |
REPO_ID = os.getenv("REPO_ID") | |
HF_TOKEN = os.getenv("HF_TOKEN") | |
app = Flask(__name__) | |
api = HfApi() | |
fs = HfFileSystem(token=HF_TOKEN) | |
login(token=HF_TOKEN) | |
TEMPLATE = """ | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>HuggingFace Drive</title> | |
<style> | |
body { | |
background-color: #121212; | |
color: #e0e0e0; | |
font-family: Arial, sans-serif; | |
padding: 20px; | |
} | |
.folder, .file { | |
padding: 5px 10px; | |
margin: 5px 0; | |
background-color: #1e1e1e; | |
border-radius: 5px; | |
cursor: pointer; | |
} | |
.folder:hover, .file:hover { | |
background-color: #2a2a2a; | |
} | |
.actions { | |
margin-top: 10px; | |
} | |
button { | |
background: #333; | |
color: #fff; | |
border: none; | |
padding: 8px 12px; | |
margin-right: 5px; | |
border-radius: 4px; | |
cursor: pointer; | |
} | |
button:hover { | |
background: #444; | |
} | |
input[type="text"], input[type="file"] { | |
background: #1e1e1e; | |
color: #fff; | |
border: 1px solid #333; | |
padding: 5px; | |
margin-right: 5px; | |
} | |
</style> | |
<script> | |
async function navigate(path) { | |
const res = await fetch('/?path=' + encodeURIComponent(path)); | |
document.open(); | |
document.write(await res.text()); | |
document.close(); | |
} | |
async function deleteFile(path) { | |
if (!confirm("Delete " + path + "?")) return; | |
await fetch('/delete', { | |
method: 'POST', | |
headers: { 'Content-Type': 'application/json' }, | |
body: JSON.stringify({ path }) | |
}); | |
location.reload(); | |
} | |
async function createFolder(currentPath) { | |
const name = prompt("Folder name:"); | |
if (!name) return; | |
await fetch('/create_folder', { | |
method: 'POST', | |
headers: { 'Content-Type': 'application/json' }, | |
body: JSON.stringify({ path: currentPath + '/' + name }) | |
}); | |
location.reload(); | |
} | |
async function renamePath(oldPath) { | |
const newPath = prompt("Rename to:", oldPath); | |
if (!newPath || newPath === oldPath) return; | |
await fetch('/rename', { | |
method: 'POST', | |
headers: { 'Content-Type': 'application/json' }, | |
body: JSON.stringify({ old_path: oldPath, new_path: newPath }) | |
}); | |
location.reload(); | |
} | |
</script> | |
</head> | |
<body> | |
<h2>HuggingFace Drive - {{ path or 'Root' }}</h2> | |
{% if parent %} | |
<div><a href="#" onclick="navigate('{{ parent }}')">β¬ οΈ Back</a></div> | |
{% endif %} | |
<div class="actions"> | |
<form method="POST" action="/upload" enctype="multipart/form-data"> | |
<input type="file" name="file"> | |
<input type="hidden" name="path" value="{{ path }}"> | |
<button type="submit">Upload</button> | |
</form> | |
<button onclick="createFolder('{{ path }}')">Create Folder</button> | |
</div> | |
<hr> | |
{% for item in items %} | |
{% if item['type'] == 'dir' %} | |
<div class="folder" onclick="navigate('{{ item['path'] }}')">π {{ item['name'] }} | |
<button onclick="event.stopPropagation(); renamePath('{{ item['path'] }}')">Rename</button> | |
<button onclick="event.stopPropagation(); deleteFile('{{ item['path'] }}')">Delete</button> | |
</div> | |
{% else %} | |
<div class="file"> | |
π {{ item['name'] }} | |
<a href="/download?path={{ item['path'] }}"><button>Download</button></a> | |
<button onclick="renamePath('{{ item['path'] }}')">Rename</button> | |
<button onclick="deleteFile('{{ item['path'] }}')">Delete</button> | |
</div> | |
{% endif %} | |
{% endfor %} | |
</body> | |
</html> | |
""" | |
def list_dataset_files(path=""): | |
files = fs.ls(f"datasets/{REPO_ID}/{path}", detail=True) | |
folders = [] | |
items = [] | |
seen = set() | |
for file in files: | |
relative_path = file["name"].replace(f"datasets/{REPO_ID}/", "") | |
parts = relative_path.split("/") | |
if len(parts) > 1: | |
folder_path = "/".join(parts[:1]) | |
full_folder = f"{path}/{folder_path}".strip("/") | |
if full_folder not in seen: | |
folders.append({"type": "dir", "name": folder_path, "path": full_folder}) | |
seen.add(full_folder) | |
elif len(parts) == 1: | |
items.append({"type": "file", "name": parts[0], "path": f"{path}/{parts[0]}".strip("/")}) | |
return sorted(folders + items, key=lambda x: x["name"].lower()) | |
def index(): | |
path = request.args.get("path", "").strip("/") | |
parent = "/".join(path.split("/")[:-1]) if path else None | |
items = list_dataset_files(path) | |
return render_template_string(TEMPLATE, items=items, path=path, parent=parent) | |
def download(): | |
path = request.args.get("path") | |
with tempfile.NamedTemporaryFile(delete=False) as tmp: | |
fs.download(f"datasets/{REPO_ID}/{path}", tmp.name) | |
return send_file(tmp.name, as_attachment=True, download_name=os.path.basename(path)) | |
def upload(): | |
file = request.files["file"] | |
path = request.form.get("path", "").strip("/") | |
dest = f"{path}/{file.filename}".strip("/") | |
with tempfile.NamedTemporaryFile() as tmp: | |
file.save(tmp.name) | |
upload_file(path_or_fileobj=tmp.name, path_in_repo=dest, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN) | |
return redirect(f"/?path={path}") | |
def delete(): | |
data = request.get_json() | |
delete_file(repo_id=REPO_ID, path_in_repo=data["path"], repo_type="dataset", token=HF_TOKEN) | |
return jsonify({"status": "deleted"}) | |
def create_folder(): | |
data = request.get_json() | |
path = data["path"].strip("/") | |
dummy_path = f"{path}/.keep" | |
with tempfile.NamedTemporaryFile() as tmp: | |
tmp.write(b"") | |
tmp.flush() | |
upload_file(path_or_fileobj=tmp.name, path_in_repo=dummy_path, repo_id=REPO_ID, repo_type="dataset", token=HF_TOKEN) | |
return jsonify({"status": "folder_created"}) | |
def rename(): | |
data = request.get_json() | |
move_file(repo_id=REPO_ID, src_path=data["old_path"], dst_path=data["new_path"], repo_type="dataset", token=HF_TOKEN) | |
return jsonify({"status": "renamed"}) | |
if __name__ == "__main__": | |
app.run(debug=True, port=7860) | |