Spaces:
Sleeping
Sleeping
File size: 45,286 Bytes
62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 62f88b4 09b00d7 62f88b4 09b00d7 62f88b4 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 09b00d7 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 62f88b4 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 62f88b4 09b00d7 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 09b00d7 62f88b4 3c37f6f 62f88b4 09b00d7 62f88b4 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 62f88b4 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 09b00d7 3c37f6f 09b00d7 3c37f6f 62f88b4 3c37f6f 09b00d7 
3c37f6f 09b00d7 3c37f6f 09b00d7 62f88b4 3c37f6f 62f88b4 3c37f6f 62f88b4 3c37f6f 09b00d7 3c37f6f 62f88b4 09b00d7 3c37f6f 62f88b4 3c37f6f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 
528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666 667 668 669 670 671 672 673 674 675 676 677 678 679 680 681 682 683 684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711 712 713 714 715 716 717 718 719 720 721 722 723 724 725 726 727 728 729 730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748 749 750 751 752 753 754 755 756 757 758 759 760 761 762 763 764 765 766 767 768 769 770 771 |
# Copyright 2025 Google LLC. Based on work by Yousif Ahmed.
# Concept: ChronoWeave - Branching Narrative Generation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
import streamlit as st
import google.generativeai as genai
import os
import json
import numpy as np
from io import BytesIO
import time
import wave
import contextlib
import asyncio
import uuid # For unique identifiers
import shutil # For directory operations
import logging # For better logging
# Image handling
from PIL import Image
# Pydantic for data validation
from pydantic import BaseModel, Field, ValidationError, validator
from typing import List, Optional, Literal
# Video and audio processing
from moviepy.editor import ImageClip, AudioFileClip, concatenate_videoclips
# from moviepy.config import change_settings # Potential for setting imagemagick path if needed
# Type hints
import typing_extensions as typing
# Async support for Streamlit/Google API
import nest_asyncio
nest_asyncio.apply() # Apply patch for asyncio in environments like Streamlit/Jupyter
# --- Logging Setup ---
# Configure the root logger once at import time: INFO level, with each record
# rendered as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the code below for all diagnostic output.
logger = logging.getLogger(__name__)
# --- Configuration ---
# Page-level settings: browser-tab title, wide layout, sidebar expanded on load.
# NOTE(review): Streamlit expects set_page_config to run before other st.* calls — keep it first.
st.set_page_config(page_title="ChronoWeave", layout="wide", initial_sidebar_state="expanded")
# Page heading.
# NOTE(review): the leading "π" looks like a mis-decoded emoji — restore it from the original source.
st.title("π ChronoWeave: Advanced Branching Narrative Generator")
# Short description and attribution rendered as markdown under the heading.
st.markdown("""
Generate multiple, branching story timelines from a single theme using AI, complete with images and narration.
*Based on the work of Yousif Ahmed. Copyright 2025 Google LLC.*
""")
# --- Constants ---
# Module-wide settings shared by the generation helpers and the UI below.

# Text/JSON model used for the story-structure request.
TEXT_MODEL_ID = "models/gemini-1.5-flash"  # Or "gemini-1.5-pro" for potentially higher quality/cost

# Audio model configuration.
AUDIO_API_VERSION = 'v1alpha'  # Required for audio modality (though endpoint set implicitly now)
# Plain literal: the previous f-string had no placeholders.
AUDIO_MODEL_ID = "models/gemini-1.5-flash"  # Model used for audio tasks
AUDIO_SAMPLING_RATE = 24000  # Sample rate (Hz) requested from the API and written to the WAV header

# Image model configuration.
IMAGE_MODEL_ID = "imagen-3"  # Or specific version like "imagen-3.0-generate-002"
DEFAULT_ASPECT_RATIO = "1:1"

# Video encoding configuration.
VIDEO_FPS = 24
VIDEO_CODEC = "libx264"  # Widely compatible H.264
AUDIO_CODEC = "aac"  # Common audio codec for MP4

# File management.
TEMP_DIR_BASE = ".chrono_temp"  # Base name for per-run temporary directories
# --- API Key Handling ---
# Resolve the Google API key: Streamlit secrets first, then the GOOGLE_API_KEY
# environment variable. If neither yields a non-empty value, show setup
# instructions and halt the script.
GOOGLE_API_KEY = None
try:
    # Preferred way: Use Streamlit secrets when deployed
    GOOGLE_API_KEY = st.secrets["GOOGLE_API_KEY"]
except (KeyError, FileNotFoundError):
    # KeyError: a secrets file exists but has no GOOGLE_API_KEY entry.
    # FileNotFoundError: no secrets file at all (the usual local-development
    # case); without this the environment fallback below was never reached.
    # NOTE(review): newer Streamlit raises its own "secret not found" error
    # here; it is believed to subclass FileNotFoundError — confirm against
    # the pinned Streamlit version.
    GOOGLE_API_KEY = None

if GOOGLE_API_KEY:
    logger.info("Google API Key loaded from Streamlit secrets.")
else:
    # Fallback: Check environment variable (useful for local development).
    # Also taken when the secret exists but is empty.
    GOOGLE_API_KEY = os.environ.get('GOOGLE_API_KEY')
    if GOOGLE_API_KEY:
        logger.info("Google API Key loaded from environment variable.")
    else:
        # Error if neither is found.
        # NOTE(review): "π¨" is a mis-decoded emoji; restore it from the
        # original source — Streamlit may reject it as an icon value.
        st.error(
            "π¨ **Google API Key Not Found!**\n"
            "Please configure your Google API Key:\n"
            "1. **Streamlit Cloud/Hugging Face Spaces:** Add it as a Secret named `GOOGLE_API_KEY` in your app's settings.\n"
            "2. **Local Development:** Set the `GOOGLE_API_KEY` environment variable or create a `.streamlit/secrets.toml` file.",
            icon="π¨"
        )
        st.stop()  # Halt execution
