import os
import tempfile
from contextlib import suppress
from fractions import Fraction
from io import BytesIO
from typing import List, Optional

import ffmpeg
import gradio as gr
from google import genai
from gradio_client import Client, handle_file
from huggingface_hub import whoami
from PIL import Image

# --- Google Gemini API Configuration ---
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY", "")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY environment variable not set.")

# Reuse the key validated above instead of reading the environment a second time.
client = genai.Client(api_key=GOOGLE_API_KEY)
GEMINI_MODEL_NAME = 'gemini-2.5-flash-image-preview'


def verify_pro_status(token: Optional[gr.OAuthToken]) -> bool:
    """Verifies if the user is a Hugging Face PRO user or part of an enterprise org.

    Returns False when no token is supplied or when the `whoami` lookup raises;
    the failure is printed rather than propagated.
    """
    if not token:
        return False
    try:
        user_info = whoami(token=token.token)
        return user_info.get("isPro", False) or any(
            org.get("isEnterprise", False) for org in user_info.get("orgs", [])
        )
    except Exception as e:
        print(f"Could not verify user's PRO/Enterprise status: {e}")
        return False


def _remove_quietly(path: str) -> None:
    """Best-effort deletion of a temporary file; a missing or locked file is ignored."""
    with suppress(OSError):
        os.remove(path)


def _describe_ffmpeg_error(error: Exception) -> str:
    """Returns ffmpeg's captured stderr when available, otherwise ``str(error)``."""
    stderr = getattr(error, 'stderr', None)
    if isinstance(error, ffmpeg.Error) and stderr:
        return stderr.decode(errors='replace')
    return str(error)


def _extract_image_data_from_response(response) -> Optional[bytes]:
    """Helper to extract image data from the model's response.

    Looks only at the first candidate and returns the first non-empty
    ``inline_data.data`` found among its parts, or None if there is none.
    Missing/None ``candidates``, ``content`` or ``parts`` yield None instead
    of raising.
    """
    candidates = getattr(response, 'candidates', None)
    if not candidates:
        return None
    content = getattr(candidates[0], 'content', None)
    parts = getattr(content, 'parts', None) or []
    for part in parts:
        data = getattr(getattr(part, 'inline_data', None), 'data', None)
        if data:
            return data
    return None


def _get_framerate(video_path: str) -> float:
    """Gets the average framerate of a video using ffprobe.

    Returns 0.0 when the stream reports a zero denominator (e.g. "0/0").

    Raises:
        ValueError: if the file has no video stream, or the reported rate is
            not a parseable rational.
    """
    probe = ffmpeg.probe(video_path)
    video_stream = next(
        (stream for stream in probe['streams'] if stream.get('codec_type') == 'video'),
        None,
    )
    if video_stream is None:
        raise ValueError("Could not find video stream in the file.")
    # The rate is a rational string such as "30000/1001"; parse it instead of
    # calling eval() on text that comes from an external tool.
    try:
        return float(Fraction(video_stream['avg_frame_rate']))
    except ZeroDivisionError:
        return 0.0


def _trim_first_frame_fast(video_path: str) -> str:
    """
    Drops the first frame's worth of time (1 / framerate seconds) from a video
    using stream copy (no re-encoding) and returns the path of the new file.

    Raises:
        RuntimeError: on any probing or ffmpeg failure; the partially written
            output file is removed first.
    """
    gr.Info("Preparing video segment...")
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        framerate = _get_framerate(video_path)
        if framerate == 0:
            raise ValueError("Framerate cannot be zero.")
        start_time = 1 / framerate
        # NOTE(review): `ss` is passed to input(), which ffmpeg-python emits as an
        # input option (before -i). An earlier comment here claimed -ss was placed
        # AFTER -i; confirm which placement is intended before changing it.
        (
            ffmpeg
            .input(video_path, ss=start_time)
            .output(output_path, c='copy', avoid_negative_ts='make_zero')
            .run(overwrite_output=True, quiet=True)
        )
        return output_path
    except Exception as e:
        # Do not leave the empty/partial temp file behind on failure.
        _remove_quietly(output_path)
        raise RuntimeError(f"FFmpeg trim error: {_describe_ffmpeg_error(e)}") from e


