File size: 2,672 Bytes
dcedc35
 
 
b8b73b2
ca01fa3
c3ebd8b
b8b73b2
dc66cc7
2720e84
ca01fa3
b600751
dc66cc7
 
c3ebd8b
dc66cc7
892d010
 
 
c3ebd8b
ca01fa3
2720e84
 
ca01fa3
892d010
ca01fa3
 
75c875f
 
892d010
 
ca01fa3
 
b600751
05acf81
b600751
05acf81
892d010
05acf81
 
 
b600751
05acf81
892d010
ca01fa3
eee9365
05acf81
eee9365
05acf81
 
 
892d010
05acf81
 
 
 
 
b600751
 
b8b73b2
892d010
 
 
b8b73b2
 
 
 
 
 
892d010
b8b73b2
 
 
 
892d010
 
 
 
 
 
 
 
 
05acf81
 
 
892d010
05acf81
 
 
 
c3ebd8b
892d010
dcedc35
 
892d010
dcedc35
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# TODO: Make this conditional. Until then just comment/uncomment it to use cuDF Pandas.
# import cudf.pandas
# cudf.pandas.install()
import dataclasses
import fastapi
import importlib
import pathlib
import pkgutil
from . import crdt
from . import ops
from . import workspace

# Directory holding this module; sibling "*_ops" modules are discovered in it.
here = pathlib.Path(__file__).parent
# Maps the fully qualified module name ("server.<name>") to the imported module.
lynxkite_modules = {}
for module_info in pkgutil.iter_modules([str(here)]):
    short_name = module_info.name
    # Only operation modules are loaded; their tests are skipped.
    if short_name.startswith("test_") or not short_name.endswith("_ops"):
        continue
    print(f"Importing {short_name}")
    qualified_name = f"server.{short_name}"
    lynxkite_modules[qualified_name] = importlib.import_module(qualified_name)

# The FastAPI application object that the route decorators in this file attach
# to. Its lifespan handler and one extra router both come from the crdt module.
app = fastapi.FastAPI(lifespan=crdt.lifespan)
app.include_router(crdt.router)


@app.get("/api/catalog")
def get_catalog():
    """Return all operations from ``ops.CATALOGS`` in serialized form.

    The result has one entry per catalog key. Each entry maps an operation's
    ``name`` to the output of that operation's ``model_dump()``.
    """
    catalog_dump = {}
    for catalog_key, catalog_ops in ops.CATALOGS.items():
        serialized_ops = {}
        for op in catalog_ops.values():
            serialized_ops[op.name] = op.model_dump()
        catalog_dump[catalog_key] = serialized_ops
    return catalog_dump


# Request body accepted by the /api/save endpoint.
class SaveRequest(workspace.BaseConfig):
    # Location of the workspace file; save() joins it onto DATA_PATH.
    path: str
    # The workspace that is written to disk and executed.
    ws: workspace.Workspace


def save(req: SaveRequest):
    """Write ``req.ws`` to the file ``req.path`` below ``DATA_PATH``.

    Args:
        req: The workspace together with its destination path.

    Raises:
        fastapi.HTTPException: With status 400 if ``req.path`` points outside
            ``DATA_PATH``.
    """
    path = DATA_PATH / req.path
    # is_relative_to() compares paths lexically, so "../" segments would pass
    # the check unless both sides are resolved first. An explicit exception is
    # raised instead of using ``assert`` because assertions are removed when
    # Python runs with -O.
    if not path.resolve().is_relative_to(DATA_PATH.resolve()):
        raise fastapi.HTTPException(status_code=400, detail="Invalid path.")
    workspace.save(req.ws, path)


@app.post("/api/save")
async def save_and_execute(req: SaveRequest):
    """Save the workspace, execute it, save it again and return it."""
    # First save: store the workspace as received, before execution starts.
    save(req)
    await workspace.execute(req.ws)
    # Second save: presumably execute() updates req.ws in place and this
    # persists the outcome -- TODO confirm against workspace.execute.
    save(req)
    return req.ws


@app.get("/api/load")
def load(path: str):
    """Load the workspace stored at ``path`` below ``DATA_PATH``.

    Args:
        path: Location of the workspace file, relative to ``DATA_PATH``.

    Returns:
        The result of ``workspace.load`` for an existing file, or a
        default-constructed ``workspace.Workspace`` if the file does not exist.

    Raises:
        fastapi.HTTPException: With status 400 if ``path`` points outside
            ``DATA_PATH``.
    """
    full_path = DATA_PATH / path
    # is_relative_to() compares paths lexically, so "../" segments would pass
    # the check unless both sides are resolved first. An explicit exception is
    # raised instead of using ``assert`` because assertions are removed when
    # Python runs with -O.
    if not full_path.resolve().is_relative_to(DATA_PATH.resolve()):
        raise fastapi.HTTPException(status_code=400, detail="Invalid path.")
    if not full_path.exists():
        return workspace.Workspace()
    return workspace.load(full_path)


# Root directory for everything the file endpoints read and write: the "data"
# folder under the working directory at the time this module is imported.
# Functions defined earlier in the file only look this name up when called.
DATA_PATH = pathlib.Path.cwd() / "data"


