# Source file: chroma/chromadb/ingest/impl/pulsar_admin.py
# Last commit: 287a0bc ("feat: chroma initial deploy") by badalsahani; file size 3.41 kB
# A thin wrapper around the pulsar admin api
import requests
from chromadb.config import System
from chromadb.ingest.impl.utils import parse_topic_name
class PulsarAdmin:
    """A thin wrapper around the Pulsar admin REST API.

    Only used for interim development towards distributed chroma. This
    functionality will be moved to the chroma coordinator.

    Every method issues a synchronous HTTP request against
    ``http://<pulsar_broker_url>:<pulsar_admin_port>``. A response is treated
    as success when its status code is 204 (No Content) or 409 (Conflict);
    any other status raises ``RuntimeError``. Each request carries a timeout
    so that an unresponsive admin endpoint raises a ``requests`` exception
    instead of blocking the caller indefinitely.
    """

    # Seconds to wait for the admin endpoint before giving up. Without a
    # timeout, ``requests`` waits forever on a hung connection.
    _REQUEST_TIMEOUT_SECONDS: float = 10.0

    # Status codes accepted as success by every operation in this class.
    # NOTE(review): 409 presumably means "already exists" for the create
    # calls, which makes them idempotent -- confirm against the Pulsar admin
    # API reference.
    _OK_STATUS_CODES = (204, 409)

    # The only tenant/namespace this interim wrapper supports.
    _DEFAULT_TENANT = "default"
    _DEFAULT_NAMESPACE = "default"

    _connection_str: str

    def __init__(self, system: System):
        """Build the admin base URL from settings and ensure defaults exist.

        Reads the required ``pulsar_broker_url`` and ``pulsar_admin_port``
        settings from ``system.settings``, then creates the default tenant
        and the default namespace (two HTTP requests at construction time).

        Raises:
            RuntimeError: If creating the default tenant or namespace fails.
        """
        pulsar_host = system.settings.require("pulsar_broker_url")
        pulsar_port = system.settings.require("pulsar_admin_port")
        self._connection_str = f"http://{pulsar_host}:{pulsar_port}"

        # Create the default tenant and namespace.
        # This is a temporary workaround until we have a proper
        # tenant/namespace management system.
        self.create_tenant(self._DEFAULT_TENANT)
        self.create_namespace(self._DEFAULT_TENANT, self._DEFAULT_NAMESPACE)

    def _check_response(self, response, failure_message: str) -> None:
        """Raise ``RuntimeError`` unless ``response`` has an accepted status.

        The HTTP status code and response body are appended to
        ``failure_message`` so the cause of a failure is visible to callers.
        """
        if response.status_code not in self._OK_STATUS_CODES:
            raise RuntimeError(
                f"{failure_message} "
                f"(HTTP {response.status_code}: {response.text})"
            )

    def _validated_topic_name(self, topic: str) -> str:
        """Parse ``topic`` and return its short name.

        ``parse_topic_name`` yields ``(tenant, namespace, topic_name)``; only
        the default tenant and default namespace are accepted.

        Raises:
            ValueError: If the tenant or the namespace is not ``"default"``.
        """
        tenant, namespace, topic_name = parse_topic_name(topic)
        if tenant != self._DEFAULT_TENANT:
            raise ValueError(f"Only the default tenant is supported, got {tenant}")
        if namespace != self._DEFAULT_NAMESPACE:
            raise ValueError(
                f"Only the default namespace is supported, got {namespace}"
            )
        return topic_name

    def _topic_path(self, topic_name: str) -> str:
        """Return the admin API path of a persistent topic in the default namespace."""
        return (
            f"/admin/v2/persistent/{self._DEFAULT_TENANT}"
            f"/{self._DEFAULT_NAMESPACE}/{topic_name}"
        )

    def create_tenant(self, tenant: str) -> None:
        """Make a PUT request to the admin api to create the tenant.

        Raises:
            RuntimeError: If the response status is neither 204 nor 409.
        """
        url = self._connection_str + f"/admin/v2/tenants/{tenant}"
        response = requests.put(
            url,
            json={"allowedClusters": ["standalone"], "adminRoles": []},
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )  # TODO: how to manage clusters?
        self._check_response(response, f"Failed to create tenant {tenant}")

    def create_namespace(self, tenant: str, namespace: str) -> None:
        """Make a PUT request to the admin api to create the namespace.

        Raises:
            RuntimeError: If the response status is neither 204 nor 409.
        """
        url = self._connection_str + f"/admin/v2/namespaces/{tenant}/{namespace}"
        response = requests.put(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._check_response(response, f"Failed to create namespace {namespace}")

    def create_topic(self, topic: str) -> None:
        """Make a PUT request to the admin api to create a persistent topic.

        Raises:
            ValueError: If ``topic`` is not in the default tenant/namespace.
            RuntimeError: If the response status is neither 204 nor 409.
        """
        # TODO: support non-persistent topics?
        topic_name = self._validated_topic_name(topic)
        url = self._connection_str + self._topic_path(topic_name)
        response = requests.put(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._check_response(response, f"Failed to create topic {topic_name}")

    def delete_topic(self, topic: str) -> None:
        """Make a DELETE request to the admin api to force-delete the topic.

        Raises:
            ValueError: If ``topic`` is not in the default tenant/namespace.
            RuntimeError: If the response status is neither 204 nor 409.
        """
        topic_name = self._validated_topic_name(topic)
        # Force delete the topic.
        url = self._connection_str + self._topic_path(topic_name) + "?force=true"
        response = requests.delete(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        # NOTE(review): 409 is accepted here as in the create calls, while a
        # 404 (topic missing) is reported as a failure -- confirm this is the
        # intended contract for deletes.
        self._check_response(response, f"Failed to delete topic {topic_name}")