File size: 2,363 Bytes
45e4c73
 
b600751
 
 
 
 
 
 
45e4c73
b600751
 
45e4c73
b600751
 
45e4c73
b600751
 
 
 
45e4c73
b600751
 
 
 
 
28c40a9
 
b600751
45e4c73
b600751
 
 
 
 
45e4c73
b600751
 
 
 
 
9497543
 
b600751
45e4c73
b600751
45e4c73
b600751
 
 
 
eee9365
75c875f
eee9365
b600751
 
 
 
47eb7cc
 
45e4c73
 
 
b600751
 
 
 
 
 
 
 
 
abb1488
 
 
 
 
 
75c875f
abb1488
 
 
 
 
 
 
45e4c73
abb1488
 
28c40a9
45e4c73
abb1488
 
45e4c73
abb1488
b600751
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
"""For working with LynxKite workspaces."""

from typing import Optional
import dataclasses
import os
import pydantic
import tempfile
from . import ops


class BaseConfig(pydantic.BaseModel):
    """Common base class for the workspace models in this module.

    Configured with ``extra="allow"``, so fields that are not declared on a
    model are kept on the instance instead of being rejected.
    """

    model_config = pydantic.ConfigDict(
        extra="allow",
    )


class Position(BaseConfig):
    """An (x, y) coordinate pair; used as the position of a WorkspaceNode."""

    # NOTE(review): units and origin are not defined here -- presumably
    # canvas coordinates chosen by the frontend; confirm.
    x: float
    y: float


class WorkspaceNodeData(BaseConfig):
    """The payload of a workspace node."""

    # Looked up in the operation catalog when metadata is refreshed on load.
    title: str
    params: dict
    display: Optional[object] = None
    # Set to "Unknown operation." on load when the title is not in the catalog.
    error: Optional[str] = None
    # Outgoing data also carries a "meta" field (attached as an extra field
    # when a workspace is loaded). It is ignored when the data comes back
    # from the frontend.


class WorkspaceNode(BaseConfig):
    """A single node of a workspace."""

    id: str
    # Overwritten with the catalog operation's type when metadata is
    # refreshed on load, provided the node's title is a known operation.
    type: str
    data: WorkspaceNodeData
    position: Position


class WorkspaceEdge(BaseConfig):
    """A connection between two workspace nodes."""

    id: str
    # NOTE(review): source/target are presumably WorkspaceNode ids and the
    # *Handle fields the names of the connected ports -- confirm against the
    # frontend. The camelCase names are part of the serialized format.
    source: str
    target: str
    sourceHandle: str
    targetHandle: str


class Workspace(BaseConfig):
    """A workspace: an environment name plus the nodes and edges of its graph.

    ``env`` is the key used to pick the executor from ``ops.EXECUTORS`` and
    the operation catalog from ``ops.CATALOGS``. ``nodes`` and ``edges``
    default to fresh empty lists for every instance.
    """

    env: str = ""
    # pydantic.Field is the documented way to declare a default factory on a
    # BaseModel; dataclasses.field is meant for dataclasses.
    nodes: list[WorkspaceNode] = pydantic.Field(default_factory=list)
    edges: list[WorkspaceEdge] = pydantic.Field(default_factory=list)


async def execute(ws: Workspace):
    """Run the executor registered for the workspace's environment, if any.

    The executor is looked up in ``ops.EXECUTORS`` by ``ws.env`` and awaited
    with the workspace as its only argument. When no executor is registered
    for the environment, nothing happens.
    """
    if ws.env not in ops.EXECUTORS:
        return
    run_workspace = ops.EXECUTORS[ws.env]
    await run_workspace(ws)


def save(ws: Workspace, path: str):
    """Serialize ``ws`` as indented JSON and write it to ``path``.

    The JSON is first written to a hidden temporary file next to the target
    and then moved over ``path`` with ``os.replace``, so the target is
    swapped in one step instead of being rewritten in place.
    """
    serialized = ws.model_dump_json(indent=2)
    directory = os.path.dirname(path)
    filename = os.path.basename(path)
    # The temp file lives in the target's directory so that both are on the
    # same filesystem and the final rename does not have to cross devices.
    tmp = tempfile.NamedTemporaryFile(
        "w", prefix=f".{filename}.", dir=directory, delete_on_close=False
    )
    with tmp:
        tmp.write(serialized)
        # Close (and thereby flush) before renaming; delete_on_close=False
        # keeps the file on disk after this explicit close.
        tmp.close()
        os.replace(tmp.name, path)


def load(path: str):
    """Read a workspace from the JSON file at ``path`` and return it.

    The file content is validated into a ``Workspace`` and the node metadata
    is refreshed before the workspace is returned.
    """
    with open(path) as source:
        raw_json = source.read()
    workspace = Workspace.model_validate_json(raw_json)
    # Metadata is not trusted from disk: it is re-attached on every load so
    # that code changes also reach boxes that were saved earlier.
    _update_metadata(workspace)
    return workspace


def _update_metadata(ws):
    """Attach current operation metadata to the nodes of ``ws``, in place.

    Every node's title is looked up in the catalog registered for ``ws.env``
    in ``ops.CATALOGS``; an empty catalog is used when the environment has
    none. For a known operation the node gets the operation as ``data.meta``
    and its ``type`` is set to the operation's type; a stale
    "Unknown operation." error is cleared while other errors are kept.
    For an unknown operation ``data.error`` is set to "Unknown operation.".

    Returns the same ``ws`` object.
    """
    catalog = ops.CATALOGS.get(ws.env, {})
    # A single pass is enough: every node is handled the first time its id
    # is encountered. If an id repeats, only its first node is updated.
    seen_ids = set()
    for node in ws.nodes:
        if node.id in seen_ids:
            continue
        seen_ids.add(node.id)
        data = node.data
        op = catalog.get(data.title)
        if not op:
            data.error = "Unknown operation."
            continue
        # "meta" is not a declared field; it is stored as an extra field.
        data.meta = op
        node.type = op.type
        if data.error == "Unknown operation.":
            data.error = None
    return ws