File size: 6,037 Bytes
48922fa
048fa41
48922fa
 
 
1f9ba54
048fa41
1f9ba54
048fa41
 
 
 
 
 
48922fa
 
 
 
 
048fa41
 
 
48922fa
048fa41
1f9ba54
048fa41
 
 
48922fa
 
1f9ba54
 
048fa41
1f9ba54
048fa41
 
 
 
 
1f9ba54
048fa41
 
1f9ba54
048fa41
48922fa
048fa41
48922fa
048fa41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48922fa
048fa41
 
 
 
 
 
 
 
48922fa
048fa41
 
1f9ba54
048fa41
 
1f9ba54
048fa41
1f9ba54
 
048fa41
1f9ba54
048fa41
 
 
 
 
 
 
 
 
 
 
 
1f9ba54
048fa41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1f9ba54
 
 
048fa41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
48922fa
048fa41
 
 
 
 
 
 
 
 
 
 
48922fa
048fa41
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""
OSINT engine for username, person, and domain (WHOIS) lookups.
"""
from typing import Dict, List, Any, Optional
import asyncio
from datetime import datetime
import json
import whois
from holehe.core import AsyncEngine
from holehe.localuseragent import ua
import subprocess
import tempfile
import os
import geopy
from geopy.geocoders import Nominatim
from geopy.exc import GeocoderTimedOut

class OSINTEngine:
    """Aggregate OSINT lookups for usernames, people and domains.

    All public ``search_*`` methods are coroutines.  Blocking work (the
    ``sherlock`` CLI, geocoding and WHOIS queries) is pushed to the event
    loop's default executor so that one slow lookup does not stall other
    tasks running on the same loop.
    """

    # Seconds to wait for the ``sherlock`` process before it is killed.
    SHERLOCK_TIMEOUT = 30
    # Seconds passed to the geocoder as its per-request timeout.
    GEOCODE_TIMEOUT = 10
    # How many generated username variants ``search_person`` looks up.
    MAX_USERNAME_VARIANTS = 3

    def __init__(self):
        """Create the holehe engine and the Nominatim geocoder clients."""
        self.holehe_engine = AsyncEngine()
        self.geocoder = Nominatim(user_agent="osint_search")

    @staticmethod
    def _dedupe_records(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return *records* without duplicates, keeping first-seen order.

        Dicts are unhashable, so each one is keyed by its canonical JSON
        form (``sort_keys=True`` makes key order irrelevant).
        """
        seen = set()
        unique = []
        for record in records:
            key = json.dumps(record, sort_keys=True)
            if key not in seen:
                seen.add(key)
                unique.append(record)
        return unique

    def _run_sherlock(self, username: str) -> List[Dict[str, Any]]:
        """Run the ``sherlock`` CLI for *username* and parse its output file.

        This method blocks and is meant to be called from a worker thread.
        Only lines containing ``|`` are used; they are read as
        ``platform | url``.

        Raises:
            subprocess.TimeoutExpired: if sherlock exceeds SHERLOCK_TIMEOUT.
            OSError: if the executable cannot be started.
        """
        found = []
        with tempfile.TemporaryDirectory() as temp_dir:
            output_file = os.path.join(temp_dir, "sherlock_results.txt")
            # NOTE(review): a username starting with "-" would be read by the
            # CLI as an option; validate at the caller if input is untrusted.
            # subprocess.run kills and reaps the child when the timeout
            # expires, so no process outlives the temporary directory.
            subprocess.run(
                ["sherlock", username, "--output", output_file],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL,
                timeout=self.SHERLOCK_TIMEOUT,
                check=False,
            )

            if not os.path.exists(output_file):
                return found

            with open(output_file, "r", encoding="utf-8", errors="replace") as f:
                for line in f:
                    # partition() tolerates extra "|" characters in the URL,
                    # where a two-target split() would raise ValueError.
                    platform, sep, url = line.strip().partition("|")
                    if not sep:
                        continue
                    found.append({
                        "name": platform.strip(),
                        "url": url.strip(),
                        "type": "social"
                    })
        return found

    async def search_username(self, username: str) -> Dict[str, Any]:
        """Search for username across platforms.

        Queries holehe and the ``sherlock`` CLI.  Each source is best-effort:
        a failure is printed and the other source's results are still
        returned.

        Returns:
            A dict with ``"platforms"`` (list of ``{"name", "url", "type"}``
            dicts), ``"emails"`` (list of strings) and ``"metadata"`` (empty
            dict).  Both lists are de-duplicated in first-seen order.
        """
        results = {
            "platforms": [],
            "emails": [],
            "metadata": {}
        }

        # Holehe search
        try:
            # assumes check_all yields dicts with "exists", "name" and "type"
            # keys (optionally "url" / "email") — TODO confirm against holehe.
            holehe_results = await self.holehe_engine.check_all(username)
            for result in holehe_results:
                if result["exists"]:
                    results["platforms"].append({
                        "name": result["name"],
                        "url": result.get("url"),
                        "type": "social" if "social" in result["type"] else "other"
                    })
                if result.get("email"):
                    results["emails"].append(result["email"])
        except Exception as e:
            print(f"Holehe search error: {e}")

        # Sherlock search, run in a worker thread so the event loop stays free
        try:
            loop = asyncio.get_running_loop()
            sherlock_hits = await loop.run_in_executor(
                None, self._run_sherlock, username
            )
            results["platforms"].extend(sherlock_hits)
        except Exception as e:
            print(f"Sherlock search error: {e}")

        # Deduplicate results (order-preserving, so output is deterministic)
        results["platforms"] = self._dedupe_records(results["platforms"])
        results["emails"] = list(dict.fromkeys(results["emails"]))

        return results

    async def search_person(self, name: str, location: Optional[str] = None, age: Optional[int] = None) -> Dict[str, Any]:
        """Search for person information.

        Args:
            name: Full name; used to derive candidate usernames.
            location: Optional free-text place, geocoded when non-empty.
            age: Optional age, echoed back in ``basic_info`` unchanged.

        Returns:
            A dict with ``"basic_info"``, ``"locations"``,
            ``"social_profiles"`` and ``"metadata"`` keys.
        """
        results = {
            "basic_info": {},
            "locations": [],
            "social_profiles": [],
            "metadata": {}
        }

        # Process location if provided
        if location:
            try:
                loop = asyncio.get_running_loop()
                # geocode() is a blocking network call; keep it off the loop.
                location_info = await loop.run_in_executor(
                    None,
                    lambda: self.geocoder.geocode(
                        location, timeout=self.GEOCODE_TIMEOUT
                    ),
                )
                if location_info:
                    results["locations"].append({
                        "address": location_info.address,
                        "latitude": location_info.latitude,
                        "longitude": location_info.longitude
                    })
            except GeocoderTimedOut:
                print("Geocoding timed out")
            except Exception as e:
                print(f"Geocoding error: {e}")

        # Basic info.  ``age`` is passed through as-is so that 0 is not
        # collapsed to None; an empty location string is still reported as None.
        results["basic_info"] = {
            "name": name,
            "age": age,
            "location": location or None
        }

        # Search for potential usernames (most likely variants come first)
        usernames = self._generate_username_variants(name)
        for username in usernames[:self.MAX_USERNAME_VARIANTS]:
            username_results = await self.search_username(username)
            results["social_profiles"].extend(username_results["platforms"])

        # Deduplicate social profiles
        results["social_profiles"] = self._dedupe_records(results["social_profiles"])

        return results

    def _generate_username_variants(self, name: str) -> List[str]:
        """Generate possible username variants from a name.

        The name is lower-cased and split on whitespace.  With two or more
        parts the first and last parts are combined; with exactly one part
        that word is decorated.  Duplicates are dropped while keeping the
        order below, so slicing the result always selects the same variants.
        An empty or whitespace-only name yields an empty list.
        """
        parts = name.lower().split()
        variants = []

        if len(parts) >= 2:
            first, last = parts[0], parts[-1]
            variants.extend([
                first + last,
                first + "_" + last,
                first + "." + last,
                first[0] + last,
                first + last[0],
                last + first
            ])
        elif len(parts) == 1:
            variants.extend([
                parts[0],
                parts[0] + "123",
                "the" + parts[0]
            ])

        # dict.fromkeys de-duplicates while preserving insertion order.
        return list(dict.fromkeys(variants))

    async def search_domain(self, domain: str) -> Dict[str, Any]:
        """Get information about a domain.

        Returns:
            A dict of selected WHOIS fields plus the full record under
            ``"raw"``, or ``{"error": <message>}`` if the lookup fails.
        """
        try:
            loop = asyncio.get_running_loop()
            # The WHOIS query blocks on network I/O; run it in the executor.
            domain_info = await loop.run_in_executor(None, whois.whois, domain)
            return {
                "registrar": domain_info.registrar,
                "creation_date": domain_info.creation_date,
                "expiration_date": domain_info.expiration_date,
                "last_updated": domain_info.updated_date,
                "status": domain_info.status,
                "name_servers": domain_info.name_servers,
                "emails": domain_info.emails,
                "raw": domain_info
            }
        except Exception as e:
            return {
                "error": str(e)
            }