Spaces:
Sleeping
Sleeping
Upload 2 files
Browse files- aworld/trace/baggage/sofa_tracer.py +128 -0
- aworld/trace/baggage/w3c.py +65 -0
aworld/trace/baggage/sofa_tracer.py
ADDED
@@ -0,0 +1,128 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
from aworld.trace.base import Propagator, Carrier, TraceContext
|
2 |
+
from aworld.trace.baggage import BaggageContext
|
3 |
+
from aworld.logs.util import logger
|
4 |
+
from aworld.trace.base import AttributeValueType
|
5 |
+
|
6 |
+
|
7 |
+
class SofaTracerBaggagePropagator(Propagator):
|
8 |
+
"""
|
9 |
+
Sofa tracer baggage propagator.
|
10 |
+
"""
|
11 |
+
|
12 |
+
_TRACE_ID_HEDER_NAMES = ["SOFA-TraceId", "sofaTraceId"]
|
13 |
+
_SPAN_ID_HEDER_NAMES = ["SOFA-RpcId", "sofaRpcId"]
|
14 |
+
_PEN_ATTRS_HEDER_NAME = "sofaPenAttrs"
|
15 |
+
_SYS_PEN_ATTRS_HEDER_NAME = "sysPenAttrs"
|
16 |
+
|
17 |
+
_TRACE_ID_BAGGAGE_KEY = "attributes.sofa.traceid"
|
18 |
+
_SPAN_ID_BAGGAGE_KEY = "attributes.sofa.rpcid"
|
19 |
+
_PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.penattrs"
|
20 |
+
_SYS_PEN_ATTRS_BAGGAGE_KEY = "attributes.sofa.syspenattrs"
|
21 |
+
|
22 |
+
def extract(self, carrier: Carrier):
|
23 |
+
"""
|
24 |
+
Extract trace context from carrier.
|
25 |
+
Args:
|
26 |
+
carrier: The carrier to extract trace context from.
|
27 |
+
Returns:
|
28 |
+
A dict of trace context.
|
29 |
+
"""
|
30 |
+
trace_id = None
|
31 |
+
span_id = None
|
32 |
+
for name in self._TRACE_ID_HEDER_NAMES:
|
33 |
+
trace_id = self._get_value(carrier, name)
|
34 |
+
if trace_id:
|
35 |
+
break
|
36 |
+
for name in self._SPAN_ID_HEDER_NAMES:
|
37 |
+
span_id = self._get_value(carrier, name)
|
38 |
+
if span_id:
|
39 |
+
break
|
40 |
+
pen_attrs = self._get_value(carrier, self._PEN_ATTRS_HEDER_NAME)
|
41 |
+
sys_pen_attrs = self._get_value(
|
42 |
+
carrier, self._SYS_PEN_ATTRS_HEDER_NAME)
|
43 |
+
|
44 |
+
logger.info(
|
45 |
+
f"extract trace_id: {trace_id}, span_id: {span_id}, pen_attrs: {pen_attrs}, sys_pen_attrs: {sys_pen_attrs}")
|
46 |
+
if trace_id and span_id:
|
47 |
+
BaggageContext.set_baggage(self._TRACE_ID_BAGGAGE_KEY, trace_id)
|
48 |
+
span_id = span_id + ".1"
|
49 |
+
BaggageContext.set_baggage(self._SPAN_ID_BAGGAGE_KEY, span_id)
|
50 |
+
if pen_attrs:
|
51 |
+
BaggageContext.set_baggage(
|
52 |
+
self._PEN_ATTRS_BAGGAGE_KEY, pen_attrs)
|
53 |
+
if sys_pen_attrs:
|
54 |
+
BaggageContext.set_baggage(
|
55 |
+
self._SYS_PEN_ATTRS_BAGGAGE_KEY, sys_pen_attrs)
|
56 |
+
|
57 |
+
def inject(self, trace_context: TraceContext, carrier: Carrier):
|
58 |
+
"""
|
59 |
+
Inject trace context to carrier.
|
60 |
+
Args:
|
61 |
+
trace_context: The trace context to inject.
|
62 |
+
carrier: The carrier to inject trace context to.
|
63 |
+
"""
|
64 |
+
baggage = BaggageContext.get_baggage()
|
65 |
+
|
66 |
+
if baggage:
|
67 |
+
trace_id = baggage.get(self._TRACE_ID_BAGGAGE_KEY)
|
68 |
+
span_id = baggage.get(self._SPAN_ID_BAGGAGE_KEY)
|
69 |
+
if trace_id and span_id:
|
70 |
+
carrier.set(self._TRACE_ID_HEDER_NAMES[0], trace_id)
|
71 |
+
carrier.set(self._SPAN_ID_HEDER_NAMES[0], span_id)
|
72 |
+
|
73 |
+
pen_attrs_dict = {}
|
74 |
+
for key, value in baggage.items():
|
75 |
+
if key == self._TRACE_ID_BAGGAGE_KEY or key == self._SPAN_ID_BAGGAGE_KEY:
|
76 |
+
continue
|
77 |
+
if key == self._PEN_ATTRS_BAGGAGE_KEY and value:
|
78 |
+
pen_attrs_dict.update(dict(item.split("=")
|
79 |
+
for item in value.split("&")))
|
80 |
+
continue
|
81 |
+
if key == self._SYS_PEN_ATTRS_BAGGAGE_KEY and value:
|
82 |
+
carrier.set(self._SYS_PEN_ATTRS_HEDER_NAME, value)
|
83 |
+
continue
|
84 |
+
|
85 |
+
# other baggage items will be injected to sofaPenAttrs
|
86 |
+
pen_attrs_dict.update({key: value})
|
87 |
+
|
88 |
+
if pen_attrs_dict:
|
89 |
+
pen_attrs = "&".join(f"{key}={value}"
|
90 |
+
for key, value in pen_attrs_dict.items())
|
91 |
+
carrier.set(self._PEN_ATTRS_HEDER_NAME, pen_attrs)
|
92 |
+
|
93 |
+
|
94 |
+
class SofaSpanHelper:
|
95 |
+
"""
|
96 |
+
Sofa span helper.
|
97 |
+
"""
|
98 |
+
|
99 |
+
@staticmethod
|
100 |
+
def set_sofa_context_to_attr(span_attributes: dict[str, AttributeValueType]):
|
101 |
+
"""
|
102 |
+
Set sofa context to span attributes.
|
103 |
+
Args:
|
104 |
+
span_attributes: The span attributes to set sofa context to.
|
105 |
+
"""
|
106 |
+
baggage = BaggageContext.get_baggage()
|
107 |
+
if baggage:
|
108 |
+
trace_id = baggage.get(
|
109 |
+
SofaTracerBaggagePropagator._TRACE_ID_BAGGAGE_KEY)
|
110 |
+
span_id = baggage.get(
|
111 |
+
SofaTracerBaggagePropagator._SPAN_ID_BAGGAGE_KEY)
|
112 |
+
if trace_id and span_id:
|
113 |
+
span_attributes.update({
|
114 |
+
SofaTracerBaggagePropagator._TRACE_ID_BAGGAGE_KEY: trace_id,
|
115 |
+
SofaTracerBaggagePropagator._SPAN_ID_BAGGAGE_KEY: span_id
|
116 |
+
})
|
117 |
+
pen_attrs = baggage.get(
|
118 |
+
SofaTracerBaggagePropagator._PEN_ATTRS_BAGGAGE_KEY)
|
119 |
+
if pen_attrs:
|
120 |
+
span_attributes.update({
|
121 |
+
SofaTracerBaggagePropagator._PEN_ATTRS_BAGGAGE_KEY: pen_attrs
|
122 |
+
})
|
123 |
+
sys_pen_attrs = baggage.get(
|
124 |
+
SofaTracerBaggagePropagator._SYS_PEN_ATTRS_BAGGAGE_KEY)
|
125 |
+
if sys_pen_attrs:
|
126 |
+
span_attributes.update({
|
127 |
+
SofaTracerBaggagePropagator._SYS_PEN_ATTRS_BAGGAGE_KEY: sys_pen_attrs
|
128 |
+
})
|
aworld/trace/baggage/w3c.py
ADDED
@@ -0,0 +1,65 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
import re
|
2 |
+
from typing import List
|
3 |
+
from aworld.trace.base import Propagator, Carrier, TraceContext
|
4 |
+
from aworld.trace.baggage import BaggageContext
|
5 |
+
from aworld.logs.util import logger
|
6 |
+
from urllib.parse import quote_plus, unquote_plus
|
7 |
+
|
8 |
+
|
9 |
+
class W3CBaggagePropagator(Propagator):
|
10 |
+
"""
|
11 |
+
W3C baggage propagator.
|
12 |
+
"""
|
13 |
+
|
14 |
+
_MAX_HEADER_LENGTH = 8192
|
15 |
+
_MAX_PAIR_LENGTH = 4096
|
16 |
+
_MAX_PAIRS = 180
|
17 |
+
_BAGGAGE_HEADER_NAME = "baggage"
|
18 |
+
_DELIMITER_PATTERN = re.compile(r"[ \t]*,[ \t]*")
|
19 |
+
|
20 |
+
def extract(self, carrier: Carrier):
|
21 |
+
"""
|
22 |
+
Extract the trace context from the carrier.
|
23 |
+
Args:
|
24 |
+
carrier: The carrier to extract the trace context from.
|
25 |
+
"""
|
26 |
+
baggage_header = self._get_value(carrier, self._BAGGAGE_HEADER_NAME)
|
27 |
+
if not baggage_header:
|
28 |
+
return None
|
29 |
+
|
30 |
+
if len(baggage_header) > self._MAX_HEADER_LENGTH:
|
31 |
+
logger.warning(
|
32 |
+
f"baggage header length exceeds {self._MAX_HEADER_LENGTH}")
|
33 |
+
return None
|
34 |
+
|
35 |
+
baggage_entries: List[str] = re.split(
|
36 |
+
self._DELIMITER_PATTERN, baggage_header)
|
37 |
+
if len(baggage_entries) > self._MAX_PAIRS:
|
38 |
+
logger.warning(f"baggage entries exceeds {self._MAX_PAIRS}")
|
39 |
+
|
40 |
+
for entry in baggage_entries:
|
41 |
+
if len(entry) > self._MAX_PAIR_LENGTH:
|
42 |
+
logger.warning(
|
43 |
+
f"baggage entry length exceeds {self._MAX_PAIR_LENGTH}")
|
44 |
+
continue
|
45 |
+
try:
|
46 |
+
key, value = entry.split("=", 1)
|
47 |
+
key = unquote_plus(key).strip()
|
48 |
+
value = unquote_plus(value).strip()
|
49 |
+
except ValueError:
|
50 |
+
logger.warning(f"baggage entry format error: {entry}")
|
51 |
+
continue
|
52 |
+
BaggageContext.set_baggage(key, value)
|
53 |
+
|
54 |
+
def inject(self, carrier: Carrier, context: TraceContext):
|
55 |
+
"""
|
56 |
+
Inject the trace context into the carrier.
|
57 |
+
Args:
|
58 |
+
carrier: The carrier to inject the trace context into.
|
59 |
+
context: The trace context to inject.
|
60 |
+
"""
|
61 |
+
baggage = BaggageContext.get_baggage()
|
62 |
+
if baggage:
|
63 |
+
baggage_header = ",".join(
|
64 |
+
f"{quote_plus(key)}={quote_plus(value)}" for key, value in baggage.items())
|
65 |
+
carrier.set(self._BAGGAGE_HEADER_NAME, baggage_header)
|