Spaces:
Sleeping
Sleeping
| import os | |
| from urllib.parse import urljoin | |
| from typing import Optional, Sequence | |
| from typing_extensions import LiteralString | |
| from uuid import uuid4 | |
| from opentelemetry import metrics | |
| from opentelemetry.sdk.resources import Resource | |
| from opentelemetry.semconv.resource import ResourceAttributes | |
| from opentelemetry.sdk.metrics import MeterProvider | |
| from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter | |
| from aworld.metrics.metric import ( | |
| Gauge, | |
| Histogram, | |
| MetricProvider, | |
| Counter, | |
| MetricExporter, | |
| UpDownCounter, | |
| get_metric_provider, | |
| set_metric_provider | |
| ) | |
| MEMORY_FIELDS: list[LiteralString] = 'available used free active inactive buffers cached shared wired slab'.split() | |
| """ | |
| The fields of the memory information returned by psutil.virtual_memory(). | |
| """ | |
class OpentelemetryMetricProvider(MetricProvider):
    """MetricProvider implementation backed by the OpenTelemetry SDK.

    The provider owns an OpenTelemetry ``MeterProvider`` with a single
    ``PeriodicExportingMetricReader`` (5 second export interval) and hands
    out metric wrappers that record through the ``"aworld"`` meter.
    """

    def __init__(self, exporter: MetricExporter = None):
        """Initialize the MetricProvider.

        Constructing the provider also registers its ``MeterProvider`` as the
        global OpenTelemetry meter provider.

        Args:
            exporter: The exporter of the metric. When omitted (or falsy), a
                ``ConsoleMetricExporter`` is used.
        """
        super().__init__()
        if not exporter:
            exporter = ConsoleMetricExporter()
        self._exporter = exporter
        self._otel_provider = MeterProvider(
            metric_readers=[PeriodicExportingMetricReader(
                exporter=self._exporter, export_interval_millis=5000)],
            resource=build_otel_resource()
        )
        metrics.set_meter_provider(self._otel_provider)
        self._meter = self._otel_provider.get_meter("aworld")

    def create_counter(self,
                       name: str,
                       description: str,
                       unit: str,
                       labelnames: Optional[Sequence[str]] = None) -> Counter:
        """Create a counter.

        Args:
            name: The name of the counter.
            description: The description of the counter.
            unit: The unit of the counter.
            labelnames: Accepted for interface compatibility; not used by
                this implementation (labels are supplied per measurement).

        Returns:
            An ``OpentelemetryCounter`` bound to this provider.
        """
        return OpentelemetryCounter(name, description, unit, self)

    def create_un_down_counter(self,
                               name: str,
                               description: str,
                               unit: str,
                               labelnames: Optional[Sequence[str]] = None) -> UpDownCounter:
        """Create an up-down counter.

        Args:
            name: The name of the counter.
            description: The description of the counter.
            unit: The unit of the counter.
            labelnames: Accepted for interface compatibility; not used by
                this implementation (labels are supplied per measurement).

        Returns:
            An ``OpentelemetryUpDownCounter`` bound to this provider.
        """
        return OpentelemetryUpDownCounter(name, description, unit, self)

    def create_gauge(self,
                     name: str,
                     description: str,
                     unit: str,
                     labelnames: Optional[Sequence[str]] = None) -> Gauge:
        """Create a gauge.

        Args:
            name: The name of the gauge.
            description: The description of the gauge.
            unit: The unit of the gauge.
            labelnames: Accepted for interface compatibility; not used by
                this implementation (labels are supplied per measurement).

        Returns:
            An ``OpentelemetryGauge`` bound to this provider.
        """
        return OpentelemetryGauge(name, description, unit, self)

    def create_histogram(self,
                         name: str,
                         description: str,
                         unit: str,
                         buckets: Optional[Sequence[float]] = None,
                         labelnames: Optional[Sequence[str]] = None) -> Histogram:
        """Create a histogram.

        Args:
            name: The name of the histogram.
            description: The description of the histogram.
            unit: The unit of the histogram.
            buckets: The buckets of the histogram.
            labelnames: Accepted for interface compatibility; not used by
                this implementation (labels are supplied per measurement).

        Returns:
            An ``OpentelemetryHistogram`` bound to this provider.
        """
        return OpentelemetryHistogram(name, description, unit, self, buckets)

    def shutdown(self):
        """Shutdown the metric provider.

        Only the ``MeterProvider`` is shut down here. Its periodic reader
        performs a final export and then shuts the exporter down itself, so
        shutting the exporter down first would make that final export fail
        and the exporter would be shut down twice.
        """
        self._otel_provider.shutdown()
