apexherbert200 committed on
Commit
ef71aa9
·
1 Parent(s): c9ef318

Working /experimenting

Browse files
Files changed (1) hide show
  1. business.py +189 -69
business.py CHANGED
@@ -104,6 +104,146 @@ def extract_emails(text: str) -> List[str]:
104
 
105
  return filtered_emails[:5] # Limit to 5 most likely emails
106
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
107
  @app.get("/search",
108
  response_model=SearchResponse,
109
  summary="Search Business Directory",
@@ -149,76 +289,28 @@ async def search_businesses(
149
  try:
150
  businesses = []
151
 
152
- # For demo purposes, we'll simulate directory search
153
- # In production, you'd implement actual directory scraping
154
- if source in ["auto", "yellowpages"]:
155
- # Simulate Yellow Pages search
156
- search_url = f"https://www.yellowpages.com/search?search_terms={query.replace(' ', '+')}"
157
 
158
- try:
159
- await page.goto(search_url, timeout=30000)
160
- await page.wait_for_load_state("networkidle", timeout=10000)
161
-
162
- # Extract business listings with robust error handling
163
- listings = await page.query_selector_all(".result")
164
-
165
- for listing in listings[:limit]:
166
- try:
167
- # Extract business name
168
- name_el = await listing.query_selector(".business-name, h3 a, .n")
169
- name = await name_el.inner_text() if name_el else "Unknown Business"
170
- name = name.strip()
171
-
172
- # Extract phone numbers
173
- content = await listing.inner_html()
174
- phones = extract_phone_numbers(content)
175
- primary_phone = phones[0] if phones else None
176
-
177
- # Extract emails
178
- emails = extract_emails(content)
179
- primary_email = emails[0] if emails else None
180
-
181
- # Extract website
182
- website_el = await listing.query_selector("a[href*='http']")
183
- website = None
184
- if website_el:
185
- href = await website_el.get_attribute("href")
186
- if href and not any(x in href for x in ['yellowpages.com', 'maps.google.com']):
187
- website = href
188
-
189
- # Extract address
190
- address_el = await listing.query_selector(".address, .street-address")
191
- address = await address_el.inner_text() if address_el else None
192
-
193
- # Extract industry/category
194
- category_el = await listing.query_selector(".categories, .category")
195
- industry = await category_el.inner_text() if category_el else None
196
-
197
- # Calculate confidence score based on available data
198
- confidence = 0.5 # Base score
199
- if primary_phone: confidence += 0.2
200
- if primary_email: confidence += 0.2
201
- if website: confidence += 0.1
202
-
203
- businesses.append(BusinessContact(
204
- business_name=name,
205
- phone=primary_phone,
206
- email=primary_email,
207
- website=website,
208
- address=address,
209
- industry=industry,
210
- social_profiles={},
211
- source_url=search_url,
212
- confidence_score=round(confidence, 2)
213
- ))
214
-
215
- except Exception as e:
216
- # Skip this listing if extraction fails
217
- continue
218
-
219
- except Exception as e:
220
- # If directory fails, return empty results with error info
221
- pass
222
 
223
  return SearchResponse(
224
  total_found=len(businesses),
@@ -506,4 +598,32 @@ async def bulk_extract_contacts(request: BulkExtractionRequest):
506
  successful=successful,
507
  failed=failed,
508
  results=results
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
509
  )
 
104
 
105
  return filtered_emails[:5] # Limit to 5 most likely emails
106
 
107
def generate_sample_businesses(query: str, limit: int) -> List[BusinessContact]:
    """Generate placeholder business records for demonstration purposes.

    The first whitespace-separated word of ``query`` seeds every generated
    name, e-mail address and URL; ``"Sample"`` is used when the query is
    blank. One record is produced per built-in template, so the result never
    holds more than five entries regardless of ``limit``.

    Args:
        query: Free-text search string supplied by the caller.
        limit: Maximum number of records wanted; values <= 0 yield ``[]``.

    Returns:
        A list of ``BusinessContact`` objects, each tagged with
        ``source_url="sample_data"`` and a fixed confidence score of 0.8.
    """
    import random
    import re

    # Sample business data templates
    business_templates = [
        {
            "name_suffix": "Solutions",
            "industry": "Technology",
            "phone_prefix": "555-01",
            "email_domain": "techsolutions.com",
        },
        {
            "name_suffix": "Services",
            "industry": "Consulting",
            "phone_prefix": "555-02",
            "email_domain": "services.net",
        },
        {
            "name_suffix": "Group",
            "industry": "Finance",
            "phone_prefix": "555-03",
            "email_domain": "group.org",
        },
        {
            "name_suffix": "Company",
            "industry": "Manufacturing",
            "phone_prefix": "555-04",
            "email_domain": "company.com",
        },
        {
            "name_suffix": "Associates",
            "industry": "Legal",
            "phone_prefix": "555-05",
            "email_domain": "associates.law",
        },
    ]

    # (street name, city/state, lowest ZIP, highest ZIP) per sample location
    address_templates = [
        ("Main St", "New York, NY", 10001, 10999),
        ("Business Ave", "Los Angeles, CA", 90001, 90999),
        ("Commerce Blvd", "Chicago, IL", 60601, 60699),
        ("Industry Dr", "Houston, TX", 77001, 77099),
        ("Corporate Way", "Miami, FL", 33101, 33199),
    ]

    query_words = query.lower().split()
    base_name = query_words[0].title() if query_words else "Sample"

    # The query is user input: keep only characters that are safe inside a
    # hostname / e-mail domain / URL path so that tokens such as "joe's" or
    # "a&b" cannot produce malformed addresses. Fall back to "sample" when
    # nothing usable is left.
    slug = re.sub(r"[^a-z0-9-]", "", base_name.lower()).strip("-") or "sample"

    businesses = []
    # max(..., 0) keeps a negative limit from being read as a "from the end" slice.
    for i, template in enumerate(business_templates[:max(limit, 0)]):
        suffix = template["name_suffix"]
        suffix_slug = suffix.lower()

        # Only build the single address that is actually used for this record.
        street, city, zip_lo, zip_hi = address_templates[i % len(address_templates)]
        address = (
            f"{random.randint(100, 9999)} {street}, {city} "
            f"{random.randint(zip_lo, zip_hi)}"
        )

        businesses.append(BusinessContact(
            business_name=f"{base_name} {suffix}",
            phone=f"{template['phone_prefix']}{random.randint(10, 99)}",
            email=f"contact@{slug}{template['email_domain']}",
            website=f"https://www.{slug}{suffix_slug}.com",
            address=address,
            industry=template["industry"],
            social_profiles={
                "linkedin": f"https://linkedin.com/company/{slug}-{suffix_slug}",
                "facebook": f"https://facebook.com/{slug}{suffix_slug}",
            },
            source_url="sample_data",
            confidence_score=0.8,
        ))

    return businesses
189
+
190
async def search_google_businesses(page, query: str, limit: int) -> List[BusinessContact]:
    """Best-effort lookup of business contact details via a Google search.

    Loads a Google results page for ``query`` and scans each result snippet
    for phone numbers and e-mail addresses. Only results whose snippet
    contains at least one of the two are returned. Any navigation or parsing
    failure is logged and the results collected so far (possibly none) are
    returned; the function does not raise for scraping errors.

    Args:
        page: Browser page object exposing async ``goto``,
            ``wait_for_load_state`` and ``query_selector_all``
            (presumably a Playwright page; confirm against the caller).
        query: Free-text search string; it is URL-encoded before use.
        limit: Maximum number of search results to inspect; values <= 0
            yield ``[]``.

    Returns:
        A list of ``BusinessContact`` objects with a fixed confidence score
        of 0.6 and ``source_url`` set to the search URL.
    """
    import logging
    from urllib.parse import quote_plus

    log = logging.getLogger(__name__)
    businesses = []

    # A non-positive limit would turn ``results[:limit]`` into a
    # "drop from the end" slice and inspect results nobody asked for.
    if limit <= 0:
        return businesses

    # quote_plus encodes spaces as '+' (as before) and also escapes
    # characters such as '&', '#' and '+' that would corrupt the query string.
    search_url = (
        f"https://www.google.com/search?q={quote_plus(query)}+contact+phone+email"
    )

    try:
        await page.goto(search_url, timeout=20000)
        await page.wait_for_load_state("domcontentloaded", timeout=10000)

        # Look for search result snippets
        results = await page.query_selector_all("div.g")
    except Exception:
        # Deliberately best-effort: report the failure and return nothing.
        log.warning("Google search failed for query %r", query, exc_info=True)
        return businesses

    for result in results[:limit]:
        try:
            # A result without a heading has no usable business name.
            title_el = await result.query_selector("h3")
            if not title_el:
                continue
            title = (await title_el.inner_text()).strip()

            # Snippet text is the only place contact info is searched for.
            snippet_el = await result.query_selector(".VwiC3b, .s")
            snippet = await snippet_el.inner_text() if snippet_el else ""

            link_el = await result.query_selector("a")
            url = await link_el.get_attribute("href") if link_el else None

            phones = extract_phone_numbers(snippet)
            emails = extract_emails(snippet)

            # Only add if we found contact info
            if phones or emails:
                businesses.append(BusinessContact(
                    business_name=title,
                    phone=phones[0] if phones else None,
                    email=emails[0] if emails else None,
                    website=url,
                    address=None,
                    industry=None,
                    social_profiles={},
                    source_url=search_url,
                    confidence_score=0.6,
                ))
        except Exception:
            # Skip a single malformed result but keep processing the rest.
            log.debug("Skipping unparsable Google result", exc_info=True)
            continue

    return businesses
246
+
247
  @app.get("/search",
248
  response_model=SearchResponse,
249
  summary="Search Business Directory",
 
289
  try:
290
  businesses = []
291
 
292
+ # For demonstration and testing, we'll create sample data
293
+ # In production, you would implement actual directory scraping
294
+ # with proper anti-bot measures and rotating proxies
 
 
295
 
296
+ try:
297
+ # Generate sample business data based on query
298
+ sample_businesses = generate_sample_businesses(query, limit)
299
+ businesses.extend(sample_businesses)
300
+
301
+ # Optionally, try to scrape from a simple directory or use Google search
302
+ # This is a fallback that might work for some queries
303
+ if len(businesses) < limit and source in ["auto", "google"]:
304
+ try:
305
+ google_results = await search_google_businesses(page, query, limit - len(businesses))
306
+ businesses.extend(google_results)
307
+ except Exception as e:
308
+ # If Google search fails, continue with sample data
309
+ pass
310
+
311
+ except Exception as e:
312
+ # If all methods fail, return at least some sample data
313
+ businesses = generate_sample_businesses(query, min(limit, 3))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
314
 
315
  return SearchResponse(
316
  total_found=len(businesses),
 
598
  successful=successful,
599
  failed=failed,
600
  results=results
601
+ )
602
+
603
+
604
@app.get("/health")
async def health_check():
    """Report that the API is up, together with its version and main routes."""
    # Human-readable summary of the routes listed in the health payload.
    available_endpoints = [
        "/search - Search business directories",
        "/extract-from-url - Extract contacts from website",
        "/bulk-extract - Bulk contact extraction (Premium)",
    ]
    payload = dict(
        status="healthy",
        message="Business Contact Intelligence API is running",
        version="1.0.0",
        endpoints=available_endpoints,
    )
    return payload
617
+
618
+
619
@app.get("/test-search")
async def test_search():
    """Return three generated sample businesses without any web scraping."""
    demo_query = "restaurant"
    canned_results = generate_sample_businesses(demo_query, 3)

    # Same response shape as the real search endpoint, marked as test data.
    return SearchResponse(
        total_found=len(canned_results),
        results=canned_results,
        search_query=demo_query,
        source="test",
    )