import spaces
import gradio as gr
from datetime import datetime
import tempfile
import os
import json
import torch
import gc


def debug():
    # Touch CUDA once at import time. NOTE(review): presumably a HF Spaces
    # ZeroGPU probe — this will raise on a CUDA-less host; confirm intent.
    torch.randn(10).cuda()


debug()

from PIL import Image
from decord import VideoReader, cpu
from yolo_detection import (
    detect_people_and_machinery,
    annotate_video_with_bboxes,
    is_image,
    is_video
)
from image_captioning import (
    analyze_image_activities,
    analyze_video_activities,
    process_video_chunk,
    load_model_and_tokenizer,
    MAX_NUM_FRAMES
)

# Global storage for activities and media paths (shared with chat mode)
global_activities = []
global_media_path = None

# Create tmp directory for storing frames
tmp_dir = os.path.join('.', 'tmp')
os.makedirs(tmp_dir, exist_ok=True)


@spaces.GPU
def process_diary(day, date, total_people, total_machinery, machinery_types,
                  activities, media):
    """Process a site diary entry from an uploaded image or video.

    Copies the upload to a temp file, runs people/machinery detection and
    activity analysis, stores results in the module globals for chat mode,
    and returns a 10-element list matching the Gradio outputs:
    [day, date, people, machinery, machinery_types, detail-visibility update,
    annotated video path, activities, chat history, activity table rows].
    """
    global global_activities, global_media_path

    if media is None:
        # BUG FIX: this early return previously had only 9 elements while the
        # success/error paths return 10, misaligning the Gradio outputs —
        # pad with the empty activity-rows list.
        return [day, date, "No media uploaded", "No media uploaded",
                "No media uploaded", None, None, [], None, []]

    try:
        if not hasattr(media, 'name'):
            raise ValueError("Invalid file upload")

        file_ext = get_file_extension(media.name)
        if not (is_image(media.name) or is_video(media.name)):
            raise ValueError(f"Unsupported file type: {file_ext}")

        # Persist the upload to a stable path (delete=False: the path outlives
        # this call — chat mode reads it later via global_media_path).
        with tempfile.NamedTemporaryFile(suffix=file_ext, delete=False) as temp_file:
            temp_path = temp_file.name
            if os.path.exists(media.name):
                # Gradio handed us a real file on disk — copy its bytes.
                with open(media.name, 'rb') as f:
                    temp_file.write(f.read())
            else:
                # Fall back to a file-like object or raw bytes.
                file_content = media.read() if hasattr(media, 'read') else media
                temp_file.write(file_content if isinstance(file_content, bytes)
                                else file_content.read())

        detected_people, detected_machinery, detected_machinery_types = \
            detect_people_and_machinery(temp_path)
        print(f"Detected people: {detected_people}, machinery: {detected_machinery}, types: {detected_machinery_types}")

        annotated_video_path = None
        detected_activities = (analyze_image_activities(temp_path)
                               if is_image(media.name)
                               else analyze_video_activities(temp_path))
        print(f"Detected activities: {detected_activities}")

        # Store activities and media path globally for chat mode
        global_activities = detected_activities
        global_media_path = temp_path

        if is_video(media.name):
            annotated_video_path = annotate_video_with_bboxes(temp_path)

        detected_types_str = ", ".join(
            f"{k}: {v}" for k, v in detected_machinery_types.items())

        # Clear the chat history when loading new media
        chat_history = []

        # Rows for the activity table: [time, summary] per detected activity.
        activity_rows = [
            [activity.get('time', 'Unknown'),
             activity.get('summary', 'No description available')]
            for activity in detected_activities
        ]

        return [day, date, str(detected_people), str(detected_machinery),
                detected_types_str, gr.update(visible=True),
                annotated_video_path, detected_activities, chat_history,
                activity_rows]
    except Exception as e:
        print(f"Error processing media: {str(e)}")
        return [day, date, "Error processing media", "Error processing media",
                "Error processing media", None, None, [], None, []]
def get_file_extension(filename):
    """Return the lowercased extension of *filename*, including the dot."""
    return os.path.splitext(filename)[1].lower()


