badalsahani's picture
feat: chroma initial deploy
287a0bc
from functools import wraps
from enum import Enum
from typing import Any, Callable, Dict, Optional, Sequence, Union
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from chromadb.config import Component
from chromadb.config import System
class OpenTelemetryGranularity(Enum):
"""The granularity of the OpenTelemetry spans."""
NONE = "none"
"""No spans are emitted."""
OPERATION = "operation"
"""Spans are emitted for each operation."""
OPERATION_AND_SEGMENT = "operation_and_segment"
"""Spans are emitted for each operation and segment."""
ALL = "all"
"""Spans are emitted for almost every method call."""
# Greater is more restrictive. So "all" < "operation" (and everything else),
# "none" > everything.
def __lt__(self, other: Any) -> bool:
"""Compare two granularities."""
order = [
OpenTelemetryGranularity.ALL,
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
OpenTelemetryGranularity.OPERATION,
OpenTelemetryGranularity.NONE,
]
return order.index(self) < order.index(other)
class OpenTelemetryClient(Component):
def __init__(self, system: System):
super().__init__(system)
otel_init(
system.settings.chroma_otel_service_name,
system.settings.chroma_otel_collection_endpoint,
system.settings.chroma_otel_collection_headers,
OpenTelemetryGranularity(
system.settings.chroma_otel_granularity
if system.settings.chroma_otel_granularity
else "none"
),
)
tracer: Optional[trace.Tracer] = None
granularity: OpenTelemetryGranularity = OpenTelemetryGranularity("none")
def otel_init(
otel_service_name: Optional[str],
otel_collection_endpoint: Optional[str],
otel_collection_headers: Optional[Dict[str, str]],
otel_granularity: OpenTelemetryGranularity,
) -> None:
"""Initializes module-level state for OpenTelemetry.
Parameters match the environment variables which configure OTel as documented
at https://docs.trychroma.com/observability.
- otel_service_name: The name of the service for OTel tagging and aggregation.
- otel_collection_endpoint: The endpoint to which OTel spans are sent
(e.g. api.honeycomb.com).
- otel_collection_headers: The headers to send with OTel spans
(e.g. {"x-honeycomb-team": "abc123"}).
- otel_granularity: The granularity of the spans to emit.
"""
if otel_granularity == OpenTelemetryGranularity.NONE:
return
resource = Resource(attributes={SERVICE_NAME: str(otel_service_name)})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(
# TODO: we may eventually want to make this configurable.
OTLPSpanExporter(
endpoint=str(otel_collection_endpoint),
headers=otel_collection_headers,
)
)
)
trace.set_tracer_provider(provider)
global tracer, granularity
tracer = trace.get_tracer(__name__)
granularity = otel_granularity
def trace_method(
trace_name: str,
trace_granularity: OpenTelemetryGranularity,
attributes: Optional[
Dict[
str,
Union[
str,
bool,
float,
int,
Sequence[str],
Sequence[bool],
Sequence[float],
Sequence[int],
],
]
] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""A decorator that traces a method."""
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
@wraps(f)
def wrapper(*args: Any, **kwargs: Dict[Any, Any]) -> Any:
global tracer, granularity
if trace_granularity < granularity:
return f(*args, **kwargs)
if not tracer:
return f(*args, **kwargs)
with tracer.start_as_current_span(trace_name, attributes=attributes):
return f(*args, **kwargs)
return wrapper
return decorator
def add_attributes_to_current_span(
attributes: Dict[
str,
Union[
str,
bool,
float,
int,
Sequence[str],
Sequence[bool],
Sequence[float],
Sequence[int],
],
]
) -> None:
"""Add attributes to the current span."""
global tracer, granularity
if granularity == OpenTelemetryGranularity.NONE:
return
if not tracer:
return
span = trace.get_current_span()
span.set_attributes(attributes)