Spaces:
Runtime error
Runtime error
import json | |
import os | |
from concurrent.futures import ThreadPoolExecutor | |
from pathlib import Path | |
from typing import Any | |
import cachetools | |
import gradio as gr | |
import requests | |
import urllib3 | |
from dns import message | |
# DNS-over-HTTPS endpoint that receives the wire-format queries
# (can use others).
_DNS_SERVER = "https://dns.google/dns-query"

# Record types requested, in this order, when enumerating a domain.
_DNS_RECORD_TYPES = [
    "A", "AAAA", "CNAME", "MX", "NS", "SOA", "TXT",
    "RP", "LOC", "CAA", "SPF", "SRV", "NSEC", "RRSIG",
]

# Word list of common subdomain prefixes; the path is relative, so it is
# resolved against the process working directory.
_COMMON_SUBDOMAINS_TXT_PATH = Path("subdomains") / "subdomains.txt"

# Cache sizing knobs (entries / seconds). They are not referenced by the code
# visible in this part of the file.
_CACHE_MAX_SIZE = 4096
_CACHE_TTL_SECONDS = 60 * 60
def get_geolocation(ip: str) -> dict[str, Any] | str: | |
"""Get location information from an ip address. | |
Returns the following information on an ip address: | |
1. IPv4 | |
2. city | |
4. country_code | |
5. country_name | |
6. latitude | |
7. longitude | |
8. postal | |
9. state | |
Example: | |
>>> from pprint import pprint | |
>>> pprint(get_location("103.100.104.0")) | |
... {'IPv4': '103.100.104.0', | |
'city': None, | |
'country_code': 'NZ', | |
'country_name': 'New Zealand', | |
'latitude': -41, | |
'longitude': 174, | |
'postal': None, | |
'state': None} | |
Args: | |
ip: ip address | |
Returns: | |
Location information on the ip address. | |
""" | |
try: | |
return requests.get( | |
f"https://geolocation-db.com/json/{ip.strip()}", | |
timeout=1, | |
).json() | |
except Exception as e: # noqa: BLE001 | |
return str(e) | |
def _request_dns_record(
    domain: str,
    record_type: str,
    timeout: float = 0.5,
) -> list[str]:
    """Utility to build dns resolve requests that do not use port 53.

    The query is serialised to DNS wire format and POSTed to the
    DNS-over-HTTPS endpoint configured in ``_DNS_SERVER``.

    Args:
        domain: domain to investigate
        record_type: record type
        timeout: timeout, in seconds, handed to the HTTP request.

    Returns:
        Information about the dns record type for the domain: the text
        form of every record in the first RRset of the answer section,
        or an empty list when the answer section is empty.

    Raises:
        requests.HTTPError: if the server answers with an HTTP error
            status. Other exceptions raised by ``requests`` or by the
            DNS message parser are propagated unchanged.
    """
    query = message.make_query(domain, record_type)
    response = requests.post(
        _DNS_SERVER,
        headers={
            "Content-Type": "application/dns-message",
            "Accept": "application/dns-message",
        },
        data=query.to_wire(),
        verify=True,
        timeout=timeout,
    )
    # Fail with a clear HTTP error rather than handing an error page to the
    # DNS wire-format parser.
    response.raise_for_status()
    dns_message = message.from_wire(response.content)
    if not dns_message.answer:
        return []
    # NOTE(review): only the first RRset of the answer section is reported;
    # any further RRsets in the answer are ignored.
    return [str(rdata) for rdata in dns_message.answer[0]]
