lionelgarnier
refactor session management and update event handlers for improved clarity and functionality
cc9aa28
raw
history blame
23 kB
import gradio as gr
import numpy as np
import random
import os
import spaces
import torch
from diffusers import DiffusionPipeline
from transformers import pipeline, AutoTokenizer
from huggingface_hub import login
from PIL import Image
from gradio_litmodel3d import LitModel3D
import shutil
os.environ['SPCONV_ALGO'] = 'native'
from typing import *
import imageio
from easydict import EasyDict as edict
from trellis.pipelines import TrellisImageTo3DPipeline
from trellis.representations import Gaussian, MeshExtractResult
from trellis.utils import render_utils, postprocessing_utils
hf_token = os.getenv("hf_token")
login(token=hf_token)
# Global constants and default values
MAX_SEED = np.iinfo(np.int32).max
MAX_IMAGE_SIZE = 2048
PRELOAD_MODELS = False
# Default system prompt for text generation
DEFAULT_SYSTEM_PROMPT = """You are a product designer with strong knowledge in text-to-image generation. You will receive a product request in the form of a brief description, and your mission will be to imagine a new product design that meets this need.
The deliverable (generated response) will be exclusively a text prompt for the FLUX.1-schnell text-to-image AI.
This prompt should include a visual description of the object explicitly mentioning the essential aspects of its function.
Additionally, you should explicitly mention in this prompt the aesthetic/photo characteristics of the image rendering (e.g., photorealistic, high quality, focal length, grain, etc.), knowing that the image will be the main image of this object in the product catalog. The background of the generated image must be entirely white.
The prompt should be without narration, can be long but must not exceed 77 tokens."""
# Default Flux parameters
DEFAULT_SEED = 42
DEFAULT_RANDOMIZE_SEED = True
DEFAULT_WIDTH = 512
DEFAULT_HEIGHT = 512
DEFAULT_NUM_INFERENCE_STEPS = 6
DEFAULT_GUIDANCE_SCALE = 0.0
DEFAULT_TEMPERATURE = 0.9
TMP_DIR = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'tmp')
os.makedirs(TMP_DIR, exist_ok=True)
_text_gen_pipeline = None
_image_gen_pipeline = None
_trellis_pipeline = None
def start_session(req: gr.Request):
"""Create a temporary directory for the user session"""
try:
user_dir = os.path.join(TMP_DIR, str(req.session_hash))
os.makedirs(user_dir, exist_ok=True)
print(f"Session started: {req.session_hash}")
except Exception as e:
print(f"Error starting session: {str(e)}")
def end_session(req: gr.Request):
"""Clean up the temporary directory when the session ends"""
try:
user_dir = os.path.join(TMP_DIR, str(req.session_hash))
if os.path.exists(user_dir):
shutil.rmtree(user_dir)
print(f"Session ended: {req.session_hash}")
except Exception as e:
print(f"Error ending session: {str(e)}")
@spaces.GPU()
def get_image_gen_pipeline():
global _image_gen_pipeline
if (_image_gen_pipeline is None):
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
dtype = torch.bfloat16
_image_gen_pipeline = DiffusionPipeline.from_pretrained(
"black-forest-labs/FLUX.1-schnell",
torch_dtype=dtype,
).to(device)
# Comment these out for now to match the working example
# _image_gen_pipeline.enable_model_cpu_offload()
# _image_gen_pipeline.enable_vae_slicing()
except Exception as e:
print(f"Error loading image generation model: {e}")
return None
return _image_gen_pipeline
@spaces.GPU()
def get_text_gen_pipeline():
global _text_gen_pipeline
if (_text_gen_pipeline is None):
try:
device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = AutoTokenizer.from_pretrained(
"mistralai/Mistral-7B-Instruct-v0.3",
use_fast=True
)
tokenizer.pad_token = tokenizer.pad_token or tokenizer.eos_token
_text_gen_pipeline = pipeline(
"text-generation",
model="mistralai/Mistral-7B-Instruct-v0.3",
tokenizer=tokenizer,
max_new_tokens=2048,
device=device,
pad_token_id=tokenizer.pad_token_id
)
except Exception as e:
print(f"Error loading text generation model: {e}")
return None
return _text_gen_pipeline
@spaces.GPU()
def get_trellis_pipeline():
global _trellis_pipeline
if _trellis_pipeline is None:
try:
print("Loading Trellis pipeline...")
_trellis_pipeline = TrellisImageTo3DPipeline.from_pretrained("JeffreyXiang/TRELLIS-image-large")
_trellis_pipeline.cuda()
# Preload rembg by processing a small test image
try:
_trellis_pipeline.preprocess_image(Image.fromarray(np.zeros((512, 512, 3), dtype=np.uint8)))
except Exception as e:
print(f"Warning when preloading rembg: {e}")
except Exception as e:
print(f"Error loading Trellis pipeline: {e}")
return None
return _trellis_pipeline
@spaces.GPU()
def refine_prompt(prompt, system_prompt=DEFAULT_SYSTEM_PROMPT, progress=gr.Progress()):
text_gen = get_text_gen_pipeline()
if text_gen is None:
return "", "Text generation model is unavailable."
try:
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt},
]
# Indicate progress started
progress(0, desc="Generating text")
# Generate text
refined_prompt = text_gen(messages)
# Indicate progress complete
progress(1)
# Extract just the assistant's content from the response
try:
messages = refined_prompt[0]['generated_text']
# Find the last message with role 'assistant'
assistant_messages = [msg for msg in messages if msg['role'] == 'assistant']
if not assistant_messages:
return "", "Error: No assistant response found"
assistant_content = assistant_messages[-1]['content']
# Remove quotation marks at the beginning and end
if assistant_content.startswith('"') and assistant_content.endswith('"'):
assistant_content = assistant_content[1:-1]
return assistant_content, "Prompt refined successfully!"
except (KeyError, IndexError):
return "", "Error: Unexpected response format from the model"
except Exception as e:
print(f"Error in refine_prompt: {str(e)}") # Add debug print
return "", f"Error refining prompt: {str(e)}"
def validate_dimensions(width, height):
if width * height > MAX_IMAGE_SIZE * MAX_IMAGE_SIZE:
return False, "Image dimensions too large"
return True, None
@spaces.GPU()
def infer(prompt, seed=DEFAULT_SEED,
randomize_seed=DEFAULT_RANDOMIZE_SEED,
width=DEFAULT_WIDTH,
height=DEFAULT_HEIGHT,
num_inference_steps=DEFAULT_NUM_INFERENCE_STEPS,
progress=gr.Progress(track_tqdm=True)):
try:
# Validate that prompt is not empty
if not prompt or prompt.strip() == "":
return None, "Please provide a valid prompt."
progress(0.1, desc="Loading model")
pipe = get_image_gen_pipeline()
if pipe is None:
return None, "Image generation model is unavailable."
is_valid, error_msg = validate_dimensions(width, height)
if not is_valid:
return None, error_msg
if randomize_seed:
seed = random.randint(0, MAX_SEED)
# Use default torch generator instead of cuda-specific generator
generator = torch.Generator().manual_seed(seed)
progress(0.3, desc="Running inference")
# Match the working example's parameters
output = pipe(
prompt=prompt,
width=width,
height=height,
num_inference_steps=num_inference_steps,
generator=generator,
guidance_scale=DEFAULT_GUIDANCE_SCALE,
)
progress(0.8, desc="Processing output")
image = output.images[0]
progress(1.0, desc="Complete")
return image, f"Image generated successfully with seed {seed}"
except Exception as e:
print(f"Error in infer: {str(e)}")
return None, f"Error generating image: {str(e)}"
examples = [
"a backpack for kids, flower style",
"medieval flip flops",
"cat shaped cake mold",
]
css="""
#col-container {
margin: 0 auto;
max-width: 520px;
}
"""
def preload_models():
print("Preloading models...")
text_success = get_text_gen_pipeline() is not None
image_success = get_image_gen_pipeline() is not None
trellis_success = get_trellis_pipeline() is not None
success = text_success and image_success and trellis_success
status_parts = []
if text_success:
status_parts.append("Mistral ✓")
else:
status_parts.append("Mistral ✗")
if image_success:
status_parts.append("Flux ✓")
else:
status_parts.append("Flux ✗")
if trellis_success:
status_parts.append("Trellis ✓")
else:
status_parts.append("Trellis ✗")
status = f"Models loaded: {', '.join(status_parts)}"
print(status)
return success, status
def pack_state(gs: Gaussian, mesh: MeshExtractResult) -> dict:
return {
'gaussian': {
**gs.init_params,
'_xyz': gs._xyz.cpu().numpy(),
'_features_dc': gs._features_dc.cpu().numpy(),
'_scaling': gs._scaling.cpu().numpy(),
'_rotation': gs._rotation.cpu().numpy(),
'_opacity': gs._opacity.cpu().numpy(),
},
'mesh': {
'vertices': mesh.vertices.cpu().numpy(),
'faces': mesh.faces.cpu().numpy(),
},
}
def unpack_state(state: dict) -> Tuple[Gaussian, edict, str]:
gs = Gaussian(
aabb=state['gaussian']['aabb'],
sh_degree=state['gaussian']['sh_degree'],
mininum_kernel_size=state['gaussian']['mininum_kernel_size'],
scaling_bias=state['gaussian']['scaling_bias'],
opacity_bias=state['gaussian']['opacity_bias'],
scaling_activation=state['gaussian']['scaling_activation'],
)
gs._xyz = torch.tensor(state['gaussian']['_xyz'], device='cuda')
gs._features_dc = torch.tensor(state['gaussian']['_features_dc'], device='cuda')
gs._scaling = torch.tensor(state['gaussian']['_scaling'], device='cuda')
gs._rotation = torch.tensor(state['gaussian']['_rotation'], device='cuda')
gs._opacity = torch.tensor(state['gaussian']['_opacity'], device='cuda')
mesh = edict(
vertices=torch.tensor(state['mesh']['vertices'], device='cuda'),
faces=torch.tensor(state['mesh']['faces'], device='cuda'),
)
return gs, mesh
@spaces.GPU
def image_to_3d(
image: Image.Image,
seed: int,
ss_guidance_strength: float,
ss_sampling_steps: int,
slat_guidance_strength: float,
slat_sampling_steps: int,
req: gr.Request,
) -> Tuple[dict, str]:
try:
user_dir = os.path.join(TMP_DIR, str(req.session_hash))
# Get the pipeline using the getter function
pipeline = get_trellis_pipeline()
if pipeline is None:
return None, "Trellis pipeline is unavailable."
outputs = pipeline.run(
image,
seed=seed,
formats=["gaussian", "mesh"],
preprocess_image=False,
sparse_structure_sampler_params={
"steps": ss_sampling_steps,
"cfg_strength": ss_guidance_strength,
},
slat_sampler_params={
"steps": slat_sampling_steps,
"cfg_strength": slat_guidance_strength,
},
)
video = render_utils.render_video(outputs['gaussian'][0], num_frames=120)['color']
video_geo = render_utils.render_video(outputs['mesh'][0], num_frames=120)['normal']
video = [np.concatenate([video[i], video_geo[i]], axis=1) for i in range(len(video))]
video_path = os.path.join(user_dir, 'sample.mp4')
imageio.mimsave(video_path, video, fps=15)
state = pack_state(outputs['gaussian'][0], outputs['mesh'][0])
torch.cuda.empty_cache()
return state, video_path
except Exception as e:
print(f"Error in image_to_3d: {str(e)}")
return None, f"Error generating 3D model: {str(e)}"
@spaces.GPU(duration=90)
def extract_glb(
state: dict,
mesh_simplify: float,
texture_size: int,
req: gr.Request,
) -> Tuple[str, str]:
"""
Extract a GLB file from the 3D model.
Args:
state (dict): The state of the generated 3D model.
mesh_simplify (float): The mesh simplification factor.
texture_size (int): The texture resolution.
Returns:
str: The path to the extracted GLB file.
"""
user_dir = os.path.join(TMP_DIR, str(req.session_hash))
gs, mesh = unpack_state(state)
glb = postprocessing_utils.to_glb(gs, mesh, simplify=mesh_simplify, texture_size=texture_size, verbose=False)
glb_path = os.path.join(user_dir, 'sample.glb')
glb.export(glb_path)
torch.cuda.empty_cache()
return glb_path, glb_path
@spaces.GPU
def extract_gaussian(state: dict, req: gr.Request) -> Tuple[str, str]:
"""
Extract a Gaussian file from the 3D model.
Args:
state (dict): The state of the generated 3D model.
Returns:
str: The path to the extracted Gaussian file.
"""
user_dir = os.path.join(TMP_DIR, str(req.session_hash))
gs, _ = unpack_state(state)
gaussian_path = os.path.join(user_dir, 'sample.ply')
gs.save_ply(gaussian_path)
torch.cuda.empty_cache()
return gaussian_path, gaussian_path
# Create a combined function that handles the whole pipeline from example to image
# This version gets the parameters from the UI components
@spaces.GPU()
def process_example_pipeline(example_prompt, system_prompt=DEFAULT_SYSTEM_PROMPT, progress=gr.Progress()):
# Step 1: Update status
progress(0, desc="Starting example processing")
# Step 2: Refine the prompt
progress(0.1, desc="Refining prompt with Mistral")
refined, status = refine_prompt(example_prompt, system_prompt, progress)
if not refined:
return "", "Failed to refine prompt: " + status
# Return only the refined prompt and status - don't generate image
return refined, "Prompt refined successfully!"
def create_interface():
# Preload models if needed
if PRELOAD_MODELS:
model_success, model_status_details = preload_models()
model_status = f"✅ {model_status_details}" if model_success else f"⚠️ {model_status_details}"
else:
model_status = "ℹ️ Models will be loaded on demand"
with gr.Blocks(css=css) as demo:
# Set up session management - COMMENT THESE OUT FOR TESTING
# demo.load(start_session)
# demo.unload(end_session)
gr.Info(model_status)
# State for storing 3D model data
output_state = gr.State(None)
with gr.Column(elem_id="col-container"):
gr.Markdown("# Text to Product\nUsing Mistral-7B + FLUX.1-dev + Trellis")
prompt = gr.Text(
show_label=False,
max_lines=1,
placeholder="Enter basic object prompt",
container=False,
)
prompt_button = gr.Button("Refine prompt with Mistral")
refined_prompt = gr.Text(
show_label=False,
max_lines=10,
placeholder="Detailed object prompt",
container=False,
max_length=2048,
)
visual_button = gr.Button("Create visual with Flux")
generated_image = gr.Image(show_label=False)
gen3d_button = gr.Button("Create 3D visual with Trellis")
video_output = gr.Video(label="Generated 3D Asset", autoplay=True, loop=True, height=300)
model_output = LitModel3D(label="Extracted GLB/Gaussian", exposure=10.0, height=300)
with gr.Row():
download_glb = gr.DownloadButton(label="Download GLB", interactive=False)
download_gs = gr.DownloadButton(label="Download Gaussian", interactive=False)
message_box = gr.Textbox(
label="Status Messages",
interactive=False,
placeholder="Status messages will appear here",
)
# Accordion sections for advanced settings
with gr.Accordion("Advanced Settings", open=False):
with gr.Tab("Mistral"):
# Mistral settings
temperature = gr.Slider(
label="Temperature",
value=DEFAULT_TEMPERATURE,
minimum=0.0,
maximum=1.0,
step=0.05,
info="Higher values produce more diverse outputs",
)
system_prompt = gr.Textbox(
label="System Prompt",
value=DEFAULT_SYSTEM_PROMPT,
lines=10,
info="Instructions for the Mistral model"
)
with gr.Tab("Flux"):
# Flux settings
flux_seed = gr.Slider(label="Seed", minimum=0, maximum=MAX_SEED, step=1, value=DEFAULT_SEED)
flux_randomize_seed = gr.Checkbox(label="Randomize seed", value=DEFAULT_RANDOMIZE_SEED)
with gr.Row():
width = gr.Slider(label="Width", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=DEFAULT_WIDTH)
height = gr.Slider(label="Height", minimum=256, maximum=MAX_IMAGE_SIZE, step=32, value=DEFAULT_HEIGHT)
num_inference_steps = gr.Slider(
label="Number of inference steps",
minimum=1,
maximum=50,
step=1,
value=DEFAULT_NUM_INFERENCE_STEPS,
)
with gr.Tab("3D Generation Settings"):
trellis_seed = gr.Slider(0, MAX_SEED, label="Seed", value=0, step=1)
trellis_randomize_seed = gr.Checkbox(label="Randomize Seed", value=True)
gr.Markdown("Stage 1: Sparse Structure Generation")
with gr.Row():
ss_guidance_strength = gr.Slider(0.0, 10.0, label="Guidance Strength", value=7.5, step=0.1)
ss_sampling_steps = gr.Slider(1, 50, label="Sampling Steps", value=12, step=1)
gr.Markdown("Stage 2: Structured Latent Generation")
with gr.Row():
slat_guidance_strength = gr.Slider(0.0, 10.0, label="Guidance Strength", value=3.0, step=0.1)
slat_sampling_steps = gr.Slider(1, 50, label="Sampling Steps", value=12, step=1)
with gr.Tab("GLB Extraction Settings"):
mesh_simplify = gr.Slider(0.9, 0.98, label="Simplify", value=0.95, step=0.01)
texture_size = gr.Slider(512, 2048, label="Texture Size", value=1024, step=512)
with gr.Row():
extract_glb_btn = gr.Button("Extract GLB", interactive=False)
extract_gs_btn = gr.Button("Extract Gaussian", interactive=False)
gr.Markdown("""
*NOTE: Gaussian file can be very large (~50MB), it will take a while to display and download.*
""")
output_buf = gr.State()
# Examples section - simplified version that only updates the prompt fields
gr.Examples(
examples=examples,
fn=process_example_pipeline,
inputs=[prompt],
outputs=[refined_prompt, message_box],
cache_examples=True,
)
# Event handlers - Fixed to use the renamed components
gr.on(
triggers=[prompt_button.click, prompt.submit],
fn=refine_prompt,
inputs=[prompt, system_prompt],
outputs=[refined_prompt, message_box]
)
gr.on(
triggers=[visual_button.click],
fn=infer,
inputs=[refined_prompt, flux_seed, flux_randomize_seed, width, height, num_inference_steps],
outputs=[generated_image, message_box]
)
gr.on(
triggers=[gen3d_button.click],
fn=image_to_3d,
inputs=[generated_image, trellis_seed, ss_guidance_strength, ss_sampling_steps, slat_guidance_strength, slat_sampling_steps],
outputs=[output_state, video_output],
).then(
# Update button states after successful 3D generation
lambda: (gr.Button.update(interactive=True), gr.Button.update(interactive=True), "3D model generated successfully"),
outputs=[extract_glb_btn, extract_gs_btn, message_box]
)
# Add handlers for GLB and Gaussian extraction
gr.on(
triggers=[extract_glb_btn.click],
fn=extract_glb,
inputs=[output_state, mesh_simplify, texture_size],
outputs=[model_output, download_glb]
).then(
lambda path: (gr.DownloadButton.update(interactive=True, value=path), "GLB extraction completed"),
inputs=[model_output],
outputs=[download_glb, message_box]
)
gr.on(
triggers=[extract_gs_btn.click],
fn=extract_gaussian,
inputs=[output_state],
outputs=[model_output, download_gs]
).then(
lambda path: (gr.DownloadButton.update(interactive=True, value=path), "Gaussian extraction completed"),
inputs=[model_output],
outputs=[download_gs, message_box]
)
return demo
if __name__ == "__main__":
# Initialize models if PRELOAD_MODELS is True
if PRELOAD_MODELS:
success, status = preload_models()
print(status)
demo = create_interface()
demo.launch()