Spaces:
Running
Running
File size: 4,998 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 |
from functools import wraps
from enum import Enum
from typing import Any, Callable, Dict, Optional, Sequence, Union
from opentelemetry import trace
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
BatchSpanProcessor,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from chromadb.config import Component
from chromadb.config import System
class OpenTelemetryGranularity(Enum):
"""The granularity of the OpenTelemetry spans."""
NONE = "none"
"""No spans are emitted."""
OPERATION = "operation"
"""Spans are emitted for each operation."""
OPERATION_AND_SEGMENT = "operation_and_segment"
"""Spans are emitted for each operation and segment."""
ALL = "all"
"""Spans are emitted for almost every method call."""
# Greater is more restrictive. So "all" < "operation" (and everything else),
# "none" > everything.
def __lt__(self, other: Any) -> bool:
"""Compare two granularities."""
order = [
OpenTelemetryGranularity.ALL,
OpenTelemetryGranularity.OPERATION_AND_SEGMENT,
OpenTelemetryGranularity.OPERATION,
OpenTelemetryGranularity.NONE,
]
return order.index(self) < order.index(other)
class OpenTelemetryClient(Component):
def __init__(self, system: System):
super().__init__(system)
otel_init(
system.settings.chroma_otel_service_name,
system.settings.chroma_otel_collection_endpoint,
system.settings.chroma_otel_collection_headers,
OpenTelemetryGranularity(
system.settings.chroma_otel_granularity
if system.settings.chroma_otel_granularity
else "none"
),
)
tracer: Optional[trace.Tracer] = None
granularity: OpenTelemetryGranularity = OpenTelemetryGranularity("none")
def otel_init(
otel_service_name: Optional[str],
otel_collection_endpoint: Optional[str],
otel_collection_headers: Optional[Dict[str, str]],
otel_granularity: OpenTelemetryGranularity,
) -> None:
"""Initializes module-level state for OpenTelemetry.
Parameters match the environment variables which configure OTel as documented
at https://docs.trychroma.com/observability.
- otel_service_name: The name of the service for OTel tagging and aggregation.
- otel_collection_endpoint: The endpoint to which OTel spans are sent
(e.g. api.honeycomb.com).
- otel_collection_headers: The headers to send with OTel spans
(e.g. {"x-honeycomb-team": "abc123"}).
- otel_granularity: The granularity of the spans to emit.
"""
if otel_granularity == OpenTelemetryGranularity.NONE:
return
resource = Resource(attributes={SERVICE_NAME: str(otel_service_name)})
provider = TracerProvider(resource=resource)
provider.add_span_processor(
BatchSpanProcessor(
# TODO: we may eventually want to make this configurable.
OTLPSpanExporter(
endpoint=str(otel_collection_endpoint),
headers=otel_collection_headers,
)
)
)
trace.set_tracer_provider(provider)
global tracer, granularity
tracer = trace.get_tracer(__name__)
granularity = otel_granularity
def trace_method(
trace_name: str,
trace_granularity: OpenTelemetryGranularity,
attributes: Optional[
Dict[
str,
Union[
str,
bool,
float,
int,
Sequence[str],
Sequence[bool],
Sequence[float],
Sequence[int],
],
]
] = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
"""A decorator that traces a method."""
def decorator(f: Callable[..., Any]) -> Callable[..., Any]:
@wraps(f)
def wrapper(*args: Any, **kwargs: Dict[Any, Any]) -> Any:
global tracer, granularity
if trace_granularity < granularity:
return f(*args, **kwargs)
if not tracer:
return f(*args, **kwargs)
with tracer.start_as_current_span(trace_name, attributes=attributes):
return f(*args, **kwargs)
return wrapper
return decorator
def add_attributes_to_current_span(
attributes: Dict[
str,
Union[
str,
bool,
float,
int,
Sequence[str],
Sequence[bool],
Sequence[float],
Sequence[int],
],
]
) -> None:
"""Add attributes to the current span."""
global tracer, granularity
if granularity == OpenTelemetryGranularity.NONE:
return
if not tracer:
return
span = trace.get_current_span()
span.set_attributes(attributes)
|