Spaces:
Sleeping
Sleeping
File size: 6,451 Bytes
7a3c0ee |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 |
import os
from re import compile as re_compile
from re import search
from typing import Final, Iterable, Any
from urllib.parse import urlparse, urlunparse, unquote
from wsgiref.types import WSGIEnvironment
from requests.models import PreparedRequest
# Attribute keys used in the dictionaries produced by the collectors in this
# module. Each constant holds the literal key string and is declared ``Final``
# so that type checkers reject accidental rebinding.

# General request / connection keys.
HTTP_REQUEST_METHOD: Final = "http.request.method"
HTTP_FLAVOR: Final = "http.flavor"
HTTP_HOST: Final = "http.host"
HTTP_SCHEME: Final = "http.scheme"
HTTP_USER_AGENT: Final = "http.user_agent"
HTTP_SERVER_NAME: Final = "http.server_name"

# Server, URL and client keys.
SERVER_ADDRESS: Final = "server.address"
SERVER_PORT: Final = "server.port"
URL_PATH: Final = "url.path"
URL_QUERY: Final = "url.query"
CLIENT_ADDRESS: Final = "client.address"
CLIENT_PORT: Final = "client.port"
URL_FULL: Final = "url.full"

# Request / response size, header and status keys.
HTTP_REQUEST_BODY_SIZE: Final = "http.request.body.size"
HTTP_REQUEST_HEADER: Final = "http.request.header"
HTTP_REQUEST_SIZE: Final = "http.request.size"
HTTP_RESPONSE_BODY_SIZE: Final = "http.response.body.size"
HTTP_RESPONSE_HEADER: Final = "http.response.header"
HTTP_RESPONSE_SIZE: Final = "http.response.size"
HTTP_RESPONSE_STATUS_CODE: Final = "http.response.status_code"
HTTP_ROUTE: Final = "http.route"
def collect_request_attributes(environ: WSGIEnvironment) -> dict[str, Any]:
    """Build the attribute dictionary describing an incoming WSGI request.

    Args:
        environ: The WSGI environment of the request.

    Returns:
        A dictionary keyed by this module's attribute-name constants. Method,
        flavor, scheme, server name, host, client address, client port and
        user agent are always present (empty string when the environ lacks
        the corresponding key). Server port, URL path and URL query are only
        present when they could be determined.
    """
    attributes: dict[str, Any] = {}

    attributes[HTTP_REQUEST_METHOD] = environ.get("REQUEST_METHOD", "").upper()
    attributes[HTTP_FLAVOR] = environ.get("SERVER_PROTOCOL", "")
    attributes[HTTP_SCHEME] = environ.get("wsgi.url_scheme", "")
    attributes[HTTP_SERVER_NAME] = environ.get("SERVER_NAME", "")
    attributes[HTTP_HOST] = environ.get("HTTP_HOST", "")

    host_port = environ.get("SERVER_PORT")
    if host_port:
        attributes[SERVER_PORT] = host_port

    # RAW_URI and REQUEST_URI are server-specific extensions that are not
    # defined by PEP 3333; prefer them when available because they carry the
    # request target in a single value.
    target = environ.get("RAW_URI")
    if target is None:
        target = environ.get("REQUEST_URI")
    if target:
        path, query = _parse_url_query(target)
        attributes[URL_PATH] = path
        attributes[URL_QUERY] = query
    else:
        # Fall back to the standard PEP 3333 keys so that servers which do
        # not provide the extensions above still get path/query recorded.
        path = environ.get("SCRIPT_NAME", "") + environ.get("PATH_INFO", "")
        if path:
            attributes[URL_PATH] = path
            attributes[URL_QUERY] = environ.get("QUERY_STRING", "")

    remote_addr = environ.get("REMOTE_ADDR", "")
    attributes[CLIENT_ADDRESS] = remote_addr
    attributes[CLIENT_PORT] = environ.get("REMOTE_PORT", "")

    # A distinct, non-empty REMOTE_HOST takes precedence over REMOTE_ADDR.
    remote_host = environ.get("REMOTE_HOST")
    if remote_host and remote_host != remote_addr:
        attributes[CLIENT_ADDRESS] = remote_host

    attributes[HTTP_USER_AGENT] = environ.get("HTTP_USER_AGENT", "")
    return attributes
def collect_attributes_from_request(request: PreparedRequest) -> dict[str, Any]:
    """Build the attribute dictionary describing an outgoing request.

    Args:
        request: The prepared request about to be sent.

    Returns:
        A dictionary that always contains the request method and the full
        URL (with any user/password removed). Scheme, host and server port
        are added when they can be parsed out of the URL.
    """
    attributes: dict[str, Any] = {}
    url = remove_url_credentials(request.url)
    attributes[HTTP_REQUEST_METHOD] = request.method
    attributes[URL_FULL] = url

    try:
        parsed_url = urlparse(url)
        if parsed_url.scheme:
            attributes[HTTP_SCHEME] = parsed_url.scheme
        if parsed_url.hostname:
            attributes[HTTP_HOST] = parsed_url.hostname
        # Accessing .port raises ValueError for a non-numeric or
        # out-of-range port.
        if parsed_url.port:
            attributes[SERVER_PORT] = parsed_url.port
    except ValueError:
        # Attribute collection is best-effort: a malformed URL must not make
        # this function raise, so keep whatever was collected so far.
        pass
    return attributes
