Spaces:
Sleeping
Sleeping
| import os | |
| import torch | |
| import time | |
| import threading | |
| import json | |
| import gc | |
| from flask import Flask, request, jsonify, send_file, Response, stream_with_context | |
| from werkzeug.utils import secure_filename | |
| from PIL import Image | |
| import io | |
| import zipfile | |
| import uuid | |
| import traceback | |
| from huggingface_hub import snapshot_download, login | |
| from flask_cors import CORS | |
| import numpy as np | |
| import trimesh | |
| from transformers import pipeline, AutoImageProcessor, AutoModelForDepthEstimation | |
| from scipy.ndimage import gaussian_filter | |
| from scipy import interpolate | |
| import cv2 | |
# --- Filesystem layout -------------------------------------------------------
UPLOAD_FOLDER = '/tmp/uploads'
RESULTS_FOLDER = '/tmp/results'
CACHE_DIR = '/tmp/huggingface'
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
os.makedirs(RESULTS_FOLDER, exist_ok=True)
os.makedirs(CACHE_DIR, exist_ok=True)
os.environ['HF_HOME'] = CACHE_DIR

# --- Request limits and processing budgets -----------------------------------
# File extensions accepted by allowed_file().
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg'}
# Seconds allowed for depth estimation before a job is reported as timed out.
TIMEOUT_SECONDS = 240
# Longest image side, in pixels, kept by preprocess_image().
MAX_DIMENSION = 518

# --- Flask application -------------------------------------------------------
app = Flask(__name__)
CORS(app)
app.config.update(
    UPLOAD_FOLDER=UPLOAD_FOLDER,
    MAX_CONTENT_LENGTH=16 * 1024 * 1024,
)

# --- Mutable module state ----------------------------------------------------
# job_id -> dict describing one conversion job (status, progress, URLs, ...).
processing_jobs = {}

# Populated by load_models(); the two flags record whether loading has
# finished or is currently in progress.
dpt_estimator = depth_anything_model = depth_anything_processor = None
model_loaded = model_loading = False
class TimeoutError(Exception):
    """Marker exception for work that exceeded its time budget.

    NOTE(review): this name shadows Python's built-in ``TimeoutError`` for the
    rest of the module; unlike the built-in it derives directly from
    ``Exception``.
    """
def process_with_timeout(function, args, timeout):
    """Run ``function(*args)`` in a daemon thread, waiting at most ``timeout`` seconds.

    Args:
        function: Callable to execute.
        args: Iterable of positional arguments passed to ``function``.
        timeout: Maximum number of seconds to wait for the call to finish.

    Returns:
        A ``(result, error)`` pair:

        * ``(result, None)`` when the call finished normally;
        * ``(None, exc)`` when the call raised ``exc`` (an ``Exception``);
        * ``(None, TimeoutError(...))`` when the call was still running once
          the timeout elapsed. The worker thread is not stopped; it keeps
          running in the background as a daemon.
        * ``(None, RuntimeError(...))`` when the worker ended without
          producing either a result or an ``Exception``.
    """
    # Presence of a key (rather than truthiness of a value) records what
    # happened, so falsy results and falsy exception objects are still
    # reported correctly.
    outcome = {}

    def target():
        try:
            outcome['result'] = function(*args)
        except Exception as exc:
            outcome['error'] = exc

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout)

    # Sample liveness first: if the worker is already dead, `outcome` is final.
    still_running = worker.is_alive()
    if 'error' in outcome:
        return None, outcome['error']
    if 'result' in outcome:
        return outcome['result'], None
    if still_running:
        return None, TimeoutError(f"Processing timed out after {timeout} seconds")
    return None, RuntimeError("Processing thread exited without producing a result")
