Spaces:
Sleeping
Sleeping
| import types | |
| import inspect | |
| from typing import Union, Optional, Any, Type, Sequence, Callable, Iterable | |
| from aworld.trace.base import ( | |
| AttributeValueType, | |
| NoOpSpan, | |
| Span, Tracer, | |
| NoOpTracer, | |
| get_tracer_provider, | |
| get_tracer_provider_silent, | |
| log_trace_error | |
| ) | |
| from aworld.trace.span_cosumer import SpanConsumer | |
| from aworld.version_gen import __version__ | |
| from aworld.trace.auto_trace import AutoTraceModule, install_auto_tracing | |
| from aworld.trace.stack_info import get_user_stack_info | |
| from aworld.trace.constants import ( | |
| ATTRIBUTES_MESSAGE_KEY, | |
| ATTRIBUTES_MESSAGE_RUN_TYPE_KEY, | |
| ATTRIBUTES_MESSAGE_TEMPLATE_KEY, | |
| RunType | |
| ) | |
| from aworld.trace.msg_format import ( | |
| chunks_formatter, | |
| warn_formatting, | |
| FStringAwaitError, | |
| KnownFormattingError, | |
| warn_fstring_await | |
| ) | |
| from aworld.trace.function_trace import trace_func | |
| from .opentelemetry.opentelemetry_adapter import configure_otlp_provider | |
| from aworld.logs.util import logger | |
def trace_configure(provider: str = "otlp",
                    backends: Optional[Sequence[str]] = None,
                    base_url: Optional[str] = None,
                    write_token: Optional[str] = None,
                    span_consumers: Optional[Sequence[SpanConsumer]] = None,
                    **kwargs
                    ) -> None:
    """
    Configure the global trace provider.

    If a provider is already configured it is shut down before the new one
    is installed.

    Args:
        provider: The trace provider to use. Only "otlp" is supported.
        backends: The trace backends to use.
        base_url: The base URL of the trace backend.
        write_token: The write token of the trace backend.
        span_consumers: The span consumers to use.
        **kwargs: Additional arguments forwarded to the trace provider setup.
    Returns:
        None
    Raises:
        ValueError: If `provider` is not a supported provider name.
    """
    # Validate before touching global state: an unsupported provider name
    # must not tear down a provider that is currently working.
    if provider != "otlp":
        raise ValueError(f"Unknown trace provider: {provider}")

    exist_provider = get_tracer_provider_silent()
    if exist_provider:
        logger.info("Trace provider already configured, shutting down...")
        exist_provider.shutdown()

    configure_otlp_provider(
        backends=backends,
        base_url=base_url,
        write_token=write_token,
        span_consumers=span_consumers,
        **kwargs)
