# Plugin development
Plugins can provide additional operations for an existing LynxKite environment, and they can also provide new environments.
## Creating a new plugin

`.py` files inside the LynxKite data directory are automatically imported each time a
workspace is executed. You can create a new plugin by creating a new `.py` file in the
data directory. LynxKite even includes an integrated editor for this purpose.
Click *New code file* in the directory where you want to create the file.
Plugins in subdirectories of the data directory are imported when executing workspaces within those directories. This allows you to create plugins that are only available in specific workspaces.
You can also create and distribute plugins as Python packages. In this case the
module name must start with `lynxkite_` for it to be automatically imported on startup.
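A minimal packaged plugin might look like this (a sketch; the package and operation names are hypothetical):

```python
# lynxkite_myplugin/__init__.py
# The module name starts with "lynxkite_", so LynxKite imports it on startup.
import pandas as pd

from lynxkite.core.ops import op


@op("LynxKite Graph Analytics", "My packaged operation")
def my_packaged_operation(df: pd.DataFrame):
    return df
```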
## Plugin dependencies

When creating a plugin as a "code file", you can create a `requirements.txt` file in the
same directory. This file will be used to install the dependencies of the plugin.
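For example, a code-file plugin that uses scikit-learn might sit next to a `requirements.txt` like this (the package list is illustrative):

```
scikit-learn
```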
## Adding new operations
Any piece of Python code can easily be wrapped into a LynxKite operation. Let's say we have some code that calculates the length of a string column in a Pandas DataFrame:
df["length"] = df["my_column"].str.len()
We can turn it into a LynxKite operation using the `@op` decorator:
```python
import pandas as pd

from lynxkite.core.ops import op


@op("LynxKite Graph Analytics", "Get column length")
def get_length(df: pd.DataFrame, *, column_name: str):
    """
    Gets the length of a string column.

    Args:
        column_name: The name of the column to get the length of.
    """
    df = df.copy()
    df["length"] = df[column_name].str.len()
    return df
```
Let's review the changes we made.
### The `@op` decorator

The `@op` decorator registers a function as a LynxKite operation. The first argument is
the name of the environment, the second argument is the name of the operation.
When defining multiple operations, you can use `ops.op_registration` for convenience:

```python
from lynxkite.core import ops

op = ops.op_registration("LynxKite Graph Analytics")

@op("An operation")
def my_op():
    ...
```
### The function signature

The `*` in the list of function arguments marks the start of keyword-only arguments.
The arguments before `*` will become inputs of the operation. The arguments after `*`
will be its parameters.

```python
#              /--- inputs ---\     /- parameters -\
def get_length(df: pd.DataFrame, *, column_name: str):
```
LynxKite uses the type annotations of the function arguments to provide input validation, conversion, and the right UI on the frontend.
The types supported for inputs are determined by the environment. For graph analytics, the possibilities are:
- `pandas.DataFrame`
- `networkx.Graph`
- `lynxkite_graph_analytics.Bundle`
The inputs of an operation are automatically converted to the right type, when possible.
To make an input optional, use an optional type, like `pd.DataFrame | None`.
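For instance, an operation with an optional second input might look like this (a sketch; the names are hypothetical):

```python
import pandas as pd

from lynxkite.core.ops import op


@op("LynxKite Graph Analytics", "Append rows")
def append_rows(df: pd.DataFrame, extra: pd.DataFrame | None):
    # The "extra" input connector can be left unconnected.
    if extra is None:
        return df
    return pd.concat([df, extra], ignore_index=True)
```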
The position of the input and output connectors can be controlled using the
`@ops.input_position` and `@ops.output_position` decorators. By default, inputs are
on the left and outputs on the right.
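A sketch, assuming each decorator maps a connector name to a side (the operation itself is hypothetical):

```python
import pandas as pd

from lynxkite.core import ops
from lynxkite.core.ops import op


# Assumption: connector names map to "top"/"bottom"/"left"/"right".
@op("LynxKite Graph Analytics", "Collect elements")
@ops.input_position(df="top")
@ops.output_position(output="bottom")
def collect_elements(df: pd.DataFrame):
    ...
```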
All parameters are stored in LynxKite workspaces as strings. If a type annotation is provided, LynxKite will convert the string to the right type and provide the right UI.
- `str`, `int`, `float` are presented as a text box and converted to the given type.
- `bool` is presented as a checkbox.
- `lynxkite.core.ops.LongStr` is presented as a text area.
- Enums are presented as a dropdown list.
- Pydantic models are presented as their JSON string representations. (Unless you add
  custom UI for them.) They are converted to the model object when your function is called.
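For example, a single operation might combine several of these parameter types (a sketch; the operation name and enum are hypothetical):

```python
import enum

import pandas as pd

from lynxkite.core.ops import LongStr, op


class Aggregation(enum.Enum):
    SUM = "sum"
    MEAN = "mean"


@op("LynxKite Graph Analytics", "Aggregate column")
def aggregate_column(
    df: pd.DataFrame,
    *,
    column_name: str,  # text box
    limit: int = 100,  # text box, converted to int
    normalize: bool = False,  # checkbox
    notes: LongStr = "",  # text area
    method: Aggregation = Aggregation.SUM,  # dropdown
):
    ...
```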
### Slow operations

If the function takes a significant amount of time to run, we must either:

- Write an asynchronous function.
- Pass `slow=True` to the `@op` decorator. LynxKite will run the function in a separate thread.

`slow=True` also causes the results of the operation to be cached on disk. As long as
its inputs don't change, the operation will not be run again. This is useful for both
synchronous and asynchronous operations.
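Both options might look like this (a sketch; the operation names and bodies are hypothetical):

```python
import pandas as pd

from lynxkite.core.ops import op


# Option 1: an asynchronous function.
@op("LynxKite Graph Analytics", "Fetch dataset")
async def fetch_dataset(*, url: str):
    ...  # e.g. await an HTTP request


# Option 2: slow=True runs the function in a separate thread
# and caches its results on disk.
@op("LynxKite Graph Analytics", "Embed text", slow=True)
def embed_text(df: pd.DataFrame, *, column_name: str):
    ...  # long-running, CPU-bound work
```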
### Documentation

The docstring of the function is used as the operation's description. You can use
Google-style or Numpy-style docstrings. (See Griffe's documentation.)
The docstring can be omitted for simple operations like the one above.
### Outputting results

The return value of the function is the output of the operation. It will be passed to
the next operation in the pipeline.

An operation can have multiple outputs. In this case, the return value must be a dictionary,
and the list of outputs must be declared in the `@op` decorator.
@op("LynxKite Graph Analytics", "Train/test split", outputs=["train", "test"])
def test_split(df: pd.DataFrame, *, test_ratio=0.1):
test = df.sample(frac=test_ratio).reset_index()
train = df.drop(test.index).reset_index()
return {"train": train, "test": test}
### Displaying results

The outputs of the operation can be used by other operations. But we can also generate
results that are meant to be viewed by the user. The different options for this are
controlled by the `view` argument of the `@op` decorator.

The `view` argument can be one of the following:
- `matplotlib`: Just plot something with Matplotlib and it will be displayed in the UI.

  ```python
  @op("LynxKite Graph Analytics", "Plot column histogram", view="matplotlib")
  def plot(df: pd.DataFrame, *, column_name: str):
      df[column_name].value_counts().sort_index().plot.bar()
  ```

- `visualization`: Draws a chart using ECharts. You need to return a dictionary with the
  chart configuration, which ECharts calls `option`.

  ```python
  @op("View loss", view="visualization")
  def view_loss(bundle: core.Bundle):
      loss = bundle.dfs["training"].training_loss.tolist()
      v = {
          "title": {"text": "Training loss"},
          "xAxis": {"type": "category"},
          "yAxis": {"type": "value"},
          "series": [{"data": loss, "type": "line"}],
      }
      return v
  ```

- `image`: Return an image as a data URL and it will be displayed. (See the sketch after this list.)
- `molecule`: Return a molecule as a PDB or SDF string, or an `rdkit.Chem.Mol` object.
  It will be displayed using 3Dmol.js.
- `table_view`: Return `Bundle.to_dict()`.
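For the `image` view, any way of producing a data URL works. A minimal sketch using Matplotlib (the operation name and parameters are hypothetical):

```python
import base64
import io

import matplotlib.pyplot as plt
import pandas as pd

from lynxkite.core.ops import op


@op("LynxKite Graph Analytics", "Scatter plot image", view="image")
def scatter_image(df: pd.DataFrame, *, x: str, y: str):
    fig, ax = plt.subplots()
    ax.scatter(df[x], df[y])
    buf = io.BytesIO()
    fig.savefig(buf, format="png")
    plt.close(fig)
    # Encode the PNG bytes as a data URL.
    data = base64.b64encode(buf.getvalue()).decode()
    return f"data:image/png;base64,{data}"
```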
## Adding new environments

A new environment means a completely new set of operations, and (optionally) a new executor.
There's nothing to be done for setting up a new environment. Just start registering operations into it.
### No executor
By default, the new environment will have no executor. This can be useful!
LynxKite workspaces are stored as straightforward JSON files and updated on every modification. You can use LynxKite for configuring workflows and have a separate system read the JSON files.
Since the code of the operations is not executed in this case, you can create functions that do nothing.
Alternatively, you can use the `register_passive_op` and `passive_op_registration`
functions to easily whip up a set of operations:
```python
from lynxkite.core.ops import passive_op_registration, Parameter as P

op = passive_op_registration("My Environment")
op("Scrape documents", params=[P("url", "")])
op("Conversation logs")
op("Extract graph")
op("Compute embeddings", params=[P.options("method", ["LLM", "graph", "random"]), P("dimensions", 1234)])
op("Vector DB", params=[P.options("backend", ["ANN", "HNSW"])])
op("Chat UI", outputs=[])
op("Chat backend")
```
### Built-in executors

LynxKite comes with two built-in executors. You can register one of these for your
environment and you're good to go.

```python
from lynxkite.core.executors import simple

simple.register("My Environment")
```
The `simple` executor runs each operation once, passing the output of the preceding operation
as the input to the next one. No tricks. You can use any types as inputs and outputs.
```python
from lynxkite.core.executors import one_by_one

one_by_one.register("My Environment")
```
The `one_by_one` executor expects that the code for operations is the code for transforming
a single element. If an operation returns an iterable, it will be split up into its elements,
and the next operation is called for each element.

Sometimes you need the full contents of an input. The `one_by_one` executor lets you choose
between the two modes by the orientation of the input connector. If the input connector is
horizontal (left or right), it takes single elements. If the input connector is vertical
(top or bottom), it takes an iterable of all the incoming data.

A unique advantage of this setup is that the workspace can contain loops across horizontal
inputs. Just make sure that loops eventually discard all elements, so you don't end up with
an infinite loop.
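A sketch of element-wise operations under this executor (the operation names and environment are hypothetical):

```python
from lynxkite.core import ops

op = ops.op_registration("My Environment")


@op("Split document")
def split_document(doc: str):
    # Returns an iterable: the executor splits it up and calls the
    # next operation once per element.
    return doc.split(".")


@op("Sentence length")
def sentence_length(sentence: str):
    # Receives one element at a time through a horizontal input.
    return len(sentence)
```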
### Custom executors

A custom executor can be registered using `@ops.register_executor`.
```python
@ops.register_executor(ENV)
async def execute(ws: workspace.Workspace):
    catalog = ops.CATALOGS[ws.env]
    ...
```
The executor must be an asynchronous function that takes a `workspace.Workspace` as an
argument. The return value is ignored and it's up to you how you process the workspace.
To update the frontend as the executor processes the workspace, call
`WorkspaceNode.publish_started` when starting to execute a node, and
`WorkspaceNode.publish_result` to publish the results. Use
`WorkspaceNode.publish_error` if the node failed.
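Putting it together, a custom executor might look roughly like this (a sketch; error handling and input wiring are omitted, and the node traversal and call signatures are assumptions, not the actual API):

```python
from lynxkite.core import ops, workspace

ENV = "My Environment"


@ops.register_executor(ENV)
async def execute(ws: workspace.Workspace):
    catalog = ops.CATALOGS[ws.env]
    for node in ws.nodes:  # assumption: visit nodes in workspace order
        op = catalog[node.data.title]  # assumption: look up the op by node title
        node.publish_started()
        try:
            result = op(...)  # call the operation with the inputs you gathered
            node.publish_result(result)
        except Exception as error:
            node.publish_error(error)
```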