# (Page residue from the hosting site: "Spaces: Sleeping" status banner — not part of the program.)
# Copyright 2025 Google LLC. Based on work by Yousif Ahmed. | |
# Concept: ChronoWeave - Branching Narrative Generation | |
# Licensed under the Apache License, Version 2.0 (the "License"); | |
# you may not use this file except in compliance with the License. | |
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 | |
import streamlit as st | |
import google.generativeai as genai | |
import os | |
import json | |
import numpy as np | |
from io import BytesIO | |
import time | |
import wave | |
import contextlib | |
import asyncio | |
import uuid # For unique identifiers | |
import shutil # For directory operations | |
import logging # For better logging | |
# Image handling | |
from PIL import Image | |
# Pydantic for data validation | |
from pydantic import BaseModel, Field, ValidationError, validator | |
from typing import List, Optional, Literal | |
# Video and audio processing | |
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips | |
# from moviepy.config import change_settings # Potential for setting imagemagick path if needed | |
# Type hints | |
import typing_extensions as typing | |
# Async support for Streamlit/Google API | |
import nest_asyncio | |
nest_asyncio.apply() # Apply patch for asyncio in environments like Streamlit/Jupyter | |
# --- Logging Setup ---
# Root logger emits INFO and above, stamped with time and severity.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# --- Configuration ---
# Page setup is issued before any other Streamlit element is drawn.
st.set_page_config(
    initial_sidebar_state="expanded",
    layout="wide",
    page_title="ChronoWeave",
)
st.title("π ChronoWeave: Advanced Branching Narrative Generator")
# Intro blurb shown under the title (same text as a single concatenated literal).
st.markdown(
    "\n"
    "Generate multiple, branching story timelines from a single theme using AI, complete with images and narration.\n"
    "*Based on the work of Yousif Ahmed. Copyright 2025 Google LLC.*\n"
)
# --- Constants ---
# Text/JSON Model
TEXT_MODEL_ID = "models/gemini-1.5-flash"  # Or "gemini-1.5-pro" for potentially higher quality/cost
# Audio Model Config
AUDIO_API_VERSION = 'v1alpha'  # Required for audio modality (though endpoint set implicitly now)
# Plain literal: the previous f-string prefix had no placeholders to interpolate.
AUDIO_MODEL_ID = "models/gemini-1.5-flash"  # Model used for audio tasks
AUDIO_SAMPLING_RATE = 24000  # Standard for TTS models like Google's
# Image Model Config
IMAGE_MODEL_ID = "imagen-3"  # Or specific version like "imagen-3.0-generate-002"
DEFAULT_ASPECT_RATIO = "1:1"
# Video Config
VIDEO_FPS = 24
VIDEO_CODEC = "libx264"  # Widely compatible H.264
AUDIO_CODEC = "aac"  # Common audio codec for MP4
# File Management
TEMP_DIR_BASE = ".chrono_temp"  # Base name for temporary directories
# --- API Key Handling ---
# Resolution order: Streamlit secrets first, then the GOOGLE_API_KEY environment
# variable. If neither yields a key, an error is shown and the script halts.
GOOGLE_API_KEY = None
try:
    # Preferred way: Use Streamlit secrets when deployed
    GOOGLE_API_KEY = st.secrets["GOOGLE_API_KEY"]
    logger.info("Google API Key loaded from Streamlit secrets.")
except (KeyError, FileNotFoundError):
    # KeyError: a secrets file exists but has no such entry.
    # FileNotFoundError: no secrets file at all (typical for local runs); without
    # catching it the environment-variable fallback below would never be reached.
    # Fallback: Check environment variable (useful for local development)
    GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
    if GOOGLE_API_KEY:
        logger.info("Google API Key loaded from environment variable.")
    else:
        # Error if neither is found. The icon must be a real single emoji,
        # otherwise Streamlit rejects the call instead of rendering the message.
        st.error(
            "🚨 **Google API Key Not Found!**\n"
            "Please configure your Google API Key:\n"
            "1. **Streamlit Cloud/Hugging Face Spaces:** Add it as a Secret named `GOOGLE_API_KEY` in your app's settings.\n"
            "2. **Local Development:** Set the `GOOGLE_API_KEY` environment variable or create a `.streamlit/secrets.toml` file.",
            icon="🚨"
        )
        st.stop()  # Halt execution
# --- Initialize Google Clients ---
# Configures the google-generativeai package with the resolved key and creates
# two GenerativeModel handles: one for text/JSON (also used by the image helper)
# and one for audio. Any failure is reported in the UI and stops the script.
try:
    # Configure globally
    genai.configure(api_key=GOOGLE_API_KEY)
    logger.info("Configured google-generativeai with API key.")
    # Model/Client Handle for Text/Imagen Generation
    client_standard = genai.GenerativeModel(TEXT_MODEL_ID)
    logger.info(f"Initialized standard GenerativeModel for {TEXT_MODEL_ID}.")
    # Model Handle for Audio Generation
    # NOTE(review): the audio helper calls `.connect(...)` on this handle; whether
    # GenerativeModel actually exposes that method is not visible here — confirm
    # against the installed google-generativeai version.
    live_model = genai.GenerativeModel(AUDIO_MODEL_ID)
    logger.info(f"Initialized GenerativeModel handle for audio ({AUDIO_MODEL_ID}).")
    # No separate 'client_live' or explicit endpoint is set up here; the audio
    # request config is built inside generate_audio_live_async.
