Duibonduil commited on
Commit
b59e348
·
verified ·
1 Parent(s): 6b4889b

Upload __init__.py

Browse files
Files changed (1) hide show
  1. aworld/trace/__init__.py +333 -0
aworld/trace/__init__.py ADDED
@@ -0,0 +1,333 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import wrapt
2
+ import time
3
+ import openai
4
+ import traceback
5
+ import aworld.trace.instrumentation.semconv as semconv
6
+ from typing import Collection, Any, Union
7
+ from aworld.trace.instrumentation import Instrumentor
8
+ from aworld.trace.base import (
9
+ Tracer,
10
+ SpanType,
11
+ get_tracer_provider_silent
12
+ )
13
+ from aworld.trace.constants import ATTRIBUTES_MESSAGE_RUN_TYPE_KEY, RunType
14
+ from aworld.trace.instrumentation.openai.inout_parse import (
15
+ run_async,
16
+ handle_openai_request,
17
+ is_streaming_response,
18
+ record_stream_response_chunk,
19
+ parse_openai_response,
20
+ record_stream_token_usage,
21
+ model_as_dict,
22
+ parse_response_message,
23
+ )
24
+ from aworld.trace.instrumentation.llm_metrics import (
25
+ record_exception_metric,
26
+ record_chat_response_metric,
27
+ record_streaming_time_to_first_token,
28
+ record_streaming_time_to_generate
29
+ )
30
+ from aworld.logs.util import logger
31
+
32
+
33
def _chat_wrapper(tracer: Tracer):
    """Build a synchronous wrapt decorator that traces OpenAI chat completions.

    The returned decorator opens a CLIENT span named after the request model,
    records request details, and ends the span when the call — or, for a
    streaming response, the consumed stream — completes.

    Args:
        tracer: Tracer used to start the per-request span.

    Returns:
        A ``wrapt.decorator`` suitable for patching ``Completions.create``.
    """

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        # Fall back to a generic span name when the request omits "model".
        span_name = kwargs.get("model", "") or "OpenAI"
        attributes = {ATTRIBUTES_MESSAGE_RUN_TYPE_KEY: RunType.LLM.value}

        span = tracer.start_span(
            name=span_name, span_type=SpanType.CLIENT, attributes=attributes)

        # The request parser is a coroutine; drive it to completion here.
        run_async(handle_openai_request(span, kwargs, instance))
        started_at = time.time()
        try:
            response = wrapped(*args, **kwargs)
        except Exception as error:
            record_exception(span=span, start_time=started_at, exception=error)
            span.end()
            raise

        if is_streaming_response(response):
            # Defer span completion until the stream has been fully consumed.
            return WrappedStreamResponse(span=span,
                                         response=response,
                                         instance=instance,
                                         start_time=started_at,
                                         request_kwargs=kwargs)

        record_completion(span=span,
                          start_time=started_at,
                          response=response,
                          request_kwargs=kwargs,
                          instance=instance)
        span.end()
        return response

    return wrapper
76
+
77
+
78
def _achat_class_wrapper(tracer: Tracer):
    """Build an async wrapper that traces OpenAI async chat completions.

    Mirrors ``_chat_wrapper`` for ``AsyncCompletions.create``: opens a CLIENT
    span, records the request, and ends the span on completion — or hands the
    span to ``WrappedStreamResponse`` when the result is a stream.

    Args:
        tracer: Tracer used to start the per-request span.

    Returns:
        An async function with the ``(wrapped, instance, args, kwargs)``
        signature expected by ``wrapt.wrap_function_wrapper``.
    """

    async def awrapper(wrapped, instance, args, kwargs):
        # Fall back to a generic span name when the request omits "model".
        span_name = kwargs.get("model", "") or "OpenAI"
        attributes = {ATTRIBUTES_MESSAGE_RUN_TYPE_KEY: RunType.LLM.value}

        span = tracer.start_span(
            name=span_name, span_type=SpanType.CLIENT, attributes=attributes)

        await handle_openai_request(span, kwargs, instance)
        started_at = time.time()
        try:
            response = await wrapped(*args, **kwargs)
        except Exception as error:
            record_exception(span=span, start_time=started_at, exception=error)
            span.end()
            raise

        if is_streaming_response(response):
            # Defer span completion until the stream has been fully consumed.
            return WrappedStreamResponse(span=span,
                                         response=response,
                                         instance=instance,
                                         start_time=started_at,
                                         request_kwargs=kwargs)

        record_completion(span=span,
                          start_time=started_at,
                          response=response,
                          request_kwargs=kwargs,
                          instance=instance)
        span.end()
        return response

    return awrapper
