Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import re | |
| import typing | |
| from ..exceptions import LocationParseError | |
| from .util import to_str | |
| # We only want to normalize urls with an HTTP(S) scheme. | |
| # urllib3 infers URLs without a scheme (None) to be http. | |
| _NORMALIZABLE_SCHEMES = ("http", "https", None) | |
| # Almost all of these patterns were derived from the | |
| # 'rfc3986' module: https://github.com/python-hyper/rfc3986 | |
| _PERCENT_RE = re.compile(r"%[a-fA-F0-9]{2}") | |
| _SCHEME_RE = re.compile(r"^(?:[a-zA-Z][a-zA-Z0-9+-]*:|/)") | |
| _URI_RE = re.compile( | |
| r"^(?:([a-zA-Z][a-zA-Z0-9+.-]*):)?" | |
| r"(?://([^\\/?#]*))?" | |
| r"([^?#]*)" | |
| r"(?:\?([^#]*))?" | |
| r"(?:#(.*))?$", | |
| re.UNICODE | re.DOTALL, | |
| ) | |
| _IPV4_PAT = r"(?:[0-9]{1,3}\.){3}[0-9]{1,3}" | |
| _HEX_PAT = "[0-9A-Fa-f]{1,4}" | |
| _LS32_PAT = "(?:{hex}:{hex}|{ipv4})".format(hex=_HEX_PAT, ipv4=_IPV4_PAT) | |
| _subs = {"hex": _HEX_PAT, "ls32": _LS32_PAT} | |
| _variations = [ | |
| # 6( h16 ":" ) ls32 | |
| "(?:%(hex)s:){6}%(ls32)s", | |
| # "::" 5( h16 ":" ) ls32 | |
| "::(?:%(hex)s:){5}%(ls32)s", | |
| # [ h16 ] "::" 4( h16 ":" ) ls32 | |
| "(?:%(hex)s)?::(?:%(hex)s:){4}%(ls32)s", | |
| # [ *1( h16 ":" ) h16 ] "::" 3( h16 ":" ) ls32 | |
| "(?:(?:%(hex)s:)?%(hex)s)?::(?:%(hex)s:){3}%(ls32)s", | |
| # [ *2( h16 ":" ) h16 ] "::" 2( h16 ":" ) ls32 | |
| "(?:(?:%(hex)s:){0,2}%(hex)s)?::(?:%(hex)s:){2}%(ls32)s", | |
| # [ *3( h16 ":" ) h16 ] "::" h16 ":" ls32 | |
| "(?:(?:%(hex)s:){0,3}%(hex)s)?::%(hex)s:%(ls32)s", | |
| # [ *4( h16 ":" ) h16 ] "::" ls32 | |
| "(?:(?:%(hex)s:){0,4}%(hex)s)?::%(ls32)s", | |
| # [ *5( h16 ":" ) h16 ] "::" h16 | |
| "(?:(?:%(hex)s:){0,5}%(hex)s)?::%(hex)s", | |
| # [ *6( h16 ":" ) h16 ] "::" | |
| "(?:(?:%(hex)s:){0,6}%(hex)s)?::", | |
| ] | |
| _UNRESERVED_PAT = r"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._\-~" | |
| _IPV6_PAT = "(?:" + "|".join([x % _subs for x in _variations]) + ")" | |
| _ZONE_ID_PAT = "(?:%25|%)(?:[" + _UNRESERVED_PAT + "]|%[a-fA-F0-9]{2})+" | |
| _IPV6_ADDRZ_PAT = r"\[" + _IPV6_PAT + r"(?:" + _ZONE_ID_PAT + r")?\]" | |
| _REG_NAME_PAT = r"(?:[^\[\]%:/?#]|%[a-fA-F0-9]{2})*" | |
| _TARGET_RE = re.compile(r"^(/[^?#]*)(?:\?([^#]*))?(?:#.*)?$") | |
| _IPV4_RE = re.compile("^" + _IPV4_PAT + "$") | |
| _IPV6_RE = re.compile("^" + _IPV6_PAT + "$") | |
| _IPV6_ADDRZ_RE = re.compile("^" + _IPV6_ADDRZ_PAT + "$") | |
| _BRACELESS_IPV6_ADDRZ_RE = re.compile("^" + _IPV6_ADDRZ_PAT[2:-2] + "$") | |
| _ZONE_ID_RE = re.compile("(" + _ZONE_ID_PAT + r")\]$") | |
| _HOST_PORT_PAT = ("^(%s|%s|%s)(?::0*?(|0|[1-9][0-9]{0,4}))?$") % ( | |
| _REG_NAME_PAT, | |
| _IPV4_PAT, | |
| _IPV6_ADDRZ_PAT, | |
| ) | |
| _HOST_PORT_RE = re.compile(_HOST_PORT_PAT, re.UNICODE | re.DOTALL) | |
| _UNRESERVED_CHARS = set( | |
| "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789._-~" | |
| ) | |
| _SUB_DELIM_CHARS = set("!$&'()*+,;=") | |
| _USERINFO_CHARS = _UNRESERVED_CHARS | _SUB_DELIM_CHARS | {":"} | |
| _PATH_CHARS = _USERINFO_CHARS | {"@", "/"} | |
| _QUERY_CHARS = _FRAGMENT_CHARS = _PATH_CHARS | {"?"} | |
| class Url( | |
| typing.NamedTuple( | |
| "Url", | |
| [ | |
| ("scheme", typing.Optional[str]), | |
| ("auth", typing.Optional[str]), | |
| ("host", typing.Optional[str]), | |
| ("port", typing.Optional[int]), | |
| ("path", typing.Optional[str]), | |
| ("query", typing.Optional[str]), | |
| ("fragment", typing.Optional[str]), | |
| ], | |
| ) | |
| ): | |
| """ | |
| Data structure for representing an HTTP URL. Used as a return value for | |
| :func:`parse_url`. Both the scheme and host are normalized as they are | |
| both case-insensitive according to RFC 3986. | |
| """ | |
| def __new__( # type: ignore[no-untyped-def] | |
| cls, | |
| scheme: str | None = None, | |
| auth: str | None = None, | |
| host: str | None = None, | |
| port: int | None = None, | |
| path: str | None = None, | |
| query: str | None = None, | |
| fragment: str | None = None, | |
| ): | |
| if path and not path.startswith("/"): | |
| path = "/" + path | |
| if scheme is not None: | |
| scheme = scheme.lower() | |
| return super().__new__(cls, scheme, auth, host, port, path, query, fragment) | |
| def hostname(self) -> str | None: | |
| """For backwards-compatibility with urlparse. We're nice like that.""" | |
| return self.host | |
| def request_uri(self) -> str: | |
| """Absolute path including the query string.""" | |
| uri = self.path or "/" | |
| if self.query is not None: | |
| uri += "?" + self.query | |
| return uri | |
| def authority(self) -> str | None: | |
| """ | |
| Authority component as defined in RFC 3986 3.2. | |
| This includes userinfo (auth), host and port. | |
| i.e. | |
| userinfo@host:port | |
| """ | |
| userinfo = self.auth | |
| netloc = self.netloc | |
| if netloc is None or userinfo is None: | |
| return netloc | |
| else: | |
| return f"{userinfo}@{netloc}" | |
| def netloc(self) -> str | None: | |
| """ | |
| Network location including host and port. | |
| If you need the equivalent of urllib.parse's ``netloc``, | |
| use the ``authority`` property instead. | |
| """ | |
| if self.host is None: | |
| return None | |
| if self.port: | |
| return f"{self.host}:{self.port}" | |
| return self.host | |
| def url(self) -> str: | |
| """ | |
| Convert self into a url | |
| This function should more or less round-trip with :func:`.parse_url`. The | |
| returned url may not be exactly the same as the url inputted to | |
| :func:`.parse_url`, but it should be equivalent by the RFC (e.g., urls | |
| with a blank port will have : removed). | |
| Example: | |
| .. code-block:: python | |
| import urllib3 | |
| U = urllib3.util.parse_url("https://google.com/mail/") | |
| print(U.url) | |
| # "https://google.com/mail/" | |
| print( urllib3.util.Url("https", "username:password", | |
| "host.com", 80, "/path", "query", "fragment" | |
| ).url | |
| ) | |
| # "https://username:[email protected]:80/path?query#fragment" | |
| """ | |
| scheme, auth, host, port, path, query, fragment = self | |
| url = "" | |
| # We use "is not None" we want things to happen with empty strings (or 0 port) | |
| if scheme is not None: | |
| url += scheme + "://" | |
| if auth is not None: | |
| url += auth + "@" | |
| if host is not None: | |
| url += host | |
| if port is not None: | |
| url += ":" + str(port) | |
| if path is not None: | |
| url += path | |
| if query is not None: | |
| url += "?" + query | |
| if fragment is not None: | |
| url += "#" + fragment | |
| return url | |
| def __str__(self) -> str: | |
| return self.url | |
@typing.overload
def _encode_invalid_chars(
    component: str, allowed_chars: typing.Container[str]
) -> str:  # Abstract
    ...