@dataclasses.dataclass(order=True)
class DirectoryEntry:
    """One item of a directory listing produced by list_dir().

    ``order=True`` generates comparison methods that compare the fields in
    declaration order, so entries sort by ``name`` first and ``type`` second.
    """

    # Path of the entry relative to DATA_PATH.
    name: str
    # list_dir() sets this to "directory" or "workspace".
    type: str


@app.get("/api/dir/list")
def list_dir(path: str):
    """List the contents of the directory ``path`` below ``DATA_PATH``.

    Args:
        path: Directory to list, relative to ``DATA_PATH``. make_dir() also
            calls this with an absolute path inside ``DATA_PATH``; joining an
            absolute path onto ``DATA_PATH`` leaves it unchanged.

    Returns:
        A sorted list of ``DirectoryEntry`` objects. Each name is relative to
        ``DATA_PATH``, and each type is "directory" for directories and
        "workspace" for anything else.

    Raises:
        fastapi.HTTPException: With status 400 if ``path`` points outside
            ``DATA_PATH``, or status 404 if it is not an existing directory.
    """
    full_path = DATA_PATH / path
    # is_relative_to() compares paths lexically, so "../" segments would pass
    # the check unless both sides are resolved first. An explicit exception is
    # raised instead of using ``assert`` because assertions are removed when
    # Python runs with -O.
    if not full_path.resolve().is_relative_to(DATA_PATH.resolve()):
        raise fastapi.HTTPException(status_code=400, detail="Invalid path.")
    if not full_path.is_dir():
        # Without this check iterdir() would fail with an unhandled OSError.
        raise fastapi.HTTPException(status_code=404, detail="Directory not found.")
    # NOTE(review): the name is passed as a Path although the field is
    # annotated as str; kept as is to preserve the existing output and order.
    return sorted(
        DirectoryEntry(
            p.relative_to(DATA_PATH), "directory" if p.is_dir() else "workspace"
        )
        for p in full_path.iterdir()
    )


@app.post("/api/dir/mkdir")
def make_dir(req: dict):
    """Create a new directory below ``DATA_PATH`` and list its parent.

    Args:
        req: Request body; its "path" entry is the directory to create,
            relative to ``DATA_PATH``.

    Returns:
        The listing of the new directory's parent, as produced by list_dir().

    Raises:
        fastapi.HTTPException: With status 400 if "path" is missing, is not a
            string or points outside ``DATA_PATH``, or status 409 if the
            target already exists.
    """
    relative = req.get("path")
    if not isinstance(relative, str):
        raise fastapi.HTTPException(status_code=400, detail="Missing path.")
    path = DATA_PATH / relative
    # is_relative_to() compares paths lexically, so "../" segments would pass
    # the check unless both sides are resolved first. Explicit exceptions are
    # raised instead of using ``assert`` because assertions are removed when
    # Python runs with -O.
    if not path.resolve().is_relative_to(DATA_PATH.resolve()):
        raise fastapi.HTTPException(status_code=400, detail="Invalid path.")
    if path.exists():
        raise fastapi.HTTPException(status_code=409, detail="Path already exists.")
    path.mkdir()
    # path.parent is absolute, so the join onto DATA_PATH in list_dir() leaves
    # it unchanged.
    return list_dir(path.parent)


@app.get("/api/service/{module_path:path}")
async def service_get(req: fastapi.Request, module_path: str):
    """Executors can provide extra HTTP APIs through the /api/service endpoint.

    The first segment of ``module_path`` is looked up in ``lynxkite_modules``
    and the request is handed to that module's ``api_service_get`` coroutine.

    Args:
        req: The incoming request, passed to the module unchanged.
        module_path: Everything after ``/api/service/`` in the URL.

    Returns:
        Whatever the module's ``api_service_get`` returns.

    Raises:
        fastapi.HTTPException: With status 404 if no such module is loaded or
            the module does not define ``api_service_get``.
    """
    module = lynxkite_modules.get(module_path.split("/")[0])
    # getattr() on None also yields the default, so a single check covers both
    # an unknown module and a module that lacks a GET handler.
    handler = getattr(module, "api_service_get", None)
    if handler is None:
        raise fastapi.HTTPException(status_code=404, detail="Unknown service.")
    return await handler(req)


@app.post("/api/service/{module_path:path}")
async def service_post(req: fastapi.Request, module_path: str):
    """Executors can provide extra HTTP APIs through the /api/service endpoint.

    The first segment of ``module_path`` is looked up in ``lynxkite_modules``
    and the request is handed to that module's ``api_service_post`` coroutine.

    Args:
        req: The incoming request, passed to the module unchanged.
        module_path: Everything after ``/api/service/`` in the URL.

    Returns:
        Whatever the module's ``api_service_post`` returns.

    Raises:
        fastapi.HTTPException: With status 404 if no such module is loaded or
            the module does not define ``api_service_post``.
    """
    module = lynxkite_modules.get(module_path.split("/")[0])
    # getattr() on None also yields the default, so a single check covers both
    # an unknown module and a module that lacks a POST handler.
    handler = getattr(module, "api_service_post", None)
    if handler is None:
        raise fastapi.HTTPException(status_code=404, detail="Unknown service.")
    return await handler(req)