import json
import os
import re
import time
import logging
import mimetypes
import concurrent.futures
import string
from typing import List, Dict, Optional, Union
from pathlib import Path
from urllib.parse import urlparse

import requests
import validators
import gradio as gr
import torch
import cachetools
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from ratelimit import limits, sleep_and_retry
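# Note: third-party packages implied by the imports above (PyPI distribution
# names, listed here only as a guide for a requirements.txt):
# requests, validators, gradio, torch, cachetools, beautifulsoup4,
# fake-useragent, ratelimit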
# Advanced logging configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app_advanced.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)
class AdvancedURLProcessor:
    """URL processing with validation, caching, rate limiting and retries."""

    def __init__(
        self,
        timeout: int = 15,
        max_retries: int = 3,
        concurrent_requests: int = 5,
        cache_size: int = 100
    ):
        self.timeout = timeout
        self.max_retries = max_retries
        self.concurrent_requests = concurrent_requests
        self.ua = UserAgent()

        # Multilevel caching: validation results (LRU) and fetched content (TTL)
        self.url_cache = cachetools.LRUCache(maxsize=cache_size)
        self.content_cache = cachetools.TTLCache(maxsize=cache_size, ttl=3600)  # 1-hour cache

        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': self.ua.random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive'
        })
    # Rate limiting: 10 calls per minute (applied via the imported ratelimit decorators)
    @sleep_and_retry
    @limits(calls=10, period=60)
    def validate_url(self, url: str) -> Dict[str, Union[bool, str]]:
        """Enhanced URL validation with comprehensive checks"""
        try:
            # Check cache first
            if url in self.url_cache:
                return self.url_cache[url]

            # Structural validation via urlparse
            result = urlparse(url)
            validation_result = {
                'is_valid': False,
                'message': 'Invalid URL',
                'scheme': result.scheme,
                'netloc': result.netloc
            }
            if not all([result.scheme, result.netloc]):
                validation_result['message'] = 'Missing scheme or network location'
                return validation_result

            # Use the validators package for additional format checks
            if not validators.url(url):
                validation_result['message'] = 'URL format validation failed'
                return validation_result

            # Perform a HEAD request to check accessibility
            try:
                response = self.session.head(
                    url,
                    timeout=self.timeout,
                    allow_redirects=True
                )
                validation_result['is_valid'] = response.status_code in [200, 301, 302]
                validation_result['status_code'] = response.status_code
                validation_result['message'] = f"URL is {'valid' if validation_result['is_valid'] else 'invalid'}"
            except requests.RequestException as e:
                validation_result['message'] = f"Connection error: {str(e)}"

            # Cache the result
            self.url_cache[url] = validation_result
            return validation_result
        except Exception as e:
            logger.error(f"Unexpected error validating URL {url}: {e}")
            return {
                'is_valid': False,
                'message': f"Unexpected validation error: {str(e)}"
            }
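    # Example (sketch) of a validate_url() result for a reachable page; the exact
    # values depend on the server's response:
    #   {'is_valid': True, 'message': 'URL is valid', 'scheme': 'https',
    #    'netloc': 'example.com', 'status_code': 200}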
    def advanced_text_cleaning(self, text: str) -> str:
        """Sophisticated text cleaning and normalization"""
        if not text:
            return ""
        # Remove control characters
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
        # Normalize curly quotation marks before the ASCII pass strips them
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")
        # Normalize to ASCII (drops remaining non-ASCII characters)
        text = text.encode('ascii', 'ignore').decode('ascii')
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text)
        # Remove HTML entities
        text = re.sub(r'&[a-zA-Z]+;', '', text)
        # Collapse repeated punctuation
        text = re.sub(r'([.,!?]){2,}', r'\1', text)
        return text.strip()
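    # Example (sketch): advanced_text_cleaning("Hello&nbsp;   world!!!")
    # returns "Hello world!" (entity removed, whitespace collapsed,
    # repeated punctuation reduced).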
    # Refined rate limiting: fetches share the same 10-calls-per-minute budget
    @sleep_and_retry
    @limits(calls=10, period=60)
    def fetch_content(self, url: str) -> Optional[str]:
        """Content fetching with caching, retries and HTML stripping"""
        # Check content cache first
        if url in self.content_cache:
            return self.content_cache[url]

        for attempt in range(self.max_retries):
            try:
                response = self.session.get(
                    url,
                    timeout=self.timeout,
                    headers={'User-Agent': self.ua.random}
                )
                response.raise_for_status()

                # Use BeautifulSoup for robust parsing
                soup = BeautifulSoup(response.text, 'html.parser')

                # Remove script and style elements
                for element in soup(["script", "style"]):
                    element.decompose()

                # Extract and clean the visible text
                text = soup.get_text(separator=' ')
                cleaned_text = self.advanced_text_cleaning(text)

                # Cache the result
                self.content_cache[url] = cleaned_text
                return cleaned_text
            except requests.RequestException as e:
                logger.warning(f"Fetch attempt {attempt + 1} failed for {url}: {e}")
                time.sleep(2 ** attempt)  # Exponential backoff
        return None
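# Example (sketch): using AdvancedURLProcessor directly, outside the Gradio UI.
# The URL is illustrative and network access is assumed.
#
#   processor = AdvancedURLProcessor(timeout=10, max_retries=2)
#   check = processor.validate_url("https://example.com")
#   if check.get('is_valid'):
#       body = processor.fetch_content("https://example.com")
#       print(body[:200] if body else "No content fetched")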
def create_advanced_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processor") as interface:
        gr.Markdown("# Advanced URL & Text Processing Toolkit")

        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )

        process_btn = gr.Button("Process Input", variant="primary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")

        def process_input(urls, text):
            try:
                processor = AdvancedURLProcessor()
                results = []

                # Process URLs
                if urls:
                    url_list = re.split(r'[,\n]', urls)
                    url_list = [url.strip() for url in url_list if url.strip()]
                    for url in url_list:
                        validation = processor.validate_url(url)
                        if validation.get('is_valid'):
                            content = processor.fetch_content(url)
                            if content:
                                results.append({
                                    'url': url,
                                    'content': content,
                                    'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
                                })

                # Process direct text input
                if text:
                    cleaned_text = processor.advanced_text_cleaning(text)
                    results.append({
                        'source': 'direct_input',
                        'content': cleaned_text,
                        'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
                    })

                # Write results to a JSON file for download
                if results:
                    output_path = 'processed_data.json'
                    with open(output_path, 'w', encoding='utf-8') as f:
                        json.dump(results, f, ensure_ascii=False, indent=2)
                    summary = f"Processed {len(results)} items successfully!"
                    return output_path, summary
                else:
                    return None, "No valid content to process."
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, f"Error: {str(e)}"

        process_btn.click(
            process_input,
            inputs=[url_input, text_input],
            outputs=[output_file, output_text]
        )

        gr.Markdown("""
        ### Usage Guidelines
        - URL Processing: enter valid HTTP/HTTPS URLs
        - Text Input: direct text cleaning and normalization
        - Advanced cleaning and validation included
        """)
    return interface
def main():
    # Initialize MIME type mappings
    mimetypes.init()

    # Create and launch the interface
    interface = create_advanced_interface()
    interface.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        debug=True
    )

if __name__ == "__main__":
    main()