class TraceManager:
    """
    Factory for spans that are bound to a named tracer.

    Spans are created through the tracer obtained from the configured trace
    provider (`get_tracer_provider()`). When that tracer cannot be obtained,
    span creation falls back to a `NoOpTracer`.
    """

    def __init__(self, tracer_name: str = None) -> None:
        """
        Args:
            tracer_name: Name of the tracer to request from the provider.
                Defaults to "aworld" when empty or None.
        """
        self._tracer_name = tracer_name or "aworld"
        self._version = __version__

    def _create_auto_span(self,
                          name: str,
                          attributes: dict[str, AttributeValueType] = None
                          ) -> Span:
        """
        Create an auto trace span with the given name and attributes.
        """
        return self._create_context_span(name, attributes)

    def _create_context_span(self,
                             name: str,
                             attributes: dict[str, AttributeValueType] = None) -> Span:
        """
        Build a `ContextSpan` on top of this manager's tracer.

        Any failure while resolving the tracer is deliberately swallowed and
        a `ContextSpan` backed by a `NoOpTracer` is returned instead.
        """
        try:
            tracer = get_tracer_provider().get_tracer(
                name=self._tracer_name, version=self._version)
            return ContextSpan(span_name=name, tracer=tracer, attributes=attributes)
        except Exception:
            return ContextSpan(span_name=name, tracer=NoOpTracer(), attributes=attributes)

    def get_current_span(self) -> Span:
        """
        Get the current span, or a `NoOpSpan` if it cannot be retrieved.
        """
        try:
            return get_tracer_provider().get_current_span()
        except Exception:
            return NoOpSpan()

    def new_manager(self, tracer_name_suffix: str = None) -> "TraceManager":
        """
        Create a new TraceManager with the given tracer name suffix.

        The new tracer name is `<current name>.<suffix>`; without a suffix
        the current tracer name is reused unchanged.
        """
        tracer_name = self._tracer_name if not tracer_name_suffix else f"{self._tracer_name}.{tracer_name_suffix}"
        return TraceManager(tracer_name=tracer_name)

    def auto_tracing(self,
                     modules: Union[Sequence[str], Callable[[AutoTraceModule], bool]],
                     min_duration: float) -> None:
        """
        Automatically trace the execution of a function.
        Args:
            modules: A list of module names or a callable that takes a `AutoTraceModule` and returns a boolean.
            min_duration: The minimum duration of a function to be traced.
        Returns:
            None
        """
        install_auto_tracing(self, modules, min_duration)

    def span(self,
             msg_template: str = "",
             attributes: dict[str, AttributeValueType] = None,
             *,
             span_name: str = None,
             run_type: RunType = RunType.OTHER) -> "ContextSpan":
        """
        Create a span whose message is rendered from `msg_template`.

        Args:
            msg_template: Message template passed to `format_span_msg`.
            attributes: Attributes to attach to the span. They take
                precedence over the stack information on key conflicts.
            span_name: Explicit span name. Defaults to the message template
                returned by `format_span_msg`.
            run_type: Run type; its `.value` is stored as a span attribute.
        Returns:
            A `ContextSpan`. If anything fails while preparing the span, the
            error is reported through `log_trace_error()` and a `ContextSpan`
            backed by a `NoOpTracer` is returned.
        """
        try:
            attributes = attributes or {}
            # Retrieve stack information of user code and add it to the attributes
            stack_info = get_user_stack_info()
            merged_attributes = {**stack_info, **attributes}
            # Only templates containing braces need the caller's frame, which
            # is handed to the formatter as `fstring_frame`. This must stay
            # in this method so that `f_back` is the direct caller of `span`.
            if any(c in msg_template for c in ('{', '}')):
                fstring_frame = inspect.currentframe().f_back
            else:
                fstring_frame = None
            log_message, extra_attrs, msg_template = format_span_msg(
                msg_template,
                merged_attributes,
                fstring_frame=fstring_frame,
            )
            merged_attributes[ATTRIBUTES_MESSAGE_KEY] = log_message
            merged_attributes.update(extra_attrs)
            merged_attributes[ATTRIBUTES_MESSAGE_TEMPLATE_KEY] = msg_template
            merged_attributes[ATTRIBUTES_MESSAGE_RUN_TYPE_KEY] = run_type.value
            span_name = span_name or msg_template
            return self._create_context_span(span_name, merged_attributes)
        except Exception:
            log_trace_error()
            # `span_name` is still None when the failure happened before it
            # was defaulted, so apply the same default as the success path.
            return ContextSpan(span_name=span_name or msg_template,
                               tracer=NoOpTracer(),
                               attributes=attributes)

    def func_span(self,
                  msg_template: Union[str, Callable] = None,
                  *,
                  attributes: dict[str, AttributeValueType] = None,
                  span_name: str = None,
                  extract_args: Union[bool, Iterable[str]] = False,
                  **kwargs) -> Callable:
        """
        A decorator that traces the execution of a function.
        Args:
            msg_template: The message template to use. When the decorator is
                applied without parentheses this is the decorated function.
            attributes: The attributes to add to the span. The passed dict is
                not modified.
            span_name: The name of the span.
            extract_args: Whether to extract arguments from the function call
                (forwarded unchanged to `trace_func`).
            **kwargs: Additional attributes to add to the span; they override
                entries of `attributes` with the same key.
        Returns:
            A decorator that traces the execution of a function.
        """
        if callable(msg_template):
            # Bare usage: `@func_span` directly above `def foo():` passes the
            # function itself as `msg_template`.
            return self.func_span()(msg_template)
        # Merge into a fresh dict so the caller's `attributes` is never mutated.
        merged_attributes = {**(attributes or {}), **kwargs}
        return trace_func(self, msg_template, merged_attributes, span_name, extract_args)
