import json import os import cv2 import gradio as gr import imagehash import numpy as np import plotly.graph_objects as go from PIL import Image from plotly.subplots import make_subplots from scipy.stats import pearsonr from skimage.metrics import mean_squared_error as mse_skimage from skimage.metrics import peak_signal_noise_ratio as psnr_skimage from skimage.metrics import structural_similarity as ssim class FrameMetrics: """Class to compute and store frame-by-frame metrics""" def __init__(self): self.metrics = {} def compute_ssim(self, frame1, frame2): """Compute SSIM between two frames""" if frame1 is None or frame2 is None: return None try: # Convert to grayscale for SSIM computation gray1 = ( cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY) if len(frame1.shape) == 3 else frame1 ) gray2 = ( cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY) if len(frame2.shape) == 3 else frame2 ) # Ensure both frames have the same dimensions if gray1.shape != gray2.shape: # Resize to match the smaller dimension h = min(gray1.shape[0], gray2.shape[0]) w = min(gray1.shape[1], gray2.shape[1]) gray1 = cv2.resize(gray1, (w, h)) gray2 = cv2.resize(gray2, (w, h)) # Compute SSIM ssim_value = ssim(gray1, gray2, data_range=255) return ssim_value except Exception as e: print(f"SSIM computation failed: {e}") return None def compute_ms_ssim(self, frame1, frame2): """Compute Multi-Scale SSIM between two frames""" if frame1 is None or frame2 is None: return None try: # Convert to grayscale for MS-SSIM computation gray1 = ( cv2.cvtColor(frame1, cv2.COLOR_RGB2GRAY) if len(frame1.shape) == 3 else frame1 ) gray2 = ( cv2.cvtColor(frame2, cv2.COLOR_RGB2GRAY) if len(frame2.shape) == 3 else frame2 ) # Ensure both frames have the same dimensions if gray1.shape != gray2.shape: h = min(gray1.shape[0], gray2.shape[0]) w = min(gray1.shape[1], gray2.shape[1]) gray1 = cv2.resize(gray1, (w, h)) gray2 = cv2.resize(gray2, (w, h)) # Ensure minimum size for multi-scale analysis min_size = 32 if min(gray1.shape) < min_size: return None # Compute MS-SSIM using multiple scales from skimage.metrics import structural_similarity # Use win_size that works with image dimensions win_size = min(7, min(gray1.shape) // 4) if win_size < 3: win_size = 3 ms_ssim_val = structural_similarity( gray1, gray2, data_range=255, win_size=win_size, multichannel=False ) return ms_ssim_val except Exception as e: print(f"MS-SSIM computation failed: {e}") return None def compute_psnr(self, frame1, frame2): """Compute PSNR between two frames""" if frame1 is None or frame2 is None: return None try: # Ensure both frames have the same dimensions if frame1.shape != frame2.shape: h = min(frame1.shape[0], frame2.shape[0]) w = min(frame1.shape[1], frame2.shape[1]) c = ( min(frame1.shape[2], frame2.shape[2]) if len(frame1.shape) == 3 else 1 ) if len(frame1.shape) == 3: frame1 = cv2.resize(frame1, (w, h))[:, :, :c] frame2 = cv2.resize(frame2, (w, h))[:, :, :c] else: frame1 = cv2.resize(frame1, (w, h)) frame2 = cv2.resize(frame2, (w, h)) # Compute PSNR return psnr_skimage(frame1, frame2, data_range=255) except Exception as e: print(f"PSNR computation failed: {e}") return None def compute_mse(self, frame1, frame2): """Compute MSE between two frames""" if frame1 is None or frame2 is None: return None try: # Ensure both frames have the same dimensions if frame1.shape != frame2.shape: h = min(frame1.shape[0], frame2.shape[0]) w = min(frame1.shape[1], frame2.shape[1]) c = ( min(frame1.shape[2], frame2.shape[2]) if len(frame1.shape) == 3 else 1 ) if len(frame1.shape) == 3: frame1 = cv2.resize(frame1, (w, h))[:, :, :c] frame2 = cv2.resize(frame2, (w, h))[:, :, :c] else: frame1 = cv2.resize(frame1, (w, h)) frame2 = cv2.resize(frame2, (w, h)) # Compute MSE return mse_skimage(frame1, frame2) except Exception as e: print(f"MSE computation failed: {e}") return None def compute_phash(self, frame1, frame2): """Compute perceptual hash similarity between two frames""" if frame1 is None or frame2 is None: return None try: # Convert to PIL Images for imagehash pil1 = Image.fromarray(frame1) pil2 = Image.fromarray(frame2) # Compute perceptual hashes hash1 = imagehash.phash(pil1) hash2 = imagehash.phash(pil2) # Calculate similarity (lower hamming distance = more similar) hamming_distance = hash1 - hash2 # Convert to similarity score (0-1, where 1 is identical) max_distance = len(str(hash1)) * 4 # 4 bits per hex char similarity = 1 - (hamming_distance / max_distance) return similarity except Exception as e: print(f"pHash computation failed: {e}") return None def compute_color_histogram_correlation(self, frame1, frame2): """Compute color histogram correlation between two frames""" if frame1 is None or frame2 is None: return None try: # Ensure both frames have the same dimensions if frame1.shape != frame2.shape: h = min(frame1.shape[0], frame2.shape[0]) w = min(frame1.shape[1], frame2.shape[1]) frame1 = cv2.resize(frame1, (w, h)) frame2 = cv2.resize(frame2, (w, h)) # Compute histograms for each channel correlations = [] if len(frame1.shape) == 3: # Color image for i in range(3): # R, G, B channels hist1 = cv2.calcHist([frame1], [i], None, [256], [0, 256]) hist2 = cv2.calcHist([frame2], [i], None, [256], [0, 256]) # Flatten histograms hist1 = hist1.flatten() hist2 = hist2.flatten() # Compute correlation if np.std(hist1) > 0 and np.std(hist2) > 0: corr, _ = pearsonr(hist1, hist2) correlations.append(corr) # Return average correlation across channels return np.mean(correlations) if correlations else 0.0 else: # Grayscale hist1 = cv2.calcHist([frame1], [0], None, [256], [0, 256]).flatten() hist2 = cv2.calcHist([frame2], [0], None, [256], [0, 256]).flatten() if np.std(hist1) > 0 and np.std(hist2) > 0: corr, _ = pearsonr(hist1, hist2) return corr else: return 0.0 except Exception as e: print(f"Color histogram correlation computation failed: {e}") return None def compute_sharpness(self, frame): """Compute sharpness using Laplacian variance method""" if frame is None: return None # Convert to grayscale if needed gray = ( cv2.cvtColor(frame, cv2.COLOR_RGB2GRAY) if len(frame.shape) == 3 else frame ) # Compute Laplacian variance (higher values = sharper) laplacian = cv2.Laplacian(gray, cv2.CV_64F) sharpness = laplacian.var() return sharpness def compute_frame_metrics(self, frame1, frame2, frame_idx): """Compute all metrics for a frame pair""" metrics = { "frame_index": frame_idx, "ssim": self.compute_ssim(frame1, frame2), "psnr": self.compute_psnr(frame1, frame2), "mse": self.compute_mse(frame1, frame2), "phash": self.compute_phash(frame1, frame2), "color_hist_corr": self.compute_color_histogram_correlation(frame1, frame2), "sharpness1": self.compute_sharpness(frame1), "sharpness2": self.compute_sharpness(frame2), } # Compute average sharpness for the pair if metrics["sharpness1"] is not None and metrics["sharpness2"] is not None: metrics["sharpness_avg"] = ( metrics["sharpness1"] + metrics["sharpness2"] ) / 2 metrics["sharpness_diff"] = abs( metrics["sharpness1"] - metrics["sharpness2"] ) else: metrics["sharpness_avg"] = None metrics["sharpness_diff"] = None return metrics def compute_all_metrics(self, frames1, frames2): """Compute metrics for all frame pairs""" all_metrics = [] max_frames = max(len(frames1), len(frames2)) for i in range(max_frames): frame1 = frames1[i] if i < len(frames1) else None frame2 = frames2[i] if i < len(frames2) else None if frame1 is not None or frame2 is not None: metrics = self.compute_frame_metrics(frame1, frame2, i) all_metrics.append(metrics) else: # Handle cases where both frames are missing all_metrics.append( { "frame_index": i, "ssim": None, "ms_ssim": None, "psnr": None, "mse": None, "phash": None, "color_hist_corr": None, "sharpness1": None, "sharpness2": None, "sharpness_avg": None, "sharpness_diff": None, } ) return all_metrics def get_metric_summary(self, metrics_list): """Compute summary statistics for all metrics""" metric_names = [ "ssim", "psnr", "mse", "phash", "color_hist_corr", "sharpness1", "sharpness2", "sharpness_avg", "sharpness_diff", ] summary = { "total_frames": len(metrics_list), "valid_frames": len([m for m in metrics_list if m.get("ssim") is not None]), } # Compute statistics for each metric for metric_name in metric_names: valid_values = [ m[metric_name] for m in metrics_list if m.get(metric_name) is not None ] if valid_values: summary.update( { f"{metric_name}_mean": np.mean(valid_values), f"{metric_name}_min": np.min(valid_values), f"{metric_name}_max": np.max(valid_values), f"{metric_name}_std": np.std(valid_values), } ) return summary def create_modern_plot(self, metrics_list, current_frame=0): """Create a comprehensive multi-metric visualization with shared hover""" if not metrics_list: return None # Extract frame indices and metric values frame_indices = [m["frame_index"] for m in metrics_list] # Create 3x2 subplots with quality overview at the top fig = make_subplots( rows=3, cols=2, subplot_titles=( "Quality Overview (Combined Score)", "", # Empty title for merged cell "SSIM", "PSNR vs MSE", "Perceptual Hash vs Color Histogram", "Individual Sharpness (Video 1 vs Video 2)", ), specs=[ [ {"colspan": 2, "secondary_y": False}, None, ], # Row 1: Quality Overview (single axis) [ {"secondary_y": False}, {"secondary_y": True}, ], # Row 2: SSIM (single axis), PSNR vs MSE [ {"secondary_y": True}, {"secondary_y": True}, ], # Row 3: pHash vs Color, Individual Sharpness ], vertical_spacing=0.12, horizontal_spacing=0.1, ) # Helper function to get valid data def get_valid_data(metric_name): values = [m.get(metric_name) for m in metrics_list] valid_indices = [i for i, v in enumerate(values) if v is not None] valid_values = [values[i] for i in valid_indices] valid_frames = [frame_indices[i] for i in valid_indices] return valid_frames, valid_values # Plot 1: Quality Overview - Combined Score Only (row 1, full width) ssim_frames, ssim_values = get_valid_data("ssim") psnr_frames, psnr_values = get_valid_data("psnr") # Show only combined quality score if ssim_values and psnr_values and len(ssim_values) == len(psnr_values): # Normalize metrics to 0-1 scale for comparison ssim_norm = np.array(ssim_values) psnr_norm = np.clip(np.array(psnr_values) / 50, 0, 1) quality_score = (ssim_norm + psnr_norm) / 2 fig.add_trace( go.Scatter( x=ssim_frames, y=quality_score, mode="lines+markers", name="Quality Score ↑", line=dict(color="gold", width=4), marker=dict(size=8), hovertemplate="Frame %{x}
Quality Score: %{y:.3f}", fill="tonexty", ), row=1, col=1, ) # Plot 2: SSIM (row 2, col 1) if ssim_values: fig.add_trace( go.Scatter( x=ssim_frames, y=ssim_values, mode="lines+markers", name="SSIM ↑", line=dict(color="blue", width=3), marker=dict(size=6), hovertemplate="Frame %{x}
SSIM: %{y:.4f}", ), row=2, col=1, ) # Get pHash data for later use phash_frames, phash_values = get_valid_data("phash") # Plot 3: PSNR vs MSE (row 2, col 2) - keep as is since already shows individual metrics if psnr_values: fig.add_trace( go.Scatter( x=psnr_frames, y=psnr_values, mode="lines+markers", name="PSNR ↑", line=dict(color="green", width=2), hovertemplate="Frame %{x}
PSNR: %{y:.2f} dB", ), row=2, col=2, ) mse_frames, mse_values = get_valid_data("mse") if mse_values: fig.add_trace( go.Scatter( x=mse_frames, y=mse_values, mode="lines+markers", name="MSE ↓", line=dict(color="red", width=2), hovertemplate="Frame %{x}
MSE: %{y:.2f}", yaxis="y6", ), row=2, col=2, secondary_y=True, ) # Plot 4: Perceptual Hash vs Color Histogram (row 3, col 1) - keep as is if phash_values: fig.add_trace( go.Scatter( x=phash_frames, y=phash_values, mode="lines+markers", name="pHash ↑", line=dict(color="purple", width=2), hovertemplate="Frame %{x}
pHash: %{y:.4f}", ), row=3, col=1, ) hist_frames, hist_values = get_valid_data("color_hist_corr") if hist_values: fig.add_trace( go.Scatter( x=hist_frames, y=hist_values, mode="lines+markers", name="Color Hist ↑", line=dict(color="orange", width=2), hovertemplate="Frame %{x}
Hist Corr: %{y:.4f}", yaxis="y8", ), row=3, col=1, secondary_y=True, ) # Plot 5: Individual Sharpness - Video 1 vs Video 2 (row 3, col 2) sharp1_frames, sharp1_values = get_valid_data("sharpness1") sharp2_frames, sharp2_values = get_valid_data("sharpness2") if sharp1_values: fig.add_trace( go.Scatter( x=sharp1_frames, y=sharp1_values, mode="lines+markers", name="Video 1 Sharpness ↑", line=dict(color="darkgreen", width=2), hovertemplate="Frame %{x}
Video 1 Sharpness: %{y:.1f}", ), row=3, col=2, ) if sharp2_values: fig.add_trace( go.Scatter( x=sharp2_frames, y=sharp2_values, mode="lines+markers", name="Video 2 Sharpness ↑", line=dict(color="darkblue", width=2), hovertemplate="Frame %{x}
Video 2 Sharpness: %{y:.1f}", yaxis="y10", ), row=3, col=2, secondary_y=True, ) # Add current frame marker to all plots if current_frame is not None: # Add vertical line to each subplot to show current frame # Subplot (1,1): Quality Overview (full width) fig.add_vline( x=current_frame, line_dash="dash", line_color="red", line_width=2, row=1, col=1, ) # Subplot (2,1): Similarity Metrics fig.add_vline( x=current_frame, line_dash="dash", line_color="red", line_width=2, row=2, col=1, ) # Subplot (2,2): PSNR vs MSE fig.add_vline( x=current_frame, line_dash="dash", line_color="red", line_width=2, row=2, col=2, ) # Subplot (3,1): pHash vs Color Histogram fig.add_vline( x=current_frame, line_dash="dash", line_color="red", line_width=2, row=3, col=1, ) # Subplot (3,2): Individual Sharpness fig.add_vline( x=current_frame, line_dash="dash", line_color="red", line_width=2, row=3, col=2, ) # Update layout with shared hover mode and other improvements fig.update_layout( height=900, showlegend=True, hovermode="x unified", # Shared hover pointer across subplots dragmode=False, title={ "text": "📊 Multi-Metric Video Quality Analysis Dashboard", "x": 0.5, "xanchor": "center", "font": {"size": 16}, }, legend={ "orientation": "h", "yanchor": "bottom", "y": 1.02, "xanchor": "center", "x": 0.5, "font": {"size": 10}, }, margin=dict(t=100, b=50, l=50, r=50), plot_bgcolor="rgba(0,0,0,0)", paper_bgcolor="rgba(0,0,0,0)", ) # Update axes labels and ranges with improved configuration fig.update_xaxes(title_text="Frame", fixedrange=True) # Quality Overview axis (row 1, col 1) - focused range to emphasize differences quality_values = [] if ssim_values and psnr_values and len(ssim_values) == len(psnr_values): ssim_norm = np.array(ssim_values) psnr_norm = np.clip(np.array(psnr_values) / 50, 0, 1) quality_values = (ssim_norm + psnr_norm) / 2 if len(quality_values) > 0: # Use dynamic range based on data with some padding for better visualization min_qual = float(np.min(quality_values)) max_qual = float(np.max(quality_values)) range_padding = (max_qual - min_qual) * 0.1 # 10% padding y_min = max(0, min_qual - range_padding) y_max = min(1, max_qual + range_padding) # Ensure minimum range for visibility if (y_max - y_min) < 0.1: center = (y_max + y_min) / 2 y_min = max(0, center - 0.05) y_max = min(1, center + 0.05) else: # Fallback range y_min, y_max = 0.5, 1.0 fig.update_yaxes( title_text="Quality Score", row=1, col=1, fixedrange=True, range=[y_min, y_max], ) # SSIM axis (row 2, col 1) fig.update_yaxes( title_text="SSIM", row=2, col=1, fixedrange=True, range=[0, 1.05] ) # PSNR vs MSE axes (row 2, col 2) fig.update_yaxes(title_text="PSNR (dB)", row=2, col=2, fixedrange=True) fig.update_yaxes( title_text="MSE", row=2, col=2, secondary_y=True, fixedrange=True ) # pHash vs Color Histogram axes (row 3, col 1) fig.update_yaxes(title_text="pHash Similarity", row=3, col=1, fixedrange=True) fig.update_yaxes( title_text="Histogram Correlation", row=3, col=1, secondary_y=True, fixedrange=True, ) # Individual Sharpness axes (row 3, col 2) fig.update_yaxes(title_text="Video 1 Sharpness", row=3, col=2, fixedrange=True) fig.update_yaxes( title_text="Video 2 Sharpness", row=3, col=2, secondary_y=True, fixedrange=True, ) return fig class VideoFrameComparator: def __init__(self): self.video1_frames = [] self.video2_frames = [] self.max_frames = 0 self.frame_metrics = FrameMetrics() self.computed_metrics = [] self.metrics_summary = {} def extract_frames(self, video_path): """Extract all frames from a video file or URL""" if not video_path: return [] # Check if it's a URL or local file is_url = video_path.startswith(("http://", "https://")) if not is_url and not os.path.exists(video_path): print(f"Warning: Local video file not found: {video_path}") return [] frames = [] cap = cv2.VideoCapture(video_path) if not cap.isOpened(): print( f"Error: Could not open video {'URL' if is_url else 'file'}: {video_path}" ) return [] try: frame_count = 0 while True: ret, frame = cap.read() if not ret: break # Convert BGR to RGB for display frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) frames.append(frame_rgb) frame_count += 1 # Add progress feedback for URLs (which might be slower) if is_url and frame_count % 30 == 0: print(f"Processed {frame_count} frames from URL...") except Exception as e: print(f"Error processing video: {e}") finally: cap.release() print( f"Successfully extracted {len(frames)} frames from {'URL' if is_url else 'file'}: {video_path}" ) return frames def is_comparison_in_data_json( self, video1_path, video2_path, json_file_path="data.json" ): """Check if this video comparison exists in data.json""" try: with open(json_file_path, "r") as f: data = json.load(f) for comparison in data.get("comparisons", []): videos = comparison.get("videos", []) if len(videos) == 2: # Check both orders (works for both local files and URLs) if (videos[0] == video1_path and videos[1] == video2_path) or ( videos[0] == video2_path and videos[1] == video1_path ): return True return False except: return False def load_videos(self, video1_path, video2_path): """Load both videos and extract frames""" if not video1_path and not video2_path: return "Please upload at least one video.", 0, None, None, "", None # Extract frames from both videos self.video1_frames = self.extract_frames(video1_path) if video1_path else [] self.video2_frames = self.extract_frames(video2_path) if video2_path else [] # Determine maximum number of frames self.max_frames = max(len(self.video1_frames), len(self.video2_frames)) if self.max_frames == 0: return ( "No valid frames found in the uploaded videos.", 0, None, None, "", None, ) # Compute metrics if both videos are present and not in data.json metrics_info = "" metrics_plot = None if ( video1_path and video2_path and not self.is_comparison_in_data_json(video1_path, video2_path) ): print("Computing comprehensive frame-by-frame metrics...") self.computed_metrics = self.frame_metrics.compute_all_metrics( self.video1_frames, self.video2_frames ) self.metrics_summary = self.frame_metrics.get_metric_summary( self.computed_metrics ) # Build metrics info string metrics_info = "\n\n📊 Computed Metrics Summary:\n" metric_display = { "ssim": ("SSIM", ".4f", "", "↑ Higher=Better"), "psnr": ("PSNR", ".2f", " dB", "↑ Higher=Better"), "mse": ("MSE", ".2f", "", "↓ Lower=Better"), "phash": ("pHash", ".4f", "", "↑ Higher=Better"), "color_hist_corr": ("Color Hist", ".4f", "", "↑ Higher=Better"), "sharpness_avg": ("Sharpness", ".1f", "", "↑ Higher=Better"), } for metric_key, ( display_name, format_str, unit, direction, ) in metric_display.items(): if self.metrics_summary.get(f"{metric_key}_mean") is not None: mean_val = self.metrics_summary[f"{metric_key}_mean"] std_val = self.metrics_summary[f"{metric_key}_std"] metrics_info += f"{display_name}: μ={mean_val:{format_str}}{unit}, σ={std_val:{format_str}}{unit} ({direction})\n" metrics_info += f"Valid Frames: {self.metrics_summary['valid_frames']}/{self.metrics_summary['total_frames']}" # Generate initial plot metrics_plot = self.frame_metrics.create_modern_plot( self.computed_metrics, 0 ) else: self.computed_metrics = [] self.metrics_summary = {} if video1_path and video2_path: metrics_info = "\n\n📋 Note: This comparison is predefined in data.json (metrics not computed)" # Get initial frames frame1 = ( self.video1_frames[0] if self.video1_frames else np.zeros((480, 640, 3), dtype=np.uint8) ) frame2 = ( self.video2_frames[0] if self.video2_frames else np.zeros((480, 640, 3), dtype=np.uint8) ) status_msg = "Videos loaded successfully!\n" status_msg += f"Video 1: {len(self.video1_frames)} frames\n" status_msg += f"Video 2: {len(self.video2_frames)} frames\n" status_msg += ( f"Use the slider to navigate through frames (0-{self.max_frames - 1})" ) status_msg += metrics_info return ( status_msg, self.max_frames - 1, frame1, frame2, self.get_current_frame_info(0), metrics_plot, ) def get_frames_at_index(self, frame_index): """Get frames at specific index from both videos""" frame_index = int(frame_index) # Get frame from video 1 if frame_index < len(self.video1_frames): frame1 = self.video1_frames[frame_index] else: # Create a placeholder if frame doesn't exist frame1 = np.zeros((480, 640, 3), dtype=np.uint8) cv2.putText( frame1, f"Frame {frame_index} not available", (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, ) # Get frame from video 2 if frame_index < len(self.video2_frames): frame2 = self.video2_frames[frame_index] else: # Create a placeholder if frame doesn't exist frame2 = np.zeros((480, 640, 3), dtype=np.uint8) cv2.putText( frame2, f"Frame {frame_index} not available", (50, 240), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 255, 255), 2, ) return frame1, frame2 def get_current_frame_info(self, frame_index): """Get information about the current frame including metrics""" frame_index = int(frame_index) info = f"Current Frame: {frame_index} / {self.max_frames - 1}" # Add metrics info if available if self.computed_metrics and frame_index < len(self.computed_metrics): metrics = self.computed_metrics[frame_index] # === COMPARISON METRICS (Between Videos) === comparison_metrics = [] # SSIM with quality assessment if metrics.get("ssim") is not None: ssim_val = metrics["ssim"] if ssim_val >= 0.9: quality = "🟢 Excellent" elif ssim_val >= 0.8: quality = "🔵 Good" elif ssim_val >= 0.6: quality = "🟡 Fair" else: quality = "🔴 Poor" comparison_metrics.append(f"SSIM: {ssim_val:.4f} ↑ ({quality})") # PSNR with quality indicator if metrics.get("psnr") is not None: psnr_val = metrics["psnr"] if psnr_val >= 40: psnr_quality = "🟢" elif psnr_val >= 30: psnr_quality = "🔵" elif psnr_val >= 20: psnr_quality = "🟡" else: psnr_quality = "🔴" comparison_metrics.append(f"PSNR: {psnr_val:.1f}dB ↑ {psnr_quality}") # MSE with quality indicator (lower is better) if metrics.get("mse") is not None: mse_val = metrics["mse"] if mse_val <= 50: mse_quality = "🟢" elif mse_val <= 100: mse_quality = "🔵" elif mse_val <= 200: mse_quality = "🟡" else: mse_quality = "🔴" comparison_metrics.append(f"MSE: {mse_val:.1f} ↓ {mse_quality}") # pHash with quality indicator if metrics.get("phash") is not None: phash_val = metrics["phash"] if phash_val >= 0.95: phash_quality = "🟢" elif phash_val >= 0.9: phash_quality = "🔵" elif phash_val >= 0.8: phash_quality = "🟡" else: phash_quality = "🔴" comparison_metrics.append(f"pHash: {phash_val:.3f} ↑ {phash_quality}") # Color Histogram Correlation if metrics.get("color_hist_corr") is not None: color_val = metrics["color_hist_corr"] if color_val >= 0.9: color_quality = "🟢" elif color_val >= 0.8: color_quality = "🔵" elif color_val >= 0.6: color_quality = "🟡" else: color_quality = "🔴" comparison_metrics.append(f"Color: {color_val:.3f} ↑ {color_quality}") # Add comparison metrics to info if comparison_metrics: info += " | " + " | ".join(comparison_metrics) # === INDIVIDUAL IMAGE METRICS === individual_metrics = [] # Individual Sharpness for each video if metrics.get("sharpness1") is not None: sharp1 = metrics["sharpness1"] if sharp1 >= 200: sharp1_quality = "🟢" elif sharp1 >= 100: sharp1_quality = "🔵" elif sharp1 >= 50: sharp1_quality = "🟡" else: sharp1_quality = "🔴" individual_metrics.append( f"V1 Sharpness: {sharp1:.0f} ↑ {sharp1_quality}" ) if metrics.get("sharpness2") is not None: sharp2 = metrics["sharpness2"] if sharp2 >= 200: sharp2_quality = "🟢" elif sharp2 >= 100: sharp2_quality = "🔵" elif sharp2 >= 50: sharp2_quality = "🟡" else: sharp2_quality = "🔴" individual_metrics.append( f"V2 Sharpness: {sharp2:.0f} ↑ {sharp2_quality}" ) # Sharpness comparison and winner if ( metrics.get("sharpness1") is not None and metrics.get("sharpness2") is not None ): sharp1 = metrics["sharpness1"] sharp2 = metrics["sharpness2"] # Determine winner if sharp1 > sharp2: winner = "V1" winner_emoji = "🏆" elif sharp2 > sharp1: winner = "V2" winner_emoji = "🏆" else: winner = "Tie" winner_emoji = "⚖️" diff_pct = abs(sharp1 - sharp2) / max(sharp1, sharp2) * 100 # Add significance if diff_pct > 20: significance = "Major" elif diff_pct > 10: significance = "Moderate" elif diff_pct > 5: significance = "Minor" else: significance = "Negligible" individual_metrics.append( f"Sharpness Winner: {winner_emoji}{winner} ({significance})" ) # Add individual metrics to info if individual_metrics: info += "\n📊 Individual: " + " | ".join(individual_metrics) # === OVERALL QUALITY ASSESSMENT === quality_score = 0 quality_count = 0 # Calculate overall quality score if metrics.get("ssim") is not None: quality_score += metrics["ssim"] quality_count += 1 if metrics.get("psnr") is not None: # Normalize PSNR to 0-1 scale (assume 50dB max) psnr_norm = min(metrics["psnr"] / 50, 1.0) quality_score += psnr_norm quality_count += 1 if metrics.get("phash") is not None: quality_score += metrics["phash"] quality_count += 1 if quality_count > 0: avg_quality = quality_score / quality_count # Add overall assessment if avg_quality >= 0.9: overall = "✨ Excellent Match" elif avg_quality >= 0.8: overall = "✅ Good Match" elif avg_quality >= 0.6: overall = "⚠️ Fair Match" else: overall = "❌ Poor Match" info += f"\n🎯 Overall: {overall}" return info def get_updated_plot(self, frame_index): """Get updated plot with current frame highlighted""" if self.computed_metrics: return self.frame_metrics.create_modern_plot( self.computed_metrics, int(frame_index) ) return None def load_examples_from_json(json_file_path="data.json"): """Load example video pairs from JSON configuration file""" try: with open(json_file_path, "r") as f: data = json.load(f) examples = [] # Extract video pairs from the comparisons for comparison in data.get("comparisons", []): videos = comparison.get("videos", []) # Validate that video files/URLs exist or are accessible valid_videos = [] for video_path in videos: if video_path: # Check if not empty/None # Check if it's a URL if video_path.startswith(("http://", "https://")): # For URLs, we'll assume they're valid (can't easily check without downloading) # OpenCV will handle the validation during actual loading valid_videos.append(video_path) print(f"Added video URL: {video_path}") elif os.path.exists(video_path): # For local files, check existence valid_videos.append(video_path) print(f"Added local video file: {video_path}") else: print(f"Warning: Local video file not found: {video_path}") # Add to examples if we have valid videos if len(valid_videos) == 2: examples.append(valid_videos) elif len(valid_videos) == 1: # Single video example (compare with None) examples.append([valid_videos[0], None]) return examples except FileNotFoundError: print(f"Warning: {json_file_path} not found. No examples will be loaded.") return [] except json.JSONDecodeError as e: print(f"Error parsing {json_file_path}: {e}") return [] except Exception as e: print(f"Error loading examples: {e}") return [] def get_all_videos_from_json(json_file_path="data.json"): """Get list of all unique videos mentioned in the JSON file""" try: with open(json_file_path, "r") as f: data = json.load(f) all_videos = set() # Extract all unique video paths/URLs from comparisons for comparison in data.get("comparisons", []): videos = comparison.get("videos", []) for video_path in videos: if video_path: # Only add non-empty paths # Check if it's a URL or local file if video_path.startswith(("http://", "https://")): # For URLs, add them directly all_videos.add(video_path) elif os.path.exists(video_path): # For local files, check existence before adding all_videos.add(video_path) return sorted(list(all_videos)) except FileNotFoundError: print(f"Warning: {json_file_path} not found.") return [] except json.JSONDecodeError as e: print(f"Error parsing {json_file_path}: {e}") return [] except Exception as e: print(f"Error loading videos: {e}") return [] def create_app(): comparator = VideoFrameComparator() example_pairs = load_examples_from_json() all_videos = get_all_videos_from_json() with gr.Blocks( title="FrameLens - Video Frame Comparator", # theme=gr.themes.Soft(), ) as app: gr.Markdown(""" # 🎬 FrameLens - Professional Video Quality Analysis Upload two videos and compare them using comprehensive quality metrics. Perfect for analyzing compression effects, processing artifacts, and visual quality assessment. **✨ Features**: SSIM, PSNR, MSE, pHash, Color Histogram & Sharpness Analysis! """) with gr.Row(): with gr.Column(): gr.Markdown("### Video 1") video1_input = gr.File( label="Upload Video 1", file_types=[ ".mp4", ".avi", ".mov", ".mkv", ".wmv", ".flv", ".webm", ], type="filepath", ) with gr.Column(): gr.Markdown("### Video 2") video2_input = gr.File( label="Upload Video 2", file_types=[ ".mp4", ".avi", ".mov", ".mkv", ".wmv", ".flv", ".webm", ], type="filepath", ) # Add examples if available (this auto-populates inputs when clicked) if example_pairs: gr.Markdown("### 📁 Example Video Comparisons") gr.Examples( examples=example_pairs, inputs=[video1_input, video2_input], label="Click any example to load video pairs:", examples_per_page=10, ) load_btn = gr.Button("🔄 Load Videos", variant="primary", size="lg") # Frame comparison section (initially hidden) frame_display = gr.Row(visible=False) with frame_display: with gr.Column(): gr.Markdown("### Video 1 - Current Frame") frame1_output = gr.Image( label="Video 1 Frame", type="numpy", interactive=False, height=400 ) with gr.Column(): gr.Markdown("### Video 2 - Current Frame") frame2_output = gr.Image( label="Video 2 Frame", type="numpy", interactive=False, height=400 ) # Frame navigation (initially hidden) - moved underneath frames frame_controls = gr.Row(visible=False) with frame_controls: frame_slider = gr.Slider( minimum=0, maximum=0, step=1, value=0, label="Frame Number", interactive=False, ) # Comprehensive metrics visualization (initially hidden) metrics_section = gr.Row(visible=False) with metrics_section: with gr.Column(): # Frame info moved above the plot frame_info = gr.Textbox( label="Frame Information & Metrics", interactive=False, value="", lines=3, ) gr.Markdown("### 📊 Comprehensive Metrics Analysis") metrics_plot = gr.Plot( label="Multi-Metric Quality Analysis", show_label=False, ) # Status and frame info (moved below plots, initially hidden) info_section = gr.Row(visible=False) with info_section: with gr.Column(): status_output = gr.Textbox(label="Status", interactive=False, lines=8) # Event handlers def load_videos_handler(video1, video2): status, max_frames, frame1, frame2, info, plot = comparator.load_videos( video1, video2 ) # Update slider slider_update = gr.Slider( minimum=0, maximum=max_frames, step=1, value=0, interactive=True if max_frames > 0 else False, ) # Show/hide sections based on whether videos were loaded successfully videos_loaded = max_frames > 0 return ( status, # status_output slider_update, # frame_slider frame1, # frame1_output frame2, # frame2_output info, # frame_info plot, # metrics_plot gr.Row(visible=videos_loaded), # frame_controls gr.Row(visible=videos_loaded), # frame_display gr.Row(visible=videos_loaded), # metrics_section gr.Row(visible=videos_loaded), # info_section ) def update_frames(frame_index): if comparator.max_frames == 0: return None, None, "No videos loaded", None frame1, frame2 = comparator.get_frames_at_index(frame_index) info = comparator.get_current_frame_info(frame_index) plot = comparator.get_updated_plot(frame_index) return frame1, frame2, info, plot # Auto-load when examples populate the inputs def auto_load_when_examples_change(video1, video2): # Only auto-load if both inputs are provided (from examples) if video1 and video2: return load_videos_handler(video1, video2) # If only one or no videos, return default empty state return ( "Please upload videos or select an example", # status_output gr.Slider( minimum=0, maximum=0, step=1, value=0, interactive=False ), # frame_slider None, # frame1_output None, # frame2_output "", # frame_info (now in metrics_section) None, # metrics_plot gr.Row(visible=False), # frame_controls gr.Row(visible=False), # frame_display gr.Row(visible=False), # metrics_section gr.Row(visible=False), # info_section ) # Connect events load_btn.click( fn=load_videos_handler, inputs=[video1_input, video2_input], outputs=[ status_output, frame_slider, frame1_output, frame2_output, frame_info, metrics_plot, frame_controls, frame_display, metrics_section, info_section, ], ) # Auto-load when both video inputs change (triggered by examples) video1_input.change( fn=auto_load_when_examples_change, inputs=[video1_input, video2_input], outputs=[ status_output, frame_slider, frame1_output, frame2_output, frame_info, metrics_plot, frame_controls, frame_display, metrics_section, info_section, ], ) video2_input.change( fn=auto_load_when_examples_change, inputs=[video1_input, video2_input], outputs=[ status_output, frame_slider, frame1_output, frame2_output, frame_info, metrics_plot, frame_controls, frame_display, metrics_section, info_section, ], ) frame_slider.change( fn=update_frames, inputs=[frame_slider], outputs=[frame1_output, frame2_output, frame_info, metrics_plot], ) # Add comprehensive usage guide gr.Markdown(f""" ### 💡 Professional Features: - Upload videos in common formats (MP4, AVI, MOV, etc.) or use URLs - **6 Quality Metrics**: SSIM, PSNR, MSE, pHash, Color Histogram, Sharpness - **Comprehensive Visualization**: 6-panel analysis dashboard - **Real-time Analysis**: Navigate frames with live metric updates - **Smart Comparisons**: See which video performs better per metric - **Correlation Analysis**: Understand relationships between metrics {"- Click examples above for instant analysis!" if example_pairs else ""} ### 📊 Metrics Explained (with Directionality): - **SSIM** ↑: Structural Similarity (1.0 = identical, 0.0 = completely different) - **PSNR** ↑: Peak Signal-to-Noise Ratio in dB (higher = better quality) - **MSE** ↓: Mean Squared Error (lower = more similar) - **pHash** ↑: Perceptual Hash similarity (1.0 = visually identical) - **Color Histogram** ↑: Color distribution correlation (1.0 = identical colors) - **Sharpness** ↑: Laplacian variance (higher = sharper images) ### 🎯 Quality Assessment Scale: - 🟢 **Excellent**: SSIM ≥ 0.9, PSNR ≥ 40dB, MSE ≤ 50 - 🔵 **Good**: SSIM ≥ 0.8, PSNR ≥ 30dB, MSE ≤ 100 - 🟡 **Fair**: SSIM ≥ 0.6, PSNR ≥ 20dB, MSE ≤ 200 - 🔴 **Poor**: Below fair thresholds ### 🏆 Comparison Indicators: - **V1/V2 Winner**: Shows which video performs better per metric - **Significance**: Major (>20%), Moderate (10-20%), Minor (5-10%), Negligible (<5%) - **Overall Match**: Combined quality assessment across all metrics - **Arrows**: ↑ = Higher is Better, ↓ = Lower is Better ### 📁 Configuration: {f"Loaded {len(example_pairs)} example comparisons from data.json" if example_pairs else "No examples found in data.json"} {f"Available videos: {len(all_videos)} files" if all_videos else ""} """) return app def main(): app = create_app() app.launch(server_name="0.0.0.0", server_port=7860, share=False, debug=True) if __name__ == "__main__": main()