Spaces:
Running
Running
Progress indicator in one_by_one.py.
Browse files
lynxkite-app/src/lynxkite_app/crdt.py
CHANGED
|
@@ -197,6 +197,7 @@ async def workspace_changed(name: str, changes: pycrdt.MapEvent, ws_crdt: pycrdt
|
|
| 197 |
getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
|
| 198 |
for change in changes
|
| 199 |
)
|
|
|
|
| 200 |
if delay:
|
| 201 |
task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
|
| 202 |
delayed_executions[name] = task
|
|
|
|
| 197 |
getattr(change, "keys", {}).get("__execution_delay", {}).get("newValue", 0)
|
| 198 |
for change in changes
|
| 199 |
)
|
| 200 |
+
print(f"Running {name} in {ws_pyd.env}...")
|
| 201 |
if delay:
|
| 202 |
task = asyncio.create_task(execute(name, ws_crdt, ws_pyd, delay))
|
| 203 |
delayed_executions[name] = task
|
lynxkite-core/src/lynxkite/core/executors/one_by_one.py
CHANGED
|
@@ -104,11 +104,11 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
|
|
| 104 |
tasks = {}
|
| 105 |
NO_INPUT = object() # Marker for initial tasks.
|
| 106 |
for node in ws.nodes:
|
| 107 |
-
node.data.error = None
|
| 108 |
op = catalog.get(node.data.title)
|
| 109 |
if op is None:
|
| 110 |
-
node.
|
| 111 |
continue
|
|
|
|
| 112 |
# Start tasks for nodes that have no non-batch inputs.
|
| 113 |
if all([i.position in "top or bottom" for i in op.inputs.values()]):
|
| 114 |
tasks[node.id] = [NO_INPUT]
|
|
@@ -123,12 +123,12 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
|
|
| 123 |
next_stage.setdefault(n, []).extend(ts)
|
| 124 |
continue
|
| 125 |
node = nodes[n]
|
| 126 |
-
|
| 127 |
-
|
| 128 |
-
params = {**data.params}
|
| 129 |
if has_ctx(op):
|
| 130 |
params["_ctx"] = contexts[node.id]
|
| 131 |
results = []
|
|
|
|
| 132 |
for task in ts:
|
| 133 |
try:
|
| 134 |
inputs = []
|
|
@@ -146,11 +146,12 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
|
|
| 146 |
cache[key] = output
|
| 147 |
output = cache[key]
|
| 148 |
else:
|
|
|
|
| 149 |
result = op(*inputs, **params)
|
| 150 |
output = await await_if_needed(result.output)
|
| 151 |
except Exception as e:
|
| 152 |
traceback.print_exc()
|
| 153 |
-
|
| 154 |
break
|
| 155 |
contexts[node.id].last_result = output
|
| 156 |
# Returned lists and DataFrames are considered multiple tasks.
|
|
@@ -161,7 +162,7 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
|
|
| 161 |
results.extend(output)
|
| 162 |
else: # Finished all tasks without errors.
|
| 163 |
if result.display:
|
| 164 |
-
|
| 165 |
for edge in edges[node.id]:
|
| 166 |
t = nodes[edge.target]
|
| 167 |
op = catalog[t.data.title]
|
|
@@ -172,5 +173,6 @@ async def execute(ws: workspace.Workspace, catalog, cache=None):
|
|
| 172 |
).extend(results)
|
| 173 |
else:
|
| 174 |
tasks.setdefault(edge.target, []).extend(results)
|
|
|
|
| 175 |
tasks = next_stage
|
| 176 |
return contexts
|
|
|
|
| 104 |
tasks = {}
|
| 105 |
NO_INPUT = object() # Marker for initial tasks.
|
| 106 |
for node in ws.nodes:
|
|
|
|
| 107 |
op = catalog.get(node.data.title)
|
| 108 |
if op is None:
|
| 109 |
+
node.publish_error(f'Operation "{node.data.title}" not found.')
|
| 110 |
continue
|
| 111 |
+
node.publish_error(None)
|
| 112 |
# Start tasks for nodes that have no non-batch inputs.
|
| 113 |
if all([i.position in "top or bottom" for i in op.inputs.values()]):
|
| 114 |
tasks[node.id] = [NO_INPUT]
|
|
|
|
| 123 |
next_stage.setdefault(n, []).extend(ts)
|
| 124 |
continue
|
| 125 |
node = nodes[n]
|
| 126 |
+
op = catalog[node.data.title]
|
| 127 |
+
params = {**node.data.params}
|
|
|
|
| 128 |
if has_ctx(op):
|
| 129 |
params["_ctx"] = contexts[node.id]
|
| 130 |
results = []
|
| 131 |
+
node.publish_started()
|
| 132 |
for task in ts:
|
| 133 |
try:
|
| 134 |
inputs = []
|
|
|
|
| 146 |
cache[key] = output
|
| 147 |
output = cache[key]
|
| 148 |
else:
|
| 149 |
+
op.publish_started()
|
| 150 |
result = op(*inputs, **params)
|
| 151 |
output = await await_if_needed(result.output)
|
| 152 |
except Exception as e:
|
| 153 |
traceback.print_exc()
|
| 154 |
+
node.publish_error(e)
|
| 155 |
break
|
| 156 |
contexts[node.id].last_result = output
|
| 157 |
# Returned lists and DataFrames are considered multiple tasks.
|
|
|
|
| 162 |
results.extend(output)
|
| 163 |
else: # Finished all tasks without errors.
|
| 164 |
if result.display:
|
| 165 |
+
result.display = await await_if_needed(result.display)
|
| 166 |
for edge in edges[node.id]:
|
| 167 |
t = nodes[edge.target]
|
| 168 |
op = catalog[t.data.title]
|
|
|
|
| 173 |
).extend(results)
|
| 174 |
else:
|
| 175 |
tasks.setdefault(edge.target, []).extend(results)
|
| 176 |
+
op.publish_result(result)
|
| 177 |
tasks = next_stage
|
| 178 |
return contexts
|
lynxkite-core/src/lynxkite/core/ops.py
CHANGED
|
@@ -9,6 +9,9 @@ import typing
|
|
| 9 |
from dataclasses import dataclass
|
| 10 |
from typing_extensions import Annotated
|
| 11 |
|
|
|
|
|
|
|
|
|
|
| 12 |
CATALOGS = {}
|
| 13 |
EXECUTORS = {}
|
| 14 |
|
|
@@ -233,9 +236,15 @@ def register_passive_op(env: str, name: str, inputs=[], outputs=["output"], para
|
|
| 233 |
|
| 234 |
|
| 235 |
def register_executor(env: str):
|
| 236 |
-
"""Decorator for registering an executor.
|
| 237 |
|
| 238 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 239 |
EXECUTORS[env] = func
|
| 240 |
return func
|
| 241 |
|
|
|
|
| 9 |
from dataclasses import dataclass
|
| 10 |
from typing_extensions import Annotated
|
| 11 |
|
| 12 |
+
if typing.TYPE_CHECKING:
|
| 13 |
+
from . import workspace
|
| 14 |
+
|
| 15 |
CATALOGS = {}
|
| 16 |
EXECUTORS = {}
|
| 17 |
|
|
|
|
| 236 |
|
| 237 |
|
| 238 |
def register_executor(env: str):
|
| 239 |
+
"""Decorator for registering an executor.
|
| 240 |
|
| 241 |
+
The executor is a function that takes a workspace and executes the operations in it.
|
| 242 |
+
When it starts executing an operation, it should call `node.publish_started()` to indicate
|
| 243 |
+
the status on the UI. When the execution is finished, it should call `node.publish_result()`.
|
| 244 |
+
This will update the UI with the result of the operation.
|
| 245 |
+
"""
|
| 246 |
+
|
| 247 |
+
def decorator(func: typing.Callable[[workspace.Workspace], typing.Any]):
|
| 248 |
EXECUTORS[env] = func
|
| 249 |
return func
|
| 250 |
|
lynxkite-core/src/lynxkite/core/workspace.py
CHANGED
|
@@ -49,9 +49,12 @@ class WorkspaceNode(BaseConfig):
|
|
| 49 |
|
| 50 |
def publish_started(self):
|
| 51 |
"""Notifies the frontend that work has started on this node."""
|
|
|
|
| 52 |
self.data.status = NodeStatus.active
|
| 53 |
if hasattr(self, "_crdt"):
|
| 54 |
-
self._crdt
|
|
|
|
|
|
|
| 55 |
|
| 56 |
def publish_result(self, result: ops.Result):
|
| 57 |
"""Sends the result to the frontend. Call this in an executor when the result is available."""
|
|
@@ -64,8 +67,10 @@ class WorkspaceNode(BaseConfig):
|
|
| 64 |
self._crdt["data"]["error"] = result.error
|
| 65 |
self._crdt["data"]["status"] = NodeStatus.done
|
| 66 |
|
| 67 |
-
def publish_error(self, error: Exception | str):
|
| 68 |
-
|
|
|
|
|
|
|
| 69 |
|
| 70 |
|
| 71 |
class WorkspaceEdge(BaseConfig):
|
|
|
|
| 49 |
|
| 50 |
def publish_started(self):
|
| 51 |
"""Notifies the frontend that work has started on this node."""
|
| 52 |
+
self.data.error = None
|
| 53 |
self.data.status = NodeStatus.active
|
| 54 |
if hasattr(self, "_crdt"):
|
| 55 |
+
with self._crdt.doc.transaction():
|
| 56 |
+
self._crdt["data"]["error"] = None
|
| 57 |
+
self._crdt["data"]["status"] = NodeStatus.active
|
| 58 |
|
| 59 |
def publish_result(self, result: ops.Result):
|
| 60 |
"""Sends the result to the frontend. Call this in an executor when the result is available."""
|
|
|
|
| 67 |
self._crdt["data"]["error"] = result.error
|
| 68 |
self._crdt["data"]["status"] = NodeStatus.done
|
| 69 |
|
| 70 |
+
def publish_error(self, error: Exception | str | None):
|
| 71 |
+
"""Can be called with None to clear the error state."""
|
| 72 |
+
result = ops.Result(error=str(error) if error else None)
|
| 73 |
+
self.publish_result(result)
|
| 74 |
|
| 75 |
|
| 76 |
class WorkspaceEdge(BaseConfig):
|