# --- Initialize Google Clients ---
# Configure the google-generativeai library with the resolved key and create
# the two module-level model handles used below: `client_standard` (text and
# image requests) and `live_model` (audio requests). Both are plain
# genai.GenerativeModel instances. Any failure is shown in the UI and halts
# the script.
try:
    # Configure the library globally with the API key.
    genai.configure(api_key=GOOGLE_API_KEY)
    logger.info("Configured google-generativeai with API key.")

    # Handle used by the story-structure and image helpers.
    client_standard = genai.GenerativeModel(TEXT_MODEL_ID)
    logger.info(f"Initialized standard GenerativeModel for {TEXT_MODEL_ID}.")

    # Handle used by the audio helper; the audio request configuration is
    # built inside generate_audio_live_async, not here.
    live_model = genai.GenerativeModel(AUDIO_MODEL_ID)
    logger.info(f"Initialized GenerativeModel handle for audio ({AUDIO_MODEL_ID}).")
except AttributeError as attr_err:
    # Kept as a separate branch so a library-layout problem gets its own message.
    logger.exception("AttributeError during Google AI Client Initialization.")
    st.error(
        "π¨ Failed to initialize Google AI Clients due to an unexpected library structure error: "
        f"{attr_err}. Please ensure 'google-generativeai' is up-to-date.",
        icon="π¨",
    )
    st.stop()
except Exception as init_err:
    logger.exception("Failed to initialize Google AI Clients.")
    st.error(f"π¨ Failed to initialize Google AI Clients: {init_err}", icon="π¨")
    st.stop()
# --- Define Pydantic Schemas for Robust Validation ---
class StorySegment(BaseModel):
    """One scene of a timeline: its index, image prompt, narration and style hints."""

    scene_id: int = Field(..., ge=0, description="Scene number within the timeline, starting from 0.")
    image_prompt: str = Field(..., min_length=10, max_length=150, description="Concise visual description for image generation (15-35 words). Focus on non-human characters, setting, action, style.")
    audio_text: str = Field(..., min_length=5, max_length=150, description="Single sentence of narration/dialogue for the scene (max 30 words).")
    character_description: str = Field(..., max_length=100, description="Brief description of key non-human characters/objects in *this* scene's prompt for consistency.")
    timeline_visual_modifier: Optional[str] = Field(None, max_length=50, description="Optional subtle visual style hint (e.g., 'slightly darker', 'more vibrant colors').")

    @validator('image_prompt')
    def image_prompt_no_humans(cls, v):
        """Log a warning when the prompt mentions a human-related word.

        The check is advisory only: the value is always returned unchanged
        and no validation error is raised.
        """
        human_terms = {
            "person", "persons", "people",
            "human", "humans", "humanoid", "humanoids",
            "man", "men", "woman", "women",
            "boy", "boys", "girl", "girls",
            "child", "children",
        }
        # Compare whole words, not substrings: a substring test for "man"
        # also fires on "many", "command" or "salamander". Every non-letter
        # becomes a separator so "girl's" and "man-made" are still caught.
        words = "".join(ch if ch.isalpha() else " " for ch in v.lower()).split()
        if human_terms.intersection(words):
            logger.warning(f"Image prompt '{v[:50]}...' may contain human descriptions. Relying on API-level controls & prompt instructions.")
        return v
class Timeline(BaseModel):
    """A single story branch: its id, the reason it diverged, and its scenes."""

    timeline_id: int = Field(
        ...,
        ge=0,
        description="Unique identifier for this timeline.",
    )
    divergence_reason: str = Field(
        ...,
        min_length=5,
        description="Clear reason why this timeline branched off.",
    )
    # At least one scene is required.
    segments: List[StorySegment] = Field(
        ...,
        min_items=1,
        description="List of scenes composing this timeline.",
    )
class ChronoWeaveResponse(BaseModel):
    """Top-level story structure: the theme, its timelines, and the scene count each must have."""

    core_theme: str = Field(..., min_length=5, description="The central theme provided by the user.")
    timelines: List[Timeline] = Field(..., min_items=1, description="List of generated timelines.")
    total_scenes_per_timeline: int = Field(..., gt=0, description="The requested number of scenes per timeline.")

    # Bound to the LAST declared field on purpose. Validators run in field
    # declaration order and `values` only holds fields validated earlier, so
    # a validator on `timelines` never saw `total_scenes_per_timeline` and
    # the count check was silently skipped. Field order is left alone so the
    # JSON schema embedded in the prompt does not change.
    @validator('total_scenes_per_timeline')
    def check_timeline_segment_count(cls, total_scenes_per_timeline, values):
        """Require every timeline to contain exactly the declared number of scenes.

        Raises:
            ValueError: if any timeline's segment count differs from
                `total_scenes_per_timeline`.
        """
        # `timelines` is missing from `values` when it failed its own
        # validation; that error is reported separately, so skip the check.
        for i, timeline in enumerate(values.get('timelines') or []):
            if len(timeline.segments) != total_scenes_per_timeline:
                raise ValueError(f"Timeline {i} (ID: {timeline.timeline_id}) has {len(timeline.segments)} segments, but expected {total_scenes_per_timeline}.")
        return total_scenes_per_timeline
