import os
import traceback

import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from tensorflow.keras.utils import Sequence
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.layers import Input, Conv3D, MaxPooling3D, Flatten, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, CSVLogger
# === CONFIG ===
DATA_DIR = r"D:\K_REPO\ComV\train"
N_FRAMES = 30
IMG_SIZE = (96, 96)
EPOCHS = 10
BATCH_SIZE = 14
CHECKPOINT_DIR = r"D:\K_REPO\ComV\AI_made\trainnig_output\checkpoint"
RESUME_TRAINING = True
MIN_REQUIRED_FRAMES = 10
OUTPUT_PATH = r"D:\K_REPO\ComV\AI_made\trainnig_output\final_model_2.h5"

# Optimize OpenCV
cv2.setUseOptimized(True)
cv2.setNumThreads(8)
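
# Rough per-batch memory estimate (not from the original script): each batch is
# BATCH_SIZE * N_FRAMES * 96 * 96 * 3 values, and since np.array(frames) / 255.0
# below produces float64, that is 14 * 30 * 96 * 96 * 3 * 8 bytes ~= 93 MB per
# batch. Casting frames to float32 would halve this if memory becomes a concern.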
# === VIDEO DATA GENERATOR ===
class VideoDataGenerator(Sequence):
    def __init__(self, video_paths, labels, batch_size, n_frames, img_size):
        # Newer Keras versions expect Sequence subclasses to call the base
        # initializer; on older versions this is a harmless no-op.
        super().__init__()
        self.video_paths, self.labels = self._filter_invalid_videos(video_paths, labels)
        self.batch_size = batch_size
        self.n_frames = n_frames
        self.img_size = img_size
        self.indices = np.arange(len(self.video_paths))
        print(f"[INFO] Final dataset size: {len(self.video_paths)} videos")
    def _filter_invalid_videos(self, paths, labels):
        valid_paths = []
        valid_labels = []
        for path, label in zip(paths, labels):
            cap = cv2.VideoCapture(path)
            if not cap.isOpened():
                print(f"[WARNING] Could not open video: {path}")
                cap.release()
                continue
            total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            cap.release()
            if total_frames < MIN_REQUIRED_FRAMES:
                print(f"[WARNING] Skipping {path} - only {total_frames} frames (needs at least {MIN_REQUIRED_FRAMES})")
                continue
            valid_paths.append(path)
            valid_labels.append(label)
        return valid_paths, valid_labels
    def __len__(self):
        return int(np.ceil(len(self.video_paths) / self.batch_size))

    def __getitem__(self, index):
        batch_indices = self.indices[index * self.batch_size:(index + 1) * self.batch_size]
        X, y = [], []
        for i in batch_indices:
            path = self.video_paths[i]
            label = self.labels[i]
            try:
                frames = self._load_video_frames(path)
                X.append(frames)
                y.append(label)
            except Exception as e:
                print(f"[WARNING] Error processing {path} - {e}")
                # Fall back to an all-black clip so batch shapes stay consistent.
                X.append(np.zeros((self.n_frames, *self.img_size, 3)))
                y.append(label)
        return np.array(X), np.array(y)
    def _load_video_frames(self, path):
        cap = cv2.VideoCapture(path)
        total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        # Sample frame indices uniformly across the clip; min() covers both the
        # short-clip and normal cases.
        n_samples = min(total_frames, self.n_frames)
        frame_indices = np.linspace(0, total_frames - 1, n_samples, dtype=np.int32)
        frames = []
        for idx in frame_indices:
            # Note: seeking with CAP_PROP_POS_FRAMES can be slow or imprecise
            # for some codecs; a sequential read would be faster but less exact.
            cap.set(cv2.CAP_PROP_POS_FRAMES, int(idx))
            ret, frame = cap.read()
            if not ret:
                frame = np.zeros((*self.img_size, 3), dtype=np.uint8)
            else:
                # cv2.resize takes (width, height); safe here since the target is square.
                frame = cv2.resize(frame, self.img_size)
                frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            frames.append(frame)
        cap.release()
        # Pad short clips by repeating the last frame up to n_frames.
        while len(frames) < self.n_frames:
            frames.append(frames[-1] if frames else np.zeros((*self.img_size, 3), dtype=np.uint8))
        return np.array(frames) / 255.0
    def on_epoch_end(self):
        np.random.shuffle(self.indices)
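
# A minimal shape sanity check for the generator (a sketch; `paths` and `labels`
# are placeholders, assuming a handful of valid clips exist):
#   gen = VideoDataGenerator(paths, labels, BATCH_SIZE, N_FRAMES, IMG_SIZE)
#   X, y = gen[0]
#   # X.shape == (batch, 30, 96, 96, 3) with values in [0, 1]; y.shape == (batch,)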
def create_model():
    model = Sequential([
        Input(shape=(N_FRAMES, *IMG_SIZE, 3)),
        Conv3D(32, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPooling3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        Conv3D(64, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPooling3D(pool_size=(1, 2, 2)),
        BatchNormalization(),
        Conv3D(128, kernel_size=(3, 3, 3), activation='relu', padding='same'),
        MaxPooling3D(pool_size=(2, 2, 2)),
        BatchNormalization(),
        Flatten(),
        Dense(256, activation='relu'),
        Dropout(0.5),
        Dense(1, activation='sigmoid')
    ])
    model.compile(optimizer='adam',
                  loss='binary_crossentropy',
                  metrics=['accuracy'])
    return model
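
# Shape walk-through, derived from the layer arguments above:
# (30, 96, 96, 3) -> (30, 48, 48, 32) -> (30, 24, 24, 64) -> (15, 12, 12, 128).
# Flatten then yields 15 * 12 * 12 * 128 = 276,480 units, so the Dense(256)
# layer alone holds ~70.8M weights; swapping Flatten for GlobalAveragePooling3D
# would shrink this drastically if the model proves too heavy.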
def load_data():
    video_paths, labels = [], []
    for label_name in ["Fighting", "Normal"]:
        label_dir = os.path.join(DATA_DIR, label_name)
        if not os.path.isdir(label_dir):
            raise FileNotFoundError(f"Directory not found: {label_dir}")
        label = 1 if label_name.lower() == "fighting" else 0
        for file in os.listdir(label_dir):
            if file.lower().endswith((".mp4", ".mpeg", ".avi", ".mov")):
                full_path = os.path.join(label_dir, file)
                video_paths.append(full_path)
                labels.append(label)
    if not video_paths:
        raise ValueError(f"No videos found in {DATA_DIR}")
    print(f"[INFO] Total videos: {len(video_paths)} (Fighting: {labels.count(1)}, Normal: {labels.count(0)})")
    if len(set(labels)) > 1:
        return train_test_split(video_paths, labels, test_size=0.2, stratify=labels, random_state=42)
    else:
        print("[WARNING] Only one class found. Splitting without stratification.")
        return train_test_split(video_paths, labels, test_size=0.2, random_state=42)
def get_latest_checkpoint():
    if not os.path.exists(CHECKPOINT_DIR):
        os.makedirs(CHECKPOINT_DIR)
        return None
    checkpoints = [f for f in os.listdir(CHECKPOINT_DIR)
                   if f.startswith('ckpt_') and f.endswith('.h5')]
    if not checkpoints:
        return None
    checkpoints.sort(key=lambda x: int(x.split('_')[1].split('.')[0]))
    return os.path.join(CHECKPOINT_DIR, checkpoints[-1])
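
# Why the numeric sort key above matters: a plain lexicographic sort would put
# 'ckpt_10.h5' before 'ckpt_2.h5', so the "latest" file could be wrong.
#   sorted(['ckpt_10.h5', 'ckpt_2.h5'])
#       -> ['ckpt_10.h5', 'ckpt_2.h5']   (wrong order)
#   sorted(['ckpt_10.h5', 'ckpt_2.h5'], key=lambda x: int(x.split('_')[1].split('.')[0]))
#       -> ['ckpt_2.h5', 'ckpt_10.h5']   (true epoch order)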
def main():
    # Load and split data
    try:
        train_paths, val_paths, train_labels, val_labels = load_data()
    except Exception as e:
        print(f"[ERROR] Failed to load data: {e}")
        return

    # Create data generators
    try:
        train_gen = VideoDataGenerator(train_paths, train_labels, BATCH_SIZE, N_FRAMES, IMG_SIZE)
        val_gen = VideoDataGenerator(val_paths, val_labels, BATCH_SIZE, N_FRAMES, IMG_SIZE)
    except Exception as e:
        print(f"[ERROR] Failed to create data generators: {e}")
        return
    # Callbacks. Ensure the checkpoint directory exists before the first save:
    # on a fresh run (RESUME_TRAINING off) nothing else creates it.
    os.makedirs(CHECKPOINT_DIR, exist_ok=True)
    callbacks = [
        ModelCheckpoint(
            os.path.join(CHECKPOINT_DIR, 'ckpt_{epoch}.h5'),
            save_best_only=False,
            save_weights_only=False
        ),
        CSVLogger('training_log.csv', append=True),
        EarlyStopping(monitor='val_loss', patience=5, restore_best_weights=True)
    ]
    # Handle resume training
    initial_epoch = 0
    try:
        if RESUME_TRAINING:
            ckpt = get_latest_checkpoint()
            if ckpt:
                print(f"[INFO] Resuming training from checkpoint: {ckpt}")
                model = load_model(ckpt)
                # Parse the epoch number from the file name only; splitting the
                # full path on '_' would break on directories like 'K_REPO'.
                initial_epoch = int(os.path.basename(ckpt).split('_')[1].split('.')[0])
            else:
                print("[INFO] No checkpoint found, starting new training")
                model = create_model()
        else:
            model = create_model()
    except Exception as e:
        print(f"[ERROR] Failed to initialize model: {e}")
        return
    # Display model summary
    model.summary()

    # Train model
    try:
        print("[INFO] Starting training...")
        model.fit(
            train_gen,
            validation_data=val_gen,
            epochs=EPOCHS,
            initial_epoch=initial_epoch,
            callbacks=callbacks,
            verbose=1
        )
    except Exception as e:
        print(f"[ERROR] Training failed: {e}")
        traceback.print_exc()
    finally:
        # Save whatever state the model reached, even if training was interrupted.
        model.save(OUTPUT_PATH)
        print(f"[INFO] Training completed. Model saved to {OUTPUT_PATH}")
if __name__ == "__main__":
    print("[INFO] Starting script...")
    main()
    print("[INFO] Script execution completed.")