Spaces:
Running
Running
File size: 4,774 Bytes
e1e5b1c 2af956d e1e5b1c 2af956d aa9e014 e1e5b1c 2af956d e1e5b1c aa9e014 e1e5b1c aa9e014 e1e5b1c aa9e014 e1e5b1c aa9e014 e1e5b1c b750748 e1e5b1c b750748 e1e5b1c b750748 e1e5b1c cb83a99 aa9e014 cb83a99 aa9e014 cb83a99 aa9e014 cb83a99 8fed4c9 e1e5b1c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
import pandas as pd
import pytest
import networkx as nx
from lynxkite.core import workspace, ops
from lynxkite_graph_analytics.core import Bundle, execute, ENV
async def test_execute_operation_not_in_catalog():
ws = workspace.Workspace(env=ENV)
ws.add_node(
id="1",
type="node_type",
data=workspace.WorkspaceNodeData(title="Non existing op", params={}),
position=workspace.Position(x=0, y=0),
)
await execute(ws)
assert ws.nodes[0].data.error == "Operation not found in catalog"
async def test_execute_operation_inputs_correct_cast():
# Test that the automatic casting of operation inputs works correctly.
op = ops.op_registration("test")
@op("Create Bundle")
def create_bundle() -> Bundle:
df = pd.DataFrame({"source": [1, 2, 3], "target": [4, 5, 6]})
return Bundle(dfs={"edges": df})
@op("Bundle to Graph")
def bundle_to_graph(graph: nx.Graph) -> nx.Graph:
return graph
@op("Graph to Bundle")
def graph_to_bundle(bundle: Bundle) -> pd.DataFrame:
return list(bundle.dfs.values())[0]
@op("Dataframe to Bundle")
def dataframe_to_bundle(bundle: Bundle) -> Bundle:
return bundle
ws = workspace.Workspace(env="test")
ws.add_node(
id="1",
type="node_type",
data=workspace.WorkspaceNodeData(title="Create Bundle", params={}),
position=workspace.Position(x=0, y=0),
)
ws.add_node(
id="2",
type="node_type",
data=workspace.WorkspaceNodeData(title="Bundle to Graph", params={}),
position=workspace.Position(x=100, y=0),
)
ws.add_node(
id="3",
type="node_type",
data=workspace.WorkspaceNodeData(title="Graph to Bundle", params={}),
position=workspace.Position(x=200, y=0),
)
ws.add_node(
id="4",
type="node_type",
data=workspace.WorkspaceNodeData(title="Dataframe to Bundle", params={}),
position=workspace.Position(x=300, y=0),
)
ws.edges = [
workspace.WorkspaceEdge(
id="1", source="1", target="2", sourceHandle="output", targetHandle="graph"
),
workspace.WorkspaceEdge(
id="2", source="2", target="3", sourceHandle="output", targetHandle="bundle"
),
workspace.WorkspaceEdge(
id="3", source="3", target="4", sourceHandle="output", targetHandle="bundle"
),
]
await execute(ws)
assert all([node.data.error is None for node in ws.nodes])
async def test_multiple_inputs():
"""Make sure each input goes to the right argument."""
op = ops.op_registration("test")
@op("One")
def one():
return 1
@op("Two")
def two():
return 2
@op("Smaller?", view="visualization")
def is_smaller(a, b):
return a < b
ws = workspace.Workspace(env="test")
ws.add_node(
id="one",
type="cool",
data=workspace.WorkspaceNodeData(title="One", params={}),
position=workspace.Position(x=0, y=0),
)
ws.add_node(
id="two",
type="cool",
data=workspace.WorkspaceNodeData(title="Two", params={}),
position=workspace.Position(x=100, y=0),
)
ws.add_node(
id="smaller",
type="cool",
data=workspace.WorkspaceNodeData(title="Smaller?", params={}),
position=workspace.Position(x=200, y=0),
)
ws.edges = [
workspace.WorkspaceEdge(
id="one",
source="one",
target="smaller",
sourceHandle="output",
targetHandle="a",
),
workspace.WorkspaceEdge(
id="two",
source="two",
target="smaller",
sourceHandle="output",
targetHandle="b",
),
]
await execute(ws)
assert ws.nodes[-1].data.display is True
# Flip the inputs.
ws.edges[0].targetHandle = "b"
ws.edges[1].targetHandle = "a"
await execute(ws)
assert ws.nodes[-1].data.display is False
async def test_optional_inputs():
@ops.op("test", "one")
def one():
return 1
@ops.op("test", "maybe add")
def maybe_add(a: int, b: int | None = None):
return a + (b or 0)
assert maybe_add.__op__.inputs == [
ops.Input(name="a", type=int, position="left"),
ops.Input(name="b", type=int | None, position="left"),
]
ws = workspace.Workspace(env="test", nodes=[], edges=[])
a = ws.add_node(one)
b = ws.add_node(maybe_add)
await execute(ws)
assert b.data.error == "Missing input: a"
ws.add_edge(a, "output", b, "a")
outputs = await execute(ws)
assert outputs[b.id, "output"] == 1
if __name__ == "__main__":
pytest.main()
|