except AttributeError as ae:
    # Kept as a specific catch in case the library structure is very old/unexpected
    logger.exception("AttributeError during Google AI Client Initialization.")
    # icon must be a valid single emoji or Streamlit raises instead of rendering.
    st.error(f"🚨 Failed to initialize Google AI Clients due to an unexpected library structure error: {ae}. Please ensure 'google-generativeai' is up-to-date.", icon="🚨")
    st.stop()
except Exception as e:
    logger.exception("Failed to initialize Google AI Clients.")
    st.error(f"🚨 Failed to initialize Google AI Clients: {e}", icon="🚨")
    st.stop()
# --- Define Pydantic Schemas for Robust Validation ---
# StorySegment: one scene of a timeline — its index, the image prompt, the
# narration sentence, a character description, and an optional style modifier.
# (Documented with comments rather than a class docstring so the generated JSON
# schema, which is embedded in the model prompt, is left unchanged.)
class StorySegment(BaseModel):
    scene_id: int = Field(..., ge=0, description="Scene number within the timeline, starting from 0.")
    image_prompt: str = Field(..., min_length=10, max_length=150, description="Concise visual description for image generation (15-35 words). Focus on non-human characters, setting, action, style.")
    audio_text: str = Field(..., min_length=5, max_length=150, description="Single sentence of narration/dialogue for the scene (max 30 words).")
    character_description: str = Field(..., max_length=100, description="Brief description of key non-human characters/objects in *this* scene's prompt for consistency.")
    timeline_visual_modifier: Optional[str] = Field(None, max_length=50, description="Optional subtle visual style hint (e.g., 'slightly darker', 'more vibrant colors').")

    # Registered as a field validator; without the decorator this was just an
    # unused method and the check never ran.
    @validator('image_prompt')
    def image_prompt_no_humans(cls, v):
        """Log a warning when the image prompt mentions human-related words.

        This is advisory only: the value is always returned unchanged and no
        validation error is raised. Matching is by substring on the lowercased
        prompt, so words that merely contain a keyword also trigger the warning.
        """
        if any(word in v.lower() for word in ["person", "people", "human", "man", "woman", "boy", "girl", "child"]):
            logger.warning(f"Image prompt '{v[:50]}...' may contain human descriptions. Relying on API-level controls & prompt instructions.")
        return v
# Timeline: one story branch — its identifier, the reason it diverged, and the
# list of scenes that make it up (at least one).
class Timeline(BaseModel):
    timeline_id: int = Field(
        ...,
        description="Unique identifier for this timeline.",
        ge=0,
    )
    divergence_reason: str = Field(
        ...,
        description="Clear reason why this timeline branched off.",
        min_length=5,
    )
    segments: List[StorySegment] = Field(
        ...,
        description="List of scenes composing this timeline.",
        min_items=1,
    )
# ChronoWeaveResponse: top-level payload — the theme, the generated timelines,
# and the number of scenes each timeline is required to contain.
# (Comments instead of a class docstring keep the generated JSON schema, which
# is embedded in the model prompt, unchanged.)
class ChronoWeaveResponse(BaseModel):
    core_theme: str = Field(..., min_length=5, description="The central theme provided by the user.")
    timelines: List[Timeline] = Field(..., min_items=1, description="List of generated timelines.")
    total_scenes_per_timeline: int = Field(..., gt=0, description="The requested number of scenes per timeline.")

    # Bound to `total_scenes_per_timeline` (the last-declared field) on purpose:
    # field validators only see previously validated fields in `values`, so a
    # validator on `timelines` would never find the expected count and the
    # check would be silently skipped.
    @validator('total_scenes_per_timeline')
    def check_timeline_segment_count(cls, expected_scenes, values):
        """Ensure every timeline has exactly `total_scenes_per_timeline` segments.

        Raises:
            ValueError: if any timeline's segment count differs from the
                expected number. Skipped when `timelines` itself failed
                validation and is therefore absent from `values`.
        """
        timelines = values.get('timelines')
        if timelines is not None:
            for i, timeline in enumerate(timelines):
                if len(timeline.segments) != expected_scenes:
                    raise ValueError(f"Timeline {i} (ID: {timeline.timeline_id}) has {len(timeline.segments)} segments, but expected {expected_scenes}.")
        return expected_scenes
# --- Helper Functions ---
# The decorator is required: the function is a generator and callers use it in a
# `with` statement, which needs a real context manager object.
@contextlib.contextmanager
def wave_file_writer(filename: str, channels: int = 1, rate: int = AUDIO_SAMPLING_RATE, sample_width: int = 2):
    """Context manager that opens `filename` as a WAV file for writing.

    Args:
        filename: Path of the WAV file to create.
        channels: Number of audio channels.
        rate: Frame rate in Hz.
        sample_width: Bytes per sample (2 bytes for 16-bit audio).

    Yields:
        The configured `wave` writer object.

    Raises:
        Exception: any error raised while opening/configuring the file or
            inside the `with` body is logged and re-raised.
    """
    wf = None
    try:
        wf = wave.open(filename, "wb")
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)  # 2 bytes for 16-bit audio
        wf.setframerate(rate)
        yield wf
    except Exception as e:
        logger.error(f"Error opening/configuring wave file {filename}: {e}")
        raise  # Re-raise the exception
    finally:
        # Always release the file handle; a failing close is logged, not raised,
        # so it cannot mask an error from the body.
        if wf is not None:
            try:
                wf.close()
            except Exception as e_close:
                logger.error(f"Error closing wave file {filename}: {e_close}")
