Spaces:
Running
Running
| from __future__ import annotations | |
| from abc import ABC, abstractmethod | |
| from typing import TYPE_CHECKING, Any | |
| if TYPE_CHECKING: | |
| from collections.abc import Sequence | |
| from uuid import UUID | |
| from langchain.callbacks.base import BaseCallbackHandler | |
| from langflow.graph.vertex.base import Vertex | |
| from langflow.services.tracing.schema import Log | |
| class BaseTracer(ABC): | |
| trace_id: UUID | |
| def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): | |
| raise NotImplementedError | |
| def ready(self) -> bool: | |
| raise NotImplementedError | |
| def add_trace( | |
| self, | |
| trace_id: str, | |
| trace_name: str, | |
| trace_type: str, | |
| inputs: dict[str, Any], | |
| metadata: dict[str, Any] | None = None, | |
| vertex: Vertex | None = None, | |
| ) -> None: | |
| raise NotImplementedError | |
| def end_trace( | |
| self, | |
| trace_id: str, | |
| trace_name: str, | |
| outputs: dict[str, Any] | None = None, | |
| error: Exception | None = None, | |
| logs: Sequence[Log | dict] = (), | |
| ) -> None: | |
| raise NotImplementedError | |
| def end( | |
| self, | |
| inputs: dict[str, Any], | |
| outputs: dict[str, Any], | |
| error: Exception | None = None, | |
| metadata: dict[str, Any] | None = None, | |
| ) -> None: | |
| raise NotImplementedError | |
| def get_langchain_callback(self) -> BaseCallbackHandler | None: | |
| raise NotImplementedError | |