cache_and_increase_timeout

#8
README.md CHANGED
@@ -8,6 +8,8 @@ sdk_version: 5.32.1
8
  app_file: app.py
9
  pinned: false
10
  license: apache-2.0
 
 
11
  short_description: tbd
12
  ---
13
 
 
8
  app_file: app.py
9
  pinned: false
10
  license: apache-2.0
11
+ tags:
12
+ - mcp-server-track
13
  short_description: tbd
14
  ---
15
 
requirements-dev.txt CHANGED
@@ -1,5 +1,5 @@
1
  # This file was autogenerated by uv via the following command:
2
- # uv export --format requirements.txt --no-hashes --group dev --group test -o requirements-dev.txt
3
  aiofiles==24.1.0
4
  # via
5
  # gradio
 
1
  # This file was autogenerated by uv via the following command:
2
+ # uv export --format requirements-txt --no-hashes --group dev --group test -o requirements-dev.txt
3
  aiofiles==24.1.0
4
  # via
5
  # gradio
requirements.txt CHANGED
@@ -1,5 +1,5 @@
1
  # This file was autogenerated by uv via the following command:
2
- # uv export --format requirements.txt --no-hashes --no-dev -o requirements.txt
3
  aiofiles==24.1.0
4
  # via
5
  # gradio
 
1
  # This file was autogenerated by uv via the following command:
2
+ # uv export --format requirements-txt --no-hashes --no-dev -o requirements.txt
3
  aiofiles==24.1.0
4
  # via
5
  # gradio
tdagent/tools/get_domain_information.py CHANGED
@@ -4,6 +4,7 @@ from concurrent.futures import ThreadPoolExecutor
4
  from pathlib import Path
5
  from typing import Any
6
 
 
7
  import gradio as gr
8
  import requests
9
  import urllib3
@@ -30,7 +31,13 @@ _DNS_RECORD_TYPES = [
30
 
31
  _COMMON_SUBDOMAINS_TXT_PATH = Path("./subdomains/subdomains.txt")
32
 
 
 
33
 
 
 
 
 
34
  def get_geolocation(ip: str) -> dict[str, Any] | str:
35
  """Get location information from an ip address.
36
 
@@ -65,13 +72,16 @@ def get_geolocation(ip: str) -> dict[str, Any] | str:
65
  try:
66
  return requests.get(
67
  f"https://geolocation-db.com/json/{ip}",
68
- timeout=0.5,
69
  ).json()
70
  except Exception as e: # noqa: BLE001
71
  return str(e)
72
 
73
-
74
- def _request_dns_record(domain: str, record_type: str) -> list[str]:
 
 
 
75
  """Utility to build dns resolve requests that do not use port 53.
76
 
77
  Args:
@@ -90,7 +100,7 @@ def _request_dns_record(domain: str, record_type: str) -> list[str]:
90
  },
91
  data=q.to_wire(),
92
  verify=True,
93
- timeout=0.2,
94
  )
95
  dns_message = message.from_wire(response.content)
96
  return [str(rdata) for rdata in dns_message.answer[0]] if dns_message.answer else []
@@ -98,6 +108,9 @@ def _request_dns_record(domain: str, record_type: str) -> list[str]:
98
 
99
  # see: https://thepythoncode.com/article/dns-enumeration-with-python
100
  # https://dnspython.readthedocs.io
 
 
 
101
  def enumerate_dns(domain_name: str) -> dict[str, Any] | None:
102
  r"""Enumerates information about a specific domain's DNS configuration.
103
 
@@ -160,14 +173,13 @@ def enumerate_dns(domain_name: str) -> dict[str, Any] | None:
160
  enumeration = {}
161
  for record_type in _DNS_RECORD_TYPES:
162
  try:
163
- record = _request_dns_record(domain_name, record_type)
164
  if record:
165
  enumeration[record_type] = record
166
  except Exception as e: # noqa: BLE001, PERF203
167
  enumeration[record_type] = [str(e)]
168
  return enumeration if enumeration else None
169
 
170
-
171
  def resolve_subdomain_ipv4(domain: str) -> str | None:
172
  """Resolve the IPv4 address of a domain.