async def generate_audio_live_async(api_text: str, output_filename: str, voice: Optional[str] = None) -> Optional[str]:
    """Request narration audio for `api_text` and save it as a WAV file.

    Streams audio chunks from the module-level `live_model`, concatenates them,
    and writes them to `output_filename` via `wave_file_writer`.

    Args:
        api_text: The sentence to narrate.
        output_filename: Destination WAV path; its base name (up to the first
            dot) is used as the task id in log and UI messages.
        voice: Currently unused (voice selection is commented out in the config).

    Returns:
        `output_filename` on success, or None if the stream reported an error,
        produced no audio, was blocked, or any exception occurred. Failures are
        logged and surfaced in the Streamlit UI rather than raised.
    """
    collected_audio = bytearray()
    task_id = os.path.basename(output_filename).split('.')[0]  # Extract T#_S# for logging
    logger.info(f"🎙️ [{task_id}] Requesting audio for: '{api_text[:60]}...'")
    try:
        config = {
            "response_modalities": ["AUDIO"],
            "audio_config": {
                "audio_encoding": "LINEAR16",  # Required format for WAV output
                "sample_rate_hertz": AUDIO_SAMPLING_RATE,
                # "voice": voice if voice else "aura-asteria-en"  # Optional: Specify voice if needed and available
            }
        }
        # Prepend directive to discourage conversational filler
        directive_prompt = (
            "Narrate the following sentence directly and engagingly. "
            "Do not add any introductory or concluding remarks like 'Okay', 'Sure', or 'Here is the narration'. "
            "Speak only the sentence itself:\n\n"
            f'"{api_text}"'
        )
        # NOTE(review): `.connect`, `send_request` and `stream_content` are assumed
        # to exist on the model/session objects — confirm against the installed SDK.
        async with live_model.connect(config=config) as session:
            await session.send_request([directive_prompt])
            async for response in session.stream_content():
                if response.audio_chunk and response.audio_chunk.data:
                    collected_audio.extend(response.audio_chunk.data)
                # Handle potential errors within the stream if the API provides them
                if hasattr(response, 'error') and response.error:
                    logger.error(f" ❌ [{task_id}] Error during audio stream: {response.error}")
                    # icon must be a valid single emoji, otherwise Streamlit raises here.
                    st.error(f"Audio stream error for scene {task_id}: {response.error}", icon="🔊")
                    return None  # Stop processing this audio request
        if not collected_audio:
            logger.warning(f"⚠️ [{task_id}] No audio data received for: '{api_text[:60]}...'")
            st.warning(f"No audio data generated for scene {task_id}.", icon="🔊")
            return None
        # Write the collected audio bytes into a WAV file using the context manager.
        with wave_file_writer(output_filename, rate=AUDIO_SAMPLING_RATE) as wf:
            wf.writeframes(bytes(collected_audio))
        logger.info(f" ✅ [{task_id}] Audio saved: {os.path.basename(output_filename)} ({len(collected_audio)} bytes)")
        return output_filename
    except genai.types.generation_types.BlockedPromptException as bpe:
        logger.error(f" ❌ [{task_id}] Audio generation blocked for prompt '{api_text[:60]}...': {bpe}")
        st.error(f"Audio generation blocked for scene {task_id} due to safety settings.", icon="🔊")
        return None
    except Exception as e:
        # Catch other potential errors during connect/send/stream
        logger.exception(f" ❌ [{task_id}] Audio generation failed unexpectedly for '{api_text[:60]}...': {e}")
        st.error(f"Audio generation failed for scene {task_id}: {e}", icon="🔊")
        return None
