Spaces:
Sleeping
Sleeping
| from abc import ABC, abstractmethod | |
| from typing import Sequence | |
| from aworld.trace.base import Span | |
| class SpanConsumer(ABC): | |
| """SpanConsumer is a protocol that represents a consumer for spans. | |
| """ | |
| def consume(self, spans: Sequence[Span]) -> None: | |
| """Consumes a span. | |
| Args: | |
| spans: The span to consume. | |
| """ | |
| _SPAN_CONSUMER_REGISTRY = {} | |
| def register_span_consumer(default_kwargs=None) -> None: | |
| """Registers a span consumer. | |
| Args: | |
| default_kwargs: A dictionary of default keyword arguments to pass to the span consumer. | |
| """ | |
| default_kwargs = default_kwargs or {} | |
| def decorator(cls): | |
| _SPAN_CONSUMER_REGISTRY[cls.__name__] = (cls, default_kwargs) | |
| return cls | |
| return decorator | |
| def get_span_consumers() -> Sequence[SpanConsumer]: | |
| """Returns a list of span consumers. | |
| Returns: | |
| A list of span consumers. | |
| """ | |
| return [ | |
| cls(**kwargs) | |
| for cls, kwargs in _SPAN_CONSUMER_REGISTRY.values() | |
| ] | |