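"""Gradio app that validates a batch of URLs, fetches each page, extracts
cleaned text plus basic metadata (title, description, keywords), and reports
the results as JSON."""
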
import json
import os
import re
import time
import logging
import mimetypes
import concurrent.futures
import string
from typing import List, Dict, Optional, Union, Any
from pathlib import Path
from urllib.parse import urlparse

import requests
import validators
import gradio as gr
import torch
import cachetools
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from ratelimit import limits, sleep_and_retry
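
# Third-party dependencies used above (PyPI names assumed): requests, validators,
# gradio, torch, cachetools, beautifulsoup4, fake-useragent, ratelimit.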

# Advanced Logging Configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app_advanced.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)


class AdvancedURLProcessor:
    """Enhanced URL processing with advanced features"""

    def __init__(
        self,
        timeout: int = 15,
        max_retries: int = 3,
        concurrent_requests: int = 5,
        cache_size: int = 100
    ):
        self.timeout = timeout
        self.max_retries = max_retries
        self.concurrent_requests = concurrent_requests
        self.ua = UserAgent()

        # Implement multilevel caching: validated URLs and fetched page content
        self.url_cache = cachetools.LRUCache(maxsize=cache_size)
        self.content_cache = cachetools.TTLCache(maxsize=cache_size, ttl=3600)  # 1-hour cache

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive'
        })

    # Rate limiting: 10 calls per minute (applies the ratelimit decorators imported above)
    @sleep_and_retry
    @limits(calls=10, period=60)
    def validate_url(self, url: str) -> Dict[str, Union[bool, str]]:
        """Enhanced URL validation with comprehensive checks"""
        try:
            # Check cache first
            if url in self.url_cache:
                return self.url_cache[url]

            # Comprehensive URL validation
            result = urlparse(url)
            validation_result = {
                'is_valid': False,
                'message': 'Invalid URL',
                'scheme': result.scheme,
                'netloc': result.netloc
            }

            if not all([result.scheme, result.netloc]):
                validation_result['message'] = 'Missing scheme or network location'
                return validation_result

            # Use validators for additional checks
            if not validators.url(url):
                validation_result['message'] = 'URL format validation failed'
                return validation_result

            # Perform HEAD request for accessibility
            try:
                response = self.session.head(
                    url,
                    timeout=self.timeout,
                    allow_redirects=True
                )
                validation_result['is_valid'] = response.status_code in [200, 301, 302]
                validation_result['status_code'] = response.status_code
                validation_result['message'] = f"URL is {'valid' if validation_result['is_valid'] else 'invalid'}"
            except requests.RequestException as e:
                validation_result['message'] = f"Connection error: {str(e)}"

            # Cache the result
            self.url_cache[url] = validation_result
            return validation_result
        except Exception as e:
            logger.error(f"Unexpected error validating URL {url}: {e}")
            return {
                'is_valid': False,
                'message': f"Unexpected validation error: {str(e)}"
            }

    async def fetch_content(self, url: str) -> Optional[str]:
        """Fetch content from URL with retry mechanism and caching.

        Note: the requests calls below are blocking; callers that await this
        coroutine may want to run it in an executor if event-loop latency matters.
        """
        try:
            # Check content cache first
            if url in self.content_cache:
                logger.info(f"Cache hit for URL: {url}")
                return self.content_cache[url]

            for attempt in range(self.max_retries):
                try:
                    response = self.session.get(url, timeout=self.timeout)
                    response.raise_for_status()
                    content = response.text
                    # Cache the content
                    self.content_cache[url] = content
                    return content
                except requests.RequestException as e:
                    logger.warning(f"Attempt {attempt + 1}/{self.max_retries} failed for {url}: {str(e)}")
                    if attempt == self.max_retries - 1:
                        raise
                    time.sleep(1)  # Delay between retries
        except Exception as e:
            logger.error(f"Error fetching content from {url}: {e}")
            return None
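
# Illustrative standalone use of AdvancedURLProcessor (ContentProcessor below
# drives it for batch runs). Inside an async function one might write:
#
#     processor = AdvancedURLProcessor(timeout=10)
#     check = processor.validate_url("https://example.com")
#     if check['is_valid']:
#         html = await processor.fetch_content("https://example.com")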


class ContentExtractor:
    """Advanced content extraction and processing"""

    def __init__(self):
        # Structure-preserving cleaners run on the parsed soup; the regex-based
        # cleaners run afterwards on the extracted text (see _process_content).
        self.soup_cleaners = [
            self._remove_scripts,
            self._remove_styles
        ]
        self.text_cleaners = [
            self._remove_special_chars,
            self._normalize_whitespace
        ]

    def extract_text(self, html: str, url: str = "") -> Dict[str, Union[str, Dict]]:
        """Extract and clean text content with metadata"""
        try:
            if not html:
                return {
                    "success": False,
                    "content": "",
                    "metadata": {"error": "Empty HTML content"}
                }

            soup = BeautifulSoup(html, 'html.parser')

            # Extract metadata
            metadata = self._extract_metadata(soup, url)

            # Clean content
            content = self._process_content(soup)

            return {
                "success": True,
                "content": content,
                "metadata": metadata
            }
        except Exception as e:
            logger.error(f"Content extraction error for {url}: {e}")
            return {
                "success": False,
                "content": "",
                "metadata": {"error": str(e)}
            }

    def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict:
        """Extract page metadata"""
        metadata = {
            "title": self._get_title(soup),
            "description": self._get_meta_description(soup),
            "keywords": self._get_meta_keywords(soup),
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "url": url
        }
        return metadata

    def _process_content(self, soup: BeautifulSoup) -> str:
        """Process and clean content through multiple passes"""
        for cleaner in self.soup_cleaners:
            soup = cleaner(soup)

        # Extract text with preserved structure
        lines = []
        for element in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            text = element.get_text(strip=True)
            if text:
                lines.append(text)

        # Apply the text-level cleaners to the joined result
        content = "\n".join(lines)
        for cleaner in self.text_cleaners:
            content = cleaner(content)
        return content

    def _remove_scripts(self, soup: BeautifulSoup) -> BeautifulSoup:
        for element in soup(["script", "style", "iframe", "noscript"]):
            element.decompose()
        return soup

    def _remove_styles(self, soup: BeautifulSoup) -> BeautifulSoup:
        for element in soup.find_all(style=True):
            del element['style']
        return soup

    def _remove_special_chars(self, text: str) -> str:
        # Keep word characters, whitespace and basic punctuation
        return re.sub(r'[^\w\s.,!?-]', '', text)

    def _normalize_whitespace(self, text: str) -> str:
        # Collapse runs of spaces and tabs while preserving the line breaks
        # inserted between extracted headings and paragraphs
        return re.sub(r'[ \t]+', ' ', text).strip()

    def _get_title(self, soup: BeautifulSoup) -> str:
        title = soup.find('title')
        return title.get_text(strip=True) if title else ""

    def _get_meta_description(self, soup: BeautifulSoup) -> str:
        meta = soup.find('meta', attrs={'name': 'description'})
        return meta.get('content', '') if meta else ""

    def _get_meta_keywords(self, soup: BeautifulSoup) -> str:
        meta = soup.find('meta', attrs={'name': 'keywords'})
        return meta.get('content', '') if meta else ""


class ContentProcessor:
    """Main content processing orchestrator"""

    def __init__(self):
        self.url_processor = AdvancedURLProcessor()
        self.content_extractor = ContentExtractor()
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)

    async def process_urls(self, urls: List[str]) -> Dict[str, Any]:
        """Process multiple URLs concurrently with advanced error handling"""
        results = {
            "successful": [],
            "failed": [],
            "metadata": {
                "total_urls": len(urls),
                "start_time": time.strftime("%Y-%m-%d %H:%M:%S")
            }
        }
        try:
            # Validate URLs first
            valid_urls = []
            for url in urls:
                validation_result = self.url_processor.validate_url(url)
                if validation_result['is_valid']:
                    valid_urls.append(url)
                else:
                    results['failed'].append({
                        "url": url,
                        "error": validation_result['message']
                    })

            # Process valid URLs concurrently
            futures = []
            for url in valid_urls:
                future = self.executor.submit(self._process_single_url, url)
                futures.append((url, future))

            # Collect results
            for url, future in futures:
                try:
                    result = future.result(timeout=30)  # 30-second timeout
                    if result["success"]:
                        results["successful"].append(result)
                    else:
                        # Surface whichever error the extraction step recorded
                        results["failed"].append({
                            "url": url,
                            "error": result.get("error")
                                     or result.get("metadata", {}).get("error", "Processing failed")
                        })
                except Exception as e:
                    logger.error(f"Error processing {url}: {e}")
                    results["failed"].append({
                        "url": url,
                        "error": str(e)
                    })

            # Update metadata
            results["metadata"].update({
                "end_time": time.strftime("%Y-%m-%d %H:%M:%S"),
                "successful_count": len(results["successful"]),
                "failed_count": len(results["failed"])
            })
            return results
        except Exception as e:
            logger.error(f"Batch processing error: {e}")
            raise

    def _process_single_url(self, url: str) -> Dict:
        """Process a single URL with comprehensive error handling"""
        try:
            response = self.url_processor.session.get(
                url,
                timeout=self.url_processor.timeout
            )
            response.raise_for_status()

            result = self.content_extractor.extract_text(
                response.text,
                url
            )
            result["url"] = url
            result["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
            return result
        except Exception as e:
            logger.error(f"Error processing {url}: {e}")
            return {
                "success": False,
                "url": url,
                "error": str(e)
            }
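
# Illustrative programmatic use of ContentProcessor outside the Gradio UI
# (a sketch; the URLs are placeholders):
#
#     import asyncio
#     batch = asyncio.run(ContentProcessor().process_urls([
#         "https://example.com",
#         "https://example.org",
#     ]))
#     print(batch["metadata"]["successful_count"])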


def create_interface():
    """Create Gradio interface with advanced features"""
    processor = ContentProcessor()

    with gr.Blocks(title="Advanced URL Content Processor") as interface:
        gr.Markdown("# Advanced URL Content Processor")

        with gr.Row():
            with gr.Column():
                url_input = gr.Textbox(
                    label="Enter URLs (one per line)",
                    placeholder="https://example.com\nhttps://example.org",
                    lines=5
                )
                with gr.Row():
                    process_btn = gr.Button("Process URLs", variant="primary")
                    clear_btn = gr.Button("Clear")
            with gr.Column():
                status_output = gr.JSON(
                    label="Processing Results",
                    show_label=True
                )

        gr.Markdown("## Processing Status")
        with gr.Row():
            progress_output = gr.Textbox(
                label="Progress",
                show_label=True
            )

        async def process_urls(urls):
            if not urls.strip():
                return {"error": "No URLs provided"}
            url_list = [url.strip() for url in urls.splitlines() if url.strip()]
            results = await processor.process_urls(url_list)
            return results

        def clear_inputs():
            return None, None

        process_btn.click(
            fn=process_urls,
            inputs=[url_input],
            outputs=[status_output]
        )
        clear_btn.click(
            fn=clear_inputs,
            inputs=[],
            outputs=[url_input, status_output]
        )

    return interface


if __name__ == "__main__":
    # Initialize mimetypes
    mimetypes.init()

    # Create and launch interface
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        debug=True
    )