Spaces:
Sleeping
Sleeping
File size: 7,465 Bytes
de0c2cf a76e4e2 de0c2cf 7e22793 de0c2cf 7e22793 a5af344 de0c2cf c7f856b 7e22793 35c2f39 a7ce240 970297d a7ce240 107b889 a7ce240 970297d a7ce240 970297d a7ce240 35c2f39 b020c6c 107b889 35c2f39 b020c6c 35c2f39 b020c6c 35c2f39 b020c6c 35c2f39 b020c6c 35c2f39 b020c6c 35c2f39 b020c6c 35c2f39 107b889 7e22793 2f32fa5 27cc4b8 685a3eb 2f32fa5 27cc4b8 685a3eb 2f32fa5 685a3eb 2f32fa5 685a3eb 2f32fa5 685a3eb 2f32fa5 685a3eb 2f32fa5 685a3eb 2f32fa5 27cc4b8 685a3eb 27cc4b8 685a3eb 27cc4b8 685a3eb 27cc4b8 685a3eb 27cc4b8 2f32fa5 685a3eb 2f32fa5 27cc4b8 2f32fa5 7058e0b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 |
import os
# Redirect the Hugging Face cache root to a directory under /tmp and make sure
# it exists. This is done before `huggingface_hub` is imported below so the
# variable is already in the environment when that library is loaded.
# NOTE(review): presumably /tmp is used because it is writable in the Space
# container — confirm against the deployment.
os.environ["HF_HOME"] = "/tmp/hf_cache"
os.makedirs("/tmp/hf_cache", exist_ok=True)
from fastapi import FastAPI, Query
from huggingface_hub import list_repo_files, hf_hub_download, upload_file
import io
import requests
from fastapi import FastAPI, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
import os
import os
import zipfile
import tempfile  # ✅ Add this!
# Single application object; every route decorator below registers on it.
app = FastAPI()