def url_disabled(url: str, excluded_urls: Iterable[str]) -> bool:
"""
Check if the url is disabled.
Args:
url: The url to check.
excluded_urls: The excluded urls.
Returns:
True if the url is disabled, False otherwise.
"""
if excluded_urls is None:
return False
regex = re_compile("|".join(excluded_urls))
return search(regex, url)
def get_excluded_urls(instrumentation: str) -> list[str]:
    """
    Read the excluded urls configured for an instrumentation.

    The value is taken from the environment variable named
    ``<instrumentation>_EXCLUDED_URLS`` and handed to
    ``parse_excluded_urls``.

    Args:
        instrumentation: Prefix of the environment variable to read.

    Returns:
        The parsed list of excluded urls.
    """
    env_var_name = f"{instrumentation}_EXCLUDED_URLS"
    raw_value = os.getenv(env_var_name)
    return parse_excluded_urls(raw_value)
def parse_excluded_urls(excluded_urls: str) -> list[str]:
"""
Parse the excluded urls.
Args:
excluded_urls: The excluded urls.
Returns:
The excluded urls.
"""
if excluded_urls:
excluded_url_list = [
excluded_url.strip() for excluded_url in excluded_urls.split(",")
]
else:
excluded_url_list = []
return excluded_url_list
def remove_url_credentials(url: str) -> str:
    """Strip the ``user:password@`` part from *url* when it is a valid url.

    A url counts as valid when it has both a scheme and a network location.
    Anything else, including input that cannot be parsed, is returned
    unchanged.
    """
    try:
        parts = urlparse(url)
        if parts.scheme and parts.netloc:
            # Everything after the last "@" is the host[:port] portion.
            bare_netloc = parts.netloc.rpartition("@")[-1]
            return urlunparse(parts._replace(netloc=bare_netloc))
    except ValueError:
        # The url could not be parsed; fall through and return it as given.
        pass
    return url
def parser_host_port_url_from_asgi(scope: dict[str, Any]):
"""Returns (host, port, full_url) tuple."""
server = scope.get("server") or ["0.0.0.0", 80]
port = server[1]
server_host = server[0] + (":" + str(port) if str(port) != "80" else "")
full_path = scope.get("path", "")
http_url = scope.get("scheme", "http") + "://" + server_host + full_path
return server_host, port, http_url
def collect_request_attributes_asgi(scope: dict[str, Any]) -> dict[str, Any]:
    """Build the attribute dictionary describing an incoming ASGI request.

    Args:
        scope: The ASGI connection scope of the request.

    Returns:
        A dictionary keyed by this module's attribute-name constants. Method,
        flavor, scheme, host, server port, full url and url path are always
        present. User agent and client address/port are only present when
        the scope carries a ``user-agent`` header or a ``client`` entry.
    """
    attributes: dict[str, Any] = {}
    server_host, port, http_url = parser_host_port_url_from_asgi(scope)

    query_string = scope.get("query_string")
    if query_string and http_url:
        if isinstance(query_string, bytes):
            # Client-controlled bytes: never let invalid UTF-8 raise here.
            query_string = query_string.decode("utf8", errors="replace")
        http_url += "?" + unquote(query_string)

    attributes[HTTP_REQUEST_METHOD] = scope.get("method", "")
    attributes[HTTP_FLAVOR] = scope.get("http_version", "")
    attributes[HTTP_SCHEME] = scope.get("scheme", "")
    attributes[HTTP_HOST] = server_host
    attributes[SERVER_PORT] = port
    attributes[URL_FULL] = remove_url_credentials(http_url)
    attributes[URL_PATH] = scope.get("path", "")

    headers = scope.get("headers")
    if headers:
        # No early exit: if the header is repeated, the last value wins.
        for key, value in headers:
            if key == b"user-agent":
                # Header bytes come from the client and need not be valid
                # UTF-8; replace undecodable bytes instead of raising.
                attributes[HTTP_USER_AGENT] = value.decode(
                    "utf8", errors="replace"
                )

    client = scope.get("client")
    if client:
        attributes[CLIENT_ADDRESS] = client[0]
        attributes[CLIENT_PORT] = client[1]
    return attributes
def _parse_url_query(url: str):
parsed_url = urlparse(url)
path = parsed_url.path
query_params = parsed_url.query
return path, query_params
|