# Hugging Face Spaces page banner (extraction residue): "Spaces: Sleeping"
import re
import time

import gradio as gr
import requests
from bs4 import BeautifulSoup
from duckduckgo_search import DDGS
# === Model functions ===
# Browser-like headers; some news sites reject the default requests User-Agent.
_REQUEST_HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
    'Accept-Language': 'en-US,en;q=0.5',
    'Accept-Encoding': 'gzip, deflate',
    'Connection': 'keep-alive',
    'Upgrade-Insecure-Requests': '1',
}

# Tags removed before extraction because they rarely hold article text.
_NOISE_TAGS = ['script', 'style', 'nav', 'header', 'footer', 'aside', 'ads', 'noscript', 'form']

# CSS selectors for the article container, tried in order of preference.
_ARTICLE_SELECTORS = [
    'article',
    '.article-content', '.post-content', '.story-body', '.story-content',
    '.entry-content', '.content-body', '.article-body',
    'main article', 'main .content', 'main',
    '[role="main"]', '.main-content', '.page-content',
    '.text', '.article-text',
]


def _paragraph_texts(container):
    """Return the whitespace-normalized text of each substantial paragraph in *container*."""
    blocks = container.find_all('p')
    if not blocks:
        # Some pages put body text directly in <div>s. Only innermost divs are
        # used so that wrapper divs do not repeat their children's text.
        blocks = [
            div for div in container.find_all('div')
            if div.find(['div', 'p']) is None
        ]
    # get_text() without strip keeps the spaces around inline tags such as
    # <a>/<em>; split/join then collapses runs of whitespace.
    texts = (' '.join(block.get_text().split()) for block in blocks)
    # Drop short fragments (captions, bylines, empty paragraphs).
    return [text for text in texts if len(text) > 30]


def get_full_article(url):
    """Fetch *url* and return the article text, or a short failure message.

    The page is downloaded with browser-like headers, boilerplate tags are
    removed, and the first container from a list of common article selectors
    that yields more than 300 characters of paragraph text is returned. If
    no container qualifies, the text of the whole page is used instead.

    Args:
        url: Address of the page to download.

    Returns:
        Up to 10000 characters of extracted text. On failure one of these
        messages is returned instead of raising:
        "Article fetch timeout - using snippet instead",
        "Could not fetch article: ..." or
        "Could not extract substantial content".
    """
    try:
        # TLS certificates are verified (requests' default); a site with a
        # broken certificate is reported as a network error.
        response = requests.get(url, headers=_REQUEST_HEADERS, timeout=15)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, 'html.parser')

        # Remove unwanted elements
        for element in soup(_NOISE_TAGS):
            element.decompose()

        for selector in _ARTICLE_SELECTORS:
            container = soup.select_one(selector)
            if container is None:
                continue
            full_text = '\n\n'.join(_paragraph_texts(container))
            if len(full_text) > 300:  # Only return if substantial content
                return full_text[:10000]  # Limit to 10000 chars

        # Fallback to the text of the whole page
        body_text = soup.get_text(separator='\n\n', strip=True)
        # Clean up multiple newlines
        body_text = re.sub(r'\n{3,}', '\n\n', body_text)
        if len(body_text) > 300:
            return body_text[:10000]
        return "Could not extract substantial content"
    except requests.exceptions.Timeout:
        return "Article fetch timeout - using snippet instead"
    except requests.exceptions.RequestException:
        return "Could not fetch article: Network error"
    except Exception as e:
        # Parsing problems and anything unexpected: report instead of raising.
        return f"Could not fetch article: {str(e)}"
