Spaces:
Runtime error
Runtime error
| """ | |
| OSINT engine for username and person search. | |
| """ | |
| from typing import Dict, List, Any, Optional | |
| import asyncio | |
| from datetime import datetime | |
| import json | |
| import whois | |
| from holehe.core import AsyncEngine | |
| from holehe.localuseragent import ua | |
| import subprocess | |
| import tempfile | |
| import os | |
| import geopy | |
| from geopy.geocoders import Nominatim | |
| from geopy.exc import GeocoderTimedOut | |
class OSINTEngine:
    """Collect public information about usernames, people and domains.

    The engine combines three sources: a holehe engine (account existence
    checks), the ``sherlock`` command line tool (profile URLs) and a
    Nominatim geocoder / WHOIS lookup for locations and domains.

    All public ``search_*`` methods are coroutines. Blocking library calls
    (geocoding, WHOIS) run in the default executor and sherlock runs as an
    asyncio subprocess, so the event loop is never blocked.
    """

    # Seconds to wait for the sherlock subprocess before it is killed.
    SHERLOCK_TIMEOUT = 30

    def __init__(self):
        self.holehe_engine = AsyncEngine()
        self.geocoder = Nominatim(user_agent="osint_search")

    @staticmethod
    def _dedupe_records(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return *records* without duplicates, keeping first-seen order.

        Records are compared by their JSON serialisation (keys sorted), so
        two dicts with equal content collapse into one regardless of key
        order. Raises ``TypeError`` if a record is not JSON serialisable.
        """
        unique: Dict[str, Dict[str, Any]] = {}
        for record in records:
            unique.setdefault(json.dumps(record, sort_keys=True), record)
        return list(unique.values())

    @staticmethod
    def _parse_sherlock_line(line: str) -> Optional[Dict[str, Any]]:
        """Parse one ``platform | url`` line; return ``None`` if no ``|``.

        Only the first ``|`` separates the fields, so a URL that itself
        contains ``|`` no longer aborts parsing of the whole file.
        """
        platform, separator, url = line.strip().partition("|")
        if not separator:
            return None
        return {
            "name": platform.strip(),
            "url": url.strip(),
            "type": "social",
        }

    async def _run_sherlock(self, username: str, output_file: str) -> None:
        """Run ``sherlock`` for *username*, writing results to *output_file*.

        Raises:
            ValueError: if *username* starts with ``-`` (it would be parsed
                by sherlock as a command line option).
            TimeoutError: if sherlock runs longer than ``SHERLOCK_TIMEOUT``.
            OSError: if the ``sherlock`` executable cannot be started.
        """
        if username.startswith("-"):
            raise ValueError("username must not start with '-'")

        process = await asyncio.create_subprocess_exec(
            "sherlock", username, "--output", output_file,
            # The console output is not used; discard it instead of buffering.
            stdout=asyncio.subprocess.DEVNULL,
            stderr=asyncio.subprocess.DEVNULL,
        )
        try:
            await asyncio.wait_for(
                process.wait(), timeout=self.SHERLOCK_TIMEOUT
            )
        except asyncio.TimeoutError:
            raise TimeoutError(
                f"sherlock timed out after {self.SHERLOCK_TIMEOUT} seconds"
            ) from None
        finally:
            # On timeout or cancellation the child is still alive: kill and
            # reap it so it does not linger after we return.
            if process.returncode is None:
                process.kill()
                await process.wait()

    def _read_sherlock_output(self, output_file: str) -> List[Dict[str, Any]]:
        """Return the platform records found in *output_file* (if it exists)."""
        if not os.path.exists(output_file):
            return []
        platforms = []
        with open(output_file, "r", encoding="utf-8", errors="replace") as f:
            for line in f:
                record = self._parse_sherlock_line(line)
                if record is not None:
                    platforms.append(record)
        return platforms

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across platforms.

        Returns a dict with the keys ``platforms`` (list of
        ``{"name", "url", "type"}`` dicts), ``emails`` (list of strings) and
        ``metadata`` (currently always empty). Both lists are de-duplicated
        and keep first-seen order. Failures of either backend are printed and
        otherwise ignored (best effort), so the method itself does not raise
        for backend errors.
        """
        results = {
            "platforms": [],
            "emails": [],
            "metadata": {}
        }

        # Holehe search
        try:
            holehe_results = await self.holehe_engine.check_all(username)
            for result in holehe_results:
                # NOTE(review): assumes each result is a dict; optional keys
                # are read with .get so one incomplete entry does not abort
                # processing of the remaining entries.
                if not result.get("exists"):
                    continue
                result_type = result.get("type") or ""
                results["platforms"].append({
                    "name": result.get("name"),
                    "url": result.get("url"),
                    "type": "social" if "social" in result_type else "other"
                })
                if result.get("email"):
                    results["emails"].append(result["email"])
        except Exception as e:
            print(f"Holehe search error: {e}")

        # Sherlock search using an asyncio subprocess
        try:
            with tempfile.TemporaryDirectory() as temp_dir:
                output_file = os.path.join(temp_dir, "sherlock_results.txt")
                await self._run_sherlock(username, output_file)
                results["platforms"].extend(
                    self._read_sherlock_output(output_file)
                )
        except Exception as e:
            print(f"Sherlock search error: {e}")

        # Deduplicate results (order preserving, hence deterministic)
        results["platforms"] = self._dedupe_records(results["platforms"])
        results["emails"] = list(dict.fromkeys(results["emails"]))
        return results

    async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
        """Search for person information.

        Args:
            name: Full name; used to derive candidate usernames.
            location: Optional free-text location, geocoded when given.
            age: Optional age, copied into ``basic_info`` unchanged.

        Returns a dict with the keys ``basic_info``, ``locations``,
        ``social_profiles`` and ``metadata`` (currently always empty).
        Geocoding failures are printed and otherwise ignored.
        """
        results = {
            "basic_info": {},
            "locations": [],
            "social_profiles": [],
            "metadata": {}
        }

        # Process location if provided
        if location:
            try:
                # geocode() is a blocking network call; keep it off the loop.
                loop = asyncio.get_running_loop()
                location_info = await loop.run_in_executor(
                    None,
                    lambda: self.geocoder.geocode(location, timeout=10),
                )
                if location_info:
                    results["locations"].append({
                        "address": location_info.address,
                        "latitude": location_info.latitude,
                        "longitude": location_info.longitude
                    })
            except GeocoderTimedOut:
                print("Geocoding timed out")
            except Exception as e:
                print(f"Geocoding error: {e}")

        # Basic info (age is passed through as-is so that 0 is not lost)
        results["basic_info"] = {
            "name": name,
            "age": age,
            "location": location if location else None
        }

        # Search for potential usernames
        usernames = self._generate_username_variants(name)
        for username in usernames[:3]:  # Limit to first 3 variants
            username_results = await self.search_username(username)
            results["social_profiles"].extend(username_results["platforms"])

        # Deduplicate social profiles
        results["social_profiles"] = self._dedupe_records(
            results["social_profiles"]
        )
        return results

    def _generate_username_variants(self, name: str) -> List[str]:
        """Generate possible username variants from a name.

        The name is lower-cased and split on whitespace. With two or more
        parts the first and last part are combined in several ways; with a
        single part a few common decorations are added. The result has no
        duplicates and keeps the order in which variants are generated, so
        callers that slice it (e.g. the first three) get a stable selection.
        An empty or whitespace-only name yields an empty list.
        """
        parts = name.lower().split()
        variants = []
        if len(parts) >= 2:
            first, last = parts[0], parts[-1]
            variants.extend([
                first + last,
                first + "_" + last,
                first + "." + last,
                first[0] + last,
                first + last[0],
                last + first
            ])
        elif len(parts) == 1:
            variants.extend([
                parts[0],
                parts[0] + "123",
                "the" + parts[0]
            ])
        # dict.fromkeys removes duplicates while preserving insertion order.
        return list(dict.fromkeys(variants))

    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get information about a domain.

        Returns selected WHOIS fields plus the full lookup result under
        ``raw``. If the lookup raises, returns ``{"error": <message>}``
        instead of propagating the exception.
        """
        try:
            # whois.whois() is a blocking network call; keep it off the loop.
            loop = asyncio.get_running_loop()
            domain_info = await loop.run_in_executor(None, whois.whois, domain)
            return {
                "registrar": domain_info.registrar,
                "creation_date": domain_info.creation_date,
                "expiration_date": domain_info.expiration_date,
                "last_updated": domain_info.updated_date,
                "status": domain_info.status,
                "name_servers": domain_info.name_servers,
                "emails": domain_info.emails,
                "raw": domain_info
            }
        except Exception as e:
            return {
                "error": str(e)
            }