class OpentelemetryCounter(Counter):
    """Counter metric recorded through an OpenTelemetry counter instrument.

    A counter is a cumulative metric holding a single numerical value that
    only ever goes up.
    """

    def __init__(self,
                 name: str,
                 description: str,
                 unit: str,
                 provider: OpentelemetryMetricProvider):
        """Create the counter and its underlying OpenTelemetry instrument.

        Args:
            name: The name of the counter.
            description: The description of the counter.
            unit: The unit of the counter.
            provider: The provider whose meter creates the instrument.
        """
        super().__init__(name, description, unit, provider)
        meter = provider._meter
        self._counter = meter.create_counter(
            name=name,
            description=description,
            unit=unit,
        )

    def add(self, value: int, labels: dict = None) -> None:
        """Add a value to the counter.

        Args:
            value: The amount to add.
            labels: The labels to associate with the value; an empty dict is
                used when omitted.
        """
        attributes = {} if labels is None else labels
        self._counter.add(value, attributes)
class OpentelemetryUpDownCounter(UpDownCounter):
    """Up-down counter recorded through an OpenTelemetry up-down counter.

    An up-down counter holds a single numerical value that can be both
    incremented (``inc``) and decremented (``dec``).
    """

    def __init__(self,
                 name: str,
                 description: str,
                 unit: str,
                 provider: OpentelemetryMetricProvider):
        """Create the up-down counter and its OpenTelemetry instrument.

        Args:
            name: The name of the counter.
            description: The description of the counter.
            unit: The unit of the counter.
            provider: The provider whose meter creates the instrument.
        """
        super().__init__(name, description, unit, provider)
        meter = provider._meter
        self._counter = meter.create_up_down_counter(
            name=name,
            description=description,
            unit=unit,
        )

    def inc(self, value: int, labels: dict = None) -> None:
        """Add a value to the counter.

        Args:
            value: The amount to add.
            labels: The labels to associate with the value; an empty dict is
                used when omitted.
        """
        attributes = {} if labels is None else labels
        self._counter.add(value, attributes)

    def dec(self, value: int, labels: dict = None) -> None:
        """Subtract a value from the counter.

        Args:
            value: The amount to subtract; it is negated before being
                recorded.
            labels: The labels to associate with the value; an empty dict is
                used when omitted.
        """
        attributes = {} if labels is None else labels
        self._counter.add(-value, attributes)
class OpentelemetryGauge(Gauge):
    """Gauge metric recorded through an OpenTelemetry gauge instrument.

    A gauge holds a single numerical value that can arbitrarily go up and
    down.
    """

    def __init__(self,
                 name: str,
                 description: str,
                 unit: str,
                 provider: OpentelemetryMetricProvider):
        """Create the gauge and its underlying OpenTelemetry instrument.

        Args:
            name: The name of the gauge.
            description: The description of the gauge.
            unit: The unit of the gauge.
            provider: The provider whose meter creates the instrument.
        """
        super().__init__(name, description, unit, provider)
        meter = provider._meter
        self._gauge = meter.create_gauge(
            name=name,
            description=description,
            unit=unit,
        )

    def set(self, value: int, labels: dict = None) -> None:
        """Set the value of the gauge.

        Args:
            value: The value to set the gauge to.
            labels: The labels to associate with the value; an empty dict is
                used when omitted.
        """
        attributes = {} if labels is None else labels
        self._gauge.set(value, attributes)
class OpentelemetryHistogram(Histogram):
    """Histogram metric recorded through an OpenTelemetry histogram.

    A histogram represents the distribution of a set of values.
    """

    def __init__(self,
                 name: str,
                 description: str,
                 unit: str,
                 provider: OpentelemetryMetricProvider,
                 buckets: Sequence[float] = None):
        """Create the histogram and its underlying OpenTelemetry instrument.

        Args:
            name: The name of the histogram.
            description: The description of the histogram.
            unit: The unit of the histogram.
            provider: The provider whose meter creates the instrument.
            buckets: The buckets of the histogram; forwarded to the meter as
                the explicit bucket boundaries advisory.
        """
        super().__init__(name, description, unit, provider, buckets)
        meter = provider._meter
        self._histogram = meter.create_histogram(
            name=name,
            description=description,
            unit=unit,
            explicit_bucket_boundaries_advisory=buckets,
        )

    def record(self, value: int, labels: dict = None) -> None:
        """Record a value in the histogram.

        Args:
            value: The value to record.
            labels: The labels to associate with the value; an empty dict is
                used when omitted.
        """
        attributes = {} if labels is None else labels
        self._histogram.record(value, attributes)