# --- Helper Functions ---
@contextlib.contextmanager
def wave_file_writer(filename: str, channels: int = 1, rate: int = AUDIO_SAMPLING_RATE, sample_width: int = 2):
    """Open `filename` as a WAV file for writing and yield the writer object.

    The channel count, sample width (bytes per sample) and frame rate are
    applied before the writer is handed to the caller. Any exception raised
    while opening/configuring the file, or inside the caller's `with` body,
    is logged and re-raised. The writer is closed on exit; a failure while
    closing is logged and suppressed.
    """
    writer = None
    try:
        writer = wave.open(filename, "wb")
        writer.setnchannels(channels)
        writer.setsampwidth(sample_width)  # 2 bytes per sample = 16-bit audio
        writer.setframerate(rate)
        yield writer
    except Exception as exc:
        logger.error(f"Error opening/configuring wave file {filename}: {exc}")
        raise  # Propagate to the caller after logging
    finally:
        # `writer` is still None when wave.open itself failed.
        if writer:
            try:
                writer.close()
            except Exception as close_exc:
                logger.error(f"Error closing wave file {filename}: {close_exc}")
async def generate_audio_live_async(api_text: str, output_filename: str, voice: Optional[str] = None) -> Optional[str]:
    """Request narration audio for `api_text` and save it as a WAV file.

    Args:
        api_text: Sentence to narrate; it is wrapped in a directive that asks
            the model to speak only the sentence itself.
        output_filename: Destination path for the WAV file. Its base name up
            to the first '.' is used as the task id in log/UI messages.
        voice: Accepted but not currently sent to the API (the voice entry in
            the request config is commented out).

    Returns:
        `output_filename` on success, or None if the stream reported an
        error, no audio bytes arrived, the prompt was blocked, or any other
        exception occurred (all failures are logged and shown in the UI).

    NOTE(review): `live_model` is created elsewhere in this file as a
    genai.GenerativeModel. Confirm that the installed google-generativeai
    exposes connect()/send_request()/stream_content() on that object; if it
    does not, every call ends in the generic `except` branch and returns None.
    NOTE(review): several emoji in the messages/icons below were mis-decoded
    upstream (e.g. "π"); only the success check mark could be restored.
    """
    collected_audio = bytearray()
    task_id = os.path.basename(output_filename).split('.')[0]  # Extract T#_S# for logging
    logger.info(f"ποΈ [{task_id}] Requesting audio for: '{api_text[:60]}...'")
    try:
        # Use the 'live_model' (a GenerativeModel instance) initialized earlier.
        config = {
            "response_modalities": ["AUDIO"],
            "audio_config": {
                "audio_encoding": "LINEAR16",  # Required format for WAV output
                "sample_rate_hertz": AUDIO_SAMPLING_RATE,
                # "voice": voice if voice else "aura-asteria-en" # Optional: Specify voice if needed and available
            }
        }
        # Prepend directive to discourage conversational filler.
        directive_prompt = (
            "Narrate the following sentence directly and engagingly. "
            "Do not add any introductory or concluding remarks like 'Okay', 'Sure', or 'Here is the narration'. "
            "Speak only the sentence itself:\n\n"
            f'"{api_text}"'
        )
        # Connect and accumulate every audio chunk the stream yields.
        async with live_model.connect(config=config) as session:
            await session.send_request([directive_prompt])
            async for response in session.stream_content():
                if response.audio_chunk and response.audio_chunk.data:
                    collected_audio.extend(response.audio_chunk.data)
                # Handle potential errors within the stream if the API provides them.
                if hasattr(response, 'error') and response.error:
                    logger.error(f" β [{task_id}] Error during audio stream: {response.error}")
                    st.error(f"Audio stream error for scene {task_id}: {response.error}", icon="π")
                    return None  # Stop processing this audio request

        if not collected_audio:
            logger.warning(f"β οΈ [{task_id}] No audio data received for: '{api_text[:60]}...'")
            st.warning(f"No audio data generated for scene {task_id}.", icon="π")
            return None

        # Write the collected audio bytes into a WAV file using the context manager.
        with wave_file_writer(output_filename, rate=AUDIO_SAMPLING_RATE) as wf:
            wf.writeframes(bytes(collected_audio))
        # Literal was split across two lines by a mis-decoded check mark; rejoined.
        logger.info(f" ✅ [{task_id}] Audio saved: {os.path.basename(output_filename)} ({len(collected_audio)} bytes)")
        return output_filename
    except genai.types.generation_types.BlockedPromptException as bpe:
        logger.error(f" β [{task_id}] Audio generation blocked for prompt '{api_text[:60]}...': {bpe}")
        st.error(f"Audio generation blocked for scene {task_id} due to safety settings.", icon="π")
        return None
    except Exception as e:
        # Catch other potential errors during connect/send/stream.
        logger.exception(f" β [{task_id}] Audio generation failed unexpectedly for '{api_text[:60]}...': {e}")
        st.error(f"Audio generation failed for scene {task_id}: {e}", icon="π")
        return None