def _combine_videos_simple(video1_path: str, video2_path: str) -> str:
    """
    Combines two videos using the fast concat demuxer (stream copy).
    Assumes video2 is already trimmed.

    Raises:
        RuntimeError: if ffmpeg fails; the output temp file is removed first.
    """
    gr.Info("Stitching videos...")
    with tempfile.NamedTemporaryFile(
        delete=False, mode='w', suffix=".txt", encoding='utf-8'
    ) as tmp_list_file:
        for path in (video1_path, video2_path):
            # Inside a single-quoted concat entry a literal quote is written as '\''.
            escaped = os.path.abspath(path).replace("'", "'\\''")
            tmp_list_file.write(f"file '{escaped}'\n")
        list_file_path = tmp_list_file.name
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_output_file:
        output_path = tmp_output_file.name
    try:
        (
            ffmpeg
            .input(list_file_path, format='concat', safe=0)
            .output(output_path, c='copy')
            .run(overwrite_output=True, quiet=True)
        )
    except ffmpeg.Error as e:
        _remove_quietly(output_path)
        raise RuntimeError(f"FFmpeg combine error: {_describe_ffmpeg_error(e)}") from e
    finally:
        # The list file is only needed while ffmpeg runs.
        _remove_quietly(list_file_path)
    return output_path


def _generate_video_segment(input_image_path: str, output_image_path: str, prompt: str, token: str) -> str:
    """Generates a single video segment using the external service.

    Calls the "/generate_video" endpoint of the
    "multimodalart/wan-2-2-first-last-frame" Space with the two images and the
    prompt, and returns ``result[0]["video"]``.
    """
    gr.Info("Generating new video segment...")
    video_client = Client("multimodalart/wan-2-2-first-last-frame", hf_token=token)
    result = video_client.predict(
        start_image_pil=handle_file(input_image_path),
        end_image_pil=handle_file(output_image_path),
        prompt=prompt,
        api_name="/generate_video",
    )
    return result[0]["video"]


