import inspect
from enum import Enum
from functools import wraps
from typing import Any, Callable, Dict, Optional, Sequence, Union

from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

from chromadb.config import Component
from chromadb.config import System


class OpenTelemetryGranularity(Enum):
    """The granularity of the OpenTelemetry spans."""

    NONE = "none"
    """No spans are emitted."""

    OPERATION = "operation"
    """Spans are emitted for each operation."""

    OPERATION_AND_SEGMENT = "operation_and_segment"
    """Spans are emitted for each operation and segment."""

    ALL = "all"
    """Spans are emitted for almost every method call."""

    # Greater is more restrictive. So "all" < "operation" (and everything else),
    # "none" > everything.
    def __lt__(self, other: Any) -> bool:
        """Return True when this granularity is less restrictive than `other`.

        Returns NotImplemented for operands that are not granularities, so
        Python raises its usual TypeError for an unsupported comparison
        instead of an opaque ValueError from a failed list lookup.
        """
        if not isinstance(other, OpenTelemetryGranularity):
            return NotImplemented
        # Ordered from least restrictive to most restrictive.
        order = (
            OpenTelemetryGranularity.ALL,
            OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
            OpenTelemetryGranularity.OPERATION,
            OpenTelemetryGranularity.NONE,
        )
        return order.index(self) < order.index(other)


class OpenTelemetryClient(Component):
    """Component that initializes the module-level OTel state from settings."""

    def __init__(self, system: System):
        super().__init__(system)
        settings = system.settings
        otel_init(
            settings.chroma_otel_service_name,
            settings.chroma_otel_collection_endpoint,
            settings.chroma_otel_collection_headers,
            # An unset/empty granularity setting means tracing is disabled.
            OpenTelemetryGranularity(settings.chroma_otel_granularity or "none"),
        )


# Module-level tracing state; populated by otel_init() when tracing is enabled.
tracer: Optional[trace.Tracer] = None
granularity: OpenTelemetryGranularity = OpenTelemetryGranularity("none")


def otel_init(
    otel_service_name: Optional[str],
    otel_collection_endpoint: Optional[str],
    otel_collection_headers: Optional[Dict[str, str]],
    otel_granularity: OpenTelemetryGranularity,
) -> None:
    """Initializes module-level state for OpenTelemetry.

    Parameters match the environment variables which configure OTel as documented
    at https://docs.trychroma.com/observability.
    - otel_service_name: The name of the service for OTel tagging and aggregation.
    - otel_collection_endpoint: The endpoint to which OTel spans are sent
        (e.g. api.honeycomb.com).
    - otel_collection_headers: The headers to send with OTel spans
        (e.g. {"x-honeycomb-team": "abc123"}).
    - otel_granularity: The granularity of the spans to emit.

    When the granularity is NONE this is a no-op: no provider is installed and
    the module-level `tracer` and `granularity` are left untouched.
    """
    if otel_granularity == OpenTelemetryGranularity.NONE:
        return

    # NOTE(review): str() turns a None service name / endpoint into the literal
    # string "None" -- confirm callers always supply both when tracing is on.
    resource = Resource(attributes={SERVICE_NAME: str(otel_service_name)})
    provider = TracerProvider(resource=resource)
    provider.add_span_processor(
        BatchSpanProcessor(
            # TODO: we may eventually want to make this configurable.
            OTLPSpanExporter(
                endpoint=str(otel_collection_endpoint),
                headers=otel_collection_headers,
            )
        )
    )
    trace.set_tracer_provider(provider)

    global tracer, granularity
    tracer = trace.get_tracer(__name__)
    granularity = otel_granularity


def trace_method(
    trace_name: str,
    trace_granularity: OpenTelemetryGranularity,
    attributes: Optional[
        Dict[
            str,
            Union[
                str,
                bool,
                float,
                int,
                Sequence[str],
                Sequence[bool],
                Sequence[float],
                Sequence[int],
            ],
        ]
    ] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """A decorator that traces a method.

    - trace_name: The name given to the span wrapping each call.
    - trace_granularity: The granularity this span belongs to. The span is
        skipped when this is less restrictive than the configured module-level
        granularity, or when no tracer has been initialized.
    - attributes: Optional attributes attached to the span when it is started.

    Works for both plain functions and coroutine functions. For a coroutine
    function the span stays open until the awaited call completes, rather than
    closing as soon as the coroutine object has been created.
    """

    def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
        if inspect.iscoroutinefunction(f):

            @wraps(f)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                # Globals are read at call time: otel_init() may run after
                # the decorator has been applied.
                if trace_granularity < granularity or not tracer:
                    return await f(*args, **kwargs)
                with tracer.start_as_current_span(trace_name, attributes=attributes):
                    return await f(*args, **kwargs)

            return async_wrapper

        @wraps(f)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            # Globals are read at call time: otel_init() may run after the
            # decorator has been applied.
            if trace_granularity < granularity or not tracer:
                return f(*args, **kwargs)
            with tracer.start_as_current_span(trace_name, attributes=attributes):
                return f(*args, **kwargs)

        return wrapper

    return decorator


def add_attributes_to_current_span(
    attributes: Dict[
        str,
        Union[
            str,
            bool,
            float,
            int,
            Sequence[str],
            Sequence[bool],
            Sequence[float],
            Sequence[int],
        ],
    ]
) -> None:
    """Add attributes to the current span.

    Does nothing when tracing is disabled (granularity NONE) or when no tracer
    has been initialized.
    """
    if granularity == OpenTelemetryGranularity.NONE or not tracer:
        return
    trace.get_current_span().set_attributes(attributes)