TDAgentTools / tdagent /tools /query_abuse_ip_db.py
pedrobento988's picture
improve_ui (#12)
3e2bf63 verified
from __future__ import annotations
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any
import gradio as gr
import requests
# API docs: https://docs.abuseipdb.com/#check-endpoint
@dataclass
class AbuseReport: # noqa: D101
date: str
categories: str
comment: str
reporter: str
@dataclass
class IPCheckResult: # noqa: D101
ip_address: str
abuse_confidence_score: int
total_reports: int
country_code: str
domain: str
isp: str
last_reported: str
reports: list[AbuseReport]
def format_summary(self) -> str:
"""Format a summary of the IP check result."""
return f"""
IP Address: {self.ip_address}
Abuse Confidence Score: {self.abuse_confidence_score}%
Total Reports: {self.total_reports}
Country: {self.country_code}
Domain: {self.domain}
ISP: {self.isp}
Last Reported: {self.last_reported}
"""
def check_ip(ip_address: str, api_key: str, days: str = "30") -> dict[str, Any]:
"""Query the AbuseIPDB API to check if an IP address has been reported.
Args:
ip_address: The IP address to check
api_key: Your AbuseIPDB API key
days: Number of days to check (default: 30)
Returns:
API response data as a dictionary
"""
url = "https://api.abuseipdb.com/api/v2/check"
headers = {"Accept": "application/json", "Key": api_key}
params = {
"ipAddress": ip_address,
"maxAgeInDays": days,
"verbose": str(True),
}
try:
response = requests.get(
url,
headers=headers,
params=params,
timeout=30,
)
response.raise_for_status() # Raise exception for HTTP errors
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def parse_response(
response: dict[str, Any],
) -> tuple[IPCheckResult | None, str]:
"""Parse the API response into a dataclass.
Args:
response: The API response dictionary
Returns:
A tuple containing (result_object, error_message)
If successful, error_message will be None
If failed, result_object will be None
"""
if "error" in response:
return None, f"Error: {response['error']}"
data = response.get("data", {})
# Create a list of AbuseReport objects
reports = []
if data.get("reports"):
for report in data["reports"]:
reported_at = datetime.fromisoformat(
report["reportedAt"].replace("Z", "+00:00"),
).strftime("%Y-%m-%d %H:%M:%S")
categories = ", ".join([str(cat) for cat in report.get("categories", [])])
reports.append(
AbuseReport(
date=reported_at,
categories=categories,
comment=report.get("comment", ""),
reporter=str(report.get("reporterId", "Anonymous")),
),
)
# Create the main result object
result = IPCheckResult(
ip_address=data.get("ipAddress", "N/A"),
abuse_confidence_score=data.get("abuseConfidenceScore", 0),
total_reports=data.get("totalReports", 0),
country_code=data.get("countryCode", "N/A"),
domain=data.get("domain", "N/A"),
isp=data.get("isp", "N/A"),
last_reported=data.get("lastReportedAt", "Never"),
reports=reports,
)
return result, ""
def query_abuseipdb(ip_address: str, days: int = 30) -> str:
"""Query AbuseIP to find if an IP has been reported for abusive behavior.
Args:
ip_address: The IP address to check
api_key: Your AbuseIPDB API key
days: Number of days to check
Returns:
A tuple containing (summary_text, reports_text)
"""
if not ip_address:
return "Please provide both an IP address and API key"
api_key = os.environ["ABUSEIPDB_KEY"]
response = check_ip(ip_address, api_key, str(days))
result, error = parse_response(response)
if result:
return result.format_summary()
return error
gr_query_abuseipdb = gr.Interface(
fn=query_abuseipdb,
inputs=gr.Textbox(label="ip"),
outputs=gr.Text(label="Report on abusive behaviour"),
title="AbuseIPDB IP Checker",
description=(
"Check if an IP address has been reported for abusive behavior"
" using AbuseIP DB API"
),
examples=["5.252.155.14", "77.239.99.248"],
example_labels=["👾 Malicious IP 1", "👾 Malicious IP 2"],
)