# Imports assumed by this module (MoviePy 1.x API, Pillow, NumPy); adjust to
# match the surrounding project, which may already define these and `logger`.
import logging
import os
import random

import numpy as np
from PIL import Image
from moviepy.editor import (AudioFileClip, CompositeVideoClip, ImageClip,
                            TextClip, VideoFileClip, concatenate_videoclips,
                            vfx)

logger = logging.getLogger(__name__)


class VisualEngine:

    def assemble_animatic_from_assets(self, asset_data_list, overall_narration_path=None,
                                      output_filename="final_video.mp4", fps=24):
        if not asset_data_list:
            logger.warning("No assets for animatic.")
            return None

        processed_moviepy_clips_list = []
        narration_audio_clip_mvpy = None
        final_video_output_clip = None
        logger.info(f"Assembling from {len(asset_data_list)} assets. Target Frame: {self.video_frame_size}.")
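
        # Per-asset loop: each entry becomes a duration-locked MoviePy clip.
        # Images go through a staged PIL pipeline (load -> RGBA -> thumbnail ->
        # center on transparent canvas -> flatten to RGB -> NumPy -> ImageClip),
        # saving a debug PNG after each stage so a bad frame can be traced back
        # to the stage that produced it; videos are trimmed or looped to fit.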
        for i_asset, asset_info_item_loop in enumerate(asset_data_list):
            path_of_asset = asset_info_item_loop.get('path')
            type_of_asset = asset_info_item_loop.get('type')
            duration_for_scene = asset_info_item_loop.get('duration', 4.5)
            num_of_scene = asset_info_item_loop.get('scene_num', i_asset + 1)
            action_in_key = asset_info_item_loop.get('key_action', '')
            logger.info(f"S{num_of_scene}: Path='{path_of_asset}', Type='{type_of_asset}', Dur={duration_for_scene}s")

            if not (path_of_asset and os.path.exists(path_of_asset)):
                logger.warning(f"S{num_of_scene}: Asset not found at '{path_of_asset}'. Skipping.")
                continue
            if duration_for_scene <= 0:
                logger.warning(f"S{num_of_scene}: Invalid duration ({duration_for_scene}s). Skipping.")
                continue

            active_scene_clip = None
            try:
                if type_of_asset == 'image':
                    logger.info(f"S{num_of_scene}: Processing IMAGE asset: {path_of_asset}")
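                    # The stage numbers in the log messages and debug_N_* filenames
                    # below identify each intermediate image written to output_dir.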
                    pil_img_original = Image.open(path_of_asset)
                    logger.debug(f"S{num_of_scene} (0-Load): Original loaded. Mode:{pil_img_original.mode}, Size:{pil_img_original.size}")
                    pil_img_original.save(os.path.join(self.output_dir, f"debug_0_ORIGINAL_S{num_of_scene}.png"))

                    # convert('RGBA') always returns a new image, so no defensive copy() is needed.
                    img_rgba_intermediate = pil_img_original.convert('RGBA')
                    logger.debug(f"S{num_of_scene} (1-ToRGBA): Converted to RGBA. Mode:{img_rgba_intermediate.mode}, Size:{img_rgba_intermediate.size}")
                    img_rgba_intermediate.save(os.path.join(self.output_dir, f"debug_1_AS_RGBA_S{num_of_scene}.png"))

                    thumbnailed_img_rgba = img_rgba_intermediate.copy()
                    # Pillow >= 9.1 exposes resampling filters on Image.Resampling;
                    # probe Image itself so older Pillow falls back cleanly instead
                    # of raising AttributeError on the missing Resampling namespace.
                    if hasattr(Image, 'Resampling'):
                        resample_filter_pil = Image.Resampling.LANCZOS
                    else:
                        resample_filter_pil = Image.BILINEAR
                    thumbnailed_img_rgba.thumbnail(self.video_frame_size, resample_filter_pil)
                    logger.debug(f"S{num_of_scene} (2-Thumbnail): Thumbnailed RGBA. Mode:{thumbnailed_img_rgba.mode}, Size:{thumbnailed_img_rgba.size}")
                    thumbnailed_img_rgba.save(os.path.join(self.output_dir, f"debug_2_THUMBNAIL_RGBA_S{num_of_scene}.png"))
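
                    # Stage 3: center the thumbnail on a fully transparent canvas at
                    # the exact target frame size; floor division keeps the paste
                    # offsets non-negative because thumbnail() never upscales.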
                    canvas_for_compositing_rgba = Image.new('RGBA', self.video_frame_size, (0, 0, 0, 0))
                    pos_x_paste = (self.video_frame_size[0] - thumbnailed_img_rgba.width) // 2
                    pos_y_paste = (self.video_frame_size[1] - thumbnailed_img_rgba.height) // 2
                    canvas_for_compositing_rgba.paste(thumbnailed_img_rgba, (pos_x_paste, pos_y_paste), thumbnailed_img_rgba)
                    logger.debug(f"S{num_of_scene} (3-PasteOnRGBA): Image pasted onto transparent RGBA canvas. Mode:{canvas_for_compositing_rgba.mode}, Size:{canvas_for_compositing_rgba.size}")
                    canvas_for_compositing_rgba.save(os.path.join(self.output_dir, f"debug_3_COMPOSITED_RGBA_S{num_of_scene}.png"))
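
                    # Stage 4: flatten onto an opaque near-black RGB background,
                    # using the canvas's alpha channel as the paste mask, so the
                    # frame handed to MoviePy carries no transparency.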
                    final_rgb_image_for_pil = Image.new("RGB", self.video_frame_size, (5, 5, 15))
                    if canvas_for_compositing_rgba.mode == 'RGBA':
                        final_rgb_image_for_pil.paste(canvas_for_compositing_rgba, mask=canvas_for_compositing_rgba.split()[3])
                    else:
                        final_rgb_image_for_pil.paste(canvas_for_compositing_rgba)
                    logger.debug(f"S{num_of_scene} (4-ToRGB): Final RGB image created. Mode:{final_rgb_image_for_pil.mode}, Size:{final_rgb_image_for_pil.size}")

                    debug_path_img_pre_numpy = os.path.join(self.output_dir, f"debug_4_PRE_NUMPY_RGB_S{num_of_scene}.png")
                    final_rgb_image_for_pil.save(debug_path_img_pre_numpy)
                    logger.info(f"CRITICAL DEBUG: Saved PRE_NUMPY_RGB_S{num_of_scene} (image fed to NumPy) to {debug_path_img_pre_numpy}")
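
                    # Stage 5: MoviePy expects an HxWx3 uint8 array; making it
                    # C-contiguous avoids surprises when frames are piped to ffmpeg
                    # as raw bytes.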
                    numpy_frame_arr = np.array(final_rgb_image_for_pil, dtype=np.uint8)
                    if not numpy_frame_arr.flags['C_CONTIGUOUS']:
                        numpy_frame_arr = np.ascontiguousarray(numpy_frame_arr, dtype=np.uint8)
                        logger.debug(f"S{num_of_scene} (5-NumPy): Ensured NumPy array is C-contiguous.")
                    logger.debug(f"S{num_of_scene} (5-NumPy): Final NumPy array for MoviePy. Shape:{numpy_frame_arr.shape}, DType:{numpy_frame_arr.dtype}, Flags:{numpy_frame_arr.flags}")

                    if numpy_frame_arr.size == 0 or numpy_frame_arr.ndim != 3 or numpy_frame_arr.shape[2] != 3:
                        logger.error(f"S{num_of_scene}: Invalid NumPy array shape/size ({numpy_frame_arr.shape}) for ImageClip. Skipping this asset.")
                        continue

                    base_image_clip_mvpy = ImageClip(numpy_frame_arr, transparent=False, ismask=False).set_duration(duration_for_scene)
                    logger.debug(f"S{num_of_scene} (6-ImageClip): Base ImageClip created. Duration: {base_image_clip_mvpy.duration}")

                    debug_path_moviepy_frame = os.path.join(self.output_dir, f"debug_7_MOVIEPY_FRAME_S{num_of_scene}.png")
                    try:
                        frame_time_for_debug = min(0.1, base_image_clip_mvpy.duration / 2) if base_image_clip_mvpy.duration > 0 else 0.1
                        base_image_clip_mvpy.save_frame(debug_path_moviepy_frame, t=frame_time_for_debug)
                        logger.info(f"CRITICAL DEBUG: Saved frame FROM MOVIEPY ImageClip for S{num_of_scene} to {debug_path_moviepy_frame}")
                    except Exception as e_save_mvpy_frame:
                        logger.error(f"DEBUG: Error saving frame FROM MOVIEPY ImageClip for S{num_of_scene}: {e_save_mvpy_frame}", exc_info=True)
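
                    # Stage 8: subtle Ken Burns effect -- a slow zoom from 1.0x to a
                    # random 1.03-1.08x across the scene via a time-dependent resize.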
                    fx_image_clip_mvpy = base_image_clip_mvpy
                    try:
                        scale_end_kb_val = random.uniform(1.03, 1.08)
                        if duration_for_scene > 0:
                            fx_image_clip_mvpy = base_image_clip_mvpy.fx(
                                vfx.resize,
                                lambda t_val: 1 + (scale_end_kb_val - 1) * (t_val / duration_for_scene)
                            ).set_position('center')
                            logger.debug(f"S{num_of_scene} (8-KenBurns): Ken Burns effect applied.")
                        else:
                            logger.warning(f"S{num_of_scene}: Duration is zero, skipping Ken Burns.")
                    except Exception as e_kb_fx_loop:
                        logger.error(f"S{num_of_scene} Ken Burns effect error: {e_kb_fx_loop}", exc_info=False)

                    active_scene_clip = fx_image_clip_mvpy

                elif type_of_asset == 'video':
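                    # Video assets are loaded without audio at the target resolution,
                    # then trimmed, looped, or effectively held on the last frame so
                    # the scene lasts exactly duration_for_scene.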
                    source_video_clip_obj = None
                    try:
                        logger.debug(f"S{num_of_scene}: Loading VIDEO asset: {path_of_asset}")
                        source_video_clip_obj = VideoFileClip(
                            path_of_asset,
                            target_resolution=(self.video_frame_size[1], self.video_frame_size[0]) if self.video_frame_size else None,
                            audio=False
                        )
                        temp_video_clip_obj_loop = source_video_clip_obj
                        if source_video_clip_obj.duration != duration_for_scene:
                            if source_video_clip_obj.duration > duration_for_scene:
                                temp_video_clip_obj_loop = source_video_clip_obj.subclip(0, duration_for_scene)
                            elif duration_for_scene / source_video_clip_obj.duration > 1.5 and source_video_clip_obj.duration > 0.1:
                                temp_video_clip_obj_loop = source_video_clip_obj.loop(duration=duration_for_scene)
                            else:
                                logger.info(f"S{num_of_scene}: Video clip ({source_video_clip_obj.duration:.2f}s) shorter than target ({duration_for_scene:.2f}s).")
                        active_scene_clip = temp_video_clip_obj_loop.set_duration(duration_for_scene)
                        if active_scene_clip.size != list(self.video_frame_size):
                            active_scene_clip = active_scene_clip.resize(self.video_frame_size)
                        logger.debug(f"S{num_of_scene}: Video asset processed. Final duration for scene: {active_scene_clip.duration:.2f}s")
                    except Exception as e_vid_load_loop:
                        logger.error(f"S{num_of_scene} Video load error '{path_of_asset}': {e_vid_load_loop}", exc_info=True)
                        continue
                    finally:
                        # Close the source only if no scene clip was produced; on success
                        # the derived clip (subclip/loop/set_duration) still shares the
                        # source's file reader, so closing it here would break rendering.
                        if source_video_clip_obj and active_scene_clip is None and hasattr(source_video_clip_obj, 'close'):
                            try:
                                source_video_clip_obj.close()
                            except Exception as e_close_src_vid:
                                logger.warning(f"S{num_of_scene}: Error closing source VideoFileClip: {e_close_src_vid}")

                else:
                    logger.warning(f"S{num_of_scene} Unknown asset type '{type_of_asset}'. Skipping.")
                    continue
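
                # Optional caption: overlay the scene number and key action as a
                # semi-transparent text banner near the bottom of the frame for
                # most of the scene's duration.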
                if active_scene_clip and action_in_key:
                    try:
                        if active_scene_clip.duration > 0.5:
                            dur_text_overlay_val = min(active_scene_clip.duration - 0.5, active_scene_clip.duration * 0.8)
                        else:
                            dur_text_overlay_val = active_scene_clip.duration
                        start_text_overlay_val = 0.25

                        if dur_text_overlay_val > 0:
                            text_clip_for_overlay_obj = TextClip(
                                f"Scene {num_of_scene}\n{action_in_key}",
                                fontsize=self.VIDEO_OVERLAY_FONT_SIZE,
                                color=self.VIDEO_OVERLAY_FONT_COLOR,
                                font=self.active_moviepy_font_name,
                                bg_color='rgba(10,10,20,0.7)',
                                method='caption',
                                align='West',
                                size=(self.video_frame_size[0] * 0.9, None),
                                kerning=-1,
                                stroke_color='black',
                                stroke_width=1.5
                            ).set_duration(dur_text_overlay_val).set_start(start_text_overlay_val).set_position(('center', 0.92), relative=True)
                            active_scene_clip = CompositeVideoClip([active_scene_clip, text_clip_for_overlay_obj],
                                                                   size=self.video_frame_size, use_bgclip=True)
                            logger.debug(f"S{num_of_scene}: Text overlay composited.")
                        else:
                            logger.warning(f"S{num_of_scene}: Text overlay duration is zero or negative ({dur_text_overlay_val}). Skipping text overlay.")
                    except Exception as e_txt_comp_loop:
                        logger.error(f"S{num_of_scene} TextClip compositing error: {e_txt_comp_loop}. Proceeding without text for this scene.", exc_info=True)

                if active_scene_clip:
                    processed_moviepy_clips_list.append(active_scene_clip)
                    logger.info(f"S{num_of_scene}: Asset successfully processed. Clip duration: {active_scene_clip.duration:.2f}s. Added to final list for concatenation.")

            except Exception as e_asset_loop_main_exc:
                logger.error(f"MAJOR UNHANDLED ERROR processing asset for S{num_of_scene} (Path: {path_of_asset}): {e_asset_loop_main_exc}", exc_info=True)
                if active_scene_clip and hasattr(active_scene_clip, 'close'):
                    try:
                        active_scene_clip.close()
                    except Exception as e_close_active_err:
                        logger.warning(f"S{num_of_scene}: Error closing active_scene_clip in error handler: {e_close_active_err}")
                continue

        if not processed_moviepy_clips_list:
            logger.warning("No MoviePy clips were successfully processed. Aborting animatic assembly before concatenation.")
            return None
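
        # method="compose" with negative padding makes consecutive clips overlap
        # by transition_duration_val; a true crossfade would additionally need
        # crossfadein on each clip, so this yields a simple overlap transition.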
        transition_duration_val = 0.75
        try:
            logger.info(f"Concatenating {len(processed_moviepy_clips_list)} processed clips for final animatic.")
            if len(processed_moviepy_clips_list) > 1:
                final_video_output_clip = concatenate_videoclips(
                    processed_moviepy_clips_list,
                    padding=-transition_duration_val if transition_duration_val > 0 else 0,
                    method="compose"
                )
            elif processed_moviepy_clips_list:
                final_video_output_clip = processed_moviepy_clips_list[0]

            if not final_video_output_clip:
                logger.error("Concatenation resulted in a None clip. Aborting.")
                return None
            logger.info(f"Concatenated animatic base duration: {final_video_output_clip.duration:.2f}s")

            if transition_duration_val > 0 and final_video_output_clip.duration > 0:
                if final_video_output_clip.duration > transition_duration_val * 2:
                    final_video_output_clip = final_video_output_clip.fx(vfx.fadein, transition_duration_val).fx(vfx.fadeout, transition_duration_val)
                else:
                    # Too short for both fades: apply only a fade-in, capped at half the duration.
                    final_video_output_clip = final_video_output_clip.fx(vfx.fadein, min(transition_duration_val, final_video_output_clip.duration / 2.0))
                logger.debug("Applied fade in/out effects to final composite clip.")

            if overall_narration_path and os.path.exists(overall_narration_path) and final_video_output_clip.duration > 0:
                try:
                    narration_audio_clip_mvpy = AudioFileClip(overall_narration_path)
                    logger.info(f"Adding overall narration. Video duration: {final_video_output_clip.duration:.2f}s, Narration duration: {narration_audio_clip_mvpy.duration:.2f}s")
                    final_video_output_clip = final_video_output_clip.set_audio(narration_audio_clip_mvpy)
                    logger.info("Overall narration successfully added to animatic.")
                except Exception as e_narr_add_final:
                    logger.error(f"Error adding overall narration to animatic: {e_narr_add_final}", exc_info=True)
            elif final_video_output_clip.duration <= 0:
                logger.warning("Animatic has zero or negative duration before adding audio. Audio will not be added.")

            if final_video_output_clip and final_video_output_clip.duration > 0:
                final_output_path_str = os.path.join(self.output_dir, output_filename)
                logger.info(f"Writing final animatic video to: {final_output_path_str} (Target duration: {final_video_output_clip.duration:.2f}s)")

                num_threads = os.cpu_count()
                if not isinstance(num_threads, int) or num_threads < 1:
                    num_threads = 2
                    logger.warning(f"os.cpu_count() returned an unusable value; defaulting to {num_threads} threads for ffmpeg.")
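
                # Encoding choices: libx264 with yuv420p for broad player
                # compatibility, a randomized temp audio filename to avoid
                # collisions between concurrent runs, and an explicit bitrate.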
                final_video_output_clip.write_videofile(
                    final_output_path_str,
                    fps=fps,
                    codec='libx264',
                    preset='medium',
                    audio_codec='aac',
                    temp_audiofile=os.path.join(self.output_dir, f'temp-audio-{os.urandom(4).hex()}.m4a'),
                    remove_temp=True,
                    threads=num_threads,
                    logger='bar',
                    bitrate="5000k",
                    ffmpeg_params=["-pix_fmt", "yuv420p"]
                )
                logger.info(f"Animatic video created successfully: {final_output_path_str}")
                return final_output_path_str

            else:
                logger.error("Final animatic clip is invalid or has zero duration. Cannot write video file.")
                return None
        except Exception as e_vid_write_final_op:
            logger.error(f"Error during final animatic video file writing or composition stage: {e_vid_write_final_op}", exc_info=True)
            return None

        finally:
            logger.debug("Closing all MoviePy clips in `assemble_animatic_from_assets` main finally block.")
            all_clips_for_closure = processed_moviepy_clips_list[:]
            if narration_audio_clip_mvpy:
                all_clips_for_closure.append(narration_audio_clip_mvpy)
            if final_video_output_clip:
                all_clips_for_closure.append(final_video_output_clip)

            for clip_to_close_item_final in all_clips_for_closure:
                if clip_to_close_item_final and hasattr(clip_to_close_item_final, 'close'):
                    try:
                        clip_to_close_item_final.close()
                    except Exception as e_final_clip_close_op:
                        logger.warning(f"Ignoring error while closing a MoviePy clip ({type(clip_to_close_item_final).__name__}): {e_final_clip_close_op}")