# core/visual_engine.py # ... (all imports and class setup as in the previous "expertly crafted" version) ... # ... (methods __init__ through generate_narration_audio also as in the previous full version) ... class VisualEngine: # ... (previous __init__ and set_api_key methods, _image_to_data_uri, _map_resolution_to_runway_ratio, # _get_text_dimensions, _create_placeholder_image_content, _search_pexels_image, # _generate_video_clip_with_runwayml, _create_placeholder_video_content, # generate_scene_asset, generate_narration_audio - KEEP THESE AS THEY WERE in the last full version) ... # ========================================================================= # ASSEMBLE ANIMATIC - EXTREME DEBUGGING FOR IMAGE ASSETS # ========================================================================= def assemble_animatic_from_assets(self, asset_data_list, overall_narration_path=None, output_filename="final_video.mp4", fps=24): if not asset_data_list: logger.warning("No assets for animatic."); return None processed_moviepy_clips_list = []; narration_audio_clip_mvpy = None; final_video_output_clip = None logger.info(f"Assembling from {len(asset_data_list)} assets. Target Frame: {self.video_frame_size}.") for i_asset, asset_info_item_loop in enumerate(asset_data_list): path_of_asset, type_of_asset, duration_for_scene = asset_info_item_loop.get('path'), asset_info_item_loop.get('type'), asset_info_item_loop.get('duration', 4.5) num_of_scene, action_in_key = asset_info_item_loop.get('scene_num', i_asset + 1), asset_info_item_loop.get('key_action', '') logger.info(f"S{num_of_scene}: Path='{path_of_asset}', Type='{type_of_asset}', Dur='{duration_for_scene}'s") if not (path_of_asset and os.path.exists(path_of_asset)): logger.warning(f"S{num_of_scene}: Not found '{path_of_asset}'. Skip."); continue if duration_for_scene <= 0: logger.warning(f"S{num_of_scene}: Invalid duration ({duration_for_scene}s). Skip."); continue active_scene_clip = None try: if type_of_asset == 'image': logger.info(f"S{num_of_scene}: Processing IMAGE asset: {path_of_asset}") # 0. Load original image pil_img_original = Image.open(path_of_asset) logger.debug(f"S{num_of_scene} (0-Load): Original loaded. Mode:{pil_img_original.mode}, Size:{pil_img_original.size}") pil_img_original.save(os.path.join(self.output_dir,f"debug_0_ORIGINAL_S{num_of_scene}.png")) # 1. Convert to RGBA for consistent alpha handling (even if it's already RGB) img_rgba_intermediate = pil_img_original.convert('RGBA') if pil_img_original.mode != 'RGBA' else pil_img_original.copy().convert('RGBA') # Ensure copy if already RGBA logger.debug(f"S{num_of_scene} (1-ToRGBA): Converted to RGBA. Mode:{img_rgba_intermediate.mode}, Size:{img_rgba_intermediate.size}") img_rgba_intermediate.save(os.path.join(self.output_dir,f"debug_1_AS_RGBA_S{num_of_scene}.png")) # 2. Thumbnail the RGBA image thumbnailed_img_rgba = img_rgba_intermediate.copy() # Work on a copy for thumbnailing resample_filter_pil = Image.Resampling.LANCZOS if hasattr(Image.Resampling,'LANCZOS') else Image.BILINEAR thumbnailed_img_rgba.thumbnail(self.video_frame_size, resample_filter_pil) logger.debug(f"S{num_of_scene} (2-Thumbnail): Thumbnailed RGBA. Mode:{thumbnailed_img_rgba.mode}, Size:{thumbnailed_img_rgba.size}") thumbnailed_img_rgba.save(os.path.join(self.output_dir,f"debug_2_THUMBNAIL_RGBA_S{num_of_scene}.png")) # 3. Create a target-sized RGBA canvas (fully transparent for true alpha blending) canvas_for_compositing_rgba = Image.new('RGBA', self.video_frame_size, (0,0,0,0)) pos_x_paste = (self.video_frame_size[0] - thumbnailed_img_rgba.width) // 2 pos_y_paste = (self.video_frame_size[1] - thumbnailed_img_rgba.height) // 2 # Paste the (potentially smaller) thumbnailed RGBA image onto the transparent RGBA canvas, using its own alpha canvas_for_compositing_rgba.paste(thumbnailed_img_rgba, (pos_x_paste, pos_y_paste), thumbnailed_img_rgba) logger.debug(f"S{num_of_scene} (3-PasteOnRGBA): Image pasted onto transparent RGBA canvas. Mode:{canvas_for_compositing_rgba.mode}, Size:{canvas_for_compositing_rgba.size}") canvas_for_compositing_rgba.save(os.path.join(self.output_dir,f"debug_3_COMPOSITED_RGBA_S{num_of_scene}.png")) # 4. Create a final RGB image by pasting the composited RGBA canvas onto an opaque background # This flattens all transparency and ensures a 3-channel RGB image for MoviePy. final_rgb_image_for_pil = Image.new("RGB", self.video_frame_size, (5, 5, 15)) # Dark opaque background (e.g., dark blue) # Paste canvas_for_compositing_rgba using its alpha channel as the mask if canvas_for_compositing_rgba.mode == 'RGBA': final_rgb_image_for_pil.paste(canvas_for_compositing_rgba, mask=canvas_for_compositing_rgba.split()[3]) else: # Should not happen if step 1 & 3 are correct, but as a fallback final_rgb_image_for_pil.paste(canvas_for_compositing_rgba) # Paste without mask if not RGBA logger.debug(f"S{num_of_scene} (4-ToRGB): Final RGB image created. Mode:{final_rgb_image_for_pil.mode}, Size:{final_rgb_image_for_pil.size}") # THIS IS THE CRITICAL DEBUG IMAGE - what does it look like? debug_path_img_pre_numpy = os.path.join(self.output_dir,f"debug_4_PRE_NUMPY_RGB_S{num_of_scene}.png"); final_rgb_image_for_pil.save(debug_path_img_pre_numpy); logger.info(f"CRITICAL DEBUG: Saved PRE_NUMPY_RGB_S{num_of_scene} (image fed to NumPy) to {debug_path_img_pre_numpy}") # 5. Convert to C-contiguous NumPy array, dtype uint8 numpy_frame_arr = np.array(final_rgb_image_for_pil, dtype=np.uint8) if not numpy_frame_arr.flags['C_CONTIGUOUS']: numpy_frame_arr = np.ascontiguousarray(numpy_frame_arr, dtype=np.uint8) # Ensure C-order logger.debug(f"S{num_of_scene} (5-NumPy): Ensured NumPy array is C-contiguous.") logger.debug(f"S{num_of_scene} (5-NumPy): Final NumPy array for MoviePy. Shape:{numpy_frame_arr.shape}, DType:{numpy_frame_arr.dtype}, Flags:{numpy_frame_arr.flags}") if numpy_frame_arr.size == 0 or numpy_frame_arr.ndim != 3 or numpy_frame_arr.shape[2] != 3: logger.error(f"S{num_of_scene}: Invalid NumPy array shape/size ({numpy_frame_arr.shape}) for ImageClip. Skipping this asset."); continue # 6. Create MoviePy ImageClip base_image_clip_mvpy = ImageClip(numpy_frame_arr, transparent=False, ismask=False).set_duration(duration_for_scene) logger.debug(f"S{num_of_scene} (6-ImageClip): Base ImageClip created. Duration: {base_image_clip_mvpy.duration}") # 7. DEBUG: Save a frame directly FROM the MoviePy ImageClip object debug_path_moviepy_frame = os.path.join(self.output_dir,f"debug_7_MOVIEPY_FRAME_S{num_of_scene}.png") try: base_image_clip_mvpy.save_frame(debug_path_moviepy_frame, t=min(0.1, base_image_clip_mvpy.duration / 2 if base_image_clip_mvpy.duration > 0 else 0.1)) # Save frame at 0.1s or mid-point logger.info(f"CRITICAL DEBUG: Saved frame FROM MOVIEPY ImageClip for S{num_of_scene} to {debug_path_moviepy_frame}") except Exception as e_save_mvpy_frame: logger.error(f"DEBUG: Error saving frame FROM MOVIEPY ImageClip for S{num_of_scene}: {e_save_mvpy_frame}", exc_info=True) # 8. Apply Ken Burns effect (optional, can be commented out for further isolation) fx_image_clip_mvpy = base_image_clip_mvpy try: scale_end_kb_val = random.uniform(1.03, 1.08) if duration_for_scene > 0: # Avoid division by zero fx_image_clip_mvpy = base_image_clip_mvpy.fx(vfx.resize, lambda t_val: 1 + (scale_end_kb_val - 1) * (t_val / duration_for_scene)).set_position('center') logger.debug(f"S{num_of_scene} (8-KenBurns): Ken Burns effect applied.") else: logger.warning(f"S{num_of_scene}: Duration is zero, skipping Ken Burns.") except Exception as e_kb_fx_loop: logger.error(f"S{num_of_scene} Ken Burns effect error: {e_kb_fx_loop}", exc_info=False) # exc_info=False for brevity active_scene_clip = fx_image_clip_mvpy elif type_of_asset == 'video': # ... (Video processing logic from the previous full, corrected version) ... # Ensure this part also handles clip closing diligently. source_video_clip_obj=None try: logger.debug(f"S{num_of_scene}: Loading VIDEO asset: {path_of_asset}") source_video_clip_obj=VideoFileClip(path_of_asset,target_resolution=(self.video_frame_size[1],self.video_frame_size[0])if self.video_frame_size else None, audio=False) temp_video_clip_obj_loop=source_video_clip_obj if source_video_clip_obj.duration!=duration_for_scene: if source_video_clip_obj.duration>duration_for_scene:temp_video_clip_obj_loop=source_video_clip_obj.subclip(0,duration_for_scene) else: if duration_for_scene/source_video_clip_obj.duration > 1.5 and source_video_clip_obj.duration>0.1:temp_video_clip_obj_loop=source_video_clip_obj.loop(duration=duration_for_scene) else:temp_video_clip_obj_loop=source_video_clip_obj.set_duration(source_video_clip_obj.duration);logger.info(f"S{num_of_scene} Video clip ({source_video_clip_obj.duration:.2f}s) shorter than target ({duration_for_scene:.2f}s).") active_scene_clip=temp_video_clip_obj_loop.set_duration(duration_for_scene) # Ensure final clip has target duration if active_scene_clip.size!=list(self.video_frame_size):active_scene_clip=active_scene_clip.resize(self.video_frame_size) logger.debug(f"S{num_of_scene}: Video asset processed. Final duration for scene: {active_scene_clip.duration:.2f}s") except Exception as e_vid_load_loop:logger.error(f"S{num_of_scene} Video load error '{path_of_asset}':{e_vid_load_loop}",exc_info=True);continue # Skip this broken video asset finally: # Close the original source_video_clip_obj if it's different from active_scene_clip if source_video_clip_obj and source_video_clip_obj is not active_scene_clip and hasattr(source_video_clip_obj,'close'): try: source_video_clip_obj.close() except Exception as e_close_src_vid: logger.warning(f"S{num_of_scene}: Error closing source VideoFileClip: {e_close_src_vid}") else: logger.warning(f"S{num_of_scene} Unknown asset type '{type_of_asset}'. Skipping."); continue # Add text overlay (common to both image and video assets) if active_scene_clip and action_in_key: try: dur_text_overlay_val=min(active_scene_clip.duration-0.5,active_scene_clip.duration*0.8)if active_scene_clip.duration>0.5 else active_scene_clip.duration start_text_overlay_val=0.25 # Start text a bit into the clip if dur_text_overlay_val > 0: text_clip_for_overlay_obj=TextClip(f"Scene {num_of_scene}\n{action_in_key}",fontsize=self.VIDEO_OVERLAY_FONT_SIZE,color=self.VIDEO_OVERLAY_FONT_COLOR,font=self.active_moviepy_font_name,bg_color='rgba(10,10,20,0.7)',method='caption',align='West',size=(self.video_frame_size[0]*0.9,None),kerning=-1,stroke_color='black',stroke_width=1.5).set_duration(dur_text_overlay_val).set_start(start_text_overlay_val).set_position(('center',0.92),relative=True) active_scene_clip=CompositeVideoClip([active_scene_clip,text_clip_for_overlay_obj],size=self.video_frame_size,use_bgclip=True) # Ensure use_bgclip=True logger.debug(f"S{num_of_scene}: Text overlay composited.") else: logger.warning(f"S{num_of_scene}: Text overlay duration is zero or negative ({dur_text_overlay_val}). Skipping text overlay.") except Exception as e_txt_comp_loop:logger.error(f"S{num_of_scene} TextClip compositing error:{e_txt_comp_loop}. Proceeding without text for this scene.",exc_info=True) # Log full error but continue if active_scene_clip: processed_moviepy_clips_list.append(active_scene_clip) logger.info(f"S{num_of_scene}: Asset successfully processed. Clip duration: {active_scene_clip.duration:.2f}s. Added to final list for concatenation.") except Exception as e_asset_loop_main_exc: # Catch errors during the processing of a single asset logger.error(f"MAJOR UNHANDLED ERROR processing asset for S{num_of_scene} (Path: {path_of_asset}): {e_asset_loop_main_exc}", exc_info=True) # Ensure any partially created clip for this iteration is closed if active_scene_clip and hasattr(active_scene_clip,'close'): try: active_scene_clip.close() except Exception as e_close_active_err: logger.warning(f"S{num_of_scene}: Error closing active_scene_clip in error handler: {e_close_active_err}") # Continue to the next asset continue if not processed_moviepy_clips_list: logger.warning("No MoviePy clips were successfully processed. Aborting animatic assembly before concatenation."); return None transition_duration_val=0.75 try: logger.info(f"Concatenating {len(processed_moviepy_clips_list)} processed clips for final animatic."); if len(processed_moviepy_clips_list)>1: final_video_output_clip=concatenate_videoclips(processed_moviepy_clips_list, padding=-transition_duration_val if transition_duration_val > 0 else 0, method="compose") # "compose" is often more robust for mixed content elif processed_moviepy_clips_list: final_video_output_clip=processed_moviepy_clips_list[0] # Single clip, no concatenation needed if not final_video_output_clip: logger.error("Concatenation resulted in a None clip. Aborting."); return None logger.info(f"Concatenated animatic base duration:{final_video_output_clip.duration:.2f}s") # Apply fade effects if duration allows if transition_duration_val > 0 and final_video_output_clip.duration > 0: if final_video_output_clip.duration > transition_duration_val * 2: final_video_output_clip=final_video_output_clip.fx(vfx.fadein,transition_duration_val).fx(vfx.fadeout,transition_duration_val) else: # Shorter clip, just fade in final_video_output_clip=final_video_output_clip.fx(vfx.fadein,min(transition_duration_val,final_video_output_clip.duration/2.0)) logger.debug("Applied fade in/out effects to final composite clip.") # Add overall narration audio if overall_narration_path and os.path.exists(overall_narration_path) and final_video_output_clip.duration > 0: try: narration_audio_clip_mvpy=AudioFileClip(overall_narration_path) logger.info(f"Adding overall narration. Video duration: {final_video_output_clip.duration:.2f}s, Narration duration: {narration_audio_clip_mvpy.duration:.2f}s") # MoviePy will cut the audio if it's longer than the video, or pad with silence if shorter (when using set_audio) final_video_output_clip=final_video_output_clip.set_audio(narration_audio_clip_mvpy) logger.info("Overall narration successfully added to animatic.") except Exception as e_narr_add_final:logger.error(f"Error adding overall narration to animatic:{e_narr_add_final}",exc_info=True) elif final_video_output_clip.duration <= 0: logger.warning("Animatic has zero or negative duration before adding audio. Audio will not be added.") # Write the final video file if final_video_output_clip and final_video_output_clip.duration > 0: final_output_path_str=os.path.join(self.output_dir,output_filename) logger.info(f"Writing final animatic video to: {final_output_path_str} (Target Duration: {final_video_output_clip.duration:.2f}s)") # Ensure threads is at least 1, common os.cpu_count() can be None in some restricted envs num_threads = os.cpu_count() if not isinstance(num_threads, int) or num_threads < 1: num_threads = 2 # Fallback to 2 threads logger.warning(f"os.cpu_count() returned invalid, defaulting to {num_threads} threads for ffmpeg.") final_video_output_clip.write_videofile( final_output_path_str, fps=fps, codec='libx264', # Standard H.264 codec preset='medium', # Good balance of speed and quality. 'ultrafast' for speed, 'slower' for quality. audio_codec='aac', # Standard audio codec temp_audiofile=os.path.join(self.output_dir,f'temp-audio-{os.urandom(4).hex()}.m4a'), # Temporary audio file remove_temp=True, # Clean up temp audio threads=num_threads, logger='bar', # Show progress bar bitrate="5000k", # Decent quality bitrate for 720p ffmpeg_params=["-pix_fmt", "yuv420p"] # Crucial for compatibility and color accuracy ) logger.info(f"Animatic video created successfully: {final_output_path_str}") return final_output_path_str else: logger.error("Final animatic clip is invalid or has zero duration. Cannot write video file."); return None except Exception as e_vid_write_final_op: # Renamed logger.error(f"Error during final animatic video file writing or composition stage: {e_vid_write_final_op}", exc_info=True) return None finally: logger.debug("Closing all MoviePy clips in `assemble_animatic_from_assets` main finally block.") # Consolidate list of all clips that might need closing all_clips_for_closure = processed_moviepy_clips_list[:] # Start with a copy if narration_audio_clip_mvpy: all_clips_for_closure.append(narration_audio_clip_mvpy) if final_video_output_clip: all_clips_for_closure.append(final_video_output_clip) for clip_to_close_item_final in all_clips_for_closure: # Renamed if clip_to_close_item_final and hasattr(clip_to_close_item_final, 'close'): try: clip_to_close_item_final.close() except Exception as e_final_clip_close_op: logger.warning(f"Ignoring error while closing a MoviePy clip ({type(clip_to_close_item_final).__name__}): {e_final_clip_close_op}")