# core/visual_engine.py
# ... (all imports and module-level setup as in the previous full version) ...
# ... (methods __init__ through generate_narration_audio unchanged from the previous full version) ...
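
# A minimal sketch of the imports assemble_animatic_from_assets relies on,
# assuming the MoviePy 1.x / Pillow / NumPy stack used throughout this file;
# the authoritative import block is the one elided above.
import logging
import os
import random

import numpy as np
from PIL import Image
from moviepy.editor import (AudioFileClip, CompositeVideoClip, ImageClip,
                            TextClip, VideoFileClip, concatenate_videoclips)
import moviepy.video.fx.all as vfx

logger = logging.getLogger(__name__)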
class VisualEngine:
    # ... (previous __init__, set_api_key, _image_to_data_uri, _map_resolution_to_runway_ratio,
    #      _get_text_dimensions, _create_placeholder_image_content, _search_pexels_image,
    #      _generate_video_clip_with_runwayml, _create_placeholder_video_content,
    #      generate_scene_asset, and generate_narration_audio methods - unchanged from the last full version) ...
# =========================================================================
# ASSEMBLE ANIMATIC - EXTREME DEBUGGING FOR IMAGE ASSETS
# =========================================================================
def assemble_animatic_from_assets(self, asset_data_list, overall_narration_path=None, output_filename="final_video.mp4", fps=24):
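        """Stitch per-scene assets into a single animatic video file.

        Each entry in asset_data_list is a dict with 'path', 'type' ('image'
        or 'video'), 'duration' in seconds, and optionally 'scene_num' and
        'key_action' (used for the text overlay). Returns the path of the
        written MP4, or None if assembly fails.
        """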
        if not asset_data_list:
            logger.warning("No assets provided for animatic assembly.")
            return None
        processed_moviepy_clips_list = []
        narration_audio_clip_mvpy = None
        final_video_output_clip = None
        logger.info(f"Assembling animatic from {len(asset_data_list)} assets. Target frame size: {self.video_frame_size}.")
        for i_asset, asset_info_item_loop in enumerate(asset_data_list):
            path_of_asset = asset_info_item_loop.get('path')
            type_of_asset = asset_info_item_loop.get('type')
            duration_for_scene = asset_info_item_loop.get('duration', 4.5)
            num_of_scene = asset_info_item_loop.get('scene_num', i_asset + 1)
            action_in_key = asset_info_item_loop.get('key_action', '')
            logger.info(f"S{num_of_scene}: Path='{path_of_asset}', Type='{type_of_asset}', Duration={duration_for_scene}s")
            if not (path_of_asset and os.path.exists(path_of_asset)):
                logger.warning(f"S{num_of_scene}: Asset not found at '{path_of_asset}'. Skipping.")
                continue
            if duration_for_scene <= 0:
                logger.warning(f"S{num_of_scene}: Invalid duration ({duration_for_scene}s). Skipping.")
                continue
active_scene_clip = None
try:
if type_of_asset == 'image':
logger.info(f"S{num_of_scene}: Processing IMAGE asset: {path_of_asset}")
# 0. Load original image
pil_img_original = Image.open(path_of_asset)
logger.debug(f"S{num_of_scene} (0-Load): Original loaded. Mode:{pil_img_original.mode}, Size:{pil_img_original.size}")
pil_img_original.save(os.path.join(self.output_dir,f"debug_0_ORIGINAL_S{num_of_scene}.png"))
                    # 1. Convert to RGBA for consistent alpha handling. Pillow's convert()
                    #    always returns a new image, so no explicit copy is needed.
                    img_rgba_intermediate = pil_img_original.convert('RGBA')
logger.debug(f"S{num_of_scene} (1-ToRGBA): Converted to RGBA. Mode:{img_rgba_intermediate.mode}, Size:{img_rgba_intermediate.size}")
img_rgba_intermediate.save(os.path.join(self.output_dir,f"debug_1_AS_RGBA_S{num_of_scene}.png"))
# 2. Thumbnail the RGBA image
thumbnailed_img_rgba = img_rgba_intermediate.copy() # Work on a copy for thumbnailing
resample_filter_pil = Image.Resampling.LANCZOS if hasattr(Image.Resampling,'LANCZOS') else Image.BILINEAR
thumbnailed_img_rgba.thumbnail(self.video_frame_size, resample_filter_pil)
logger.debug(f"S{num_of_scene} (2-Thumbnail): Thumbnailed RGBA. Mode:{thumbnailed_img_rgba.mode}, Size:{thumbnailed_img_rgba.size}")
thumbnailed_img_rgba.save(os.path.join(self.output_dir,f"debug_2_THUMBNAIL_RGBA_S{num_of_scene}.png"))
# 3. Create a target-sized RGBA canvas (fully transparent for true alpha blending)
canvas_for_compositing_rgba = Image.new('RGBA', self.video_frame_size, (0,0,0,0))
pos_x_paste = (self.video_frame_size[0] - thumbnailed_img_rgba.width) // 2
pos_y_paste = (self.video_frame_size[1] - thumbnailed_img_rgba.height) // 2
# Paste the (potentially smaller) thumbnailed RGBA image onto the transparent RGBA canvas, using its own alpha
canvas_for_compositing_rgba.paste(thumbnailed_img_rgba, (pos_x_paste, pos_y_paste), thumbnailed_img_rgba)
logger.debug(f"S{num_of_scene} (3-PasteOnRGBA): Image pasted onto transparent RGBA canvas. Mode:{canvas_for_compositing_rgba.mode}, Size:{canvas_for_compositing_rgba.size}")
canvas_for_compositing_rgba.save(os.path.join(self.output_dir,f"debug_3_COMPOSITED_RGBA_S{num_of_scene}.png"))
# 4. Create a final RGB image by pasting the composited RGBA canvas onto an opaque background
# This flattens all transparency and ensures a 3-channel RGB image for MoviePy.
final_rgb_image_for_pil = Image.new("RGB", self.video_frame_size, (5, 5, 15)) # Dark opaque background (e.g., dark blue)
# Paste canvas_for_compositing_rgba using its alpha channel as the mask
                    if canvas_for_compositing_rgba.mode == 'RGBA':
                        final_rgb_image_for_pil.paste(canvas_for_compositing_rgba, mask=canvas_for_compositing_rgba.split()[3])
                    else:
                        # Should not happen if steps 1 and 3 ran correctly; paste without a mask as a fallback.
                        final_rgb_image_for_pil.paste(canvas_for_compositing_rgba)
logger.debug(f"S{num_of_scene} (4-ToRGB): Final RGB image created. Mode:{final_rgb_image_for_pil.mode}, Size:{final_rgb_image_for_pil.size}")
                    # This debug image is the critical one: it is exactly the bitmap handed to NumPy.
                    debug_path_img_pre_numpy = os.path.join(self.output_dir, f"debug_4_PRE_NUMPY_RGB_S{num_of_scene}.png")
                    final_rgb_image_for_pil.save(debug_path_img_pre_numpy)
                    logger.info(f"CRITICAL DEBUG: Saved PRE_NUMPY_RGB_S{num_of_scene} (image fed to NumPy) to {debug_path_img_pre_numpy}")
# 5. Convert to C-contiguous NumPy array, dtype uint8
numpy_frame_arr = np.array(final_rgb_image_for_pil, dtype=np.uint8)
if not numpy_frame_arr.flags['C_CONTIGUOUS']:
numpy_frame_arr = np.ascontiguousarray(numpy_frame_arr, dtype=np.uint8) # Ensure C-order
logger.debug(f"S{num_of_scene} (5-NumPy): Ensured NumPy array is C-contiguous.")
logger.debug(f"S{num_of_scene} (5-NumPy): Final NumPy array for MoviePy. Shape:{numpy_frame_arr.shape}, DType:{numpy_frame_arr.dtype}, Flags:{numpy_frame_arr.flags}")
                    if numpy_frame_arr.size == 0 or numpy_frame_arr.ndim != 3 or numpy_frame_arr.shape[2] != 3:
                        logger.error(f"S{num_of_scene}: Invalid NumPy array shape/size ({numpy_frame_arr.shape}) for ImageClip. Skipping this asset.")
                        continue
# 6. Create MoviePy ImageClip
base_image_clip_mvpy = ImageClip(numpy_frame_arr, transparent=False, ismask=False).set_duration(duration_for_scene)
logger.debug(f"S{num_of_scene} (6-ImageClip): Base ImageClip created. Duration: {base_image_clip_mvpy.duration}")
# 7. DEBUG: Save a frame directly FROM the MoviePy ImageClip object
debug_path_moviepy_frame = os.path.join(self.output_dir,f"debug_7_MOVIEPY_FRAME_S{num_of_scene}.png")
try:
base_image_clip_mvpy.save_frame(debug_path_moviepy_frame, t=min(0.1, base_image_clip_mvpy.duration / 2 if base_image_clip_mvpy.duration > 0 else 0.1)) # Save frame at 0.1s or mid-point
logger.info(f"CRITICAL DEBUG: Saved frame FROM MOVIEPY ImageClip for S{num_of_scene} to {debug_path_moviepy_frame}")
except Exception as e_save_mvpy_frame:
logger.error(f"DEBUG: Error saving frame FROM MOVIEPY ImageClip for S{num_of_scene}: {e_save_mvpy_frame}", exc_info=True)
# 8. Apply Ken Burns effect (optional, can be commented out for further isolation)
fx_image_clip_mvpy = base_image_clip_mvpy
try:
scale_end_kb_val = random.uniform(1.03, 1.08)
if duration_for_scene > 0: # Avoid division by zero
fx_image_clip_mvpy = base_image_clip_mvpy.fx(vfx.resize, lambda t_val: 1 + (scale_end_kb_val - 1) * (t_val / duration_for_scene)).set_position('center')
logger.debug(f"S{num_of_scene} (8-KenBurns): Ken Burns effect applied.")
else:
logger.warning(f"S{num_of_scene}: Duration is zero, skipping Ken Burns.")
                    except Exception as e_kb_fx_loop:
                        # Ken Burns failure is non-fatal; log briefly (exc_info=False) and keep the base clip.
                        logger.error(f"S{num_of_scene} Ken Burns effect error: {e_kb_fx_loop}", exc_info=False)
active_scene_clip = fx_image_clip_mvpy
                elif type_of_asset == 'video':
                    # Load the source video, trim or loop it to the scene duration, and
                    # resize to the target frame. Close the source clip diligently.
                    source_video_clip_obj = None
                    try:
                        logger.debug(f"S{num_of_scene}: Loading VIDEO asset: {path_of_asset}")
                        source_video_clip_obj = VideoFileClip(
                            path_of_asset,
                            target_resolution=(self.video_frame_size[1], self.video_frame_size[0]) if self.video_frame_size else None,
                            audio=False)
                        temp_video_clip_obj_loop = source_video_clip_obj
                        if source_video_clip_obj.duration != duration_for_scene:
                            if source_video_clip_obj.duration > duration_for_scene:
                                # Source is longer than needed: trim it.
                                temp_video_clip_obj_loop = source_video_clip_obj.subclip(0, duration_for_scene)
                            elif duration_for_scene / source_video_clip_obj.duration > 1.5 and source_video_clip_obj.duration > 0.1:
                                # Source is much shorter: loop it to fill the scene.
                                temp_video_clip_obj_loop = source_video_clip_obj.loop(duration=duration_for_scene)
                            else:
                                logger.info(f"S{num_of_scene}: Video clip ({source_video_clip_obj.duration:.2f}s) is shorter than target ({duration_for_scene:.2f}s).")
                        # Ensure the final clip reports the target scene duration.
                        active_scene_clip = temp_video_clip_obj_loop.set_duration(duration_for_scene)
                        if active_scene_clip.size != list(self.video_frame_size):
                            active_scene_clip = active_scene_clip.resize(self.video_frame_size)
                        logger.debug(f"S{num_of_scene}: Video asset processed. Final duration for scene: {active_scene_clip.duration:.2f}s")
                    except Exception as e_vid_load_loop:
                        logger.error(f"S{num_of_scene}: Video load error '{path_of_asset}': {e_vid_load_loop}", exc_info=True)
                        continue  # Skip this broken video asset.
                    finally:
                        # Close the original source clip if it is not the clip we kept.
                        if source_video_clip_obj and source_video_clip_obj is not active_scene_clip and hasattr(source_video_clip_obj, 'close'):
                            try:
                                source_video_clip_obj.close()
                            except Exception as e_close_src_vid:
                                logger.warning(f"S{num_of_scene}: Error closing source VideoFileClip: {e_close_src_vid}")
                else:
                    logger.warning(f"S{num_of_scene}: Unknown asset type '{type_of_asset}'. Skipping.")
                    continue
                # Add text overlay (common to both image and video assets).
                if active_scene_clip and action_in_key:
                    try:
                        dur_text_overlay_val = min(active_scene_clip.duration - 0.5, active_scene_clip.duration * 0.8) if active_scene_clip.duration > 0.5 else active_scene_clip.duration
                        start_text_overlay_val = 0.25  # Start the text a little way into the clip.
                        if dur_text_overlay_val > 0:
                            text_clip_for_overlay_obj = TextClip(
                                f"Scene {num_of_scene}\n{action_in_key}",
                                fontsize=self.VIDEO_OVERLAY_FONT_SIZE,
                                color=self.VIDEO_OVERLAY_FONT_COLOR,
                                font=self.active_moviepy_font_name,
                                bg_color='rgba(10,10,20,0.7)',
                                method='caption', align='West',
                                size=(self.video_frame_size[0] * 0.9, None),
                                kerning=-1, stroke_color='black', stroke_width=1.5
                            ).set_duration(dur_text_overlay_val).set_start(start_text_overlay_val).set_position(('center', 0.92), relative=True)
                            # use_bgclip=True keeps the scene clip as the opaque base layer.
                            active_scene_clip = CompositeVideoClip([active_scene_clip, text_clip_for_overlay_obj], size=self.video_frame_size, use_bgclip=True)
                            logger.debug(f"S{num_of_scene}: Text overlay composited.")
                        else:
                            logger.warning(f"S{num_of_scene}: Text overlay duration is zero or negative ({dur_text_overlay_val}). Skipping text overlay.")
                    except Exception as e_txt_comp_loop:
                        # Log the full error but continue without text for this scene.
                        logger.error(f"S{num_of_scene}: TextClip compositing error: {e_txt_comp_loop}. Proceeding without text for this scene.", exc_info=True)
if active_scene_clip:
processed_moviepy_clips_list.append(active_scene_clip)
logger.info(f"S{num_of_scene}: Asset successfully processed. Clip duration: {active_scene_clip.duration:.2f}s. Added to final list for concatenation.")
            except Exception as e_asset_loop_main_exc:
                # Catch any unhandled error while processing a single asset.
                logger.error(f"MAJOR UNHANDLED ERROR processing asset for S{num_of_scene} (Path: {path_of_asset}): {e_asset_loop_main_exc}", exc_info=True)
                # Ensure any partially created clip for this iteration is closed.
                if active_scene_clip and hasattr(active_scene_clip, 'close'):
                    try:
                        active_scene_clip.close()
                    except Exception as e_close_active_err:
                        logger.warning(f"S{num_of_scene}: Error closing active_scene_clip in error handler: {e_close_active_err}")
                continue  # Move on to the next asset.
        if not processed_moviepy_clips_list:
            logger.warning("No MoviePy clips were successfully processed. Aborting animatic assembly before concatenation.")
            return None
        transition_duration_val = 0.75
try:
logger.info(f"Concatenating {len(processed_moviepy_clips_list)} processed clips for final animatic.");
if len(processed_moviepy_clips_list)>1:
final_video_output_clip=concatenate_videoclips(processed_moviepy_clips_list,
padding=-transition_duration_val if transition_duration_val > 0 else 0,
method="compose") # "compose" is often more robust for mixed content
elif processed_moviepy_clips_list:
final_video_output_clip=processed_moviepy_clips_list[0] # Single clip, no concatenation needed
            if not final_video_output_clip:
                logger.error("Concatenation resulted in a None clip. Aborting.")
                return None
logger.info(f"Concatenated animatic base duration:{final_video_output_clip.duration:.2f}s")
# Apply fade effects if duration allows
if transition_duration_val > 0 and final_video_output_clip.duration > 0:
if final_video_output_clip.duration > transition_duration_val * 2:
final_video_output_clip=final_video_output_clip.fx(vfx.fadein,transition_duration_val).fx(vfx.fadeout,transition_duration_val)
else: # Shorter clip, just fade in
final_video_output_clip=final_video_output_clip.fx(vfx.fadein,min(transition_duration_val,final_video_output_clip.duration/2.0))
logger.debug("Applied fade in/out effects to final composite clip.")
            # Add overall narration audio.
            if overall_narration_path and os.path.exists(overall_narration_path) and final_video_output_clip.duration > 0:
                try:
                    narration_audio_clip_mvpy = AudioFileClip(overall_narration_path)
                    logger.info(f"Adding overall narration. Video duration: {final_video_output_clip.duration:.2f}s, Narration duration: {narration_audio_clip_mvpy.duration:.2f}s")
                    # set_audio attaches the narration track as-is; no trimming or
                    # padding happens here, only at write time.
                    final_video_output_clip = final_video_output_clip.set_audio(narration_audio_clip_mvpy)
                    logger.info("Overall narration successfully added to animatic.")
                except Exception as e_narr_add_final:
                    logger.error(f"Error adding overall narration to animatic: {e_narr_add_final}", exc_info=True)
            elif final_video_output_clip.duration <= 0:
                logger.warning("Animatic has zero or negative duration before adding audio. Audio will not be added.")
# Write the final video file
if final_video_output_clip and final_video_output_clip.duration > 0:
                final_output_path_str = os.path.join(self.output_dir, output_filename)
                logger.info(f"Writing final animatic video to: {final_output_path_str} (Target Duration: {final_video_output_clip.duration:.2f}s)")
                # os.cpu_count() can return None in some restricted environments;
                # make sure ffmpeg always gets a sane thread count.
                num_threads = os.cpu_count()
                if not isinstance(num_threads, int) or num_threads < 1:
                    num_threads = 2  # Fall back to 2 threads.
                    logger.warning(f"os.cpu_count() returned an invalid value; defaulting to {num_threads} threads for ffmpeg.")
final_video_output_clip.write_videofile(
final_output_path_str,
fps=fps,
codec='libx264', # Standard H.264 codec
preset='medium', # Good balance of speed and quality. 'ultrafast' for speed, 'slower' for quality.
audio_codec='aac', # Standard audio codec
temp_audiofile=os.path.join(self.output_dir,f'temp-audio-{os.urandom(4).hex()}.m4a'), # Temporary audio file
remove_temp=True, # Clean up temp audio
threads=num_threads,
logger='bar', # Show progress bar
bitrate="5000k", # Decent quality bitrate for 720p
ffmpeg_params=["-pix_fmt", "yuv420p"] # Crucial for compatibility and color accuracy
)
logger.info(f"Animatic video created successfully: {final_output_path_str}")
return final_output_path_str
            else:
                logger.error("Final animatic clip is invalid or has zero duration. Cannot write video file.")
                return None
        except Exception as e_vid_write_final_op:
            logger.error(f"Error during final animatic video file writing or composition stage: {e_vid_write_final_op}", exc_info=True)
            return None
finally:
logger.debug("Closing all MoviePy clips in `assemble_animatic_from_assets` main finally block.")
            # Consolidate every clip that might need closing.
            all_clips_for_closure = processed_moviepy_clips_list[:]
            if narration_audio_clip_mvpy:
                all_clips_for_closure.append(narration_audio_clip_mvpy)
            if final_video_output_clip:
                all_clips_for_closure.append(final_video_output_clip)
            for clip_to_close_item_final in all_clips_for_closure:
                if clip_to_close_item_final and hasattr(clip_to_close_item_final, 'close'):
                    try:
                        clip_to_close_item_final.close()
                    except Exception as e_final_clip_close_op:
                        logger.warning(f"Ignoring error while closing a MoviePy clip ({type(clip_to_close_item_final).__name__}): {e_final_clip_close_op}")
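

# A hypothetical usage sketch (not part of the original module), assuming a
# VisualEngine configured elsewhere with `output_dir`, `video_frame_size`,
# fonts, and API keys via the elided __init__/set_api_key methods; the asset
# file names and narration path below are placeholders.
if __name__ == "__main__":
    engine = VisualEngine()  # Placeholder: real constructor args live in the elided __init__.
    assets = [
        {"path": "scene_1.png", "type": "image", "duration": 4.5,
         "scene_num": 1, "key_action": "Hero enters the city"},
        {"path": "scene_2.mp4", "type": "video", "duration": 6.0,
         "scene_num": 2, "key_action": "Chase across the rooftops"},
    ]
    final_path = engine.assemble_animatic_from_assets(
        assets,
        overall_narration_path="narration.mp3",
        output_filename="animatic.mp4",
        fps=24,
    )
    print(f"Animatic written to: {final_path}")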