def search_articles(name: str) -> str:
    """Search DuckDuckGo for news articles about *name* and return them as text.

    The query combines the quoted name with ownership-related keywords. Up to
    three attempts are made, each with a longer delay and different search
    settings. For every result the full article is fetched with
    ``get_full_article``; when that fails the search snippet is used instead.

    Args:
        name: Company or project name to search for.

    Returns:
        The formatted articles, each preceded by a line of 80 "=" characters,
        or a message starting with "No articles found" or "[ERROR]".
    """
    keywords = ['founders', 'partners', 'funders', 'owners']
    search_query = f'"{name}" ({" OR ".join(keywords)}) site:news'
    max_retries = 3
    base_delay = 3

    # One configuration per attempt; the last one is reused if attempts exceed the list.
    configs = [
        {'timeout': 20, 'region': 'us-en', 'safesearch': 'moderate'},
        {'timeout': 25, 'region': 'wt-wt', 'safesearch': 'off'},
        {'timeout': 30, 'region': None, 'safesearch': 'moderate'},
    ]

    # Failure messages produced by get_full_article. Matching on the start of
    # the text keeps a real article that merely mentions e.g. "timeout" from
    # being discarded.
    failure_prefixes = (
        "Could not fetch article",
        "Article fetch timeout",
        "Could not extract substantial content",
    )
    separator = "\n" + "=" * 80 + "\n"

    for attempt in range(max_retries):
        try:
            print(f"Search attempt {attempt + 1}: {search_query}")

            # Progressive delay
            time.sleep(base_delay * (attempt + 1))

            config = configs[min(attempt, len(configs) - 1)]
            with DDGS(timeout=config['timeout']) as ddgs:
                search_params = {
                    'keywords': search_query,
                    'max_results': 2,
                    'safesearch': config['safesearch'],
                }
                if config['region']:
                    search_params['region'] = config['region']
                results = list(ddgs.text(**search_params))

            print(f"Found {len(results)} results on attempt {attempt + 1}")

            if not results:
                if attempt < max_retries - 1:
                    print("No results found, retrying with different parameters...")
                    continue
                return f"No articles found for {name} after {max_retries} attempts"

            articles = []
            for i, result in enumerate(results, 1):
                url = result.get('href', 'No URL')
                title = result.get('title', 'No Title')
                snippet = result.get('body', 'No snippet available')
                print(f"Processing article {i}: {title}")
                print(f"URL: {url}")

                # Add delay between article fetches
                if i > 1:
                    time.sleep(2)

                # Try to get full article; fall back to the snippet on failure
                full_text = get_full_article(url)
                if full_text.startswith(failure_prefixes):
                    print(f"Using snippet fallback for article {i}")
                    content = f"[SNIPPET ONLY - Full article unavailable]\n{snippet}"
                else:
                    content = full_text

                articles.append(f"**{i}. {title}**\nSource: {url}\n\n{content}\n")

            # A rule before every article, including the first.
            return separator + separator.join(articles)
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {str(e)}")
            if attempt < max_retries - 1:
                wait_time = base_delay * (attempt + 2)
                print(f"Waiting {wait_time} seconds before retry...")
                time.sleep(wait_time)
            else:
                return f"[ERROR] Search failed after {max_retries} attempts. Last error: {str(e)}"

    # Not reached with max_retries >= 1; guarantees a string result.
    return f"[ERROR] Search failed after {max_retries} attempts."
def extract_entities(search_results: str) -> str:
    """Send the article text to the Mistral 7B endpoint and return its entity list.

    Args:
        search_results: Article text to embed in the extraction prompt.

    Returns:
        The "response" field of the endpoint's JSON reply ("No entities
        extracted" when the field is absent), or a message starting with
        "[ERROR]" for a non-200 status, a timeout or any other failure.
    """
    modal_endpoint = "https://msoaresdiego--mistral-llm-endpoint-fastapi-app.modal.run/generate"
    prompt = f"""Extract all person names and organization names from the following text. Do not extract products and service names. Only individuals and organizations. Bring the full details of the name in the newspaper article. For example, if only ACME is mentioned as company name, bring only ACME. IF ACME Inc is mentioned as company name, then you have to extract ACME Inc. In addition, define the relationship between the entity and the company that is being searched. For example, is ACME Inc an owner of the company being searched? Then write 'owner'. Is ACME Inc. a funder of the company being searched? Then write 'funder'
Format as:
PERSON: [name] - [relationship]
ORG: [organization name] - [relationship]
Text: {search_results}"""
    payload = {"prompt": prompt, "max_tokens": 1500, "temperature": 0.15}
    try:
        reply = requests.post(modal_endpoint, json=payload, timeout=30)
        # Anything other than HTTP 200 is reported together with the raw body.
        if reply.status_code != 200:
            return f"[ERROR] API Error: {reply.status_code} - {reply.text}"
        return reply.json().get("response", "No entities extracted")
    except requests.exceptions.Timeout:
        return "[ERROR] Entity extraction timeout - please try again"
    except Exception as exc:
        return f"[ERROR] Extraction failed: {exc}"
