import spaces
import gradio as gr
from datetime import datetime
import tempfile
import os
import json
import torch
import gc
import shutil  # Added for directory cleanup
from azure.storage.blob import BlobServiceClient  # Added for Azure integration


def debug():
    torch.randn(10).cuda()


debug()

from PIL import Image
from decord import VideoReader, cpu
from yolo_detection import (
    detect_people_and_machinery,  # Keep for images
    # annotate_video_with_bboxes,  # Replaced by unified function
    process_video_unified,  # Import the new unified function
    is_image,
    is_video
)
from image_captioning import (
    analyze_image_activities,
    analyze_video_activities,
    process_video_chunk,
    load_model_and_tokenizer,
    MAX_NUM_FRAMES
)

# Load model instance once
gc.collect()
torch.cuda.empty_cache()
model, tokenizer, processor = load_model_and_tokenizer()
print("Model loaded.")

# Azure Blob Storage Setup
CONTAINER_NAME = "logs"  # Replace with your actual container name
connection_string = "BlobEndpoint=https://assentian.blob.core.windows.net/;QueueEndpoint=https://assentian.queue.core.windows.net/;FileEndpoint=https://assentian.file.core.windows.net/;TableEndpoint=https://assentian.table.core.windows.net/;SharedAccessSignature=sv=2024-11-04&ss=bfqt&srt=sco&sp=rwdlacupiytfx&se=2025-04-30T17:16:18Z&st=2025-04-22T09:16:18Z&spr=https&sig=AkJb79C%2FJ0G1HqfotIYuSfm%2Fb%2BQ2E%2FjvxV3ZG7ejVQo%3D"
if not connection_string:
    print("Warning: AZURE_STORAGE_CONNECTION_STRING not found. Azure Blob functionality will be disabled.")

# Initialize Azure Blob Service Client if connection string is available
blob_service_client = None
if connection_string:
    try:
        blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        print("Azure Blob Service Client initialized successfully.")
    except Exception as e:
        print(f"Error initializing BlobServiceClient: {str(e)}")
        blob_service_client = None


def list_blobs():
    """List video blobs in the specified Azure container."""
    if not blob_service_client:
        print("Cannot list blobs: BlobServiceClient is not initialized.")
        return []
    try:
        container_client = blob_service_client.get_container_client(CONTAINER_NAME)
        blobs = container_client.list_blobs()
        video_extensions = ['.mp4', '.mkv', '.mov', '.avi', '.flv', '.wmv', '.webm', '.m4v']
        blob_list = [blob.name for blob in blobs
                     if any(blob.name.lower().endswith(ext) for ext in video_extensions)]
        print(f"Found {len(blob_list)} video blobs in container '{CONTAINER_NAME}': {blob_list}")
        return blob_list
    except Exception as e:
        print(f"Error listing blobs in container '{CONTAINER_NAME}': {str(e)}")
        return []


# Fetch blob names at startup
blob_names = list_blobs()
print(f"Populated azure_blob dropdown with {len(blob_names)} options.")

# Global storage for activities and media paths
global_activities = []
global_media_path = None
global_temp_media_path = None  # Store path if downloaded from Azure for cleanup

# Create tmp directory for storing frames
tmp_dir = os.path.join('.', 'tmp')
os.makedirs(tmp_dir, exist_ok=True)
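# For reference: the activity entries produced by analyze_video_activities and
# stored in global_activities are plain dicts. Judging from how they are
# accessed below, each entry looks roughly like the sketch here; the exact keys
# come from image_captioning.py, so treat this as an illustration rather than a
# contract, and the values are invented for the example:
#
#   example_activity = {
#       "time": "00:01:30",                   # timestamp label shown in the UI
#       "summary": "Excavator loading soil onto a truck",
#       "objects": ["excavator", "truck"],    # optional
#       "thumbnail": "./tmp/chunk_3.jpg",     # optional, saved frame for the chunk
#       "chunk_path": "./tmp/chunk_3.mp4",    # optional, saved clip for the chunk
#       "chunk_id": 3,                        # optional, index into the video chunks
#   }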
@spaces.GPU
def process_diary(day, date, total_people, total_machinery, machinery_types, activities, media_source, local_file, azure_blob):
    """Process the site diary entry with media from a local file or Azure Blob Storage."""
    global global_activities, global_media_path, global_temp_media_path
    global_temp_media_path = None  # Reset before processing

    if media_source == "Local File":
        if local_file is None:
            return [day, date, "No media uploaded", "No media uploaded", "No media uploaded",
                    None, None, [], None, []]
        media_path = local_file  # local_file is a string path in Gradio
        print(f"Processing local file: {media_path}")
    else:  # Azure Blob
        if not azure_blob or not blob_service_client:
            return [day, date, "No blob selected or Azure not configured",
                    "No blob selected or Azure not configured",
                    "No blob selected or Azure not configured",
                    None, None, [], None, []]
        try:
            blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=azure_blob)
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(azure_blob)[1]) as temp_file:
                temp_path = temp_file.name
                blob_data = blob_client.download_blob()
                blob_data.readinto(temp_file)
            media_path = temp_path
            global_temp_media_path = media_path  # Store for cleanup
            print(f"Downloaded Azure blob '{azure_blob}' to temporary file: {media_path}")
        except Exception as e:
            print(f"Error downloading blob '{azure_blob}': {str(e)}")
            return [day, date, "Error downloading blob", "Error downloading blob", "Error downloading blob",
                    None, None, [], None, []]

    # Ensure cleanup happens even on error
    try:
        file_ext = get_file_extension(media_path)
        if not (is_image(media_path) or is_video(media_path)):
            raise ValueError(f"Unsupported file type: {file_ext}")

        annotated_video_path = None  # Initialize
        if is_image(media_path):
            # Process image with the original function
            detected_people, detected_machinery, detected_machinery_types = detect_people_and_machinery(media_path)
            detected_activities = analyze_image_activities(media_path)
        else:  # It's a video
            # Process video with the unified function
            print("Processing video with unified YOLO function...")
            detected_people, detected_machinery, detected_machinery_types, annotated_video_path = process_video_unified(media_path)
            print(f"Unified YOLO results - People: {detected_people}, Machinery: {detected_machinery}, Types: {detected_machinery_types}, Annotated Video: {annotated_video_path}")
            # Now analyze activities
            detected_activities = analyze_video_activities(media_path, model, tokenizer, processor)

        # Debug the detected activities
        print(f"Detected activities (raw): {detected_activities}")
        print(f"Type of detected_activities: {type(detected_activities)}")

        # Ensure detected_activities is a list of dictionaries
        if isinstance(detected_activities, str):
            print("Warning: detected_activities is a string, converting to list of dicts.")
            detected_activities = [{"time": "Unknown", "summary": detected_activities}]
        elif not isinstance(detected_activities, list):
            print("Warning: detected_activities is not a list, wrapping in a list.")
            detected_activities = [{"time": "Unknown", "summary": str(detected_activities)}]

        # Validate each activity (write the converted entry back into the list)
        for i, activity in enumerate(detected_activities):
            if not isinstance(activity, dict):
                print(f"Warning: Invalid activity format: {activity}, converting.")
                detected_activities[i] = {"time": "Unknown", "summary": str(activity)}
        print(f"Processed detected_activities: {detected_activities}")

        # Store activities and media path globally for chat mode
        global_activities = detected_activities
        global_media_path = media_path

        # Video annotation is handled inside process_video_unified; the old
        # annotate_video_with_bboxes call was removed as a duplicate.
        # Azure temp-file cleanup is handled in the finally block below.

        detected_types_str = ", ".join([f"{k}: {v}" for k, v in detected_machinery_types.items()])

        # Return the activities as a list for the card display and clear the
        # chat history when loading new media
        chat_history = []

        # Extract data for the activity table
        activity_rows = []
        for activity in detected_activities:
            time = activity.get('time', 'Unknown')
            summary = activity.get('summary', 'No description available')
            activity_rows.append([time, summary])
        print(f"Activity rows for Dataframe: {activity_rows}")

        return [day, date, str(detected_people), str(detected_machinery), detected_types_str,
                gr.update(visible=True), annotated_video_path, detected_activities, chat_history, activity_rows]
    except Exception as e:
        print(f"Error processing media: {str(e)}")
        # Cleanup is handled in the finally block
        return [day, date, "Error processing media", "Error processing media", "Error processing media",
                None, None, [], None, []]
    finally:
        # Cleanup temporary files and GPU memory
        print("Running cleanup...")
        if global_temp_media_path and os.path.exists(global_temp_media_path):
            try:
                os.remove(global_temp_media_path)
                print(f"Cleaned up temporary Azure file: {global_temp_media_path}")
            except OSError as e:
                print(f"Error removing temporary Azure file {global_temp_media_path}: {e}")
        # Clear GPU cache
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
            gc.collect()
            print("Cleared GPU cache.")
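# process_diary returns ten values in a fixed order. The Gradio event wiring
# lives outside this section; a minimal sketch of what it is assumed to look
# like, with hypothetical component names, is:
#
#   submit_btn.click(
#       fn=process_diary,
#       inputs=[day_in, date_in, total_people_in, total_machinery_in, machinery_types_in,
#               activities_in, media_source_in, local_file_in, azure_blob_in],
#       outputs=[day_out, date_out, people_out, machinery_out, machinery_types_out,
#                chat_section, annotated_video, activities_state, chatbot, activity_table],
#   )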
def get_file_extension(filename):
    return os.path.splitext(filename)[1].lower()


def on_card_click(activity_indices, history, evt: gr.SelectData):
    """Handle clicking on an activity card in the gallery."""
    global global_activities, global_media_path

    # Get the index of the selected activity from the SelectData event
    selected_idx = evt.index

    # Map the gallery index to the actual activity index
    if selected_idx < 0 or selected_idx >= len(activity_indices):
        return [gr.update(visible=True), gr.update(visible=False), [], None]
    card_idx = activity_indices[selected_idx]
    print(f"Gallery item {selected_idx} clicked, corresponds to activity index: {card_idx}")

    if card_idx < 0 or card_idx >= len(global_activities):
        return [gr.update(visible=True), gr.update(visible=False), [], None]

    selected_activity = global_activities[card_idx]
    chunk_video_path = None

    # Use the pre-saved chunk video if available
    if 'chunk_path' in selected_activity and os.path.exists(selected_activity['chunk_path']):
        chunk_video_path = selected_activity['chunk_path']
        print(f"Using pre-saved chunk video: {chunk_video_path}")
    else:
        # Fall back to the full video if the chunk is not available
        chunk_video_path = global_media_path
        print(f"Chunk video not available, using full video: {chunk_video_path}")

    # Add the selected activity to the chat history
    history = []
    history.append((None, f"🎬 Selected video at timestamp {selected_activity['time']}"))

    # Add the thumbnail to the chat as a visual element
    if 'thumbnail' in selected_activity and os.path.exists(selected_activity['thumbnail']):
        # Use the tuple format for images in the chatbot
        thumbnail_path = selected_activity['thumbnail']
        history.append((None, f"📷 Video frame at {selected_activity['time']}"))
        history.append((None, thumbnail_path))

    # Format a message about the detected activity ('objects' may be absent)
    activity_info = f"I detected the following activity:\n\n{selected_activity['summary']}"
    if selected_activity.get('objects'):
        activity_info += f"\n\nIdentified objects: {', '.join(selected_activity['objects'])}"

    history.append(("Tell me about this video segment", activity_info))

    return [gr.update(visible=False), gr.update(visible=True), history, chunk_video_path]
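# The history exchanged between on_card_click and chat_with_video uses the
# tuple-style Gradio Chatbot format: a list of (user_message, bot_message)
# pairs, with None on the side that should stay empty. A sketch of what
# on_card_click builds for a selected activity (values are illustrative):
#
#   history = [
#       (None, "🎬 Selected video at timestamp 00:01:30"),
#       (None, "📷 Video frame at 00:01:30"),
#       (None, "./tmp/chunk_3.jpg"),
#       ("Tell me about this video segment", "I detected the following activity: ..."),
#   ]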
def chat_with_video(message, history):
    """Chat with the mPLUG model about the selected video segment"""
    global global_activities, global_media_path

    try:
        # Get the selected activity from the history to identify which chunk we're discussing
        selected_chunk_idx = None
        selected_time = None
        selected_activity = None
        for entry in history:
            if entry[0] is None and "Selected video at timestamp" in entry[1]:
                time_str = entry[1].split("Selected video at timestamp ")[1]
                selected_time = time_str.strip()
                break

        # Find the corresponding chunk
        if selected_time:
            for i, activity in enumerate(global_activities):
                if activity.get('time') == selected_time:
                    selected_chunk_idx = activity.get('chunk_id')
                    selected_activity = activity
                    break

        # If we found the chunk, use the model to analyze it
        if selected_chunk_idx is not None and global_media_path and selected_activity:
            # Generate prompt based on user question and add context about what's in the video
            context = f"This video shows construction site activities at timestamp {selected_time}."
            if selected_activity.get('objects'):
                context += f" The scene contains {', '.join(selected_activity.get('objects'))}."

            prompt = f"{context} Analyze this segment of construction site video and answer this question: {message}"

            # This would ideally use the specific chunk, but for simplicity we'll use the global path
            # In a production system, you'd extract just that chunk of the video
            vr = VideoReader(global_media_path, ctx=cpu(0))

            # Get the frames for this chunk
            sample_fps = round(vr.get_avg_fps() / 1)
            frame_idx = [i for i in range(0, len(vr), sample_fps)]

            # Extract frames for the specific chunk
            chunk_size = MAX_NUM_FRAMES
            start_idx = selected_chunk_idx * chunk_size
            end_idx = min(start_idx + chunk_size, len(frame_idx))
            chunk_frames = frame_idx[start_idx:end_idx]

            if chunk_frames:
                frames = vr.get_batch(chunk_frames).asnumpy()
                frames_pil = [Image.fromarray(v.astype('uint8')) for v in frames]

                # Process frames with model
                response = process_video_chunk(frames_pil, model, tokenizer, processor, prompt)
                return history + [(message, response)]
            else:
                return history + [(message, "Could not extract frames for this segment.")]
        else:
            # Fallback response if we can't identify the chunk
            thumbnail = None
            response_text = f"I'm analyzing your question about the video segment: {message}\n\nBased on what I can see in this segment, it appears to show construction activity with various machinery and workers on site. The specific details would depend on the exact timestamp you're referring to."

            # Try to get a thumbnail from the selected activity if available
            if selected_activity and 'thumbnail' in selected_activity and os.path.exists(selected_activity['thumbnail']):
                thumbnail = selected_activity['thumbnail']
                new_history = history + [(message, response_text)]
                new_history.append((None, f"📷 Video frame at {selected_time}"))
                new_history.append((None, thumbnail))
                return new_history

            return history + [(message, response_text)]
    except Exception as e:
        print(f"Error in chat_with_video: {str(e)}")
        return history + [(message, f"I encountered an error while processing your question. Let me try to answer based on what I can see: {message}\n\nThe video appears to show construction site activities, but I'm having trouble with the detailed analysis at the moment.")]
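# A worked example of the frame selection above, with illustrative numbers
# (MAX_NUM_FRAMES is defined in image_captioning.py, so the value 16 here is
# only an assumption): for a 30 fps, 90-second video, len(vr) == 2700 and
# sample_fps == 30, so frame_idx keeps every 30th frame -> 90 candidates,
# roughly one per second. With chunk_size == 16, chunk_id 2 covers
# frame_idx[32:48], i.e. approximately seconds 32-47 of the video. This is
# assumed to mirror how analyze_video_activities chunks the video when it
# assigns a chunk_id to each activity.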
# Native Gradio activity cards
def create_activity_cards_ui(activities):
    """Create activity cards using native Gradio components"""
    if not activities:
        return gr.HTML("