def generate_story_sequence_chrono(
    theme: str,
    num_scenes: int,
    num_timelines: int,
    divergence_prompt: str = ""
) -> Optional[ChronoWeaveResponse]:
    """Ask the text model for a branching story and validate it with Pydantic.

    Builds a prompt that embeds the `ChronoWeaveResponse` JSON schema, requests
    a JSON response from `client_standard`, parses it, and validates it.

    Args:
        theme: Core story theme supplied by the user.
        num_scenes: Number of scenes requested per timeline.
        num_timelines: Number of timelines requested.
        divergence_prompt: Optional hint for how the timelines should diverge.

    Returns:
        A validated `ChronoWeaveResponse`, or None when the prompt is blocked,
        the response is not valid JSON, validation fails, or any other error
        occurs. All failures are logged and shown in the Streamlit UI.
    """
    st.info(f"📚 Generating {num_timelines} timeline(s) x {num_scenes} scenes for theme: '{theme}'...")
    logger.info(f"Requesting story structure: Theme='{theme}', Timelines={num_timelines}, Scenes={num_scenes}")
    divergence_instruction = (
        f"Introduce clear points of divergence between timelines, starting potentially after the first scene. "
        f"If provided, use this hint for divergence: '{divergence_prompt}'. "
        f"Clearly state the divergence reason for each timeline (except potentially the first)."
    )
    prompt = f"""
Act as an expert narrative designer specializing in short, visual, branching stories for children.
Create a story based on the core theme: "{theme}".
**Instructions:**
1. Generate exactly **{num_timelines}** distinct timelines.
2. Each timeline must contain exactly **{num_scenes}** sequential scenes.
3. **Crucially, DO NOT include any humans, people, or humanoid figures** in the descriptions or actions. Focus strictly on animals, fantasy creatures, animated objects, or natural elements.
4. {divergence_instruction}
5. Maintain a consistent visual style across all scenes and timelines: **'Simple, friendly kids animation style with bright colors and rounded shapes'**, unless a `timeline_visual_modifier` subtly alters it.
6. Each scene's narration (`audio_text`) should be a single, concise sentence (approx. 5-10 seconds spoken length, max 30 words).
7. Image prompts (`image_prompt`) should be descriptive (15-35 words), focusing on the non-human character(s), setting, action, and visual style. Explicitly mention the main character(s) for consistency.
8. `character_description` should briefly describe recurring non-human characters mentioned *in the specific scene's image prompt* (name, key visual features). Keep consistent within a timeline.
**Output Format:**
Respond ONLY with a valid JSON object adhering strictly to the provided schema. Do not include any text before or after the JSON object.
**JSON Schema:**
```json
{json.dumps(ChronoWeaveResponse.schema(), indent=2)}
```
"""  # Using .schema() which is the Pydantic v1 way, adjust if using v2 (.model_json_schema())
    try:
        # Use the standard client (GenerativeModel instance) for text generation
        response = client_standard.generate_content(
            contents=prompt,
            generation_config=genai.types.GenerationConfig(
                response_mime_type="application/json",
                temperature=0.7  # Add some creativity
            )
        )
        # Attempt to parse the JSON
        try:
            # Use response.text which should contain the JSON string
            raw_data = json.loads(response.text)
        except json.JSONDecodeError as json_err:
            logger.error(f"Failed to decode JSON response: {json_err}")
            logger.error(f"Problematic Response Text:\n{response.text}")
            # icon values must be valid single emoji or Streamlit raises.
            st.error(f"🚨 Failed to parse the story structure from the AI. Error: {json_err}", icon="📄")
            st.text_area("Problematic AI Response:", response.text, height=200)
            return None
        except Exception as e:
            # Covers failures while reading response.text itself.
            logger.error(f"Error accessing or decoding response text: {e}")
            st.error(f"🚨 Error processing AI response: {e}", icon="📄")
            return None
        # Validate the parsed data using Pydantic
        try:
            # Use parse_obj for Pydantic v1, or YourModel.model_validate(raw_data) for v2
            validated_data = ChronoWeaveResponse.parse_obj(raw_data)
            logger.info("✅ Story structure generated and validated successfully!")
            st.success("✅ Story structure generated and validated!")
            return validated_data
        except ValidationError as val_err:
            logger.error(f"JSON structure validation failed: {val_err}")
            logger.error(f"Received Data:\n{json.dumps(raw_data, indent=2)}")
            st.error(f"🚨 The generated story structure is invalid: {val_err}", icon="🧬")
            st.json(raw_data)  # Show the invalid structure
            return None
    except genai.types.generation_types.BlockedPromptException as bpe:
        logger.error(f"Story generation prompt blocked: {bpe}")
        st.error("🚨 The story generation prompt was blocked, likely due to safety filters. Try rephrasing the theme.", icon="🚫")
        return None
    except Exception as e:
        logger.exception("Error during story sequence generation:")
        st.error(f"🚨 An unexpected error occurred during story generation: {e}", icon="💥")
        return None