173
 
@@ -180,12 +192,14 @@ def resolve_subdomain_ipv4(domain: str) -> str | None:
180
  is returned.
181
  """
182
  try:
183
- _request_dns_record(domain, "A")
184
  return domain # noqa: TRY300
185
  except Exception: # noqa: BLE001
186
  return None
187
 
188
-
 
 
189
  def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
190
  """Retrieves subdomains associated to a domain if any.
191
 
@@ -223,11 +237,14 @@ def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
223
  return []
224
 
225
  potential_subdomains = [f"{subdomain}.{domain_name}" for subdomain in subdomains]
226
- with ThreadPoolExecutor(max_workers=5) as executor:
227
  results = executor.map(resolve_subdomain_ipv4, potential_subdomains)
228
  return [domain for domain in results if domain]
229
 
230
 
 
 
 
231
  def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
232
  r"""Retrieves information about a potential IoC from ThreatFox.
233
 
@@ -284,6 +301,7 @@ def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
284
  port=443,
285
  maxsize=50,
286
  headers=headers,
 
287
  )
288
  data = {
289
  "query": "search_ioc",
@@ -300,7 +318,7 @@ def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
300
  geo_location_tool = gr.Interface(
301
  fn=get_geolocation,
302
  inputs=["text"],
303
- outputs=["text"],
304
  title="Domain Associated Geolocation Finder",
305
  description="Retrieves the geolocation associated to an input ip address",
306
  theme="default",
@@ -309,7 +327,7 @@ geo_location_tool = gr.Interface(
309
  dns_enumeration_tool = gr.Interface(
310
  fn=enumerate_dns,
311
  inputs=["text"],
312
- outputs=["text"],
313
  title="DNS record enumerator of domains",
314
  description="Retrieves several dns record types for the input domain names",
315
  theme="default",
@@ -318,7 +336,7 @@ dns_enumeration_tool = gr.Interface(
318
  scrap_subdomains_tool = gr.Interface(
319
  fn=scrap_subdomains_for_domain,
320
  inputs=["text"],
321
- outputs=["text"],
322
  title="Subdomains Extractor of domains",
323
  description="Retrieves the subdomains for the input domain if they are common",
324
  theme="default",
@@ -327,7 +345,7 @@ scrap_subdomains_tool = gr.Interface(
327
  extractor_of_ioc_from_threatfox_tool = gr.Interface(
328
  fn=retrieve_ioc_from_threatfox,
329
  inputs=["text"],
330
- outputs=["text"],
331
  title="IoC information extractor associated to particular entities",
332
  description=(
333
  "If information as an Indicator of Compromise (IoC) exists"
 
4
  from pathlib import Path
5
  from typing import Any
6
 
7
+ import cachetools
8
  import gradio as gr
9
  import requests
10
  import urllib3
 
31
 
32
  _COMMON_SUBDOMAINS_TXT_PATH = Path("./subdomains/subdomains.txt")
33
 
34
+ _CACHE_MAX_SIZE = 4096
35
+ _CACHE_TTL_SECONDS = 3600
36
 
37
+
38
+ @cachetools.cached(
39
+ cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
40
+ )
41
  def get_geolocation(ip: str) -> dict[str, Any] | str:
42
  """Get location information from an ip address.
43
 
 
72
  try:
73
  return requests.get(
74
  f"https://geolocation-db.com/json/{ip}",
75
+ timeout=1,
76
  ).json()
77
  except Exception as e: # noqa: BLE001
78
  return str(e)
79
 
