""" OSINT engine for username and person search. """ from typing import Dict, List, Any, Optional import asyncio from datetime import datetime import json import whois from holehe.core import AsyncEngine from holehe.localuseragent import ua import subprocess import tempfile import os import geopy from geopy.geocoders import Nominatim from geopy.exc import GeocoderTimedOut class OSINTEngine: def __init__(self): self.holehe_engine = AsyncEngine() self.geocoder = Nominatim(user_agent="osint_search") async def search_username(self, username: str) -> Dict[str, Any]: """Search for username across platforms.""" results = { "platforms": [], "emails": [], "metadata": {} } # Holehe search try: holehe_results = await self.holehe_engine.check_all(username) for result in holehe_results: if result["exists"]: results["platforms"].append({ "name": result["name"], "url": result["url"] if "url" in result else None, "type": "social" if "social" in result["type"] else "other" }) if "email" in result and result["email"]: results["emails"].append(result["email"]) except Exception as e: print(f"Holehe search error: {e}") # Sherlock search using subprocess try: with tempfile.TemporaryDirectory() as temp_dir: output_file = os.path.join(temp_dir, "sherlock_results.txt") process = subprocess.Popen( ["sherlock", username, "--output", output_file], stdout=subprocess.PIPE, stderr=subprocess.PIPE ) stdout, stderr = process.communicate(timeout=30) if os.path.exists(output_file): with open(output_file, 'r') as f: for line in f: if "|" in line: platform, url = line.strip().split("|") results["platforms"].append({ "name": platform.strip(), "url": url.strip(), "type": "social" }) except Exception as e: print(f"Sherlock search error: {e}") # Deduplicate results results["platforms"] = list({json.dumps(x) for x in results["platforms"]}) results["platforms"] = [json.loads(x) for x in results["platforms"]] results["emails"] = list(set(results["emails"])) return results async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]: """Search for person information.""" results = { "basic_info": {}, "locations": [], "social_profiles": [], "metadata": {} } # Process location if provided if location: try: location_info = self.geocoder.geocode(location, timeout=10) if location_info: results["locations"].append({ "address": location_info.address, "latitude": location_info.latitude, "longitude": location_info.longitude }) except GeocoderTimedOut: print("Geocoding timed out") except Exception as e: print(f"Geocoding error: {e}") # Basic info results["basic_info"] = { "name": name, "age": age if age else None, "location": location if location else None } # Search for potential usernames usernames = self._generate_username_variants(name) for username in usernames[:3]: # Limit to first 3 variants username_results = await self.search_username(username) results["social_profiles"].extend(username_results["platforms"]) # Deduplicate social profiles results["social_profiles"] = list({json.dumps(x) for x in results["social_profiles"]}) results["social_profiles"] = [json.loads(x) for x in results["social_profiles"]] return results def _generate_username_variants(self, name: str) -> List[str]: """Generate possible username variants from a name.""" name = name.lower() parts = name.split() variants = [] if len(parts) >= 2: first, last = parts[0], parts[-1] variants.extend([ first + last, first + "_" + last, first + "." + last, first[0] + last, first + last[0], last + first ]) if len(parts) == 1: variants.extend([ parts[0], parts[0] + "123", "the" + parts[0] ]) return list(set(variants)) async def search_domain(self, domain: str) -> Dict[str, Any]: """Get information about a domain.""" try: domain_info = whois.whois(domain) return { "registrar": domain_info.registrar, "creation_date": domain_info.creation_date, "expiration_date": domain_info.expiration_date, "last_updated": domain_info.updated_date, "status": domain_info.status, "name_servers": domain_info.name_servers, "emails": domain_info.emails, "raw": domain_info } except Exception as e: return { "error": str(e) }