TDAgentTools / tdagent /tools /query_abuse_ip_db.py
Pedro Bento
fix input
b1ceeff
raw
history blame
4.33 kB
import os
import requests
from datetime import datetime
from dataclasses import dataclass
from typing import List, Optional, Dict, Any, Tuple, Union
import gradio as gr
# API docs: https://docs.abuseipdb.com/#check-endpoint
@dataclass
class AbuseReport:
date: str
categories: str
comment: str
reporter: str
@dataclass
class IPCheckResult:
ip_address: str
abuse_confidence_score: int
total_reports: int
country_code: str
domain: str
isp: str
last_reported: str
reports: List[AbuseReport]
def format_summary(self) -> str:
"""Format a summary of the IP check result"""
return f"""
IP Address: {self.ip_address}
Abuse Confidence Score: {self.abuse_confidence_score}%
Total Reports: {self.total_reports}
Country: {self.country_code}
Domain: {self.domain}
ISP: {self.isp}
Last Reported: {self.last_reported}
"""
def check_ip(ip_address: str, api_key: str, days: str = "30") -> Dict[str, Any]:
"""
Query the AbuseIPDB API to check if an IP address has been reported.
Args:
ip_address: The IP address to check
api_key: Your AbuseIPDB API key
days: Number of days to check (default: 30)
Returns:
API response data as a dictionary
"""
url = 'https://api.abuseipdb.com/api/v2/check'
headers = {
'Accept': 'application/json',
'Key': api_key
}
params = {
'ipAddress': ip_address,
'maxAgeInDays': days,
'verbose': True
}
try:
response = requests.get(url, headers=headers, params=params)
response.raise_for_status() # Raise exception for HTTP errors
return response.json()
except requests.exceptions.RequestException as e:
return {"error": str(e)}
def parse_response(response: Dict[str, Any]) -> Tuple[Optional[IPCheckResult], Optional[str]]:
"""
Parse the API response into a dataclass
Args:
response: The API response dictionary
Returns:
A tuple containing (result_object, error_message)
If successful, error_message will be None
If failed, result_object will be None
"""
if "error" in response:
return None, f"Error: {response['error']}"
data = response.get("data", {})
# Create a list of AbuseReport objects
reports = []
if "reports" in data and data["reports"]:
for report in data["reports"]:
reported_at = datetime.fromisoformat(report["reportedAt"].replace("Z", "+00:00")).strftime(
"%Y-%m-%d %H:%M:%S")
categories = ", ".join([str(cat) for cat in report.get("categories", [])])
reports.append(AbuseReport(
date=reported_at,
categories=categories,
comment=report.get("comment", ""),
reporter=str(report.get("reporterId", "Anonymous"))
))
# Create the main result object
result = IPCheckResult(
ip_address=data.get("ipAddress", "N/A"),
abuse_confidence_score=data.get("abuseConfidenceScore", 0),
total_reports=data.get("totalReports", 0),
country_code=data.get("countryCode", "N/A"),
domain=data.get("domain", "N/A"),
isp=data.get("isp", "N/A"),
last_reported=data.get("lastReportedAt", "Never"),
reports=reports
)
return result, None
def query_abuseipdb(ip_address: str, days: int = 30) -> str:
"""
Main function to query AbuseIPDB and format the response for Gradio
Args:
ip_address: The IP address to check
api_key: Your AbuseIPDB API key
days: Number of days to check
Returns:
A tuple containing (summary_text, reports_text)
"""
if not ip_address:
return "Please provide both an IP address and API key"
api_key = os.environ["ABUSEIPDB_KEY"]
response = check_ip(ip_address, api_key, str(days))
result, error = parse_response(response)
if error:
return error
return result.format_summary()
gr_query_abuseipdb = gr.Interface(
fn=query_abuseipdb,
inputs=["text"],
outputs="text",
title="AbuseIPDB IP Checker",
description="Check if an IP address has been reported for abusive behavior using AbuseIP DB API",
)