80
+ def _request_dns_record( # noqa: D417
81
+ domain: str,
82
+ record_type: str,
83
+ timeout: float = 0.5,
84
+ ) -> list[str]:
85
  """Utility to build dns resolve requests that do not use port 53.
86
 
87
  Args:
 
100
  },
101
  data=q.to_wire(),
102
  verify=True,
103
+ timeout=timeout,
104
  )
105
  dns_message = message.from_wire(response.content)
106
  return [str(rdata) for rdata in dns_message.answer[0]] if dns_message.answer else []
 
108
 
109
  # see: https://thepythoncode.com/article/dns-enumeration-with-python
110
  # https://dnspython.readthedocs.io
111
+ @cachetools.cached(
112
+ cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
113
+ )
114
  def enumerate_dns(domain_name: str) -> dict[str, Any] | None:
115
  r"""Enumerates information about a specific domain's DNS configuration.
116
 
 
173
  enumeration = {}
174
  for record_type in _DNS_RECORD_TYPES:
175
  try:
176
+ record = _request_dns_record(domain_name, record_type, timeout=1)
177
  if record:
178
  enumeration[record_type] = record
179
  except Exception as e: # noqa: BLE001, PERF203
180
  enumeration[record_type] = [str(e)]
181
  return enumeration if enumeration else None
182
 
 
183
  def resolve_subdomain_ipv4(domain: str) -> str | None:
184
  """Resolve the IPv4 address of a domain.
185
 
 
192
  is returned.
193
  """
194
  try:
195
+ _request_dns_record(domain, "A", timeout=0.6)
196
  return domain # noqa: TRY300
197
  except Exception: # noqa: BLE001
198
  return None
199
 
200
+ @cachetools.cached(
201
+ cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
202
+ )
203
  def scrap_subdomains_for_domain(domain_name: str) -> list[str]:
204
  """Retrieves subdomains associated to a domain if any.
205
 
 
237
  return []
238
 
239
  potential_subdomains = [f"{subdomain}.{domain_name}" for subdomain in subdomains]
240
+ with ThreadPoolExecutor(max_workers=None) as executor:
241
  results = executor.map(resolve_subdomain_ipv4, potential_subdomains)
242
  return [domain for domain in results if domain]
243
 
244
 
245
+ @cachetools.cached(
246
+ cache=cachetools.TTLCache(maxsize=_CACHE_MAX_SIZE, ttl=_CACHE_TTL_SECONDS),
247
+ )
248
  def retrieve_ioc_from_threatfox(potentially_ioc: str) -> str:
249
  r"""Retrieves information about a potential IoC from ThreatFox.
250
 
 
301
  port=443,
302
  maxsize=50,
303
  headers=headers,
304
+ timeout=5,
305
  )
306
  data = {
307
  "query": "search_ioc",
 
318
  geo_location_tool = gr.Interface(
319
  fn=get_geolocation,
320
  inputs=["text"],
321
+ outputs="json",
322
  title="Domain Associated Geolocation Finder",
323
  description="Retrieves the geolocation associated to an input ip address",
324
  theme="default",
 
327
  dns_enumeration_tool = gr.Interface(
328
  fn=enumerate_dns,
329
  inputs=["text"],
330
+ outputs="json",
331
  title="DNS record enumerator of domains",
332
  description="Retrieves several dns record types for the input domain names",
333
  theme="default",
 
336
  scrap_subdomains_tool = gr.Interface(
337
  fn=scrap_subdomains_for_domain,
338
  inputs=["text"],
339
+ outputs="json",
340
  title="Subdomains Extractor of domains",
341
  description="Retrieves the subdomains for the input domain if they are common",
342
  theme="default",
 
345
  extractor_of_ioc_from_threatfox_tool = gr.Interface(
346
  fn=retrieve_ioc_from_threatfox,
347
  inputs=["text"],
348
+ outputs="text",
349
  title="IoC information extractor associated to particular entities",
350
  description=(
351
  "If information as an Indicator of Compromise (IoC) exists"