Spaces:
Build error
Build error
File size: 6,855 Bytes
64772a4 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 |
import asyncio
import functools
import pycares
import socket
import sys
from typing import (
Any,
Optional,
Set,
Sequence,
Tuple,
Union
)
from . import error
__version__ = '3.2.0'
__all__ = ('DNSResolver', 'error')
READ = 1
WRITE = 2
query_type_map = {'A' : pycares.QUERY_TYPE_A,
'AAAA' : pycares.QUERY_TYPE_AAAA,
'ANY' : pycares.QUERY_TYPE_ANY,
'CAA' : pycares.QUERY_TYPE_CAA,
'CNAME' : pycares.QUERY_TYPE_CNAME,
'MX' : pycares.QUERY_TYPE_MX,
'NAPTR' : pycares.QUERY_TYPE_NAPTR,
'NS' : pycares.QUERY_TYPE_NS,
'PTR' : pycares.QUERY_TYPE_PTR,
'SOA' : pycares.QUERY_TYPE_SOA,
'SRV' : pycares.QUERY_TYPE_SRV,
'TXT' : pycares.QUERY_TYPE_TXT
}
query_class_map = {'IN' : pycares.QUERY_CLASS_IN,
'CHAOS' : pycares.QUERY_CLASS_CHAOS,
'HS' : pycares.QUERY_CLASS_HS,
'NONE' : pycares.QUERY_CLASS_NONE,
'ANY' : pycares.QUERY_CLASS_ANY
}
class DNSResolver:
def __init__(self, nameservers: Optional[Sequence[str]] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
**kwargs: Any) -> None:
self.loop = loop or asyncio.get_event_loop()
assert self.loop is not None
if sys.platform == 'win32':
if not isinstance(self.loop, asyncio.SelectorEventLoop):
try:
import winloop
if not isinstance(self.loop , winloop.Loop):
raise RuntimeError(
'aiodns needs a SelectorEventLoop on Windows. See more: https://github.com/saghul/aiodns/issues/86')
except ModuleNotFoundError:
raise RuntimeError(
'aiodns needs a SelectorEventLoop on Windows. See more: https://github.com/saghul/aiodns/issues/86')
kwargs.pop('sock_state_cb', None)
timeout = kwargs.pop('timeout', None)
self._timeout = timeout
self._channel = pycares.Channel(sock_state_cb=self._sock_state_cb,
timeout=timeout,
**kwargs)
if nameservers:
self.nameservers = nameservers
self._read_fds = set() # type: Set[int]
self._write_fds = set() # type: Set[int]
self._timer = None # type: Optional[asyncio.TimerHandle]
@property
def nameservers(self) -> Sequence[str]:
return self._channel.servers
@nameservers.setter
def nameservers(self, value: Sequence[str]) -> None:
self._channel.servers = value
@staticmethod
def _callback(fut: asyncio.Future, result: Any, errorno: int) -> None:
if fut.cancelled():
return
if errorno is not None:
fut.set_exception(error.DNSError(errorno, pycares.errno.strerror(errorno)))
else:
fut.set_result(result)
def query(self, host: str, qtype: str, qclass: Optional[str]=None) -> asyncio.Future:
try:
qtype = query_type_map[qtype]
except KeyError:
raise ValueError('invalid query type: {}'.format(qtype))
if qclass is not None:
try:
qclass = query_class_map[qclass]
except KeyError:
raise ValueError('invalid query class: {}'.format(qclass))
fut = asyncio.Future(loop=self.loop) # type: asyncio.Future
cb = functools.partial(self._callback, fut)
self._channel.query(host, qtype, cb, query_class=qclass)
return fut
def gethostbyname(self, host: str, family: socket.AddressFamily) -> asyncio.Future:
fut = asyncio.Future(loop=self.loop) # type: asyncio.Future
cb = functools.partial(self._callback, fut)
self._channel.gethostbyname(host, family, cb)
return fut
def getaddrinfo(self, host: str, family: socket.AddressFamily = socket.AF_UNSPEC, port: Optional[int] = None, proto: int = 0, type: int = 0, flags: int = 0) -> asyncio.Future:
fut = asyncio.Future(loop=self.loop) # type: asyncio.Future
cb = functools.partial(self._callback, fut)
self._channel.getaddrinfo(host, port, cb, family=family, type=type, proto=proto, flags=flags)
return fut
def getnameinfo(self, sockaddr: Union[Tuple[str, int], Tuple[str, int, int, int]], flags: int = 0) -> asyncio.Future:
fut = asyncio.Future(loop=self.loop) # type: asyncio.Future
cb = functools.partial(self._callback, fut)
self._channel.getnameinfo(sockaddr, flags, cb)
return fut
def gethostbyaddr(self, name: str) -> asyncio.Future:
fut = asyncio.Future(loop=self.loop) # type: asyncio.Future
cb = functools.partial(self._callback, fut)
self._channel.gethostbyaddr(name, cb)
return fut
def cancel(self) -> None:
self._channel.cancel()
def _sock_state_cb(self, fd: int, readable: bool, writable: bool) -> None:
if readable or writable:
if readable:
self.loop.add_reader(fd, self._handle_event, fd, READ)
self._read_fds.add(fd)
if writable:
self.loop.add_writer(fd, self._handle_event, fd, WRITE)
self._write_fds.add(fd)
if self._timer is None:
self._start_timer()
else:
# socket is now closed
if fd in self._read_fds:
self._read_fds.discard(fd)
self.loop.remove_reader(fd)
if fd in self._write_fds:
self._write_fds.discard(fd)
self.loop.remove_writer(fd)
if not self._read_fds and not self._write_fds and self._timer is not None:
self._timer.cancel()
self._timer = None
def _handle_event(self, fd: int, event: Any) -> None:
read_fd = pycares.ARES_SOCKET_BAD
write_fd = pycares.ARES_SOCKET_BAD
if event == READ:
read_fd = fd
elif event == WRITE:
write_fd = fd
self._channel.process_fd(read_fd, write_fd)
def _timer_cb(self) -> None:
if self._read_fds or self._write_fds:
self._channel.process_fd(pycares.ARES_SOCKET_BAD, pycares.ARES_SOCKET_BAD)
self._start_timer()
else:
self._timer = None
def _start_timer(self):
timeout = self._timeout
if timeout is None or timeout < 0 or timeout > 1:
timeout = 1
elif timeout == 0:
timeout = 0.1
self._timer = self.loop.call_later(timeout, self._timer_cb)
|