class ContextSpan(Span):
    """A `Span` that is created lazily and usable as a (sync or async) context manager.

    The underlying span is obtained from the tracer's `start_span` the first
    time the context is entered. Entering returns this wrapper itself; leaving
    the context records an escaping exception (if any) and ends the span.
    Until the span has been started, delegating methods do nothing.

    Args:
        span_name: Name passed to the tracer when the span is started.
        tracer: Tracer used to start the span.
        attributes: Attributes passed to the tracer when the span is started.
    """

    def __init__(self,
                 span_name: str,
                 tracer: Tracer,
                 attributes: dict[str, AttributeValueType] = None) -> None:
        self._span_name = span_name
        self._tracer = tracer
        self._attributes = attributes
        # Underlying span; stays None until the context is entered.
        self._span: Span = None
        self._coro_context = None

    def _start(self):
        """Start the underlying span once; later calls are no-ops."""
        if self._span is None:
            self._span = self._tracer.start_span(
                name=self._span_name,
                attributes=self._attributes,
            )

    def __enter__(self) -> "Span":
        self._start()
        return self

    def __exit__(
            self,
            exc_type: Optional[Type[BaseException]],
            exc_val: Optional[BaseException],
            traceback: Optional[Any],
    ) -> None:
        """Leave the context: record any exception and end the span."""
        self._handle_exit(exc_type, exc_val, traceback)

    async def __aenter__(self) -> "Span":
        self._start()
        return self

    async def __aexit__(
            self,
            exc_type: Optional[Type[BaseException]],
            exc_val: Optional[BaseException],
            traceback: Optional[Any],
    ) -> None:
        """Async counterpart of `__exit__`."""
        self._handle_exit(exc_type, exc_val, traceback)

    def _handle_exit(
            self,
            exc_type: Optional[Type[BaseException]],
            exc_val: Optional[BaseException],
            traceback: Optional[Any],
    ) -> None:
        """Record `exc_val` on a recording span, then end the span in any case."""
        try:
            should_record = (
                self._span
                and self._span.is_recording()
                and isinstance(exc_val, BaseException)
            )
            if should_record:
                self._span.record_exception(exc_val, escaped=True)
        except ValueError as e:
            logger.warning(f"Failed to record_exception: {e}")
        finally:
            # The span is ended even when recording the exception failed.
            if self._span:
                self._span.end()

    def end(self, end_time: Optional[int] = None) -> None:
        """End the underlying span, if it has been started."""
        if not self._span:
            return
        self._span.end(end_time)

    def set_attribute(self, key: str, value: AttributeValueType) -> None:
        """Set one attribute on the underlying span, if it has been started."""
        if not self._span:
            return
        self._span.set_attribute(key, value)

    def set_attributes(self, attributes: dict[str, AttributeValueType]) -> None:
        """Set several attributes on the underlying span, if it has been started."""
        if not self._span:
            return
        self._span.set_attributes(attributes)

    def is_recording(self) -> bool:
        """Return the underlying span's recording state; False before it is started."""
        return self._span.is_recording() if self._span else False

    def record_exception(
            self,
            exception: BaseException,
            attributes: dict[str, Any] = None,
            timestamp: Optional[int] = None,
            escaped: bool = False,
    ) -> None:
        """Forward an exception record to the underlying span, if it has been started."""
        if not self._span:
            return
        self._span.record_exception(
            exception, attributes, timestamp, escaped)

    def get_trace_id(self) -> str:
        """Return the underlying span's trace id, or None before it is started."""
        return self._span.get_trace_id() if self._span else None

    def get_span_id(self) -> str:
        """Return the underlying span's span id, or None before it is started."""
        return self._span.get_span_id() if self._span else None
def format_span_msg(
        format_string: str,
        kwargs: dict[str, Any],
        fstring_frame: types.FrameType = None,
) -> tuple[str, dict[str, Any], str]:
    """Render a span message from a template.

    Args:
        format_string: The message template.
        kwargs: Values made available to the formatter.
        fstring_frame: Optional frame forwarded to the formatter for
            f-string style templates.

    Returns:
        A 3-tuple of
        1. the formatted message,
        2. a dictionary of extra attributes to add to the span/log
           (these can come from evaluating values in f-strings),
        3. the final message template, which may differ from
           `format_string` if it was an f-string.

        If formatting fails, the failure is reported (as a warning or via
        `log_trace_error()`) and `(format_string, {}, format_string)` is
        returned.
    """
    try:
        pieces, extras, template = chunks_formatter.chunks(
            format_string,
            kwargs,
            fstring_frame=fstring_frame
        )
        # Each piece carries its rendered text under the 'v' key.
        rendered = ''.join(piece['v'] for piece in pieces)
    except KnownFormattingError as err:
        warn_formatting(str(err) or str(err.__cause__))
    except FStringAwaitError as err:
        warn_fstring_await(str(err))
    except Exception:
        log_trace_error()
    else:
        return rendered, extras, template

    # Formatting failed, so just use the original format string as the message.
    return format_string, {}, format_string