import gradio as gr
import requests
import time
import re
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
import anthropic
import os
from datetime import datetime, timedelta
from dateutil import parser
import json
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError, as_completed

# Initialize Anthropic client
client = anthropic.Anthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY")
)

# Global variable to track cancellation
cancel_operation = threading.Event()

def reset_cancellation():
    """Reset the cancellation flag"""
    cancel_operation.clear()

def check_cancellation():
    """Check if operation should be cancelled"""
    return cancel_operation.is_set()
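
# Cancellation is cooperative: a regular Python function that is already running
# cannot be interrupted from outside, and Gradio's `cancels=` (wired to the cancel
# button further down) only stops queued work and result streaming. The Event flag
# above is what lets the in-flight search/fetch loops notice a cancel request and
# bail out early via check_cancellation().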

# === Enhanced Model functions with progress tracking ===
def extract_publication_date(soup, url):
    """Extract publication date from article HTML - same as before"""
    try:
        date_selectors = [
            'time[datetime]',
            '.date', '.publish-date', '.published', '.post-date',
            '[class*="date"]', '[class*="time"]',
            'meta[property="article:published_time"]',
            'meta[name="publishdate"]',
            'meta[name="date"]'
        ]
        for selector in date_selectors:
            element = soup.select_one(selector)
            if element:
                date_text = element.get('datetime') or element.get('content') or element.get_text()
                if date_text:
                    try:
                        return parser.parse(date_text)
                    except:
                        continue
        date_patterns = [
            r'(\w+ \d{1,2}, \d{4})',
            r'(\d{1,2}/\d{1,2}/\d{4})',
            r'(\d{4}-\d{2}-\d{2})'
        ]
        text = soup.get_text()[:2000]
        for pattern in date_patterns:
            matches = re.findall(pattern, text)
            if matches:
                try:
                    return parser.parse(matches[0])
                except:
                    continue
    except Exception as e:
        print(f"Date extraction error for {url}: {e}")
    return None

def get_full_article_with_timeout(url, timeout=15):
    """Enhanced article fetching with timeout and better error handling"""
    if check_cancellation():
        return "[CANCELLED] Operation was cancelled", None
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        response = requests.get(url, headers=headers, timeout=timeout, verify=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        pub_date = extract_publication_date(soup, url)
        for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'ads', 'noscript', 'form']):
            element.decompose()
        article_selectors = [
            'article', '.article-content', '.post-content', '.story-body', '.story-content',
            '.entry-content', '.content-body', '.article-body', 'main article', 'main .content', 'main',
            '[role="main"]', '.main-content', '.page-content', '.text', '.article-text'
        ]
        for selector in article_selectors:
            content = soup.select_one(selector)
            if content:
                paragraphs = content.find_all(['p', 'div'], string=True)
                if paragraphs:
                    text_parts = [p.get_text(strip=True) for p in paragraphs if len(p.get_text(strip=True)) > 30]
                    full_text = '\n\n'.join(text_parts)
                    if len(full_text) > 300:
                        return full_text[:10000], pub_date
        body_text = soup.get_text(separator='\n\n', strip=True)
        body_text = re.sub(r'\n{3,}', '\n\n', body_text)
        return (body_text[:10000] if len(body_text) > 300 else "[INFO] Could not extract substantial content"), pub_date
    except requests.exceptions.Timeout:
        return "[WARNING] Article fetch timeout - using snippet instead", None
    except requests.exceptions.RequestException as e:
        return f"[ERROR] Network error: {str(e)}", None
    except Exception as e:
        return f"[ERROR] Could not fetch article: {str(e)}", None

def search_articles_by_timeframe_enhanced(name: str, timeframe: str, max_articles: int = 3, progress=None) -> list:
    """Enhanced search with progress tracking and better error handling"""
    if check_cancellation():
        return []
    if timeframe == "recent":
        search_queries = [
            f'"{name}" founder news 2024 2025',
            f'"{name}" CEO founder recent',
            f'"{name}" founder update latest'
        ]
    else:
        search_queries = [
            f'"{name}" founded established history',
            f'"{name}" founder origin story',
            f'"{name}" started began founder',
            f'"{name}" founder early days'
        ]
    all_results = []
    max_retries = 2
    base_delay = 2  # Reduced delay
    total_queries = len(search_queries)
    for query_idx, search_query in enumerate(search_queries):
        if len(all_results) >= max_articles or check_cancellation():
            break
        if progress:
            query_progress = (query_idx / total_queries) * 0.3  # 30% of progress for queries
            progress(query_progress, desc=f"Searching {timeframe} articles ({query_idx + 1}/{total_queries})")
        for attempt in range(max_retries):
            if check_cancellation():
                return all_results
            try:
                print(f"Search attempt {attempt + 1} for query {query_idx + 1} ({timeframe}): {search_query}")
                if attempt > 0:
                    time.sleep(base_delay * attempt)
                configs = [
                    {'timeout': 15, 'region': 'us-en', 'safesearch': 'moderate'},
                    {'timeout': 20, 'region': 'wt-wt', 'safesearch': 'off'}
                ]
                config = configs[min(attempt, len(configs) - 1)]
                with DDGS(timeout=config['timeout']) as ddgs:
                    search_params = {
                        'keywords': search_query,
                        'max_results': max_articles - len(all_results) + 2,
                        'safesearch': config['safesearch']
                    }
                    if config['region']:
                        search_params['region'] = config['region']
                    results = list(ddgs.text(**search_params))
                    if results:
                        # Deduplicate on 'href' (the key DDGS results use for the link)
                        existing_urls = {r.get('href', '') for r in all_results}
                        for result in results:
                            if len(all_results) >= max_articles:
                                break
                            url = result.get('href', '')
                            if url and url not in existing_urls:
                                all_results.append(result)
                                existing_urls.add(url)
                        break
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for {timeframe} query {query_idx + 1}: {str(e)}")
                if attempt < max_retries - 1:
                    time.sleep(base_delay * (attempt + 1))
    return all_results[:max_articles]

def categorize_article_by_date(pub_date):
    """Categorize an article as recent or historical based on publication date"""
    if not pub_date:
        return "unknown"
    # Normalize timezone-aware dates so they can be compared with naive datetime.now()
    if pub_date.tzinfo is not None:
        pub_date = pub_date.replace(tzinfo=None)
    one_year_ago = datetime.now() - timedelta(days=365)
    return "recent" if pub_date >= one_year_ago else "historical"

def fetch_article_parallel(result, article_num, total_articles, progress=None):
    """Fetch single article with progress update"""
    if check_cancellation():
        return None
    url = result.get('href', 'No URL')
    title = result.get('title', 'No Title')
    snippet = result.get('body', 'No snippet available')
    expected_timeframe = result.get('expected_timeframe', 'unknown')
    if progress:
        fetch_progress = 0.4 + (article_num / total_articles) * 0.5  # 40-90% of total progress
        progress(fetch_progress, desc=f"Fetching article {article_num + 1}/{total_articles}: {title[:50]}...")
    full_text, pub_date = get_full_article_with_timeout(url, timeout=12)
    if check_cancellation():
        return None
    actual_timeframe = categorize_article_by_date(pub_date)
    if any(error in str(full_text) for error in ["[ERROR]", "timeout", "Network error", "[CANCELLED]"]):
        content = f"[SNIPPET ONLY]\n{snippet}"
    else:
        content = full_text
    if pub_date:
        date_str = pub_date.strftime("%B %d, %Y")
        timeframe_indicator = f"📅 **Published**: {date_str} ({actual_timeframe.title()})"
    else:
        timeframe_indicator = f"📅 **Timeframe**: {expected_timeframe.title()} (estimated)"
    article = f"### {article_num + 1}. {title}\n"
    article += f"[Source]({url})\n"
    article += f"{timeframe_indicator}\n\n"
    article += f"{content}\n"
    return {
        'article': article,
        'timeframe': actual_timeframe,
        'url': url,
        'title': title
    }
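
# search_articles_enhanced orchestrates the full pipeline: split the article budget
# between recent and historical queries, fetch the hits concurrently in a small
# ThreadPoolExecutor (at most 3 workers), then re-sort the fetched articles back
# into their original search order before formatting the report.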

def search_articles_enhanced(name: str, max_articles: int = 4, progress=None) -> str:
    """Enhanced search with progress tracking and parallel processing"""
    reset_cancellation()  # Reset cancellation flag
    if progress:
        progress(0, desc="Initializing enhanced search...")
    recent_count = max_articles // 2
    historical_count = max_articles - recent_count
    if progress:
        progress(0.05, desc=f"Planning search: {recent_count} recent + {historical_count} historical articles")

    # Search for recent articles
    if progress:
        progress(0.1, desc="Searching for recent articles...")
    recent_results = search_articles_by_timeframe_enhanced(name, "recent", recent_count, progress)
    if check_cancellation():
        return "[CANCELLED] Search was cancelled by user"

    if progress:
        progress(0.3, desc="Searching for historical articles...")
    # Brief pause between searches
    time.sleep(1)
    historical_results = search_articles_by_timeframe_enhanced(name, "historical", historical_count, progress)
    if check_cancellation():
        return "[CANCELLED] Search was cancelled by user"

    # Combine results
    all_results = []
    for result in recent_results:
        result['expected_timeframe'] = 'recent'
        all_results.append(result)
    for result in historical_results:
        result['expected_timeframe'] = 'historical'
        all_results.append(result)

    if not all_results:
        if progress:
            progress(1.0, desc="Search completed - no results found")
        return f"[INFO] No articles found for {name}"

    if progress:
        progress(0.4, desc=f"Found {len(all_results)} articles, now fetching content...")

    # Fetch articles with parallel processing (but limited concurrency)
    articles = []
    recent_found = 0
    historical_found = 0
    # Use ThreadPoolExecutor for controlled parallel fetching
    max_workers = min(3, len(all_results))  # Limit concurrent requests
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_result = {
            executor.submit(fetch_article_parallel, result, i, len(all_results), progress): (result, i)
            for i, result in enumerate(all_results)
        }
        # Collect results as they complete
        for future in as_completed(future_to_result, timeout=60):  # 60 second timeout
            if check_cancellation():
                # Cancel remaining futures
                for f in future_to_result:
                    f.cancel()
                return "[CANCELLED] Search was cancelled by user"
            try:
                result_data = future.result(timeout=15)
                if result_data:
                    articles.append(result_data)
                    # Count by timeframe
                    if result_data['timeframe'] == "recent":
                        recent_found += 1
                    elif result_data['timeframe'] == "historical":
                        historical_found += 1
            except TimeoutError:
                print("Article fetch timed out")
                continue
            except Exception as e:
                print(f"Error fetching article: {e}")
                continue

    if check_cancellation():
        return "[CANCELLED] Search was cancelled by user"
    if progress:
        progress(0.95, desc="Formatting results...")

    # Sort articles to maintain order
    articles.sort(key=lambda x: all_results.index(next(r for r in all_results if r.get('href', '') == x['url'])))

    # Create final output
    summary = f"**Search Summary**: Found {len(articles)} articles total - {recent_found} recent, {historical_found} historical, {len(articles) - recent_found - historical_found} unknown timeframe\n\n"
    article_texts = [article_data['article'] for article_data in articles]
    if progress:
        progress(1.0, desc=f"Search completed! Found {len(articles)} articles")
    return summary + "\n---\n".join(article_texts)
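
# Entity extraction sends the collected article text to Claude. The input is capped
# at 15,000 characters, trimmed back to the last sentence boundary when possible,
# and the model is instructed to return the founders as strict JSON.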

def extract_entities_enhanced(search_results: str, company_name: str, progress=None) -> str:
    """Enhanced entity extraction with progress tracking"""
    if progress:
        progress(0, desc="Preparing text for analysis...")
    MAX_CHARS = 15000
    if len(search_results) > MAX_CHARS:
        trunc = search_results[:MAX_CHARS]
        last_period = trunc.rfind('. ')
        search_results = trunc[:last_period + 1] if last_period > 3000 else trunc

    if progress:
        progress(0.2, desc="Analyzing articles with AI...")

    prompt = f"""Extract all named entities that are described as founders of "{company_name}" specifically from the following text.
Only include founders who are explicitly mentioned as founders of {company_name}.
Ignore founders of other companies that may be mentioned in the text.
Return a JSON object with the following structure:
{{
  "founders": [
    {{
      "name": "Founder Name",
      "evidence": ["brief quote or context where they were mentioned as founder"]
    }}
  ]
}}
Respond only with valid JSON. Do not include any explanations, comments, or additional formatting.
You have to examine every article available in the search results below.
Text:
{search_results}"""

    try:
        if progress:
            progress(0.5, desc="Sending request to AI model...")
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1500,
            temperature=0.1,
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        )
        if progress:
            progress(0.9, desc="Processing AI response...")
        result = message.content[0].text
        if progress:
            progress(1.0, desc="Analysis completed!")
        return result
    except Exception as e:
        if progress:
            progress(1.0, desc="Analysis failed")
        return f"[ERROR] Extraction failed: {str(e)}"

# === Enhanced Gradio interface functions ===
def search_only_enhanced(name: str, article_count: int, progress=gr.Progress()):
    """Enhanced search with progress tracking"""
    if not name.strip():
        return "❌ No name provided", ""
    try:
        start = time.time()
        progress(0, desc="Starting enhanced temporal search...")
        articles_output = search_articles_enhanced(name.strip(), max_articles=article_count, progress=progress)
        # This now correctly handles cancellation from the main thread
        if "[CANCELLED]" in articles_output:
            return "🛑 Search was cancelled by user.", ""
        elapsed = time.time() - start
        progress(1.0, desc=f"Search completed in {elapsed:.1f}s")
        results = f"✅ **Enhanced Temporal Search** completed for **{name}** in {elapsed:.1f}s\n\n"
        results += articles_output
        return results, articles_output
    except Exception as e:
        progress(1.0, desc="Search failed")
        return f"❌ **Search failed**: {str(e)}", ""

def extract_only_enhanced(stored_results: str, company_name: str, progress=gr.Progress()):
    """Enhanced extraction with progress tracking"""
    if not stored_results.strip():
        return "❌ No search results available. Please search first."
    if not company_name.strip():
        return "❌ No company name provided. Please search first."
    if "[CANCELLED]" in stored_results:
        return "❌ Cannot extract from cancelled search results. Please search again."
    try:
        start = time.time()
        entities = extract_entities_enhanced(stored_results, company_name.strip(), progress)
        elapsed = time.time() - start
        # Try to format JSON for better readability
        try:
            parsed = json.loads(entities)
            formatted = json.dumps(parsed, indent=2)
            return f"✅ **Enhanced Extraction** completed in {elapsed:.1f}s\n\n```json\n{formatted}\n```"
        except:
            return f"✅ **Enhanced Extraction** completed in {elapsed:.1f}s\n\n{entities}"
    except Exception as e:
        progress(1.0, desc="Extraction failed")
        return f"❌ **Extraction failed**: {str(e)}"

def cancel_search():
    """Cancel the current search operation"""
    cancel_operation.set()
    return "🛑 **Cancellation requested** - stopping current operation..."

# === Enhanced Gradio UI ===
with gr.Blocks(title="Enhanced Founder Finder", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🔍 Enhanced Founder Finder")
    gr.Markdown("Enter a business or project name to search for its founder using **temporal search strategy** with **real-time progress tracking**.")
    gr.Markdown("*🚀 **New Features**: Progress bars, cancellation support, parallel processing, better error handling*")
    gr.Markdown("*⏱️ Note: Enhanced search typically takes 30–60 seconds with full progress visibility.*")

    search_state = gr.State("")

    with gr.Row():
        with gr.Column(scale=2):
            name_input = gr.Textbox(
                label="Company Name",
                placeholder="Enter business name (e.g., 'Tesla', 'SpaceX', 'Microsoft')",
                lines=1
            )
        with gr.Column(scale=1):
            article_count_slider = gr.Slider(
                2, 12,
                value=4,
                step=2,
                label="Total Articles",
                info="Split between recent/historical"
            )

    with gr.Row():
        search_btn = gr.Button("🔍 Enhanced Temporal Search", variant="primary", size="lg")
        cancel_btn = gr.Button("🛑 Cancel Search", variant="secondary", size="lg")
        extract_btn = gr.Button("📊 Extract Founder Intelligence", variant="secondary", size="lg")

    with gr.Row():
        status_output = gr.Markdown("Ready to search...")

    with gr.Row():
        with gr.Column():
            output1 = gr.Markdown(label="Search Results with Temporal Analysis", height=400)
        with gr.Column():
            output2 = gr.Textbox(
                label="Founder Intelligence Report",
                lines=15,
                max_lines=25,
                show_copy_button=True
            )

    # --- FIX 2: Implement proper Gradio event cancellation ---
    # The search_btn.click event is assigned to a variable.
    search_event = search_btn.click(
        fn=search_only_enhanced,
        inputs=[name_input, article_count_slider],
        outputs=[output1, search_state],
        show_progress="full"  # Use "full" for better progress bar experience
    )
    # The cancel_btn.click event now correctly cancels the search_event.
    cancel_btn.click(
        fn=cancel_search,
        inputs=None,
        outputs=[status_output],
        # This is the crucial argument that links the cancel button to the search event.
        cancels=[search_event]
    )
    extract_btn.click(
        fn=extract_only_enhanced,
        inputs=[search_state, name_input],
        outputs=[output2],
        show_progress="full"
    )

    # Add some example companies
    gr.Examples(
        examples=[
            ["Tesla", 4],
            ["SpaceX", 6],
            ["Microsoft", 4],
            ["Apple", 6],
            ["OpenAI", 4]
        ],
        inputs=[name_input, article_count_slider],
    )

# --- FIX 1: Enable the queue. This is essential for long-running tasks on Hugging Face Spaces. ---
demo.queue()

if __name__ == "__main__":
    demo.launch(
        share=False,
        show_error=True
    )
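
# --- The previous, sequential version of the app (no progress tracking, threading,
# or cancellation) is preserved below inside a module-level string literal so that
# it is kept for reference but never runs. ---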

'''
import gradio as gr
import requests
import time
import re
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
import anthropic
import os
from datetime import datetime, timedelta
from dateutil import parser
import json

# Initialize Anthropic client
client = anthropic.Anthropic(
    api_key=os.getenv("ANTHROPIC_API_KEY")  # Set as secret in HF Space settings
)

# === Model functions ===
def extract_publication_date(soup, url):
    """Extract publication date from article HTML"""
    try:
        # Common date selectors
        date_selectors = [
            'time[datetime]',
            '.date', '.publish-date', '.published', '.post-date',
            '[class*="date"]', '[class*="time"]',
            'meta[property="article:published_time"]',
            'meta[name="publishdate"]',
            'meta[name="date"]'
        ]
        for selector in date_selectors:
            element = soup.select_one(selector)
            if element:
                date_text = element.get('datetime') or element.get('content') or element.get_text()
                if date_text:
                    try:
                        return parser.parse(date_text)
                    except:
                        continue
        # Look for date patterns in text
        date_patterns = [
            r'(\w+ \d{1,2}, \d{4})',     # January 15, 2023
            r'(\d{1,2}/\d{1,2}/\d{4})',  # 01/15/2023
            r'(\d{4}-\d{2}-\d{2})'       # 2023-01-15
        ]
        text = soup.get_text()[:2000]  # First 2000 chars
        for pattern in date_patterns:
            matches = re.findall(pattern, text)
            if matches:
                try:
                    return parser.parse(matches[0])
                except:
                    continue
    except Exception as e:
        print(f"Date extraction error for {url}: {e}")
    return None

def get_full_article(url):
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        response = requests.get(url, headers=headers, timeout=20, verify=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')
        # Extract publication date
        pub_date = extract_publication_date(soup, url)
        for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'ads', 'noscript', 'form']):
            element.decompose()
        article_selectors = [
            'article', '.article-content', '.post-content', '.story-body', '.story-content',
            '.entry-content', '.content-body', '.article-body', 'main article', 'main .content', 'main',
            '[role="main"]', '.main-content', '.page-content', '.text', '.article-text'
        ]
        for selector in article_selectors:
            content = soup.select_one(selector)
            if content:
                paragraphs = content.find_all(['p', 'div'], string=True)
                if paragraphs:
                    text_parts = [p.get_text(strip=True) for p in paragraphs if len(p.get_text(strip=True)) > 30]
                    full_text = '\n\n'.join(text_parts)
                    if len(full_text) > 300:
                        return full_text[:10000], pub_date
        body_text = soup.get_text(separator='\n\n', strip=True)
        body_text = re.sub(r'\n{3,}', '\n\n', body_text)
        return (body_text[:10000] if len(body_text) > 300 else "[INFO] Could not extract substantial content"), pub_date
    except requests.exceptions.Timeout:
        return "[WARNING] Article fetch timeout - using snippet instead", None
    except requests.exceptions.RequestException:
        return "[ERROR] Could not fetch article: Network error", None
    except Exception as e:
        return f"[ERROR] Could not fetch article: {str(e)}", None

def search_articles_by_timeframe(name: str, timeframe: str, max_articles: int = 3) -> list:
    """Search for articles in specific timeframe"""
    # Define search queries based on timeframe
    if timeframe == "recent":
        # Recent articles (news, updates, current events)
        search_queries = [
            f'"{name}" founder news 2024 2025',
            f'"{name}" CEO founder recent',
            f'"{name}" founder update latest'
        ]
    else:  # historical
        # Historical articles (founding, establishment, origin stories)
        search_queries = [
            f'"{name}" founded established history',
            f'"{name}" founder origin story',
            f'"{name}" started began founder',
            f'"{name}" founder early days'
        ]
    all_results = []
    max_retries = 2
    base_delay = 3
    for query_idx, search_query in enumerate(search_queries):
        if len(all_results) >= max_articles:
            break
        for attempt in range(max_retries):
            try:
                print(f"Search attempt {attempt + 1} for query {query_idx + 1} ({timeframe}): {search_query}")
                time.sleep(base_delay * (attempt + 1))
                configs = [
                    {'timeout': 20, 'region': 'us-en', 'safesearch': 'moderate'},
                    {'timeout': 25, 'region': 'wt-wt', 'safesearch': 'off'}
                ]
                config = configs[min(attempt, len(configs) - 1)]
                with DDGS(timeout=config['timeout']) as ddgs:
                    search_params = {
                        'keywords': search_query,
                        'max_results': max_articles - len(all_results) + 2,  # Get a few extra to filter
                        'safesearch': config['safesearch']
                    }
                    if config['region']:
                        search_params['region'] = config['region']
                    results = list(ddgs.text(**search_params))
                    print(f"Found {len(results)} results for query {query_idx + 1}")
                    if results:
                        # Add unique results (avoid duplicates)
                        existing_urls = {r.get('url', '') for r in all_results}
                        for result in results:
                            if len(all_results) >= max_articles:
                                break
                            url = result.get('href', '')
                            if url and url not in existing_urls:
                                all_results.append(result)
                                existing_urls.add(url)
                        break
            except Exception as e:
                print(f"Attempt {attempt + 1} failed for {timeframe} query {query_idx + 1}: {str(e)}")
                if attempt < max_retries - 1:
                    time.sleep(base_delay * (attempt + 2))
    return all_results[:max_articles]

def categorize_article_by_date(pub_date):
    """Categorize article as recent or historical based on publication date"""
    if not pub_date:
        return "unknown"
    one_year_ago = datetime.now() - timedelta(days=365)
    if pub_date >= one_year_ago:
        return "recent"
    else:
        return "historical"

def search_articles(name: str, max_articles: int = 4) -> str:
    """Enhanced search that ensures both recent and historical articles"""
    # Split articles between recent and historical
    recent_count = max_articles // 2
    historical_count = max_articles - recent_count
    print(f"Searching for {recent_count} recent and {historical_count} historical articles about {name}")
    # Search for recent articles
    recent_results = search_articles_by_timeframe(name, "recent", recent_count)
    time.sleep(2)  # Brief pause between timeframe searches
    # Search for historical articles
    historical_results = search_articles_by_timeframe(name, "historical", historical_count)
    # Combine and process all results
    all_results = []
    # Process recent articles
    for result in recent_results:
        result['expected_timeframe'] = 'recent'
        all_results.append(result)
    # Process historical articles
    for result in historical_results:
        result['expected_timeframe'] = 'historical'
        all_results.append(result)
    if not all_results:
        return f"[INFO] No articles found for {name}"
    # Fetch and categorize articles
    articles = []
    recent_found = 0
    historical_found = 0
    for i, result in enumerate(all_results, 1):
        url = result.get('href', 'No URL')
        title = result.get('title', 'No Title')
        snippet = result.get('body', 'No snippet available')
        expected_timeframe = result.get('expected_timeframe', 'unknown')
        if i > 1:
            time.sleep(2)
        full_text, pub_date = get_full_article(url)
        actual_timeframe = categorize_article_by_date(pub_date)
        # Count articles by actual timeframe
        if actual_timeframe == "recent":
            recent_found += 1
        elif actual_timeframe == "historical":
            historical_found += 1
        if any(error in str(full_text) for error in ["[ERROR]", "timeout", "Network error"]):
            print(f"Using snippet fallback for article {i}")
            content = f"[SNIPPET ONLY]\n{snippet}"
        else:
            content = full_text
        # Create timeframe indicator
        timeframe_indicator = ""
        if pub_date:
            date_str = pub_date.strftime("%B %d, %Y")
            timeframe_indicator = f"📅 **Published**: {date_str} ({actual_timeframe.title()})"
        else:
            timeframe_indicator = f"📅 **Timeframe**: {expected_timeframe.title()} (estimated)"
        article = f"### {i}. {title}\n"
        article += f"[Source]({url})\n"
        article += f"{timeframe_indicator}\n\n"
        article += f"{content}\n"
        articles.append(article)
    # Add summary of coverage
    summary = f"**Search Summary**: Found {len(articles)} articles total - {recent_found} recent, {historical_found} historical, {len(articles) - recent_found - historical_found} unknown timeframe\n\n"
    return summary + "\n---\n".join(articles)

def extract_entities(search_results: str, company_name: str) -> str:
    """Extract entities using Claude 4"""
    MAX_CHARS = 15000
    if len(search_results) > MAX_CHARS:
        trunc = search_results[:MAX_CHARS]
        last_period = trunc.rfind('. ')
        search_results = trunc[:last_period + 1] if last_period > 3000 else trunc
    prompt = f"""Extract all named entities that are described as founders of "{company_name}" specifically from the following text.
Only include founders who are explicitly mentioned as founders of {company_name}.
Ignore founders of other companies that may be mentioned in the text.
Return a JSON object with the following structure:
{{
  "founders": [
    {{
      "name": "Founder Name",
      "evidence": ["brief quote or context where they were mentioned as founder"]
    }}
  ]
}}
Respond only with valid JSON. Do not include any explanations, comments, or additional formatting.
You have to examine every article available in the search results below.
Text:
{search_results}"""
    try:
        message = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1500,
            temperature=0.1,
            messages=[
                {
                    "role": "user",
                    "content": prompt
                }
            ]
        )
        return message.content[0].text
    except Exception as e:
        return f"[ERROR] Extraction failed: {str(e)}"

# === Gradio interface functions ===
def search_only(name: str, article_count: int):
    if not name.strip():
        return "No name provided", ""
    try:
        start = time.time()
        articles_output = search_articles(name.strip(), max_articles=article_count)
        elapsed = time.time() - start
        results = f"✅ **Enhanced Temporal Search** completed for **{name}** in {elapsed:.1f}s\n\n"
        results += articles_output
        return results, articles_output
    except Exception as e:
        return f"[ERROR] Search failed: {str(e)}", ""

def extract_only(stored_results: str, company_name: str):
    if not stored_results.strip():
        return "No search results available. Please search first."
    if not company_name.strip():
        return "No company name provided. Please search first."
    try:
        start = time.time()
        entities = extract_entities(stored_results, company_name.strip())
        elapsed = time.time() - start
        # Try to format JSON for better readability
        try:
            parsed = json.loads(entities)
            formatted = json.dumps(parsed, indent=2)
            return f"✅ **Enhanced Extraction** completed in {elapsed:.1f}s\n\n```json\n{formatted}\n```"
        except:
            return f"✅ **Enhanced Extraction** completed in {elapsed:.1f}s\n\n{entities}"
    except Exception as e:
        return f"[ERROR] Extraction failed: {str(e)}"

# === Gradio UI ===
with gr.Blocks(title="Enhanced Founder Finder") as demo:
    gr.Markdown("# 🔍 Enhanced Founder Finder")
    gr.Markdown("Enter a business or project name to search for its founder using **temporal search strategy**.")
    gr.Markdown("*🚀 **New**: Automatically searches for both recent news AND historical founding information*")
    gr.Markdown("*⏱️ Note: Enhanced search may take 60–90 seconds for comprehensive results.*")
    search_state = gr.State("")
    with gr.Row():
        name_input = gr.Textbox(label="Company Name", placeholder="Enter business name")
        article_count_slider = gr.Slider(2, 12, value=4, step=2, label="Total Articles (split between recent/historical)")
    with gr.Column():
        search_btn = gr.Button("🔍 Enhanced Temporal Search", variant="primary")
        extract_btn = gr.Button("📊 Extract Founder Intelligence", variant="secondary")
    output1 = gr.Markdown(label="Search Results with Temporal Analysis")
    output2 = gr.Textbox(
        label="Founder Intelligence Report",
        lines=15,
        max_lines=25,
        show_copy_button=True
    )
    search_btn.click(
        fn=search_only,
        inputs=[name_input, article_count_slider],
        outputs=[output1, search_state]
    )
    extract_btn.click(
        fn=extract_only,
        inputs=[search_state, name_input],
        outputs=[output2]
    )

if __name__ == "__main__":
    demo.launch()
'''