def generate_story_sequence_chrono(
    theme: str,
    num_scenes: int,
    num_timelines: int,
    divergence_prompt: str = ""
) -> Optional[ChronoWeaveResponse]:
    """Ask the text model for a branching story and validate the JSON reply.

    Args:
        theme: Core story theme inserted into the prompt.
        num_scenes: Number of scenes requested per timeline.
        num_timelines: Number of timelines requested.
        divergence_prompt: Optional hint on how timelines should differ.

    Returns:
        A validated ChronoWeaveResponse, or None when the prompt is blocked,
        the reply is not valid JSON, the JSON fails schema validation, or any
        other error occurs. Every failure is logged and shown in the UI.

    NOTE(review): several emoji in the messages/icons below were mis-decoded
    upstream (e.g. "π"); only the success check mark could be restored.
    """
    st.info(f"π Generating {num_timelines} timeline(s) x {num_scenes} scenes for theme: '{theme}'...")
    logger.info(f"Requesting story structure: Theme='{theme}', Timelines={num_timelines}, Scenes={num_scenes}")
    divergence_instruction = (
        "Introduce clear points of divergence between timelines, starting potentially after the first scene. "
        f"If provided, use this hint for divergence: '{divergence_prompt}'. "
        "Clearly state the divergence reason for each timeline (except potentially the first)."
    )
    # The prompt lines stay at column 0 so the text sent to the model is unchanged.
    prompt = f"""
Act as an expert narrative designer specializing in short, visual, branching stories for children.
Create a story based on the core theme: "{theme}".
**Instructions:**
1. Generate exactly **{num_timelines}** distinct timelines.
2. Each timeline must contain exactly **{num_scenes}** sequential scenes.
3. **Crucially, DO NOT include any humans, people, or humanoid figures** in the descriptions or actions. Focus strictly on animals, fantasy creatures, animated objects, or natural elements.
4. {divergence_instruction}
5. Maintain a consistent visual style across all scenes and timelines: **'Simple, friendly kids animation style with bright colors and rounded shapes'**, unless a `timeline_visual_modifier` subtly alters it.
6. Each scene's narration (`audio_text`) should be a single, concise sentence (approx. 5-10 seconds spoken length, max 30 words).
7. Image prompts (`image_prompt`) should be descriptive (15-35 words), focusing on the non-human character(s), setting, action, and visual style. Explicitly mention the main character(s) for consistency.
8. `character_description` should briefly describe recurring non-human characters mentioned *in the specific scene's image prompt* (name, key visual features). Keep consistent within a timeline.
**Output Format:**
Respond ONLY with a valid JSON object adhering strictly to the provided schema. Do not include any text before or after the JSON object.
**JSON Schema:**
```json
{json.dumps(ChronoWeaveResponse.schema(), indent=2)}
```
"""  # Using .schema() which is the Pydantic v1 way, adjust if using v2 (.model_json_schema())
    try:
        # Use the standard client (GenerativeModel instance) for text generation.
        response = client_standard.generate_content(
            contents=prompt,
            generation_config=genai.types.GenerationConfig(
                response_mime_type="application/json",
                temperature=0.7  # Add some creativity
            )
        )

        # Step 1: decode the reply text as JSON.
        try:
            raw_data = json.loads(response.text)
        except json.JSONDecodeError as json_err:
            logger.error(f"Failed to decode JSON response: {json_err}")
            logger.error(f"Problematic Response Text:\n{response.text}")
            st.error(f"π¨ Failed to parse the story structure from the AI. Error: {json_err}", icon="π")
            st.text_area("Problematic AI Response:", response.text, height=200)
            return None
        except Exception as e:
            # Anything else raised while reading or decoding response.text.
            logger.error(f"Error accessing or decoding response text: {e}")
            st.error(f"π¨ Error processing AI response: {e}", icon="π")
            return None

        # Step 2: validate the decoded data against the Pydantic schema.
        try:
            # Use parse_obj for Pydantic v1, or YourModel.model_validate(raw_data) for v2.
            validated_data = ChronoWeaveResponse.parse_obj(raw_data)
        except ValidationError as val_err:
            logger.error(f"JSON structure validation failed: {val_err}")
            logger.error(f"Received Data:\n{json.dumps(raw_data, indent=2)}")
            st.error(f"π¨ The generated story structure is invalid: {val_err}", icon="π§¬")
            st.json(raw_data)  # Show the invalid structure
            return None

        # Both literals were split across two lines by a mis-decoded check mark; rejoined.
        logger.info("✅ Story structure generated and validated successfully!")
        st.success("✅ Story structure generated and validated!")
        return validated_data
    except genai.types.generation_types.BlockedPromptException as bpe:
        logger.error(f"Story generation prompt blocked: {bpe}")
        st.error("π¨ The story generation prompt was blocked, likely due to safety filters. Try rephrasing the theme.", icon="π«")
        return None
    except Exception as e:
        logger.exception("Error during story sequence generation:")
        st.error(f"π¨ An unexpected error occurred during story generation: {e}", icon="π₯")
        return None
