darabos commited on
Commit
21e70fe
·
1 Parent(s): ee35ea7

Load ops from .py files in the data directory.

Browse files
examples/word2vec.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from lynxkite.core.ops import op
2
+ import staticvectors
3
+ import pandas as pd
4
+
5
+
6
+ @op("LynxKite Graph Analytics", "Word2vec for the top 1000 words", cache=True)
7
+ def word2vec_1000():
8
+ model = staticvectors.StaticVectors("neuml/word2vec-quantized")
9
+ with open("wordlist.txt") as f:
10
+ words = [w.strip() for w in f.read().strip().split("\n")]
11
+ df = pd.DataFrame(
12
+ {
13
+ "word": words,
14
+ "embedding": model.embeddings(words).tolist(),
15
+ }
16
+ )
17
+ return df
18
+
19
+
20
+ @op("LynxKite Graph Analytics", "Take first N")
21
+ def first_n(df: pd.DataFrame, *, n=10):
22
+ return df.head(n)
lynxkite-app/src/lynxkite_app/crdt.py CHANGED
@@ -233,6 +233,7 @@ async def execute(name: str, ws_crdt: pycrdt.Map, ws_pyd: workspace.Workspace, d
233
  assert path.is_relative_to(cwd), "Provided workspace path is invalid"
234
  # Save user changes before executing, in case the execution fails.
235
  workspace.save(ws_pyd, path)
 
236
  ws_pyd._crdt = ws_crdt
237
  with ws_crdt.doc.transaction():
238
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
 
233
  assert path.is_relative_to(cwd), "Provided workspace path is invalid"
234
  # Save user changes before executing, in case the execution fails.
235
  workspace.save(ws_pyd, path)
236
+ ops.load_user_scripts(name)
237
  ws_pyd._crdt = ws_crdt
238
  with ws_crdt.doc.transaction():
239
  for nc, np in zip(ws_crdt["nodes"], ws_pyd.nodes):
lynxkite-app/src/lynxkite_app/main.py CHANGED
@@ -26,6 +26,7 @@ def detect_plugins():
26
 
27
 
28
  lynxkite_plugins = detect_plugins()
 
29
 
30
  app = fastapi.FastAPI(lifespan=crdt.lifespan)
31
  app.include_router(crdt.router)
@@ -33,7 +34,8 @@ app.add_middleware(GZipMiddleware)
33
 
34
 
35
  @app.get("/api/catalog")
36
- def get_catalog():
 
37
  return {k: {op.name: op.model_dump() for op in v.values()} for k, v in ops.CATALOGS.items()}
38
 
39
 
 
26
 
27
 
28
  lynxkite_plugins = detect_plugins()
29
+ ops.save_catalogs("plugins loaded")
30
 
31
  app = fastapi.FastAPI(lifespan=crdt.lifespan)
32
  app.include_router(crdt.router)
 
34
 
35
 
36
  @app.get("/api/catalog")
37
+ def get_catalog(workspace: str):
38
+ ops.load_user_scripts(workspace)
39
  return {k: {op.name: op.model_dump() for op in v.values()} for k, v in ops.CATALOGS.items()}
40
 
41
 
lynxkite-app/web/src/workspace/Workspace.tsx CHANGED
@@ -151,7 +151,7 @@ function LynxKiteFlow() {
151
 
152
  const fetcher: Fetcher<Catalogs> = (resource: string, init?: RequestInit) =>
153
  fetch(resource, init).then((res) => res.json());
154
- const catalog = useSWR("/api/catalog", fetcher);
155
  const [suppressSearchUntil, setSuppressSearchUntil] = useState(0);
156
  const [nodeSearchSettings, setNodeSearchSettings] = useState(
157
  undefined as
 
151
 
152
  const fetcher: Fetcher<Catalogs> = (resource: string, init?: RequestInit) =>
153
  fetch(resource, init).then((res) => res.json());
154
+ const catalog = useSWR(`/api/catalog?workspace=${path}`, fetcher);
155
  const [suppressSearchUntil, setSuppressSearchUntil] = useState(0);
156
  const [nodeSearchSettings, setNodeSearchSettings] = useState(
157
  undefined as
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -4,7 +4,11 @@ from __future__ import annotations
4
  import asyncio
5
  import enum
6
  import functools
 
7
  import inspect
 
 
 
8
  import types
9
  import pydantic
10
  import typing
@@ -14,8 +18,11 @@ from typing_extensions import Annotated
14
  if typing.TYPE_CHECKING:
15
  from . import workspace
16
 
17
- CATALOGS: dict[str, dict[str, "Op"]] = {}
 
 
18
  EXECUTORS = {}
 
19
 
20
  typeof = type # We have some arguments called "type".
21
 
@@ -189,10 +196,12 @@ class Op(BaseConfig):
189
  return res
190
 
191
 
192
- def op(env: str, name: str, *, view="basic", outputs=None, params=None):
193
  """Decorator for defining an operation."""
194
 
195
  def decorator(func):
 
 
196
  sig = inspect.signature(func)
197
  # Positional arguments are inputs.
198
  inputs = {
@@ -308,3 +317,41 @@ def slow(func):
308
  return await asyncio.to_thread(func, *args, **kwargs)
309
 
310
  return wrapper
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  import asyncio
5
  import enum
6
  import functools
7
+ import importlib
8
  import inspect
9
+ import pathlib
10
+ import traceback
11
+ import joblib
12
  import types
13
  import pydantic
14
  import typing
 
18
  if typing.TYPE_CHECKING:
19
  from . import workspace
20
 
21
+ Catalog = dict[str, "Op"]
22
+ Catalogs = dict[str, Catalog]
23
+ CATALOGS: Catalogs = {}
24
  EXECUTORS = {}
25
+ mem = joblib.Memory(".joblib-cache")
26
 
27
  typeof = type # We have some arguments called "type".
28
 
 
196
  return res
197
 
198
 
199
+ def op(env: str, name: str, *, view="basic", outputs=None, params=None, cache=False):
200
  """Decorator for defining an operation."""
201
 
202
  def decorator(func):
203
+ if cache:
204
+ func = mem.cache(func)
205
  sig = inspect.signature(func)
206
  # Positional arguments are inputs.
207
  inputs = {
 
317
  return await asyncio.to_thread(func, *args, **kwargs)
318
 
319
  return wrapper
320
+
321
+
322
+ CATALOGS_SNAPSHOTS: dict[str, Catalogs] = {}
323
+
324
+
325
+ def save_catalogs(snapshot_name: str):
326
+ CATALOGS_SNAPSHOTS[snapshot_name] = {k: dict(v) for k, v in CATALOGS.items()}
327
+
328
+
329
+ def load_catalogs(snapshot_name: str):
330
+ global CATALOGS
331
+ snap = CATALOGS_SNAPSHOTS[snapshot_name]
332
+ CATALOGS = {k: dict(v) for k, v in snap.items()}
333
+
334
+
335
+ def load_user_scripts(workspace: str):
336
+ """Reloads the *.py in the workspace's directory and higher-level directories."""
337
+ if "plugins loaded" in CATALOGS_SNAPSHOTS:
338
+ load_catalogs("plugins loaded")
339
+ cwd = pathlib.Path()
340
+ path = cwd / workspace
341
+ assert path.is_relative_to(cwd), "Provided workspace path is invalid"
342
+ for p in path.parents:
343
+ print("checking user scripts in", p)
344
+ for f in p.glob("*.py"):
345
+ try:
346
+ run_user_script(f)
347
+ except Exception:
348
+ traceback.print_exc()
349
+ if p == cwd:
350
+ break
351
+
352
+
353
+ def run_user_script(script_path: pathlib.Path):
354
+ print(f"Running {script_path}...")
355
+ spec = importlib.util.spec_from_file_location(script_path.stem, str(script_path))
356
+ module = importlib.util.module_from_spec(spec)
357
+ spec.loader.exec_module(module)