# see: https://thepythoncode.com/article/dns-enumeration-with-python | |
# https://dnspython.readthedocs.io | |
def enumerate_dns(domain_name: str) -> dict[str, Any] | None:
    """Enumerates information about a specific domain's DNS configuration.

    One lookup is made for each entry of ``_DNS_RECORD_TYPES``:
        1. A records: the IPv4 associated with the domain
        2. AAAA records: the IPv6 associated with the domain
        3. CAA records: used by owners to specify which Certificate
           Authorities are authorized to issue SSL/TLS certificates for
           their domains.
        4. CNAME records: alias of one name to another - the DNS lookup
           will continue by retrying the lookup with the new name.
        5. LOC records: geographic location associated with a domain name.
        6. MX records: associated email servers to the domain.
        7. NS records: DNS servers that are authoritative for a particular
           domain. These may be used to inquire information about the
           domain.
        8. SOA records: defines authoritative information about a DNS
           zone, including zone transfers and cache expiration.
        9. TXT records: used for domain verification and email security.
        10. RP records: the responsible person for a domain.
        11. SPF records: defines authorized email servers.
        12. SRV records: specifies location of specific services
            (port and host) for the domain.
        13. NSEC records: proves non-existence of DNS records
            and prevents zone enumeration.
        14. RRSIG records: contains cryptographic signatures for
            DNSSEC-signed records, providing authentication and integrity.

    Example (illustrative shape only; the values depend on live DNS data):
        >>> from pprint import pprint
        >>> pprint(enumerate_dns("youtube.com"))
        ... {'A': ['142.250.200.142'],
             'MX': ['0 smtp.google.com.'],
             'NS': ['ns1.google.com.', 'ns2.google.com.']}

    Args:
        domain_name: domain name for which to
            enumerate the DNS configuration.

    Returns:
        A dictionary keyed by record type. Each value is the list of
        record strings returned by the lookup; when a lookup raised, the
        value is a one-element list holding the error message. Record
        types whose lookup returned nothing are left out. ``None`` is
        returned when no entry at all was collected.
    """
    records_by_type = {}
    for record_type in _DNS_RECORD_TYPES:
        try:
            answers = _request_dns_record(domain_name.strip(), record_type, timeout=1)
        except Exception as e:  # noqa: BLE001, PERF203
            # A failing lookup is reported in place of the records so the
            # remaining record types are still queried.
            records_by_type[record_type] = [str(e)]
        else:
            if answers:
                records_by_type[record_type] = answers
    return records_by_type or None
def resolve_subdomain_ipv4(domain: str) -> str | None: | |
"""Resolve the IPv4 address of a domain. | |
Args: | |
domain: domain name | |
Returns: | |
The domain is returned provided | |
it was resolved. Otherwise nothing | |
is returned. | |
""" | |
try: | |
ipv4 = _request_dns_record(domain, "A", timeout=0.6) | |
if ipv4: | |
return domain | |
msg = "Cannot resolve it: it is likely non-existing" | |
raise Exception(msg) # noqa: TRY002, TRY301 | |
except Exception: # noqa: BLE001 | |
return None | |
def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
    """Retrieves subdomains associated to a domain if any.

    The information retrieved from a domain is its subdomains
    provided they are the top 1000 subdomain prefixes as
    indicated by https://github.com/rbsec/dnscan/tree/master

    Importantly, it finds subdomains only if their prefixes
    are among the top 1000 most common. Hence, it may not
    yield all the subdomains associated to the domain.

    Example (the result depends on live DNS data):
        >>> scrap_subdomains_for_domain("github.com")
        ... ['www.github.com', 'smtp.github.com', 'ns1.github.com',
             'ns2.github.com', 'autodiscover.github.com', 'test.github.com',
             'blog.github.com', 'admin.github.com', 'support.github.com',
             'docs.github.com', 'shop.github.com', 'wiki.github.com',
             'api.github.com', 'live.github.com', 'help.github.com', ...]

    Args:
        domain_name: domain name for which to retrieve a
            list of subdomains; surrounding whitespace is ignored.

    Returns:
        List of subdomains if any, in word-list order. An empty list is
        returned when the domain name is empty or the word list file
        cannot be found.
    """
    domain = domain_name.strip()
    if not domain:
        # Without a domain every candidate would be a bare "<prefix>."
        # name, which is not a subdomain of anything.
        return []
    try:
        # Explicit encoding so reading does not depend on the host locale.
        with _COMMON_SUBDOMAINS_TXT_PATH.open(encoding="utf-8") as file:
            prefixes = [line.strip() for line in file if line.strip()]
    except FileNotFoundError:
        return []
    candidates = [f"{prefix}.{domain}" for prefix in prefixes]
    # The lookups are network-bound, so they are fanned out over a thread
    # pool; map() yields the results in the order of the candidates.
    with ThreadPoolExecutor(max_workers=None) as executor:
        return [
            resolved
            for resolved in executor.map(resolve_subdomain_ipv4, candidates)
            if resolved
        ]
