Spaces:
Running
Running
| from __future__ import annotations | |
| import os | |
| from datetime import datetime, timezone | |
| from typing import TYPE_CHECKING, Any | |
| from loguru import logger | |
| from typing_extensions import override | |
| from langflow.services.tracing.base import BaseTracer | |
| if TYPE_CHECKING: | |
| from collections.abc import Sequence | |
| from uuid import UUID | |
| from langchain.callbacks.base import BaseCallbackHandler | |
| from langfuse.client import StatefulSpanClient | |
| from langflow.graph.vertex.base import Vertex | |
| from langflow.services.tracing.schema import Log | |
| class LangFuseTracer(BaseTracer): | |
| flow_id: str | |
| def __init__(self, trace_name: str, trace_type: str, project_name: str, trace_id: UUID): | |
| self.project_name = project_name | |
| self.trace_name = trace_name | |
| self.trace_type = trace_type | |
| self.trace_id = trace_id | |
| self.flow_id = trace_name.split(" - ")[-1] | |
| self.last_span: StatefulSpanClient | None = None | |
| self.spans: dict = {} | |
| config = self._get_config() | |
| self._ready: bool = self.setup_langfuse(config) if config else False | |
| def ready(self): | |
| return self._ready | |
| def setup_langfuse(self, config) -> bool: | |
| try: | |
| from langfuse import Langfuse | |
| from langfuse.callback.langchain import LangchainCallbackHandler | |
| self._client = Langfuse(**config) | |
| self.trace = self._client.trace(id=str(self.trace_id), name=self.flow_id) | |
| config |= { | |
| "trace_name": self.flow_id, | |
| "stateful_client": self.trace, | |
| "update_stateful_client": True, | |
| } | |
| self._callback = LangchainCallbackHandler(**config) | |
| except ImportError: | |
| logger.exception("Could not import langfuse. Please install it with `pip install langfuse`.") | |
| return False | |
| except Exception as e: # noqa: BLE001 | |
| logger.debug(f"Error setting up LangSmith tracer: {e}") | |
| return False | |
| return True | |
| def add_trace( | |
| self, | |
| trace_id: str, | |
| trace_name: str, | |
| trace_type: str, | |
| inputs: dict[str, Any], | |
| metadata: dict[str, Any] | None = None, | |
| vertex: Vertex | None = None, | |
| ) -> None: | |
| start_time = datetime.now(tz=timezone.utc) | |
| if not self._ready: | |
| return | |
| metadata_: dict = {} | |
| metadata_ |= {"trace_type": trace_type} if trace_type else {} | |
| metadata_ |= metadata or {} | |
| name = trace_name.removesuffix(f" ({trace_id})") | |
| content_span = { | |
| "name": name, | |
| "input": inputs, | |
| "metadata": metadata_, | |
| "start_time": start_time, | |
| } | |
| span = self.last_span.span(**content_span) if self.last_span else self.trace.span(**content_span) | |
| self.last_span = span | |
| self.spans[trace_id] = span | |
| def end_trace( | |
| self, | |
| trace_id: str, | |
| trace_name: str, | |
| outputs: dict[str, Any] | None = None, | |
| error: Exception | None = None, | |
| logs: Sequence[Log | dict] = (), | |
| ) -> None: | |
| end_time = datetime.now(tz=timezone.utc) | |
| if not self._ready: | |
| return | |
| span = self.spans.get(trace_id, None) | |
| if span: | |
| output: dict = {} | |
| output |= outputs or {} | |
| output |= {"error": str(error)} if error else {} | |
| output |= {"logs": list(logs)} if logs else {} | |
| content = {"output": output, "end_time": end_time} | |
| span.update(**content) | |
| def end( | |
| self, | |
| inputs: dict[str, Any], | |
| outputs: dict[str, Any], | |
| error: Exception | None = None, | |
| metadata: dict[str, Any] | None = None, | |
| ) -> None: | |
| if not self._ready: | |
| return | |
| self._client.flush() | |
| def get_langchain_callback(self) -> BaseCallbackHandler | None: | |
| if not self._ready: | |
| return None | |
| return None # self._callback | |
| def _get_config(self) -> dict: | |
| secret_key = os.getenv("LANGFUSE_SECRET_KEY", None) | |
| public_key = os.getenv("LANGFUSE_PUBLIC_KEY", None) | |
| host = os.getenv("LANGFUSE_HOST", None) | |
| if secret_key and public_key and host: | |
| return {"secret_key": secret_key, "public_key": public_key, "host": host} | |
| return {} | |