def generate_image_imagen(prompt: str, aspect_ratio: str = "1:1", task_id: str = "IMG") -> Optional[Image.Image]:
    """Request one image for `prompt` and return it as a PIL image.

    Args:
        prompt: Scene description; wrapped with style and no-humans guidance.
        aspect_ratio: Aspect ratio stated in the prompt text sent to the model.
        task_id: Label used in log and UI messages.

    Returns:
        The decoded PIL image, or None when the request is blocked, the reply
        carries no inline image bytes, the bytes cannot be decoded, or any
        other error occurs. Every failure is logged and shown in the UI.

    NOTE(review): the request is sent through `client_standard`, the handle
    created for TEXT_MODEL_ID; IMAGE_MODEL_ID is not referenced here. Confirm
    that this model/endpoint can actually return image bytes.
    NOTE(review): several emoji in the messages/icons below were mis-decoded
    upstream (e.g. "π"); only the success check mark could be restored.
    """
    logger.info(f"πΌοΈ [{task_id}] Requesting image for: '{prompt[:70]}...' (Aspect: {aspect_ratio})")
    # Refined prompt incorporating negative constraints and style guidance.
    full_prompt = (
        "Generate an image in a child-friendly, simple animation style with bright colors and rounded shapes. "
        "Ensure absolutely NO humans or human-like figures are present. Focus on animals or objects. "
        f"Aspect ratio should be {aspect_ratio}. "  # Explicitly state aspect ratio in prompt too
        f"Prompt: {prompt}"
    )
    try:
        response = client_standard.generate_content(
            full_prompt,
            generation_config=genai.types.GenerationConfig(
                candidate_count=1,
            ),
            # Safety settings can be adjusted here if necessary and permitted
            # safety_settings={'HARM_CATEGORY_DANGEROUS_CONTENT': 'BLOCK_NONE'} # Use cautiously
        )

        # Look through every part of the first candidate for inline image
        # bytes; previously only parts[0] was inspected, so an image that
        # followed a text part was missed.
        image_bytes = None
        saw_file_reference = False
        if response.candidates and response.candidates[0].content and response.candidates[0].content.parts:
            for part in response.candidates[0].content.parts:
                inline = getattr(part, 'inline_data', None)
                if inline and getattr(inline, 'data', None):
                    image_bytes = inline.data
                    break
                if getattr(part, 'file_data', None):
                    saw_file_reference = True
        if not image_bytes and saw_file_reference:
            # A file URI would have to be downloaded; that is not implemented.
            logger.warning(f" β οΈ [{task_id}] Received file URI instead of inline data. Handling not implemented.")
            return None

        if image_bytes:
            # Keep the try limited to decoding so unrelated failures are not
            # reported as decode errors.
            try:
                image = Image.open(BytesIO(image_bytes))
                # Image.open is lazy; load() forces the pixel data to be
                # decoded now so corrupt data is caught here.
                image.load()
            except Exception as img_err:
                logger.error(f" β [{task_id}] Failed to decode generated image data: {img_err}")
                st.warning(f"Failed to decode image data for scene {task_id}.", icon="πΌοΈ")
                return None

            # Literal was split across two lines by a mis-decoded check mark; rejoined.
            logger.info(f" ✅ [{task_id}] Image generated successfully.")
            # Check safety feedback even on success.
            safety_ratings = getattr(response.candidates[0], 'safety_ratings', [])
            if safety_ratings:
                filtered_ratings = [f"{r.category.name}: {r.probability.name}" for r in safety_ratings if r.probability.name != 'NEGLIGIBLE']
                if filtered_ratings:
                    logger.warning(f" β οΈ [{task_id}] Image generated but flagged by safety filters: {', '.join(filtered_ratings)}.")
                    st.warning(f"Image for scene {task_id} flagged by safety filters: {', '.join(filtered_ratings)}", icon="β οΈ")
            return image

        # No image bytes: report the block reason when the response has one.
        block_reason = None
        prompt_feedback = getattr(response, 'prompt_feedback', None)
        if prompt_feedback:
            block_reason = getattr(prompt_feedback, 'block_reason', None)
        if block_reason:
            logger.warning(f" β οΈ [{task_id}] Image generation blocked. Reason: {block_reason}. Prompt: '{prompt[:70]}...'")
            st.warning(f"Image generation blocked for scene {task_id}. Reason: {block_reason}", icon="π«")
        else:
            logger.warning(f" β οΈ [{task_id}] No image data received, unknown reason. Prompt: '{prompt[:70]}...'")
            st.warning(f"No image data received for scene {task_id}, reason unclear.", icon="πΌοΈ")
        return None
    except genai.types.generation_types.BlockedPromptException as bpe:
        # This might be caught by the block_reason check above, but good to have explicit catch.
        logger.error(f" β [{task_id}] Image generation blocked (exception): {bpe}")
        st.error(f"Image generation blocked for scene {task_id} due to safety settings.", icon="π«")
        return None
    except Exception as e:
        logger.exception(f" β [{task_id}] Image generation failed unexpectedly for '{prompt[:70]}...': {e}")
        st.error(f"Image generation failed for scene {task_id}: {e}", icon="πΌοΈ")
        return None
# --- Streamlit UI Elements ---
# Sidebar controls. The values read here (theme, num_scenes, num_timelines,
# divergence_prompt, aspect_ratio, audio_voice, generate_button) are consumed
# by the main generation logic further down.
# NOTE(review): most emoji in the labels/icons below were mis-decoded upstream
# (e.g. "π"); only the check-mark icon could be restored with certainty.
st.sidebar.header("βοΈ Configuration")

# API Key Status
if GOOGLE_API_KEY:
    # The icon literal was split across two lines by a mis-decoded check mark; rejoined.
    st.sidebar.success("Google API Key Loaded", icon="✅")
else:
    st.sidebar.error("Google API Key Missing!", icon="π¨")  # Should not be reached if st.stop() works

# Story Parameters
theme = st.sidebar.text_input("π Story Theme:", "A curious squirrel finds a mysterious, glowing acorn")
num_scenes = st.sidebar.slider("π¬ Scenes per Timeline:", min_value=2, max_value=7, value=3, help="Number of scenes (image+narration) in each timeline.")
num_timelines = st.sidebar.slider("πΏ Number of Timelines:", min_value=1, max_value=4, value=2, help="Number of parallel storylines to generate.")
divergence_prompt = st.sidebar.text_input("βοΈ Divergence Hint (Optional):", placeholder="e.g., What if a bird tried to steal it?", help="A suggestion for how the timelines might differ.")

# Generation Settings
st.sidebar.subheader("π¨ Visual & Audio Settings")
aspect_ratio = st.sidebar.selectbox("πΌοΈ Image Aspect Ratio:", ["1:1", "16:9", "9:16"], index=0, help="Aspect ratio for generated images.")
# Add audio voice selection if API supports it and voices are known
# available_voices = ["aura-asteria-en", "aura-luna-en", "aura-stella-en"] # Example
# audio_voice = st.sidebar.selectbox("π£οΈ Narration Voice:", available_voices, index=0)
audio_voice = None  # Placeholder until a voice selector is wired in

