Spaces:
Sleeping
Sleeping
import moviepy.editor as mp | |
import numpy as np | |
import os | |
from config.settings import PATHS, PROCESSING_CONFIG | |
class VideoProcessor: | |
def __init__(self): | |
pass | |
def merge_videos(self, video1_path, video2_path=None, target_duration=20): | |
try: | |
clips = [] | |
total_duration = 0 | |
clip1 = mp.VideoFileClip(video1_path) | |
clips.append(clip1) | |
total_duration += clip1.duration | |
if video2_path and os.path.exists(video2_path): | |
clip2 = mp.VideoFileClip(video2_path) | |
clips.append(clip2.crossfadein(0.5)) | |
total_duration += clip2.duration | |
if len(clips) > 1: | |
merged = mp.concatenate_videoclips(clips, method="compose") | |
else: | |
merged = clips[0] | |
if total_duration < target_duration: | |
extend_time = target_duration - total_duration | |
if extend_time > 0: | |
loop_times = int(extend_time / merged.duration) + 1 | |
extended = mp.concatenate_videoclips([merged] * (loop_times + 1)) | |
merged = extended.subclip(0, target_duration) | |
elif total_duration > target_duration: | |
merged = merged.subclip(0, target_duration) | |
merged = self._apply_effects(merged) | |
output_path = os.path.join(PATHS['temp_dir'], f"merged_{hash(video1_path)}.mp4") | |
merged.write_videofile( | |
output_path, | |
fps=PROCESSING_CONFIG['fps'], | |
codec='libx264', | |
audio_codec='aac', | |
temp_audiofile=os.path.join(PATHS['temp_dir'], 'temp_audio.m4a'), | |
remove_temp=True, | |
logger=None, | |
verbose=False | |
) | |
for clip in clips: | |
clip.close() | |
merged.close() | |
return output_path | |
except Exception as e: | |
print(f"Video merge failed: {e}") | |
return None | |
def add_ai_extension(self, video_path, ai_frames, target_duration): | |
if not ai_frames or not video_path: | |
return video_path | |
try: | |
main_clip = mp.VideoFileClip(video_path) | |
frame_array = np.array(ai_frames) | |
ai_clip = mp.ImageSequenceClip([frame for frame in frame_array], fps=PROCESSING_CONFIG['fps']) | |
ai_clip = ai_clip.resize(main_clip.size) | |
combined = mp.concatenate_videoclips([main_clip, ai_clip.crossfadein(0.3)]) | |
if combined.duration > target_duration: | |
combined = combined.subclip(0, target_duration) | |
output_path = os.path.join(PATHS['temp_dir'], f"extended_{hash(video_path)}.mp4") | |
combined.write_videofile( | |
output_path, | |
fps=PROCESSING_CONFIG['fps'], | |
codec='libx264', | |
temp_audiofile=os.path.join(PATHS['temp_dir'], 'temp_audio2.m4a'), | |
remove_temp=True, | |
logger=None, | |
verbose=False | |
) | |
main_clip.close() | |
ai_clip.close() | |
combined.close() | |
return output_path | |
except Exception as e: | |
print(f"AI extension failed: {e}") | |
return video_path | |
def _apply_effects(self, clip): | |
try: | |
def color_correct(get_frame, t): | |
frame = get_frame(t) | |
enhanced = np.clip(frame * 1.05 + 2, 0, 255) | |
enhanced[:, :, 0] = np.clip(enhanced[:, :, 0] * 1.02, 0, 255) | |
return enhanced.astype(np.uint8) | |
return clip.fl(color_correct) | |
except: | |
return clip | |