Spaces:
Running
Running
from any_agent import AnyAgent | |
from opentelemetry.sdk.trace.export import SimpleSpanProcessor | |
from collections.abc import Sequence | |
from typing import TYPE_CHECKING, Callable | |
from opentelemetry.sdk.trace.export import ( | |
SpanExporter, | |
SpanExportResult, | |
) | |
from any_agent import AgentFramework | |
from any_agent.tracing import TracingProcessor | |
from any_agent.tracing.trace import AgentSpan | |
if TYPE_CHECKING: | |
from opentelemetry.sdk.trace import ReadableSpan | |
class StreamlitExporter(SpanExporter): | |
"""Build an `AgentTrace` and export to the different outputs.""" | |
def __init__( # noqa: D107 | |
self, agent_framework: AgentFramework, callback: Callable | |
): | |
self.agent_framework = agent_framework | |
self.processor: TracingProcessor | None = TracingProcessor.create( | |
agent_framework | |
) | |
self.callback = callback | |
def export(self, spans: Sequence["ReadableSpan"]) -> SpanExportResult: # noqa: D102 | |
if not self.processor: | |
return SpanExportResult.SUCCESS | |
for readable_span in spans: | |
# Check if this span belongs to our run | |
span = AgentSpan.from_readable_span(readable_span) | |
self.callback(span) | |
return SpanExportResult.SUCCESS | |
def export_logs(agent: AnyAgent, callback: Callable) -> None: | |
exporter = StreamlitExporter(agent.framework, callback) | |
span_processor = SimpleSpanProcessor(exporter) | |
agent._tracer_provider.add_span_processor(span_processor) | |