@typing.overload
def _encode_invalid_chars(
    component: None, allowed_chars: typing.Container[str]
) -> None:  # Abstract
    ...


def _encode_invalid_chars(
    component: str | None, allowed_chars: typing.Container[str]
) -> str | None:
    """Percent-encodes a URI component without reapplying
    onto an already percent-encoded component.

    :param component: The component to encode; ``None`` is returned unchanged.
    :param allowed_chars: Characters that may appear unencoded in the result.
    :return: The component with every byte that is neither an allowed ASCII
        character nor part of an existing percent-encoding replaced by its
        upper-case ``%XX`` form.
    """
    if component is None:
        return component

    # NOTE(review): to_str comes from .util and is not visible here;
    # presumably it coerces the value to str - confirm.
    component = to_str(component)

    # Normalize existing percent-encoded bytes to upper-case hex, and count
    # them so we can tell whether every '%' belongs to a valid encoding.
    component, percent_encodings = _PERCENT_RE.subn(
        lambda match: match.group(0).upper(), component
    )

    uri_bytes = component.encode("utf-8", "surrogatepass")
    # Only when *all* '%' bytes are part of a "%XX" sequence is the component
    # treated as already encoded; otherwise every '%' gets encoded too
    # (unless the caller explicitly allows it).
    is_percent_encoded = percent_encodings == uri_bytes.count(b"%")
    encoded_component = bytearray()

    # Iterating over bytes yields the integer value of each byte.
    for byte_ord in uri_bytes:
        if (is_percent_encoded and byte_ord == 0x25) or (
            byte_ord < 128 and chr(byte_ord) in allowed_chars
        ):
            encoded_component.append(byte_ord)
        else:
            encoded_component.extend(b"%%%02X" % byte_ord)

    return encoded_component.decode()
