dygoo's picture
Update app.py
aa62c4f verified
raw
history blame
10.8 kB
import gradio as gr
import requests
import time
from duckduckgo_search import DDGS
from bs4 import BeautifulSoup
# === Model functions ===
def get_full_article(url):
"""Fetch full article content from URL"""
try:
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Accept-Encoding': 'gzip, deflate',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1'
}
response = requests.get(url, headers=headers, timeout=15, verify=False)
response.raise_for_status()
soup = BeautifulSoup(response.content, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'nav', 'header', 'footer', 'aside', 'ads', 'noscript', 'form']):
element.decompose()
# Try common article selectors in order of preference
article_selectors = [
'article',
'.article-content', '.post-content', '.story-body', '.story-content',
'.entry-content', '.content-body', '.article-body',
'main article', 'main .content', 'main',
'[role="main"]', '.main-content', '.page-content',
'.text', '.article-text'
]
for selector in article_selectors:
content = soup.select_one(selector)
if content:
# Extract paragraphs for better formatting
paragraphs = content.find_all(['p', 'div'], string=True)
if paragraphs:
text_parts = []
for p in paragraphs:
text = p.get_text(strip=True)
if len(text) > 30: # Filter out short/empty paragraphs
text_parts.append(text)
full_text = '\n\n'.join(text_parts)
if len(full_text) > 300: # Only return if substantial content
return full_text[:10000] # Limit to 10000 chars
# Fallback to body text
body_text = soup.get_text(separator='\n\n', strip=True)
# Clean up multiple newlines
import re
body_text = re.sub(r'\n{3,}', '\n\n', body_text)
return body_text[:10000] if len(body_text) > 300 else "Could not extract substantial content"
except requests.exceptions.Timeout:
return "Article fetch timeout - using snippet instead"
except requests.exceptions.RequestException as e:
return f"Could not fetch article: Network error"
except Exception as e:
return f"Could not fetch article: {str(e)}"
def search_articles(name: str) -> str:
"""Search for newspaper articles containing the name and keywords using DuckDuckGo"""
keywords = ['founders', 'partners', 'funders', 'owners']
search_query = f'"{name}" ({" OR ".join(keywords)}) site:news'
max_retries = 3
base_delay = 3
for attempt in range(max_retries):
try:
print(f"Search attempt {attempt + 1}: {search_query}")
# Progressive delay
delay = base_delay * (attempt + 1)
time.sleep(delay)
# Use different configurations for each attempt
configs = [
{'timeout': 20, 'region': 'us-en', 'safesearch': 'moderate'},
{'timeout': 25, 'region': 'wt-wt', 'safesearch': 'off'},
{'timeout': 30, 'region': None, 'safesearch': 'moderate'}
]
config = configs[min(attempt, len(configs)-1)]
with DDGS(timeout=config['timeout']) as ddgs:
search_params = {
'keywords': search_query,
'max_results': 2,
'safesearch': config['safesearch']
}
if config['region']:
search_params['region'] = config['region']
results = list(ddgs.text(**search_params))
print(f"Found {len(results)} results on attempt {attempt + 1}")
if not results:
if attempt < max_retries - 1:
print(f"No results found, retrying with different parameters...")
continue
return f"No articles found for {name} after {max_retries} attempts"
articles = []
for i, result in enumerate(results, 1):
url = result.get('href', 'No URL')
title = result.get('title', 'No Title')
snippet = result.get('body', 'No snippet available')
print(f"Processing article {i}: {title}")
print(f"URL: {url}")
# Add delay between article fetches
if i > 1:
time.sleep(2)
# Try to get full article
full_text = get_full_article(url)
# Use snippet as fallback if full article extraction fails
if any(error in full_text for error in ["Could not fetch", "timeout", "Network error"]):
print(f"Using snippet fallback for article {i}")
content = f"[SNIPPET ONLY - Full article unavailable]\n{snippet}"
else:
content = full_text
article = f"**{i}. {title}**\n"
article += f"Source: {url}\n\n"
article += f"{content}\n"
articles.append(article)
return "\n" + "="*80 + "\n".join(articles)
except Exception as e:
error_msg = f"Attempt {attempt + 1} failed: {str(e)}"
print(error_msg)
if attempt < max_retries - 1:
wait_time = base_delay * (attempt + 2)
print(f"Waiting {wait_time} seconds before retry...")
time.sleep(wait_time)
else:
return f"[ERROR] Search failed after {max_retries} attempts. Last error: {str(e)}"
def extract_entities(search_results: str) -> str:
"""Extract entities using Mistral 7B endpoint"""
modal_endpoint = "https://msoaresdiego--mistral-llm-endpoint-fastapi-app.modal.run/generate"
prompt = f"""Extract all person names and organization names from the following text. Do not extract products and service names. Only individuals and organizations. Bring the full details of the name in the newspaper article. For example, if only ACME is mentioned as company name, bring only ACME. IF ACME Inc is mentioned as company name, then you have to extract ACME Inc. In addition, define the relationship between the entity and the company that is being searched. For example, is ACME Inc an owner of the company being searched? Then write 'owner'. Is ACME Inc. a funder of the company being searched? Then write 'funder'
Format as:
PERSON: [name] - [relationship]
ORG: [organization name] - [relationship]
Text: {search_results}"""
try:
response = requests.post(
modal_endpoint,
json={"prompt": prompt, "max_tokens": 1500, "temperature": 0.15},
timeout=30
)
if response.status_code == 200:
return response.json().get("response", "No entities extracted")
else:
return f"[ERROR] API Error: {response.status_code} - {response.text}"
except requests.exceptions.Timeout:
return "[ERROR] Entity extraction timeout - please try again"
except Exception as e:
return f"[ERROR] Extraction failed: {str(e)}"
# === Gradio interface functions ===
def search_only(name: str):
"""Perform search only and return results"""
if not name.strip():
return "No name provided", ""
try:
search_start = time.time()
articles_output = search_articles(name.strip())
search_time = time.time() - search_start
search_results = f"Search completed for: {name} in {search_time:.1f}s\n\n"
search_results += articles_output
return search_results, articles_output # Return both display and raw results
except Exception as e:
error_msg = f"[ERROR] Search failed: {str(e)}"
return error_msg, ""
def extract_only(stored_search_results: str):
"""Extract entities from stored search results"""
if not stored_search_results.strip():
return "No search results available. Please search first."
try:
extract_start = time.time()
entities = extract_entities(stored_search_results)
extract_time = time.time() - extract_start
extraction_results = f"Entity extraction completed in {extract_time:.1f}s\n\n"
extraction_results += entities
return extraction_results
except Exception as e:
return f"[ERROR] Extraction failed: {str(e)}"
# === Gradio UI ===
with gr.Blocks(title="Related Entities Finder") as demo:
gr.Markdown("# 🔎 Related Entities Finder")
gr.Markdown("Enter a business or project name to search for related articles and extract key entities.")
gr.Markdown("*Note: Full article extraction may take 30-60 seconds. Snippets will be used as fallback if needed.*")
# State to store search results between operations
search_state = gr.State("")
with gr.Row():
name_input = gr.Textbox(label="Company/Project Name", placeholder="Enter business or project name")
with gr.Column():
search_btn = gr.Button("🔍 Search Articles", variant="primary", size="lg")
extract_btn = gr.Button("📋 Extract Entities", variant="secondary", size="lg")
with gr.Column():
output1 = gr.Textbox(
label="Search Results",
lines=40,
max_lines=100,
show_copy_button=True
)
output2 = gr.Textbox(
label="Extracted Entities and Relationships",
lines=10,
max_lines=20,
show_copy_button=True
)
# Search button click
search_btn.click(
fn=search_only,
inputs=[name_input],
outputs=[output1, search_state]
)
# Extract button click
extract_btn.click(
fn=extract_only,
inputs=[search_state],
outputs=[output2]
)
if __name__ == "__main__":
demo.launch(share=False, server_name="0.0.0.0")