# === Gradio interface functions ===
def search_only(name: str):
    """Run the article search for *name* and return display text plus raw results.

    Args:
        name: Company or project name typed by the user; ``None`` and
            whitespace-only values are treated as empty.

    Returns:
        A ``(display_text, raw_results)`` tuple. ``raw_results`` is the search
        output to keep for the extraction step; it is an empty string when no
        name was given or when the search produced only a failure message, so
        that such a message is never passed on as article text.
    """
    cleaned = (name or "").strip()
    if not cleaned:
        return "No name provided", ""
    try:
        search_start = time.time()
        articles_output = search_articles(cleaned)
        search_time = time.time() - search_start
        search_results = f"Search completed for: {cleaned} in {search_time:.1f}s\n\n"
        search_results += articles_output
        # search_articles reports failures as plain text with these prefixes;
        # show them to the user but do not store them as results.
        if articles_output.startswith(("[ERROR]", "No articles found")):
            return search_results, ""
        return search_results, articles_output  # Return both display and raw results
    except Exception as e:
        error_msg = f"[ERROR] Search failed: {str(e)}"
        return error_msg, ""
def extract_only(stored_search_results: str):
    """Run entity extraction on previously stored search results.

    Args:
        stored_search_results: Raw text saved by the search step.

    Returns:
        A timing header followed by the extracted entities, a prompt to
        search first when the stored text is blank, or an "[ERROR]" message
        if extraction raises.
    """
    # Nothing stored yet (or only whitespace): tell the user to search first.
    if stored_search_results.strip() == "":
        return "No search results available. Please search first."
    try:
        started = time.time()
        entities = extract_entities(stored_search_results)
        elapsed = time.time() - started
        return f"Entity extraction completed in {elapsed:.1f}s\n\n" + entities
    except Exception as err:
        return f"[ERROR] Extraction failed: {err}"
# === Gradio UI ===
with gr.Blocks(title="Related Entities Finder") as demo:
    # Page heading and usage notes.
    gr.Markdown("# 🔎 Related Entities Finder")
    gr.Markdown("Enter a business or project name to search for related articles and extract key entities.")
    gr.Markdown("*Note: Full article extraction may take 30-60 seconds. Snippets will be used as fallback if needed.*")

    # Keeps the raw search output so the extract button can reuse it.
    search_state = gr.State("")

    # NOTE(review): the container nesting below was reconstructed from the
    # statement order because the original indentation was lost - confirm
    # against the deployed layout.
    with gr.Row():
        name_input = gr.Textbox(label="Company/Project Name", placeholder="Enter business or project name")
        with gr.Column():
            search_btn = gr.Button("🔍 Search Articles", variant="primary", size="lg")
            extract_btn = gr.Button("📋 Extract Entities", variant="secondary", size="lg")

    with gr.Column():
        output1 = gr.Textbox(label="Search Results", lines=40, max_lines=100, show_copy_button=True)
        output2 = gr.Textbox(
            label="Extracted Entities and Relationships",
            lines=10,
            max_lines=20,
            show_copy_button=True,
        )

    # Search fills the results box and stores the raw text in the state.
    search_btn.click(fn=search_only, inputs=[name_input], outputs=[output1, search_state])
    # Extraction reads the stored text and fills the entities box.
    extract_btn.click(fn=extract_only, inputs=[search_state], outputs=[output2])

if __name__ == "__main__":
    demo.launch(share=False, server_name="0.0.0.0")