def allowed_file(filename):
    """Return True when ``filename`` ends in an extension from ALLOWED_EXTENSIONS.

    Only the text after the last dot is considered, compared case-insensitively.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
def preprocess_image(image_path):
    """Load an image, bound its size and boost local contrast.

    The image is converted to RGB, downscaled (aspect ratio preserved) so that
    neither side exceeds MAX_DIMENSION, and then contrast-enhanced by applying
    CLAHE to the lightness channel in LAB colour space.

    Args:
        image_path: Path of the image file to open.

    Returns:
        A new ``PIL.Image.Image`` in RGB mode.
    """
    with Image.open(image_path) as source:
        # convert() produces an independent image, so the file can be closed.
        img = source.convert("RGB")

    if img.width > MAX_DIMENSION or img.height > MAX_DIMENSION:
        if img.width > img.height:
            new_width = MAX_DIMENSION
            new_height = int(img.height * (MAX_DIMENSION / img.width))
        else:
            new_height = MAX_DIMENSION
            new_width = int(img.width * (MAX_DIMENSION / img.height))
        # Very elongated images could otherwise truncate to a 0-pixel side,
        # which makes resize() fail.
        new_width = max(1, new_width)
        new_height = max(1, new_height)
        img = img.resize((new_width, new_height), Image.LANCZOS)

    # After convert("RGB") the array always has three channels.
    rgb = np.array(img)
    lab = cv2.cvtColor(rgb, cv2.COLOR_RGB2LAB)
    lightness, chroma_a, chroma_b = cv2.split(lab)
    clahe = cv2.createCLAHE(clipLimit=2.0, tileGridSize=(8, 8))
    enhanced_lab = cv2.merge((clahe.apply(lightness), chroma_a, chroma_b))
    return Image.fromarray(cv2.cvtColor(enhanced_lab, cv2.COLOR_LAB2RGB))
def _download_snapshot_with_retry(repo_id, token, label, max_retries=3, initial_delay=5):
    """Download a Hub snapshot into CACHE_DIR, retrying with exponential back-off.

    Args:
        repo_id: Hugging Face repository id to download.
        token: Access token passed to ``snapshot_download`` (may be None).
        label: Human-readable model name used in the retry log message.
        max_retries: Total number of attempts.
        initial_delay: Seconds to sleep after the first failure; doubled after
            each further failure.

    Raises:
        Exception: whatever the final attempt raised.
    """
    delay = initial_delay
    for attempt in range(1, max_retries + 1):
        try:
            snapshot_download(
                repo_id=repo_id,
                cache_dir=CACHE_DIR,
                resume_download=True,
                token=token
            )
            return
        except Exception as e:
            if attempt == max_retries:
                raise
            print(f"{label} download attempt {attempt} failed: {str(e)}. Retrying...")
            time.sleep(delay)
            delay *= 2


def load_models():
    """Load the depth models once and return them.

    DPT-Large is mandatory. Depth Anything V2 Small is optional: when its
    snapshot cannot be downloaded the function falls back to DPT-Large only.

    Returns:
        ``(dpt_estimator, depth_anything_model, depth_anything_processor)``;
        the last two are ``None`` in the DPT-only fallback.

    Raises:
        RuntimeError: if this call waited for a load running in another thread
            and that load failed.
        Exception: any error raised while downloading or instantiating the
            models (logged, then re-raised).
    """
    global dpt_estimator, depth_anything_model, depth_anything_processor, model_loaded, model_loading
    if model_loaded:
        return dpt_estimator, depth_anything_model, depth_anything_processor
    if model_loading:
        # Another thread is loading; poll until it finishes.
        while model_loading and not model_loaded:
            time.sleep(0.5)
        if not model_loaded:
            # The other thread gave up; do not hand out unset (None) models.
            raise RuntimeError("Model loading failed in another thread")
        return dpt_estimator, depth_anything_model, depth_anything_processor

    # NOTE(review): the check above and this assignment are not atomic, so two
    # threads arriving at the same instant could both start loading.
    model_loading = True
    try:
        print("Loading models...")
        # Authenticate with Hugging Face
        hf_token = os.environ.get('HF_TOKEN')
        if hf_token:
            login(token=hf_token)
            print("Authenticated with Hugging Face token")

        # DPT-Large
        dpt_model_name = "Intel/dpt-large"
        _download_snapshot_with_retry(dpt_model_name, hf_token, "DPT")
        dpt_estimator = pipeline(
            "depth-estimation",
            model=dpt_model_name,
            device=-1,
            cache_dir=CACHE_DIR,
            use_fast=True
        )
        print("DPT-Large loaded")
        gc.collect()

        # Depth Anything (its retries start from a fresh back-off delay)
        da_model_name = "depth-anything/Depth-Anything-V2-Small-hf"
        try:
            _download_snapshot_with_retry(da_model_name, hf_token, "Depth Anything")
        except Exception as e:
            print(f"Failed to load Depth Anything: {str(e)}. Falling back to DPT-Large only.")
            depth_anything_model = None
            depth_anything_processor = None
            model_loaded = True
            return dpt_estimator, None, None

        depth_anything_processor = AutoImageProcessor.from_pretrained(
            da_model_name,
            cache_dir=CACHE_DIR,
            token=hf_token
        )
        depth_anything_model = AutoModelForDepthEstimation.from_pretrained(
            da_model_name,
            cache_dir=CACHE_DIR,
            token=hf_token
        ).to("cpu")
        model_loaded = True
        print("Depth Anything loaded")
        return dpt_estimator, depth_anything_model, depth_anything_processor
    except Exception as e:
        print(f"Error loading models: {str(e)}")
        print(traceback.format_exc())
        raise
    finally:
        model_loading = False
def fuse_depth_maps(dpt_depth, da_depth, detail_level='medium'):
    """Blend a DPT depth map with a Depth Anything depth map.

    Both inputs are reduced to 2-D arrays, the Depth Anything map is resized
    to the DPT map's shape when they differ, and each map is rescaled to
    [0, 1] using its 1st/99th percentiles (left unchanged if those coincide).

    Args:
        dpt_depth: DPT depth as a PIL image or array.
        da_depth: Depth Anything depth as a torch tensor or array.
        detail_level: ``'high'`` uses an edge-aware blend; ``'medium'`` mixes
            the maps 50/50; any other value uses 70% DPT / 30% Depth Anything.

    Returns:
        2-D array of fused depth clipped to [0, 1].
    """
    if isinstance(dpt_depth, Image.Image):
        dpt_depth = np.array(dpt_depth)
    if isinstance(da_depth, torch.Tensor):
        da_depth = da_depth.cpu().numpy()
    if len(dpt_depth.shape) > 2:
        dpt_depth = np.mean(dpt_depth, axis=2)
    if len(da_depth.shape) > 2:
        da_depth = np.mean(da_depth, axis=2)
    if dpt_depth.shape != da_depth.shape:
        da_depth = cv2.resize(da_depth, (dpt_depth.shape[1], dpt_depth.shape[0]), interpolation=cv2.INTER_CUBIC)

    p_low_dpt, p_high_dpt = np.percentile(dpt_depth, [1, 99])
    p_low_da, p_high_da = np.percentile(da_depth, [1, 99])
    if p_high_dpt > p_low_dpt:
        dpt_depth = np.clip((dpt_depth - p_low_dpt) / (p_high_dpt - p_low_dpt), 0, 1)
    if p_high_da > p_low_da:
        da_depth = np.clip((da_depth - p_low_da) / (p_high_da - p_low_da), 0, 1)

    if detail_level == 'high':
        weight_da = 0.7
        edges = cv2.Canny((da_depth * 255).astype(np.uint8), 50, 150)
        edge_mask = (edges > 0).astype(np.float32)
        # Soft per-pixel weights; the two add up to one everywhere.
        dpt_weight = gaussian_filter(1 - edge_mask, sigma=1.0)
        da_weight = gaussian_filter(edge_mask, sigma=1.0)
        # Near Depth Anything edges use the weighted mix of both maps; elsewhere
        # keep DPT. Every weight set sums to one, so flat regions are no longer
        # scaled by 1.3 and pushed into the final clip.
        edge_blend = weight_da * da_depth + (1 - weight_da) * dpt_depth
        fused_depth = dpt_weight * dpt_depth + da_weight * edge_blend
    else:
        weight_da = 0.5 if detail_level == 'medium' else 0.3
        fused_depth = (1 - weight_da) * dpt_depth + weight_da * da_depth

    return np.clip(fused_depth, 0, 1)
def enhance_depth_map(depth_map, detail_level='medium'):
    """Normalise a depth map and sharpen or smooth it according to ``detail_level``.

    The map is copied, clipped to its 1st-99th percentile range and rescaled
    to [0, 1] (skipped when both percentiles are equal). ``'high'`` applies an
    unsharp mask followed by a two-scale edge boost, ``'medium'`` a milder
    unsharp mask followed by a light blur, and any other value only a blur.

    Returns:
        The processed map clipped to [0, 1]; the input array is not modified.
    """
    def _unsharp(values, sigma, amount):
        # Add back `amount` times the detail removed by a Gaussian blur.
        residual = values - gaussian_filter(values, sigma=sigma)
        return values + amount * residual

    depth = depth_map.copy().astype(np.float32)
    lower, upper = np.percentile(depth, [1, 99])
    depth = np.clip(depth, lower, upper)
    if upper > lower:
        depth = (depth - lower) / (upper - lower)

    if detail_level == 'high':
        depth = _unsharp(depth, 1.5, 1.5)
        fine = gaussian_filter(depth, sigma=0.5)
        coarse = gaussian_filter(depth, sigma=2.0)
        depth = fine + 1.2 * (depth - coarse)
    elif detail_level == 'medium':
        depth = gaussian_filter(_unsharp(depth, 1.0, 0.8), sigma=0.5)
    else:
        depth = gaussian_filter(depth, sigma=0.7)

    return np.clip(depth, 0, 1)
def _grid_faces(vertices, resolution):
    """Triangulate a ``resolution`` x ``resolution`` vertex grid, two triangles per cell.

    For each cell the diagonal is chosen by comparing the normals of the two
    candidate triangles, as computed from ``vertices`` (row-major grid order).
    Returns an ``(n_faces, 3)`` integer array with the two triangles of each
    cell adjacent, cells in row-major order.
    """
    index = np.arange(resolution * resolution).reshape(resolution, resolution)
    p1 = index[:-1, :-1].ravel()   # top-left corner of every cell
    p2 = index[:-1, 1:].ravel()    # top-right
    p3 = index[1:, :-1].ravel()    # bottom-left
    p4 = index[1:, 1:].ravel()     # bottom-right
    v1, v2, v3, v4 = vertices[p1], vertices[p2], vertices[p3], vertices[p4]
    norm1 = np.cross(v2 - v1, v4 - v1)
    norm2 = np.cross(v4 - v3, v1 - v3)
    split_p1_p4 = (np.einsum('ij,ij->i', norm1, norm2) >= 0)[:, np.newaxis]
    first = np.where(split_p1_p4, np.column_stack([p1, p2, p4]), np.column_stack([p1, p2, p3]))
    second = np.where(split_p1_p4, np.column_stack([p1, p4, p3]), np.column_stack([p2, p4, p3]))
    return np.stack([first, second], axis=1).reshape(-1, 3)


def _sample_vertex_colors(img_array, resolution):
    """Bilinearly sample one RGBA colour per grid vertex from ``img_array``.

    Accepts 2-D (grey) arrays and 3-D arrays with 1, 3 or 4 channels. Grey and
    RGB inputs get an alpha of 255. Returns a ``(resolution**2, 4)`` uint8
    array in row-major grid order.
    """
    rows, cols = img_array.shape[:2]
    ys = np.arange(resolution) * (rows - 1) / (resolution - 1)
    xs = np.arange(resolution) * (cols - 1) / (resolution - 1)
    y0 = ys.astype(int)
    x0 = xs.astype(int)
    y1 = np.minimum(y0 + 1, rows - 1)
    x1 = np.minimum(x0 + 1, cols - 1)
    wy = (ys - y0)[:, np.newaxis, np.newaxis]
    wx = (xs - x0)[np.newaxis, :, np.newaxis]

    pixels = img_array.astype(np.float64)
    if pixels.ndim == 2:
        pixels = pixels[:, :, np.newaxis]
    blended = ((1 - wx) * (1 - wy) * pixels[y0][:, x0]
               + wx * (1 - wy) * pixels[y0][:, x1]
               + (1 - wx) * wy * pixels[y1][:, x0]
               + wx * wy * pixels[y1][:, x1])
    # astype truncates toward zero, like int() on each sample.
    sampled = blended.astype(np.uint8).reshape(resolution * resolution, -1)

    colors = np.full((resolution * resolution, 4), 255, dtype=np.uint8)
    channels = sampled.shape[1]
    if channels == 4:
        colors[:] = sampled
    elif channels in (1, 3):
        colors[:, :3] = sampled   # a single grey channel broadcasts to R, G, B
    else:
        raise ValueError(f"Unsupported number of image channels: {channels}")
    return colors


def depth_to_mesh(depth_map, image, resolution=100, detail_level='medium'):
    """Build a vertex-coloured height-field mesh from a depth map.

    The depth map is enhanced with ``enhance_depth_map``, resampled onto a
    ``resolution`` x ``resolution`` grid with a bicubic spline, rescaled using
    its 2nd/98th percentiles and multiplied by a detail-dependent factor.
    X and Y are mapped to roughly [-1, 1]; Y and Z are negated.

    Args:
        depth_map: 2-D depth array.
        image: Image used for per-vertex colours, or ``None`` for no colours.
        resolution: Number of grid vertices per side (at least 2).
        detail_level: ``'high'`` adds gradient-based sharpening and skips the
            smoothing pass; other values get one Laplacian smoothing iteration.

    Returns:
        A ``trimesh.Trimesh``.

    Raises:
        ValueError: if ``resolution`` is smaller than 2.
    """
    if resolution < 2:
        raise ValueError("resolution must be at least 2")

    enhanced_depth = enhance_depth_map(depth_map, detail_level)
    h, w = enhanced_depth.shape
    x = np.linspace(0, w - 1, resolution)
    y = np.linspace(0, h - 1, resolution)
    x_grid, y_grid = np.meshgrid(x, y)
    interp_func = interpolate.RectBivariateSpline(
        np.arange(h), np.arange(w), enhanced_depth, kx=3, ky=3
    )
    z_values = interp_func(y, x, grid=True)

    if detail_level == 'high':
        dx = np.gradient(z_values, axis=1)
        dy = np.gradient(z_values, axis=0)
        gradient_magnitude = np.sqrt(dx**2 + dy**2)
        edge_mask = np.clip(gradient_magnitude * 5, 0, 0.2)
        z_values = z_values + edge_mask * (z_values - gaussian_filter(z_values, sigma=1.0))

    z_min, z_max = np.percentile(z_values, [2, 98])
    if z_max > z_min:
        z_values = (z_values - z_min) / (z_max - z_min)
    z_scaling = 2.5 if detail_level == 'high' else 2.0 if detail_level == 'medium' else 1.5
    z_values = z_values * z_scaling

    x_grid = (x_grid / w - 0.5) * 2.0
    y_grid = (y_grid / h - 0.5) * 2.0
    vertices = np.vstack([x_grid.flatten(), -y_grid.flatten(), -z_values.flatten()]).T

    mesh = trimesh.Trimesh(vertices=vertices, faces=_grid_faces(vertices, resolution))

    # Explicit None check: do not rely on the truthiness of an image object.
    if image is not None:
        mesh.visual.vertex_colors = _sample_vertex_colors(np.array(image), resolution)

    if detail_level != 'high':
        # Trimesh.smoothed() is trimesh's (deprecated) smooth-shading helper and
        # does not perform Laplacian smoothing; use the dedicated filter, which
        # updates the mesh in place. The volume constraint is disabled because
        # this height field is an open surface.
        from trimesh.smoothing import filter_laplacian
        filter_laplacian(mesh, iterations=1, volume_constraint=False)

    mesh.fix_normals()
    return mesh
def health_check():
    """Return a static JSON health payload with HTTP status 200."""
    payload = {
        "status": "healthy",
        "model": "DPT-Large + Depth Anything",
        "device": "cpu",
    }
    return jsonify(payload), 200
def progress(job_id):
    """Stream the progress of job ``job_id`` as server-sent events.

    Emits the current progress immediately, then a new event whenever the
    progress value changes (polled every 0.5 s), and finally one ``completed``
    or ``error`` event. After more than 60 polls the worker's liveness callback
    (if the job has one) is consulted; a dead worker turns the job into an
    error.
    """
    def _sse(payload):
        # One server-sent event carrying `payload` as JSON.
        return f"data: {json.dumps(payload)}\n\n"

    def generate():
        if job_id not in processing_jobs:
            yield _sse({'error': 'Job not found'})
            return

        job = processing_jobs[job_id]
        yield _sse({'status': 'processing', 'progress': job['progress']})
        reported = job['progress']
        polls = 0

        while job['status'] == 'processing':
            if job['progress'] != reported:
                yield _sse({'status': 'processing', 'progress': job['progress']})
                reported = job['progress']
            time.sleep(0.5)
            polls += 1
            if polls <= 60:
                continue
            worker_is_dead = 'thread_alive' in job and not job['thread_alive']()
            if worker_is_dead:
                job['status'] = 'error'
                job['error'] = 'Processing thread died unexpectedly'
                break
            polls = 0

        if job['status'] == 'completed':
            yield _sse({'status': 'completed', 'progress': 100, 'result_url': job['result_url'], 'preview_url': job['preview_url']})
        else:
            yield _sse({'status': 'error', 'error': job['error']})

    return Response(stream_with_context(generate()), mimetype='text/event-stream')
def _estimate_fused_depth(image, dpt_model, da_model, da_processor, detail_level):
    """Run the available depth models on ``image`` and return one depth map.

    With both models the two predictions are combined by ``fuse_depth_maps``;
    with DPT only, its output is reduced to 2-D and rescaled to [0, 1] using
    the 1st/99th percentiles (left unchanged if those coincide).
    """
    with torch.no_grad():
        # DPT-Large
        dpt_depth = dpt_model(image)["depth"]

        # Depth Anything (if loaded)
        if da_model is not None and da_processor is not None:
            inputs = da_processor(images=image, return_tensors="pt")
            inputs = {k: v.to("cpu") for k, v in inputs.items()}
            da_depth = da_model(**inputs).predicted_depth.squeeze()
            # Resample the prediction to the input image size.
            da_depth = torch.nn.functional.interpolate(
                da_depth.unsqueeze(0).unsqueeze(0),
                size=(image.height, image.width),
                mode='bicubic',
                align_corners=False
            ).squeeze()
            return fuse_depth_maps(dpt_depth, da_depth, detail_level)

        fused_depth = np.array(dpt_depth) if isinstance(dpt_depth, Image.Image) else dpt_depth
        if len(fused_depth.shape) > 2:
            fused_depth = np.mean(fused_depth, axis=2)
        p_low, p_high = np.percentile(fused_depth, [1, 99])
        if p_high > p_low:
            fused_depth = np.clip((fused_depth - p_low) / (p_high - p_low), 0, 1)
        return fused_depth


def _export_mesh(mesh, output_dir, output_format):
    """Write ``mesh`` into ``output_dir`` as model.glb, or as model.obj plus model.zip.

    For ``'obj'`` the zip bundles model.obj with model.mtl / model.png when the
    exporter produced them.
    """
    if output_format == 'obj':
        obj_path = os.path.join(output_dir, "model.obj")
        mesh.export(
            obj_path,
            file_type='obj',
            include_normals=True,
            include_texture=True
        )
        zip_path = os.path.join(output_dir, "model.zip")
        with zipfile.ZipFile(zip_path, 'w') as zipf:
            zipf.write(obj_path, arcname="model.obj")
            for companion in ("model.mtl", "model.png"):
                companion_path = os.path.join(output_dir, companion)
                if os.path.exists(companion_path):
                    zipf.write(companion_path, arcname=companion)
    elif output_format == 'glb':
        glb_path = os.path.join(output_dir, "model.glb")
        mesh.export(
            glb_path,
            file_type='glb'
        )


def convert_image_to_3d():
    """Accept an uploaded image and start a background image-to-3D conversion job.

    Reads the ``image`` file and the form fields ``mesh_resolution`` (clamped
    to 50-150), ``output_format`` (``obj`` or ``glb``) and ``detail_level``
    from the current request. ``texture_quality`` may be sent but is not used.

    Returns:
        ``({"job_id": ...}, 202)`` once the worker thread has been started, or
        a JSON error with status 400 for a missing/invalid upload or parameter.
        Job state is tracked in ``processing_jobs[job_id]``.
    """
    if 'image' not in request.files:
        return jsonify({"error": "No image provided"}), 400
    file = request.files['image']
    if file.filename == '':
        return jsonify({"error": "No image selected"}), 400
    if not allowed_file(file.filename):
        return jsonify({"error": f"File type not allowed. Supported types: {', '.join(ALLOWED_EXTENSIONS)}"}), 400

    try:
        mesh_resolution = int(request.form.get('mesh_resolution', 100))
    except ValueError:
        return jsonify({"error": "Invalid parameter values"}), 400
    # Keep the resolution inside the supported 50-150 range; zero, negative or
    # tiny values would otherwise break mesh generation later on.
    mesh_resolution = max(50, min(mesh_resolution, 150))
    output_format = request.form.get('output_format', 'glb').lower()
    detail_level = request.form.get('detail_level', 'medium').lower()

    if output_format not in ['obj', 'glb']:
        return jsonify({"error": "Unsupported output format. Use 'obj' or 'glb'"}), 400

    if detail_level == 'high':
        mesh_resolution = min(int(mesh_resolution * 1.5), 150)
    elif detail_level == 'low':
        mesh_resolution = max(int(mesh_resolution * 0.7), 50)

    job_id = str(uuid.uuid4())
    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    os.makedirs(output_dir, exist_ok=True)
    filename = secure_filename(file.filename)
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], f"{job_id}_{filename}")
    file.save(filepath)

    processing_jobs[job_id] = {
        'status': 'processing',
        'progress': 0,
        'result_url': None,
        'preview_url': None,
        'error': None,
        'output_format': output_format,
        'created_at': time.time(),
        'completed_at': None
    }

    def process_image():
        """Worker body: image -> depth -> mesh -> exported file(s)."""
        job = processing_jobs[job_id]
        job['thread_alive'] = threading.current_thread().is_alive
        try:
            job['progress'] = 5
            image = preprocess_image(filepath)
            job['progress'] = 10

            try:
                dpt_model, da_model, da_processor = load_models()
            except Exception as e:
                # Publish the message before the status so that a reader who
                # sees status == 'error' always finds the error text.
                job['error'] = f"Error loading models: {str(e)}"
                job['status'] = 'error'
                return
            job['progress'] = 30

            fused_depth, error = process_with_timeout(
                _estimate_fused_depth,
                [image, dpt_model, da_model, da_processor, detail_level],
                TIMEOUT_SECONDS
            )
            if error is not None:
                if isinstance(error, TimeoutError):
                    job['error'] = f"Processing timed out after {TIMEOUT_SECONDS} seconds"
                    job['status'] = 'error'
                    return
                raise error
            job['progress'] = 60

            mesh = depth_to_mesh(fused_depth, image, resolution=int(mesh_resolution), detail_level=detail_level)
            job['progress'] = 80

            _export_mesh(mesh, output_dir, output_format)
            job['result_url'] = f"/download/{job_id}"
            job['preview_url'] = f"/preview/{job_id}"
            job['completed_at'] = time.time()
            job['progress'] = 100
            job['status'] = 'completed'
            print(f"Job {job_id} completed")
        except Exception as e:
            error_details = traceback.format_exc()
            # The traceback goes to the server log only, not to the client.
            job['error'] = f"Error during processing: {str(e)}"
            job['status'] = 'error'
            print(f"Error processing job {job_id}: {str(e)}")
            print(error_details)
        finally:
            # Remove the upload on every exit path, including early returns.
            if os.path.exists(filepath):
                os.remove(filepath)
            gc.collect()

    processing_thread = threading.Thread(target=process_image)
    processing_thread.daemon = True
    processing_thread.start()
    return jsonify({"job_id": job_id}), 202
def download_model(job_id):
    """Send the finished model of ``job_id`` as a file attachment.

    OBJ jobs are served as ``model.zip``; every other format as ``model.glb``.
    Responds with a JSON 404 when the job is unknown, not completed, or its
    file is missing on disk.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    wants_obj = job.get('output_format', 'glb') == 'obj'
    artifact_name = "model.zip" if wants_obj else "model.glb"
    artifact_path = os.path.join(RESULTS_FOLDER, job_id, artifact_name)

    if not os.path.exists(artifact_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(artifact_path, as_attachment=True, download_name=artifact_name)
def preview_model(job_id):
    """Serve the finished model of ``job_id`` inline with a model MIME type.

    OBJ jobs return ``model.obj`` as ``model/obj``; every other format returns
    ``model.glb`` as ``model/gltf-binary``. Responds with a JSON 404 when the
    job is unknown, not completed, or its file is missing on disk.
    """
    job = processing_jobs.get(job_id)
    if job is None or job['status'] != 'completed':
        return jsonify({"error": "Model not found or processing not complete"}), 404

    if job.get('output_format', 'glb') == 'obj':
        model_name, mime = "model.obj", 'model/obj'
    else:
        model_name, mime = "model.glb", 'model/gltf-binary'
    model_path = os.path.join(RESULTS_FOLDER, job_id, model_name)

    if not os.path.exists(model_path):
        return jsonify({"error": "File not found"}), 404
    return send_file(model_path, mimetype=mime)
def cleanup_old_jobs():
    """Delete expired jobs and their result directories, then reschedule itself.

    Completed jobs expire 3600 s after ``created_at`` and failed jobs after
    1800 s; jobs still processing are never removed here. The next run is
    scheduled 300 s later on a daemon timer, even if this run raised.
    """
    import shutil

    max_age_by_status = {'completed': 3600, 'error': 1800}
    try:
        current_time = time.time()
        # Iterate over a snapshot: request threads add jobs concurrently, and
        # walking the live dict could fail with "dictionary changed size".
        expired_job_ids = [
            job_id
            for job_id, job_data in list(processing_jobs.items())
            if job_data['status'] in max_age_by_status
            and (current_time - job_data.get('created_at', 0)) > max_age_by_status[job_data['status']]
        ]
        for job_id in expired_job_ids:
            output_dir = os.path.join(RESULTS_FOLDER, job_id)
            try:
                if os.path.exists(output_dir):
                    shutil.rmtree(output_dir)
            except OSError as e:
                # Best effort: the job entry is dropped even if files remain.
                print(f"Error cleaning up job {job_id}: {str(e)}")
            processing_jobs.pop(job_id, None)
    finally:
        # Always reschedule so one failed run does not stop future cleanups.
        # Daemon, so a pending timer never keeps the process alive at exit.
        timer = threading.Timer(300, cleanup_old_jobs)
        timer.daemon = True
        timer.start()
def model_info(job_id):
    """Describe job ``job_id``: its state, or file statistics once completed.

    Unknown jobs get a JSON 404. Unfinished or failed jobs get their status,
    progress and error. Completed jobs get format, URLs, timestamps and the
    byte sizes of whichever output files exist on disk.
    """
    job = processing_jobs.get(job_id)
    if job is None:
        return jsonify({"error": "Model not found"}), 404

    if job['status'] != 'completed':
        pending = {
            "status": job['status'],
            "progress": job['progress'],
            "error": job.get('error'),
        }
        return jsonify(pending), 200

    # (stat key, file name) pairs to report for this output format.
    if job['output_format'] == 'obj':
        size_fields = (('obj_size', "model.obj"), ('package_size', "model.zip"))
    else:
        size_fields = (('model_size', "model.glb"),)

    output_dir = os.path.join(RESULTS_FOLDER, job_id)
    model_stats = {}
    for stat_key, file_name in size_fields:
        file_path = os.path.join(output_dir, file_name)
        if os.path.exists(file_path):
            model_stats[stat_key] = os.path.getsize(file_path)

    summary = {
        "status": job['status'],
        "model_format": job['output_format'],
        "download_url": job['result_url'],
        "preview_url": job['preview_url'],
        "model_stats": model_stats,
        "created_at": job.get('created_at'),
        "completed_at": job.get('completed_at'),
    }
    return jsonify(summary), 200
def index():
    """Return a JSON overview of the API: endpoints, parameters and description."""
    endpoints = [
        "/convert",
        "/progress/<job_id>",
        "/download/<job_id>",
        "/preview/<job_id>",
        "/model-info/<job_id>",
    ]
    parameters = {
        "mesh_resolution": "Integer (50-150)",
        "output_format": "obj or glb",
        "detail_level": "low, medium, or high",
        "texture_quality": "low, medium, or high",
    }
    overview = {
        "message": "Image to 3D API (DPT-Large + Depth Anything)",
        "endpoints": endpoints,
        "parameters": parameters,
        "description": "Creates high-quality 3D models from 2D images using DPT-Large and Depth Anything.",
    }
    return jsonify(overview), 200
if __name__ == '__main__':
    # Kick off the self-rescheduling cleanup loop before serving requests.
    cleanup_old_jobs()
    # Listen on all interfaces; the port comes from $PORT (default 7860).
    port = int(os.environ.get('PORT', 7860))
    app.run(port=port, host='0.0.0.0')