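"""URL and file text-extraction utilities for a Gradio app: fetch pages (with an optional
Selenium fallback), strip boilerplate and login walls, clean the text, and process uploaded
files. (Module docstring added for orientation; it only summarizes the code below.)"""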
import json
import sys
import os
import re
import time
import logging
import mimetypes
import tempfile
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import List, Dict, Tuple, Union, Optional

import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
import zipfile
# Setup logging with detailed configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ])
logger = logging.getLogger(__name__)


class URLProcessor:
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10  # seconds
        self.session.headers.update({
            'User-Agent': UserAgent().random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })

    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
        try:
            cleaned_text = clean(
                text,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
            return cleaned_text
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)  # Remove control characters
            text = text.encode('ascii', 'ignore').decode('ascii')  # Remove non-ASCII characters
            text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
            return text.strip()

    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}

            # Try with DNS resolution retry
            for attempt in range(3):  # Try up to 3 times
                try:
                    # Some sites block HEAD requests but allow GET
                    try:
                        response = self.session.head(url, timeout=self.timeout)
                        response.raise_for_status()
                    except Exception as e:
                        logger.warning(f"HEAD request failed for {url}, trying GET: {e}")
                        # Try with GET request if HEAD fails
                        response = self.session.get(url, timeout=self.timeout, stream=True)
                        response.raise_for_status()
                        # Close the connection to avoid downloading the entire content
                        response.close()
                    return {'is_valid': True, 'message': 'URL is valid and accessible'}
                except requests.exceptions.ConnectionError as e:
                    if "NameResolutionError" in str(e) or "Failed to resolve" in str(e):
                        logger.warning(f"DNS resolution failed for {url}, attempt {attempt + 1}/3")
                        time.sleep(1)  # Wait a bit before retrying
                        continue
                    else:
                        raise

            # If we get here, all attempts failed
            return {'is_valid': False,
                    'message': 'URL validation failed: DNS resolution failed after multiple attempts'}
        except Exception as e:
            logger.error(f"URL validation failed for {url}: {str(e)}")
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Universal content fetcher with special case handling"""
        try:
            logger.info(f"Fetching content from: {url}")

            # Google Drive document handling
            if 'drive.google.com' in url:
                return self._handle_google_drive(url)

            # Google Calendar ICS handling
            if 'calendar.google.com' in url and 'ical' in url:
                return self._handle_google_calendar(url)

            # Try standard HTML processing first
            result = self._fetch_html_content(url)

            # If standard processing failed or returned minimal content, try with Selenium
            if not result or len(result.get('content', '')) < 100:
                logger.info(
                    f"Standard processing failed or returned minimal content for {url}, trying Selenium")
                selenium_html = self._fetch_with_selenium(url)
                if selenium_html:
                    # Process the Selenium HTML
                    soup = BeautifulSoup(selenium_html, 'html.parser')

                    # Remove unwanted elements
                    for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                        element.decompose()

                    # Apply the same content extraction strategies as in _fetch_html_content
                    # Strategy 1: Look for semantic HTML5 elements
                    main_content = None
                    for selector in ['main', 'article', 'section', '.content', '.main', '.body', '.post',
                                     '.entry', '.page']:
                        elements = soup.select(selector)
                        if elements:
                            main_content = elements[0]
                            logger.info(f"Found content with selector: {selector}")
                            break

                    # If no main content found, use body
                    if not main_content or not main_content.get_text(strip=True):
                        main_content = soup.body if soup.body else soup

                    # Extract text
                    text_content = main_content.get_text(separator='\n', strip=True)

                    # Clean content
                    cleaned_content = self.advanced_text_cleaning(text_content)
                    if len(cleaned_content) >= 20:
                        result = {
                            'content': cleaned_content,
                            'content_type': 'text/html',
                            'timestamp': datetime.now().isoformat(),
                            'url': url,
                            'source': 'selenium'  # Mark that this came from Selenium
                        }

            # Log the result status
            if result:
                logger.info(f"Successfully extracted content from {url} ({len(result.get('content', ''))} chars)")
            else:
                logger.error(f"Failed to extract content from {url}")
            return result
        except Exception as e:
            logger.error(f"Content fetch failed for {url}: {e}")
            return None

    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Standard HTML content processing"""
        try:
            # Try with a different user agent if it's a social media site
            if any(domain in url for domain in
                   ['facebook.com', 'instagram.com', 'twitter.com', 'x.com', 'huggingface.co']):
                # Use a more realistic browser user agent instead of a random one
                self.session.headers.update({
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    # Add cookie consent headers to bypass some login walls
                    'Cookie': 'c_user=0; xs=0; datr=0; locale=en_US; wd=1920x1080; consent_accepted=true; cookie_consent=accepted',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                    'sec-ch-ua-mobile': '?0',
                    'sec-ch-ua-platform': '"macOS"',
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'none',
                    'Sec-Fetch-User': '?1',
                    'Upgrade-Insecure-Requests': '1'
                })

                # For Facebook, try to access the mobile version which often has fewer restrictions
                if 'facebook.com' in url and 'm.facebook.com' not in url:
                    url = url.replace('www.facebook.com', 'm.facebook.com')
                    logger.info(f"Switched to mobile Facebook URL: {url}")

            # Add a delay to simulate human browsing
            time.sleep(1)
            # Try to get the page with multiple attempts
            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    response = self.session.get(url, timeout=self.timeout)
                    response.raise_for_status()
                    break
                except Exception as e:
                    if attempt < max_attempts - 1:
                        logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying...")
                        time.sleep(2)  # Wait longer between retries
                    else:
                        raise
logger.info(f"Response status: {response.status_code}, Content-Type: {response.headers.get('Content-Type')}") | |
# Save the raw HTML for debugging if needed | |
debug_path = f"/Users/a2014/urld/debug_raw_{int(time.time())}.html" | |
with open(debug_path, "w", encoding="utf-8") as f: | |
f.write(response.text) | |
logger.info(f"Saved raw HTML to {debug_path}") | |
# Check if we got a valid response with content | |
if not response.text or len(response.text) < 100: | |
logger.error(f"Empty or very short response from {url}") | |
return None | |
            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()

            # Simulate "ESC key" by removing login walls and overlays common on social media sites
            login_wall_selectors = [
                '.login-wall', '.signup-wall', '.overlay', '.modal',
                '[role="dialog"]', '[aria-modal="true"]', '.login-overlay',
                '.signup-overlay', '.uiLayer', '.fb_overlay', '.ReactModalPortal',
                '[data-testid="login_dialog"]', '[data-testid="signup_dialog"]',
                '.login-signup-modal', '.onboarding-modal', '.signup-wrapper',
                '.login-wrapper', '.login-container', '.signup-container',
                '.login-modal', '.signup-modal', '.auth-modal', '.auth-wall'
            ]
            for selector in login_wall_selectors:
                for element in soup.select(selector):
                    logger.info(f"Removing login wall element: {selector}")
                    element.decompose()

            # Enhanced removal for social media sites
            if 'facebook.com' in url:
                # Facebook specific elements - simulating ESC key
                fb_selectors = [
                    '[data-testid="cookie-policy-manage-dialog"]',
                    '[role="banner"]', '[role="complementary"]',
                    '.login_form_container', '.login_form', '#login_form',
                    '.uiLayer', '.pluginConnectButton', '.fbPageBanner',
                    '._5hn6', '._67m7', '.nonLoggedInSignUp',
                    '#headerArea', '.uiContextualLayer', '.uiContextualLayerPositioner'
                ]
                for selector in fb_selectors:
                    for element in soup.select(selector):
                        element.decompose()

                # Look for the main content in mobile version
                main_content = soup.select_one('#m_story_permalink_view') or soup.select_one(
                    '#mobile_injected_video_feed_pagelet')
                if main_content:
                    logger.info("Found Facebook mobile main content")
            elif 'instagram.com' in url:
                # Instagram specific elements - simulating ESC key
                ig_selectors = [
                    '[role="presentation"]', '[role="banner"]', '[role="complementary"]',
                    '.RnEpo', '._acb3', '._ab8w', '._abn5', '.x1n2onr6',
                    '.x78zum5', '.x1q0g3np', '.xieb3on', '._a9-z', '._a9_1',
                    '._aa4b', '.x1i10hfl', '.x9f619', '.xnz67gz', '.x78zum5',
                    '.x1q0g3np', '.x1gslohp', '.xieb3on', '.x1lcm9me'
                ]
                for selector in ig_selectors:
                    for element in soup.select(selector):
                        element.decompose()

                # Try to find the main content
                insta_content = soup.select_one('main article') or soup.select_one('._aagv') or soup.select_one(
                    '._ab1y')
                if insta_content:
                    logger.info("Found Instagram main content")
            elif 'twitter.com' in url or 'x.com' in url:
                # X/Twitter already works well for public content, but clean up any remaining overlays
                x_selectors = [
                    '[data-testid="LoginForm"]', '[data-testid="SignupForm"]',
                    '[data-testid="sheetDialog"]', '[data-testid="mask"]',
                    '.r-zchlnj', '.r-1xcajam', '.r-1d2f490', '.r-1p0dtai',
                    '.r-1pi2tsx', '.r-u8s1d', '.css-175oi2r', '.css-1dbjc4n',
                    '.r-kemksi', '[data-testid="BottomBar"]'
                ]
                for selector in x_selectors:
                    for element in soup.select(selector):
                        element.decompose()
            elif 'huggingface.co' in url:
                # Special handling for Hugging Face
                logger.info("Applying special handling for Hugging Face")
                # Try to find the main content
                hf_selectors = ['.prose', '.space-content', '.model-description',
                                '.dataset-description', 'article', '.markdown']
                for selector in hf_selectors:
                    elements = soup.select(selector)
                    if elements:
                        logger.info(f"Found Hugging Face content with selector: {selector}")
                        break
            # Extract content using a general approach - try multiple strategies
            # Strategy 1: Look for semantic HTML5 elements
            main_content = None
            for selector in ['main', 'article', 'section', '.content', '.main', '.body', '.post', '.entry',
                             '.page']:
                elements = soup.select(selector)
                if elements:
                    main_content = elements[0]
                    logger.info(f"Found content with selector: {selector}")
                    break

            # Strategy 2: If no semantic elements, try common class names
            if not main_content or not main_content.get_text(strip=True):
                for div in soup.find_all('div'):
                    class_name = div.get('class', [])
                    id_name = div.get('id', '')
                    if any(term in ' '.join(class_name).lower() for term in
                           ['content', 'main', 'body', 'article', 'post']):
                        main_content = div
                        logger.info(f"Found content with div class: {class_name}")
                        break
                    if any(term in id_name.lower() for term in ['content', 'main', 'body', 'article', 'post']):
                        main_content = div
                        logger.info(f"Found content with div id: {id_name}")
                        break

            # Strategy 3: Fall back to body
            if not main_content or not main_content.get_text(strip=True):
                logger.info(f"No main content container found for {url}, using body")
                main_content = soup.body if soup.body else soup

            # Extract text with proper spacing
            text_content = main_content.get_text(separator='\n', strip=True)

            # Strategy 4: If content is too short, extract all visible text
            if len(text_content) < 100:
                logger.info(f"Content too short for {url} ({len(text_content)} chars), using all visible text")
                visible_text = []
                for element in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div']):
                    if element.get_text(strip=True):
                        visible_text.append(element.get_text(strip=True))
                text_content = '\n'.join(visible_text)

            # Strategy 5: Last resort - get all text from the page
            if len(text_content) < 50:
                logger.info(f"Still insufficient content for {url} ({len(text_content)} chars), using entire page text")
                text_content = soup.get_text(separator='\n', strip=True)

            # Clean and structure content
            cleaned_content = self.advanced_text_cleaning(text_content)
            logger.info(f"Final content length: {len(cleaned_content)} chars")

            # If we still have no content, this is a failure
            if len(cleaned_content) < 20:
                logger.error(f"Failed to extract meaningful content from {url}")
                return None

            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat(),
                'url': url  # Add the URL to the returned data for reference
            }
        except Exception as e:
            logger.error(f"HTML processing failed for {url}: {e}")
            return None

    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Handle Google Drive document URLs"""
        try:
            # Construct a direct download URL from the file ID in the /d/<id>/ path segment
            file_id = url.split("/d/")[1].split("/")[0]
            download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
            response = self.session.get(download_url, stream=True, timeout=self.timeout)
            response.raise_for_status()

            # Read content (limit to the first 1MB)
            content = b""
            for chunk in response.iter_content(chunk_size=8192):  # 8KB chunks
                content += chunk
                if len(content) > 1024 * 1024:  # 1MB limit
                    content = content[:1024 * 1024]
                    logger.warning("Truncated Google Drive file after 1MB")
                    break

            text_content = content.decode('utf-8', errors='ignore')
            cleaned_text = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_text,
                'content_type': 'text/plain',  # Assume plain text for simplicity
                'timestamp': datetime.now().isoformat(),
                'url': url,
                'source': 'google_drive'
            }
        except Exception as e:
            logger.error(f"Error handling Google Drive URL {url}: {e}")
            return None

    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Handle Google Calendar ICS URLs"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            text_content = response.text
            cleaned_text = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_text,
                'content_type': 'text/calendar',  # Correct MIME type
                'timestamp': datetime.now().isoformat(),
                'url': url,
                'source': 'google_calendar'
            }
        except Exception as e:
            logger.error(f"Error handling Google Calendar URL {url}: {e}")
            return None
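
    # Note: the Selenium fallback below is optional. It is only invoked when the requests-based
    # fetch returns little or no content, and it requires the `selenium` package plus a local
    # headless Chrome/Chromedriver; without them, the ImportError branch degrades gracefully.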
    def _fetch_with_selenium(self, url: str) -> Optional[str]:
        """Use Selenium as a fallback for difficult sites"""
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.common.by import By
            from selenium.webdriver.common.keys import Keys
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC

            logger.info(f"Attempting to fetch {url} with Selenium")

            # Set up Chrome options
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument(
                "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")

            # Initialize the driver
            driver = webdriver.Chrome(options=chrome_options)
            try:
                # Navigate to the URL
                driver.get(url)

                # Wait for the page to load
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )

                # Simulate pressing the ESC key (twice) to dismiss overlays
                action_chains = webdriver.ActionChains(driver)
                action_chains.send_keys(Keys.ESCAPE).perform()
                time.sleep(1)  # give it a moment to take effect
                action_chains.reset_actions()  # Clear actions
                # try again
                action_chains.send_keys(Keys.ESCAPE).perform()
                time.sleep(1)  # give it a moment to take effect
                action_chains.reset_actions()

                # Get the page source
                page_source = driver.page_source

                # Save the Selenium HTML for debugging (use the system temp dir rather than a hardcoded path)
                debug_path = os.path.join(tempfile.gettempdir(), f"debug_selenium_{int(time.time())}.html")
                with open(debug_path, "w", encoding="utf-8") as f:
                    f.write(page_source)
                logger.info(f"Saved Selenium HTML to {debug_path}")
                return page_source
            finally:
                driver.quit()
        except ImportError:
            logger.error("Selenium is not installed. Cannot use browser automation.")
            return None
        except Exception as e:
            logger.error(f"Selenium processing failed for {url}: {e}")
            return None
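
# Illustrative usage of URLProcessor (an added sketch, not part of the original app flow;
# "https://example.com" is just a placeholder URL):
#   processor = URLProcessor()
#   check = processor.validate_url("https://example.com")
#   if check['is_valid']:
#       data = processor.fetch_content("https://example.com")  # dict with 'content', 'content_type', ...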


class FileProcessor:
    """Class to handle file processing"""

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Check if file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            return (mime_type and mime_type.startswith('text/')) or \
                   (os.path.splitext(filepath)[1].lower() in self.supported_text_extensions)
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process uploaded file with enhanced error handling"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                ret