badalsahani's picture
feat: chroma initial deploy
287a0bc
raw
history blame
2.44 kB
from abc import abstractmethod
from typing import Any, Callable, List
from overrides import EnforceOverrides, overrides
from chromadb.config import Component, System
from chromadb.types import Segment
class SegmentDirectory(Component):
    """Data interface that tracks where segments live.

    Concretely, for clustered chroma, an implementation provides the grpc
    endpoint for a segment.
    """

    @abstractmethod
    def get_segment_endpoint(self, segment: Segment) -> str:
        """Return the residence (endpoint) of the given segment."""
        ...

    @abstractmethod
    def register_updated_segment_callback(
        self, callback: Callable[[Segment], None]
    ) -> None:
        """Register a callback to be called when a segment is updated.

        Per the type signature the callback takes a ``Segment`` and its
        return value is ignored.
        """
        ...
# Type alias: a memberlist is a plain list of strings.
# NOTE(review): presumably each string identifies one cluster member — confirm
# against the concrete MemberlistProvider implementations.
Memberlist = List[str]
class MemberlistProvider(Component, EnforceOverrides):
    """Returns the latest memberlist and provides a callback for when it changes.

    Registered callbacks may be invoked from a different thread than the one
    that registered them, so callers should ensure their callbacks are
    thread-safe.
    """

    # Listeners added through register_updated_memberlist_callback.
    callbacks: List[Callable[[Memberlist], Any]]

    def __init__(self, system: System):
        """Create a provider with no registered callbacks.

        Args:
            system: Forwarded unchanged to the base-class constructor.
        """
        # The callback list is set up before delegating to the base class,
        # preserving the original initialisation order.
        self.callbacks = []
        super().__init__(system)

    @abstractmethod
    def get_memberlist(self) -> Memberlist:
        """Return the latest memberlist."""
        pass

    @abstractmethod
    def set_memberlist_name(self, memberlist: str) -> None:
        """Set the memberlist that this provider will watch."""
        pass

    @overrides
    def stop(self) -> None:
        """Stop watching the memberlist and drop every registered callback."""
        # Rebind rather than clear in place, so a list object another thread
        # may currently be iterating over is left untouched.
        self.callbacks = []
        # `stop` overrides a parent implementation; chain to it so that any
        # base-class shutdown handling still runs (mirrors __init__).
        super().stop()

    def register_updated_memberlist_callback(
        self, callback: Callable[[Memberlist], Any]
    ) -> None:
        """Register a callback that will be called when the memberlist changes.

        The callback may be called many times with the same memberlist, so
        callers should be idempotent. It may be called from a different thread.
        """
        self.callbacks.append(callback)

    def unregister_updated_memberlist_callback(
        self, callback: Callable[[Memberlist], Any]
    ) -> bool:
        """Unregister a previously registered callback.

        Only the first matching registration is removed.

        Returns:
            True if the callback was found and removed, False if it was not
            registered.
        """
        # Single pass: list.remove raises ValueError when the item is absent,
        # which replaces the separate membership scan.
        try:
            self.callbacks.remove(callback)
        except ValueError:
            return False
        return True