darabos committed on
Commit
573f5c6
·
1 Parent(s): 7d24ba8

Fix caching, move outputs on top.

Browse files
Files changed (2) hide show
  1. requirements.txt +1 -0
  2. server/executors/one_by_one.py +14 -5
requirements.txt CHANGED
@@ -2,6 +2,7 @@ fastapi
2
  matplotlib
3
  networkx
4
  numpy
 
5
  pandas
6
  scipy
7
  uvicorn[standard]
 
2
  matplotlib
3
  networkx
4
  numpy
5
+ orjson
6
  pandas
7
  scipy
8
  uvicorn[standard]
server/executors/one_by_one.py CHANGED
@@ -1,8 +1,8 @@
1
  from .. import ops
2
  from .. import workspace
3
- import fastapi
4
- import json
5
  import pandas as pd
 
6
  import traceback
7
  import inspect
8
  import typing
@@ -63,6 +63,15 @@ def get_stages(ws, catalog):
63
  stages.append(set(nodes))
64
  return stages
65
 
 
 
 
 
 
 
 
 
 
66
  EXECUTOR_OUTPUT_CACHE = {}
67
 
68
  def execute(ws, catalog, cache=None):
@@ -101,10 +110,10 @@ def execute(ws, catalog, cache=None):
101
  inputs = [
102
  batch_inputs[(n, i.name)] if i.position in 'top or bottom' else task
103
  for i in op.inputs.values()]
104
- if cache:
105
- key = json.dumps(fastapi.encoders.jsonable_encoder((inputs, params)))
106
  if key not in cache:
107
- cache[key] = op.func(*inputs, **params)
108
  result = cache[key]
109
  else:
110
  result = op(*inputs, **params)
 
1
  from .. import ops
2
  from .. import workspace
3
+ import orjson
 
4
  import pandas as pd
5
+ import pydantic
6
  import traceback
7
  import inspect
8
  import typing
 
63
  stages.append(set(nodes))
64
  return stages
65
 
66
+
67
+ def _default_serializer(obj):
68
+ if isinstance(obj, pydantic.BaseModel):
69
+ return obj.dict()
70
+ return {"__nonserializable__": id(obj)}
71
+
72
+ def make_cache_key(obj):
73
+ return orjson.dumps(obj, default=_default_serializer)
74
+
75
  EXECUTOR_OUTPUT_CACHE = {}
76
 
77
  def execute(ws, catalog, cache=None):
 
110
  inputs = [
111
  batch_inputs[(n, i.name)] if i.position in 'top or bottom' else task
112
  for i in op.inputs.values()]
113
+ if cache is not None:
114
+ key = make_cache_key((inputs, params))
115
  if key not in cache:
116
+ cache[key] = op(*inputs, **params)
117
  result = cache[key]
118
  else:
119
  result = op(*inputs, **params)