from DepthEstimator import DepthEstimator
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import os
from GenerateCaptions import generate_caption
import re
from config import LOGS_DIR
from transformers import AutoProcessor, AutoModelForZeroShotObjectDetection
import torch
import spacy
import gc


class SoundMapper:
    def __init__(self):
        self.depth_estimator = DepthEstimator()
        # List of depth maps as dicts with "predicted_depth" (tensor) and "depth" (PIL.Image)
        self.device = "cuda"
        # Depth maps, spaCy, and Grounding DINO are loaded lazily on first use
        # self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
        self.map_list = None
        self.image_dir = self.depth_estimator.image_dir
        # self.nlp = spacy.load("en_core_web_sm")
        self.nlp = None
        self.dino = None
        self.dino_processor = None
        # self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-tiny").to(self.device)
        # self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-tiny")

    def _load_nlp(self):
        if self.nlp is None:
            self.nlp = spacy.load("en_core_web_sm")
        return self.nlp

    def _load_depth_maps(self):
        if self.map_list is None:
            self.map_list = self.depth_estimator.estimate_depth(self.depth_estimator.image_dir)
        return self.map_list

    def process_depth_maps(self) -> list:
        depth_maps = self._load_depth_maps()
        processed_maps = []
        for item in depth_maps:
            depth_map = item["depth"]
            depth_array = np.array(depth_map)
            # The "depth" image is assumed to be 8-bit, so dividing by 255 maps it to [0, 1]
            normalization = depth_array / 255.0
            processed_maps.append({
                "original": depth_map,
                "normalization": normalization
            })
        return processed_maps

    # def create_depth_zone(self, processed_maps: list, num_zones=3):
    #     zones_data = []
    #     for depth_data in processed_maps:
    #         normalized = depth_data["normalization"]
    #         thresholds = np.linspace(0, 1, num_zones + 1)
    #         zones = []
    #         for i in range(num_zones):
    #             zone_mask = (normalized >= thresholds[i]) & (normalized < thresholds[i+1])
    #             zone_percentage = zone_mask.sum() / zone_mask.size
    #             zones.append({
    #                 "range": (thresholds[i], thresholds[i+1]),
    #                 "percentage": zone_percentage,
    #                 "mask": zone_mask
    #             })
    #         zones_data.append(zones)
    #     return zones_data

    def detect_sound_sources(self, caption_text: str) -> dict:
        """
        Extract nouns and their sound descriptions from caption text.
        Returns a dictionary mapping nouns to their descriptions.
        """
        sound_sources = {}
        nlp = self._load_nlp()
        print("\n[DEBUG] Beginning sound source detection")
        print(f"Raw caption text length: {len(caption_text)}")
        print(f"First 100 chars: {caption_text[:100]}...")
        # Split the caption by newlines to separate entries
        lines = caption_text.strip().split('\n')
        print(f"Found {len(lines)} lines after splitting")
        for i, line in enumerate(lines):
            # Skip empty lines
            if not line.strip():
                continue
            print(f"Processing line {i}: {line[:50]}{'...' if len(line) > 50 else ''}")
            # Check if line matches the expected format (Noun: description)
            if ':' in line:
                parts = line.split(':', 1)  # Split only on the first colon
                # Clean up the noun part - remove numbers and leading/trailing whitespace
                noun_part = parts[0].strip().lower()
                # Remove list numbering (e.g., "1. ", "2. ", etc.)
                noun_part = re.sub(r'^\d+\.\s*', '', noun_part)
                description = parts[1].strip()
                # Clean any markdown formatting
                noun = re.sub(r'[*()]', '', noun_part).strip()
                description = re.sub(r'[*()]', '', description).strip()
                # Separate the description at em dash if present
                if ' — ' in description:
                    description = description.split(' — ', 1)[0].strip()
                elif ' - ' in description:
                    description = description.split(' - ', 1)[0].strip()
                print(f" - Found potential noun: '{noun}' with description: '{description[:30]}...'")
                # Skip if noun contains invalid characters or is too short
                if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
                    sound_sources[noun] = description
                    print(" √ Added to sound sources")
                else:
                    print(" × Skipped (invalid format)")
        # If no structured format found, try to extract nouns from the text
        if not sound_sources:
            print("No structured format found, falling back to noun extraction")
            all_nouns = []
            doc = nlp(caption_text)
            for token in doc:
                if token.pos_ == "NOUN" and len(token.text) > 1:
                    if token.text[0].isalpha():
                        all_nouns.append(token.text.lower())
                        print(f" - Extracted noun: '{token.text.lower()}'")
            for noun in all_nouns:
                sound_sources[noun] = ""  # Empty description
        print(f"[DEBUG] Final detected sound sources: {list(sound_sources.keys())}")
        return sound_sources
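
    # Illustrative sketch (not part of the pipeline): the parser above expects caption
    # lines shaped like "1. Traffic: steady hum of cars". A hypothetical input such as
    #   "1. Traffic: steady hum of cars\n2. Birds: occasional chirping"
    # would yield {"traffic": "steady hum of cars", "birds": "occasional chirping"};
    # text without any colon falls back to spaCy noun extraction with empty descriptions.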

    def map_bbox_to_depth_zone(self, bbox, depth_map, num_zones=3):
        x1, y1, x2, y2 = [int(coord) for coord in bbox]
        height, width = depth_map.shape
        x1, y1 = max(0, x1), max(0, y1)
        x2, y2 = min(width, x2), min(height, y2)
        depth_roi = depth_map[y1:y2, x1:x2]
        if depth_roi.size == 0:
            return num_zones - 1
        mean_depth = np.mean(depth_roi)
        thresholds = self.create_histogram_depth_zones(depth_map, num_zones)
        for i in range(num_zones):
            if thresholds[i] <= mean_depth < thresholds[i + 1]:
                return i
        return num_zones - 1

    def detect_objects(self, nouns: list, image: Image.Image):
        filtered_nouns = []
        for noun in nouns:
            if '##' not in noun and len(noun) > 1 and noun[0].isalpha():
                filtered_nouns.append(noun)
        print(f"Detecting objects for nouns: {filtered_nouns}")
        if self.dino is None:
            self.dino = AutoModelForZeroShotObjectDetection.from_pretrained("IDEA-Research/grounding-dino-base").to(self.device)
            self.dino_processor = AutoProcessor.from_pretrained("IDEA-Research/grounding-dino-base")
        else:
            self.dino = self.dino.to(self.device)
        # Grounding DINO expects the candidate phrases joined into one prompt, separated by " . "
        text_prompt = " . ".join(filtered_nouns)
        inputs = self.dino_processor(images=image, text=text_prompt, return_tensors="pt").to(self.device)
        with torch.no_grad():
            outputs = self.dino(**inputs)
        results = self.dino_processor.post_process_grounded_object_detection(
            outputs,
            inputs.input_ids,
            box_threshold=0.25,
            text_threshold=0.25,
            target_sizes=[image.size[::-1]]
        )
        result = results[0]
        labels = result["labels"]
        bboxes = result["boxes"]
        clean_labels = []
        for label in labels:
            clean_label = re.sub(r'##\w+', '', label)
            clean_label = self._split_combined_words(clean_label, filtered_nouns)
            clean_labels.append(clean_label)
        # Move the model back to CPU and free GPU memory between calls
        self.dino = self.dino.to("cpu")
        torch.cuda.empty_cache()
        del inputs, outputs, results
        print(f"Detected objects: {clean_labels}")
        return (clean_labels, bboxes)

    def _split_combined_words(self, text, nouns=None):
        nlp = self._load_nlp()
        if nouns is None:
            known_words = set()
            doc = nlp(text)
            for token in doc:
                if token.pos_ == "NOUN" and len(token.text) > 1:
                    known_words.add(token.text.lower())
        else:
            known_words = set(nouns)
        result = []
        for word in text.split():
            if word in known_words:
                result.append(word)
                continue
            found = False
            for known in known_words:
                if known in word and len(known) > 2:
                    result.append(known)
                    found = True
            if not found:
                result.append(word)
        return " ".join(result)

    def process_dino_labels(self, labels):
        processed_labels = []
        nlp = self._load_nlp()
        for label in labels:
            if label.startswith('##'):
                continue
            label = re.sub(r'[*()]', '', label).strip()
            parts = label.split()
            for part in parts:
                if part.startswith('##'):
                    continue
                doc = nlp(part)
                for token in doc:
                    if token.pos_ == "NOUN" and len(token.text) > 1:
                        processed_labels.append(token.text.lower())
        unique_labels = []
        for label in processed_labels:
            if label not in unique_labels:
                unique_labels.append(label)
        return unique_labels

    def create_histogram_depth_zones(self, depth_map, num_zones=3):
        # Using 50 bins because it is faster
        hist, bin_edge = np.histogram(depth_map.flatten(), bins=50, range=(0, 1))
        cumulative = np.cumsum(hist) / np.sum(hist)
        thresholds = [0.0]
        for i in range(1, num_zones):
            target = i / num_zones
            idx = np.argmin(np.abs(cumulative - target))
            thresholds.append(bin_edge[idx + 1])
        thresholds.append(1.0)
        return thresholds
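
    # Example (values are illustrative only): for a scene whose normalized depth values
    # cluster near the low end of [0, 1], the cumulative histogram reaches 1/3 and 2/3
    # early, so this method might return thresholds like [0.0, 0.18, 0.42, 1.0]. Each
    # zone then holds roughly a third of the pixels, unlike fixed np.linspace(0, 1, 4)
    # cut points, which would leave most pixels in the first zone.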

    def analyze_object_depths(self, image_path, depth_map, lat=None, lon=None, caption_data=None, all_objects=False):
        # lat/lon default to None so callers that already supply caption_data can skip them;
        # generate_caption is only called when caption_data is missing
        image = Image.open(image_path)
        if caption_data is None:
            caption = generate_caption(lat, lon)
            if not caption:
                print(f"Failed to generate caption for {image_path}")
                return []
            caption_text = caption.get("sound_description", "")
        else:
            caption_text = caption_data.get("sound_description", "")
        # Debug: print the raw caption text
        print(f"\n[DEBUG] Raw caption text for {os.path.basename(image_path)}:")
        print(caption_text)
        print("-" * 50)
        if not caption_text:
            print(f"No caption text available for {image_path}")
            return []
        # Extract nouns and their sound descriptions
        sound_sources = self.detect_sound_sources(caption_text)
        # Debug: print the extracted sound sources
        print("[DEBUG] Extracted sound sources:")
        for noun, desc in sound_sources.items():
            print(f" - {noun}: {desc}")
        print("-" * 50)
        if not sound_sources:
            print(f"No sound sources detected in caption for {image_path}")
            return []
        # Get the list of nouns only for object detection
        nouns = list(sound_sources.keys())
        # Debug: print the list of nouns being used for detection
        print(f"[DEBUG] Nouns for object detection: {nouns}")
        print("-" * 50)
        labels, bboxes = self.detect_objects(nouns, image)
        if len(labels) == 0 or len(bboxes) == 0:
            print(f"No objects detected in {image_path}")
            return []
        object_data = []
        known_objects = set(nouns) if nouns else set()
        for i, (label, bbox) in enumerate(zip(labels, bboxes)):
            if '##' in label:
                continue
            # Clamp the box to the image bounds and read the depth values it covers
            x1, y1, x2, y2 = [int(coord) for coord in bbox]
            height, width = depth_map.shape
            x1, y1 = max(0, x1), max(0, y1)
            x2, y2 = min(width, x2), min(height, y2)
            depth_roi = depth_map[y1:y2, x1:x2]
            if depth_roi.size == 0:
                continue
            mean_depth = np.mean(depth_roi)
            # Match the detected label back to a captioned noun: exact word first,
            # then substring, then any plausible word as a last resort
            matched_noun = None
            matched_desc = None
            for word in label.split():
                word = word.lower()
                if word in sound_sources:
                    matched_noun = word
                    matched_desc = sound_sources[word]
                    break
            if matched_noun is None:
                for noun in sound_sources:
                    if noun in label.lower():
                        matched_noun = noun
                        matched_desc = sound_sources[noun]
                        break
            if matched_noun is None:
                for word in label.split():
                    if len(word) > 1 and word[0].isalpha() and '##' not in word:
                        matched_noun = word.lower()
                        matched_desc = ""  # No description available
                        break
            if matched_noun:
                thresholds = self.create_histogram_depth_zones(depth_map, num_zones=3)
                zone = 0  # The default is 0, which is the closest zone
                for z in range(3):
                    if thresholds[z] <= mean_depth < thresholds[z + 1]:
                        zone = z
                        break
                object_data.append({
                    "original_label": matched_noun,
                    "bbox": bbox.tolist(),
                    "depth_zone": zone,
                    "zone_description": ["near", "medium", "far"][zone],
                    "mean_depth": mean_depth,
                    "weight": 1.0 - mean_depth,
                    "sound_description": matched_desc
                })
        if all_objects:
            object_data.sort(key=lambda x: x["mean_depth"])
            return object_data
        else:
            if not object_data:
                return []
            closest_object = min(object_data, key=lambda x: x["mean_depth"])
            return [closest_object]
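
    # Shape of a returned entry, with made-up numbers for illustration:
    #   {"original_label": "traffic", "bbox": [12, 40, 300, 220], "depth_zone": 0,
    #    "zone_description": "near", "mean_depth": 0.21, "weight": 0.79,
    #    "sound_description": "steady hum of cars"}
    # With all_objects=False only the entry with the smallest mean_depth is returned.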

    def cleanup(self):
        if hasattr(self, 'depth_estimator') and self.depth_estimator is not None:
            del self.depth_estimator
            self.depth_estimator = None
        if self.map_list is not None:
            del self.map_list
            self.map_list = None
        if self.dino is not None:
            self.dino = self.dino.to("cpu")
            del self.dino
            self.dino = None
            del self.dino_processor
            self.dino_processor = None
        if self.nlp is not None:
            del self.nlp
            self.nlp = None
        torch.cuda.empty_cache()
        gc.collect()

    def test_object_depth_analysis(self):
        """
        Test the object depth analysis on all images in the directory.
        """
        # Process depth maps first
        processed_maps = self.process_depth_maps()
        # Get the list of original image paths
        image_dir = self.depth_estimator.image_dir
        image_paths = [os.path.join(image_dir, f) for f in os.listdir(image_dir) if f.endswith(".jpg")]
        results = []
        # For each image and its corresponding depth map
        # (assumes os.listdir returns the images in the same order the depth estimator processed them)
        for i, (image_path, processed_map) in enumerate(zip(image_paths, processed_maps)):
            # Extract the normalized depth map
            depth_map = processed_map["normalization"]
            # Analyze objects and their depths
            # (no lat/lon or caption_data is passed here, so caption generation may fail gracefully)
            object_depths = self.analyze_object_depths(image_path, depth_map)
            # Store results
            results.append({
                "image_path": image_path,
                "object_depths": object_depths
            })
            # Print some information for debugging
            print(f"Analyzed {image_path}:")
            for obj in object_depths:
                print(f" - {obj['original_label']} (Zone: {obj['zone_description']})")
        return results
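

# Minimal usage sketch, assuming DepthEstimator and config point at a directory of
# street-view .jpg images and that GenerateCaptions can reach its captioning backend.
# The coordinates below are placeholders, not real data.
if __name__ == "__main__":
    mapper = SoundMapper()
    try:
        maps = mapper.process_depth_maps()
        image_files = [f for f in os.listdir(mapper.image_dir) if f.endswith(".jpg")]
        if image_files and maps:
            image_path = os.path.join(mapper.image_dir, image_files[0])
            # lat/lon are dummy values; real callers should pass the image's coordinates
            objects = mapper.analyze_object_depths(
                image_path, maps[0]["normalization"], lat=0.0, lon=0.0, all_objects=True
            )
            for obj in objects:
                print(obj["original_label"], obj["zone_description"], obj["sound_description"])
    finally:
        mapper.cleanup()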