Add logging for domain availability checks and improve error handling
Browse files
app.py
CHANGED
|
@@ -1,3 +1,4 @@
|
|
|
|
|
| 1 |
from flask import Flask, request, send_from_directory
|
| 2 |
import dns.resolver
|
| 3 |
import socket
|
|
@@ -28,6 +29,7 @@ def index():
|
|
| 28 |
@app.route('/check/<domain>', methods=['POST'])
|
| 29 |
def check_domain(domain: str):
|
| 30 |
"""Check domain availability"""
|
|
|
|
| 31 |
try:
|
| 32 |
domain = domain.lower().strip('/').strip()
|
| 33 |
if '://' in domain:
|
|
@@ -38,53 +40,57 @@ def check_domain(domain: str):
|
|
| 38 |
return {
|
| 39 |
'domain': domain,
|
| 40 |
"available": False,
|
| 41 |
-
"method": f"Unsupported TLD, try at {unsupported_TLD.get('try')}"
|
|
|
|
| 42 |
}
|
| 43 |
|
| 44 |
-
result = check_domain_availability(domain)
|
| 45 |
if result:
|
| 46 |
return {
|
| 47 |
"domain": domain,
|
| 48 |
"method": f"Checked via {result['method']}",
|
| 49 |
-
"available": result['available']
|
|
|
|
| 50 |
}
|
| 51 |
-
except:
|
| 52 |
-
|
| 53 |
return {
|
| 54 |
'domain': domain,
|
| 55 |
"available": False,
|
| 56 |
-
"method": "Cannot confirm if doimain is available"
|
|
|
|
| 57 |
}
|
| 58 |
|
| 59 |
-
def check_domain_availability(domain):
|
| 60 |
"""Check domain availability using multiple methods."""
|
| 61 |
# First try DNS resolution
|
| 62 |
-
is_available, availability_method, _continue = dns_is_available(domain)
|
| 63 |
if not _continue:
|
| 64 |
return { "available": is_available, "method": f"DNS:{availability_method}" }
|
| 65 |
|
| 66 |
# Try RDAP
|
| 67 |
-
is_available, availability_method, _continue = rdap_is_available(domain)
|
| 68 |
if not _continue:
|
| 69 |
return { "available": is_available, "method": f"RDAP:{availability_method}" }
|
| 70 |
|
| 71 |
# Fall back to WHOIS
|
| 72 |
-
is_available, availability_method, _continue = whois_is_available(domain)
|
| 73 |
if not _continue:
|
| 74 |
return {"available": is_available, "method": f"WHOIS:{availability_method}"}
|
| 75 |
|
| 76 |
-
def dns_is_available(domain):
|
| 77 |
"""Check if domain exists in DNS by looking for common record types."""
|
| 78 |
# Check NS records first as they're required for valid domains
|
| 79 |
for record_type in ['NS', 'A', 'AAAA', 'MX', 'CNAME']:
|
| 80 |
try:
|
| 81 |
dns.resolver.resolve(domain, record_type)
|
| 82 |
return False, record_type, False
|
| 83 |
-
except:
|
|
|
|
| 84 |
continue
|
| 85 |
return True, None, True
|
| 86 |
|
| 87 |
-
def rdap_is_available(domain):
|
| 88 |
try:
|
| 89 |
bootstrap_url = "https://data.iana.org/rdap/dns.json"
|
| 90 |
bootstrap_data = requests.get(bootstrap_url, timeout=5).json()
|
|
@@ -99,11 +105,11 @@ def rdap_is_available(domain):
|
|
| 99 |
return True, rdap_base_url, False
|
| 100 |
elif response.status_code == 200:
|
| 101 |
return False, rdap_base_url, False
|
| 102 |
-
except:
|
| 103 |
-
|
| 104 |
return False, None, True
|
| 105 |
|
| 106 |
-
def whois_is_available(domain) -> bool:
|
| 107 |
try:
|
| 108 |
available_patterns = [
|
| 109 |
'no match',
|
|
@@ -116,19 +122,19 @@ def whois_is_available(domain) -> bool:
|
|
| 116 |
'domain not found'
|
| 117 |
]
|
| 118 |
is_available_callback = lambda output: any(pattern in output for pattern in available_patterns)
|
| 119 |
-
is_available, availability_method = socket_whois_is_available(domain, is_available_callback)
|
| 120 |
if is_available:
|
| 121 |
return True, availability_method, False
|
| 122 |
-
is_available, availability_method = terminal_whois_is_available(domain, is_available_callback)
|
| 123 |
if is_available:
|
| 124 |
return True, availability_method, False
|
| 125 |
-
except:
|
| 126 |
-
|
| 127 |
return False, None, True
|
| 128 |
|
| 129 |
-
def socket_whois_is_available(domain, is_available_callback):
|
| 130 |
try:
|
| 131 |
-
whois_server = get_whois_server(domain)
|
| 132 |
whois_server = "whois.reg.ly"
|
| 133 |
|
| 134 |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
@@ -140,11 +146,11 @@ def socket_whois_is_available(domain, is_available_callback):
|
|
| 140 |
|
| 141 |
response_lower = response.lower()
|
| 142 |
return is_available_callback(response_lower), whois_server
|
| 143 |
-
except:
|
| 144 |
-
|
| 145 |
return False, None
|
| 146 |
|
| 147 |
-
def terminal_whois_is_available(domain, is_available_callback):
|
| 148 |
try:
|
| 149 |
# Check if OS is Linux
|
| 150 |
if platform.system().lower() == 'linux':
|
|
@@ -155,16 +161,18 @@ def terminal_whois_is_available(domain, is_available_callback):
|
|
| 155 |
stdout=subprocess.PIPE,
|
| 156 |
stderr=subprocess.PIPE)
|
| 157 |
try:
|
| 158 |
-
stdout,
|
| 159 |
output = stdout.decode('utf-8', errors='ignore').lower()
|
|
|
|
| 160 |
return is_available_callback(output), "system whois"
|
| 161 |
-
except subprocess.TimeoutExpired:
|
|
|
|
| 162 |
process.kill()
|
| 163 |
-
except:
|
| 164 |
-
|
| 165 |
return False, None
|
| 166 |
|
| 167 |
-
def get_whois_server(domain):
|
| 168 |
"""Get WHOIS server from IANA root zone database."""
|
| 169 |
try:
|
| 170 |
response = requests.get(f'https://www.iana.org/whois?q={domain}')
|
|
@@ -172,6 +180,6 @@ def get_whois_server(domain):
|
|
| 172 |
for line in response.text.split('\n'):
|
| 173 |
if 'whois:' in line.lower():
|
| 174 |
return line.split(':')[1].strip()
|
| 175 |
-
except:
|
| 176 |
-
|
| 177 |
return None
|
|
|
|
| 1 |
+
from typing import Callable
|
| 2 |
from flask import Flask, request, send_from_directory
|
| 3 |
import dns.resolver
|
| 4 |
import socket
|
|
|
|
| 29 |
@app.route('/check/<domain>', methods=['POST'])
|
| 30 |
def check_domain(domain: str):
|
| 31 |
"""Check domain availability"""
|
| 32 |
+
logs: list[str] = []
|
| 33 |
try:
|
| 34 |
domain = domain.lower().strip('/').strip()
|
| 35 |
if '://' in domain:
|
|
|
|
| 40 |
return {
|
| 41 |
'domain': domain,
|
| 42 |
"available": False,
|
| 43 |
+
"method": f"Unsupported TLD, try at {unsupported_TLD.get('try')}",
|
| 44 |
+
"logs": logs
|
| 45 |
}
|
| 46 |
|
| 47 |
+
result = check_domain_availability(domain, logs.append)
|
| 48 |
if result:
|
| 49 |
return {
|
| 50 |
"domain": domain,
|
| 51 |
"method": f"Checked via {result['method']}",
|
| 52 |
+
"available": result['available'],
|
| 53 |
+
"logs": logs
|
| 54 |
}
|
| 55 |
+
except Exception as e:
|
| 56 |
+
logs.append(f"{check_domain.__name__}:{str(e)}")
|
| 57 |
return {
|
| 58 |
'domain': domain,
|
| 59 |
"available": False,
|
| 60 |
+
"method": "Cannot confirm if doimain is available",
|
| 61 |
+
"logs": logs
|
| 62 |
}
|
| 63 |
|
| 64 |
+
def check_domain_availability(domain, logs_append: Callable[[str], None]):
|
| 65 |
"""Check domain availability using multiple methods."""
|
| 66 |
# First try DNS resolution
|
| 67 |
+
is_available, availability_method, _continue = dns_is_available(domain, logs_append)
|
| 68 |
if not _continue:
|
| 69 |
return { "available": is_available, "method": f"DNS:{availability_method}" }
|
| 70 |
|
| 71 |
# Try RDAP
|
| 72 |
+
is_available, availability_method, _continue = rdap_is_available(domain, logs_append)
|
| 73 |
if not _continue:
|
| 74 |
return { "available": is_available, "method": f"RDAP:{availability_method}" }
|
| 75 |
|
| 76 |
# Fall back to WHOIS
|
| 77 |
+
is_available, availability_method, _continue = whois_is_available(domain, logs_append)
|
| 78 |
if not _continue:
|
| 79 |
return {"available": is_available, "method": f"WHOIS:{availability_method}"}
|
| 80 |
|
| 81 |
+
def dns_is_available(domain, logs_append: Callable[[str], None]):
|
| 82 |
"""Check if domain exists in DNS by looking for common record types."""
|
| 83 |
# Check NS records first as they're required for valid domains
|
| 84 |
for record_type in ['NS', 'A', 'AAAA', 'MX', 'CNAME']:
|
| 85 |
try:
|
| 86 |
dns.resolver.resolve(domain, record_type)
|
| 87 |
return False, record_type, False
|
| 88 |
+
except Exception as e:
|
| 89 |
+
logs_append(f"{dns_is_available.__name__}:{str(e)}")
|
| 90 |
continue
|
| 91 |
return True, None, True
|
| 92 |
|
| 93 |
+
def rdap_is_available(domain, logs_append: Callable[[str], None]):
|
| 94 |
try:
|
| 95 |
bootstrap_url = "https://data.iana.org/rdap/dns.json"
|
| 96 |
bootstrap_data = requests.get(bootstrap_url, timeout=5).json()
|
|
|
|
| 105 |
return True, rdap_base_url, False
|
| 106 |
elif response.status_code == 200:
|
| 107 |
return False, rdap_base_url, False
|
| 108 |
+
except Exception as e:
|
| 109 |
+
logs_append(f"{rdap_is_available.__name__}:{str(e)}")
|
| 110 |
return False, None, True
|
| 111 |
|
| 112 |
+
def whois_is_available(domain, logs_append: Callable[[str], None]) -> bool:
|
| 113 |
try:
|
| 114 |
available_patterns = [
|
| 115 |
'no match',
|
|
|
|
| 122 |
'domain not found'
|
| 123 |
]
|
| 124 |
is_available_callback = lambda output: any(pattern in output for pattern in available_patterns)
|
| 125 |
+
is_available, availability_method = socket_whois_is_available(domain, is_available_callback, logs_append)
|
| 126 |
if is_available:
|
| 127 |
return True, availability_method, False
|
| 128 |
+
is_available, availability_method = terminal_whois_is_available(domain, is_available_callback, logs_append)
|
| 129 |
if is_available:
|
| 130 |
return True, availability_method, False
|
| 131 |
+
except Exception as e:
|
| 132 |
+
logs_append(f"{whois_is_available.__name__}:{str(e)}")
|
| 133 |
return False, None, True
|
| 134 |
|
| 135 |
+
def socket_whois_is_available(domain, is_available_callback: Callable[[str], bool], logs_append: Callable[[str], None]):
|
| 136 |
try:
|
| 137 |
+
whois_server = get_whois_server(domain, logs_append)
|
| 138 |
whois_server = "whois.reg.ly"
|
| 139 |
|
| 140 |
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
| 146 |
|
| 147 |
response_lower = response.lower()
|
| 148 |
return is_available_callback(response_lower), whois_server
|
| 149 |
+
except Exception as e:
|
| 150 |
+
logs_append(f"{socket_whois_is_available.__name__}:{str(e)}")
|
| 151 |
return False, None
|
| 152 |
|
| 153 |
+
def terminal_whois_is_available(domain, is_available_callback: Callable[[str], bool], logs_append: Callable[[str], None]):
|
| 154 |
try:
|
| 155 |
# Check if OS is Linux
|
| 156 |
if platform.system().lower() == 'linux':
|
|
|
|
| 161 |
stdout=subprocess.PIPE,
|
| 162 |
stderr=subprocess.PIPE)
|
| 163 |
try:
|
| 164 |
+
stdout, stderr = process.communicate(timeout=60)
|
| 165 |
output = stdout.decode('utf-8', errors='ignore').lower()
|
| 166 |
+
logs_append(f"{terminal_whois_is_available.__name__}:stderr:{str(stderr.decode(encoding='utf-8'))}")
|
| 167 |
return is_available_callback(output), "system whois"
|
| 168 |
+
except subprocess.TimeoutExpired as timeout_e:
|
| 169 |
+
logs_append(f"{terminal_whois_is_available.__name__}:{str(timeout_e)}")
|
| 170 |
process.kill()
|
| 171 |
+
except Exception as e:
|
| 172 |
+
logs_append(f"{terminal_whois_is_available.__name__}:{str(e)}")
|
| 173 |
return False, None
|
| 174 |
|
| 175 |
+
def get_whois_server(domain, logs_append: Callable[[str], None]):
|
| 176 |
"""Get WHOIS server from IANA root zone database."""
|
| 177 |
try:
|
| 178 |
response = requests.get(f'https://www.iana.org/whois?q={domain}')
|
|
|
|
| 180 |
for line in response.text.split('\n'):
|
| 181 |
if 'whois:' in line.lower():
|
| 182 |
return line.split(':')[1].strip()
|
| 183 |
+
except Exception as e:
|
| 184 |
+
logs_append(f"{get_whois_server.__name__}:{str(e)}")
|
| 185 |
return None
|