# The button is disabled when no API key was resolved.
generate_button = st.sidebar.button("β¨ Generate ChronoWeave β¨", type="primary", disabled=(not GOOGLE_API_KEY), use_container_width=True)
st.sidebar.markdown("---")
st.sidebar.info("β³ Generation can take several minutes, especially with more scenes or timelines.", icon="β³")
# Small caption listing the configured model ids.
st.sidebar.markdown(f"<small>Models: Text={TEXT_MODEL_ID}, Image={IMAGE_MODEL_ID}, Audio={AUDIO_MODEL_ID}</small>", unsafe_allow_html=True)
# --- Main Logic ---
if generate_button:
if not theme:
st.error("Please enter a story theme in the sidebar.", icon="π")
else:
# Create a unique temporary directory for this run
run_id = str(uuid.uuid4()).split('-')[0] # Short unique ID
temp_dir = os.path.join(TEMP_DIR_BASE, f"run_{run_id}")
try:
os.makedirs(temp_dir, exist_ok=True)
logger.info(f"Created temporary directory: {temp_dir}")
except OSError as e:
st.error(f"π¨ Failed to create temporary directory {temp_dir}: {e}", icon="π")
st.stop()
final_video_paths = {} # Stores {timeline_id: video_path}
generation_errors = {} # Stores {timeline_id: [error_messages]}
# --- 1. Generate Narrative Structure ---
chrono_response: Optional[ChronoWeaveResponse] = None
with st.spinner("Generating narrative structure... π€"):
chrono_response = generate_story_sequence_chrono(theme, num_scenes, num_timelines, divergence_prompt)
if chrono_response:
st.success(f"Narrative structure received for {len(chrono_response.timelines)} timelines.")
logger.info(f"Successfully generated structure for {len(chrono_response.timelines)} timelines.")
# --- 2. Process Each Timeline ---
overall_start_time = time.time()
all_timelines_successful = True # Assume success initially
# Use st.status for collapsible progress updates
with st.status("Generating assets and composing videos...", expanded=True) as status:
for timeline_index, timeline in enumerate(chrono_response.timelines):
timeline_id = timeline.timeline_id
divergence = timeline.divergence_reason
segments = timeline.segments
timeline_label = f"Timeline {timeline_id}" # Consistent label
st.subheader(f"Processing {timeline_label}: {divergence}")
logger.info(f"--- Processing {timeline_label} (Index: {timeline_index}) ---")
generation_errors[timeline_id] = [] # Initialize error list
temp_image_files = {} # {scene_id: path}
temp_audio_files = {} # {scene_id: path}
video_clips = [] # List of moviepy clips
timeline_start_time = time.time()
scene_success_count = 0
for scene_index, segment in enumerate(segments):
scene_id = segment.scene_id
task_id = f"T{timeline_id}_S{scene_id}" # Unique ID
status_message = f"Processing {timeline_label}, Scene {scene_id + 1}/{len(segments)}..."
status.update(label=status_message)
st.markdown(f"--- **Scene {scene_id + 1} ({task_id})** ---")
logger.info(status_message)
scene_has_error = False
# Log scene details
st.write(f" *Image Prompt:* {segment.image_prompt}" + (f" *(Modifier: {segment.timeline_visual_modifier})*" if segment.timeline_visual_modifier else ""))
st.write(f" *Audio Text:* {segment.audio_text}")
# --- 2a. Image Generation ---
generated_image: Optional[Image.Image] = None # Define before spinner
with st.spinner(f"[{task_id}] Generating image... π¨"):
combined_prompt = f"{segment.image_prompt}. {segment.character_description}"
if segment.timeline_visual_modifier:
combined_prompt += f" Visual style hint: {segment.timeline_visual_modifier}."
generated_image = generate_image_imagen(combined_prompt, aspect_ratio, task_id)
if generated_image:
image_path = os.path.join(temp_dir, f"{task_id}_image.png")
try:
generated_image.save(image_path)
temp_image_files[scene_id] = image_path
st.image(generated_image, width=180, caption=f"Scene {scene_id+1} Image")
except Exception as e:
logger.error(f" β [{task_id}] Failed to save image {image_path}: {e}")
st.error(f"Failed to save image for scene {task_id}.", icon="πΎ")
scene_has_error = True
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Image save failed.")
else:
st.warning(f"Image generation failed for scene {task_id}. Skipping scene.", icon="πΌοΈ")
scene_has_error = True
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Image generation failed.")
continue # Skip audio/video for this scene
# --- 2b. Audio Generation ---
generated_audio_path: Optional[str] = None
if not scene_has_error:
with st.spinner(f"[{task_id}] Generating audio... π"):
audio_path_temp = os.path.join(temp_dir, f"{task_id}_audio.wav")
try:
# Run the async function using asyncio.run()
generated_audio_path = asyncio.run(
generate_audio_live_async(segment.audio_text, audio_path_temp, audio_voice)
)
except RuntimeError as e:
logger.error(f" β [{task_id}] Asyncio runtime error during audio gen: {e}")
st.error(f"Asyncio error during audio generation for {task_id}: {e}", icon="β‘")
scene_has_error = True
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio async error.")
except Exception as e:
logger.exception(f" β [{task_id}] Unexpected error during audio generation call for {task_id}: {e}")
st.error(f"Unexpected error in audio generation for {task_id}: {e}", icon="π₯")
scene_has_error = True
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio generation error.")
if generated_audio_path:
temp_audio_files[scene_id] = generated_audio_path
try:
with open(generated_audio_path, 'rb') as ap:
st.audio(ap.read(), format='audio/wav')
except Exception as e:
logger.warning(f" β οΈ [{task_id}] Could not display audio preview: {e}")
else:
st.warning(f"Audio generation failed for {task_id}. Skipping video clip.", icon="π")
scene_has_error = True
generation_errors[timeline_id].append(f"Scene {scene_id+1}: Audio generation failed.")
# Clean up image if audio fails
if scene_id in temp_image_files and os.path.exists(temp_image_files[scene_id]):
try:
os.remove(temp_image_files[scene_id])
logger.info(f" ποΈ [{task_id}] Removed image file due to audio failure.")
del temp_image_files[scene_id]
except OSError as e:
logger.warning(f" β οΈ [{task_id}] Could not remove image file {temp_image_files[scene_id]} after audio failure: {e}")
continue # Skip video clip creation
# --- 2c. Create Video Clip ---
# Combine the scene's still image with its narration audio into one clip whose duration
# equals the audio duration. The clip is kept open because it is concatenated later.
if not scene_has_error and scene_id in temp_image_files and scene_id in temp_audio_files:
    st.write(f" π¬ Creating video clip for Scene {scene_id+1}...")
    img_path = temp_image_files[scene_id]
    aud_path = temp_audio_files[scene_id]
    audio_clip_instance = None  # Defined before try so the except branch can clean up
    image_clip_instance = None  # Defined before try so the except branch can clean up
    composite_clip = None  # Defined before try
    try:
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")
        if not os.path.exists(aud_path):
            raise FileNotFoundError(f"Audio file not found: {aud_path}")
        audio_clip_instance = AudioFileClip(aud_path)
        # Copy the pixels into an array and release the image file handle immediately.
        with Image.open(img_path) as scene_image:
            np_image = np.array(scene_image)
        image_clip_instance = ImageClip(np_image).set_duration(audio_clip_instance.duration)
        composite_clip = image_clip_instance.set_audio(audio_clip_instance)
        logger.info(f" ✅ [{task_id}] Video clip created (Duration: {audio_clip_instance.duration:.2f}s).")
        st.write(f" ✅ Clip created (Duration: {audio_clip_instance.duration:.2f}s).")
        # Register the clip only once nothing else in this try can fail, so a failure
        # never leaves an already-closed clip in the list used for concatenation.
        video_clips.append(composite_clip)  # Add the clip to be concatenated later
        scene_success_count += 1
        # Don't close individual clips here yet, needed for concatenation
    except Exception as e:
        logger.exception(f" β [{task_id}] Failed to create video clip for scene {scene_id+1}: {e}")
        st.error(f"Failed to create video clip for {task_id}: {e}", icon="π¬")
        scene_has_error = True
        generation_errors[timeline_id].append(f"Scene {scene_id+1}: Video clip creation failed.")
        # Cleanup resources if clip creation failed for *this* scene.
        # Cleanup is best-effort: a failure here must not abort the remaining scenes.
        for partial_clip in (audio_clip_instance, image_clip_instance):
            if partial_clip is None:
                continue
            try:
                partial_clip.close()
            except Exception as e_close:
                logger.warning(f"[{task_id}] Error closing clip after failure: {e_close}")
        # Attempt cleanup of related files
        for stale_path in (img_path, aud_path):
            try:
                if os.path.exists(stale_path):
                    os.remove(stale_path)
            except OSError as e_remove:
                logger.warning(f"[{task_id}] Could not remove {stale_path} after clip failure: {e_remove}")