def configure_otlp_provider(backend: Sequence[str] = None,
                            base_url: str = None,
                            write_token: str = None,
                            **kwargs
                            ) -> None:
    """Configure the OpenTelemetry metric provider for the given backend.

    Supported backends are ``"console"``, ``"logfire"`` and ``"antmonitor"``.
    Any other value leaves the current metric provider untouched.

    Args:
        backend: The backend to use.
        base_url: The base URL of the backend. Defaults to
            ``https://logfire-us.pydantic.dev`` for ``"logfire"`` and to the
            ``ANT_OTEL_ENDPOINT`` environment variable for ``"antmonitor"``.
        write_token: The write token of the backend; sent as the
            ``Authorization`` header for ``"logfire"``.
        **kwargs: Extra options. ``metrics_system_enabled`` (bool or string)
            turns on system metrics instrumentation when it is true; if it is
            not given (or falsy), the ``METRICS_SYSTEM_ENABLED`` environment
            variable is consulted instead.
    """
    if backend == "console":
        set_metric_provider(OpentelemetryMetricProvider())
    elif backend in ("logfire", "antmonitor"):
        # Imported lazily so that the console backend (or no backend at all)
        # does not require the OTLP HTTP exporter dependencies.
        import requests
        from opentelemetry.exporter.otlp.proto.http import Compression
        from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter

        session = requests.Session()
        if backend == "logfire":
            base_url = base_url or "https://logfire-us.pydantic.dev"
            session.headers.update({
                'User-Agent': 'logfire/3.14.0',
                'Authorization': write_token,
            })
            exporter = OTLPMetricExporter(
                endpoint=urljoin(base_url, '/v1/metrics'),
                session=session,
                compression=Compression.Gzip,
            )
        else:
            base_url = base_url or os.getenv("ANT_OTEL_ENDPOINT")
            # NOTE(review): requests.Session does not appear to honour a
            # `timeout` attribute; the exporter's own timeout=30 below is
            # presumably what takes effect - confirm before removing.
            session.timeout = 30
            exporter = OTLPMetricExporter(
                endpoint=base_url,
                session=session,
                compression=Compression.Gzip,
                timeout=30
            )
        set_metric_provider(OpentelemetryMetricProvider(exporter))

    metrics_system_enabled = kwargs.get("metrics_system_enabled") or os.getenv(
        "METRICS_SYSTEM_ENABLED") or "false"
    # str() lets callers pass a real bool (True) as well as "true"/"True";
    # calling .lower() directly on a bool would raise AttributeError.
    if str(metrics_system_enabled).lower() == "true":
        instrument_system_metrics()
def instrument_system_metrics():
    """Instrument system metrics on the current metric provider.

    Starts from the instrumentor's default configuration and overrides the
    memory usage, memory utilization and swap utilization entries, then
    attaches the instrumentor to the OpenTelemetry meter provider held by the
    provider returned from ``get_metric_provider()``.

    Raises:
        ImportError: If ``opentelemetry-instrumentation-system-metrics`` is
            not installed; the original import failure is attached as the
            cause.
    """
    try:
        from opentelemetry.instrumentation.system_metrics import (
            _DEFAULT_CONFIG,
            SystemMetricsInstrumentor
        )
    except ImportError as err:
        # Chain the original error so the real import failure stays visible.
        raise ImportError(
            "Could not import opentelemetry.instrumentation.system_metrics, please install it with `pip install opentelemetry-instrumentation-system-metrics`"
        ) from err

    # Copy so the library's module-level defaults are not modified.
    config = _DEFAULT_CONFIG.copy()
    config['system.memory.usage'] = MEMORY_FIELDS + ['total']
    config['system.memory.utilization'] = MEMORY_FIELDS
    config['system.swap.utilization'] = ['used']

    instrumentor = SystemMetricsInstrumentor(config=config)
    # Relies on the active provider exposing `_otel_provider`, as
    # OpentelemetryMetricProvider does.
    otel_provider = get_metric_provider()._otel_provider
    instrumentor.instrument(meter_provider=otel_provider)
def build_otel_resource():
    """Build the OpenTelemetry resource describing this service.

    The service name comes from the ``MONITOR_SERVICE_NAME`` environment
    variable and falls back to ``"aworld"``; the namespace is always
    ``"aworld"`` and the instance id is a freshly generated UUID hex string.

    Returns:
        A ``Resource`` carrying the service name, namespace and instance id.
    """
    attributes = {
        ResourceAttributes.SERVICE_NAME: os.getenv("MONITOR_SERVICE_NAME") or "aworld",
        ResourceAttributes.SERVICE_NAMESPACE: "aworld",
        ResourceAttributes.SERVICE_INSTANCE_ID: uuid4().hex,
    }
    return Resource(attributes=attributes)