| def _remove_path_dot_segments(path: str) -> str: | |
| # See http://tools.ietf.org/html/rfc3986#section-5.2.4 for pseudo-code | |
| segments = path.split("/") # Turn the path into a list of segments | |
| output = [] # Initialize the variable to use to store output | |
| for segment in segments: | |
| # '.' is the current directory, so ignore it, it is superfluous | |
| if segment == ".": | |
| continue | |
| # Anything other than '..', should be appended to the output | |
| if segment != "..": | |
| output.append(segment) | |
| # In this case segment == '..', if we can, we should pop the last | |
| # element | |
| elif output: | |
| output.pop() | |
| # If the path starts with '/' and the output is empty or the first string | |
| # is non-empty | |
| if path.startswith("/") and (not output or output[0]): | |
| output.insert(0, "") | |
| # If the path starts with '/.' or '/..' ensure we add one more empty | |
| # string to add a trailing '/' | |
| if path.endswith(("/.", "/..")): | |
| output.append("") | |
| return "/".join(output) | |
@typing.overload
def _normalize_host(host: None, scheme: str | None) -> None:
    ...


@typing.overload
def _normalize_host(host: str, scheme: str | None) -> str:
    ...


def _normalize_host(host: str | None, scheme: str | None) -> str | None:
    """Normalize *host* when *scheme* is one of ``_NORMALIZABLE_SCHEMES``.

    :param host: Host as captured from the authority, or ``None``.
    :param scheme: Already lower-cased scheme, or ``None``.
    :return: ``host`` unchanged when it is empty/``None``, when the scheme is
        not normalizable, or when it is an IPv4 address. Bracketed IPv6
        literals are lower-cased (with the zone ID separator unquoted);
        any other name is lower-cased and IDNA-encoded label by label.
    """
    if not host or scheme not in _NORMALIZABLE_SCHEMES:
        return host

    if _IPV6_ADDRZ_RE.match(host):
        # IPv6 hosts of the form 'a::b%zone' are encoded in a URL as
        # such per RFC 6874: 'a::b%25zone'. Unquote the ZoneID
        # separator as necessary to return a valid RFC 4007 scoped IP.
        zone_match = _ZONE_ID_RE.search(host)
        if not zone_match:
            return host.lower()

        start, end = zone_match.span(1)
        zone_id = host[start:end]

        # Strip the separator: the quoted "%25" form, unless the whole zone
        # text is exactly "%25" (a bare "%" followed by the zone "25").
        if zone_id.startswith("%25") and zone_id != "%25":
            zone_id = zone_id[3:]
        else:
            zone_id = zone_id[1:]

        zone_id = _encode_invalid_chars(zone_id, _UNRESERVED_CHARS)
        # Only the address part is lower-cased; the zone ID keeps its case.
        return f"{host[:start].lower()}%{zone_id}{host[end:]}"

    if _IPV4_RE.match(host):
        return host

    return to_str(
        b".".join([_idna_encode(label) for label in host.split(".")]),
        "ascii",
    )