def generate_image_imagen(prompt: str, aspect_ratio: str = "1:1", task_id: str = "IMG") -> Optional[Image.Image]:
    """Request an image for `prompt` and return it as a PIL image.

    Wraps the prompt with style / no-human instructions, sends it through
    `client_standard.generate_content`, and decodes inline image bytes from the
    first part of the first candidate.

    Args:
        prompt: Scene description to illustrate.
        aspect_ratio: Desired aspect ratio; only stated in the prompt text.
        task_id: Label used in log and UI messages.

    Returns:
        A PIL `Image`, or None if the request was blocked, returned no inline
        image data (including a file URI, which is not handled), could not be
        decoded, or raised. Failures are logged and shown in the UI.
    """
    logger.info(f"🖼️ [{task_id}] Requesting image for: '{prompt[:70]}...' (Aspect: {aspect_ratio})")
    # Refined prompt incorporating negative constraints and style guidance
    full_prompt = (
        f"Generate an image in a child-friendly, simple animation style with bright colors and rounded shapes. "
        f"Ensure absolutely NO humans or human-like figures are present. Focus on animals or objects. "
        f"Aspect ratio should be {aspect_ratio}. "  # Explicitly state aspect ratio in prompt too
        f"Prompt: {prompt}"
    )
    try:
        # NOTE(review): this goes through `client_standard`, the handle created for
        # the text model; IMAGE_MODEL_ID is not referenced here — confirm this is
        # the intended endpoint for image generation.
        response = client_standard.generate_content(
            full_prompt,
            generation_config=genai.types.GenerationConfig(
                candidate_count=1,
                # Add other config like temperature if desired
            ),
            # Safety settings can be adjusted here if necessary and permitted
            # safety_settings={'HARM_CATEGORY_DANGEROUS_CONTENT': 'BLOCK_NONE'} # Use cautiously
        )
        # Look for inline image bytes in the first part of the first candidate.
        image_bytes = None
        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            part = response.candidates[0].content.parts[0]
            if hasattr(part, 'inline_data') and part.inline_data and hasattr(part.inline_data, 'data'):
                image_bytes = part.inline_data.data
            elif hasattr(part, 'file_data') and part.file_data:  # Handle potential file URIs if API changes
                logger.warning(f" ⚠️ [{task_id}] Received file URI instead of inline data. Handling not implemented.")
                return None  # Download from part.file_data.file_uri is not implemented

        if not image_bytes:
            # No image: report the block reason when the API supplies one.
            block_reason = None
            prompt_feedback = getattr(response, 'prompt_feedback', None)
            if prompt_feedback:
                block_reason = getattr(prompt_feedback, 'block_reason', None)
            if block_reason:
                logger.warning(f" ⚠️ [{task_id}] Image generation blocked. Reason: {block_reason}. Prompt: '{prompt[:70]}...'")
                st.warning(f"Image generation blocked for scene {task_id}. Reason: {block_reason}", icon="🚫")
            else:
                logger.warning(f" ⚠️ [{task_id}] No image data received, unknown reason. Prompt: '{prompt[:70]}...'")
                st.warning(f"No image data received for scene {task_id}, reason unclear.", icon="🖼️")
            return None

        # Keep the try body to the decode itself so unrelated errors (e.g. while
        # reporting safety ratings) are not misreported as decode failures.
        try:
            image = Image.open(BytesIO(image_bytes))
        except Exception as img_err:
            logger.error(f" ❌ [{task_id}] Failed to decode generated image data: {img_err}")
            st.warning(f"Failed to decode image data for scene {task_id}.", icon="🖼️")
            return None
        logger.info(f" ✅ [{task_id}] Image generated successfully.")
        # Check safety feedback even on success
        safety_ratings = getattr(response.candidates[0], 'safety_ratings', [])
        if safety_ratings:
            filtered_ratings = [f"{r.category.name}: {r.probability.name}" for r in safety_ratings if r.probability.name != 'NEGLIGIBLE']
            if filtered_ratings:
                logger.warning(f" ⚠️ [{task_id}] Image generated but flagged by safety filters: {', '.join(filtered_ratings)}.")
                st.warning(f"Image for scene {task_id} flagged by safety filters: {', '.join(filtered_ratings)}", icon="⚠️")
        return image
    except genai.types.generation_types.BlockedPromptException as bpe:
        # This might be caught by the block_reason check above, but good to have explicit catch
        logger.error(f" ❌ [{task_id}] Image generation blocked (exception): {bpe}")
        st.error(f"Image generation blocked for scene {task_id} due to safety settings.", icon="🚫")
        return None
    except Exception as e:
        logger.exception(f" ❌ [{task_id}] Image generation failed unexpectedly for '{prompt[:70]}...': {e}")
        st.error(f"Image generation failed for scene {task_id}: {e}", icon="🖼️")
        return None
# --- Streamlit UI Elements ---
# Sidebar controls: key status, story parameters, visual/audio settings and the
# generate button. Every `icon=` value must be a valid single emoji, otherwise
# Streamlit raises instead of rendering the element.
st.sidebar.header("⚙️ Configuration")
# API Key Status
if GOOGLE_API_KEY:
    st.sidebar.success("Google API Key Loaded", icon="✅")
else:
    st.sidebar.error("Google API Key Missing!", icon="🚨")  # Should not be reached if st.stop() works
