Duibonduil commited on
Commit
c0b9657
·
verified ·
1 Parent(s): 3c9e1fb

Upload 3 files

Browse files
aworld/trace/opentelemetry/__init__.py ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ # coding: utf-8
2
+ # Copyright (c) 2025 inclusionAI.
aworld/trace/opentelemetry/memory_storage.py ADDED
@@ -0,0 +1,238 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import time
4
+ import threading
5
+ from datetime import datetime
6
+ from abc import ABC, abstractmethod
7
+ from collections import defaultdict
8
+ from pydantic import BaseModel
9
+ from typing import Optional, Dict, Any, Union
10
+ from opentelemetry.sdk.trace import Span, SpanContext
11
+ from opentelemetry.sdk.trace.export import SpanExporter
12
+ from aworld.logs.util import logger
13
+ from aworld.trace.constants import ATTRIBUTES_MESSAGE_RUN_TYPE_KEY, RunType
14
+
15
+
16
+ class SpanStatus(BaseModel):
17
+ code: str = "UNSET"
18
+ description: Optional[str] = None
19
+
20
+
21
+ class SpanModel(BaseModel):
22
+ trace_id: str
23
+ span_id: str
24
+ name: str
25
+ start_time: str
26
+ end_time: str
27
+ duration_ms: float
28
+ attributes: Dict[str, Any]
29
+ status: SpanStatus
30
+ parent_id: Optional[str]
31
+ children: list['SpanModel'] = []
32
+ run_type: Optional[str] = RunType.OTHER.value
33
+ is_event: bool = False
34
+
35
+ @staticmethod
36
+ def from_span(span):
37
+ start_timestamp = span.start_time / 1e9
38
+ end_timestamp = span.end_time / 1e9
39
+ start_ms = int((span.start_time % 1e9) / 1e6)
40
+ end_ms = int((span.end_time % 1e9) / 1e6)
41
+
42
+ return SpanModel(
43
+ trace_id=f"{span.get_span_context().trace_id:032x}",
44
+ span_id=SpanModel.get_span_id(span),
45
+ name=span.name,
46
+ start_time=time.strftime(
47
+ '%Y-%m-%d %H:%M:%S', time.localtime(start_timestamp)) + f'.{start_ms:03d}',
48
+ end_time=time.strftime(
49
+ '%Y-%m-%d %H:%M:%S', time.localtime(end_timestamp)) + f'.{end_ms:03d}',
50
+ duration_ms=(span.end_time - span.start_time)/1e6,
51
+ attributes={k: v for k, v in span.attributes.items()},
52
+ status=SpanStatus(
53
+ code=str(
54
+ span.status.status_code) if span.status.status_code else "UNSET",
55
+ description=span.status.description or None
56
+ ),
57
+ parent_id=SpanModel.get_span_id(
58
+ span.parent) if span.parent else None,
59
+ run_type=span.attributes.get(
60
+ ATTRIBUTES_MESSAGE_RUN_TYPE_KEY, RunType.OTHER.value),
61
+ is_event=(span.attributes.get("event.id") is not None)
62
+ )
63
+
64
+ @staticmethod
65
+ def get_span_id(span: Union[Span, SpanContext]):
66
+ if isinstance(span, SpanContext):
67
+ return f"{span.span_id:016x}"
68
+ return f"{span.get_span_context().span_id:016x}"
69
+
70
+
71
+ class TraceStorage(ABC):
72
+ """
73
+ Storage for traces.
74
+ """
75
+ @abstractmethod
76
+ def add_span(self, span: Span) -> None:
77
+ """
78
+ Add a span to the storage.
79
+ """
80
+
81
+ @abstractmethod
82
+ def get_all_traces(self) -> list[str]:
83
+ """
84
+ Get all trace ids.
85
+ """
86
+
87
+ @abstractmethod
88
+ def get_all_spans(self, trace_id) -> list[SpanModel]:
89
+ """
90
+ Get all spans of a trace.
91
+ """
92
+
93
+
94
+ class InMemoryStorage(TraceStorage):
95
+ """
96
+ In-memory storage for spans.
97
+ """
98
+
99
+ def __init__(self, max_traces=1000):
100
+ self._traces = defaultdict(list)
101
+ self._trace_order = []
102
+ self.max_traces = max_traces
103
+
104
+ def add_span(self, span: Span):
105
+ trace_id = f"{span.get_span_context().trace_id:032x}"
106
+ if trace_id not in self._traces:
107
+ self._trace_order.append(trace_id)
108
+ if len(self._trace_order) > self.max_traces:
109
+ oldest_trace = self._trace_order.pop(0)
110
+ del self._traces[oldest_trace]
111
+ self._traces[trace_id].append(SpanModel.from_span(span))
112
+
113
+ def get_all_traces(self):
114
+ return list(self._traces.keys())
115
+
116
+ def get_all_spans(self, trace_id):
117
+ return self._traces.get(trace_id, [])
118
+
119
+
120
+ class InMemoryWithPersistStorage(TraceStorage):
121
+ """
122
+ In-memory storage for spans with optimized disk persistence.
123
+ """
124
+
125
+ def __init__(self, storage_dir: str = "./trace_data"):
126
+ self._traces = defaultdict(list)
127
+ self._pending_spans = []
128
+ self.storage_dir = os.path.abspath(storage_dir)
129
+ os.makedirs(self.storage_dir, exist_ok=True)
130
+ self._lock = threading.Lock()
131
+ self._persist_thread = None
132
+ self._load_today_traces()
133
+ self.current_filename = None
134
+
135
+ def _get_today_filename(self):
136
+ if not self.current_filename:
137
+ timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
138
+ self.current_filename = f"trace_{timestamp}.json"
139
+ return self.current_filename
140
+
141
+ def _load_today_traces(self):
142
+ today = datetime.now().strftime("%Y%m%d")
143
+ for filename in os.listdir(self.storage_dir):
144
+ if filename.startswith(f"trace_{today}") and filename.endswith(".json"):
145
+ filepath = os.path.join(self.storage_dir, filename)
146
+ try:
147
+ with self._lock, open(filepath, 'r') as f:
148
+ data = json.load(f)
149
+ for span_data in data:
150
+ trace_id = span_data.get("trace_id")
151
+ span_json = span_data.get("span")
152
+ self._traces[trace_id].append(
153
+ SpanModel.parse_raw(span_json))
154
+ except Exception as e:
155
+ logger.error(
156
+ f"Error loading trace file {filename}: {str(e)}")
157
+
158
+ def _start_persist_thread(self):
159
+ if self._persist_thread is None:
160
+ self._persist_thread = threading.Thread(
161
+ target=self._persist_worker, daemon=True)
162
+ self._persist_thread.start()
163
+
164
+ def _persist_worker(self):
165
+ while True:
166
+ time.sleep(5)
167
+ self._persist()
168
+
169
+ def _persist(self):
170
+ if not self._pending_spans:
171
+ return
172
+
173
+ temp_filepath = os.path.join(
174
+ self.storage_dir, f"temp_{time.time_ns()}.json")
175
+ final_filepath = os.path.join(
176
+ self.storage_dir, self._get_today_filename())
177
+
178
+ try:
179
+ spans_to_persist = []
180
+ with self._lock:
181
+ spans_to_persist = self._pending_spans.copy()
182
+ self._pending_spans.clear()
183
+
184
+ if spans_to_persist:
185
+ existing_data = []
186
+ if os.path.exists(final_filepath):
187
+ try:
188
+ with open(final_filepath, 'r') as f:
189
+ existing_data = json.load(f)
190
+ except Exception as e:
191
+ logger.error(
192
+ f"Error reading existing trace file: {str(e)}")
193
+
194
+ merged_spans = existing_data + spans_to_persist
195
+
196
+ with open(temp_filepath, 'w') as f:
197
+ json.dump(merged_spans, f, default=str)
198
+ os.replace(temp_filepath, final_filepath)
199
+ except Exception as e:
200
+ logger.error(f"Error persisting traces: {str(e)}")
201
+ try:
202
+ os.unlink(temp_filepath)
203
+ except:
204
+ pass
205
+
206
+ def add_span(self, span: Span):
207
+ span_model = SpanModel.from_span(span)
208
+ with self._lock:
209
+ self._traces[span_model.trace_id].append(span_model)
210
+ self._pending_spans.append({
211
+ "trace_id": span_model.trace_id,
212
+ "span": span_model.json()
213
+ })
214
+ self._start_persist_thread()
215
+
216
+ def get_all_traces(self):
217
+ with self._lock:
218
+ return list(self._traces.keys())
219
+
220
+ def get_all_spans(self, trace_id):
221
+ with self._lock:
222
+ return self._traces.get(trace_id, [])
223
+
224
+
225
+ class InMemorySpanExporter(SpanExporter):
226
+ """
227
+ Span exporter that stores spans in memory.
228
+ """
229
+
230
+ def __init__(self, storage: TraceStorage):
231
+ self._storage = storage
232
+
233
+ def export(self, spans):
234
+ for span in spans:
235
+ self._storage.add_span(span)
236
+
237
+ def shutdown(self):
238
+ pass
aworld/trace/opentelemetry/opentelemetry_adapter.py ADDED
@@ -0,0 +1,436 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sys
2
+ import os
3
+ import traceback
4
+ import time
5
+ import datetime
6
+ import requests
7
+ from threading import Lock
8
+ from typing import Any, Iterator, Sequence, Optional, TYPE_CHECKING
9
+ from contextvars import Token
10
+ from urllib.parse import urljoin
11
+ import opentelemetry.context as otlp_context_api
12
+ from opentelemetry.trace import (
13
+ SpanKind,
14
+ set_span_in_context,
15
+ get_current_span as get_current_otlp_span,
16
+ NonRecordingSpan,
17
+ SpanContext,
18
+ TraceFlags
19
+ )
20
+ from opentelemetry.trace.status import StatusCode
21
+ from opentelemetry.sdk.trace import (
22
+ ReadableSpan,
23
+ SynchronousMultiSpanProcessor,
24
+ Tracer as SDKTracer,
25
+ Span as SDKSpan,
26
+ TracerProvider as SDKTracerProvider
27
+ )
28
+ from opentelemetry.context import Context as OTLPContext
29
+ from opentelemetry.semconv.trace import SpanAttributes
30
+ from opentelemetry.sdk.trace.export import BatchSpanProcessor, SimpleSpanProcessor
31
+
32
+ from aworld.trace.base import (
33
+ AttributeValueType,
34
+ NoOpTracer,
35
+ SpanType,
36
+ TraceProvider,
37
+ Tracer,
38
+ Span,
39
+ TraceContext,
40
+ set_tracer_provider
41
+ )
42
+ from aworld.trace.span_cosumer import SpanConsumer
43
+ from aworld.trace.propagator import get_global_trace_context
44
+ from aworld.trace.baggage.sofa_tracer import SofaSpanHelper
45
+ from aworld.logs.util import logger
46
+ from aworld.utils.common import get_local_ip
47
+ from .memory_storage import InMemorySpanExporter, InMemoryStorage
48
+ from ..constants import ATTRIBUTES_MESSAGE_KEY
49
+ from .export import FileSpanExporter, NoOpSpanExporter, SpanConsumerExporter
50
+ from ..server import set_trace_server
51
+
52
+
53
+ class OTLPTraceProvider(TraceProvider):
54
+ """A TraceProvider that wraps an existing `SDKTracerProvider`.
55
+ This class provides a way to use a `SDKTracerProvider` as a `TraceProvider`.
56
+ When the context manager is entered, it returns the `SDKTracerProvider` itself.
57
+ When the context manager is exited, it calls `shutdown` on the `SDKTracerProvider`.
58
+ Args:
59
+ provider: The internal provider to wrap.
60
+ """
61
+
62
+ def __init__(self, provider: SDKTracerProvider, suppressed_scopes: Optional[set[str]] = None):
63
+ self._provider: SDKTracerProvider = provider
64
+ self._suppressed_scopes = set()
65
+ if suppressed_scopes:
66
+ self._suppressed_scopes.update(suppressed_scopes)
67
+ self._lock: Lock = Lock()
68
+
69
+ def get_tracer(
70
+ self,
71
+ name: str,
72
+ version: Optional[str] = None
73
+ ):
74
+ with self._lock:
75
+ if name in self._suppressed_scopes:
76
+ return NoOpTracer()
77
+ else:
78
+ tracer = self._provider.get_tracer(instrumenting_module_name=name,
79
+ instrumenting_library_version=version)
80
+ return OTLPTracer(tracer)
81
+
82
+ def shutdown(self) -> None:
83
+ with self._lock:
84
+ if isinstance(self._provider, SDKTracerProvider):
85
+ self._provider.shutdown()
86
+
87
+ def force_flush(self, timeout: Optional[float] = None) -> bool:
88
+ with self._lock:
89
+ if isinstance(self._provider, SDKTracerProvider):
90
+ return self._provider.force_flush(timeout)
91
+ else:
92
+ return False
93
+
94
+ def get_current_span(self) -> Optional["Span"]:
95
+ otlp_span = get_current_otlp_span()
96
+ return OTLPSpan(otlp_span, is_new_span=False)
97
+
98
+
99
+ class OTLPTracer(Tracer):
100
+ """A Tracer represents a collection of Spans.
101
+ Args:
102
+ tracer: The internal tracer to wrap.
103
+ """
104
+
105
+ def __init__(self, tracer: SDKTracer):
106
+ self._tracer = tracer
107
+
108
+ def start_span(
109
+ self,
110
+ name: str,
111
+ span_type: SpanType = SpanType.INTERNAL,
112
+ attributes: dict[str, AttributeValueType] = None,
113
+ start_time: Optional[int] = None,
114
+ record_exception: bool = True,
115
+ set_status_on_exception: bool = True,
116
+ trace_context: Optional[TraceContext] = None
117
+ ) -> "Span":
118
+ otel_context = None
119
+ trace_context = trace_context or get_global_trace_context().get_and_clear()
120
+ if trace_context:
121
+ otel_context = self._get_otel_context_from_trace_context(
122
+ trace_context)
123
+ start_time = start_time or time.time_ns()
124
+ attributes = {**(attributes or {})}
125
+ attributes.setdefault(ATTRIBUTES_MESSAGE_KEY, name)
126
+ SofaSpanHelper.set_sofa_context_to_attr(attributes)
127
+ attributes = {k: v for k, v in attributes.items(
128
+ ) if is_valid_attribute_value(k, v)}
129
+
130
+ span_kind = self._convert_to_span_kind(
131
+ span_type) if span_type else SpanKind.INTERNAL
132
+ span = self._tracer.start_span(name=name,
133
+ kind=span_kind,
134
+ context=otel_context,
135
+ attributes=attributes,
136
+ start_time=start_time,
137
+ record_exception=record_exception,
138
+ set_status_on_exception=set_status_on_exception)
139
+ return OTLPSpan(span)
140
+
141
+ def start_as_current_span(
142
+ self,
143
+ name: str,
144
+ span_type: SpanType = SpanType.INTERNAL,
145
+ attributes: dict[str, AttributeValueType] = None,
146
+ start_time: Optional[int] = None,
147
+ record_exception: bool = True,
148
+ set_status_on_exception: bool = True,
149
+ end_on_exit: bool = True,
150
+ trace_context: Optional[TraceContext] = None
151
+ ) -> Iterator["Span"]:
152
+
153
+ start_time = start_time or time.time_ns()
154
+ attributes = {**(attributes or {})}
155
+ attributes.setdefault(ATTRIBUTES_MESSAGE_KEY, name)
156
+ SofaSpanHelper.set_sofa_context_to_attr(attributes)
157
+ attributes = {k: v for k, v in attributes.items(
158
+ ) if is_valid_attribute_value(k, v)}
159
+
160
+ span_kind = self._convert_to_span_kind(
161
+ span_type) if span_type else SpanKind.INTERNAL
162
+ otel_context = None
163
+ trace_context = trace_context or get_global_trace_context().get_and_clear()
164
+ if trace_context:
165
+ otel_context = self._get_otel_context_from_trace_context(
166
+ trace_context)
167
+
168
+ class _OTLPSpanContextManager:
169
+ def __init__(self, tracer: SDKTracer):
170
+ self._span_cm = None
171
+ self._tracer = tracer
172
+
173
+ def __enter__(self):
174
+ self._span_cm = self._tracer.start_as_current_span(
175
+ name=name,
176
+ kind=span_kind,
177
+ context=otel_context,
178
+ attributes=attributes,
179
+ start_time=start_time,
180
+ record_exception=record_exception,
181
+ set_status_on_exception=set_status_on_exception,
182
+ end_on_exit=end_on_exit
183
+ )
184
+ inner_span = self._span_cm.__enter__()
185
+ return OTLPSpan(inner_span)
186
+
187
+ def __exit__(self, exc_type, exc_val, exc_tb):
188
+ return self._span_cm.__exit__(exc_type, exc_val, exc_tb)
189
+
190
+ return _OTLPSpanContextManager(self._tracer)
191
+
192
+ def _convert_to_span_kind(self, span_type: SpanType) -> str:
193
+ if span_type == SpanType.INTERNAL:
194
+ return SpanKind.INTERNAL
195
+ elif span_type == SpanType.CLIENT:
196
+ return SpanKind.CLIENT
197
+ elif span_type == SpanType.SERVER:
198
+ return SpanKind.SERVER
199
+ elif span_type == SpanType.PRODUCER:
200
+ return SpanKind.PRODUCER
201
+ elif span_type == SpanType.CONSUMER:
202
+ return SpanKind.CONSUMER
203
+ else:
204
+ return SpanKind.INTERNAL
205
+
206
+ def _get_otel_context_from_trace_context(self, trace_context: TraceContext) -> OTLPContext:
207
+ trace_flags = None
208
+ if trace_context.trace_flags:
209
+ trace_flags = TraceFlags(int(trace_context.trace_flags, 16))
210
+ otel_context = otlp_context_api.Context()
211
+ return set_span_in_context(
212
+ NonRecordingSpan(
213
+ SpanContext(
214
+ trace_id=int(trace_context.trace_id, 16),
215
+ span_id=int(trace_context.span_id, 16),
216
+ is_remote=True,
217
+ trace_flags=trace_flags
218
+ )
219
+ ),
220
+ otel_context,
221
+ )
222
+
223
+
224
+ class OTLPSpan(Span, ReadableSpan):
225
+ """A Span represents a single operation within a trace.
226
+ """
227
+
228
+ def __init__(self, span: SDKSpan, is_new_span=True):
229
+ self._span = span
230
+ self._token: Optional[Token[OTLPContext]] = None
231
+ if is_new_span:
232
+ self._attach()
233
+ self._add_to_open_spans()
234
+
235
+ if not TYPE_CHECKING: # pragma: no branch
236
+ def __getattr__(self, name: str) -> Any:
237
+ return getattr(self._span, name)
238
+
239
+ def end(self, end_time: Optional[int] = None) -> None:
240
+ self._remove_from_open_spans()
241
+ end_time = end_time or time.time_ns()
242
+ if not self._span._status or self._span._status.status_code == StatusCode.UNSET:
243
+ self._span.set_status(
244
+ status=StatusCode.OK,
245
+ description="",
246
+ )
247
+ self._span.end(end_time=end_time)
248
+ self._detach()
249
+
250
+ def set_attribute(self, key: str, value: Any) -> None:
251
+ if not is_valid_attribute_value(key, value):
252
+ return
253
+ self._span.set_attribute(key=key, value=value)
254
+
255
+ def set_attributes(self, attributes: dict[str, Any]) -> None:
256
+ attributes = {k: v for k, v in attributes.items(
257
+ ) if is_valid_attribute_value(k, v)}
258
+ self._span.set_attributes(attributes=attributes)
259
+
260
+ def is_recording(self) -> bool:
261
+ return self._span.is_recording()
262
+
263
+ def record_exception(
264
+ self,
265
+ exception: BaseException,
266
+ attributes: dict[str, Any] = None,
267
+ timestamp: Optional[int] = None,
268
+ escaped: bool = False,
269
+ ) -> None:
270
+ timestamp = timestamp or time.time_ns()
271
+ attributes = {**(attributes or {})}
272
+
273
+ stacktrace = ''.join(traceback.format_exception(
274
+ type(exception), exception, exception.__traceback__))
275
+ self._span.set_attributes({
276
+ SpanAttributes.EXCEPTION_STACKTRACE: stacktrace,
277
+ SpanAttributes.EXCEPTION_TYPE: type(exception).__name__,
278
+ SpanAttributes.EXCEPTION_MESSAGE: str(exception),
279
+ SpanAttributes.EXCEPTION_ESCAPED: escaped
280
+ })
281
+ if exception is not sys.exc_info()[1]:
282
+ attributes[SpanAttributes.EXCEPTION_STACKTRACE] = stacktrace
283
+
284
+ self._span.record_exception(exception=exception,
285
+ attributes=attributes,
286
+ timestamp=timestamp,
287
+ escaped=escaped)
288
+ self._span.set_status(
289
+ status=StatusCode.ERROR,
290
+ description=str(exception),
291
+ )
292
+
293
+ def get_trace_id(self) -> str:
294
+ """Get the trace ID of the span.
295
+ Returns:
296
+ The trace ID of the span.
297
+ """
298
+ if not self._span or not self._span.get_span_context() or not self.is_recording():
299
+ return None
300
+ return f"{self._span.get_span_context().trace_id:032x}"
301
+
302
+ def get_span_id(self) -> str:
303
+ """Get the span ID of the span.
304
+ Returns:
305
+ The span ID of the span.
306
+ """
307
+ if not self._span or not self._span.get_span_context() or not self.is_recording():
308
+ return None
309
+ return f"{self._span.get_span_context().span_id:016x}"
310
+
311
+ def _attach(self):
312
+ if self._token is not None:
313
+ return
314
+ self._token = otlp_context_api.attach(set_span_in_context(self._span))
315
+
316
+ def _detach(self):
317
+ if self._token is None:
318
+ return
319
+ try:
320
+ otlp_context_api.detach(self._token)
321
+ except ValueError as e:
322
+ logger.warning(f"Failed to detach context: {e}")
323
+ finally:
324
+ self._token = None
325
+
326
+
327
+ def configure_otlp_provider(
328
+ backends: Sequence[str] = None,
329
+ base_url: str = None,
330
+ write_token: str = None,
331
+ span_consumers: Optional[Sequence[SpanConsumer]] = None,
332
+ **kwargs
333
+ ) -> None:
334
+ """Configure the OTLP provider.
335
+ Args:
336
+ backend: The backend to use.
337
+ write_token: The write token to use.
338
+ **kwargs: Additional keyword arguments to pass to the provider.
339
+ """
340
+ from aworld.metrics.opentelemetry.opentelemetry_adapter import build_otel_resource
341
+ backends = backends or ["logfire"]
342
+ processor = SynchronousMultiSpanProcessor()
343
+ processor.add_span_processor(BatchSpanProcessor(
344
+ SpanConsumerExporter(span_consumers)))
345
+ for backend in backends:
346
+ if backend == "logfire":
347
+ span_exporter = _configure_logfire_exporter(
348
+ write_token=write_token, base_url=base_url, **kwargs)
349
+ processor.add_span_processor(BatchSpanProcessor(span_exporter))
350
+ elif backend == "console":
351
+ from opentelemetry.sdk.trace.export import ConsoleSpanExporter
352
+ processor.add_span_processor(
353
+ BatchSpanProcessor(ConsoleSpanExporter()))
354
+ elif backend == "file":
355
+ timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
356
+ file_path = kwargs.get("file_path", f"traces_{timestamp}.json")
357
+ processor.add_span_processor(
358
+ BatchSpanProcessor(FileSpanExporter(file_path)))
359
+ elif backend == "memory":
360
+ logger.info("Using in-memory storage for traces.")
361
+ storage = kwargs.get(
362
+ "storage", InMemoryStorage())
363
+ processor.add_span_processor(
364
+ SimpleSpanProcessor(InMemorySpanExporter(storage=storage)))
365
+ server_enabled = kwargs.get("server_enabled") or os.getenv(
366
+ "START_TRACE_SERVER") or "true"
367
+ server_port = kwargs.get("server_port") or 7079
368
+ if (server_enabled.lower() == "true"):
369
+ logger.info(f"Starting trace server on port {server_port}.")
370
+ set_trace_server(storage=storage, port=int(
371
+ server_port), start_server=True)
372
+ else:
373
+ logger.info("Trace server is not started.")
374
+ set_trace_server(storage=storage, port=int(
375
+ server_port), start_server=False)
376
+ else:
377
+ span_exporter = _configure_otlp_exporter(
378
+ base_url=base_url, **kwargs)
379
+ processor.add_span_processor(BatchSpanProcessor(span_exporter))
380
+
381
+ set_tracer_provider(OTLPTraceProvider(SDKTracerProvider(active_span_processor=processor,
382
+ resource=build_otel_resource())))
383
+
384
+
385
+ def _configure_logfire_exporter(write_token: str, base_url: str = None) -> None:
386
+ """Configure the Logfire exporter.
387
+ Args:
388
+ write_token: The write token to use.
389
+ base_url: The base URL to use.
390
+ **kwargs: Additional keyword arguments to pass to the exporter.
391
+ """
392
+ from opentelemetry.exporter.otlp.proto.http import Compression
393
+ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
394
+
395
+ base_url = base_url or "https://logfire-us.pydantic.dev"
396
+ headers = {'User-Agent': f'logfire/3.14.0', 'Authorization': write_token}
397
+ session = requests.Session()
398
+ session.headers.update(headers)
399
+ return OTLPSpanExporter(
400
+ endpoint=urljoin(base_url, '/v1/traces'),
401
+ session=session,
402
+ compression=Compression.Gzip,
403
+ )
404
+
405
+
406
+ def _configure_otlp_exporter(base_url: str = None, **kwargs) -> None:
407
+ """Configure the OTLP exporter.
408
+ Args:
409
+ write_token: The write token to use.
410
+ base_url: The base URL to use.
411
+ **kwargs: Additional keyword arguments to pass to the exporter.
412
+ """
413
+ import requests
414
+ from opentelemetry.exporter.otlp.proto.http import Compression
415
+ from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
416
+
417
+ otlp_traces_endpoint = os.getenv("OTLP_TRACES_ENDPOINT")
418
+ base_url = base_url or otlp_traces_endpoint
419
+ session = requests.Session()
420
+ return OTLPSpanExporter(
421
+ endpoint=base_url,
422
+ session=session,
423
+ compression=Compression.Gzip,
424
+ )
425
+
426
+
427
+ def is_valid_attribute_value(k, v):
428
+ valid = True
429
+ if not v:
430
+ valid = False
431
+ valid = isinstance(v, (str, bool, int, float)) or \
432
+ (isinstance(v, Sequence) and
433
+ all(isinstance(i, (str, bool, int, float)) for i in v))
434
+ if not valid:
435
+ logger.warning(f"value of attribute[{k}] is invalid: {v}")
436
+ return valid