Spaces:
Runtime error
Runtime error
File size: 6,037 Bytes
48922fa 048fa41 48922fa 1f9ba54 048fa41 1f9ba54 048fa41 48922fa 048fa41 48922fa 048fa41 1f9ba54 048fa41 48922fa 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 48922fa 048fa41 48922fa 048fa41 48922fa 048fa41 48922fa 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 1f9ba54 048fa41 48922fa 048fa41 48922fa 048fa41 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
"""
OSINT engine for username and person search.
"""
from typing import Dict, List, Any, Optional
import asyncio
from datetime import datetime
import json
import whois
from holehe.core import AsyncEngine
from holehe.localuseragent import ua
import subprocess
import tempfile
import os
import geopy
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut
class OSINTEngine:
def __init__(self):
self.holehe_engine = AsyncEngine()
self.geocoder = Nominatim(user_agent="osint_search")
async def search_username(self, username: str) -> Dict[str, Any]:
"""Search for username across platforms."""
results = {
"platforms": [],
"emails": [],
"metadata": {}
}
# Holehe search
try:
holehe_results = await self.holehe_engine.check_all(username)
for result in holehe_results:
if result["exists"]:
results["platforms"].append({
"name": result["name"],
"url": result["url"] if "url" in result else None,
"type": "social" if "social" in result["type"] else "other"
})
if "email" in result and result["email"]:
results["emails"].append(result["email"])
except Exception as e:
print(f"Holehe search error: {e}")
# Sherlock search using subprocess
try:
with tempfile.TemporaryDirectory() as temp_dir:
output_file = os.path.join(temp_dir, "sherlock_results.txt")
process = subprocess.Popen(
["sherlock", username, "--output", output_file],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
stdout, stderr = process.communicate(timeout=30)
if os.path.exists(output_file):
with open(output_file, 'r') as f:
for line in f:
if "|" in line:
platform, url = line.strip().split("|")
results["platforms"].append({
"name": platform.strip(),
"url": url.strip(),
"type": "social"
})
except Exception as e:
print(f"Sherlock search error: {e}")
# Deduplicate results
results["platforms"] = list({json.dumps(x) for x in results["platforms"]})
results["platforms"] = [json.loads(x) for x in results["platforms"]]
results["emails"] = list(set(results["emails"]))
return results
async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
"""Search for person information."""
results = {
"basic_info": {},
"locations": [],
"social_profiles": [],
"metadata": {}
}
# Process location if provided
if location:
try:
location_info = self.geocoder.geocode(location, timeout=10)
if location_info:
results["locations"].append({
"address": location_info.address,
"latitude": location_info.latitude,
"longitude": location_info.longitude
})
except GeocoderTimedOut:
print("Geocoding timed out")
except Exception as e:
print(f"Geocoding error: {e}")
# Basic info
results["basic_info"] = {
"name": name,
"age": age if age else None,
"location": location if location else None
}
# Search for potential usernames
usernames = self._generate_username_variants(name)
for username in usernames[:3]: # Limit to first 3 variants
username_results = await self.search_username(username)
results["social_profiles"].extend(username_results["platforms"])
# Deduplicate social profiles
results["social_profiles"] = list({json.dumps(x) for x in results["social_profiles"]})
results["social_profiles"] = [json.loads(x) for x in results["social_profiles"]]
return results
def _generate_username_variants(self, name: str) -> List[str]:
"""Generate possible username variants from a name."""
name = name.lower()
parts = name.split()
variants = []
if len(parts) >= 2:
first, last = parts[0], parts[-1]
variants.extend([
first + last,
first + "_" + last,
first + "." + last,
first[0] + last,
first + last[0],
last + first
])
if len(parts) == 1:
variants.extend([
parts[0],
parts[0] + "123",
"the" + parts[0]
])
return list(set(variants))
async def search_domain(self, domain: str) -> Dict[str, Any]:
"""Get information about a domain."""
try:
domain_info = whois.whois(domain)
return {
"registrar": domain_info.registrar,
"creation_date": domain_info.creation_date,
"expiration_date": domain_info.expiration_date,
"last_updated": domain_info.updated_date,
"status": domain_info.status,
"name_servers": domain_info.name_servers,
"emails": domain_info.emails,
"raw": domain_info
}
except Exception as e:
return {
"error": str(e)
}
|