diff --git "a/scene_analyzer.py" "b/scene_analyzer.py" --- "a/scene_analyzer.py" +++ "b/scene_analyzer.py" @@ -6,92 +6,220 @@ from spatial_analyzer import SpatialAnalyzer from scene_description import SceneDescriptor from enhance_scene_describer import EnhancedSceneDescriber from clip_analyzer import CLIPAnalyzer +from landmark_activities import LANDMARK_ACTIVITIES +from clip_zero_shot_classifier import CLIPZeroShotClassifier from llm_enhancer import LLMEnhancer from scene_type import SCENE_TYPES from object_categories import OBJECT_CATEGORIES +from landmark_data import ALL_LANDMARKS + class SceneAnalyzer: """ Core class for scene analysis and understanding based on object detection results. Analyzes detected objects, their relationships, and infers the scene type. """ - def __init__(self, class_names: Dict[int, str] = None, use_llm: bool = True, llm_model_path: str = None): + EVERYDAY_SCENE_TYPE_KEYS = [ + "general_indoor_space", "generic_street_view", + "desk_area_workspace", "outdoor_gathering_spot", + "kitchen_counter_or_utility_area" + ] + + def __init__(self, class_names: Dict[int, str] = None, use_llm: bool = True, use_clip: bool = True, enable_landmark=True, llm_model_path: str = None): """ Initialize the scene analyzer with optional class name mappings. 
Args: class_names: Dictionary mapping class IDs to class names (optional) """ - self.class_names = class_names + try: + self.class_names = class_names - # 加載場景類型和物體類別 - self.SCENE_TYPES = SCENE_TYPES - self.OBJECT_CATEGORIES = OBJECT_CATEGORIES + self.use_clip = use_clip + self.use_landmark_detection = enable_landmark + self.enable_landmark = enable_landmark - # 初始化其他組件,將數據傳遞給 SceneDescriptor - self.spatial_analyzer = SpatialAnalyzer(class_names=class_names, object_categories=self.OBJECT_CATEGORIES) - self.descriptor = SceneDescriptor(scene_types=self.SCENE_TYPES, object_categories=self.OBJECT_CATEGORIES) - self.scene_describer = EnhancedSceneDescriber(scene_types=self.SCENE_TYPES) + # 初始化基本屬性 + self.LANDMARK_ACTIVITIES = {} + self.SCENE_TYPES = {} + self.OBJECT_CATEGORIES = {} - # 初始化 CLIP 分析器 - try: - self.clip_analyzer = CLIPAnalyzer() - self.use_clip = True - except Exception as e: - print(f"Warning: Could not initialize CLIP analyzer: {e}") - print("Scene analysis will proceed without CLIP. 
Install CLIP with 'pip install clip' for enhanced scene understanding.") - self.use_clip = False + # 嘗試加載資料 + try: + self.LANDMARK_ACTIVITIES = LANDMARK_ACTIVITIES + print("Loaded LANDMARK_ACTIVITIES successfully") + except Exception as e: + print(f"Warning: Failed to load LANDMARK_ACTIVITIES: {e}") + + try: + self.SCENE_TYPES = SCENE_TYPES + print("Loaded SCENE_TYPES successfully") + except Exception as e: + print(f"Warning: Failed to load SCENE_TYPES: {e}") + + try: + self.OBJECT_CATEGORIES = OBJECT_CATEGORIES + print("Loaded OBJECT_CATEGORIES successfully") + except Exception as e: + print(f"Warning: Failed to load OBJECT_CATEGORIES: {e}") + + # 初始化其他組件 + self.spatial_analyzer = None + self.descriptor = None + self.scene_describer = None + + try: + self.spatial_analyzer = SpatialAnalyzer(class_names=class_names, object_categories=self.OBJECT_CATEGORIES) + print("Initialized SpatialAnalyzer successfully") + except Exception as e: + print(f"Error initializing SpatialAnalyzer: {e}") + import traceback + traceback.print_exc() - # 初始化LLM Model - self.use_llm = use_llm - if use_llm: try: - # from llm_enhancer import LLMEnhancer - self.llm_enhancer = LLMEnhancer(model_path=llm_model_path) - print(f"LLM enhancer initialized successfully.") + self.descriptor = SceneDescriptor(scene_types=self.SCENE_TYPES, object_categories=self.OBJECT_CATEGORIES) + print("Initialized SceneDescriptor successfully") except Exception as e: - print(f"Warning: Could not initialize LLM enhancer: {e}") - print("Scene analysis will proceed without LLM. 
Make sure required packages are installed.") - self.use_llm = False + print(f"Error initializing SceneDescriptor: {e}") + import traceback + traceback.print_exc() + + try: + if self.spatial_analyzer: + self.scene_describer = EnhancedSceneDescriber(scene_types=self.SCENE_TYPES, spatial_analyzer_instance=self.spatial_analyzer) + print("Initialized EnhancedSceneDescriber successfully") + else: + print("Warning: Cannot initialize EnhancedSceneDescriber without SpatialAnalyzer") + except Exception as e: + print(f"Error initializing EnhancedSceneDescriber: {e}") + import traceback + traceback.print_exc() + + # 初始化 CLIP 分析器 + if self.use_clip: + try: + self.clip_analyzer = CLIPAnalyzer() + + try: + # 嘗試使用已加載的CLIP模型實例 + if hasattr(self.clip_analyzer, 'get_clip_instance'): + model, preprocess, device = self.clip_analyzer.get_clip_instance() + self.landmark_classifier = CLIPZeroShotClassifier(device=device) + print("Initialized landmark classifier with shared CLIP model") + else: + self.landmark_classifier = CLIPZeroShotClassifier() + + # 配置地標檢測器 + self.landmark_classifier.set_batch_size(8) # 設置合適的批處理大小 + self.landmark_classifier.adjust_confidence_threshold("full_image", 0.8) # 整張圖像的閾值要求 + self.landmark_classifier.adjust_confidence_threshold("distant", 0.65) # 遠景地標的閾值要求 + + self.use_landmark_detection = True + print("Landmark detection enabled with optimized settings") + + except (ImportError, Exception) as e: + print(f"Warning: Could not initialize landmark classifier: {e}") + self.use_landmark_detection = False + + except Exception as e: + print(f"Warning: Could not initialize CLIP analyzer: {e}") + print("Scene analysis will proceed without CLIP. 
Install CLIP with 'pip install clip' for enhanced scene understanding.") + self.use_clip = False + + # 初始化LLM Model + self.use_llm = use_llm + if use_llm: + try: + # from llm_enhancer import LLMEnhancer + self.llm_enhancer = LLMEnhancer(model_path=llm_model_path) + print(f"LLM enhancer initialized successfully.") + except Exception as e: + print(f"Warning: Could not initialize LLM enhancer: {e}") + print("Scene analysis will proceed without LLM. Make sure required packages are installed.") + self.use_llm = False + + except Exception as e: + print(f"Critical error during SceneAnalyzer initialization: {e}") + import traceback + traceback.print_exc() + raise + def generate_scene_description(self, - scene_type, - detected_objects, - confidence, - lighting_info=None, - functional_zones=None): + scene_type: str, + detected_objects: List[Dict], + confidence: float, + lighting_info: Optional[Dict] = None, + functional_zones: Optional[Dict] = None, + enable_landmark: bool = True, + scene_scores: Optional[Dict] = None, + spatial_analysis: Optional[Dict] = None, + image_dimensions: Optional[Tuple[int, int]] = None + ): """ - 生成場景描述。 + 生成場景描述,並將所有必要的上下文傳遞給底層的描述器。 Args: scene_type: 識別的場景類型 detected_objects: 檢測到的物體列表 confidence: 場景分類置信度 lighting_info: 照明條件信息(可選) functional_zones: 功能區域信息(可選) + enable_landmark: 是否啟用地標描述(可選) + scene_scores: 場景分數(可選) + spatial_analysis: 空間分析結果(可選) + image_dimensions: 圖像尺寸 (寬, 高)(可選) Returns: str: 生成的場景描述 """ + + # 轉換 functional_zones 從 Dict 到 List[str],並過濾技術術語 + functional_zones_list = [] + if functional_zones and isinstance(functional_zones, dict): + # 過濾掉技術術語,只保留有意義的描述 + filtered_zones = {k: v for k, v in functional_zones.items() + if not k.endswith('_zone') or k in ['dining_zone', 'seating_zone', 'work_zone']} + functional_zones_list = [v.get('description', k) for k, v in filtered_zones.items() + if isinstance(v, dict) and v.get('description')] + elif functional_zones and isinstance(functional_zones, list): + # 過濾列表中的技術術語 + functional_zones_list 
= [zone for zone in functional_zones + if not zone.endswith('_zone') or 'area' in zone] + + # 生成詳細的物體統計信息 + object_statistics = {} + for obj in detected_objects: + class_name = obj.get("class_name", "unknown") + if class_name not in object_statistics: + object_statistics[class_name] = { + "count": 0, + "avg_confidence": 0.0, + "max_confidence": 0.0, + "instances": [] + } + + stats = object_statistics[class_name] + stats["count"] += 1 + stats["instances"].append(obj) + stats["max_confidence"] = max(stats["max_confidence"], obj.get("confidence", 0.0)) + + # 計算平均信心度 + for class_name, stats in object_statistics.items(): + if stats["count"] > 0: + total_conf = sum(inst.get("confidence", 0.0) for inst in stats["instances"]) + stats["avg_confidence"] = total_conf / stats["count"] + return self.scene_describer.generate_description( - scene_type, - detected_objects, - confidence, - lighting_info, - functional_zones + scene_type=scene_type, + detected_objects=detected_objects, + confidence=confidence, + lighting_info=lighting_info, + functional_zones=functional_zones_list, + enable_landmark=enable_landmark, + scene_scores=scene_scores, + spatial_analysis=spatial_analysis, + image_dimensions=image_dimensions, + object_statistics=object_statistics ) - def _generate_scene_description(self, scene_type, detected_objects, confidence, lighting_info=None): - """ - Use new implement - """ - # get the functional zones info - functional_zones = self.spatial_analyzer._identify_functional_zones(detected_objects, scene_type) - - return self.generate_scene_description( - scene_type, - detected_objects, - confidence, - lighting_info, - functional_zones - ) def _define_image_regions(self): """Define regions of the image for spatial analysis (3x3 grid)""" @@ -107,306 +235,1084 @@ class SceneAnalyzer: "bottom_right": (2/3, 2/3, 1, 1) } + def _get_alternative_scene_type(self, landmark_scene_type, detected_objects, scene_scores): + """ + 為地標場景類型選擇適合的替代類型 + + Args: + landmark_scene_type: 原始地標場景類型 
+ detected_objects: 檢測到的物體列表 + scene_scores: 所有場景類型的分數 + + Returns: + str: 適合的替代場景類型 + """ + # 1. 嘗試從現有場景分數中找出第二高的非地標場景 + landmark_types = {"tourist_landmark", "natural_landmark", "historical_monument"} + alternative_scores = {k: v for k, v in scene_scores.items() if k not in landmark_types and v > 0.2} + + if alternative_scores: + # 返回分數最高的非地標場景類型 + return max(alternative_scores.items(), key=lambda x: x[1])[0] - def analyze(self, detection_result: Any, lighting_info: Optional[Dict] = None, class_confidence_threshold: float = 0.35, scene_confidence_threshold: float = 0.6) -> Dict: + # 2. 基於物體組合推斷場景類型 + object_counts = {} + for obj in detected_objects: + class_name = obj.get("class_name", "") + if class_name not in object_counts: + object_counts[class_name] = 0 + object_counts[class_name] += 1 + + # 根據物體組合決定場景類型 + if "car" in object_counts or "truck" in object_counts or "bus" in object_counts: + # 有車輛,可能是街道或交叉路口 + if "traffic light" in object_counts or "stop sign" in object_counts: + return "intersection" + else: + return "city_street" + + if "building" in object_counts and object_counts.get("person", 0) > 0: + # 有建築物和人,可能是商業區 + return "commercial_district" + + if object_counts.get("person", 0) > 3: + # 多個行人,可能是行人區 + return "pedestrian_area" + + if "bench" in object_counts or "potted plant" in object_counts: + # 有長椅或盆栽,可能是公園區域 + return "park_area" + + # 3. 根據原始地標場景類型選擇合適的替代場景 + if landmark_scene_type == "natural_landmark": + return "outdoor_natural_area" + elif landmark_scene_type == "historical_monument": + return "urban_architecture" + + # 默認回退到城市街道 + return "city_street" + + def analyze(self, detection_result: Any, lighting_info: Optional[Dict] = None, class_confidence_threshold: float = 0.25, scene_confidence_threshold: float = 0.6, enable_landmark=True, places365_info: Optional[Dict] = None) -> Dict: """ Analyze detection results to determine scene type and provide understanding. 
Args: - detection_result: Detection result from YOLOv8 - lighting_info: Optional lighting condition analysis results - class_confidence_threshold: Minimum confidence to consider an object - scene_confidence_threshold: Minimum confidence to determine a scene + detection_result: Detection result from YOLOv8 or similar. + lighting_info: Optional lighting condition analysis results. + class_confidence_threshold: Minimum confidence to consider an object. + scene_confidence_threshold: Minimum confidence to determine a scene. + enable_landmark: Whether to enable landmark detection and recognition for this run. Returns: - Dictionary with scene analysis results + Dictionary with scene analysis results. """ - # If no result or no detections, handle with LLM if possible - if detection_result is None or len(detection_result.boxes) == 0: - if self.use_llm and self.use_clip and detection_result is not None: - # 使用CLIP和LLM分析無物體檢測的情況 + current_run_enable_landmark = enable_landmark + print(f"DIAGNOSTIC (SceneAnalyzer.analyze): Called with current_run_enable_landmark={current_run_enable_landmark}") + print(f"DEBUG: SceneAnalyzer received lighting_info type: {type(lighting_info)}") + print(f"DEBUG: SceneAnalyzer lighting_info source: {lighting_info.get('source', 'unknown') if isinstance(lighting_info, dict) else 'not_dict'}") + + # Log Places365 information if available + if places365_info: + print(f"DIAGNOSTIC: Places365 info received - scene: {places365_info.get('scene_label', 'unknown')}, " + f"mapped: {places365_info.get('mapped_scene_type', 'unknown')}, " + f"confidence: {places365_info.get('confidence', 0.0):.3f}") + + # Sync enable_landmark status with child components for this analysis run + # Assuming these components exist and have an 'enable_landmark' attribute + for component_name in ['scene_describer', 'clip_analyzer', 'landmark_classifier']: + if hasattr(self, component_name): + component = getattr(self, component_name) + if component and hasattr(component, 
'enable_landmark'): + component.enable_landmark = current_run_enable_landmark + + self.enable_landmark = current_run_enable_landmark # Instance's general state for this run + if hasattr(self, 'use_landmark_detection'): + self.use_landmark_detection = current_run_enable_landmark + + + original_image_pil = None + image_dims_val = None # Will be (width, height) + + if detection_result is not None and hasattr(detection_result, 'orig_img') and detection_result.orig_img is not None: + if isinstance(detection_result.orig_img, np.ndarray): try: - original_image = detection_result.orig_img - clip_analysis = self.clip_analyzer.analyze_image(original_image) - llm_description = self.llm_enhancer.handle_no_detection(clip_analysis) + img_array = detection_result.orig_img + if img_array.ndim == 3 and img_array.shape[2] == 4: # RGBA + img_array = img_array[:, :, :3] # Convert to RGB + if img_array.ndim == 2 : # Grayscale + original_image_pil = Image.fromarray(img_array).convert("RGB") + else: # Assuming RGB or BGR (PIL handles BGR->RGB on fromarray if mode not specified, but explicit is better if source is cv2 BGR) + original_image_pil = Image.fromarray(img_array) + + if original_image_pil.mode == 'BGR': # Explicitly convert BGR from OpenCV to RGB for PIL + original_image_pil = original_image_pil.convert('RGB') + + image_dims_val = (original_image_pil.width, original_image_pil.height) + except Exception as e: + print(f"Warning: Error converting NumPy orig_img to PIL: {e}") + elif hasattr(detection_result.orig_img, 'size') and callable(getattr(detection_result.orig_img, 'convert', None)): + original_image_pil = detection_result.orig_img.copy().convert("RGB") # Ensure RGB + image_dims_val = original_image_pil.size + else: + print(f"Warning: detection_result.orig_img (type: {type(detection_result.orig_img)}) is not a recognized NumPy array or PIL Image.") + else: + print("Warning: detection_result.orig_img not available. 
Image-based analysis will be limited.") + + # Handling cases with no YOLO detections (or no boxes attribute) + no_yolo_detections = (detection_result is None or + not hasattr(detection_result, 'boxes') or + not hasattr(detection_result.boxes, 'xyxy') or + len(detection_result.boxes.xyxy) == 0) + + if no_yolo_detections: + tried_landmark_detection = False + landmark_detection_result = None + + if original_image_pil and self.use_clip and current_run_enable_landmark: + if not hasattr(self, 'landmark_classifier') and hasattr(self, 'clip_analyzer'): + try: + if hasattr(self.clip_analyzer, 'get_clip_instance'): + model, preprocess, device = self.clip_analyzer.get_clip_instance() + self.landmark_classifier = CLIPZeroShotClassifier(device=device) + print("Initialized landmark classifier with shared CLIP model") + else: + self.landmark_classifier = CLIPZeroShotClassifier() + print("Created landmark classifier on demand for no YOLO detection path") + except Exception as e: + print(f"Warning: Could not initialize landmark classifier: {e}") + + # 地標搜索 + if hasattr(self, 'landmark_classifier'): + try: + tried_landmark_detection = True + print("Attempting landmark detection with no YOLO boxes") + landmark_results_no_yolo = self.landmark_classifier.intelligent_landmark_search( + original_image_pil, yolo_boxes=None, base_threshold=0.2 # 略微降低閾值,提高靈敏度 + ) + # 確保在無地標場景時返回有效結果 + if landmark_results_no_yolo is None: + landmark_results_no_yolo = {"is_landmark_scene": False, "detected_landmarks": []} + + if landmark_results_no_yolo and landmark_results_no_yolo.get("is_landmark_scene", False): + primary_landmark_no_yolo = landmark_results_no_yolo.get("primary_landmark") + + # 放寬閾值條件,以便捕獲更多潛在地標 + if primary_landmark_no_yolo and primary_landmark_no_yolo.get("confidence", 0) > 0.25: # 降低閾值 + landmark_detection_result = True + detected_objects_from_landmarks_list = [] + w_img_no_yolo, h_img_no_yolo = image_dims_val if image_dims_val else (1,1) + + for lm_info_item in 
landmark_results_no_yolo.get("detected_landmarks", []): + if lm_info_item.get("confidence", 0) > 0.25: # 降低閾值與上面保持一致 + # 安全獲取 box 值,避免索引錯誤 + box = lm_info_item.get("box", [0, 0, w_img_no_yolo, h_img_no_yolo]) + # 確保 box 包含至少 4 個元素 + if len(box) < 4: + box = [0, 0, w_img_no_yolo, h_img_no_yolo] + + # 計算中心點和標準化坐標 + center_x, center_y = (box[0] + box[2]) / 2, (box[1] + box[3]) / 2 + norm_cx = center_x / w_img_no_yolo if w_img_no_yolo > 0 else 0.5 + norm_cy = center_y / h_img_no_yolo if h_img_no_yolo > 0 else 0.5 + + # 決定地標類型 + landmark_type = "architectural" # 預設類型 + landmark_id = lm_info_item.get("landmark_id", "") + + if hasattr(self.landmark_classifier, '_determine_landmark_type') and landmark_id: + try: + landmark_type = self.landmark_classifier._determine_landmark_type(landmark_id) + except Exception as e: + print(f"Error determining landmark type: {e}") + else: + # 使用簡單的基於 ID 的啟發式方法推斷類型 + landmark_id_lower = landmark_id.lower() if isinstance(landmark_id, str) else "" + if "natural" in landmark_id_lower or any(term in landmark_id_lower for term in ["mountain", "waterfall", "canyon", "lake"]): + landmark_type = "natural" + elif "monument" in landmark_id_lower or "memorial" in landmark_id_lower or "historical" in landmark_id_lower: + landmark_type = "monument" + + # 決定區域位置 + region = "center" # 預設值 + if hasattr(self, 'spatial_analyzer') and hasattr(self.spatial_analyzer, '_determine_region'): + try: + region = self.spatial_analyzer._determine_region(norm_cx, norm_cy) + except Exception as e: + print(f"Error determining region: {e}") + + # 創建地標物體 + landmark_obj = { + "class_id": lm_info_item.get("landmark_id", f"LM_{lm_info_item.get('landmark_name','unk')}")[:15], + "class_name": lm_info_item.get("landmark_name", "Unknown Landmark"), + "confidence": lm_info_item.get("confidence", 0.0), + "box": box, + "center": (center_x, center_y), + "normalized_center": (norm_cx, norm_cy), + "size": (box[2] - box[0], box[3] - box[1]), + "normalized_size": ( + (box[2] - 
box[0])/(w_img_no_yolo if w_img_no_yolo>0 else 1), + (box[3] - box[1])/(h_img_no_yolo if h_img_no_yolo>0 else 1) + ), + "area": (box[2] - box[0]) * (box[3] - box[1]), + "normalized_area": ( + (box[2] - box[0]) * (box[3] - box[1]) + ) / ((w_img_no_yolo*h_img_no_yolo) if w_img_no_yolo*h_img_no_yolo >0 else 1), + "is_landmark": True, + "landmark_id": landmark_id, + "location": lm_info_item.get("location", "Unknown Location"), + "region": region, + "year_built": lm_info_item.get("year_built", ""), + "architectural_style": lm_info_item.get("architectural_style", ""), + "significance": lm_info_item.get("significance", ""), + "landmark_type": landmark_type + } + detected_objects_from_landmarks_list.append(landmark_obj) + + if detected_objects_from_landmarks_list: + # 設定場景類型 + best_scene_val_no_yolo = "tourist_landmark" # 預設 + if primary_landmark_no_yolo: + try: + lm_type_no_yolo = primary_landmark_no_yolo.get("landmark_type", "architectural") + if lm_type_no_yolo and "natural" in lm_type_no_yolo.lower(): + best_scene_val_no_yolo = "natural_landmark" + elif lm_type_no_yolo and ("historical" in lm_type_no_yolo.lower() or "monument" in lm_type_no_yolo.lower()): + best_scene_val_no_yolo = "historical_monument" + except Exception as e: + print(f"Error determining scene type from landmark type: {e}") + + # 確保場景類型有效 + if not hasattr(self, 'SCENE_TYPES') or best_scene_val_no_yolo not in self.SCENE_TYPES: + best_scene_val_no_yolo = "tourist_landmark" # 預設場景類型 + + # 設定置信度 + scene_confidence_no_yolo = primary_landmark_no_yolo.get("confidence", 0.0) if primary_landmark_no_yolo else 0.0 + + # 分析空間區域 + region_analysis_for_lm_desc = {} + if hasattr(self, 'spatial_analyzer') and hasattr(self.spatial_analyzer, '_analyze_regions'): + try: + region_analysis_for_lm_desc = self.spatial_analyzer._analyze_regions(detected_objects_from_landmarks_list) + except Exception as e: + print(f"Error analyzing regions: {e}") + + # 獲取功能區 + f_zones_no_yolo = {} + if hasattr(self, 'spatial_analyzer') and 
hasattr(self.spatial_analyzer, '_identify_landmark_zones'): + try: + f_zones_no_yolo = self.spatial_analyzer._identify_landmark_zones(detected_objects_from_landmarks_list) + except Exception as e: + print(f"Error identifying landmark zones: {e}") + + # 生成場景描述 + scene_desc_no_yolo = f"A {best_scene_val_no_yolo} scene." # 預設描述 + if hasattr(self, 'scene_describer') and hasattr(self.scene_describer, 'generate_description'): + try: + scene_desc_no_yolo = self.scene_describer.generate_description( + scene_type=best_scene_val_no_yolo, + detected_objects=detected_objects_from_landmarks_list, + confidence=scene_confidence_no_yolo, + lighting_info=lighting_info, + functional_zones=list(f_zones_no_yolo.keys()) if f_zones_no_yolo else [], + enable_landmark=True, + scene_scores={best_scene_val_no_yolo: scene_confidence_no_yolo}, + spatial_analysis=region_analysis_for_lm_desc, + image_dimensions=image_dims_val + ) + + except Exception as e: + print(f"Error generating scene description: {e}") + + + # 使用 LLM 增強描述 + enhanced_desc_no_yolo = scene_desc_no_yolo + if self.use_llm and hasattr(self, 'llm_enhancer') and hasattr(self.llm_enhancer, 'enhance_description'): + try: + # 準備用於 LLM 增強器的數據 + prominent_objects_detail = "" + if hasattr(self, 'scene_describer') and hasattr(self.scene_describer, '_format_object_list_for_description'): + try: + prominent_objects_detail = self.scene_describer._format_object_list_for_description( + detected_objects_from_landmarks_list[:min(1, len(detected_objects_from_landmarks_list))] + ) + except Exception as e: + print(f"Error formatting object list: {e}") + + scene_data_llm_no_yolo = { + "original_description": scene_desc_no_yolo, + "scene_type": best_scene_val_no_yolo, + "scene_name": self.SCENE_TYPES.get(best_scene_val_no_yolo, {}).get("name", "Landmark") + if hasattr(self, 'SCENE_TYPES') else "Landmark", + "detected_objects": detected_objects_from_landmarks_list, + "object_list": "landmark", + "confidence": scene_confidence_no_yolo, + 
"lighting_info": lighting_info, + "functional_zones": f_zones_no_yolo, + "clip_analysis": landmark_results_no_yolo.get("clip_analysis_on_full_image", {}), + "enable_landmark": True, + "image_width": w_img_no_yolo, + "image_height": h_img_no_yolo, + "prominent_objects_detail": prominent_objects_detail + } + enhanced_desc_no_yolo = self.llm_enhancer.enhance_description(scene_data_llm_no_yolo) + except Exception as e: + print(f"Error enhancing description with LLM: {e}") + import traceback + traceback.print_exc() + + # 計算可能的活動,優先使用地標特定活動 + possible_activities = ["Sightseeing"] + + # 檢查是否有主要地標活動從 CLIP 分析結果中獲取 + primary_landmark_activities = landmark_results_no_yolo.get("primary_landmark_activities", []) + + if primary_landmark_activities: + print(f"Using {len(primary_landmark_activities)} landmark-specific activities") + possible_activities = primary_landmark_activities + else: + # 從檢測到的地標中提取特定活動 + landmark_specific_activities = [] + for lm_info_item in landmark_results_no_yolo.get("detected_landmarks", []): + lm_id = lm_info_item.get("landmark_id") + if lm_id and hasattr(self, 'LANDMARK_ACTIVITIES') and lm_id in self.LANDMARK_ACTIVITIES: + landmark_specific_activities.extend(self.LANDMARK_ACTIVITIES[lm_id]) + + if landmark_specific_activities: + possible_activities = list(set(landmark_specific_activities)) # 去重 + print(f"Extracted {len(possible_activities)} activities from landmark data") + else: + # 回退到通用活動推斷 + if hasattr(self, 'descriptor') and hasattr(self.descriptor, '_infer_possible_activities'): + try: + possible_activities = self.descriptor._infer_possible_activities( + best_scene_val_no_yolo, + detected_objects_from_landmarks_list, + enable_landmark=True, + scene_scores={best_scene_val_no_yolo: scene_confidence_no_yolo} + ) + except Exception as e: + print(f"Error inferring possible activities: {e}") + + # 準備最終結果 + return { + "scene_type": best_scene_val_no_yolo, + "scene_name": self.SCENE_TYPES.get(best_scene_val_no_yolo, {}).get("name", "Landmark") + if 
hasattr(self, 'SCENE_TYPES') else "Landmark", + "confidence": round(float(scene_confidence_no_yolo), 4), + "description": scene_desc_no_yolo, + "enhanced_description": enhanced_desc_no_yolo, + "objects_present": detected_objects_from_landmarks_list, + "object_count": len(detected_objects_from_landmarks_list), + "regions": region_analysis_for_lm_desc, + "possible_activities": possible_activities, + "functional_zones": f_zones_no_yolo, + "detected_landmarks": [lm for lm in detected_objects_from_landmarks_list if lm.get("is_landmark", False)], + "primary_landmark": primary_landmark_no_yolo, + "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} + } + except Exception as e: + print(f"Error in landmark-only detection path (analyze method): {e}") + import traceback + traceback.print_exc() + + # 如果地標檢測失敗或未嘗試,使用 CLIP 進行一般場景分析 + if not landmark_detection_result and self.use_clip and original_image_pil: + try: + clip_analysis_val_no_yolo = None + if hasattr(self, 'clip_analyzer') and hasattr(self.clip_analyzer, 'analyze_image'): + try: + clip_analysis_val_no_yolo = self.clip_analyzer.analyze_image( + original_image_pil, + enable_landmark=current_run_enable_landmark + ) + except Exception as e: + print(f"Error in CLIP analysis: {e}") + + scene_type_llm_no_yolo = "llm_inferred_no_yolo" + confidence_llm_no_yolo = 0.0 + + if clip_analysis_val_no_yolo and isinstance(clip_analysis_val_no_yolo, dict): + top_scene = clip_analysis_val_no_yolo.get("top_scene") + if top_scene and isinstance(top_scene, tuple) and len(top_scene) >= 2: + confidence_llm_no_yolo = top_scene[1] + if isinstance(top_scene[0], str): + scene_type_llm_no_yolo = top_scene[0] + + desc_llm_no_yolo = "Primary object detection did not yield results. This description is based on overall image context." 
+ + w_llm_no_yolo, h_llm_no_yolo = image_dims_val if image_dims_val else (1, 1) + + enhanced_desc_llm_no_yolo = desc_llm_no_yolo + if self.use_llm and hasattr(self, 'llm_enhancer'): + try: + # 確保數據正確格式化 + clip_analysis_safe = {} + if isinstance(clip_analysis_val_no_yolo, dict): + clip_analysis_safe = clip_analysis_val_no_yolo + + scene_data_llm_no_yolo_enhance = { + "original_description": desc_llm_no_yolo, + "scene_type": scene_type_llm_no_yolo, + "scene_name": "Contextually Inferred (No Detections)", + "detected_objects": [], + "object_list": "general ambiance", + "confidence": confidence_llm_no_yolo, + "lighting_info": lighting_info or {"time_of_day": "unknown", "confidence": 0.0}, + "clip_analysis": clip_analysis_safe, + "enable_landmark": current_run_enable_landmark, + "image_width": w_llm_no_yolo, + "image_height": h_llm_no_yolo, + "prominent_objects_detail": "the overall visual context" + } + + if hasattr(self.llm_enhancer, 'enhance_description'): + try: + enhanced_desc_llm_no_yolo = self.llm_enhancer.enhance_description(scene_data_llm_no_yolo_enhance) + except Exception as e: + print(f"Error in enhance_description: {e}") + + if (not enhanced_desc_llm_no_yolo or len(enhanced_desc_llm_no_yolo.strip()) < 20) and hasattr(self.llm_enhancer, 'handle_no_detection'): + try: + enhanced_desc_llm_no_yolo = self.llm_enhancer.handle_no_detection(clip_analysis_safe) + except Exception as e: + print(f"Error in handle_no_detection: {e}") + except Exception as e: + print(f"Error preparing data for LLM enhancement: {e}") + import traceback + traceback.print_exc() + + # 安全類型轉換 + try: + confidence_float = float(confidence_llm_no_yolo) + except (ValueError, TypeError): + confidence_float = 0.0 + + # 確保增強描述不為空 + if not enhanced_desc_llm_no_yolo or not isinstance(enhanced_desc_llm_no_yolo, str): + enhanced_desc_llm_no_yolo = desc_llm_no_yolo + + # 返回結果 return { - "scene_type": "llm_inferred", - "confidence": clip_analysis.get("top_scene", ("unknown", 0))[1], - "description": "No 
objects detected by standard detection.", - "enhanced_description": llm_description, + "scene_type": scene_type_llm_no_yolo, + "confidence": round(confidence_float, 4), + "description": desc_llm_no_yolo, + "enhanced_description": enhanced_desc_llm_no_yolo, "objects_present": [], "object_count": 0, "regions": {}, "possible_activities": [], "safety_concerns": [], - "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0} + "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} } except Exception as e: - print(f"Error in LLM no-detection handling: {e}") + print(f"Error in CLIP no-detection fallback (analyze method): {e}") + import traceback + traceback.print_exc() + + # Check if Places365 provides useful scene information even without YOLO detections + fallback_scene_type = "unknown" + fallback_confidence = 0.0 + fallback_description = "No objects were detected in the image, and contextual analysis could not be performed or failed." + + if places365_info and places365_info.get('confidence', 0) > 0.3: + fallback_scene_type = places365_info.get('mapped_scene_type', 'unknown') + fallback_confidence = places365_info.get('confidence', 0.0) + fallback_description = f"Scene appears to be {places365_info.get('scene_label', 'an unidentified location')} based on overall visual context." 
- # 如果無法使用LLM/CLIP或處理失敗,返回原始的無檢測結果 return { - "scene_type": "unknown", - "confidence": 0, - "description": "No objects detected in the image.", + "scene_type": fallback_scene_type, + "confidence": fallback_confidence, + "description": fallback_description, + "enhanced_description": "The image analysis system could not detect any recognizable objects or landmarks in this image.", "objects_present": [], "object_count": 0, "regions": {}, "possible_activities": [], "safety_concerns": [], - "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0} + "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} } - # Get class names from detection result if not already set - if self.class_names is None: + if self.use_llm and self.use_clip and original_image_pil: + try: + clip_analysis_val_no_yolo = self.clip_analyzer.analyze_image(original_image_pil, enable_landmark=current_run_enable_landmark) + scene_type_llm_no_yolo = "llm_inferred_no_yolo" + confidence_llm_no_yolo = clip_analysis_val_no_yolo.get("top_scene", ("unknown", 0.0))[1] if isinstance(clip_analysis_val_no_yolo, dict) else 0.0 + desc_llm_no_yolo = "Primary object detection did not yield results. This description is based on overall image context." 
+ + w_llm_no_yolo, h_llm_no_yolo = image_dims_val if image_dims_val else (1,1) + scene_data_llm_no_yolo_enhance = { + "original_description": desc_llm_no_yolo, "scene_type": scene_type_llm_no_yolo, + "scene_name": "Contextually Inferred (No Detections)", "detected_objects": [], "object_list": "general ambiance", + "confidence": confidence_llm_no_yolo, "lighting_info": lighting_info, "clip_analysis": clip_analysis_val_no_yolo, + "enable_landmark": current_run_enable_landmark, "image_width": w_llm_no_yolo, "image_height": h_llm_no_yolo, + "prominent_objects_detail": "the overall visual context" + } + enhanced_desc_llm_no_yolo = self.llm_enhancer.enhance_description(scene_data_llm_no_yolo_enhance) if hasattr(self, 'llm_enhancer') else desc_llm_no_yolo + if hasattr(self, 'llm_enhancer') and hasattr(self.llm_enhancer, 'handle_no_detection') and (not enhanced_desc_llm_no_yolo or len(enhanced_desc_llm_no_yolo.strip()) < 20): + enhanced_desc_llm_no_yolo = self.llm_enhancer.handle_no_detection(clip_analysis_val_no_yolo) + + return { + "scene_type": scene_type_llm_no_yolo, "confidence": round(float(confidence_llm_no_yolo),4), + "description": desc_llm_no_yolo, "enhanced_description": enhanced_desc_llm_no_yolo, + "objects_present": [], "object_count": 0, "regions": {}, "possible_activities": [], + "safety_concerns": [], "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} + } + except Exception as e: + print(f"Error in LLM/CLIP no-detection fallback (analyze method): {e}") + + return { + "scene_type": "unknown", "confidence": 0.0, + "description": "No objects were detected in the image, and contextual analysis could not be performed or failed.", + "objects_present": [], "object_count": 0, "regions": {}, "possible_activities": [], + "safety_concerns": [], "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} + } + + # Main processing flow if YOLO detections are present + if self.class_names is None and 
hasattr(detection_result, 'names'): self.class_names = detection_result.names - # Also update class names in spatial analyzer - self.spatial_analyzer.class_names = self.class_names + if hasattr(self.spatial_analyzer, 'class_names'): + self.spatial_analyzer.class_names = self.class_names - # Extract detected objects with confidence above threshold - detected_objects = self.spatial_analyzer._extract_detected_objects( + detected_objects_main = self.spatial_analyzer._extract_detected_objects( detection_result, confidence_threshold=class_confidence_threshold ) - # No objects above confidence threshold - if not detected_objects: + if not detected_objects_main: return { - "scene_type": "unknown", - "confidence": 0, - "description": "No objects with sufficient confidence detected.", - "objects_present": [], - "object_count": 0, - "regions": {}, - "possible_activities": [], - "safety_concerns": [], - "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0} + "scene_type": "unknown", "confidence": 0.0, + "description": "No objects detected with sufficient confidence by the primary vision system.", + "objects_present": [], "object_count": 0, "regions": {}, "possible_activities": [], + "safety_concerns": [], "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0.0} } - # Analyze object distribution in regions - region_analysis = self.spatial_analyzer._analyze_regions(detected_objects) + # Spatial analysis done once on YOLO objects + region_analysis_val = self.spatial_analyzer._analyze_regions(detected_objects_main) + + final_functional_zones = {} + final_activities = [] + final_landmark_info = {} + + tentative_best_scene = "unknown" + tentative_scene_confidence = 0.0 + + # Landmark Processing and Integration + landmark_objects_identified_clip = [] + landmark_specific_activities = [] # NEW + if self.use_clip and current_run_enable_landmark and hasattr(self, 'process_unknown_objects') and hasattr(self, 
'landmark_classifier'): + + detected_objects_main_after_lm, landmark_objects_identified_clip = self.process_unknown_objects( + detection_result, + detected_objects_main + ) + detected_objects_main = detected_objects_main_after_lm # Update main list + + if landmark_objects_identified_clip: + primary_landmark_clip = max(landmark_objects_identified_clip, key=lambda x: x.get("confidence", 0.0), default=None) + if primary_landmark_clip and primary_landmark_clip.get("confidence", 0.0) > 0.35: + lm_type_raw = "architectural" # Default + if hasattr(self.landmark_classifier, '_determine_landmark_type') and primary_landmark_clip.get("landmark_id"): + lm_type_raw = self.landmark_classifier._determine_landmark_type(primary_landmark_clip.get("landmark_id")) + else: + lm_type_raw = primary_landmark_clip.get("landmark_type", "architectural") + + + if lm_type_raw == "natural": tentative_best_scene = "natural_landmark" + elif lm_type_raw == "monument": tentative_best_scene = "historical_monument" + else: tentative_best_scene = "tourist_landmark" + tentative_scene_confidence = primary_landmark_clip.get("confidence", 0.0) + + final_landmark_info = { + "detected_landmarks": landmark_objects_identified_clip, + "primary_landmark": primary_landmark_clip, + "detailed_landmarks": landmark_objects_identified_clip + } - # Compute scene type scores based on object detection - yolo_scene_scores = self._compute_scene_scores(detected_objects) + # 專門儲存地標特定活動的列表 + landmark_specific_activities = [] - # 使用 CLIP 分析圖像 - clip_scene_scores = {} - clip_analysis = None - if self.use_clip: - try: - # 獲取原始圖像 - original_image = detection_result.orig_img + # 優先收集��自識別地標的特定活動 + for lm_obj in landmark_objects_identified_clip: + lm_id = lm_obj.get("landmark_id") + if lm_id and lm_id in self.LANDMARK_ACTIVITIES: + landmark_specific_activities.extend(self.LANDMARK_ACTIVITIES[lm_id]) - # Use CLIP analyze image - clip_analysis = self.clip_analyzer.analyze_image(original_image) - - # get CLIP's score - 
clip_scene_scores = clip_analysis.get("scene_scores", {}) - - if "asian_commercial_street" in clip_scene_scores and clip_scene_scores["asian_commercial_street"] > 0.2: - # 使用對比提示進一步區分室內/室外 - comparative_results = self.clip_analyzer.calculate_similarity( - original_image, - self.clip_analyzer.comparative_prompts["indoor_vs_outdoor"] - ) - - # 分析對比結果 - indoor_score = sum(s for p, s in comparative_results.items() if "indoor" in p or "enclosed" in p) - outdoor_score = sum(s for p, s in comparative_results.items() if "outdoor" in p or "open-air" in p) - - # 如果 CLIP 認為這是室外場景,且光照分析認為是室內 - if outdoor_score > indoor_score and lighting_info and lighting_info.get("is_indoor", False): - # 修正光照分析結果 - print(f"CLIP indicates outdoor commercial street (score: {outdoor_score:.2f} vs {indoor_score:.2f}), adjusting lighting analysis") - lighting_info["is_indoor"] = False - lighting_info["indoor_probability"] = 0.3 - # 把CLIP 分析結果加到光照診斷 - if "diagnostics" not in lighting_info: - lighting_info["diagnostics"] = {} - lighting_info["diagnostics"]["clip_override"] = { - "reason": "CLIP detected outdoor commercial street", - "outdoor_score": float(outdoor_score), - "indoor_score": float(indoor_score) - } + # 將特定地標活動加入最終活動列表 + if landmark_specific_activities: + final_activities.extend(landmark_specific_activities) + print(f"Added {len(landmark_specific_activities)} landmark-specific activities for {', '.join([lm.get('landmark_name', 'unknown') for lm in landmark_objects_identified_clip if lm.get('is_landmark', False)])}") - # 如果 CLIP 檢測到了光照條件但沒有提供 lighting_info - if not lighting_info and "lighting_condition" in clip_analysis: - lighting_type, lighting_conf = clip_analysis["lighting_condition"] - lighting_info = { - "time_of_day": lighting_type, - "confidence": lighting_conf - } - except Exception as e: - print(f"Error in CLIP analysis: {e}") + if hasattr(self.spatial_analyzer, '_identify_landmark_zones'): + 
final_functional_zones.update(self.spatial_analyzer._identify_landmark_zones(landmark_objects_identified_clip)) - # 融合 YOLO 和 CLIP 的場景分數 - scene_scores = self._fuse_scene_scores(yolo_scene_scores, clip_scene_scores) + if not current_run_enable_landmark: + detected_objects_main = [obj for obj in detected_objects_main if not obj.get("is_landmark", False)] + final_landmark_info = {} - # Determine best matching scene type - best_scene, scene_confidence = self._determine_scene_type(scene_scores) + # --- Compute YOLO-based scene scores --- + # MODIFIED: Pass region_analysis_val as spatial_analysis_results + yolo_scene_scores_val = self._compute_scene_scores(detected_objects_main, + spatial_analysis_results=region_analysis_val) - # Generate possible activities based on scene - activities = self.descriptor._infer_possible_activities(best_scene, detected_objects) + # --- CLIP Analysis for general scene scores --- + clip_scene_scores_val = {} + clip_analysis_results = None # To store the full dict from clip_analyzer + if self.use_clip and original_image_pil is not None: + try: + clip_analysis_results = self.clip_analyzer.analyze_image( + original_image_pil, + enable_landmark=current_run_enable_landmark, + exclude_categories=["landmark", "tourist", "monument", "tower", "attraction", "scenic", "historical", "famous"] if not current_run_enable_landmark else None + ) + if isinstance(clip_analysis_results, dict): # Ensure it's a dict before get + clip_scene_scores_val = clip_analysis_results.get("scene_scores", {}) + # Filter again if landmarks are disabled + if not current_run_enable_landmark: + clip_scene_scores_val = {k: v for k, v in clip_scene_scores_val.items() if not any(kw in k.lower() for kw in ["landmark", "monument", "tourist"])} + if "cultural_analysis" in clip_analysis_results: del clip_analysis_results["cultural_analysis"] + if "top_scene" in clip_analysis_results and any(term in clip_analysis_results.get("top_scene",["unknown",0.0])[0].lower() for term in 
["landmark", "monument", "tourist"]): + non_lm_cs = sorted([item for item in clip_scene_scores_val.items() if item[1] > 0], key=lambda x:x[1], reverse=True) + clip_analysis_results["top_scene"] = non_lm_cs[0] if non_lm_cs else ("unknown", 0.0) + + # (Keep your asian_commercial_street special handling here if needed) + if not lighting_info and "lighting_condition" in clip_analysis_results: # If main lighting_info is still None + lt, lc = clip_analysis_results.get("lighting_condition", ("unknown", 0.0)) + lighting_info = {"time_of_day": lt, "confidence": lc, "source": "CLIP_fallback"} + except Exception as e: + print(f"Error in main CLIP analysis for YOLO path (analyze method): {e}") - # Identify potential safety concerns - safety_concerns = self.descriptor._identify_safety_concerns(detected_objects, best_scene) + # Calculate stats for _fuse_scene_scores (based on non-landmark YOLO objects) + yolo_only_objects_for_fuse_stats = [obj for obj in detected_objects_main if not obj.get("is_landmark")] + num_yolo_detections_for_fuse = len(yolo_only_objects_for_fuse_stats) + avg_yolo_confidence_for_fuse = sum(obj.get('confidence', 0.0) for obj in yolo_only_objects_for_fuse_stats) / num_yolo_detections_for_fuse if num_yolo_detections_for_fuse > 0 else 0.0 - # Calculate functional zones - functional_zones = self.spatial_analyzer._identify_functional_zones(detected_objects, best_scene) + print(f"DEBUG: About to call _fuse_scene_scores with lighting_info: {lighting_info}") + print(f"DEBUG: Places365_info being passed to fuse: {places365_info}") - # Generate scene description - scene_description = self.generate_scene_description( - best_scene, - detected_objects, - scene_confidence, + scene_scores_fused = self._fuse_scene_scores( + yolo_scene_scores_val, clip_scene_scores_val, + num_yolo_detections=num_yolo_detections_for_fuse, + avg_yolo_confidence=avg_yolo_confidence_for_fuse, lighting_info=lighting_info, - functional_zones=functional_zones + places365_info=places365_info + ) + 
+ # Respect tentative scene from strong landmark detection during fusion adjustment + if tentative_best_scene != "unknown" and "landmark" in tentative_best_scene.lower() and tentative_scene_confidence > 0.5: + scene_scores_fused[tentative_best_scene] = max(scene_scores_fused.get(tentative_best_scene, 0.0), tentative_scene_confidence * 0.95) + + # Final determination of scene type + final_best_scene, final_scene_confidence = self._determine_scene_type(scene_scores_fused) + + if not current_run_enable_landmark and final_best_scene in ["tourist_landmark", "natural_landmark", "historical_monument"]: + if hasattr(self, '_get_alternative_scene_type'): + alt_scene_type = self._get_alternative_scene_type(final_best_scene, detected_objects_main, scene_scores_fused) + final_best_scene = alt_scene_type + final_scene_confidence = scene_scores_fused.get(alt_scene_type, 0.6) + else: + final_best_scene = "generic_street_view"; final_scene_confidence = min(final_scene_confidence, 0.65) + + # Generate final descriptive content (Activities, Safety, Zones) + # 如果有特定地標活動,限制通用活動的數量 + generic_activities = [] + if hasattr(self.descriptor, '_infer_possible_activities'): + generic_activities = self.descriptor._infer_possible_activities( + final_best_scene, detected_objects_main, + enable_landmark=current_run_enable_landmark, scene_scores=scene_scores_fused + ) + + # 優先處理策略:使用特定地標活動,不足時才從通用活動補充 + if landmark_specific_activities: + # 如果有特定活動,優先保留,去除與特定活動重複的通用活動 + unique_generic_activities = [act for act in generic_activities if act not in landmark_specific_activities] + + # 如果特定活動少於3個,從通用活動中補充 + if len(landmark_specific_activities) < 3: + # 補充通用活動但總數不超過7個 + supplement_count = min(3 - len(landmark_specific_activities), len(unique_generic_activities)) + if supplement_count > 0: + final_activities.extend(unique_generic_activities[:supplement_count]) + else: + # 若無特定活動,則使用所有通用活動 + final_activities.extend(generic_activities) + + # 去重並排序,但確保特定地標活動保持在前面 + final_activities_set = 
set(final_activities) + final_activities = [] + + # 先加入特定地標活動(按原順序) + for activity in landmark_specific_activities: + if activity in final_activities_set: + final_activities.append(activity) + final_activities_set.remove(activity) + + # 再加入通用活動(按字母排序) + final_activities.extend(sorted(list(final_activities_set))) + + final_safety_concerns = self.descriptor._identify_safety_concerns(detected_objects_main, final_best_scene) if hasattr(self.descriptor, '_identify_safety_concerns') else [] + + if hasattr(self.spatial_analyzer, '_identify_functional_zones'): # Update functional_zones + general_zones = self.spatial_analyzer._identify_functional_zones(detected_objects_main, final_best_scene) + for gz_key, gz_val in general_zones.items(): + if gz_key not in final_functional_zones: final_functional_zones[gz_key] = gz_val + + # Filter again if landmarks disabled for this run + if not current_run_enable_landmark: + final_functional_zones = {k: v for k, v in final_functional_zones.items() if not any(kw in k.lower() for kw in ["landmark", "monument", "viewing", "tourist"])} + current_activities_temp = [act for act in final_activities if not any(kw in act.lower() for kw in ["sightsee", "photograph", "tour", "histor", "landmark", "monument", "cultur"])] + final_activities = current_activities_temp + if not final_activities and hasattr(self.descriptor, '_infer_possible_activities'): + final_activities = self.descriptor._infer_possible_activities("generic_street_view", detected_objects_main, enable_landmark=False) + + # 創建淨化的光線資訊,避免不合理的時間描述 + lighting_info_clean = None + if lighting_info: + lighting_info_clean = { + "is_indoor": lighting_info.get("is_indoor"), + "confidence": lighting_info.get("confidence", 0.0), + "time_of_day": lighting_info.get("time_of_day", "unknown") # 加入這行 + } + # 如果 Places365 提供高信心度判斷,就用它的結果 + if places365_info and places365_info.get('confidence', 0) >= 0.8: + lighting_info_clean["is_indoor"] = places365_info.get('is_indoor') + 
lighting_info_clean["confidence"] = places365_info.get('confidence') + + base_scene_description = self.generate_scene_description( + scene_type=final_best_scene, + detected_objects=detected_objects_main, + confidence=final_scene_confidence, + lighting_info=lighting_info_clean, + functional_zones=final_functional_zones, + enable_landmark=current_run_enable_landmark, + scene_scores=scene_scores_fused, + spatial_analysis=region_analysis_val, + image_dimensions=image_dims_val ) - # 使用LLM進行增強處理 - enhanced_description = None - llm_verification = None + if not current_run_enable_landmark and hasattr(self, '_remove_landmark_references'): + base_scene_description = self._remove_landmark_references(base_scene_description) - if self.use_llm: + # --- LLM Enhancement --- + enhanced_final_description = base_scene_description + llm_verification_output = None + if self.use_llm and hasattr(self, 'llm_enhancer'): try: - # 準備用於LLM的場景數據 - scene_data = { - "original_description": scene_description, - "scene_type": best_scene, - "scene_name": self.SCENE_TYPES.get(best_scene, {}).get("name", "Unknown"), - "detected_objects": detected_objects, - "confidence": scene_confidence, - "lighting_info": lighting_info, - "functional_zones": functional_zones, - "activities": activities, - "safety_concerns": safety_concerns, - "clip_analysis": clip_analysis - } + obj_list_for_llm = ", ".join(sorted(list(set( + obj["class_name"] for obj in detected_objects_main + if obj.get("confidence", 0) > 0.4 and not obj.get("is_landmark") + )))) + if not obj_list_for_llm and current_run_enable_landmark and final_landmark_info.get("primary_landmark"): + obj_list_for_llm = final_landmark_info["primary_landmark"].get("class_name", "a prominent feature") + elif not obj_list_for_llm: obj_list_for_llm = "various visual elements" + + # 生成物體統計信息 + object_statistics = {} + for obj in detected_objects_main: + class_name = obj.get("class_name", "unknown") + if class_name not in object_statistics: + 
object_statistics[class_name] = { + "count": 0, + "avg_confidence": 0.0, + "max_confidence": 0.0, + "instances": [] + } - # 如果CLIP和YOLO結果之間存在顯著差異,使用LLM進行驗證 - if self.use_clip and clip_analysis and "top_scene" in clip_analysis: - clip_top_scene = clip_analysis["top_scene"][0] - clip_confidence = clip_analysis["top_scene"][1] - - # 如果CLIP和YOLO的場景預測不同且都有較高的置信度,進行驗證 - if clip_top_scene != best_scene and clip_confidence > 0.4 and scene_confidence > 0.4: - llm_verification = self.llm_enhancer.verify_detection( - detected_objects, - clip_analysis, - best_scene, - self.SCENE_TYPES.get(best_scene, {}).get("name", "Unknown"), - scene_confidence + stats = object_statistics[class_name] + stats["count"] += 1 + stats["instances"].append(obj) + stats["max_confidence"] = max(stats["max_confidence"], obj.get("confidence", 0.0)) + + # 計算平均信心度 + for class_name, stats in object_statistics.items(): + if stats["count"] > 0: + total_conf = sum(inst.get("confidence", 0.0) for inst in stats["instances"]) + stats["avg_confidence"] = total_conf / stats["count"] + + llm_scene_data = { + "original_description": base_scene_description, "scene_type": final_best_scene, + "scene_name": self.SCENE_TYPES.get(final_best_scene, {}).get("name", "Unknown Scene"), + "detected_objects": detected_objects_main, "object_list": obj_list_for_llm, + "object_statistics": object_statistics, # 新增統計信息 + "confidence": final_scene_confidence, "lighting_info": lighting_info, + "functional_zones": final_functional_zones, "activities": final_activities, + "safety_concerns": final_safety_concerns, + "clip_analysis": clip_analysis_results if isinstance(clip_analysis_results, dict) else None, + "enable_landmark": current_run_enable_landmark, + "image_width": image_dims_val[0] if image_dims_val else None, + "image_height": image_dims_val[1] if image_dims_val else None, + "prominent_objects_detail": self.scene_describer._format_object_list_for_description( + self.scene_describer._get_prominent_objects(detected_objects_main, 
min_prominence_score=0.1, max_categories_to_return=3, max_total_objects=7) + ) if hasattr(self.scene_describer, '_get_prominent_objects') and hasattr(self.scene_describer, '_format_object_list_for_description') else "" + } + if current_run_enable_landmark and final_landmark_info.get("primary_landmark"): + llm_scene_data["primary_landmark_info"] = final_landmark_info["primary_landmark"] + + if self.use_clip and clip_analysis_results and isinstance(clip_analysis_results, dict) and "top_scene" in clip_analysis_results: + clip_top_name = clip_analysis_results.get("top_scene",["unknown",0.0])[0] + clip_top_conf = clip_analysis_results.get("top_scene",["unknown",0.0])[1] + if clip_top_name != final_best_scene and clip_top_conf > 0.4 and final_scene_confidence > 0.4 and hasattr(self.llm_enhancer, 'verify_detection'): + llm_verification_output = self.llm_enhancer.verify_detection( + detected_objects_main, clip_analysis_results, final_best_scene, + self.SCENE_TYPES.get(final_best_scene, {}).get("name", "Unknown"), final_scene_confidence ) + if llm_verification_output : llm_scene_data["verification_result"] = llm_verification_output.get("verification_text", "") - # 將驗證結果添加到場景數據中 - scene_data["verification_result"] = llm_verification.get("verification_text", "") - - # 使用LLM生成增強描述 - enhanced_description = self.llm_enhancer.enhance_description(scene_data) - + enhanced_final_description = self.llm_enhancer.enhance_description(llm_scene_data) + if not current_run_enable_landmark and hasattr(self, '_remove_landmark_references'): + enhanced_final_description = self._remove_landmark_references(enhanced_final_description) except Exception as e: - print(f"Error in LLM enhancement: {e}") - import traceback - traceback.print_exc() - enhanced_description = None - - # Return comprehensive analysis - result = { - "scene_type": best_scene if scene_confidence >= scene_confidence_threshold else "unknown", - "scene_name": self.SCENE_TYPES.get(best_scene, {}).get("name", "Unknown") - if 
scene_confidence >= scene_confidence_threshold else "Unknown Scene", - "confidence": scene_confidence, - "description": scene_description, - "enhanced_description": enhanced_description, # 添加LLM增強的描述 - "objects_present": [ - {"class_id": obj["class_id"], - "class_name": obj["class_name"], - "confidence": obj["confidence"]} - for obj in detected_objects - ], - "object_count": len(detected_objects), - "regions": region_analysis, - "possible_activities": activities, - "safety_concerns": safety_concerns, - "functional_zones": functional_zones, - "alternative_scenes": self.descriptor._get_alternative_scenes(scene_scores, scene_confidence_threshold, top_k=2), - "lighting_conditions": lighting_info or {"time_of_day": "unknown", "confidence": 0} + print(f"Error in LLM Enhancement in main flow (analyze method): {e}") + + # Construct final output dictionary + output_result = { + "scene_type": final_best_scene if final_scene_confidence >= scene_confidence_threshold else "unknown", + "scene_name": self.SCENE_TYPES.get(final_best_scene, {}).get("name", "Unknown Scene") if final_scene_confidence >= scene_confidence_threshold else "Unknown Scene", + "confidence": round(float(final_scene_confidence), 4), + "description": base_scene_description, + "enhanced_description": enhanced_final_description, + "objects_present": [{"class_id": obj.get("class_id", -1), "class_name": obj.get("class_name", "unknown"), "confidence": round(float(obj.get("confidence",0.0)), 4)} for obj in detected_objects_main], + "object_count": len(detected_objects_main), + "regions": region_analysis_val, + "possible_activities": final_activities, + "safety_concerns": final_safety_concerns, + "functional_zones": final_functional_zones, + "alternative_scenes": self.descriptor._get_alternative_scenes(scene_scores_fused, scene_confidence_threshold, top_k=2) if hasattr(self.descriptor, '_get_alternative_scenes') else [], + "lighting_conditions": lighting_info if lighting_info else {"time_of_day": "unknown", 
"confidence": 0.0, "source": "default"} } - # 如果有LLM驗證結果,添加到輸出中 - if llm_verification: - result["llm_verification"] = llm_verification.get("verification_text") - if llm_verification.get("has_errors", False): - result["detection_warnings"] = "LLM detected potential issues with object recognition" - - # 添加 CLIP 特定的結果 - if clip_analysis and "error" not in clip_analysis: - result["clip_analysis"] = { - "top_scene": clip_analysis.get("top_scene", ("unknown", 0)), - "cultural_analysis": clip_analysis.get("cultural_analysis", {}) + if current_run_enable_landmark and final_landmark_info and final_landmark_info.get("detected_landmarks"): + output_result.update(final_landmark_info) + if final_best_scene in ["tourist_landmark", "natural_landmark", "historical_monument"]: + output_result["scene_source"] = "landmark_detection" + elif not current_run_enable_landmark: + for key_rm in ["detected_landmarks", "primary_landmark", "detailed_landmarks", "scene_source"]: + if key_rm in output_result: del output_result[key_rm] + + if llm_verification_output: + output_result["llm_verification"] = llm_verification_output.get("verification_text") + if llm_verification_output.get("has_errors", False): + output_result["detection_warnings"] = "LLM detected potential issues with object recognition." 
def _get_object_spatial_cohesion_score(self, objects_for_scene: List[Dict], spatial_analysis_results: Optional[Dict]) -> float:
    """
    Return a small bonus reflecting how tightly a scene's key objects cluster.

    The class ids of ``objects_for_scene`` are looked up in every region of
    ``spatial_analysis_results["objects_by_region"]``. The fewer regions
    contain a matching object (and the more matching instances are found
    relative to the number of objects passed in), the higher the bonus.

    Args:
        objects_for_scene: Detected-object dicts relevant to the scene type
            being scored. Only their ``'class_id'`` entry is read; objects
            without one are ignored when building the set of key classes.
        spatial_analysis_results: Mapping with an ``'objects_by_region'``
            entry of the form ``{region_name: [object_dict, ...]}``.
            Regions whose list is empty or ``None`` contribute nothing.

    Returns:
        float: ``0.10`` when all matches sit in a single region, ``0.05`` for
        at most two regions, ``0.02`` for at most three (each tier also
        requires a minimum share of matched instances), otherwise ``0.0``.
        ``0.0`` is also returned when either argument is empty or no key
        class id can be derived.
    """
    if not objects_for_scene or not spatial_analysis_results:
        return 0.0

    objects_by_region = spatial_analysis_results.get("objects_by_region")
    if not objects_by_region:
        return 0.0

    # Class ids that define the scene; objects lacking a class_id are skipped.
    key_class_ids = {
        obj.get("class_id")
        for obj in objects_for_scene
        if obj.get("class_id") is not None
    }
    if not key_class_ids:
        return 0.0

    occupied_region_count = 0   # regions holding at least one key object
    matched_instances = 0       # every key-object instance seen, across regions
    for region_objects in objects_by_region.values():
        hits = sum(
            1 for region_obj in (region_objects or ())
            if region_obj.get("class_id") in key_class_ids
        )
        if hits:
            occupied_region_count += 1
            matched_instances += hits

    if occupied_region_count == 0:
        return 0.0

    # objects_for_scene is known to be non-empty here (guarded above).
    passed_instances = len(objects_for_scene)

    # (max regions allowed, min matched/passed ratio, bonus), strongest first.
    cohesion_tiers = (
        (1, 0.75, 0.10),
        (2, 0.60, 0.05),
        (3, 0.50, 0.02),
    )
    for max_regions, min_ratio, bonus in cohesion_tiers:
        if occupied_region_count <= max_regions and matched_instances >= passed_instances * min_ratio:
            return bonus

    return 0.0
optional_objects.intersection(detected_classes_set) - - # Skip if minimum required objects aren't present - if len(required_present) < scene_def["minimum_required"]: - scene_scores[scene_type] = 0 + required_obj_ids_defined = set(scene_def.get("required_objects", [])) + optional_obj_ids_defined = set(scene_def.get("optional_objects", [])) + min_required_matches_needed = scene_def.get("minimum_required", 0) + + # Determine which actual detected objects are relevant for this scene_type + # These lists will store the actual detected object dicts, not just class_ids + actual_required_objects_found_list = [] + for req_id in required_obj_ids_defined: + if req_id in detected_classes_set_all: + # Find first instance of this required object to add to list (for cohesion check later) + for dobj in detected_objects: + if dobj['class_id'] == req_id: + actual_required_objects_found_list.append(dobj) + break + + num_required_matches_found = len(actual_required_objects_found_list) + + actual_optional_objects_found_list = [] + for opt_id in optional_obj_ids_defined: + if opt_id in detected_classes_set_all: + for dobj in detected_objects: + if dobj['class_id'] == opt_id: + actual_optional_objects_found_list.append(dobj) + break + + num_optional_matches_found = len(actual_optional_objects_found_list) + + # --- Initial Score Calculation Weights --- + # Base score: 55% from required, 25% from optional, 10% richness, 10% cohesion (max) + required_weight = 0.55 + optional_weight = 0.25 + richness_bonus_max = 0.10 + cohesion_bonus_max = 0.10 # Max bonus from _get_object_spatial_cohesion_score is 0.1 + + current_scene_score = 0.0 + objects_to_check_for_cohesion = [] # For spatial cohesion scoring + + # --- Check minimum_required condition & Calculate base score --- + if num_required_matches_found >= min_required_matches_needed: + if len(required_obj_ids_defined) > 0: + required_ratio = num_required_matches_found / len(required_obj_ids_defined) + else: # No required objects defined, but 
min_required_matches_needed could be 0 + required_ratio = 1.0 if min_required_matches_needed == 0 else 0.0 + + current_scene_score = required_ratio * required_weight + objects_to_check_for_cohesion.extend(actual_required_objects_found_list) + + # Add score from optional objects + if len(optional_obj_ids_defined) > 0: + optional_ratio = num_optional_matches_found / len(optional_obj_ids_defined) + current_scene_score += optional_ratio * optional_weight + objects_to_check_for_cohesion.extend(actual_optional_objects_found_list) + + # Flexible handling for "everyday scenes" if strict minimum_required (based on 'required_objects') isn't met + elif scene_type in self.EVERYDAY_SCENE_TYPE_KEYS: + # If an everyday scene has many optional items, it might still be a weak candidate + # Check if a decent proportion of its 'optional_objects' are present + if len(optional_obj_ids_defined) > 0 and \ + (num_optional_matches_found / len(optional_obj_ids_defined)) >= 0.25: # e.g., at least 25% of typical optional items + # Base score more on optional fulfillment for these types + current_scene_score = (num_optional_matches_found / len(optional_obj_ids_defined)) * (required_weight + optional_weight * 0.5) # Give some base + objects_to_check_for_cohesion.extend(actual_optional_objects_found_list) + else: + scene_scores[scene_type] = 0.0 + continue # Skip this scene type + else: # For non-everyday scenes, if minimum_required is not met, score is 0 + scene_scores[scene_type] = 0.0 continue - # Base score from required objects - required_ratio = len(required_present) / max(1, len(required_objects)) - required_score = required_ratio * 0.7 # 70% of score from required objects - - # Additional score from optional objects - optional_ratio = len(optional_present) / max(1, len(optional_objects)) - optional_score = optional_ratio * 0.3 # 30% of score from optional objects - - # Bonus for having multiple instances of key objects - multiple_bonus = 0 - for class_id in required_present: - if 
class_counts.get(class_id, 0) > 1: - multiple_bonus += 0.05 # 5% bonus per additional key object type - - # Cap the bonus at 15% - multiple_bonus = min(0.15, multiple_bonus) - - # Calculate final score - final_score = required_score + optional_score + multiple_bonus - + # --- Bonus for object richness/variety --- + # Considers unique object *classes* found that are relevant to the scene definition + relevant_defined_class_ids = required_obj_ids_defined.union(optional_obj_ids_defined) + unique_relevant_detected_classes = relevant_defined_class_ids.intersection(detected_classes_set_all) + + object_richness_score = 0.0 + if len(relevant_defined_class_ids) > 0: + richness_ratio = len(unique_relevant_detected_classes) / len(relevant_defined_class_ids) + object_richness_score = min(richness_bonus_max, richness_ratio * 0.15) # Max 10% bonus from richness + current_scene_score += object_richness_score + + # --- Bonus for spatial cohesion (if spatial_analysis_results are provided) --- + spatial_cohesion_bonus = 0.0 + if spatial_analysis_results and objects_to_check_for_cohesion: + # Deduplicate objects_to_check_for_cohesion based on actual object instances (not just class_id) + # This can be done by converting list of dicts to list of tuples of items for hashing + # However, assuming _get_object_spatial_cohesion_score handles instances correctly. + # If objects_to_check_for_cohesion might have duplicate dict references for the SAME object, + # then a more robust deduplication on actual object references would be needed if not already handled. + # For now, assume it's a list of unique object *instances* found relevant to the scene. 
+ spatial_cohesion_bonus = self._get_object_spatial_cohesion_score( + objects_to_check_for_cohesion, # Pass the list of actual detected object dicts + spatial_analysis_results + ) + current_scene_score += spatial_cohesion_bonus # Max 0.1 from this bonus + + # --- Bonus for multiple instances of key objects (original logic refined) --- + multiple_instance_bonus = 0.0 + # For multiple instance bonus, focus on objects central to the scene's definition + key_objects_for_multi_instance_check = required_obj_ids_defined + if scene_type in self.EVERYDAY_SCENE_TYPE_KEYS and len(optional_obj_ids_defined) > 0: + # For everyday scenes, some optionals can also be key if they appear multiple times + # e.g., multiple chairs in a "general_indoor_space" + key_objects_for_multi_instance_check = key_objects_for_multi_instance_check.union( + set(list(optional_obj_ids_defined)[:max(1, len(optional_obj_ids_defined)//2)]) # consider first half of optionals + ) + + for class_id_check in key_objects_for_multi_instance_check: + if class_id_check in detected_classes_set_all and class_counts_all.get(class_id_check, 0) > 1: + multiple_instance_bonus += 0.025 # Slightly smaller bonus per type + current_scene_score += min(0.075, multiple_instance_bonus) # Max 7.5% bonus + + # Apply scene-specific priority defined in SCENE_TYPES if "priority" in scene_def: - final_score *= scene_def["priority"] + current_scene_score *= scene_def["priority"] + + scene_scores[scene_type] = min(1.0, max(0.0, current_scene_score)) - # Normalize to 0-1 range - scene_scores[scene_type] = min(1.0, final_score) + # If landmark detection is disabled via the instance attribute self.enable_landmark, + # ensure scores for landmark-specific scene types are zeroed out. 
def _fuse_scene_scores(self,
                       yolo_scene_scores: Dict[str, float],
                       clip_scene_scores: Dict[str, float],
                       num_yolo_detections: int = 0,
                       avg_yolo_confidence: float = 0.0,
                       lighting_info: Optional[Dict] = None,
                       places365_info: Optional[Dict] = None
                       ) -> Dict[str, float]:
    """
    Fuse scene scores from YOLO-based object detection, CLIP-based analysis and
    Places365 scene classification into one score per scene type.

    Per-scene weights start from 0.5 / 0.3 / 0.2 (YOLO / CLIP / Places365) and are
    adjusted by scene-type keywords, YOLO detection richness, the landmark switch
    and Places365 confidence, then re-normalised to sum to 1. The fused score is
    finally nudged up or down by an indoor/outdoor consistency check.

    Args:
        yolo_scene_scores: Scene scores based on YOLO object detection (None is treated as empty).
        clip_scene_scores: Scene scores based on CLIP analysis (None is treated as empty).
        num_yolo_detections: Number of YOLO detections used to judge detection richness.
        avg_yolo_confidence: Average confidence of those YOLO detections.
        lighting_info: Optional lighting analysis; 'is_indoor' and 'confidence' are read.
        places365_info: Optional Places365 result; 'mapped_scene_type', 'confidence'
            and 'is_indoor' are read.

    Returns:
        Dict: Fused scene scores, each clamped to [0, 1]. When at most one source has
        a meaningful (> 1e-5) score, the result is restricted to self.SCENE_TYPES keys.
    """
    # Callers may pass None for a source that did not run; the later .keys()/.get()
    # calls would otherwise raise AttributeError once two other sources are present.
    yolo_scene_scores = yolo_scene_scores or {}
    clip_scene_scores = clip_scene_scores or {}

    # Places365 contributes a single score: its mapped scene type, if known to us.
    places365_scene_scores_map = {}
    if places365_info and places365_info.get('confidence', 0) > 0.1:
        mapped_scene_type = places365_info.get('mapped_scene_type', 'unknown')
        places365_confidence = places365_info.get('confidence', 0.0)

        if mapped_scene_type in self.SCENE_TYPES:
            places365_scene_scores_map[mapped_scene_type] = places365_confidence
            print(f"Places365 contributing: {mapped_scene_type} with confidence {places365_confidence:.3f}")

    yolo_has_meaningful_scores = any(s > 1e-5 for s in yolo_scene_scores.values())
    clip_has_meaningful_scores = any(s > 1e-5 for s in clip_scene_scores.values())
    places365_has_meaningful_scores = any(s > 1e-5 for s in places365_scene_scores_map.values())

    meaningful_sources_count = sum([
        yolo_has_meaningful_scores,
        clip_has_meaningful_scores,
        places365_has_meaningful_scores
    ])

    if meaningful_sources_count == 0:
        return {st: 0.0 for st in self.SCENE_TYPES}
    if meaningful_sources_count == 1:
        # Only one source has signal: pass it through unweighted.
        if yolo_has_meaningful_scores:
            sole_source = yolo_scene_scores
        elif clip_has_meaningful_scores:
            sole_source = clip_scene_scores
        else:
            sole_source = places365_scene_scores_map
        return {st: sole_source.get(st, 0.0) for st in self.SCENE_TYPES}

    all_possible_scene_types = set(self.SCENE_TYPES).union(
        yolo_scene_scores, clip_scene_scores, places365_scene_scores_map
    )
    # Used only by the debug print below; computed once instead of sorting per iteration.
    first_scene_type = min(all_possible_scene_types)

    # Base weights - adjusted to accommodate three sources
    default_yolo_weight = 0.5
    default_clip_weight = 0.3
    default_places365_weight = 0.2

    clip_favoured_keywords = ("asian", "cultural", "aerial", "landmark", "monument", "tourist",
                              "natural_landmark", "historical_monument")
    indoor_common_keywords = ("room", "kitchen", "office", "bedroom", "desk_area", "indoor_space",
                              "professional_kitchen", "cafe", "library", "gym", "retail_store",
                              "supermarket", "classroom", "conference_room", "medical_facility",
                              "educational_setting", "dining_area")
    outdoor_common_keywords = ("parking_lot", "park_area", "beach", "harbor", "playground",
                               "sports_field", "bus_stop", "train_station", "airport")
    indoor_hint_keywords = ("room", "kitchen", "office", "indoor", "library", "cafe", "gym")
    outdoor_hint_keywords = ("street", "park", "aerial", "beach", "harbor", "intersection", "crosswalk")

    landmark_disabled = not getattr(self, 'enable_landmark', True)

    # --- Indoor/outdoor evidence (identical for every scene type, so resolved once) ---
    effective_is_indoor = None
    effective_confidence = 0.0
    if lighting_info and isinstance(lighting_info, dict):
        effective_is_indoor = lighting_info.get("is_indoor")
        effective_confidence = lighting_info.get("confidence", 0.0)

    places365_is_indoor = None
    places365_confidence_for_indoor = 0.0
    places365_overrides_lighting = False
    if places365_info and isinstance(places365_info, dict):
        places365_is_indoor = places365_info.get('is_indoor')
        places365_confidence_for_indoor = places365_info.get('confidence', 0.0)
        # Places365 overrides lighting analysis when confidence is high
        if places365_confidence_for_indoor >= 0.8 and places365_is_indoor is not None:
            effective_is_indoor = places365_is_indoor
            effective_confidence = places365_confidence_for_indoor
            places365_overrides_lighting = True

    apply_indoor_outdoor_adjustment = effective_is_indoor is not None and effective_confidence >= 0.65
    adjustment = 0.0
    if apply_indoor_outdoor_adjustment:
        lighting_adjustment_strength = 0.20  # Max adjustment factor (20%)
        # Scale by how far the confidence sits above the 0.65 threshold (0..1)
        adjustment_scale = (effective_confidence - 0.65) / (1.0 - 0.65)
        adjustment = min(lighting_adjustment_strength,
                         max(0, lighting_adjustment_strength * adjustment_scale))

    fused_scores = {}
    for scene_type in all_possible_scene_types:
        yolo_score = yolo_scene_scores.get(scene_type, 0.0)
        clip_score = clip_scene_scores.get(scene_type, 0.0)
        places365_score = places365_scene_scores_map.get(scene_type, 0.0)

        current_yolo_weight = default_yolo_weight
        current_clip_weight = default_clip_weight
        current_places365_weight = default_places365_weight

        scene_definition = self.SCENE_TYPES.get(scene_type, {})
        scene_key = scene_type.lower()
        is_everyday = scene_type in self.EVERYDAY_SCENE_TYPE_KEYS

        # Weight adjustment based on scene_type nature and YOLO richness
        if is_everyday:
            if num_yolo_detections >= 5 and avg_yolo_confidence >= 0.45:  # Rich YOLO for everyday
                current_yolo_weight = 0.60
                current_clip_weight = 0.15
                current_places365_weight = 0.25
            elif num_yolo_detections >= 3:  # Moderate YOLO for everyday
                current_yolo_weight = 0.50
                current_clip_weight = 0.20
                current_places365_weight = 0.30
            else:  # Sparse YOLO for everyday, rely more on Places365
                current_yolo_weight = 0.35
                current_clip_weight = 0.25
                current_places365_weight = 0.40
        # Scenes where CLIP's global understanding is weighted highest
        elif any(keyword in scene_key for keyword in clip_favoured_keywords):
            current_yolo_weight = 0.25
            current_clip_weight = 0.65
            current_places365_weight = 0.10  # Lower weight for landmark scenes
        # Common indoor scenes: object detection leads, Places365 adds scene context
        elif any(keyword in scene_key for keyword in indoor_common_keywords):
            current_yolo_weight = 0.55
            current_clip_weight = 0.20
            current_places365_weight = 0.25
        # Common outdoor scenes where objects are still important
        elif any(keyword in scene_key for keyword in outdoor_common_keywords):
            current_yolo_weight = 0.50
            current_clip_weight = 0.25
            current_places365_weight = 0.25

        # If landmark detection is globally disabled for this run
        if landmark_disabled:
            if any(keyword in scene_key for keyword in ("landmark", "monument", "tourist")):
                yolo_score = 0.0  # Should already be 0 from _compute_scene_scores
                clip_score *= 0.05  # Heavily penalize
                places365_score *= 1.0 if is_everyday else 0.8  # Slight penalty for landmark scenes
            elif not is_everyday and \
                    not any(keyword in scene_key for keyword in ("asian", "cultural", "aerial")):
                # Redistribute weights away from CLIP towards YOLO and Places365
                weight_boost = 0.05
                current_yolo_weight = min(0.9, current_yolo_weight + weight_boost)
                current_places365_weight = min(0.9, current_places365_weight + weight_boost)
                current_clip_weight = max(0.1, current_clip_weight - weight_boost * 2)

        # Boost Places365 weight if it has high confidence for this specific scene type
        if places365_score > 0.0 and places365_info:
            places365_original_confidence = places365_info.get('confidence', 0.0)
            if places365_original_confidence > 0.7:
                boost_factor = min(0.2, (places365_original_confidence - 0.7) * 0.4)
                current_places365_weight += boost_factor
                total_other_weight = current_yolo_weight + current_clip_weight
                if total_other_weight > 0:
                    # Take the boost proportionally out of the other two sources
                    reduction_factor = boost_factor / total_other_weight
                    current_yolo_weight *= (1 - reduction_factor)
                    current_clip_weight *= (1 - reduction_factor)

        total_weight = current_yolo_weight + current_clip_weight + current_places365_weight
        if total_weight > 0:  # Avoid division by zero
            current_yolo_weight /= total_weight
            current_clip_weight /= total_weight
            current_places365_weight /= total_weight
        else:
            current_yolo_weight = 1/3
            current_clip_weight = 1/3
            current_places365_weight = 1/3

        fused_score = (yolo_score * current_yolo_weight) + (clip_score * current_clip_weight) + (places365_score * current_places365_weight)

        # Emit the debug line only for selected scene types to avoid flooding stdout
        if places365_overrides_lighting and (
                scene_type == "intersection"
                or (scene_type in ["urban_intersection", "street_view"] and scene_type == first_scene_type)):
            print(f"DEBUG: Using Places365 indoor/outdoor decision: {places365_is_indoor} (confidence: {places365_confidence_for_indoor:.3f}) over lighting analysis")

        if apply_indoor_outdoor_adjustment:
            # Decide whether the scene type is inherently indoor/outdoor from its
            # description text and from keywords in its key.
            description = scene_definition.get("description", "").lower()
            is_defined_as_indoor = "indoor" in description or \
                any(kw in scene_key for kw in indoor_hint_keywords)
            is_defined_as_outdoor = "outdoor" in description or \
                any(kw in scene_key for kw in outdoor_hint_keywords)

            contradicts_evidence = is_defined_as_outdoor if effective_is_indoor else is_defined_as_indoor
            agrees_with_evidence = is_defined_as_indoor if effective_is_indoor else is_defined_as_outdoor

            if contradicts_evidence:
                fused_score *= (1.0 - adjustment)
            elif agrees_with_evidence:
                fused_score = min(1.0, fused_score * (1.0 + adjustment * 0.5))

        fused_scores[scene_type] = min(1.0, max(0.0, fused_score))

    return fused_scores


def process_unknown_objects(self, detection_result, detected_objects):
    """
    Run landmark detection on regions YOLO did not recognise or recognised with
    low confidence.

    Args:
        detection_result: YOLO detection result; `orig_img` and `boxes`
            (`xyxy`, `conf`, `cls`) are read when present.
        detected_objects: List of already recognised object dicts. When landmarks
            are found they are appended to this same list (the input is mutated).

    Returns:
        tuple: (updated object list, list of landmark objects). When landmark
        detection is disabled the first element is a new list with every
        `is_landmark` object removed. On any failure the objects are returned
        with an empty landmark list.
    """
    landmark_detection_enabled = (
        getattr(self, 'enable_landmark', True)
        and getattr(self, 'use_clip', False)
        and getattr(self, 'use_landmark_detection', False)
    )
    if not landmark_detection_enabled:
        # Landmark recognition is off: make sure no landmark object leaks through
        cleaned_objects = [obj for obj in detected_objects if not obj.get("is_landmark", False)]
        return cleaned_objects, []

    try:
        import traceback

        original_image = getattr(detection_result, 'orig_img', None)
        if original_image is None:
            print("Warning: Original image not available for landmark detection")
            return detected_objects, []

        # Make sure the image is a PIL image (or convert it into one)
        if not isinstance(original_image, Image.Image):
            if not isinstance(original_image, np.ndarray):
                print(f"Warning: Cannot process image of type {type(original_image)}")
                return detected_objects, []
            try:
                if original_image.ndim == 3 and original_image.shape[2] == 4:  # RGBA
                    original_image = original_image[:, :, :3]  # drop alpha
                if original_image.ndim == 2:  # grayscale
                    original_image = Image.fromarray(original_image).convert("RGB")
                else:
                    # NOTE(review): no channel swap is done here; if `orig_img` is a
                    # BGR (OpenCV-style) array the classifier receives swapped
                    # channels - confirm the expected channel order.
                    original_image = Image.fromarray(original_image)
            except Exception as e:
                print(f"Warning: Error converting image for landmark detection: {e}")
                return detected_objects, []

        # At this point the image is always a PIL image
        w, h = original_image.size

        # Regions handed to the landmark search
        low_conf_boxes = []

        if len(detected_objects) == 0:
            # No YOLO objects at all: analyse the whole image, a 2x2 grid and the centre
            low_conf_boxes.append([0, 0, w, h])

            grid_size = 2
            for i in range(grid_size):
                for j in range(grid_size):
                    low_conf_boxes.append([
                        j * w / grid_size,
                        i * h / grid_size,
                        (j + 1) * w / grid_size,
                        (i + 1) * h / grid_size
                    ])

            # Larger centre box (central 70% of the image)
            low_conf_boxes.append([w * 0.15, h * 0.15, w * 0.85, h * 0.85])

            print("No YOLO detections, attempting detailed landmark analysis with multiple regions")
        else:
            try:
                yolo_boxes = getattr(detection_result, 'boxes', None)
                if all(hasattr(yolo_boxes, attr) for attr in ('xyxy', 'conf', 'cls')):
                    def to_numpy(values):
                        # Tensors expose .cpu(); plain arrays/lists are used as-is
                        return values.cpu().numpy() if hasattr(values, 'cpu') else values

                    all_boxes = to_numpy(yolo_boxes.xyxy)
                    all_confs = to_numpy(yolo_boxes.conf)
                    all_cls = to_numpy(yolo_boxes.cls)

                    # Class IDs treated as building-like by this heuristic
                    common_building_classes = {11, 12, 13, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65}

                    # Collect low-confidence regions and regions that may contain landmarks
                    for box, conf, cls in zip(all_boxes, all_confs, all_cls):
                        is_low_conf = 0.1 < conf < 0.4
                        is_building = int(cls) in common_building_classes
                        if (is_low_conf or is_building) \
                                and isinstance(box, (list, tuple, np.ndarray)) and len(box) >= 4:
                            low_conf_boxes.append(box)
            except Exception as e:
                print(f"Error processing YOLO detections: {e}")
                traceback.print_exc()

        # Lazily create the classifier when it is missing (or was left as None)
        if getattr(self, 'landmark_classifier', None) is None:
            if hasattr(self, 'clip_analyzer') and hasattr(self.clip_analyzer, 'get_clip_instance'):
                try:
                    print("Initializing landmark classifier for process_unknown_objects")
                    _, _, device = self.clip_analyzer.get_clip_instance()
                    self.landmark_classifier = CLIPZeroShotClassifier(device=device)
                except Exception as e:
                    print(f"Error initializing landmark classifier: {e}")
                    return detected_objects, []
            else:
                print("Warning: landmark_classifier not available and cannot be initialized")
                return detected_objects, []

        try:
            if not low_conf_boxes:
                # No candidate region: fall back to the full image
                low_conf_boxes.append([0, 0, w, h])

            landmark_results = self.landmark_classifier.intelligent_landmark_search(
                original_image,
                yolo_boxes=low_conf_boxes,
                base_threshold=0.25
            )
        except Exception as e:
            print(f"Error in intelligent_landmark_search: {e}")
            traceback.print_exc()
            return detected_objects, []

        if not (landmark_results and landmark_results.get("is_landmark_scene", False)):
            return detected_objects, []

        landmark_objects = []
        base_threshold = 0.25

        for landmark_info in landmark_results.get("detected_landmarks", []):
            try:
                landmark_id = landmark_info.get("landmark_id", "")
                landmark_type = "architectural"  # default type
                type_threshold = 0.5  # default threshold

                # Prefer the classifier's own type resolution
                if hasattr(self.landmark_classifier, '_determine_landmark_type'):
                    landmark_type = self.landmark_classifier._determine_landmark_type(landmark_id)
                    type_threshold = getattr(self.landmark_classifier, 'landmark_type_thresholds', {}).get(landmark_type, 0.5)
                # Otherwise use a local method when one exists
                elif hasattr(self, '_determine_landmark_type'):
                    landmark_type = self._determine_landmark_type(landmark_id)
                    if landmark_type == "skyscraper":
                        type_threshold = 0.4
                    elif landmark_type == "natural":
                        type_threshold = 0.6
                # Last resort: infer the type from words in the landmark ID
                else:
                    lowered_id = landmark_id.lower()
                    if any(term in lowered_id for term in ["mountain", "canyon", "waterfall", "lake", "river", "natural"]):
                        landmark_type = "natural"
                        type_threshold = 0.6
                    elif any(term in lowered_id for term in ["skyscraper", "building", "tower", "tall"]):
                        landmark_type = "skyscraper"
                        type_threshold = 0.4
                    elif any(term in lowered_id for term in ["monument", "memorial", "statue", "historical"]):
                        landmark_type = "monument"
                        type_threshold = 0.5

                # The per-type threshold rescales the base threshold (0.5 means "unchanged")
                effective_threshold = base_threshold * (type_threshold / 0.5)
                if not landmark_info.get("confidence", 0) > effective_threshold:
                    continue

                if "box" in landmark_info:
                    box = landmark_info["box"]
                else:
                    # No box reported: use the central 90% of the image
                    margin_x, margin_y = w * 0.05, h * 0.05
                    box = [margin_x, margin_y, w - margin_x, h - margin_y]

                box_width = box[2] - box[0]
                box_height = box[3] - box[1]
                center_x = (box[0] + box[2]) / 2
                center_y = (box[1] + box[3]) / 2
                norm_center_x = center_x / w if w > 0 else 0.5
                norm_center_y = center_y / h if h > 0 else 0.5

                region = "center"  # default when the spatial analyzer cannot answer
                if hasattr(self, 'spatial_analyzer') and hasattr(self.spatial_analyzer, '_determine_region'):
                    try:
                        region = self.spatial_analyzer._determine_region(norm_center_x, norm_center_y)
                    except Exception as e:
                        print(f"Error determining region: {e}")

                landmark_obj = {
                    # Over-long IDs are truncated; non-string IDs get a sentinel
                    "class_id": landmark_id[:15] if isinstance(landmark_id, str) else "-100",
                    "class_name": landmark_info.get("landmark_name", "Unknown Landmark"),
                    "confidence": landmark_info.get("confidence", 0.0),
                    "box": box,
                    "center": (center_x, center_y),
                    "normalized_center": (norm_center_x, norm_center_y),
                    "size": (box_width, box_height),
                    "normalized_size": (
                        box_width / w if w > 0 else 0,
                        box_height / h if h > 0 else 0
                    ),
                    "area": box_width * box_height,
                    "normalized_area": (
                        box_width * box_height / (w * h) if w * h > 0 else 0
                    ),
                    "region": region,
                    "is_landmark": True,
                    "landmark_id": landmark_id,
                    "location": landmark_info.get("location", "Unknown Location")
                }

                # Copy optional descriptive fields when the classifier provides them
                for key in ["year_built", "architectural_style", "significance"]:
                    if key in landmark_info:
                        landmark_obj[key] = landmark_info[key]

                landmark_obj["landmark_type"] = landmark_type

                detected_objects.append(landmark_obj)
                landmark_objects.append(landmark_obj)
                print(f"Detected landmark: {landmark_info.get('landmark_name', 'Unknown')} with confidence {landmark_info.get('confidence', 0.0):.2f}")
            except Exception as e:
                print(f"Error processing landmark: {e}")
                continue

        return detected_objects, landmark_objects

    except Exception as e:
        print(f"Error in landmark detection: {e}")
        import traceback
        traceback.print_exc()
        return detected_objects, []


def _remove_landmark_references(self, text, landmarks=None):
    """
    Remove landmark references from a piece of text.

    Known landmark names/aliases are replaced with "tall structure" and their
    locations with "the urban area". Matches are case-insensitive and must be
    whole words, so a location never rewrites part of a longer word. Longer
    candidates are tried first so a short alias cannot split a longer name.

    Args:
        text: Input text.
        landmarks: Optional mapping of landmark id -> info dict ('name',
            optional 'aliases' and 'location'). Defaults to the module-level
            ALL_LANDMARKS table.

    Returns:
        str: Text with landmark references removed. Falsy input is returned
        unchanged. When no landmark table is available, a set of generic
        landmark phrasing patterns is applied instead.
    """
    if not text:
        return text

    import re

    if landmarks is None:
        try:
            landmarks = ALL_LANDMARKS
        except NameError:
            # Landmark data is not available in this module: use generic patterns
            landmarks = None

    if landmarks is None:
        landmark_patterns = [
            # Landmark location patterns
            (r'an iconic structure in ([A-Z][a-zA-Z\s,]+)', r'an urban structure'),
            (r'a famous (monument|tower|landmark) in ([A-Z][a-zA-Z\s,]+)', r'an urban structure'),
            (r'(the [A-Z][a-zA-Z\s]+ Tower)', r'the tower'),
            (r'(the [A-Z][a-zA-Z\s]+ Building)', r'the building'),
            (r'(the CN Tower)', r'the tower'),
            (r'([A-Z][a-zA-Z\s]+) Tower', r'tall structure'),

            # Landmark positional relationship patterns
            (r'(centered|built|located|positioned) around the ([A-Z][a-zA-Z\s]+? (Tower|Monument|Landmark))', r'located in this area'),

            # Landmark activity patterns
            (r'(sightseeing|guided tours|cultural tourism) (at|around|near) (this landmark|the [A-Z][a-zA-Z\s]+)', r'\1 in this area'),

            # Generic landmark wording patterns
            (r'this (famous|iconic|historic|well-known) (landmark|monument|tower|structure)', r'this urban structure'),
            (r'landmark scene', r'urban scene'),
            (r'tourist destination', r'urban area'),
            (r'tourist attraction', r'urban area')
        ]
        for pattern, replacement in landmark_patterns:
            text = re.sub(pattern, replacement, text, flags=re.IGNORECASE)
        return text

    # Collect every landmark name/alias and every location (full, plus its
    # first two comma-separated parts, e.g. city and country).
    landmark_names = set()
    locations = set()
    for info in landmarks.values():
        landmark_names.add(info.get("name"))
        landmark_names.update(info.get("aliases", []))

        location = info.get("location")
        if isinstance(location, str) and location:
            locations.add(location)
            locations.update(part.strip() for part in location.split(",")[:2])

    def replace_whole_words(source, candidates, replacement):
        # Names of 2 characters or fewer are skipped to avoid accidental matches
        usable = sorted(
            {c for c in candidates if isinstance(c, str) and len(c) > 2},
            key=lambda c: (-len(c), c)
        )
        if not usable:
            return source
        # Lookarounds instead of \b so names ending in punctuation still match
        pattern = r'(?<!\w)(?:' + '|'.join(re.escape(c) for c in usable) + r')(?!\w)'
        return re.sub(pattern, replacement, source, flags=re.IGNORECASE)

    text = replace_whole_words(text, landmark_names, "tall structure")
    text = replace_whole_words(text, locations, "the urban area")
    return text