def on_card_click(activity_indices, history, evt: gr.SelectData):
    """Handle clicking on an activity card in the gallery.

    Maps the gallery selection back to a detected activity, picks the best
    video path (pre-saved chunk, else the full video), and seeds a fresh chat
    history describing that activity.

    Returns [card-view update, chat-view update, chat history, video path].
    """
    global global_activities, global_media_path

    # Get the index of the selected activity from the SelectData event
    selected_idx = evt.index

    # Map the gallery index to the actual activity index; out-of-range
    # selections just keep the card view visible.
    if selected_idx < 0 or selected_idx >= len(activity_indices):
        return [gr.update(visible=True), gr.update(visible=False), [], None]

    card_idx = activity_indices[selected_idx]
    print(f"Gallery item {selected_idx} clicked, corresponds to activity index: {card_idx}")

    if card_idx < 0 or card_idx >= len(global_activities):
        return [gr.update(visible=True), gr.update(visible=False), [], None]

    selected_activity = global_activities[card_idx]

    chunk_video_path = None
    # Use the pre-saved chunk video if available
    if 'chunk_path' in selected_activity and os.path.exists(selected_activity['chunk_path']):
        chunk_video_path = selected_activity['chunk_path']
        print(f"Using pre-saved chunk video: {chunk_video_path}")
    else:
        # Fallback to full video if chunk not available
        chunk_video_path = global_media_path
        print(f"Chunk video not available, using full video: {chunk_video_path}")

    # Start a fresh chat history seeded with the selected activity.
    history = []
    history.append((None, f"🎬 Selected video at timestamp {selected_activity['time']}"))

    # Add the thumbnail to the chat as a visual element
    if 'thumbnail' in selected_activity and os.path.exists(selected_activity['thumbnail']):
        # Use the tuple format for images in chatbot
        thumbnail_path = selected_activity['thumbnail']
        history.append((None, f"📷 Video frame at {selected_activity['time']}"))
        history.append((None, thumbnail_path))

    # Format message about the detected activity
    activity_info = f"I detected the following activity:\n\n{selected_activity['summary']}"
    # BUG FIX: use .get() — 'objects' may be absent and direct indexing raised
    # KeyError; chat_with_video already guards this same key with .get().
    if selected_activity.get('objects'):
        activity_info += f"\n\nIdentified objects: {', '.join(selected_activity['objects'])}"
    history.append(("Tell me about this video segment", activity_info))

    return [gr.update(visible=False), gr.update(visible=True), history, chunk_video_path]
def chat_with_video(message, history):
    """Chat with the mPLUG model about the selected video segment.

    Recovers the selected timestamp from the seed message in *history*, finds
    the matching activity chunk, samples that chunk's frames from the video
    and asks the model *message* about them. Falls back to a canned response
    when the chunk cannot be identified. Returns the extended chat history;
    never raises — errors produce an apologetic chat reply instead.
    """
    global global_activities, global_media_path

    try:
        # Identify which chunk the conversation is about from the seed
        # "🎬 Selected video at timestamp ..." message injected by on_card_click.
        selected_chunk_idx = None
        selected_time = None
        selected_activity = None

        for entry in history:
            if entry[0] is None and "Selected video at timestamp" in entry[1]:
                time_str = entry[1].split("Selected video at timestamp ")[1]
                selected_time = time_str.strip()
                break

        # Find the corresponding chunk
        if selected_time:
            for activity in global_activities:
                if activity.get('time') == selected_time:
                    selected_chunk_idx = activity.get('chunk_id')
                    selected_activity = activity
                    break

        # If we found the chunk, use the model to analyze it
        if selected_chunk_idx is not None and global_media_path and selected_activity:
            model, tokenizer, processor = load_model_and_tokenizer()
            try:
                # Generate prompt: user question + context about the scene.
                context = f"This video shows construction site activities at timestamp {selected_time}."
                if selected_activity.get('objects'):
                    context += f" The scene contains {', '.join(selected_activity.get('objects'))}."
                prompt = f"{context} Analyze this segment of construction site video and answer this question: {message}"

                # This would ideally use the specific chunk file; for simplicity
                # we re-open the full video and index into it.
                vr = VideoReader(global_media_path, ctx=cpu(0))

                # Sample roughly one frame per second of video.
                sample_fps = round(vr.get_avg_fps())
                frame_idx = [i for i in range(0, len(vr), sample_fps)]

                # Extract the frame indices belonging to this chunk.
                chunk_size = MAX_NUM_FRAMES  # from image_captioning constants
                start_idx = selected_chunk_idx * chunk_size
                end_idx = min(start_idx + chunk_size, len(frame_idx))
                chunk_frames = frame_idx[start_idx:end_idx]

                if chunk_frames:
                    frames = vr.get_batch(chunk_frames).asnumpy()
                    frames_pil = [Image.fromarray(v.astype('uint8')) for v in frames]
                    # Process frames with model
                    response = process_video_chunk(frames_pil, model, tokenizer, processor, prompt)
                    return history + [(message, response)]
                return history + [(message, "Could not extract frames for this segment.")]
            finally:
                # BUG FIX: cleanup previously ran only on the successful-frames
                # branch, leaking the loaded model on the "no frames" return
                # and on exceptions; finally covers every exit.
                del model, tokenizer, processor
                torch.cuda.empty_cache()
                gc.collect()
        else:
            # Fallback response if we can't identify the chunk
            thumbnail = None
            response_text = f"I'm analyzing your question about the video segment: {message}\n\nBased on what I can see in this segment, it appears to show construction activity with various machinery and workers on site. The specific details would depend on the exact timestamp you're referring to."

            # Try to get a thumbnail from the selected activity if available
            if selected_activity and 'thumbnail' in selected_activity and os.path.exists(selected_activity['thumbnail']):
                thumbnail = selected_activity['thumbnail']
                new_history = history + [(message, response_text)]
                new_history.append((None, f"📷 Video frame at {selected_time}"))
                new_history.append((None, thumbnail))
                return new_history

            return history + [(message, response_text)]
    except Exception as e:
        print(f"Error in chat_with_video: {str(e)}")
        return history + [(message, f"I encountered an error while processing your question. Let me try to answer based on what I can see: {message}\n\nThe video appears to show construction site activities, but I'm having trouble with the detailed analysis at the moment.")]