ISE / engines /osint.py
fikird
fix: Runtime errors and dependency issues
048fa41
raw
history blame
6.04 kB
"""
OSINT engine for username and person search.
"""
from typing import Dict, List, Any, Optional
import asyncio
from datetime import datetime
import json
import whois
from holehe.core import AsyncEngine
from holehe.localuseragent import ua
import subprocess
import tempfile
import os
import geopy
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
class OSINTEngine:
def __init__(self):
self.holehe_engine = AsyncEngine()
self.geocoder = Nominatim(user_agent="osint_search")
async def search_username(self, username: str) -> Dict[str, Any]:
"""Search for username across platforms."""
results = {
"platforms": [],
"emails": [],
"metadata": {}
}
# Holehe search
try:
holehe_results = await self.holehe_engine.check_all(username)
for result in holehe_results:
if result["exists"]:
results["platforms"].append({
"name": result["name"],
"url": result["url"] if "url" in result else None,
"type": "social" if "social" in result["type"] else "other"
})
if "email" in result and result["email"]:
results["emails"].append(result["email"])
except Exception as e:
print(f"Holehe search error: {e}")
# Sherlock search using subprocess
try:
with tempfile.TemporaryDirectory() as temp_dir:
output_file = os.path.join(temp_dir, "sherlock_results.txt")
process = subprocess.Popen(
["sherlock", username, "--output", output_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate(timeout=30)
if os.path.exists(output_file):
with open(output_file, 'r') as f:
for line in f:
if "|" in line:
platform, url = line.strip().split("|")
results["platforms"].append({
"name": platform.strip(),
"url": url.strip(),
"type": "social"
})
except Exception as e:
print(f"Sherlock search error: {e}")
# Deduplicate results
results["platforms"] = list({json.dumps(x) for x in results["platforms"]})
results["platforms"] = [json.loads(x) for x in results["platforms"]]
results["emails"] = list(set(results["emails"]))
return results
async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
"""Search for person information."""
results = {
"basic_info": {},
"locations": [],
"social_profiles": [],
"metadata": {}
}
# Process location if provided
if location:
try:
location_info = self.geocoder.geocode(location, timeout=10)
if location_info:
results["locations"].append({
"address": location_info.address,
"latitude": location_info.latitude,
"longitude": location_info.longitude
})
except GeocoderTimedOut:
print("Geocoding timed out")
except Exception as e:
print(f"Geocoding error: {e}")
# Basic info
results["basic_info"] = {
"name": name,
"age": age if age else None,
"location": location if location else None
}
# Search for potential usernames
usernames = self._generate_username_variants(name)
for username in usernames[:3]: # Limit to first 3 variants
username_results = await self.search_username(username)
results["social_profiles"].extend(username_results["platforms"])
# Deduplicate social profiles
results["social_profiles"] = list({json.dumps(x) for x in results["social_profiles"]})
results["social_profiles"] = [json.loads(x) for x in results["social_profiles"]]
return results
def _generate_username_variants(self, name: str) -> List[str]:
"""Generate possible username variants from a name."""
name = name.lower()
parts = name.split()
variants = []
if len(parts) >= 2:
first, last = parts[0], parts[-1]
variants.extend([
first + last,
first + "_" + last,
first + "." + last,
first[0] + last,
first + last[0],
last + first
])
if len(parts) == 1:
variants.extend([
parts[0],
parts[0] + "123",
"the" + parts[0]
])
return list(set(variants))
async def search_domain(self, domain: str) -> Dict[str, Any]:
"""Get information about a domain."""
try:
domain_info = whois.whois(domain)
return {
"registrar": domain_info.registrar,
"creation_date": domain_info.creation_date,
"expiration_date": domain_info.expiration_date,
"last_updated": domain_info.updated_date,
"status": domain_info.status,
"name_servers": domain_info.name_servers,
"emails": domain_info.emails,
"raw": domain_info
}
except Exception as e:
return {
"error": str(e)
}