def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
    r"""Retrieves information about a potential IoC from ThreatFox.

    It may be used to retrieve information of indicators of compromise
    (IOCs) associated with malware, with the infosec community, AV
    vendors and cyber threat intelligence providers.

    The API key is read from the ``THREATFOX_APIKEY`` environment variable.

    Examples:
        >>> retrieve_ioc_from_threatfox("139.180.203.104")
        ... {
            "query_status": "ok",
            "data": [
                {
                    "id": "12",
                    "ioc": "139.180.203.104:443",
                    "threat_type": "botnet_cc",
                    "ioc_type": "ip:port",
                    "malware": "win.cobalt_strike",
                    "malware_printable": "Cobalt Strike",
                    "malware_alias": "Agentemis,BEACON,CobaltStrike",
                    "confidence_level": 75,
                    "first_seen": "2020-12-06 09:10:23 UTC",
                    "last_seen": null,
                    "reference": null,
                    "reporter": "abuse_ch",
                    "tags": null,
                    "malware_samples": [
                        {
                            "time_stamp": "2021-03-23 08:18:06 UTC",
                            "md5_hash": "5b7e82e051ade4b14d163eea2a17bf8b",
                            "sha256_hash": "b325c92fa540edeb89b95dbfd4400c1cb33599c66859....",
                            "malware_bazaar": "https:\/\/bazaar.abuse.ch\/sample\/b325c...\/"
                        },
                    ]
                }
            ]
        }

    Args:
        potentially_ioc: this can be a url, a domain, a hash,
            or any other type of IoC.

    Returns:
        Information of the input as an IoC: threat type, malware type and
        samples, confidence level, first/last seen dates, and more IoC
        information, as the raw response text of the API. When the API key
        is not configured or the request fails, an error message is
        returned instead.
    """
    api_key = os.environ.get("THREATFOX_APIKEY")
    if not api_key:
        # Report the misconfiguration as text, like every other failure of
        # this function, instead of letting a KeyError escape.
        return "THREATFOX_APIKEY environment variable is not set"
    payload = json.dumps(
        {
            "query": "search_ioc",
            "search_term": potentially_ioc.strip(),
        },
    )
    try:
        # The pool serves a single request, so one connection is enough;
        # the context manager closes it once the response has been read.
        with urllib3.HTTPSConnectionPool(
            "threatfox-api.abuse.ch",
            port=443,
            maxsize=1,
            headers={"Auth-Key": api_key},
            timeout=5,
        ) as pool:
            response = pool.request("POST", "/api/v1/", body=payload)
            return response.data.decode("utf-8", "ignore")
    except Exception as e:  # noqa: BLE001
        return str(e)
# One Gradio interface per tool: a single text box as input, and the tool's
# return value rendered as JSON (or as plain text for the ThreatFox lookup).

# IP address -> geolocation.
geo_location_tool = gr.Interface(
    title="Domain Associated Geolocation Finder",
    description="Retrieves the geolocation associated to an input ip address",
    fn=get_geolocation,
    inputs=gr.Textbox(label="ip"),
    outputs=gr.JSON(label="Geolocation of IP"),
    examples=["1.0.3.255", "59.34.7.3"],
    theme="default",
)

# Domain -> DNS records per record type.
dns_enumeration_tool = gr.Interface(
    title="DNS record enumerator of domains",
    description="Retrieves several dns record types for the input domain names",
    fn=enumerate_dns,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="DNS records"),
    examples=["owasp.org", "nist.gov"],
    theme="default",
)

# Domain -> resolvable subdomains built from the common-prefix word list.
scrap_subdomains_tool = gr.Interface(
    title="Subdomains Extractor of domains",
    description="Retrieves the subdomains for the input domain if they are common",
    fn=scrap_subdomains_for_domain,
    inputs=gr.Textbox(label="domain"),
    outputs=gr.JSON(label="Subdomains managed by domain"),
    examples=["github.com", "netacea.com"],
    theme="default",
)

# Potential IoC -> raw ThreatFox response text.
extractor_of_ioc_from_threatfox_tool = gr.Interface(
    title="IoC information extractor associated to particular entities",
    description=(
        "If information as an Indicator of Compromise (IoC) exists "
        "for the input url, domain or hash, it retrieves it"
    ),
    fn=retrieve_ioc_from_threatfox,
    inputs=gr.Textbox(label="IoC - url, domains or hash"),
    outputs=gr.Text(label="Entity information as an IoC"),
    examples=["advertipros.com", "dev.couplesparks.com"],
    example_labels=["👾 IoC 1", "👾 IoC 2"],
    theme="default",
)