Pedro Bento commited on
Commit
5061d39
·
1 Parent(s): 481cde4

Added AbuseIP db tool

Browse files
Files changed (2) hide show
  1. app.py +2 -1
  2. tdagent/tools/query_abuse_ip_db.py +154 -0
app.py CHANGED
@@ -2,12 +2,13 @@ import gradio as gr
2
 
3
  from tdagent.tools.get_url_content import gr_get_url_http_content
4
  from tdagent.tools.letter_counter import gr_letter_counter
5
-
6
 
7
  gr_app = gr.TabbedInterface(
8
  [
9
  gr_get_url_http_content,
10
  gr_letter_counter,
 
11
  ],
12
  )
13
 
 
2
 
3
  from tdagent.tools.get_url_content import gr_get_url_http_content
4
  from tdagent.tools.letter_counter import gr_letter_counter
5
+ from tdagent.tools.query_abuse_ip_db import gr_query_abuseipdb
6
 
7
  gr_app = gr.TabbedInterface(
8
  [
9
  gr_get_url_http_content,
10
  gr_letter_counter,
11
+ gr_query_abuseipdb,
12
  ],
13
  )
14
 
tdagent/tools/query_abuse_ip_db.py ADDED
@@ -0,0 +1,154 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+
3
+ import requests
4
+ from datetime import datetime
5
+ from dataclasses import dataclass
6
+ from typing import List, Optional, Dict, Any, Tuple, Union
7
+ import gradio as gr
8
+
9
+
10
+ # API docs: https://docs.abuseipdb.com/#check-endpoint
11
+
12
+ @dataclass
13
+ class AbuseReport:
14
+ date: str
15
+ categories: str
16
+ comment: str
17
+ reporter: str
18
+
19
+
20
+ @dataclass
21
+ class IPCheckResult:
22
+ ip_address: str
23
+ abuse_confidence_score: int
24
+ total_reports: int
25
+ country_code: str
26
+ domain: str
27
+ isp: str
28
+ last_reported: str
29
+ reports: List[AbuseReport]
30
+
31
+ def format_summary(self) -> str:
32
+ """Format a summary of the IP check result"""
33
+ return f"""
34
+ IP Address: {self.ip_address}
35
+ Abuse Confidence Score: {self.abuse_confidence_score}%
36
+ Total Reports: {self.total_reports}
37
+ Country: {self.country_code}
38
+ Domain: {self.domain}
39
+ ISP: {self.isp}
40
+ Last Reported: {self.last_reported}
41
+ """
42
+
43
+
44
+ def check_ip(ip_address: str, api_key: str, days: str = "30") -> Dict[str, Any]:
45
+ """
46
+ Query the AbuseIPDB API to check if an IP address has been reported.
47
+
48
+ Args:
49
+ ip_address: The IP address to check
50
+ api_key: Your AbuseIPDB API key
51
+ days: Number of days to check (default: 30)
52
+
53
+ Returns:
54
+ API response data as a dictionary
55
+ """
56
+ url = 'https://api.abuseipdb.com/api/v2/check'
57
+
58
+ headers = {
59
+ 'Accept': 'application/json',
60
+ 'Key': api_key
61
+ }
62
+
63
+ params = {
64
+ 'ipAddress': ip_address,
65
+ 'maxAgeInDays': days,
66
+ 'verbose': True
67
+ }
68
+
69
+ try:
70
+ response = requests.get(url, headers=headers, params=params)
71
+ response.raise_for_status() # Raise exception for HTTP errors
72
+ return response.json()
73
+ except requests.exceptions.RequestException as e:
74
+ return {"error": str(e)}
75
+
76
+
77
+ def parse_response(response: Dict[str, Any]) -> Tuple[Optional[IPCheckResult], Optional[str]]:
78
+ """
79
+ Parse the API response into a dataclass
80
+
81
+ Args:
82
+ response: The API response dictionary
83
+
84
+ Returns:
85
+ A tuple containing (result_object, error_message)
86
+ If successful, error_message will be None
87
+ If failed, result_object will be None
88
+ """
89
+ if "error" in response:
90
+ return None, f"Error: {response['error']}"
91
+
92
+ data = response.get("data", {})
93
+
94
+ # Create a list of AbuseReport objects
95
+ reports = []
96
+ if "reports" in data and data["reports"]:
97
+ for report in data["reports"]:
98
+ reported_at = datetime.fromisoformat(report["reportedAt"].replace("Z", "+00:00")).strftime(
99
+ "%Y-%m-%d %H:%M:%S")
100
+ categories = ", ".join([str(cat) for cat in report.get("categories", [])])
101
+
102
+ reports.append(AbuseReport(
103
+ date=reported_at,
104
+ categories=categories,
105
+ comment=report.get("comment", ""),
106
+ reporter=str(report.get("reporterId", "Anonymous"))
107
+ ))
108
+
109
+ # Create the main result object
110
+ result = IPCheckResult(
111
+ ip_address=data.get("ipAddress", "N/A"),
112
+ abuse_confidence_score=data.get("abuseConfidenceScore", 0),
113
+ total_reports=data.get("totalReports", 0),
114
+ country_code=data.get("countryCode", "N/A"),
115
+ domain=data.get("domain", "N/A"),
116
+ isp=data.get("isp", "N/A"),
117
+ last_reported=data.get("lastReportedAt", "Never"),
118
+ reports=reports
119
+ )
120
+
121
+ return result, None
122
+
123
+
124
+ def query_abuseipdb(ip_address: str, api_key: str, days: int) -> str:
125
+ """
126
+ Main function to query AbuseIPDB and format the response for Gradio
127
+
128
+ Args:
129
+ ip_address: The IP address to check
130
+ api_key: Your AbuseIPDB API key
131
+ days: Number of days to check
132
+
133
+ Returns:
134
+ A tuple containing (summary_text, reports_text)
135
+ """
136
+ if not ip_address or not api_key:
137
+ return "Please provide both an IP address and API key"
138
+
139
+ response = check_ip(ip_address, api_key, str(days))
140
+ result, error = parse_response(response)
141
+
142
+ if error:
143
+ return error
144
+
145
+ return result.format_summary()
146
+
147
+
148
+ gr_query_abuseipdb = gr.Interface(
149
+ fn=query_abuseipdb,
150
+ inputs=["ipaddress", os.environ["ABUSEIPDB_KEY"]],
151
+ outputs="string",
152
+ title="AbuseIPDB IP Checker",
153
+ description="Check if an IP address has been reported for abusive behavior using AbuseIP DB API",
154
+ )