import logging
from typing import Optional, Tuple
from urllib.parse import urlparse

import grpc
from open_webui.config import (
    QDRANT_API_KEY,
    QDRANT_GRPC_PORT,
    QDRANT_ON_DISK,
    QDRANT_PREFER_GRPC,
    QDRANT_URI,
)
from open_webui.env import SRC_LOG_LEVELS
from open_webui.retrieval.vector.main import (
    GetResult,
    SearchResult,
    VectorDBBase,
    VectorItem,
)
from qdrant_client import QdrantClient as Qclient
from qdrant_client.http.exceptions import UnexpectedResponse
from qdrant_client.http.models import PointStruct
from qdrant_client.models import models

NO_LIMIT = 999999999

log = logging.getLogger(__name__)
log.setLevel(SRC_LOG_LEVELS["RAG"])


class QdrantClient(VectorDBBase):
    def __init__(self):
        self.collection_prefix = "open-webui"
        self.QDRANT_URI = QDRANT_URI
        self.QDRANT_API_KEY = QDRANT_API_KEY
        self.QDRANT_ON_DISK = QDRANT_ON_DISK
        self.PREFER_GRPC = QDRANT_PREFER_GRPC
        self.GRPC_PORT = QDRANT_GRPC_PORT

        if not self.QDRANT_URI:
            self.client = None
            return

        # Unified handling for either scheme
        parsed = urlparse(self.QDRANT_URI)
        host = parsed.hostname or self.QDRANT_URI
        http_port = parsed.port or 6333  # default REST port

        if self.PREFER_GRPC:
            self.client = Qclient(
                host=host,
                port=http_port,
                grpc_port=self.GRPC_PORT,
                prefer_grpc=self.PREFER_GRPC,
                api_key=self.QDRANT_API_KEY,
            )
        else:
            self.client = Qclient(url=self.QDRANT_URI, api_key=self.QDRANT_API_KEY)

        # Main collection types for multi-tenancy
        self.MEMORY_COLLECTION = f"{self.collection_prefix}_memories"
        self.KNOWLEDGE_COLLECTION = f"{self.collection_prefix}_knowledge"
        self.FILE_COLLECTION = f"{self.collection_prefix}_files"
        self.WEB_SEARCH_COLLECTION = f"{self.collection_prefix}_web-search"
        self.HASH_BASED_COLLECTION = f"{self.collection_prefix}_hash-based"

    def _result_to_get_result(self, points) -> GetResult:
        ids = []
        documents = []
        metadatas = []

        for point in points:
            payload = point.payload
            ids.append(point.id)
            documents.append(payload["text"])
            metadatas.append(payload["metadata"])

        return GetResult(
            **{
                "ids": [ids],
                "documents": [documents],
                "metadatas": [metadatas],
            }
        )

    def _get_collection_and_tenant_id(self, collection_name: str) -> Tuple[str, str]:
        """
        Maps the traditional collection name to multi-tenant collection and tenant ID.
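
        For example (derived from the branches below; illustrative only):
            "user-memory-<id>"     -> (self.MEMORY_COLLECTION, "user-memory-<id>")
            "file-<id>"            -> (self.FILE_COLLECTION, "file-<id>")
            "web-search-<id>"      -> (self.WEB_SEARCH_COLLECTION, "web-search-<id>")
            "<hex sha-256 digest>" -> (self.HASH_BASED_COLLECTION, "<digest>")
            anything else          -> (self.KNOWLEDGE_COLLECTION, collection_name)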

        Returns:
            tuple: (collection_name, tenant_id)
        """
        tenant_id = collection_name

        # Check for user memory collections
        if collection_name.startswith("user-memory-"):
            return self.MEMORY_COLLECTION, tenant_id
        # Check for file collections
        elif collection_name.startswith("file-"):
            return self.FILE_COLLECTION, tenant_id
        # Check for web search collections
        elif collection_name.startswith("web-search-"):
            return self.WEB_SEARCH_COLLECTION, tenant_id
        # Handle hash-based collections (YouTube and web URLs);
        # a SHA-256 hex digest is 64 characters long
        elif len(collection_name) == 64 and all(
            c in "0123456789abcdef" for c in collection_name
        ):
            return self.HASH_BASED_COLLECTION, tenant_id
        else:
            return self.KNOWLEDGE_COLLECTION, tenant_id

    def _extract_error_message(self, exception):
        """
        Extract the error message from either an HTTP or a gRPC exception.

        Returns:
            tuple: (status_code, error_message)
        """
        # Check if it's an HTTP exception
        if isinstance(exception, UnexpectedResponse):
            try:
                error_data = exception.structured()
                error_msg = error_data.get("status", {}).get("error", "")
                return exception.status_code, error_msg
            except Exception as inner_e:
                log.error(f"Failed to parse HTTP error: {inner_e}")
                return exception.status_code, str(exception)

        # Check if it's a gRPC exception
        elif isinstance(exception, grpc.RpcError):
            # Extract status code from gRPC error
            status_code = None
            if hasattr(exception, "code") and callable(exception.code):
                status_code = exception.code().value[0]

            # Extract error message
            error_msg = str(exception)
            if "details =" in error_msg:
                # Parse the details line which contains the actual error message
                try:
                    details_line = [
                        line.strip()
                        for line in error_msg.split("\n")
                        if "details =" in line
                    ][0]
                    error_msg = details_line.split("details =")[1].strip(' "')
                except (IndexError, AttributeError):
                    # Fall back to full message if parsing fails
                    pass

            return status_code, error_msg

        # For any other type of exception
        return None, str(exception)

    def _is_collection_not_found_error(self, exception):
        """
        Check if the exception is due to collection not found, supporting both HTTP and gRPC.
        """
        status_code, error_msg = self._extract_error_message(exception)

        # HTTP error (404)
        if (
            status_code == 404
            and "Collection" in error_msg
            and "doesn't exist" in error_msg
        ):
            return True

        # gRPC error (NOT_FOUND status)
        if (
            isinstance(exception, grpc.RpcError)
            and exception.code() == grpc.StatusCode.NOT_FOUND
        ):
            return True

        return False

    def _is_dimension_mismatch_error(self, exception):
        """
        Check if the exception is due to a dimension mismatch, supporting both HTTP and gRPC.
        """
        status_code, error_msg = self._extract_error_message(exception)

        # Common patterns in both HTTP and gRPC
        return (
            "Vector dimension error" in error_msg
            or "dimensions mismatch" in error_msg
            or "invalid vector size" in error_msg
        )

    def _create_multi_tenant_collection_if_not_exists(
        self, mt_collection_name: str, dimension: int = 384
    ):
        """
        Creates a collection with multi-tenancy configuration if it doesn't exist.

        Default dimension is set to 384, which corresponds to
        'sentence-transformers/all-MiniLM-L6-v2'. When creating collections
        dynamically (insert/upsert), the actual vector dimensions will be used.
""" try: # Try to create the collection directly - will fail if it already exists self.client.create_collection( collection_name=mt_collection_name, vectors_config=models.VectorParams( size=dimension, distance=models.Distance.COSINE, on_disk=self.QDRANT_ON_DISK, ), hnsw_config=models.HnswConfigDiff( payload_m=16, # Enable per-tenant indexing m=0, on_disk=self.QDRANT_ON_DISK, ), ) # Create tenant ID payload index self.client.create_payload_index( collection_name=mt_collection_name, field_name="tenant_id", field_schema=models.KeywordIndexParams( type=models.KeywordIndexType.KEYWORD, is_tenant=True, on_disk=self.QDRANT_ON_DISK, ), wait=True, ) log.info( f"Multi-tenant collection {mt_collection_name} created with dimension {dimension}!" ) except (UnexpectedResponse, grpc.RpcError) as e: # Check for the specific error indicating collection already exists status_code, error_msg = self._extract_error_message(e) # HTTP status code 409 or gRPC ALREADY_EXISTS if (isinstance(e, UnexpectedResponse) and status_code == 409) or ( isinstance(e, grpc.RpcError) and e.code() == grpc.StatusCode.ALREADY_EXISTS ): if "already exists" in error_msg: log.debug(f"Collection {mt_collection_name} already exists") return # If it's not an already exists error, re-raise raise e except Exception as e: raise e def _create_points(self, items: list[VectorItem], tenant_id: str): """ Create point structs from vector items with tenant ID. """ return [ PointStruct( id=item["id"], vector=item["vector"], payload={ "text": item["text"], "metadata": item["metadata"], "tenant_id": tenant_id, }, ) for item in items ] def has_collection(self, collection_name: str) -> bool: """ Check if a logical collection exists by checking for any points with the tenant ID. """ if not self.client: return False # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) # Create tenant filter tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) try: # Try directly querying - most of the time collection should exist response = self.client.query_points( collection_name=mt_collection, query_filter=models.Filter(must=[tenant_filter]), limit=1, ) # Collection exists with this tenant ID if there are points return len(response.points) > 0 except (UnexpectedResponse, grpc.RpcError) as e: if self._is_collection_not_found_error(e): log.debug(f"Collection {mt_collection} doesn't exist") return False else: # For other API errors, log and return False _, error_msg = self._extract_error_message(e) log.warning(f"Unexpected Qdrant error: {error_msg}") return False except Exception as e: # For any other errors, log and return False log.debug(f"Error checking collection {mt_collection}: {e}") return False def delete( self, collection_name: str, ids: Optional[list[str]] = None, filter: Optional[dict] = None, ): """ Delete vectors by ID or filter from a collection with tenant isolation. 
""" if not self.client: return None # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) # Create tenant filter tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) must_conditions = [tenant_filter] should_conditions = [] if ids: for id_value in ids: should_conditions.append( models.FieldCondition( key="metadata.id", match=models.MatchValue(value=id_value), ), ) elif filter: for key, value in filter.items(): must_conditions.append( models.FieldCondition( key=f"metadata.{key}", match=models.MatchValue(value=value), ), ) try: # Try to delete directly - most of the time collection should exist update_result = self.client.delete( collection_name=mt_collection, points_selector=models.FilterSelector( filter=models.Filter(must=must_conditions, should=should_conditions) ), ) return update_result except (UnexpectedResponse, grpc.RpcError) as e: if self._is_collection_not_found_error(e): log.debug( f"Collection {mt_collection} doesn't exist, nothing to delete" ) return None else: # For other API errors, log and re-raise _, error_msg = self._extract_error_message(e) log.warning(f"Unexpected Qdrant error: {error_msg}") raise except Exception as e: # For non-Qdrant exceptions, re-raise raise def search( self, collection_name: str, vectors: list[list[float | int]], limit: int ) -> Optional[SearchResult]: """ Search for the nearest neighbor items based on the vectors with tenant isolation. """ if not self.client: return None # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) # Get the vector dimension from the query vector dimension = len(vectors[0]) if vectors and len(vectors) > 0 else None try: # Try the search operation directly - most of the time collection should exist # Create tenant filter tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) # Ensure vector dimensions match the collection collection_dim = self.client.get_collection( mt_collection ).config.params.vectors.size if collection_dim != dimension: if collection_dim < dimension: vectors = [vector[:collection_dim] for vector in vectors] else: vectors = [ vector + [0] * (collection_dim - dimension) for vector in vectors ] # Search with tenant filter prefetch_query = models.Prefetch( filter=models.Filter(must=[tenant_filter]), limit=NO_LIMIT, ) query_response = self.client.query_points( collection_name=mt_collection, query=vectors[0], prefetch=prefetch_query, limit=limit, ) get_result = self._result_to_get_result(query_response.points) return SearchResult( ids=get_result.ids, documents=get_result.documents, metadatas=get_result.metadatas, # qdrant distance is [-1, 1], normalize to [0, 1] distances=[ [(point.score + 1.0) / 2.0 for point in query_response.points] ], ) except (UnexpectedResponse, grpc.RpcError) as e: if self._is_collection_not_found_error(e): log.debug( f"Collection {mt_collection} doesn't exist, search returns None" ) return None else: # For other API errors, log and re-raise _, error_msg = self._extract_error_message(e) log.warning(f"Unexpected Qdrant error during search: {error_msg}") raise except Exception as e: # For non-Qdrant exceptions, log and return None log.exception(f"Error searching collection '{collection_name}': {e}") return None def query(self, collection_name: str, filter: dict, limit: Optional[int] = None): """ Query points with filters and tenant isolation. 
""" if not self.client: return None # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) # Set default limit if not provided if limit is None: limit = NO_LIMIT # Create tenant filter tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) # Create metadata filters field_conditions = [] for key, value in filter.items(): field_conditions.append( models.FieldCondition( key=f"metadata.{key}", match=models.MatchValue(value=value) ) ) # Combine tenant filter with metadata filters combined_filter = models.Filter(must=[tenant_filter, *field_conditions]) try: # Try the query directly - most of the time collection should exist points = self.client.query_points( collection_name=mt_collection, query_filter=combined_filter, limit=limit, ) return self._result_to_get_result(points.points) except (UnexpectedResponse, grpc.RpcError) as e: if self._is_collection_not_found_error(e): log.debug( f"Collection {mt_collection} doesn't exist, query returns None" ) return None else: # For other API errors, log and re-raise _, error_msg = self._extract_error_message(e) log.warning(f"Unexpected Qdrant error during query: {error_msg}") raise except Exception as e: # For non-Qdrant exceptions, log and re-raise log.exception(f"Error querying collection '{collection_name}': {e}") return None def get(self, collection_name: str) -> Optional[GetResult]: """ Get all items in a collection with tenant isolation. """ if not self.client: return None # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) # Create tenant filter tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) try: # Try to get points directly - most of the time collection should exist points = self.client.query_points( collection_name=mt_collection, query_filter=models.Filter(must=[tenant_filter]), limit=NO_LIMIT, ) return self._result_to_get_result(points.points) except (UnexpectedResponse, grpc.RpcError) as e: if self._is_collection_not_found_error(e): log.debug(f"Collection {mt_collection} doesn't exist, get returns None") return None else: # For other API errors, log and re-raise _, error_msg = self._extract_error_message(e) log.warning(f"Unexpected Qdrant error during get: {error_msg}") raise except Exception as e: # For non-Qdrant exceptions, log and return None log.exception(f"Error getting collection '{collection_name}': {e}") return None def _handle_operation_with_error_retry( self, operation_name, mt_collection, points, dimension ): """ Private helper to handle common error cases for insert and upsert operations. Args: operation_name: 'insert' or 'upsert' mt_collection: The multi-tenant collection name points: The vector points to insert/upsert dimension: The dimension of the vectors Returns: The operation result (for upsert) or None (for insert) """ try: if operation_name == "insert": self.client.upload_points(mt_collection, points) return None else: # upsert return self.client.upsert(mt_collection, points) except (UnexpectedResponse, grpc.RpcError) as e: # Handle collection not found if self._is_collection_not_found_error(e): log.info( f"Collection {mt_collection} doesn't exist. Creating it with dimension {dimension}." 
                )
                # Create collection with correct dimensions from our vectors
                self._create_multi_tenant_collection_if_not_exists(
                    mt_collection_name=mt_collection, dimension=dimension
                )

                # Try operation again - no need for dimension adjustment since we
                # just created the collection with the correct dimensions
                if operation_name == "insert":
                    self.client.upload_points(mt_collection, points)
                    return None
                else:  # upsert
                    return self.client.upsert(mt_collection, points)

            # Handle dimension mismatch
            elif self._is_dimension_mismatch_error(e):
                # For dimension errors, the collection must exist, so get its configuration
                mt_collection_info = self.client.get_collection(mt_collection)
                existing_size = mt_collection_info.config.params.vectors.size

                log.info(
                    f"Dimension mismatch: Collection {mt_collection} expects {existing_size}, got {dimension}"
                )

                if existing_size < dimension:
                    # Truncate vectors to fit
                    log.info(
                        f"Truncating vectors from {dimension} to {existing_size} dimensions"
                    )
                    points = [
                        PointStruct(
                            id=point.id,
                            vector=point.vector[:existing_size],
                            payload=point.payload,
                        )
                        for point in points
                    ]
                elif existing_size > dimension:
                    # Pad vectors with zeros
                    log.info(
                        f"Padding vectors from {dimension} to {existing_size} dimensions with zeros"
                    )
                    points = [
                        PointStruct(
                            id=point.id,
                            vector=point.vector
                            + [0] * (existing_size - len(point.vector)),
                            payload=point.payload,
                        )
                        for point in points
                    ]

                # Try operation again with adjusted dimensions
                if operation_name == "insert":
                    self.client.upload_points(mt_collection, points)
                    return None
                else:  # upsert
                    return self.client.upsert(mt_collection, points)
            else:
                # Not a known error we can handle, log and re-raise
                _, error_msg = self._extract_error_message(e)
                log.warning(f"Unhandled Qdrant error: {error_msg}")
                raise
        except Exception as e:
            # For non-Qdrant exceptions, re-raise
            raise

    def insert(self, collection_name: str, items: list[VectorItem]):
        """
        Insert items with tenant ID.
        """
        if not self.client or not items:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        # Get dimensions from the actual vectors
        dimension = len(items[0]["vector"]) if items else None

        # Create points with tenant ID
        points = self._create_points(items, tenant_id)

        # Handle the operation with error retry
        return self._handle_operation_with_error_retry(
            "insert", mt_collection, points, dimension
        )

    def upsert(self, collection_name: str, items: list[VectorItem]):
        """
        Upsert items with tenant ID.
        """
        if not self.client or not items:
            return None

        # Map to multi-tenant collection and tenant ID
        mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name)

        # Get dimensions from the actual vectors
        dimension = len(items[0]["vector"]) if items else None

        # Create points with tenant ID
        points = self._create_points(items, tenant_id)

        # Handle the operation with error retry
        return self._handle_operation_with_error_retry(
            "upsert", mt_collection, points, dimension
        )

    def reset(self):
        """
        Reset the database by deleting all collections.
        """
        if not self.client:
            return None

        collection_names = self.client.get_collections().collections
        for collection_name in collection_names:
            if collection_name.name.startswith(self.collection_prefix):
                self.client.delete_collection(collection_name=collection_name.name)

    def delete_collection(self, collection_name: str):
        """
        Delete a collection.
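
        Removes every point belonging to this tenant from the shared multi-tenant
        collection, and drops the underlying Qdrant collection entirely once it
        holds no points.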
""" if not self.client: return None # Map to multi-tenant collection and tenant ID mt_collection, tenant_id = self._get_collection_and_tenant_id(collection_name) tenant_filter = models.FieldCondition( key="tenant_id", match=models.MatchValue(value=tenant_id) ) field_conditions = [tenant_filter] update_result = self.client.delete( collection_name=mt_collection, points_selector=models.FilterSelector( filter=models.Filter(must=field_conditions) ), ) if self.client.get_collection(mt_collection).points_count == 0: self.client.delete_collection(mt_collection) return update_result