# Story Parameters
theme = st.sidebar.text_input("📖 Story Theme:", "A curious squirrel finds a mysterious, glowing acorn")
num_scenes = st.sidebar.slider("🎬 Scenes per Timeline:", min_value=2, max_value=7, value=3, help="Number of scenes (image+narration) in each timeline.")
num_timelines = st.sidebar.slider("🌿 Number of Timelines:", min_value=1, max_value=4, value=2, help="Number of parallel storylines to generate.")
divergence_prompt = st.sidebar.text_input("↔️ Divergence Hint (Optional):", placeholder="e.g., What if a bird tried to steal it?", help="A suggestion for how the timelines might differ.")
# Generation Settings
st.sidebar.subheader("🎨 Visual & Audio Settings")
aspect_ratio = st.sidebar.selectbox("🖼️ Image Aspect Ratio:", ["1:1", "16:9", "9:16"], index=0, help="Aspect ratio for generated images.")
# Add audio voice selection if API supports it and voices are known
# available_voices = ["aura-asteria-en", "aura-luna-en", "aura-stella-en"] # Example
# audio_voice = st.sidebar.selectbox("🗣️ Narration Voice:", available_voices, index=0)
audio_voice = None  # Placeholder
# The button is disabled while no API key is available.
generate_button = st.sidebar.button("✨ Generate ChronoWeave ✨", type="primary", disabled=(not GOOGLE_API_KEY), use_container_width=True)
st.sidebar.markdown("---")
st.sidebar.info("⏳ Generation can take several minutes, especially with more scenes or timelines.", icon="⏳")
st.sidebar.markdown(f"<small>Models: Text={TEXT_MODEL_ID}, Image={IMAGE_MODEL_ID}, Audio={AUDIO_MODEL_ID}</small>", unsafe_allow_html=True)
# --- Main Logic --- | |
if generate_button: | |
if not theme: | |
st.error("Please enter a story theme in the sidebar.", icon="π") | |
else: | |
# Create a unique temporary directory for this run | |
run_id = str(uuid.uuid4()).split('-')[0] # Short unique ID | |
temp_dir = os.path.join(TEMP_DIR_BASE, f"run_{run_id}") | |
try: | |
os.makedirs(temp_dir, exist_ok=True) | |
logger.info(f"Created temporary directory: {temp_dir}") | |
except OSError as e: | |
st.error(f"π¨ Failed to create temporary directory {temp_dir}: {e}", icon="π") | |
st.stop() | |
final_video_paths = {} # Stores {timeline_id: video_path} | |
generation_errors = {} # Stores {timeline_id: [error_messages]} | |
# --- 1. Generate Narrative Structure --- | |
chrono_response: Optional[ChronoWeaveResponse] = None | |
with st.spinner("Generating narrative structure... π€"): | |
chrono_response = generate_story_sequence_chrono(theme, num_scenes, num_timelines, divergence_prompt) | |
if chrono_response: | |
st.success(f"Narrative structure received for {len(chrono_response.timelines)} timelines.") | |
logger.info(f"Successfully generated structure for {len(chrono_response.timelines)} timelines.") | |
# --- 2. Process Each Timeline --- | |
overall_start_time = time.time() | |
all_timelines_successful = True # Assume success initially | |
# Use st.status for collapsible progress updates | |
with st.status("Generating assets and composing videos...", expanded=True) as status: | |
for timeline_index, timeline in enumerate(chrono_response.timelines): | |
timeline_id = timeline.timeline_id | |
divergence = timeline.divergence_reason | |
segments = timeline.segments | |
timeline_label = f"Timeline {timeline_id}" # Consistent label | |
st.subheader(f"Processing {timeline_label}: {divergence}") | |
logger.info(f"--- Processing {timeline_label} (Index: {timeline_index}) ---") | |
generation_errors[timeline_id] = [] # Initialize error list | |
temp_image_files = {} # {scene_id: path} | |
temp_audio_files = {} # {scene_id: path} | |
video_clips = [] # List of moviepy clips | |
timeline_start_time = time.time() | |
scene_success_count = 0 | |
for scene_index, segment in enumerate(segments): | |
scene_id = segment.scene_id | |
task_id = f"T{timeline_id}_S{scene_id}" # Unique ID | |
status_message = f"Processing {timeline_label}, Scene {scene_id + 1}/{len(segments)}..." | |
status.update(label=status_message) | |
st.markdown(f"--- **Scene {scene_id + 1} ({task_id})** ---") | |
logger.info(status_message) | |
scene_has_error = False | |
# Log scene details | |
st.write(f" *Image Prompt:* {segment.image_prompt}" + (f" *(Modifier: {segment.timeline_visual_modifier})*" if segment.timeline_visual_modifier else "")) | |
st.write(f" *Audio Text:* {segment.audio_text}") | |
# --- 2a. Image Generation --- | |
generated_image: Optional[Image.Image] = None # Define before spinner | |
with st.spinner(f"[{task_id}] Generating image... π¨"): | |
combined_prompt = f"{segment.image_prompt}. {segment.character_description}" | |
if segment.timeline_visual_modifier: | |
combined_prompt += f" Visual style hint: {segment.timeline_visual_modifier}." | |
generated_image = generate_image_imagen(combined_prompt, aspect_ratio, task_id) | |
if generated_image: | |
image_path = os.path.join(temp_dir, f"{task_id}_image.png") | |
try: | |
generated_image.save(image_path) | |
temp_image_files[scene_id] = image_path | |
st.image(generated_image, width=180, caption=f"Scene {scene_id+1} Image") | |
except Exception as e: | |
logger.error(f" β [{task_id}] Failed to save image {image_path}: {e}") | |
st.error(f"Failed to save image for scene {task_id}.", icon="πΎ") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Image save failed.") | |
else: | |
st.warning(f"Image generation failed for scene {task_id}. Skipping scene.", icon="πΌοΈ") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Image generation failed.") | |
continue # Skip audio/video for this scene | |
# --- 2b. Audio Generation --- | |
generated_audio_path: Optional[str] = None | |
if not scene_has_error: | |
with st.spinner(f"[{task_id}] Generating audio... π"): | |
audio_path_temp = os.path.join(temp_dir, f"{task_id}_audio.wav") | |
try: | |
# Run the async function using asyncio.run() | |
generated_audio_path = asyncio.run( | |
generate_audio_live_async(segment.audio_text, audio_path_temp, audio_voice) | |
) | |
except RuntimeError as e: | |
logger.error(f" β [{task_id}] Asyncio runtime error during audio gen: {e}") | |
st.error(f"Asyncio error during audio generation for {task_id}: {e}", icon="β‘") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio async error.") | |
except Exception as e: | |
logger.exception(f" β [{task_id}] Unexpected error during audio generation call for {task_id}: {e}") | |
st.error(f"Unexpected error in audio generation for {task_id}: {e}", icon="π₯") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio generation error.") | |
if generated_audio_path: | |
temp_audio_files[scene_id] = generated_audio_path | |
try: | |
with open(generated_audio_path, 'rb') as ap: | |
st.audio(ap.read(), format='audio/wav') | |
except Exception as e: | |
logger.warning(f" β οΈ [{task_id}] Could not display audio preview: {e}") | |
else: | |
st.warning(f"Audio generation failed for {task_id}. Skipping video clip.", icon="π") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio generation failed.") | |
# Clean up image if audio fails | |
if scene_id in temp_image_files and os.path.exists(temp_image_files[scene_id]): | |
try: | |
os.remove(temp_image_files[scene_id]) | |
logger.info(f" ποΈ [{task_id}] Removed image file due to audio failure.") | |
del temp_image_files[scene_id] | |
except OSError as e: | |
logger.warning(f" β οΈ [{task_id}] Could not remove image file {temp_image_files[scene_id]} after audio failure: {e}") | |
continue # Skip video clip creation | |
# --- 2c. Create Video Clip --- | |
if not scene_has_error and scene_id in temp_image_files and scene_id in temp_audio_files: | |
st.write(f" π¬ Creating video clip for Scene {scene_id+1}...") | |
img_path = temp_image_files[scene_id] | |
aud_path = temp_audio_files[scene_id] | |
audio_clip_instance = None # Define before try | |
image_clip_instance = None # Define before try | |
composite_clip = None # Define before try | |
try: | |
if not os.path.exists(img_path): raise FileNotFoundError(f"Image file not found: {img_path}") | |
if not os.path.exists(aud_path): raise FileNotFoundError(f"Audio file not found: {aud_path}") | |
audio_clip_instance = AudioFileClip(aud_path) | |
np_image = np.array(Image.open(img_path)) | |
image_clip_instance = ImageClip(np_image).set_duration(audio_clip_instance.duration) | |
composite_clip = image_clip_instance.set_audio(audio_clip_instance) | |
video_clips.append(composite_clip) # Add the clip to be concatenated later | |
logger.info(f" β [{task_id}] Video clip created (Duration: {audio_clip_instance.duration:.2f}s).") | |
st.write(f" β Clip created (Duration: {audio_clip_instance.duration:.2f}s).") | |
scene_success_count += 1 | |
# Don't close individual clips here yet, needed for concatenation | |
except Exception as e: | |
logger.exception(f" β [{task_id}] Failed to create video clip for scene {scene_id+1}: {e}") | |
st.error(f"Failed to create video clip for {task_id}: {e}", icon="π¬") | |
scene_has_error = True | |
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Video clip creation failed.") | |
# Cleanup resources if clip creation failed for *this* scene | |
if audio_clip_instance: audio_clip_instance.close() | |
if image_clip_instance: image_clip_instance.close() | |
# Attempt cleanup of related files | |
if os.path.exists(img_path): os.remove(img_path) | |
if os.path.exists(aud_path): os.remove(aud_path) | |
# --- End of Scene Loop --- | |
# --- 2d. Assemble Timeline Video ---
# Concatenates the scene clips collected for this timeline into one MP4 in
# temp_dir and records its path in final_video_paths. Assembly is attempted
# only when every segment produced a clip; any other outcome marks the whole
# run as not fully successful.
timeline_duration = time.time() - timeline_start_time
final_timeline_video = None  # Defined up front so the cleanup below can always test it
try:
    # Only assemble if clips were created and no scene failed
    # (scene_success_count is compared against the expected number of segments)
    if video_clips and scene_success_count == len(segments):
        status.update(label=f"Composing final video for {timeline_label}...")
        st.write(f"ποΈ Assembling final video for {timeline_label}...")
        logger.info(f"ποΈ Assembling final video for {timeline_label} ({len(video_clips)} clips)...")
        output_filename = os.path.join(temp_dir, f"timeline_{timeline_id}_final.mp4")
        try:
            # Concatenate the collected clips and encode them to disk
            final_timeline_video = concatenate_videoclips(video_clips, method="compose")
            final_timeline_video.write_videofile(
                output_filename,
                fps=VIDEO_FPS,
                codec=VIDEO_CODEC,
                audio_codec=AUDIO_CODEC,
                logger=None,  # Suppress moviepy console spam
            )
            final_video_paths[timeline_id] = output_filename
            # Re-measure after encoding so the reported time covers the
            # assembly step as well, not just the per-scene processing.
            timeline_duration = time.time() - timeline_start_time
            logger.info(f" β [{timeline_label}] Final video saved: {os.path.basename(output_filename)}")
            st.success(f"β Video for {timeline_label} completed in {timeline_duration:.2f}s.")
        except Exception as e:
            # Best-effort boundary: one timeline failing must not abort the others
            logger.exception(f" β [{timeline_label}] Failed to write final video: {e}")
            st.error(f"Failed to assemble video for {timeline_label}: {e}", icon="πΌ")
            all_timelines_successful = False
            generation_errors[timeline_id].append(f"Timeline {timeline_id}: Final video assembly failed.")
    elif not video_clips:
        logger.warning(f"[{timeline_label}] No video clips successfully generated. Skipping final assembly.")
        st.warning(f"No scenes were successfully processed for {timeline_label}. Video cannot be created.", icon="π«")
        all_timelines_successful = False
    else:  # Some scenes failed, so scene_success_count < len(segments)
        error_count = len(segments) - scene_success_count
        logger.warning(f"[{timeline_label}] Encountered errors in {error_count} scene(s). Skipping final video assembly.")
        st.warning(f"{timeline_label} had errors in {error_count} scene(s). Final video not assembled.", icon="β οΈ")
        all_timelines_successful = False