| def _idna_encode(name: str) -> bytes: | |
| if not name.isascii(): | |
| try: | |
| import idna | |
| except ImportError: | |
| raise LocationParseError( | |
| "Unable to parse URL without the 'idna' module" | |
| ) from None | |
| try: | |
| return idna.encode(name.lower(), strict=True, std3_rules=True) | |
| except idna.IDNAError: | |
| raise LocationParseError( | |
| f"Name '{name}' is not a valid IDNA label" | |
| ) from None | |
| return name.lower().encode("ascii") | |
def _encode_target(target: str) -> str:
    """Percent-encodes a request target so that there are no invalid characters

    Pre-condition for this function is that 'target' must start with '/'.
    If that is the case then _TARGET_RE will always produce a match.

    Only the path and query are returned; _TARGET_RE matches a fragment
    without capturing it.
    """
    parsed = _TARGET_RE.match(target)
    if parsed is None:  # Defensive:
        raise LocationParseError(f"{target!r} is not a valid request URI")

    raw_path, raw_query = parsed.groups()
    result = _encode_invalid_chars(raw_path, _PATH_CHARS)
    if raw_query is None:
        return result
    return result + "?" + _encode_invalid_chars(raw_query, _QUERY_CHARS)
def parse_url(url: str) -> Url:
    """
    Given a url, return a parsed :class:`.Url` namedtuple. Best-effort is
    performed to parse incomplete urls. Fields not provided will be None.
    This parser is RFC 3986 and RFC 6874 compliant.

    The parser logic and helper functions are based heavily on
    work done in the ``rfc3986`` module.

    :param str url: URL to parse into a :class:`.Url` namedtuple.
    :raises LocationParseError: if the url cannot be split into components
        or the port is outside 0-65535.

    Partly backwards-compatible with :mod:`urllib.parse`.

    Example:

    .. code-block:: python

        import urllib3

        print( urllib3.util.parse_url('http://google.com/mail/'))
        # Url(scheme='http', host='google.com', port=None, path='/mail/', ...)

        print( urllib3.util.parse_url('google.com:80'))
        # Url(scheme=None, host='google.com', port=80, path=None, ...)

        print( urllib3.util.parse_url('/foo?bar'))
        # Url(scheme=None, host=None, port=None, path='/foo', query='bar', ...)
    """
    if not url:
        # Nothing to parse: every field stays None.
        return Url()

    original_url = url
    # Input that starts with neither "scheme:" nor "/" is treated as
    # beginning with an authority.
    if _SCHEME_RE.search(url) is None:
        url = "//" + url

    scheme: str | None
    authority: str | None
    auth: str | None
    host: str | None
    port: str | None
    port_int: int | None
    path: str | None
    query: str | None
    fragment: str | None

    try:
        scheme, authority, path, query, fragment = _URI_RE.match(url).groups()  # type: ignore[union-attr]
        normalize_uri = scheme is None or scheme.lower() in _NORMALIZABLE_SCHEMES

        if scheme:
            scheme = scheme.lower()

        if not authority:
            auth, host, port = None, None, None
        else:
            # Split on the *last* "@": everything before it is userinfo.
            auth, _, hostport = authority.rpartition("@")
            auth = auth or None
            host, port = _HOST_PORT_RE.match(hostport).groups()  # type: ignore[union-attr]
            if auth and normalize_uri:
                auth = _encode_invalid_chars(auth, _USERINFO_CHARS)
            # "host:" with nothing after the colon means "no port".
            if port == "":
                port = None

        port_int = None if port is None else int(port)
        if port_int is not None and (port_int < 0 or port_int > 65535):
            raise LocationParseError(url)

        host = _normalize_host(host, scheme)

        if normalize_uri:
            if path:
                path = _remove_path_dot_segments(path)
                path = _encode_invalid_chars(path, _PATH_CHARS)
            if query:
                query = _encode_invalid_chars(query, _QUERY_CHARS)
            if fragment:
                fragment = _encode_invalid_chars(fragment, _FRAGMENT_CHARS)

    except (ValueError, AttributeError) as e:
        # Report the url exactly as the caller passed it in.
        raise LocationParseError(original_url) from e

    # For the sake of backwards compatibility we put empty
    # string values for path if there are any defined values
    # beyond the path in the URL.
    # TODO: Remove this when we break backwards compatibility.
    if not path:
        has_trailing_parts = query is not None or fragment is not None
        path = "" if has_trailing_parts else None

    return Url(
        scheme=scheme,
        auth=auth,
        host=host,
        port=port_int,
        path=path,
        query=query,
        fragment=fragment,
    )