# CORS setup so a browser frontend served from another origin can call this API.
# NOTE(review): the FastAPI CORS docs advise against combining wildcard
# origins/methods/headers with allow_credentials=True — replace "*" with the
# real frontend domain in production.
_cors_options = dict(
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(CORSMiddleware, **_cors_options)
@app.get("/")
def health_check():
    """Liveness probe for the Space.

    Returns:
        A dict with a single ``status`` key holding a fixed human-readable
        message.
    """
    # The status text was a single literal; a mis-decoded emoji had split it
    # across two lines, which is not valid Python.
    return {"status": "✅ FastAPI running on Hugging Face Spaces!"}
# Hugging Face repo whose files are listed by the /images endpoint.
REPO_ID = "rahul7star/ohamlab"
# Folder inside the repo that /images scans (direct children only).
FOLDER = "demo"
# Prefix that is joined with a repo file path to build that file's URL on the
# `main` revision.
BASE_URL = f"https://huggingface.co/{REPO_ID}/resolve/main/"
# List all images in a repo folder so the frontend UI can display them
@app.get("/images")
def list_images():
    """Return URLs for the image files that sit directly inside ``FOLDER``.

    Files in sub-folders of ``FOLDER`` are skipped, and only names ending in
    .png, .jpg, .jpeg or .webp (case-insensitive) are kept.

    Returns:
        ``{"images": [url, ...]}`` on success, or ``{"error": message}`` if
        anything raises while listing the repo.
    """
    try:
        prefix = FOLDER.rstrip("/") + "/"
        image_suffixes = (".png", ".jpg", ".jpeg", ".webp")

        image_urls = []
        for repo_path in list_repo_files(REPO_ID):
            if not repo_path.startswith(prefix):
                continue
            # Anything with another "/" after the prefix lives in a subfolder.
            if "/" in repo_path[len(prefix):]:
                continue
            if repo_path.lower().endswith(image_suffixes):
                image_urls.append(BASE_URL + repo_path)

        return {"images": image_urls}
    except Exception as exc:
        return {"error": str(exc)}
from datetime import datetime
import tempfile
import uuid
# upload zip from UI
@app.post("/upload-zip")
async def upload_zip(file: UploadFile = File(...)):
    """Unpack an uploaded ZIP archive and push every file in it to the repo.

    Files are uploaded under ``<FOLDER>/upload_<UTC timestamp>_<short id>/``,
    keeping the archive's internal directory layout. Each file is uploaded
    with its own ``upload_file`` call.

    Args:
        file: Multipart upload whose filename must end in ``.zip``.

    Returns:
        On success, a dict with ``message``, ``folder`` (the generated
        sub-folder name) and ``files`` (the repo paths that were uploaded).
        On failure, a dict with a single ``error`` key.
    """
    # Local import: only `datetime` itself is imported at file level.
    from datetime import datetime, timezone

    # The filename is client-controlled and may be missing altogether.
    if not (file.filename or "").endswith(".zip"):
        return {"error": "Please upload a .zip file"}

    # Create a unique subfolder name inside the target folder.
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    unique_id = uuid.uuid4().hex[:6]
    folder_name = f"upload_{timestamp}_{unique_id}"
    hf_folder_prefix = f"{FOLDER.rstrip('/')}/{folder_name}"

    try:
        # Everything lives in one temporary directory so the saved archive and
        # the extracted files are both removed when the block exits.
        with tempfile.TemporaryDirectory() as work_dir:
            # Use a fixed local name: the client-supplied filename must never
            # be used to build a filesystem path (e.g. "../../x.zip").
            temp_zip_path = os.path.join(work_dir, "upload.zip")
            with open(temp_zip_path, "wb") as f:
                f.write(await file.read())

            extract_dir = os.path.join(work_dir, "extracted")
            os.makedirs(extract_dir)
            with zipfile.ZipFile(temp_zip_path, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

            uploaded_files = []
            # Upload all extracted files, mirroring their relative paths.
            for root_dir, _, files in os.walk(extract_dir):
                for name in files:
                    file_path = os.path.join(root_dir, name)
                    relative_path = os.path.relpath(file_path, extract_dir)
                    # Repo paths always use forward slashes.
                    repo_path = f"{hf_folder_prefix}/{relative_path}".replace("\\", "/")
                    upload_file(
                        path_or_fileobj=file_path,
                        path_in_repo=repo_path,
                        repo_id=REPO_ID,
                        repo_type="model",
                        commit_message=f"Upload {relative_path} to {folder_name}",
                        token=True,
                    )
                    uploaded_files.append(repo_path)

            return {
                "message": f"✅ Uploaded {len(uploaded_files)} files",
                "folder": folder_name,
                "files": uploaded_files,
            }
    except Exception as e:
        return {"error": f"❌ Failed to process zip: {str(e)}"}
# upload a single file from UI
@app.post("/upload")
async def upload_image(file: UploadFile = File(...)):
    """Upload a single file from the UI into ``FOLDER`` of the repo.

    Args:
        file: Multipart upload. Only the final path component of its filename
            is used as the name in the repo.

    Returns:
        ``{"message": ...}`` on success, or ``{"error": ...}`` if the filename
        is unusable or the upload raises.
    """
    # The filename is client-controlled: keep only its last component so it
    # cannot point outside the target folder ("../x", "a\\b", absolute paths).
    filename = os.path.basename((file.filename or "").replace("\\", "/"))
    if filename in ("", ".", ".."):
        return {"error": "❌ Upload failed: missing or invalid filename"}

    contents = await file.read()

    try:
        # Pass the bytes straight to the Hub client; no temporary file is
        # written, so nothing is left behind in /tmp.
        upload_file(
            path_or_fileobj=contents,
            path_in_repo=f"{FOLDER.rstrip('/')}/{filename}",
            repo_id=REPO_ID,
            repo_type="model",
            commit_message=f"Upload {filename} to demo folder",
            token=True,  # uses HF_TOKEN from Space secrets
        )
        return {"message": f"✅ {filename} uploaded successfully!"}
    except Exception as e:
        return {"error": f"❌ Upload failed: {str(e)}"}
# Training dataset: start filtering data for training
# Repo that /filter-images reads from and uploads to.
T_REPO_ID = "rahul7star/ohamlab"
# Caption uploaded as imageN.txt next to every renamed image; the same text is
# used for all images.
# NOTE(review): "Ra3hul" looks like a deliberate token rather than a typo —
# confirm before changing the spelling.
DESCRIPTION_TEXT = (
"Ra3hul is wearing a black jacket over a striped white t-shirt with blue jeans. "
"He is standing near a lake with his arms spread wide open, with mountains and cloudy skies in the background."
)
def is_image_file(filename: str) -> bool:
    """Tell whether *filename* ends with a supported image extension.

    The comparison ignores case; supported extensions are .png, .jpg, .jpeg
    and .webp.
    """
    lowered = filename.lower()
    return any(
        lowered.endswith(suffix)
        for suffix in (".png", ".jpg", ".jpeg", ".webp")
    )
@app.post("/filter-images")
def filter_and_rename_images(folder: str = Query("demo", description="Folder path in repo to scan")):
    """Copy the images of *folder* into ``filter-<folder>/`` with numbered names.

    Every image found directly inside *folder* (sub-folders are ignored) is
    downloaded and re-uploaded as ``image<N>.jpeg``, followed by an
    ``image<N>.txt`` file containing ``DESCRIPTION_TEXT``. Numbering starts at
    1 and follows the order of the repo file listing.

    Args:
        folder: Folder path in the repo to scan.

    Returns:
        ``{"message": ..., "files": [...]}`` listing every uploaded repo path,
        or ``{"error": ...}`` when no images are found or anything raises.
    """

    def push_to_repo(payload: bytes, repo_path: str, message: str) -> str:
        # Upload an in-memory payload to T_REPO_ID and hand back its repo path.
        upload_file(
            path_or_fileobj=io.BytesIO(payload),
            path_in_repo=repo_path,
            repo_id=T_REPO_ID,
            repo_type="model",
            commit_message=message,
            token=True,
        )
        return repo_path

    try:
        repo_listing = list_repo_files(T_REPO_ID)

        source_dir = folder.rstrip("/")
        source_prefix = source_dir + "/"
        filter_folder = f"filter-{source_dir}"
        filter_prefix = filter_folder + "/"

        # Keep only images that are direct children of the source folder.
        image_files = []
        for candidate in repo_listing:
            if not candidate.startswith(source_prefix):
                continue
            if "/" in candidate[len(source_prefix):]:
                continue
            if is_image_file(candidate):
                image_files.append(candidate)

        if not image_files:
            return {"error": f"No images found in folder '{folder}'"}

        caption_bytes = DESCRIPTION_TEXT.encode("utf-8")
        uploaded_files = []

        for number, source_path in enumerate(image_files, start=1):
            # hf_hub_download returns a local path (served from the cache).
            cached_copy = hf_hub_download(repo_id=T_REPO_ID, filename=source_path)
            with open(cached_copy, "rb") as handle:
                image_bytes = handle.read()

            # Image goes first, named image1.jpeg, image2.jpeg, ...
            image_name = f"image{number}.jpeg"
            uploaded_files.append(
                push_to_repo(
                    image_bytes,
                    filter_prefix + image_name,
                    f"Upload renamed image {image_name} to {filter_folder}",
                )
            )

            # ...then the matching caption file.
            caption_name = f"image{number}.txt"
            uploaded_files.append(
                push_to_repo(
                    caption_bytes,
                    filter_prefix + caption_name,
                    f"Upload text file {caption_name} to {filter_folder}",
                )
            )

        return {
            "message": f"Processed and uploaded {len(image_files)} images and text files.",
            "files": uploaded_files,
        }
    except Exception as exc:
        return {"error": str(exc)}
# Test call another space and send the payload
@app.post("/webhook-trigger")
def call_other_space():
    """POST a fixed trigger payload to the companion Space and relay its reply.

    Returns:
        The JSON body returned by the other Space, or ``{"error": message}``
        if the request or the JSON decoding raises.
    """
    trigger_url = "https://rahul7star-ohamlab-ai-toolkit.hf.space/trigger"
    # Fixed for now; the input could be collected dynamically from elsewhere.
    body = {"input": "Start training from external trigger"}

    try:
        response = requests.post(trigger_url, json=body, timeout=30)
        return response.json()
    except Exception as exc:
        return {"error": str(exc)}