Spaces:
Running
Running
File size: 2,786 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 |
from abc import abstractmethod
import os
from typing import ClassVar, Dict, Any
import uuid
import chromadb
from chromadb.config import Component
from pathlib import Path
from enum import Enum
TELEMETRY_WHITELISTED_SETTINGS = [
"chroma_api_impl",
"is_persistent",
"chroma_server_ssl_enabled",
]
class ServerContext(Enum):
NONE = "None"
FASTAPI = "FastAPI"
class ProductTelemetryEvent:
max_batch_size: ClassVar[int] = 1
batch_size: int
def __init__(self, batch_size: int = 1):
self.batch_size = batch_size
@property
def properties(self) -> Dict[str, Any]:
return self.__dict__
@property
def name(self) -> str:
return self.__class__.__name__
# A batch key is used to determine whether two events can be batched together.
# If a TelemetryEvent's max_batch_size > 1, batch_key() and batch() MUST be
# implemented.
# Otherwise they are ignored.
@property
def batch_key(self) -> str:
return self.name
def batch(self, other: "ProductTelemetryEvent") -> "ProductTelemetryEvent":
raise NotImplementedError
class ProductTelemetryClient(Component):
USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
UNKNOWN_USER_ID = "UNKNOWN"
SERVER_CONTEXT: ServerContext = ServerContext.NONE
_curr_user_id = None
@abstractmethod
def capture(self, event: ProductTelemetryEvent) -> None:
pass
@property
def context(self) -> Dict[str, Any]:
chroma_version = chromadb.__version__
settings = chromadb.get_settings()
telemetry_settings = {}
for whitelisted in TELEMETRY_WHITELISTED_SETTINGS:
telemetry_settings[whitelisted] = settings[whitelisted]
self._context = {
"chroma_version": chroma_version,
"server_context": self.SERVER_CONTEXT.value,
**telemetry_settings,
}
return self._context
@property
def user_id(self) -> str:
if self._curr_user_id:
return self._curr_user_id
# File access may fail due to permissions or other reasons. We don't want to
# crash so we catch all exceptions.
try:
if not os.path.exists(self.USER_ID_PATH):
os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
with open(self.USER_ID_PATH, "w") as f:
new_user_id = str(uuid.uuid4())
f.write(new_user_id)
self._curr_user_id = new_user_id
else:
with open(self.USER_ID_PATH, "r") as f:
self._curr_user_id = f.read()
except Exception:
self._curr_user_id = self.UNKNOWN_USER_ID
return self._curr_user_id
|