| 
							 | 
						import os | 
					
					
						
						| 
							 | 
						import hashlib | 
					
					
						
						| 
							 | 
						import shutil | 
					
					
						
						| 
							 | 
						from pathlib import Path | 
					
					
						
						| 
							 | 
						import asyncio | 
					
					
						
						| 
							 | 
						import tempfile | 
					
					
						
						| 
							 | 
						import logging | 
					
					
						
						| 
							 | 
						from functools import partial | 
					
					
						
						| 
							 | 
						from typing import Dict, List, Optional, Tuple | 
					
					
						
						| 
							 | 
						import gradio as gr | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						from scenedetect import detect, ContentDetector, SceneManager, open_video | 
					
					
						
						| 
							 | 
						from scenedetect.video_splitter import split_video_ffmpeg | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						from ..config import TRAINING_PATH, STORAGE_PATH, TRAINING_VIDEOS_PATH, VIDEOS_TO_SPLIT_PATH, STAGING_PATH, DEFAULT_PROMPT_PREFIX | 
					
					
						
						| 
							 | 
						from ..utils import remove_black_bars, extract_scene_info, is_video_file, is_image_file, add_prefix_to_caption | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						logger = logging.getLogger(__name__) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						class SplittingService: | 
					
					
						
						| 
							 | 
						    def __init__(self): | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        self.processing = False | 
					
					
						
						| 
							 | 
						        self._current_file: Optional[str] = None | 
					
					
						
						| 
							 | 
						        self._scene_counts: Dict[str, int] = {} | 
					
					
						
						| 
							 | 
						        self._processing_status: Dict[str, str] = {} | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def compute_file_hash(self, file_path: Path) -> str: | 
					
					
						
						| 
							 | 
						        """Compute SHA-256 hash of file""" | 
					
					
						
						| 
							 | 
						        sha256_hash = hashlib.sha256() | 
					
					
						
						| 
							 | 
						        with open(file_path, "rb") as f: | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						            for byte_block in iter(lambda: f.read(4096), b""): | 
					
					
						
						| 
							 | 
						                sha256_hash.update(byte_block) | 
					
					
						
						| 
							 | 
						        return sha256_hash.hexdigest() | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def rename_with_hash(self, video_path: Path) -> Tuple[Path, str]: | 
					
					
						
						| 
							 | 
						        """Rename video and caption files using hash | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        Args: | 
					
					
						
						| 
							 | 
						            video_path: Path to video file | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						        Returns: | 
					
					
						
						| 
							 | 
						            Tuple of (new video path, hash) | 
					
					
						
						| 
							 | 
						        """ | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        file_hash = self.compute_file_hash(video_path) | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        new_video_path = video_path.parent / f"{file_hash}{video_path.suffix}" | 
					
					
						
						| 
							 | 
						        video_path.rename(new_video_path) | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        caption_path = video_path.with_suffix('.txt') | 
					
					
						
						| 
							 | 
						        if caption_path.exists(): | 
					
					
						
						| 
							 | 
						            new_caption_path = caption_path.parent / f"{file_hash}.txt" | 
					
					
						
						| 
							 | 
						            caption_path.rename(new_caption_path) | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						        return new_video_path, file_hash | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    async def process_video(self, video_path: Path, enable_splitting: bool) -> int: | 
					
					
						
						| 
							 | 
						        """Process a single video file to detect and split scenes""" | 
					
					
						
						| 
							 | 
						        try: | 
					
					
						
						| 
							 | 
						            self._processing_status[video_path.name] = f'Processing video "{video_path.name}"...' | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						            parent_caption_path = video_path.with_suffix('.txt') | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						            base_name, _ = extract_scene_info(video_path.name) | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						            with tempfile.TemporaryDirectory() as temp_dir: | 
					
					
						
						| 
							 | 
						                temp_path = Path(temp_dir) / f"preprocessed_{video_path.name}" | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                was_cropped = await asyncio.get_event_loop().run_in_executor( | 
					
					
						
						| 
							 | 
						                    None, | 
					
					
						
						| 
							 | 
						                    remove_black_bars, | 
					
					
						
						| 
							 | 
						                    video_path, | 
					
					
						
						| 
							 | 
						                    temp_path | 
					
					
						
						| 
							 | 
						                ) | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                process_path = temp_path if was_cropped else video_path | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                if enable_splitting: | 
					
					
						
						| 
							 | 
						                    video = open_video(str(process_path)) | 
					
					
						
						| 
							 | 
						                    scene_manager = SceneManager() | 
					
					
						
						| 
							 | 
						                    scene_manager.add_detector(ContentDetector()) | 
					
					
						
						| 
							 | 
						                    scene_manager.detect_scenes(video, show_progress=False) | 
					
					
						
						| 
							 | 
						                    scenes = scene_manager.get_scene_list() | 
					
					
						
						| 
							 | 
						                else: | 
					
					
						
						| 
							 | 
						                    scenes = [] | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                num_scenes = len(scenes) | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						                if not scenes: | 
					
					
						
						| 
							 | 
						                    print(f'video "{video_path.name}" is already a single-scene clip') | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                    if parent_caption_path.exists(): | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                        output_video_path = STAGING_PATH / f"{base_name}___{1:03d}.mp4" | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                        shutil.copy2(process_path, output_video_path) | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                        shutil.copy2(parent_caption_path, output_video_path.with_suffix('.txt')) | 
					
					
						
						| 
							 | 
						                        parent_caption_path.unlink() | 
					
					
						
						| 
							 | 
						                    else: | 
					
					
						
						| 
							 | 
						                         | 
					
					
						
						| 
							 | 
						                        output_video_path = STAGING_PATH / f"{base_name}___{1:03d}.mp4" | 
					
					
						
						| 
							 | 
						                        shutil.copy2(process_path, output_video_path) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                else: | 
					
					
						
						| 
							 | 
						                    print(f'video "{video_path.name}" contains {num_scenes} scenes') | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                    if parent_caption_path.exists(): | 
					
					
						
						| 
							 | 
						                        output_caption_path = STAGING_PATH / f"{base_name}.txt" | 
					
					
						
						| 
							 | 
						                        shutil.copy2(parent_caption_path, output_caption_path) | 
					
					
						
						| 
							 | 
						                        parent_caption_path.unlink() | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                    output_template = str(STAGING_PATH / f"{base_name}___$SCENE_NUMBER.mp4") | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                    await asyncio.get_event_loop().run_in_executor( | 
					
					
						
						| 
							 | 
						                        None, | 
					
					
						
						| 
							 | 
						                        lambda: split_video_ffmpeg( | 
					
					
						
						| 
							 | 
						                            str(process_path), | 
					
					
						
						| 
							 | 
						                            scenes, | 
					
					
						
						| 
							 | 
						                            output_file_template=output_template, | 
					
					
						
						| 
							 | 
						                            show_progress=False | 
					
					
						
						| 
							 | 
						                        ) | 
					
					
						
						| 
							 | 
						                    ) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                crop_status = " (black bars removed)" if was_cropped else "" | 
					
					
						
						| 
							 | 
						                self._scene_counts[video_path.name] = num_scenes | 
					
					
						
						| 
							 | 
						                self._processing_status[video_path.name] = f"{num_scenes} scenes{crop_status}" | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                video_path.unlink() | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						                if num_scenes: | 
					
					
						
						| 
							 | 
						                    gr.Info(f"Extracted {num_scenes} clips from {video_path.name}{crop_status}") | 
					
					
						
						| 
							 | 
						                else: | 
					
					
						
						| 
							 | 
						                    gr.Info(f"Imported {video_path.name}{crop_status}") | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						                return num_scenes | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						        except Exception as e: | 
					
					
						
						| 
							 | 
						            self._scene_counts[video_path.name] = 0 | 
					
					
						
						| 
							 | 
						            self._processing_status[video_path.name] = f"Error: {str(e)}" | 
					
					
						
						| 
							 | 
						            raise gr.Error(f"Error processing video {video_path}: {str(e)}") | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def get_scene_count(self, video_name: str) -> Optional[int]: | 
					
					
						
						| 
							 | 
						        """Get number of detected scenes for a video | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        Returns None if video hasn't been scanned | 
					
					
						
						| 
							 | 
						        """ | 
					
					
						
						| 
							 | 
						        return self._scene_counts.get(video_name) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def get_current_file(self) -> Optional[str]: | 
					
					
						
						| 
							 | 
						        """Get name of file currently being processed""" | 
					
					
						
						| 
							 | 
						        return self._current_file | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def is_processing(self) -> bool: | 
					
					
						
						| 
							 | 
						        """Check if background processing is running""" | 
					
					
						
						| 
							 | 
						        return self.processing | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    async def start_processing(self, enable_splitting: bool) -> None: | 
					
					
						
						| 
							 | 
						        """Start background processing of unprocessed videos""" | 
					
					
						
						| 
							 | 
						        if self.processing: | 
					
					
						
						| 
							 | 
						            return | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						        self.processing = True | 
					
					
						
						| 
							 | 
						        try: | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						            for video_file in VIDEOS_TO_SPLIT_PATH.glob("*.mp4"): | 
					
					
						
						| 
							 | 
						                self._current_file = video_file.name | 
					
					
						
						| 
							 | 
						                await self.process_video(video_file, enable_splitting) | 
					
					
						
						| 
							 | 
						                     | 
					
					
						
						| 
							 | 
						        finally: | 
					
					
						
						| 
							 | 
						            self.processing = False | 
					
					
						
						| 
							 | 
						            self._current_file = None | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def get_processing_status(self, video_name: str) -> str: | 
					
					
						
						| 
							 | 
						        """Get processing status for a video | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        Args: | 
					
					
						
						| 
							 | 
						            video_name: Name of the video file | 
					
					
						
						| 
							 | 
						             | 
					
					
						
						| 
							 | 
						        Returns: | 
					
					
						
						| 
							 | 
						            Status string for the video | 
					
					
						
						| 
							 | 
						        """ | 
					
					
						
						| 
							 | 
						        if video_name in self._processing_status: | 
					
					
						
						| 
							 | 
						            return self._processing_status[video_name] | 
					
					
						
						| 
							 | 
						        return "not processed" | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						    def list_unprocessed_videos(self) -> List[List[str]]: | 
					
					
						
						| 
							 | 
						        """List all unprocessed and processed videos with their status. | 
					
					
						
						| 
							 | 
						        Images will be ignored. | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        Returns: | 
					
					
						
						| 
							 | 
						            List of lists containing [name, status] for each video | 
					
					
						
						| 
							 | 
						        """ | 
					
					
						
						| 
							 | 
						        videos = [] | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        processed_videos = {} | 
					
					
						
						| 
							 | 
						        for clip_path in STAGING_PATH.glob("*.mp4"): | 
					
					
						
						| 
							 | 
						            base_name = clip_path.stem.rsplit('___', 1)[0] + '.mp4' | 
					
					
						
						| 
							 | 
						            if base_name in processed_videos: | 
					
					
						
						| 
							 | 
						                processed_videos[base_name] += 1 | 
					
					
						
						| 
							 | 
						            else: | 
					
					
						
						| 
							 | 
						                processed_videos[base_name] = 1 | 
					
					
						
						| 
							 | 
						                 | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        for video_file in VIDEOS_TO_SPLIT_PATH.glob("*.mp4"): | 
					
					
						
						| 
							 | 
						            if is_video_file(video_file):   | 
					
					
						
						| 
							 | 
						                status = self.get_processing_status(video_file.name) | 
					
					
						
						| 
							 | 
						                videos.append([video_file.name, status]) | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						         | 
					
					
						
						| 
							 | 
						        for video_name, clip_count in processed_videos.items(): | 
					
					
						
						| 
							 | 
						            if not (VIDEOS_TO_SPLIT_PATH / video_name).exists(): | 
					
					
						
						| 
							 | 
						                status = f"Processed ({clip_count} clips)" | 
					
					
						
						| 
							 | 
						                videos.append([video_name, status]) | 
					
					
						
						| 
							 | 
						
 | 
					
					
						
						| 
							 | 
						        return sorted(videos, key=lambda x: (x[1] != "Processing...", x[0].lower())) | 
					
					
						
						| 
							 | 
						
 |