# --- End of Scene Loop ---
# --- 2d. Assemble Timeline Video ---
# Concatenate the per-scene clips into one MP4 for this timeline, but only when every
# scene produced a clip. Whatever branch runs, all clips are closed afterwards so their
# underlying readers are released even when assembly is skipped.
timeline_duration = time.time() - timeline_start_time
final_timeline_video = None  # Defined before try so the finally block can close it
try:
    # Only assemble if clips were created and no *fatal* errors occurred during scene processing
    # (We check scene_success_count against expected number)
    if video_clips and scene_success_count == len(segments):
        status.update(label=f"Composing final video for {timeline_label}...")
        st.write(f"ποΈ Assembling final video for {timeline_label}...")
        logger.info(f"ποΈ Assembling final video for {timeline_label} ({len(video_clips)} clips)...")
        output_filename = os.path.join(temp_dir, f"timeline_{timeline_id}_final.mp4")
        try:
            # Concatenate the collected clips
            final_timeline_video = concatenate_videoclips(video_clips, method="compose")
            final_timeline_video.write_videofile(
                output_filename,
                fps=VIDEO_FPS,
                codec=VIDEO_CODEC,
                audio_codec=AUDIO_CODEC,
                logger=None,  # Suppress moviepy console spam
            )
            final_video_paths[timeline_id] = output_filename
            # Re-measure so the reported time also covers composing and encoding.
            timeline_duration = time.time() - timeline_start_time
            logger.info(f" ✅ [{timeline_label}] Final video saved: {os.path.basename(output_filename)}")
            st.success(f"✅ Video for {timeline_label} completed in {timeline_duration:.2f}s.")
        except Exception as e:
            logger.exception(f" β [{timeline_label}] Failed to write final video: {e}")
            st.error(f"Failed to assemble video for {timeline_label}: {e}", icon="πΌ")
            all_timelines_successful = False
            generation_errors[timeline_id].append(f"Timeline {timeline_id}: Final video assembly failed.")
    elif not video_clips:
        logger.warning(f"[{timeline_label}] No video clips successfully generated. Skipping final assembly.")
        st.warning(f"No scenes were successfully processed for {timeline_label}. Video cannot be created.", icon="π«")
        all_timelines_successful = False
    else:  # Some scenes failed, so scene_success_count < len(segments)
        error_count = len(segments) - scene_success_count
        logger.warning(f"[{timeline_label}] Encountered errors in {error_count} scene(s). Skipping final video assembly.")
        st.warning(f"{timeline_label} had errors in {error_count} scene(s). Final video not assembled.", icon="β οΈ")
        all_timelines_successful = False
finally:
    # Close all individual clips and the final concatenated clip. This also runs when
    # assembly was skipped; otherwise the clips of the successful scenes stay open.
    if video_clips:
        logger.debug(f"[{timeline_label}] Closing {len(video_clips)} source clips...")
    for i, clip in enumerate(video_clips):
        try:
            if clip is not None:  # Check if clip object exists
                if clip.audio is not None:
                    clip.audio.close()
                clip.close()
        except Exception as e_close:
            logger.warning(f" β οΈ [{timeline_label}] Error closing source clip {i}: {e_close}")
    if final_timeline_video is not None:
        try:
            if final_timeline_video.audio is not None:
                final_timeline_video.audio.close()
            final_timeline_video.close()
            logger.debug(f"[{timeline_label}] Closed final video object.")
        except Exception as e_close_final:
            logger.warning(f" β οΈ [{timeline_label}] Error closing final video object: {e_close_final}")