119
+
120
+
121
def _achat_instance_wrapper(tracer: Tracer):
    """Return an async ``wrapt.decorator`` for patching a bound async create method.

    Used by ``wrap_openai`` to patch ``client.chat.completions.create`` on an
    ``AsyncOpenAI`` instance; delegates all tracing to ``_achat_class_wrapper``.
    """

    @wrapt.decorator
    async def _awrapper(wrapped, instance, args, kwargs):
        # Delegate to the shared async chat-wrapper implementation.
        delegate = _achat_class_wrapper(tracer)
        return await delegate(wrapped, instance, args, kwargs)

    return _awrapper
129
+
130
+
131
def record_exception(span, start_time, exception):
    '''
    Record an OpenAI chat failure on the trace span and in metrics.

    Args:
        span: Active span for the failed chat request.
        start_time: Epoch seconds when the request started; used to compute
            the failure duration.
        exception: The exception raised by the OpenAI call.

    Never raises: errors while recording are logged and swallowed so that
    instrumentation cannot mask the original failure.
    '''
    try:
        # Bug fix: start_time is a parameter, so the original
        # `"start_time" in locals()` test was always true (dead code).
        # Guard against a missing/invalid value instead.
        duration = time.time() - start_time if isinstance(start_time, (int, float)) else 0
        # NOTE(review): if `is_recording` is a method (as in OpenTelemetry),
        # this bare attribute access is always truthy — confirm against
        # aworld.trace.base before changing.
        if span.is_recording:
            span.record_exception(exception=exception)
        record_exception_metric(exception=exception, duration=duration)
    except Exception as e:
        logger.warning(f"openai instrument record exception error.{e}")
142
+
143
+
144
def record_completion(span,
                      start_time,
                      response,
                      request_kwargs,
                      instance):
    '''
    Record a non-streaming chat completion to the trace span and metrics.

    Args:
        span: Active span for the chat request; receives response attributes.
        start_time: Epoch seconds when the request started.
        response: The OpenAI completion response object.
        request_kwargs: Keyword arguments that were passed to the create call.
        instance: The client instance the call was made on.
    '''
    # Bug fix: start_time is a parameter, so the original
    # `"start_time" in locals()` test was always true (dead code).
    duration = time.time() - start_time if isinstance(start_time, (int, float)) else 0
    response_dict = model_as_dict(response)
    attributes = parse_openai_response(
        response_dict, request_kwargs, instance, False)
    # Robustness: "usage" may be absent/None in the response; avoid
    # AttributeError on None.get(...).
    usage = response_dict.get("usage") or {}
    choices = response_dict.get("choices")
    prompt_tokens = usage.get("prompt_tokens")
    completion_tokens = usage.get("completion_tokens")

    span_attributes = {
        **attributes,
        semconv.GEN_AI_USAGE_INPUT_TOKENS: prompt_tokens,
        semconv.GEN_AI_USAGE_OUTPUT_TOKENS: completion_tokens,
        semconv.GEN_AI_DURATION: duration
    }
    span_attributes.update(parse_response_message(choices))
    span.set_attributes(span_attributes)
    record_chat_response_metric(attributes=attributes,
                                prompt_tokens=prompt_tokens,
                                completion_tokens=completion_tokens,
                                duration=duration,
                                choices=choices
                                )
