# urld / app.py
import re
import time
import logging
import mimetypes
import concurrent.futures
from typing import List, Dict, Optional, Union, Any
from urllib.parse import urlparse
import requests
import validators
import gradio as gr
import cachetools
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from ratelimit import limits, sleep_and_retry
# Advanced Logging Configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app_advanced.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
class AdvancedURLProcessor:
"""Enhanced URL processing with advanced features"""
def __init__(
self,
timeout: int = 15,
max_retries: int = 3,
concurrent_requests: int = 5,
cache_size: int = 100
):
self.timeout = timeout
self.max_retries = max_retries
self.concurrent_requests = concurrent_requests
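        # Note: concurrent_requests is stored for reference only; the actual
        # worker-pool size is set in ContentProcessor below.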
self.ua = UserAgent()
# Implement multilevel caching
self.url_cache = cachetools.LRUCache(maxsize=cache_size)
self.content_cache = cachetools.TTLCache(maxsize=cache_size, ttl=3600) # 1-hour cache
self.session = requests.Session()
self.session.headers.update({
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive'
})
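        # A single keep-alive session is shared by all requests from this
        # instance; the User-Agent is randomized once at construction time.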
@sleep_and_retry
@limits(calls=10, period=60) # Rate limiting: 10 calls per minute
def validate_url(self, url: str) -> Dict[str, Union[bool, str]]:
"""Enhanced URL validation with comprehensive checks"""
try:
# Check cache first
if url in self.url_cache:
return self.url_cache[url]
# Comprehensive URL validation
result = urlparse(url)
validation_result = {
'is_valid': False,
'message': 'Invalid URL',
'scheme': result.scheme,
'netloc': result.netloc
}
if not all([result.scheme, result.netloc]):
validation_result['message'] = 'Missing scheme or network location'
return validation_result
# Use validators for additional checks
if not validators.url(url):
validation_result['message'] = 'URL format validation failed'
return validation_result
# Perform HEAD request for accessibility
try:
response = self.session.head(
url,
timeout=self.timeout,
allow_redirects=True
)
validation_result['is_valid'] = response.status_code in [200, 301, 302]
validation_result['status_code'] = response.status_code
validation_result['message'] = f"URL is {'valid' if validation_result['is_valid'] else 'invalid'}"
except requests.RequestException as e:
validation_result['message'] = f"Connection error: {str(e)}"
# Cache the result
self.url_cache[url] = validation_result
return validation_result
except Exception as e:
logger.error(f"Unexpected error validating URL {url}: {e}")
return {
'is_valid': False,
'message': f"Unexpected validation error: {str(e)}"
}
@sleep_and_retry
@limits(calls=10, period=60)
    def fetch_content(self, url: str) -> Optional[str]:
        """Fetch content from a URL synchronously, with retries and TTL caching"""
try:
# Check content cache first
if url in self.content_cache:
logger.info(f"Cache hit for URL: {url}")
return self.content_cache[url]
for attempt in range(self.max_retries):
try:
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
content = response.text
# Cache the content
self.content_cache[url] = content
return content
except requests.RequestException as e:
logger.warning(f"Attempt {attempt + 1}/{self.max_retries} failed for {url}: {str(e)}")
if attempt == self.max_retries - 1:
raise
time.sleep(1) # Delay between retries
except Exception as e:
logger.error(f"Error fetching content from {url}: {e}")
return None
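    # NOTE: fetch_content is not currently called by ContentProcessor, which
    # fetches pages directly through the shared session and bypasses this cache.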
class ContentExtractor:
"""Advanced content extraction and processing"""
    def __init__(self):
        # Structure-preserving cleaners run on the parsed soup before
        # extraction; text-level cleaners run on the extracted string, so
        # headings and paragraphs are still findable when they are collected.
        self.soup_cleaners = [
            self._remove_scripts,
            self._remove_styles
        ]
        self.text_cleaners = [
            self._remove_special_chars,
            self._normalize_whitespace
        ]
def extract_text(self, html: str, url: str = "") -> Dict[str, Union[str, Dict]]:
"""Extract and clean text content with metadata"""
try:
if not html:
return {
"success": False,
"content": "",
"metadata": {"error": "Empty HTML content"}
}
soup = BeautifulSoup(html, 'html.parser')
# Extract metadata
metadata = self._extract_metadata(soup, url)
# Clean content
content = self._process_content(soup)
return {
"success": True,
"content": content,
"metadata": metadata
}
except Exception as e:
logger.error(f"Content extraction error for {url}: {e}")
return {
"success": False,
"content": "",
"metadata": {"error": str(e)}
}
def _extract_metadata(self, soup: BeautifulSoup, url: str) -> Dict:
"""Extract page metadata"""
metadata = {
"title": self._get_title(soup),
"description": self._get_meta_description(soup),
"keywords": self._get_meta_keywords(soup),
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"url": url
}
return metadata
    def _process_content(self, soup: BeautifulSoup) -> str:
        """Process and clean content through multiple passes"""
        for cleaner in self.soup_cleaners:
            soup = cleaner(soup)
        # Extract text with preserved block structure
        lines = []
        for element in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6']):
            text = element.get_text(strip=True)
            if text:
                lines.append(text)
        content = "\n".join(lines)
        # Apply text-level cleaning to the extracted string
        for cleaner in self.text_cleaners:
            content = cleaner(content)
        return content
@staticmethod
def _remove_scripts(soup: BeautifulSoup) -> BeautifulSoup:
for script in soup(["script", "style", "iframe", "noscript"]):
script.decompose()
return soup
@staticmethod
def _remove_styles(soup: BeautifulSoup) -> BeautifulSoup:
for element in soup.find_all(style=True):
del element['style']
return soup
    @staticmethod
    def _remove_special_chars(text: str) -> str:
        # Keep word characters, whitespace and basic punctuation only
        return re.sub(r'[^\w\s\.\,\!\?\-]', '', text)
    @staticmethod
    def _normalize_whitespace(text: str) -> str:
        # Collapse runs of spaces/tabs within lines, but keep the newlines
        # that separate extracted headings and paragraphs
        text = re.sub(r'[ \t]+', ' ', text)
        return re.sub(r'\n\s*\n+', '\n', text).strip()
@staticmethod
def _get_title(soup: BeautifulSoup) -> str:
title = soup.find('title')
return title.get_text(strip=True) if title else ""
@staticmethod
def _get_meta_description(soup: BeautifulSoup) -> str:
meta = soup.find('meta', attrs={'name': 'description'})
return meta.get('content', '') if meta else ""
@staticmethod
def _get_meta_keywords(soup: BeautifulSoup) -> str:
meta = soup.find('meta', attrs={'name': 'keywords'})
return meta.get('content', '') if meta else ""
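# Example (sketch): extracting text from a raw HTML string.
#
#     extractor = ContentExtractor()
#     result = extractor.extract_text("<html><body><p>Hello</p></body></html>")
#     result["content"]   # -> "Hello"
#     result["metadata"]  # title / description / keywords / timestamp / url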
class ContentProcessor:
"""Main content processing orchestrator"""
def __init__(self):
self.url_processor = AdvancedURLProcessor()
self.content_extractor = ContentExtractor()
self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=5)
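        # Thread pool used to fetch validated URLs in parallel; futures are
        # collected with a per-URL timeout in process_urls below.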
async def process_urls(self, urls: List[str]) -> Dict[str, Any]:
"""Process multiple URLs concurrently with advanced error handling"""
results = {
"successful": [],
"failed": [],
"metadata": {
"total_urls": len(urls),
"start_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
}
try:
# Validate URLs first
valid_urls = []
for url in urls:
validation_result = self.url_processor.validate_url(url)
if validation_result['is_valid']:
valid_urls.append(url)
else:
results['failed'].append({
"url": url,
"error": validation_result['message']
})
# Process valid URLs concurrently
futures = []
for url in valid_urls:
future = self.executor.submit(self._process_single_url, url)
futures.append((url, future))
# Collect results
for url, future in futures:
try:
result = future.result(timeout=30) # 30-second timeout
if result["success"]:
results["successful"].append(result)
                    else:
                        results["failed"].append({
                            "url": url,
                            "error": result.get("error", "Processing failed")
                        })
except Exception as e:
logger.error(f"Error processing {url}: {e}")
results["failed"].append({
"url": url,
"error": str(e)
})
# Update metadata
results["metadata"].update({
"end_time": time.strftime("%Y-%m-%d %H:%M:%S"),
"successful_count": len(results["successful"]),
"failed_count": len(results["failed"])
})
return results
except Exception as e:
logger.error(f"Batch processing error: {e}")
raise
def _process_single_url(self, url: str) -> Dict:
"""Process a single URL with comprehensive error handling"""
try:
response = self.url_processor.session.get(
url,
timeout=self.url_processor.timeout
)
response.raise_for_status()
result = self.content_extractor.extract_text(
response.text,
url
)
result["url"] = url
result["timestamp"] = time.strftime("%Y-%m-%d %H:%M:%S")
return result
except Exception as e:
logger.error(f"Error processing {url}: {e}")
return {
"success": False,
"url": url,
"error": str(e)
}
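# Example (sketch): running a batch outside the Gradio UI. process_urls is a
# coroutine, so it needs an event loop:
#
#     import asyncio
#     processor = ContentProcessor()
#     results = asyncio.run(processor.process_urls(["https://example.com"]))
#     print(results["metadata"])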
def create_interface():
"""Create Gradio interface with advanced features"""
processor = ContentProcessor()
with gr.Blocks(title="Advanced URL Content Processor") as interface:
gr.Markdown("# Advanced URL Content Processor")
with gr.Row():
with gr.Column():
url_input = gr.Textbox(
label="Enter URLs (one per line)",
placeholder="https://example.com\nhttps://example.org",
lines=5
)
with gr.Row():
process_btn = gr.Button("Process URLs", variant="primary")
clear_btn = gr.Button("Clear")
with gr.Column():
status_output = gr.JSON(
label="Processing Results",
show_label=True
)
gr.Markdown("## Processing Status")
with gr.Row():
progress_output = gr.Textbox(
label="Progress",
show_label=True
)
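        # NOTE: progress_output is displayed but not yet wired to any callback;
        # only status_output receives results below.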
async def process_urls(urls):
if not urls.strip():
return {"error": "No URLs provided"}
url_list = [url.strip() for url in urls.splitlines() if url.strip()]
results = await processor.process_urls(url_list)
return results
def clear_inputs():
return None, None
process_btn.click(
fn=process_urls,
inputs=[url_input],
outputs=[status_output]
)
clear_btn.click(
fn=clear_inputs,
inputs=[],
outputs=[url_input, status_output]
)
return interface
if __name__ == "__main__":
# Initialize mimetypes
mimetypes.init()
# Create and launch interface
interface = create_interface()
interface.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
debug=True
)