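"""Gradio app that validates URLs, fetches and cleans their page content,
accepts raw text input, and exports the processed results as JSON."""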
import json
import re
import time
import logging
import mimetypes
from typing import Dict, Optional, Union
from urllib.parse import urlparse

import requests
import validators
import gradio as gr
import cachetools
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from ratelimit import limits, sleep_and_retry
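
# Third-party requirements (typical pip package names): requests, validators,
# gradio, cachetools, beautifulsoup4, fake-useragent, ratelimit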
# Advanced Logging Configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app_advanced.log', encoding='utf-8')
]
)
logger = logging.getLogger(__name__)
class AdvancedURLProcessor:
"""Enhanced URL processing with advanced features"""
def __init__(
self,
timeout: int = 15,
max_retries: int = 3,
concurrent_requests: int = 5,
cache_size: int = 100
):
self.timeout = timeout
self.max_retries = max_retries
self.concurrent_requests = concurrent_requests
self.ua = UserAgent()
        # Two-level caching: URL validation results (LRU) and fetched content (1-hour TTL)
        self.url_cache = cachetools.LRUCache(maxsize=cache_size)
        self.content_cache = cachetools.TTLCache(maxsize=cache_size, ttl=3600)
self.session = requests.Session()
self.session.headers.update({
'User-Agent': self.ua.random,
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'Connection': 'keep-alive'
})
@sleep_and_retry
@limits(calls=10, period=60) # Rate limiting: 10 calls per minute
def validate_url(self, url: str) -> Dict[str, Union[bool, str]]:
"""Enhanced URL validation with comprehensive checks"""
try:
# Check cache first
if url in self.url_cache:
return self.url_cache[url]
# Comprehensive URL validation
result = urlparse(url)
validation_result = {
'is_valid': False,
'message': 'Invalid URL',
'scheme': result.scheme,
'netloc': result.netloc
}
if not all([result.scheme, result.netloc]):
validation_result['message'] = 'Missing scheme or network location'
return validation_result
# Use validators for additional checks
if not validators.url(url):
validation_result['message'] = 'URL format validation failed'
return validation_result
# Perform HEAD request for accessibility
try:
response = self.session.head(
url,
timeout=self.timeout,
allow_redirects=True
)
validation_result['is_valid'] = response.status_code in [200, 301, 302]
validation_result['status_code'] = response.status_code
validation_result['message'] = f"URL is {'valid' if validation_result['is_valid'] else 'invalid'}"
except requests.RequestException as e:
validation_result['message'] = f"Connection error: {str(e)}"
# Cache the result
self.url_cache[url] = validation_result
return validation_result
except Exception as e:
logger.error(f"Unexpected error validating URL {url}: {e}")
return {
'is_valid': False,
'message': f"Unexpected validation error: {str(e)}"
}
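    # validate_url returns a dict such as (illustrative values):
    #   {'is_valid': True, 'message': 'URL is valid', 'scheme': 'https',
    #    'netloc': 'example.com', 'status_code': 200}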
def advanced_text_cleaning(self, text: str) -> str:
"""Sophisticated text cleaning and normalization"""
if not text:
return ""
# Remove control characters
text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
        # Normalize curly quotation marks to ASCII equivalents
        # (must run before the ASCII fold below, otherwise they are simply dropped)
        text = text.replace('\u201c', '"').replace('\u201d', '"')
        text = text.replace('\u2018', "'").replace('\u2019', "'")
        # Fold to ASCII, dropping characters that cannot be represented
        text = text.encode('ascii', 'ignore').decode('ascii')
        # Collapse runs of whitespace
        text = re.sub(r'\s+', ' ', text)
        # Strip leftover HTML entities
        text = re.sub(r'&[a-zA-Z]+;', '', text)
        # Collapse repeated punctuation
        text = re.sub(r'([.,!?]){2,}', r'\1', text)
return text.strip()
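    # Example (illustrative): advanced_text_cleaning('  Hello,,,   world!!!  ') -> 'Hello, world!'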
@sleep_and_retry
@limits(calls=20, period=60) # Refined rate limiting
def fetch_content(self, url: str) -> Optional[str]:
"""Advanced content fetching with multiple safeguards"""
# Check content cache first
if url in self.content_cache:
return self.content_cache[url]
for attempt in range(self.max_retries):
try:
response = self.session.get(
url,
timeout=self.timeout,
headers={'User-Agent': self.ua.random}
)
response.raise_for_status()
# Use BeautifulSoup for more robust parsing
soup = BeautifulSoup(response.text, 'html.parser')
                # Remove script and style elements before extracting text
                for script in soup(["script", "style"]):
                    script.decompose()
# Extract clean text
text = soup.get_text(separator=' ')
cleaned_text = self.advanced_text_cleaning(text)
# Cache the result
self.content_cache[url] = cleaned_text
return cleaned_text
except requests.RequestException as e:
logger.warning(f"Fetch attempt {attempt + 1} failed for {url}: {e}")
time.sleep(2 ** attempt) # Exponential backoff
        # All retries exhausted
        logger.error(f"Failed to fetch content from {url} after {self.max_retries} attempts")
        return None
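
# Illustrative standalone use of AdvancedURLProcessor outside the Gradio UI
# (sketch only; https://example.com is a placeholder):
#
#   processor = AdvancedURLProcessor(timeout=10, max_retries=2)
#   check = processor.validate_url("https://example.com")
#   if check.get('is_valid'):
#       content = processor.fetch_content("https://example.com")
#       print(content[:200] if content else "No content fetched")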
def create_advanced_interface():
"""Create a comprehensive Gradio interface with advanced features"""
css = """
.container { max-width: 1200px; margin: auto; }
.warning { background-color: #fff3cd; color: #856404; }
.error { background-color: #f8d7da; color: #721c24; }
"""
with gr.Blocks(css=css, title="Advanced Text & URL Processor") as interface:
gr.Markdown("# 🌐 Advanced URL & Text Processing Toolkit")
with gr.Tab("URL Processing"):
url_input = gr.Textbox(
label="Enter URLs (comma or newline separated)",
lines=5,
placeholder="https://example1.com\nhttps://example2.com"
)
with gr.Tab("Text Input"):
text_input = gr.Textbox(
label="Raw Text Input",
lines=5,
placeholder="Paste your text here..."
)
process_btn = gr.Button("Process Input", variant="primary")
output_text = gr.Textbox(label="Processing Results", interactive=False)
output_file = gr.File(label="Processed Output")
def process_input(urls, text):
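            """Process URLs and/or raw text; returns (output_file_path, status_message)
            to match the [output_file, output_text] outputs wired below."""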
try:
processor = AdvancedURLProcessor()
results = []
# Process URLs
if urls:
url_list = re.split(r'[,\n]', urls)
url_list = [url.strip() for url in url_list if url.strip()]
for url in url_list:
validation = processor.validate_url(url)
if validation.get('is_valid'):
content = processor.fetch_content(url)
if content:
results.append({
'url': url,
'content': content,
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
})
# Process text input
if text:
cleaned_text = processor.advanced_text_cleaning(text)
results.append({
'source': 'direct_input',
'content': cleaned_text,
'timestamp': time.strftime("%Y-%m-%d %H:%M:%S")
})
# Generate output
if results:
output_path = 'processed_data.json'
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(results, f, ensure_ascii=False, indent=2)
summary = f"Processed {len(results)} items successfully!"
return output_path, summary
else:
return None, "No valid content to process."
except Exception as e:
logger.error(f"Processing error: {e}")
return None, f"Error: {str(e)}"
process_btn.click(
process_input,
inputs=[url_input, text_input],
outputs=[output_file, output_text]
)
gr.Markdown("""
### Usage Guidelines
- URL Processing: Enter valid HTTP/HTTPS URLs
- Text Input: Direct text processing
- Advanced cleaning and validation included
""")
return interface
def main():
# Configure system settings
mimetypes.init()
# Create and launch interface
interface = create_advanced_interface()
interface.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
debug=True
)
if __name__ == "__main__":
main()