Spaces:
Running
Running
Merge pull request #143 from biggraph/feature/task-solver-box
Browse files
lynxkite-app/src/lynxkite_app/crdt.py
CHANGED
@@ -273,6 +273,7 @@ async def execute(name: str, ws_crdt: pycrdt.Map, ws_pyd: workspace.Workspace, d
|
|
273 |
nc["data"]["status"] = "planned"
|
274 |
# Nodes get a reference to their CRDT maps, so they can update them as the results come in.
|
275 |
np._crdt = nc
|
|
|
276 |
await workspace.execute(ws_pyd)
|
277 |
workspace.save(ws_pyd, path)
|
278 |
print(f"Finished running {name} in {ws_pyd.env}.")
|
|
|
273 |
nc["data"]["status"] = "planned"
|
274 |
# Nodes get a reference to their CRDT maps, so they can update them as the results come in.
|
275 |
np._crdt = nc
|
276 |
+
ws_pyd = ws_pyd.normalize()
|
277 |
await workspace.execute(ws_pyd)
|
278 |
workspace.save(ws_pyd, path)
|
279 |
print(f"Finished running {name} in {ws_pyd.env}.")
|
lynxkite-core/src/lynxkite/core/executors/simple.py
ADDED
@@ -0,0 +1,64 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
"""A LynxKite executor that simply passes the output of one box to the other."""
|
2 |
+
|
3 |
+
import os
|
4 |
+
from .. import ops
|
5 |
+
from .. import workspace
|
6 |
+
import traceback
|
7 |
+
import inspect
|
8 |
+
import graphlib
|
9 |
+
|
10 |
+
|
11 |
+
def register(env: str):
|
12 |
+
"""Registers the one-by-one executor."""
|
13 |
+
ops.EXECUTORS[env] = lambda ws: execute(ws, ops.CATALOGS[env])
|
14 |
+
|
15 |
+
|
16 |
+
async def await_if_needed(obj):
|
17 |
+
if inspect.isawaitable(obj):
|
18 |
+
return await obj
|
19 |
+
return obj
|
20 |
+
|
21 |
+
|
22 |
+
async def execute(ws: workspace.Workspace, catalog: ops.Catalog):
|
23 |
+
nodes = {n.id: n for n in ws.nodes}
|
24 |
+
dependencies = {n: [] for n in nodes}
|
25 |
+
in_edges = {n: {} for n in nodes}
|
26 |
+
for e in ws.edges:
|
27 |
+
dependencies[e.target].append(e.source)
|
28 |
+
assert e.targetHandle not in in_edges[e.target], f"Duplicate input for {e.target}"
|
29 |
+
in_edges[e.target][e.targetHandle] = e.source, e.sourceHandle
|
30 |
+
outputs = {}
|
31 |
+
ts = graphlib.TopologicalSorter(dependencies)
|
32 |
+
for node_id in ts.static_order():
|
33 |
+
node = nodes[node_id]
|
34 |
+
op = catalog[node.data.title]
|
35 |
+
params = {**node.data.params}
|
36 |
+
node.publish_started()
|
37 |
+
try:
|
38 |
+
inputs = []
|
39 |
+
missing = []
|
40 |
+
for i in op.inputs.values():
|
41 |
+
edges = in_edges[node_id]
|
42 |
+
if i.name in edges and edges[i.name] in outputs:
|
43 |
+
inputs.append(outputs[edges[i.name]])
|
44 |
+
else:
|
45 |
+
missing.append(i.name)
|
46 |
+
if missing:
|
47 |
+
node.publish_error(f"Missing input: {', '.join(missing)}")
|
48 |
+
continue
|
49 |
+
result = op(*inputs, **params)
|
50 |
+
result.output = await await_if_needed(result.output)
|
51 |
+
result.display = await await_if_needed(result.display)
|
52 |
+
if len(op.outputs) == 1:
|
53 |
+
[output] = list(op.outputs.values())
|
54 |
+
outputs[node_id, output.name] = result.output
|
55 |
+
elif len(op.outputs) > 1:
|
56 |
+
assert type(result.output) is dict, "An op with multiple outputs must return a dict"
|
57 |
+
for output in op.outputs.values():
|
58 |
+
outputs[node_id, output.name] = result.output[output.name]
|
59 |
+
node.publish_result(result)
|
60 |
+
except Exception as e:
|
61 |
+
if not os.environ.get("LYNXKITE_SUPPRESS_OP_ERRORS"):
|
62 |
+
traceback.print_exc()
|
63 |
+
node.publish_error(e)
|
64 |
+
return outputs
|
lynxkite-core/src/lynxkite/core/ops.py
CHANGED
@@ -144,6 +144,7 @@ def _param_to_type(name, value, type):
|
|
144 |
assert value != "", f"{name} is unset."
|
145 |
return float(value)
|
146 |
if isinstance(type, enum.EnumMeta):
|
|
|
147 |
return type[value]
|
148 |
if isinstance(type, types.UnionType):
|
149 |
match type.__args__:
|
|
|
144 |
assert value != "", f"{name} is unset."
|
145 |
return float(value)
|
146 |
if isinstance(type, enum.EnumMeta):
|
147 |
+
assert value in type.__members__, f"{value} is not an option for {name}."
|
148 |
return type[value]
|
149 |
if isinstance(type, types.UnionType):
|
150 |
match type.__args__:
|
lynxkite-core/src/lynxkite/core/workspace.py
CHANGED
@@ -97,6 +97,25 @@ class Workspace(BaseConfig):
|
|
97 |
edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
|
98 |
_crdt: pycrdt.Map
|
99 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
100 |
|
101 |
async def execute(ws: Workspace):
|
102 |
if ws.env in ops.EXECUTORS:
|
|
|
97 |
edges: list[WorkspaceEdge] = dataclasses.field(default_factory=list)
|
98 |
_crdt: pycrdt.Map
|
99 |
|
100 |
+
def normalize(self):
|
101 |
+
if self.env not in ops.CATALOGS:
|
102 |
+
return self
|
103 |
+
catalog = ops.CATALOGS[self.env]
|
104 |
+
_ops = {n.id: catalog[n.data.title] for n in self.nodes if n.data.title in catalog}
|
105 |
+
valid_targets = set(
|
106 |
+
(n.id, h) for n in self.nodes for h in _ops[n.id].inputs if n.id in _ops
|
107 |
+
)
|
108 |
+
valid_sources = set(
|
109 |
+
(n.id, h) for n in self.nodes for h in _ops[n.id].outputs if n.id in _ops
|
110 |
+
)
|
111 |
+
edges = [
|
112 |
+
edge
|
113 |
+
for edge in self.edges
|
114 |
+
if (edge.source, edge.sourceHandle) in valid_sources
|
115 |
+
and (edge.target, edge.targetHandle) in valid_targets
|
116 |
+
]
|
117 |
+
return self.model_copy(update={"edges": edges})
|
118 |
+
|
119 |
|
120 |
async def execute(ws: Workspace):
|
121 |
if ws.env in ops.EXECUTORS:
|
lynxkite-pillow-example/src/lynxkite_pillow_example/__init__.py
CHANGED
@@ -1,7 +1,7 @@
|
|
1 |
"""Demo for how easily we can provide a UI for popular open-source tools."""
|
2 |
|
3 |
from lynxkite.core import ops
|
4 |
-
from lynxkite.core.executors import
|
5 |
from PIL import Image, ImageFilter
|
6 |
import base64
|
7 |
import fsspec
|
@@ -9,7 +9,7 @@ import io
|
|
9 |
|
10 |
ENV = "Pillow"
|
11 |
op = ops.op_registration(ENV)
|
12 |
-
|
13 |
|
14 |
|
15 |
@op("Open image")
|
|
|
1 |
"""Demo for how easily we can provide a UI for popular open-source tools."""
|
2 |
|
3 |
from lynxkite.core import ops
|
4 |
+
from lynxkite.core.executors import simple
|
5 |
from PIL import Image, ImageFilter
|
6 |
import base64
|
7 |
import fsspec
|
|
|
9 |
|
10 |
ENV = "Pillow"
|
11 |
op = ops.op_registration(ENV)
|
12 |
+
simple.register(ENV)
|
13 |
|
14 |
|
15 |
@op("Open image")
|