darabos commited on
Commit
2bdf2ca
·
unverified ·
2 Parent(s): 1c78316 7516093

Merge pull request #189 from biggraph/darabos-optional-inputs

Browse files
lynxkite-core/pyproject.toml CHANGED
@@ -11,3 +11,6 @@ dependencies = [
11
  dev = [
12
  "pytest",
13
  ]
 
 
 
 
11
  dev = [
12
  "pytest",
13
  ]
14
+
15
+ [tool.pytest.ini_options]
16
+ asyncio_mode = "auto"
lynxkite-core/src/lynxkite/core/executors/one_by_one.py CHANGED
@@ -131,12 +131,22 @@ async def execute(ws: workspace.Workspace, catalog: ops.Catalog, cache=None):
131
  for task in ts:
132
  try:
133
  inputs = []
 
134
  for i in op.inputs:
135
  if i.position.is_vertical():
136
- assert (n, i.name) in batch_inputs, f"{i.name} is missing"
137
- inputs.append(batch_inputs[(n, i.name)])
 
 
 
 
 
 
138
  else:
139
  inputs.append(task)
 
 
 
140
  if cache is not None:
141
  key = make_cache_key((inputs, params))
142
  if key not in cache:
 
131
  for task in ts:
132
  try:
133
  inputs = []
134
+ missing = []
135
  for i in op.inputs:
136
  if i.position.is_vertical():
137
+ if (n, i.name) in batch_inputs:
138
+ inputs.append(batch_inputs[(n, i.name)])
139
+ else:
140
+ opt_type = ops.get_optional_type(i.type)
141
+ if opt_type is not None:
142
+ inputs.append(None)
143
+ else:
144
+ missing.append(i.name)
145
  else:
146
  inputs.append(task)
147
+ if missing:
148
+ node.publish_error(f"Missing input: {', '.join(missing)}")
149
+ break
150
  if cache is not None:
151
  key = make_cache_key((inputs, params))
152
  if key not in cache:
lynxkite-core/src/lynxkite/core/executors/simple.py CHANGED
@@ -42,7 +42,11 @@ async def execute(ws: workspace.Workspace, catalog: ops.Catalog):
42
  if i.name in edges and edges[i.name] in outputs:
43
  inputs.append(outputs[edges[i.name]])
44
  else:
45
- missing.append(i.name)
 
 
 
 
46
  if missing:
47
  node.publish_error(f"Missing input: {', '.join(missing)}")
48
  continue
 
42
  if i.name in edges and edges[i.name] in outputs:
43
  inputs.append(outputs[edges[i.name]])
44
  else:
45
+ opt_type = ops.get_optional_type(i.type)
46
+ if opt_type is not None:
47
+ inputs.append(None)
48
+ else:
49
+ missing.append(i.name)
50
  if missing:
51
  node.publish_error(f"Missing input: {', '.join(missing)}")
52
  continue
lynxkite-core/src/lynxkite/core/ops.py CHANGED
@@ -149,6 +149,16 @@ def basic_outputs(*names):
149
  return {name: Output(name=name, type=None) for name in names}
150
 
151
 
 
 
 
 
 
 
 
 
 
 
152
  def _param_to_type(name, value, type):
153
  value = value or ""
154
  if type is int:
@@ -160,12 +170,9 @@ def _param_to_type(name, value, type):
160
  if isinstance(type, enum.EnumMeta):
161
  assert value in type.__members__, f"{value} is not an option for {name}."
162
  return type[value]
163
- if isinstance(type, types.UnionType):
164
- match type.__args__:
165
- case (types.NoneType, type):
166
- return None if value == "" else _param_to_type(name, value, type)
167
- case (type, types.NoneType):
168
- return None if value == "" else _param_to_type(name, value, type)
169
  if isinstance(type, typeof) and issubclass(type, pydantic.BaseModel):
170
  try:
171
  return type.model_validate_json(value)
 
149
  return {name: Output(name=name, type=None) for name in names}
150
 
151
 
152
+ def get_optional_type(type):
153
+ """For a type like `int | None`, returns `int`. Returns `None` otherwise."""
154
+ if isinstance(type, types.UnionType):
155
+ match type.__args__:
156
+ case (types.NoneType, type):
157
+ return type
158
+ case (type, types.NoneType):
159
+ return type
160
+
161
+
162
  def _param_to_type(name, value, type):
163
  value = value or ""
164
  if type is int:
 
170
  if isinstance(type, enum.EnumMeta):
171
  assert value in type.__members__, f"{value} is not an option for {name}."
172
  return type[value]
173
+ opt_type = get_optional_type(type)
174
+ if opt_type:
175
+ return None if value == "" else _param_to_type(name, value, opt_type)
 
 
 
176
  if isinstance(type, typeof) and issubclass(type, pydantic.BaseModel):
177
  try:
178
  return type.model_validate_json(value)
lynxkite-core/src/lynxkite/core/workspace.py CHANGED
@@ -125,7 +125,7 @@ class Workspace(BaseConfig):
125
  return self.env in ops.EXECUTORS
126
 
127
  async def execute(self):
128
- await ops.EXECUTORS[self.env](self)
129
 
130
  def save(self, path: str):
131
  """Persist the workspace to a local file in JSON format."""
@@ -201,3 +201,36 @@ class Workspace(BaseConfig):
201
  if "data" not in nc:
202
  nc["data"] = pycrdt.Map()
203
  np._crdt = nc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
125
  return self.env in ops.EXECUTORS
126
 
127
  async def execute(self):
128
+ return await ops.EXECUTORS[self.env](self)
129
 
130
  def save(self, path: str):
131
  """Persist the workspace to a local file in JSON format."""
 
201
  if "data" not in nc:
202
  nc["data"] = pycrdt.Map()
203
  np._crdt = nc
204
+
205
+ def add_node(self, func):
206
+ """For convenience in e.g. tests."""
207
+ random_string = os.urandom(4).hex()
208
+ node = WorkspaceNode(
209
+ id=f"{func.__op__.name} {random_string}",
210
+ type=func.__op__.type,
211
+ data=WorkspaceNodeData(
212
+ title=func.__op__.name,
213
+ params={},
214
+ display=None,
215
+ input_metadata=None,
216
+ error=None,
217
+ status=NodeStatus.planned,
218
+ ),
219
+ position=Position(x=0, y=0),
220
+ )
221
+ self.nodes.append(node)
222
+ return node
223
+
224
+ def add_edge(
225
+ self, source: WorkspaceNode, sourceHandle: str, target: WorkspaceNode, targetHandle: str
226
+ ):
227
+ """For convenience in e.g. tests."""
228
+ edge = WorkspaceEdge(
229
+ id=f"{source.id} {sourceHandle} to {target.id} {targetHandle}",
230
+ source=source.id,
231
+ target=target.id,
232
+ sourceHandle=sourceHandle,
233
+ targetHandle=targetHandle,
234
+ )
235
+ self.edges.append(edge)
236
+ return edge
lynxkite-core/tests/test_one_by_one.py ADDED
@@ -0,0 +1,27 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from lynxkite.core import ops, workspace
2
+ from lynxkite.core.executors import one_by_one
3
+
4
+
5
+ async def test_optional_inputs():
6
+ @ops.op("test", "one")
7
+ def one():
8
+ return 1
9
+
10
+ @ops.input_position(a="bottom", b="bottom")
11
+ @ops.op("test", "maybe add")
12
+ def maybe_add(a: list[int], b: list[int] | None = None):
13
+ return [a + b for a, b in zip(a, b)] if b else a
14
+
15
+ assert maybe_add.__op__.inputs == [
16
+ ops.Input(name="a", type=list[int], position="bottom"),
17
+ ops.Input(name="b", type=list[int] | None, position="bottom"),
18
+ ]
19
+ one_by_one.register("test")
20
+ ws = workspace.Workspace(env="test", nodes=[], edges=[])
21
+ a = ws.add_node(one)
22
+ b = ws.add_node(maybe_add)
23
+ outputs = await ws.execute()
24
+ assert b.data.error == "Missing input: a"
25
+ ws.add_edge(a, "output", b, "a")
26
+ outputs = await ws.execute()
27
+ assert outputs[b.id].last_result == [1]
lynxkite-core/tests/test_simple.py ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from lynxkite.core import ops, workspace
2
+ from lynxkite.core.executors import simple
3
+
4
+
5
+ async def test_optional_inputs():
6
+ @ops.op("test", "one")
7
+ def one():
8
+ return 1
9
+
10
+ @ops.op("test", "maybe add")
11
+ def maybe_add(a: int, b: int | None = None):
12
+ return a + (b or 0)
13
+
14
+ assert maybe_add.__op__.inputs == [
15
+ ops.Input(name="a", type=int, position="left"),
16
+ ops.Input(name="b", type=int | None, position="left"),
17
+ ]
18
+ simple.register("test")
19
+ ws = workspace.Workspace(env="test", nodes=[], edges=[])
20
+ a = ws.add_node(one)
21
+ b = ws.add_node(maybe_add)
22
+ await ws.execute()
23
+ assert b.data.error == "Missing input: a"
24
+ ws.add_edge(a, "output", b, "a")
25
+ outputs = await ws.execute()
26
+ assert outputs[b.id, "output"] == 1
lynxkite-graph-analytics/src/lynxkite_graph_analytics/core.py CHANGED
@@ -187,6 +187,7 @@ async def execute(ws: workspace.Workspace):
187
  todo.remove(id)
188
  progress = True
189
  await _execute_node(node, ws, catalog, outputs)
 
190
 
191
 
192
  async def await_if_needed(obj):
@@ -213,10 +214,15 @@ async def _execute_node(
213
  # Convert inputs types to match operation signature.
214
  try:
215
  inputs = []
 
216
  for p in op.inputs:
217
  if p.name not in input_map:
218
- node.publish_error(f"Missing input: {p.name}")
219
- return
 
 
 
 
220
  x = input_map[p.name]
221
  if p.type == nx.Graph and isinstance(x, Bundle):
222
  x = x.to_nx()
@@ -230,6 +236,9 @@ async def _execute_node(
230
  traceback.print_exc()
231
  node.publish_error(e)
232
  return
 
 
 
233
  # Execute op.
234
  try:
235
  result = op(*inputs, **params)
 
187
  todo.remove(id)
188
  progress = True
189
  await _execute_node(node, ws, catalog, outputs)
190
+ return outputs
191
 
192
 
193
  async def await_if_needed(obj):
 
214
  # Convert inputs types to match operation signature.
215
  try:
216
  inputs = []
217
+ missing = []
218
  for p in op.inputs:
219
  if p.name not in input_map:
220
+ opt_type = ops.get_optional_type(p.type)
221
+ if opt_type is not None:
222
+ inputs.append(None)
223
+ else:
224
+ missing.append(p.name)
225
+ continue
226
  x = input_map[p.name]
227
  if p.type == nx.Graph and isinstance(x, Bundle):
228
  x = x.to_nx()
 
236
  traceback.print_exc()
237
  node.publish_error(e)
238
  return
239
+ if missing:
240
+ node.publish_error(f"Missing input: {', '.join(missing)}")
241
+ return
242
  # Execute op.
243
  try:
244
  result = op(*inputs, **params)
lynxkite-graph-analytics/tests/test_lynxkite_ops.py CHANGED
@@ -160,5 +160,28 @@ async def test_multiple_inputs():
160
  assert ws.nodes[-1].data.display is False
161
 
162
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
163
  if __name__ == "__main__":
164
  pytest.main()
 
160
  assert ws.nodes[-1].data.display is False
161
 
162
 
163
+ async def test_optional_inputs():
164
+ @ops.op("test", "one")
165
+ def one():
166
+ return 1
167
+
168
+ @ops.op("test", "maybe add")
169
+ def maybe_add(a: int, b: int | None = None):
170
+ return a + (b or 0)
171
+
172
+ assert maybe_add.__op__.inputs == [
173
+ ops.Input(name="a", type=int, position="left"),
174
+ ops.Input(name="b", type=int | None, position="left"),
175
+ ]
176
+ ws = workspace.Workspace(env="test", nodes=[], edges=[])
177
+ a = ws.add_node(one)
178
+ b = ws.add_node(maybe_add)
179
+ await execute(ws)
180
+ assert b.data.error == "Missing input: a"
181
+ ws.add_edge(a, "output", b, "a")
182
+ outputs = await execute(ws)
183
+ assert outputs[b.id, "output"] == 1
184
+
185
+
186
  if __name__ == "__main__":
187
  pytest.main()