175
+
176
+
177
class WrappedStreamResponse(wrapt.ObjectProxy):
    """Proxy over a sync or async OpenAI streaming response.

    Transparently forwards the iterator and context-manager protocols to the
    wrapped response while accumulating chunks, recording time-to-first-token,
    and closing the trace span once the stream is exhausted.
    """

    def __init__(
            self,
            span,
            response,
            instance=None,
            start_time=None,
            request_kwargs=None
    ):
        super().__init__(response)
        self._span = span
        self._instance = instance
        self._start_time = start_time
        # Bug fix: initialize so _close_span does not raise AttributeError
        # when the stream ends before any chunk is received.
        self._time_of_first_token = None
        self._complete_response = {"choices": [], "model": ""}
        self._first_token_recorded = False
        self._request_kwargs = request_kwargs

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Bug fix: propagate the wrapped __exit__ return value so exception
        # suppression by the underlying context manager still works.
        return self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Bug fix: propagate the wrapped __aexit__ return value (see __exit__).
        return await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)

    def __iter__(self):
        return self

    def __aiter__(self):
        return self

    def __next__(self):
        try:
            chunk = self.__wrapped__.__next__()
        except StopIteration:
            # End of stream: flush accumulated data and close the span.
            self._close_span()
            raise
        self._process_stream_chunk(chunk)
        return chunk

    async def __anext__(self):
        try:
            chunk = await self.__wrapped__.__anext__()
        except StopAsyncIteration:
            # End of stream: flush accumulated data and close the span.
            self._close_span()
            raise
        self._process_stream_chunk(chunk)
        return chunk

    def _process_stream_chunk(self, chunk):
        # Merge the chunk into the accumulated response and note the arrival
        # time of the very first token for latency metrics.
        record_stream_response_chunk(chunk, self._complete_response)
        if not self._first_token_recorded:
            self._time_of_first_token = time.time()
            duration = self._time_of_first_token - self._start_time
            attribute = parse_openai_response(
                self._complete_response, self._request_kwargs, self._instance, True)
            record_streaming_time_to_first_token(duration, attribute)
            self._first_token_recorded = True

    def _close_span(self):
        # Compute durations defensively: start_time and first-token time may
        # be missing if the stream produced no chunks.
        duration = None
        first_token_duration = None
        first_token_to_generate_duration = None
        if self._start_time and isinstance(self._start_time, (float, int)):
            duration = time.time() - self._start_time
        if self._time_of_first_token and self._start_time and isinstance(self._start_time, (float, int)):
            first_token_duration = self._time_of_first_token - self._start_time
            first_token_to_generate_duration = time.time() - self._time_of_first_token
        prompt_usage, completion_usage = record_stream_token_usage(
            self._complete_response, self._request_kwargs)
        attributes = parse_openai_response(
            self._complete_response, self._request_kwargs, self._instance, True)
        choices = self._complete_response.get("choices")
        span_attributes = {
            **attributes,
            "llm.prompt_tokens": prompt_usage,
            "llm.completion_tokens": completion_usage,
            "llm.duration": duration,
            "llm.first_token_duration": first_token_duration
        }
        span_attributes.update(parse_response_message(choices))
        self._span.set_attributes(span_attributes)
        record_chat_response_metric(attributes=attributes,
                                    prompt_tokens=prompt_usage,
                                    completion_tokens=completion_usage,
                                    duration=duration,
                                    choices=choices
                                    )
        record_streaming_time_to_generate(
            first_token_to_generate_duration, attributes)

        self._span.end()
278
+
279
+
280
class OpenAIInstrumentor(Instrumentor):
    """Instrumentor that patches the openai library's chat-completion entry points."""

    def instrumentation_dependencies(self) -> Collection[str]:
        """Declare the library versions this instrumentation supports."""
        return ("openai >= 1.0.0",)

    def _instrument(self, **kwargs: Any):
        """Wrap the sync and async chat-completion create methods for tracing.

        Bug fix: the original class defined ``_instrument`` twice; the second,
        empty definition shadowed this one and made instrumentation a no-op.
        The empty method is renamed to ``_uninstrument`` (its evident intent).
        """
        tracer_provider = kwargs.get("tracer_provider")
        tracer = tracer_provider.get_tracer(
            "aworld.trace.instrumentation.openai")

        wrapt.wrap_function_wrapper(
            "openai.resources.chat.completions",
            "Completions.create",
            _chat_wrapper(tracer=tracer)
        )

        wrapt.wrap_function_wrapper(
            "openai.resources.chat.completions",
            "AsyncCompletions.create",
            _achat_class_wrapper(tracer)
        )

    def _uninstrument(self, **kwargs: Any):
        # Unpatching is intentionally a no-op.
        pass
304
+
305
+
306
def wrap_openai(client: Union[openai.OpenAI, openai.AsyncOpenAI]):
    """Patch an OpenAI client instance so its chat completions are traced.

    Example:
        client = wrap_openai(openai.OpenAI())

    Args:
        client: A sync or async OpenAI client whose
            ``chat.completions.create`` should be wrapped.

    Returns:
        The same client — always, even when no tracer provider is configured
        or patching fails — so callers can safely reassign the result.
    """
    try:
        tracer_provider = get_tracer_provider_silent()
        if not tracer_provider:
            # Bug fix: return the client rather than None so the documented
            # `client = wrap_openai(client)` pattern never clobbers the
            # caller's client when tracing is disabled.
            return client
        tracer = tracer_provider.get_tracer(
            "aworld.trace.instrumentation.openai")

        if isinstance(client, openai.OpenAI):
            wrapper = _chat_wrapper(tracer)
            client.chat.completions.create = wrapper(
                client.chat.completions.create)
            # Typo fix: "warpped" -> "wrapped".
            logger.info(
                f"[{client.__class__}]client.chat.completions.create be wrapped")
        if isinstance(client, openai.AsyncOpenAI):
            awrapper = _achat_instance_wrapper(tracer)
            client.chat.completions.create = awrapper(
                client.chat.completions.create)
            # Typo fix: "warpped" -> "wrapped".
            logger.info(
                f"[{client.__class__}]client.chat.completions.create be wrapped")
    except Exception:
        # Best-effort: tracing must never break client construction.
        logger.warning(traceback.format_exc())

    return client