def unified_image_generator(prompt: str, images: Optional[List[str]], previous_video_path: Optional[str], oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """
    Handles image generation and determines the visibility of video creation buttons.

    Each entry of ``images`` is indexed with ``[0]`` to obtain a file path.
    Returns ``(output_path, create_button_update, extend_button_update,
    video_group_update)``: "create" is shown only when exactly one input image
    was given, "extend" additionally requires a previous video, and the video
    group is always hidden.

    Raises:
        gr.Error: if the user is not PRO/Enterprise or generation fails.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    try:
        contents = [Image.open(image_path[0]) for image_path in images] if images else []
        contents.append(prompt)
        response = client.models.generate_content(model=GEMINI_MODEL_NAME, contents=contents)
        image_data = _extract_image_data_from_response(response)
        if not image_data:
            raise ValueError("No image data in response.")
        with tempfile.NamedTemporaryFile(delete=False, suffix=".png") as tmp:
            output_path = tmp.name
        # Save after the handle is closed so the file is not written while
        # another handle to it is still open.
        Image.open(BytesIO(image_data)).save(output_path)
        can_create_video = bool(images and len(images) == 1)
        can_extend_video = can_create_video and bool(previous_video_path)
        return (
            output_path,
            gr.update(visible=can_create_video),
            gr.update(visible=can_extend_video),
            gr.update(visible=False),
        )
    except Exception as e:
        raise gr.Error(f"Image generation failed: {e}") from e


def create_new_video(input_image_gallery: List[str], prompt_input: str, output_image: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Starts a NEW video chain, overwriting any previous video state.

    Returns the new segment path twice: once for the video player and once for
    the stored "previous video" state.

    Raises:
        gr.Error: on missing access, missing images, or generation failure.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not input_image_gallery or not output_image:
        raise gr.Error("Input/output images required.")
    try:
        new_segment_path = _generate_video_segment(
            input_image_gallery[0][0], output_image, prompt_input, oauth_token.token
        )
        return new_segment_path, new_segment_path
    except Exception as e:
        raise gr.Error(f"Video creation failed: {e}") from e


def extend_existing_video(input_image_gallery: List[str], prompt_input: str, output_image: str, previous_video_path: str, oauth_token: Optional[gr.OAuthToken]) -> tuple:
    """Extends an existing video with a new segment.

    Generates a segment, trims its first frame, and concatenates it after
    ``previous_video_path``. Returns the combined path twice (player + state).

    Raises:
        gr.Error: on missing access, missing inputs, or any processing failure.
    """
    if not verify_pro_status(oauth_token):
        raise gr.Error("Access Denied.")
    if not previous_video_path:
        raise gr.Error("No previous video to extend.")
    if not input_image_gallery or not output_image:
        raise gr.Error("Input/output images required.")
    try:
        new_segment_path = _generate_video_segment(
            input_image_gallery[0][0], output_image, prompt_input, oauth_token.token
        )
        trimmed_segment_path = _trim_first_frame_fast(new_segment_path)
        try:
            final_video_path = _combine_videos_simple(previous_video_path, trimmed_segment_path)
        finally:
            # The trimmed copy is an intermediate artifact only.
            _remove_quietly(trimmed_segment_path)
        return final_video_path, final_video_path
    except Exception as e:
        raise gr.Error(f"Video extension failed: {e}") from e


css = '''
#sub_title{margin-top: -35px !important}
.tab-wrapper{margin-bottom: -33px !important}
.tabitem{padding: 0px !important}
.fillable{max-width: 980px !important}
.dark .progress-text {color: white}
.logo-dark{display: none}
.dark .logo-dark{display: block !important}
.dark .logo-light{display: none}
.grid-container img{object-fit: contain}
.grid-container {display: grid;grid-template-columns: repeat(2, 1fr)}
.grid-container:has(> .gallery-item:only-child) {grid-template-columns: 1fr}
#wan_ad p{text-align: center;padding: .5em}
'''

with gr.Blocks(theme=gr.themes.Citrus(), css=css) as demo:
    gr.HTML(''' ''')
    # NOTE(review): this banner reads as if its HTML markup (e.g. a link around
    # "Subscribe to PRO") was lost; only the visible text is kept here. Confirm
    # against the deployed app. The literal was previously broken across lines,
    # which is a syntax error for a double-quoted string.
    gr.HTML(
        "Hugging Face PRO users can use Google's Nano Banana (Gemini 2.5 Flash Image Preview) on this Space. Subscribe to PRO",
        elem_id="sub_title",
    )
    pro_message = gr.Markdown(visible=False)
    main_interface = gr.Column(visible=False)
    previous_video_state = gr.State(None)

    with main_interface:
        with gr.Row():
            with gr.Column(scale=1):
                image_input_gallery = gr.Gallery(
                    label="Upload one or more images here. Leave empty for text-to-image",
                    file_types=["image"],
                    height="auto",
                )
                prompt_input = gr.Textbox(label="Prompt", placeholder="Turns this photo into a masterpiece")
                generate_button = gr.Button("Generate", variant="primary")
            with gr.Column(scale=1):
                output_image = gr.Image(label="Output", interactive=False, elem_id="output", type="filepath")
                use_image_button = gr.Button("♻️ Use this Image for Next Edit", variant="primary")
                with gr.Row():
                    create_video_button = gr.Button("Create video between the two images 🎥", variant="secondary", visible=False)
                    extend_video_button = gr.Button("Extend previous video with new scene 🎞️", variant="secondary", visible=False)
                with gr.Group(visible=False) as video_group:
                    video_output = gr.Video(label="Generated Video", show_download_button=True, autoplay=True)
                    gr.Markdown("Generate more with [Wan 2.2 first-last-frame](https://huggingface.co/spaces/multimodalart/wan-2-2-first-last-frame)", elem_id="wan_ad")
        gr.Markdown("## Thank you for being a PRO! 🤗")

    login_button = gr.LoginButton()

    # Generate on button click or on Enter in the prompt box.
    gr.on(
        triggers=[generate_button.click, prompt_input.submit],
        fn=unified_image_generator,
        inputs=[prompt_input, image_input_gallery, previous_video_state],
        outputs=[output_image, create_video_button, extend_video_button, video_group],
    )

    # Move the generated image into the input gallery and reset the output side.
    use_image_button.click(
        fn=lambda img: (
            [img] if img else None,
            None,
            gr.update(visible=False),
            gr.update(visible=False),
            gr.update(visible=False),
        ),
        inputs=[output_image],
        outputs=[image_input_gallery, output_image, create_video_button, extend_video_button, video_group],
    )

    # Reveal the video group first, then run the (slow) generation step.
    create_video_button.click(
        fn=lambda: gr.update(visible=True),
        outputs=[video_group],
    ).then(
        fn=create_new_video,
        inputs=[image_input_gallery, prompt_input, output_image],
        outputs=[video_output, previous_video_state],
    )

    extend_video_button.click(
        fn=lambda: gr.update(visible=True),
        outputs=[video_group],
    ).then(
        fn=extend_existing_video,
        inputs=[image_input_gallery, prompt_input, output_image, previous_video_state],
        outputs=[video_output, previous_video_state],
    )

    def control_access(profile: Optional[gr.OAuthProfile] = None, oauth_token: Optional[gr.OAuthToken] = None):
        """Decides on page load which of (main_interface, pro_message) is visible.

        No profile: both hidden. PRO/Enterprise user: main interface shown.
        Otherwise: the upgrade message is shown.
        """
        if not profile:
            return gr.update(visible=False), gr.update(visible=False)
        if verify_pro_status(oauth_token):
            return gr.update(visible=True), gr.update(visible=False)
        message = (
            "## ✨ Exclusive Access for PRO Users\n\n"
            "Thank you for your interest! This app is available exclusively for our Hugging Face **PRO** members.\n\n"
            "To unlock this and many other cool stuff, please consider upgrading your account.\n\n"
            "### [**Become a PRO Today!**](http://huggingface.co/subscribe/pro?source=nana_banana)"
        )
        return gr.update(visible=False), gr.update(visible=True, value=message)

    demo.load(control_access, inputs=None, outputs=[main_interface, pro_message])

if __name__ == "__main__":
    demo.queue(max_size=None, default_concurrency_limit=None).launch()