finally:
    # Release clip resources on every path. Previously this only happened in
    # the assembly branch, so clips from a partially failed timeline were
    # never closed.
    logger.debug(f"[{timeline_label}] Closing {len(video_clips)} source clips...")
    for i, clip in enumerate(video_clips):
        try:
            if clip:  # Check if clip object exists
                if clip.audio:
                    clip.audio.close()
                clip.close()
        except Exception as e_close:
            logger.warning(f" β οΈ [{timeline_label}] Error closing source clip {i}: {e_close}")
    if final_timeline_video:
        try:
            if final_timeline_video.audio:
                final_timeline_video.audio.close()
            final_timeline_video.close()
            logger.debug(f"[{timeline_label}] Closed final video object.")
        except Exception as e_close_final:
            logger.warning(f" β οΈ [{timeline_label}] Error closing final video object: {e_close_final}")
# Log accumulated errors for the timeline if any occurred
if generation_errors[timeline_id]:
    logger.error(f"Summary of errors in {timeline_label}: {generation_errors[timeline_id]}")
# --- End of Timelines Loop --- | |
# Final status update
# Summarise the whole run in the status widget and the log. Three outcomes:
# every timeline produced a video, only some did, or none did.
overall_duration = time.time() - overall_start_time
if all_timelines_successful and final_video_paths:
    status_msg = f"ChronoWeave Generation Complete! ({len(final_video_paths)} videos in {overall_duration:.2f}s)"
    status.update(label=status_msg, state="complete", expanded=False)
    logger.info(status_msg)