# Log accumulated errors for the timeline if any occurred
if generation_errors[timeline_id]:
    logger.error(f"Summary of errors in {timeline_label}: {generation_errors[timeline_id]}")
# --- End of Timelines Loop ---
# Final status update
# Summarise the whole run in the status container: full success, partial success
# (some videos written but errors occurred), or total failure.
overall_duration = time.time() - overall_start_time
if all_timelines_successful and final_video_paths:
    status_msg = f"ChronoWeave Generation Complete! ({len(final_video_paths)} videos in {overall_duration:.2f}s)"
    status.update(label=status_msg, state="complete", expanded=False)
    logger.info(status_msg)
elif final_video_paths:  # Some videos made, but errors occurred
    status_msg = f"ChronoWeave Partially Complete ({len(final_video_paths)} videos, some errors occurred). Total time: {overall_duration:.2f}s"
    # st.status only accepts "running", "complete" or "error"; "warning" raises.
    # "error" is used so a run with failures is not shown as a clean success; the
    # container stays expanded so the per-scene messages remain visible.
    status.update(label=status_msg, state="error", expanded=True)
    logger.warning(status_msg)
else:  # No videos made
    status_msg = f"ChronoWeave Generation Failed. No videos produced. Total time: {overall_duration:.2f}s"
    status.update(label=status_msg, state="error", expanded=True)
    logger.error(status_msg)
# --- 3. Display Results ---
# Show each finished timeline video in a grid of at most three columns, with a download
# button and any recorded issues. If nothing was produced, list all errors instead.
st.header("π¬ Generated Timelines")
if not final_video_paths:
    st.warning("No final videos were successfully generated in this run.")
    # Display summary of all errors if no videos were made
    if any(issue_list for issue_list in generation_errors.values()):
        st.subheader("Summary of Generation Issues")
        with st.expander("View All Errors", expanded=True):
            for tid, errors in generation_errors.items():
                if not errors:
                    continue
                st.error(f"Timeline {tid}:")
                for msg in errors:
                    st.error(f" - {msg}")
else:
    ordered_ids = sorted(final_video_paths.keys())
    # One column per video, capped at three; further videos wrap around.
    column_count = min(len(ordered_ids), 3)
    columns = st.columns(column_count)
    for position, timeline_id in enumerate(ordered_ids):
        video_path = final_video_paths[timeline_id]
        matching_timeline = next(
            (t for t in chrono_response.timelines if t.timeline_id == timeline_id),
            None,
        )
        if matching_timeline:
            reason = matching_timeline.divergence_reason
        else:
            reason = "Unknown Divergence"
        with columns[position % column_count]:  # Cycle through columns
            st.subheader(f"Timeline {timeline_id}")
            st.caption(f"Divergence: {reason}")
            try:
                with open(video_path, 'rb') as video_file:
                    video_bytes = video_file.read()
                st.video(video_bytes)
                logger.info(f"Displaying video for Timeline {timeline_id}")
                st.download_button(
                    label=f"Download T{timeline_id} Video",
                    data=video_bytes,
                    file_name=f"chronoweave_timeline_{timeline_id}.mp4",
                    mime="video/mp4",
                    key=f"download_btn_{timeline_id}",  # Unique key for download button
                )
                # Display errors for this timeline if any occurred
                timeline_issues = generation_errors.get(timeline_id)
                if timeline_issues:
                    with st.expander(f"β οΈ View {len(generation_errors[timeline_id])} Generation Issues"):
                        for error_msg in generation_errors[timeline_id]:
                            st.warning(f"- {error_msg}")
            except FileNotFoundError:
                logger.error(f"Could not find video file for display: {video_path}")
                st.error(f"Error: Video file not found for Timeline {timeline_id}.", icon="π¨")
            except Exception as e:
                logger.exception(f"Could not display video {video_path}: {e}")
                st.error(f"Error displaying video for Timeline {timeline_id}: {e}", icon="π¨")
# --- 4. Cleanup ---
# Remove the run's temporary directory. This is best-effort: a failure is reported to
# the user and logged, but never raised.
st.info(f"Attempting to clean up temporary directory: {temp_dir}")
try:
    shutil.rmtree(temp_dir)
except Exception as e:
    logger.error(f"β οΈ Could not remove temporary directory {temp_dir}: {e}")
    st.warning(f"Could not automatically remove temporary files: {temp_dir}. Please remove it manually if needed.", icon="β οΈ")
else:
    # Reported outside the try so a successful removal is never logged as a failure.
    logger.info(f"✅ Temporary directory removed: {temp_dir}")
    st.success("✅ Temporary files cleaned up.")
# Remaining branches of conditionals whose `if` headers lie before this excerpt.
# Reached when chrono_response is falsy, i.e. no story structure is available.
elif not chrono_response:
# Error message likely already shown by generate_story_sequence_chrono
logger.error("Story generation failed, cannot proceed.")
else:
# Fallback for unexpected state
# NOTE(review): if the preceding condition is `if chrono_response:`, this branch is
# unreachable, and the log text ("existed but was falsy") contradicts itself - confirm.
st.error("An unexpected issue occurred after story generation. Cannot proceed.", icon="π")
logger.error("Chrono_response existed but was falsy in the main logic block.")
else:
# Idle state: presumably the else of the generate-button check, which is not visible here.
# NOTE(review): the trailing `|` on the next line looks like an extraction artifact - verify.
st.info("Configure settings in the sidebar and click 'β¨ Generate ChronoWeave β¨' to start.") |