urld / app.py
acecalisto3's picture
Update app.py
c1f083f verified
raw
history blame
13.1 kB
import json
import os
import torch
import string
import requests
from bs4 import BeautifulSoup
import tempfile
import zipfile
import mimetypes
from tqdm import tqdm
import logging
import gradio as gr
from typing import List, Dict, Union, Optional
from urllib.parse import urlparse
import concurrent.futures
import validators
from pathlib import Path
import re
# Setup logging with more detailed configuration
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log')
]
)
logger = logging.getLogger(__name__)
class URLProcessor:
"""Class to handle URL processing with advanced features"""
def __init__(self, timeout: int = 10, max_retries: int = 3, concurrent_requests: int = 5):
self.timeout = timeout
self.max_retries = max_retries
self.concurrent_requests = concurrent_requests
self.session = requests.Session()
# Add common headers to mimic browser behavior
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
})
def validate_url(self, url: str) -> bool:
"""Validate URL format and accessibility"""
try:
result = urlparse(url)
return all([result.scheme, result.netloc]) and validators.url(url)
except Exception as e:
logger.warning(f"Invalid URL format: {url} - {str(e)}")
return False
def fetch_content(self, url: str) -> Optional[str]:
"""Fetch content from URL with retry mechanism"""
for attempt in range(self.max_retries):
try:
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
return response.text
except requests.RequestException as e:
logger.error(f"Attempt {attempt + 1}/{self.max_retries} failed for {url}: {str(e)}")
if attempt == self.max_retries - 1:
return None
time.sleep(1) # Delay between retries
def process_urls(self, urls: List[str]) -> List[Dict]:
"""Process multiple URLs concurrently"""
valid_urls = [url for url in urls if self.validate_url(url)]
if not valid_urls:
logger.warning("No valid URLs to process")
return []
results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrent_requests) as executor:
future_to_url = {executor.submit(self.fetch_content, url): url for url in valid_urls}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
html = future.result()
if html:
text = extract_text(html)
if text:
results.append({
"source": "url",
"url": url,
"content": text,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
})
else:
logger.warning(f"No text content extracted from {url}")
except Exception as e:
logger.error(f"Error processing {url}: {str(e)}")
return results
def extract_text(html: str) -> str:
"""Enhanced text extraction with better cleaning"""
if not html:
return ""
soup = BeautifulSoup(html, 'html.parser')
# Remove unwanted elements
for element in soup(['script', 'style', 'header', 'footer', 'nav']):
element.decompose()
# Extract text with better formatting
text = soup.get_text(separator=' ')
# Clean up the text
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
# Remove excessive whitespace
text = re.sub(r'\s+', ' ', text)
return text.strip()
class FileProcessor:
"""Class to handle file processing"""
def __init__(self, max_file_size: int = 10 * 1024 * 1024): # 10MB default
self.max_file_size = max_file_size
self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}
def is_text_file(self, filepath: str) -> bool:
"""Check if file is a text file"""
try:
mime_type, _ = mimetypes.guess_type(filepath)
return mime_type and mime_type.startswith('text/')
except Exception:
return False
def process_file(self, file) -> List[Dict]:
"""Process uploaded file with enhanced error handling"""
if not file:
return []
dataset = []
try:
file_size = os.path.getsize(file.name)
if file_size > self.max_file_size:
logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
return []
with tempfile.TemporaryDirectory() as temp_dir:
if zipfile.is_zipfile(file.name):
dataset.extend(self._process_zip_file(file.name, temp_dir))
else:
dataset.extend(self._process_single_file(file))
except Exception as e:
logger.error(f"Error processing file: {str(e)}")
return []
return dataset
def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
"""Process ZIP file contents"""
results = []
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(temp_dir)
for root, _, files in os.walk(temp_dir):
for filename in files:
filepath = os.path.join(root, filename)
if self.is_text_file(filepath):
try:
with open(filepath, 'r', errors='ignore') as f:
content = f.read()
if content.strip():
results.append({
"source": "file",
"filename": filename,
"content": content,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
})
except Exception as e:
logger.error(f"Error reading file {filename}: {str(e)}")
return results
def _process_single_file(self, file) -> List[Dict]:
"""Process single file"""
results = []
try:
content = file.read().decode('utf-8', errors='ignore')
if content.strip():
results.append({
"source": "file",
"filename": os.path.basename(file.name),
"content": content,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
})
except Exception as e:
logger.error(f"Error processing single file: {str(e)}")
return results
def preprocess_bulk_text(text: str) -> str:
"""Enhanced text preprocessing"""
if not text:
return ""
# Normalize line endings
text = text.replace('\r\n', '\n').replace('\r', '\n')
# Define separators
separators = ['\n', ' / ', '/', ';', ' - ', '|', ' ']
# Replace separators with commas if not already comma-separated
if ',' not in text:
for separator in separators:
text = text.replace(separator, ',')
# Handle domain endings
domain_pattern = r'(\.[a-z]{2,})\s+'
text = re.sub(domain_pattern, r'\1,', text)
# Clean up multiple commas and whitespace
text = re.sub(r',+', ',', text)
text = text.strip(',' + string.whitespace)
text = re.sub(r'\s*,\s*', ', ', text)
return text
def create_interface():
"""Create enhanced Gradio interface"""
# Custom CSS for better styling
custom_css = """
.container { max-width: 1200px; margin: auto; padding: 20px; }
.output-panel { margin-top: 20px; }
.warning { color: #856404; background-color: #fff3cd; padding: 10px; border-radius: 4px; }
.error { color: #721c24; background-color: #f8d7da; padding: 10px; border-radius: 4px; }
"""
with gr.Blocks(css=custom_css) as interface:
gr.Markdown("# Advanced URL and Text Processing Tool")
with gr.Tab("URL Input"):
url_input = gr.Textbox(
label="Enter URLs (comma-separated or one per line)",
placeholder="https://example1.com, https://example2.com",
lines=5
)
with gr.Tab("File Input"):
file_input = gr.File(
label="Upload text file or ZIP archive",
file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
)
with gr.Tab("Text Input"):
text_input = gr.Textbox(
label="Enter text directly",
placeholder="Enter your text here...",
lines=5
)
# Process button with loading state
process_btn = gr.Button("Process Input", variant="primary")
# Output components
with gr.Row():
output_file = gr.File(label="Processed Dataset")
output_text = gr.Textbox(
label="Processing Results",
lines=3,
interactive=False
)
def process_all_inputs(urls, file, text):
"""Process all input types with progress tracking"""
try:
dataset = []
# Process URLs
if urls:
url_processor = URLProcessor()
url_list = [u.strip() for u in urls.split(',') if u.strip()]
dataset.extend(url_processor.process_urls(url_list))
# Process files
if file:
file_processor = FileProcessor()
dataset.extend(file_processor.process_file(file))
# Process text input
if text:
processed_text = preprocess_bulk_text(text)
if processed_text:
dataset.append({
"source": "input",
"content": processed_text,
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
})
if not dataset:
return [None, "No valid data to process. Please check your inputs."]
# Save results
output_file = 'processed_dataset.json'
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(dataset, f, indent=2, ensure_ascii=False)
# Generate summary
summary = f"""
Processing completed successfully!
- URLs processed: {sum(1 for d in dataset if d['source'] == 'url')}
- Files processed: {sum(1 for d in dataset if d['source'] == 'file')}
- Text inputs processed: {sum(1 for d in dataset if d['source'] == 'input')}
"""
return [output_file, summary]
except Exception as e:
error_msg = f"Error during processing: {str(e)}"
logger.error(error_msg)
return [None, error_msg]
# Connect the interface
process_btn.click(
fn=process_all_inputs,
inputs=[url_input, file_input, text_input],
outputs=[output_file, output_text]
)
# Add comprehensive instructions
gr.Markdown("""
## Instructions
1. **URL Input**:
- Enter URLs separated by commas or new lines
- URLs must start with http:// or https://
- Invalid URLs will be skipped
2. **File Input**:
- Upload text files or ZIP archives
- Supported formats: .txt, .zip, .md, .csv, .json, .xml
- Maximum file size: 10MB
3. **Text Input**:
- Directly enter or paste text
- Text will be automatically formatted
4. Click 'Process Input' to generate the dataset
The tool will combine all valid inputs into a single JSON dataset file.
""")
return interface
if __name__ == "__main__":
# Initialize mimetypes
mimetypes.init()
# Create and launch the interface
interface = create_interface()
interface.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
debug=True
)