elif final_video_paths:  # Some videos made, but errors occurred
    status_msg = f"ChronoWeave Partially Complete ({len(final_video_paths)} videos, some errors occurred). Total time: {overall_duration:.2f}s"
    # NOTE(review): `status` is presumably the container returned by st.status
    # (created outside this section). Its update() accepts only "running",
    # "complete" or "error"; the previous "warning" value is rejected with a
    # StreamlitAPIException. "error" is used so the partial failure stays
    # visible, and the container is left expanded to show the details.
    status.update(label=status_msg, state="error", expanded=True)
    logger.warning(status_msg)
else:  # No videos made
    status_msg = f"ChronoWeave Generation Failed. No videos produced. Total time: {overall_duration:.2f}s"
    status.update(label=status_msg, state="error", expanded=True)
    logger.error(status_msg)
# --- 3. Display Results ---
# Render one player and download button per finished timeline, laid out in up
# to three columns. When nothing was produced, show the collected errors.
st.header("π¬ Generated Timelines")
if not final_video_paths:
    st.warning("No final videos were successfully generated in this run.")
    # Flatten every timeline's messages to decide whether a summary is worth showing
    collected_messages = [
        message
        for bucket in generation_errors.values()
        for message in bucket
    ]
    if collected_messages:
        st.subheader("Summary of Generation Issues")
        with st.expander("View All Errors", expanded=True):
            for failed_id, bucket in generation_errors.items():
                if not bucket:
                    continue
                st.error(f"Timeline {failed_id}:")
                for message in bucket:
                    st.error(f" - {message}")
else:
    ordered_ids = sorted(final_video_paths.keys())
    # Never more than three columns; additional videos wrap around
    column_count = min(len(ordered_ids), 3)
    columns = st.columns(column_count)
    for position, shown_id in enumerate(ordered_ids):
        path_to_video = final_video_paths[shown_id]
        # First timeline in the model response with this id, if any
        matching_timeline = next(
            (t for t in chrono_response.timelines if t.timeline_id == shown_id),
            None,
        )
        if matching_timeline:
            divergence_text = matching_timeline.divergence_reason
        else:
            divergence_text = "Unknown Divergence"
        with columns[position % column_count]:
            st.subheader(f"Timeline {shown_id}")
            st.caption(f"Divergence: {divergence_text}")
            try:
                with open(path_to_video, 'rb') as handle:
                    payload = handle.read()
                st.video(payload)
                logger.info(f"Displaying video for Timeline {shown_id}")
                st.download_button(
                    label=f"Download T{shown_id} Video",
                    data=payload,
                    file_name=f"chronoweave_timeline_{shown_id}.mp4",
                    mime="video/mp4",
                    key=f"download_btn_{shown_id}",  # Keys must be unique per widget
                )
                # Surface any issues recorded for this timeline
                recorded_issues = generation_errors.get(shown_id)
                if recorded_issues:
                    with st.expander(f"β οΈ View {len(recorded_issues)} Generation Issues"):
                        for issue in recorded_issues:
                            st.warning(f"- {issue}")
            except FileNotFoundError:
                logger.error(f"Could not find video file for display: {path_to_video}")
                st.error(f"Error: Video file not found for Timeline {shown_id}.", icon="π¨")
            except Exception as display_error:
                logger.exception(f"Could not display video {path_to_video}: {display_error}")
                st.error(f"Error displaying video for Timeline {shown_id}: {display_error}", icon="π¨")
# --- 4. Cleanup ---
# Best-effort removal of the run's temporary directory. A failure here is
# reported to the user and the log but never raised.
st.info(f"Attempting to clean up temporary directory: {temp_dir}")
try:
    shutil.rmtree(temp_dir)
    logger.info(f"β Temporary directory removed: {temp_dir}")
    st.success("β Temporary files cleaned up.")
except Exception as cleanup_error:
    logger.error(f"β οΈ Could not remove temporary directory {temp_dir}: {cleanup_error}")
    st.warning(
        f"Could not automatically remove temporary files: {temp_dir}. Please remove it manually if needed.",
        icon="β οΈ",
    )
elif not chrono_response: | |
# Error message likely already shown by generate_story_sequence_chrono | |
logger.error("Story generation failed, cannot proceed.") | |
else: | |
# Fallback for unexpected state | |
st.error("An unexpected issue occurred after story generation. Cannot proceed.", icon="π") | |
logger.error("Chrono_response existed but was falsy in the main logic block.") | |
else: | |
st.info("Configure settings in the sidebar and click 'β¨ Generate ChronoWeave β¨' to start.") |