File size: 3,405 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
# A thin wrapper around the pulsar admin api
import requests
from chromadb.config import System
from chromadb.ingest.impl.utils import parse_topic_name


class PulsarAdmin:
    """A thin wrapper around the pulsar admin api, only used for interim development towards distributed chroma.
    This functionality will be moved to the chroma coordinator.

    Every method issues one blocking HTTP request against the admin endpoint
    and raises ``RuntimeError`` when the response status is not one of
    ``_ACCEPTED_STATUS_CODES``.
    """

    _connection_str: str

    # Upper bound, in seconds, for each admin request. Without an explicit
    # timeout `requests` waits indefinitely, so an unreachable admin endpoint
    # would hang the caller (including the constructor).
    _REQUEST_TIMEOUT_SECONDS = 30.0

    # Status codes treated as success by every method of this class.
    # NOTE(review): 409 presumably means "already exists" for the create
    # calls; it is also accepted for delete to preserve the existing
    # behavior -- confirm against the Pulsar admin REST API.
    _ACCEPTED_STATUS_CODES = (204, 409)

    def __init__(self, system: System):
        """Build the admin base URL and make sure the default tenant/namespace exist.

        Reads the required settings ``pulsar_broker_url`` (used as the host
        part of the URL) and ``pulsar_admin_port``.

        Raises:
            RuntimeError: if creating the default tenant or namespace fails.
        """
        pulsar_host = system.settings.require("pulsar_broker_url")
        pulsar_port = system.settings.require("pulsar_admin_port")
        self._connection_str = f"http://{pulsar_host}:{pulsar_port}"

        # Create the default tenant and namespace
        # This is a temporary workaround until we have a proper tenant/namespace management system
        self.create_tenant("default")
        self.create_namespace("default", "default")

    def _raise_unless_accepted(
        self, response: "requests.Response", failure: str
    ) -> None:
        """Raise ``RuntimeError`` if the response status is not an accepted one.

        The HTTP status and response body are appended to ``failure`` so the
        cause of a rejected admin call is visible in the error.
        """
        if response.status_code not in self._ACCEPTED_STATUS_CODES:
            raise RuntimeError(
                f"{failure}: HTTP {response.status_code} {response.text}"
            )

    @staticmethod
    def _require_default_scope(tenant: str, namespace: str) -> None:
        """Raise ``ValueError`` unless both tenant and namespace are ``"default"``."""
        if tenant != "default":
            raise ValueError(f"Only the default tenant is supported, got {tenant}")
        if namespace != "default":
            raise ValueError(
                f"Only the default namespace is supported, got {namespace}"
            )

    def create_tenant(self, tenant: str) -> None:
        """Make a PUT request to the admin api to create the tenant.

        Raises:
            RuntimeError: if the admin api answers with an unexpected status.
        """
        url = f"{self._connection_str}/admin/v2/tenants/{tenant}"
        response = requests.put(
            url,
            json={"allowedClusters": ["standalone"], "adminRoles": []},
            timeout=self._REQUEST_TIMEOUT_SECONDS,
        )  # TODO: how to manage clusters?
        self._raise_unless_accepted(response, f"Failed to create tenant {tenant}")

    def create_namespace(self, tenant: str, namespace: str) -> None:
        """Make a PUT request to the admin api to create the namespace.

        Raises:
            RuntimeError: if the admin api answers with an unexpected status.
        """
        url = f"{self._connection_str}/admin/v2/namespaces/{tenant}/{namespace}"
        response = requests.put(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._raise_unless_accepted(
            response, f"Failed to create namespace {namespace}"
        )

    def create_topic(self, topic: str) -> None:
        """Make a PUT request to the admin api to create a persistent topic.

        ``topic`` is split with ``parse_topic_name`` into tenant, namespace
        and topic name; only the default tenant and namespace are supported.

        Raises:
            ValueError: if the tenant or namespace is not ``"default"``.
            RuntimeError: if the admin api answers with an unexpected status.
        """
        # TODO: support non-persistent topics?
        tenant, namespace, topic_name = parse_topic_name(topic)
        self._require_default_scope(tenant, namespace)

        url = (
            f"{self._connection_str}"
            f"/admin/v2/persistent/{tenant}/{namespace}/{topic_name}"
        )
        response = requests.put(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._raise_unless_accepted(response, f"Failed to create topic {topic_name}")

    def delete_topic(self, topic: str) -> None:
        """Make a DELETE request to the admin api to force-delete a persistent topic.

        ``topic`` is split with ``parse_topic_name`` into tenant, namespace
        and topic name; only the default tenant and namespace are supported.

        Raises:
            ValueError: if the tenant or namespace is not ``"default"``.
            RuntimeError: if the admin api answers with an unexpected status.
        """
        tenant, namespace, topic_name = parse_topic_name(topic)
        self._require_default_scope(tenant, namespace)

        # Force delete the topic via the `force=true` query parameter
        url = (
            f"{self._connection_str}"
            f"/admin/v2/persistent/{tenant}/{namespace}/{topic_name}?force=true"
        )
        response = requests.delete(url, timeout=self._REQUEST_TIMEOUT_SECONDS)
        self._raise_unless_accepted(response, f"Failed to delete topic {topic_name}")