darabos committed
Commit 4345cd9 · unverified · 2 parents: 493a64c 4dce97e

Merge pull request #208 from biggraph/darabos-k8s

lynxkite-bio/src/lynxkite_bio/__init__.py CHANGED
@@ -1,4 +1,5 @@
 """An expansion for `lynxkite-graph-analytics` that provides algorithms for biological applications."""
 
+from . import llm  # noqa (imported to trigger registration)
 from . import nims  # noqa (imported to trigger registration)
 from . import rdkit  # noqa (imported to trigger registration)
lynxkite-bio/src/lynxkite_bio/k8s.py ADDED
@@ -0,0 +1,272 @@
+"""Tools for starting and stopping Docker containers on Kubernetes.
+
+A test setup for this feature:
+
+```bash
+# Start minikube with GPU support:
+minikube start --driver docker --container-runtime docker --gpus all
+# Make the services accessible:
+minikube tunnel
+```
+
+Use `k8s.needs()` to declare a Kubernetes dependency for an operation. For example:
+
+```python
+@op("Ask LLM", slow=True)
+@k8s.needs(
+    name="vllm-for-ask-llm-op",
+    image="vllm/vllm-openai:latest",
+    port=8000,
+    args=["--model", "google/gemma-3-1b-it"],
+    health_probe="/health",
+    env=k8s.env_vars("HUGGING_FACE_HUB_TOKEN"),
+    storage_path="/root/.cache/huggingface",
+    storage_size="10Gi",
+)
+def ask_llm(df: pd.DataFrame, *, question: ops.LongStr):
+    ip = k8s.get_ip("vllm-for-ask-llm-op")
+    client = openai.OpenAI(api_key="EMPTY", base_url=f"http://{ip}/v1")
+    # ...
+```
+"""
+
+import functools
+import os
+import queue
+import threading
+import time
+import httpx
+from kubernetes import client, config
+from kubernetes.client.rest import ApiException
+
+config.load_kube_config()
+
+
+def _run(
+    *,
+    name,
+    image,
+    port,
+    namespace,
+    storage_size,
+    storage_path,
+    health_probe,
+    **kwargs,
+):
+    print(f"Starting {name} in namespace {namespace}...")
+    volume_mounts = []
+    volumes = []
+    if storage_size:
+        pvc_name = f"{name}-data-volume"
+        if not _pvc_exists(pvc_name, namespace):
+            _create_pvc(pvc_name, size=storage_size, namespace=namespace)
+        volume_mounts.append(
+            client.V1VolumeMount(
+                name=pvc_name,
+                mount_path=storage_path,
+            )
+        )
+        volumes.append(
+            client.V1Volume(
+                name=pvc_name,
+                persistent_volume_claim=client.V1PersistentVolumeClaimVolumeSource(
+                    claim_name=pvc_name,
+                ),
+            )
+        )
+    container = client.V1Container(
+        name=name,
+        image=image,
+        ports=[client.V1ContainerPort(container_port=port)],
+        volume_mounts=volume_mounts,
+        **kwargs,
+    )
+    if health_probe:
+        container.readiness_probe = client.V1Probe(
+            http_get=client.V1HTTPGetAction(path=health_probe, port=port, scheme="HTTP"),
+        )
+    deployment = client.V1Deployment(
+        metadata=client.V1ObjectMeta(name=name),
+        spec=client.V1DeploymentSpec(
+            replicas=1,
+            selector=client.V1LabelSelector(match_labels={"app": name}),
+            template=client.V1PodTemplateSpec(
+                metadata=client.V1ObjectMeta(labels={"app": name}),
+                spec=client.V1PodSpec(
+                    volumes=volumes,
+                    containers=[container],
+                ),
+            ),
+        ),
+    )
+    apps_v1 = client.AppsV1Api()
+    apps_v1.create_namespaced_deployment(namespace=namespace, body=deployment)
+
+    service_name = f"{name}-service"
+    service = client.V1Service(
+        metadata=client.V1ObjectMeta(name=service_name, labels={"app": name}),
+        spec=client.V1ServiceSpec(
+            selector={"app": name},
+            ports=[client.V1ServicePort(protocol="TCP", port=80, target_port=port)],
+            type="LoadBalancer",
+        ),
+    )
+    core_v1 = client.CoreV1Api()
+    core_v1.create_namespaced_service(namespace=namespace, body=service)
+
+
+def _stop(name, namespace="default"):
+    print(f"Stopping {name} in namespace {namespace}...")
+    apps_v1 = client.AppsV1Api()
+    apps_v1.delete_namespaced_deployment(name, namespace)
+    service_name = f"{name}-service"
+    core_v1 = client.CoreV1Api()
+    core_v1.delete_namespaced_service(service_name, namespace)
+
+
+def get_ip(name: str, namespace: str = "default", timeout: int = 3600, interval: int = 1) -> str:
+    """Look up the IP address where the operation can access the service."""
+    service_name = f"{name}-service"
+    core_v1 = client.CoreV1Api()
+    end_time = time.time() + timeout
+    while time.time() < end_time:
+        try:
+            svc = core_v1.read_namespaced_service(service_name, namespace)
+            ingress = svc.status.load_balancer.ingress
+            if ingress:
+                ip = ingress[0].ip or ingress[0].hostname
+                if ip and _can_connect(ip):
+                    return ip
+        except ApiException as e:
+            if e.status != 404:
+                raise
+        time.sleep(interval)
+    raise TimeoutError(f"Timed out waiting for external IP of service '{service_name}'")
+
+
+def _can_connect(ip: str) -> bool:
+    try:
+        httpx.get(f"http://{ip}/")
+        return True
+    except httpx.RequestError:
+        return False
+
+
+def _is_running(name: str, namespace: str = "default") -> bool:
+    apps_v1 = client.AppsV1Api()
+    try:
+        apps_v1.read_namespaced_deployment(name, namespace)
+        return True
+    except ApiException as e:
+        if e.status == 404:
+            return False
+        else:
+            raise
+
+
+def _stop_if_running(name, namespace="default"):
+    if _is_running(name, namespace):
+        _stop(name, namespace)
+
+
+def _create_pvc(name, size="1Gi", namespace="default"):
+    core_v1 = client.CoreV1Api()
+    pvc = client.V1PersistentVolumeClaim(
+        metadata=client.V1ObjectMeta(name=name),
+        spec=client.V1PersistentVolumeClaimSpec(
+            access_modes=["ReadWriteOnce"],
+            resources=client.V1ResourceRequirements(requests={"storage": size}),
+        ),
+    )
+    core_v1.create_namespaced_persistent_volume_claim(namespace=namespace, body=pvc)
+
+
+def _pvc_exists(name: str, namespace: str = "default") -> bool:
+    core_v1 = client.CoreV1Api()
+    try:
+        core_v1.read_namespaced_persistent_volume_claim(name=name, namespace=namespace)
+        return True
+    except ApiException as e:
+        if e.status == 404:
+            return False
+        else:
+            raise
+
+
+def env_vars(*names: str):
+    """A convenient way to pass local environment variables to the microservice."""
+    return [{"name": name, "value": os.environ[name]} for name in names]
+
+
+def needs(
+    name: str,
+    image: str,
+    port: int,
+    args: list | None = None,
+    env: list | None = None,
+    health_probe: str | None = None,
+    storage_size: str | None = None,
+    storage_path: str = "/data",
+    namespace: str = "default",
+):
+    """Use this decorator to configure a microservice that the operation depends on.
+    LynxKite will manage the lifecycle of the microservice for you.
+    """
+
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*func_args, **func_kwargs):
+            _using(
+                name=name,
+                image=image,
+                port=port,
+                args=args or [],
+                env=env or [],
+                health_probe=health_probe,
+                storage_size=storage_size,
+                storage_path=storage_path,
+                namespace=namespace,
+            )
+            try:
+                return func(*func_args, **func_kwargs)
+            finally:
+                _stop_using(name, namespace)
+
+        return wrapper
+
+    return decorator
+
+
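+# One queue per service name, used as a thread-safe reference counter: each
+# operation currently using the service holds one token in its queue.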
+_USER_COUNTERS = {}
+
+
+def _using(name, **kwargs):
+    q = _USER_COUNTERS.setdefault(name, queue.Queue(-1))
+    q.put(1)
+    try:
+        if not _is_running(name):
+            _run(name=name, **kwargs)
+    except Exception:
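+        # Startup failed: return our token so the counter stays accurate.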
+        q.get()
+        raise
+
+
+def _stop_using(name, namespace):
+    q = _USER_COUNTERS[name]
+    q.get()
+    if q.empty():
+        _stop_later(name, namespace)
+
+
+def _stop_later(name, namespace):
+    q = _USER_COUNTERS[name]
+
+    def stop():
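+        # Grace period: if another operation starts using the service within
+        # this window, the shutdown is skipped and the startup cost is saved.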
+        time.sleep(6000)
+        if q.empty():
+            # Nobody started the service in the meantime.
+            _stop(name, namespace)
+
+    t = threading.Thread(target=stop, daemon=True)
+    t.start()
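
Because the reference counter is keyed on the service `name`, operations that declare the same name share a single deployment, and it is only stopped (after the grace period) once the last user finishes. A minimal sketch of this pattern follows; the op names and the `shared-llm` service are hypothetical:

```python
# Hypothetical: two ops declaring the same service name share one deployment.
@op("Summarize", slow=True)
@k8s.needs(name="shared-llm", image="vllm/vllm-openai:latest", port=8000)
def summarize(df: pd.DataFrame):
    ip = k8s.get_ip("shared-llm")  # Both ops resolve to the same service IP.
    ...


@op("Classify", slow=True)
@k8s.needs(name="shared-llm", image="vllm/vllm-openai:latest", port=8000)
def classify(df: pd.DataFrame):
    ip = k8s.get_ip("shared-llm")
    ...
```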
lynxkite-bio/src/lynxkite_bio/llm.py ADDED
@@ -0,0 +1,55 @@
+"""LLM on Kubernetes, for use in bio projects or otherwise.
+
+The BioNeMo NIMs are large. This module provides a small LLM that can be used
+for demonstrating the Kubernetes lifecycle management without huge hardware
+requirements.
+"""
+
+import openai
+import pandas as pd
+from lynxkite.core import ops
+
+from . import k8s
+
+ENV = "LynxKite Graph Analytics"
+op = ops.op_registration(ENV)
+
+
+@op("Ask LLM", slow=True)
+@k8s.needs(
+    name="lynxkite-bio-small-llm",
+    image="vllm/vllm-openai:latest",
+    port=8000,
+    args=["--model", "google/gemma-3-1b-it"],
+    health_probe="/health",
+    env=k8s.env_vars("HUGGING_FACE_HUB_TOKEN"),
+    storage_path="/root/.cache/huggingface",
+    storage_size="10Gi",
+)
+def ask_llm(df: pd.DataFrame, *, question: ops.LongStr, include_columns="<all>"):
+    if not question:
+        return df
+    ip = k8s.get_ip("lynxkite-bio-small-llm")
+    print(f"LLM is running at {ip}")
+    client = openai.OpenAI(api_key="EMPTY", base_url=f"http://{ip}/v1")
+    responses = []
+    for _, row in df.iterrows():
+        data = row.to_dict()
+        if include_columns != "<all>":
+            data = {k: v for k, v in data.items() if k in include_columns}
+        prompt = (
+            f"Answer the question based on the following data:\n\n{data}\n\nQuestion: {question}"
+        )
+        response = client.chat.completions.create(
+            model="google/gemma-3-1b-it",
+            messages=[
+                {
+                    "role": "user",
+                    "content": prompt,
+                },
+            ],
+        )
+        responses.append(response.choices[0].message.content)
+    df = df.copy()
+    df["response"] = responses
+    return df
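
A usage sketch for the `Ask LLM` op above, with hypothetical input data (it assumes the minikube setup from `k8s.py` and a `HUGGING_FACE_HUB_TOKEN` in the environment):

```python
# The decorator starts the vLLM service, each row is sent to the model along
# with the question, and the result comes back as a new "response" column.
df = pd.DataFrame({"name": ["aspirin"], "smiles": ["CC(=O)OC1=CC=CC=C1C(=O)O"]})
out = ask_llm(df, question="What is this molecule commonly used for?")
print(out["response"].iloc[0])
```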
lynxkite-bio/src/lynxkite_bio/nims.py CHANGED
@@ -6,6 +6,8 @@ import httpx
 import pandas as pd
 import os
 
+from . import k8s
+
 
 ENV = "LynxKite Graph Analytics"
 op = ops.op_registration(ENV)
@@ -131,7 +133,30 @@ def view_molecule(
     }
 
 
+def _needs_bionemo_k8s(**k8s_kwargs):
+    if USE_K8S:
+        return k8s.needs(**k8s_kwargs)
+    else:
+        return lambda func: func
+
+
+def base_url(service):
+    if USE_K8S:
+        return f"http://{k8s.get_ip(service)}/"
+    else:
+        return "https://health.api.nvidia.com/"
+
+
+USE_K8S = False  # Not production ready yet.
+needs_genmol_k8s = _needs_bionemo_k8s(
+    name="genmol",
+    image="nvcr.io/nim/nvidia/genmol:1.0.0",
+    port=8000,
+)
+
+
 @op("Query GenMol", slow=True)
+@needs_genmol_k8s
 async def query_genmol(
     bundle: Bundle,
     *,
@@ -146,7 +171,7 @@ async def query_genmol(
     bundle = bundle.copy()
 
     response = await query_bionemo_nim(
-        url="https://health.api.nvidia.com/v1/biology/nvidia/genmol/generate",
+        url=f"{base_url('genmol')}v1/biology/nvidia/genmol/generate",
         payload={
            "smiles": bundle.dfs[molecule_table][molecule_column].iloc[0],
            "num_molecules": num_molecules,