import json
import os
import torch
import string
import time  # needed for retry delays and record timestamps
import requests
from bs4 import BeautifulSoup
import tempfile
import zipfile
import mimetypes
from tqdm import tqdm
import logging
import gradio as gr
from typing import List, Dict, Union, Optional
from urllib.parse import urlparse
import concurrent.futures
import validators
from pathlib import Path
import re
# Set up logging with a detailed configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log')
    ]
)
logger = logging.getLogger(__name__)
class URLProcessor:
    """Handle URL processing with validation, retries, and concurrent fetching"""

    def __init__(self, timeout: int = 10, max_retries: int = 3, concurrent_requests: int = 5):
        self.timeout = timeout
        self.max_retries = max_retries
        self.concurrent_requests = concurrent_requests
        self.session = requests.Session()
        # Add common headers to mimic browser behavior
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                          '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
        })
    def validate_url(self, url: str) -> bool:
        """Validate URL format and structure"""
        try:
            result = urlparse(url)
            # validators.url() returns a falsy ValidationFailure object on
            # failure, so coerce to bool to honour the declared return type
            return all([result.scheme, result.netloc]) and bool(validators.url(url))
        except Exception as e:
            logger.warning(f"Invalid URL format: {url} - {str(e)}")
            return False
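    # Illustrative behaviour (not exhaustive):
    #   validate_url("https://example.com")  -> True
    #   validate_url("not-a-url")            -> False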
    def fetch_content(self, url: str) -> Optional[str]:
        """Fetch content from a URL with a retry mechanism"""
        for attempt in range(self.max_retries):
            try:
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
                return response.text
            except requests.RequestException as e:
                logger.error(f"Attempt {attempt + 1}/{self.max_retries} failed for {url}: {str(e)}")
                if attempt == self.max_retries - 1:
                    return None
                time.sleep(1)  # Delay between retries
        return None
    def process_urls(self, urls: List[str]) -> List[Dict]:
        """Process multiple URLs concurrently"""
        valid_urls = [url for url in urls if self.validate_url(url)]
        if not valid_urls:
            logger.warning("No valid URLs to process")
            return []

        results = []
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.concurrent_requests) as executor:
            future_to_url = {executor.submit(self.fetch_content, url): url for url in valid_urls}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    html = future.result()
                    if html:
                        text = extract_text(html)
                        if text:
                            results.append({
                                "source": "url",
                                "url": url,
                                "content": text,
                                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                            })
                        else:
                            logger.warning(f"No text content extracted from {url}")
                except Exception as e:
                    logger.error(f"Error processing {url}: {str(e)}")
        return results
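# Illustrative URLProcessor usage (a sketch; the URL is a placeholder):
#
#   processor = URLProcessor(timeout=5, max_retries=2)
#   records = processor.process_urls(["https://example.com"])
#   # Each record looks like:
#   # {"source": "url", "url": "https://example.com",
#   #  "content": "Example Domain ...", "timestamp": "2024-01-01 12:00:00"}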
def extract_text(html: str) -> str:
    """Extract text from HTML, stripping page chrome and excess whitespace"""
    if not html:
        return ""
    soup = BeautifulSoup(html, 'html.parser')
    # Remove unwanted elements
    for element in soup(['script', 'style', 'header', 'footer', 'nav']):
        element.decompose()
    # Extract text with better formatting
    text = soup.get_text(separator=' ')
    # Clean up the text: strip each line, then break phrases on double spaces
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = ' '.join(chunk for chunk in chunks if chunk)
    # Remove excessive whitespace
    text = re.sub(r'\s+', ' ', text)
    return text.strip()
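# Quick sanity check for extract_text (illustrative input):
#
#   html = "<html><body><script>var x;</script><p>Hello   world</p></body></html>"
#   extract_text(html)  # -> "Hello world" (script removed, whitespace collapsed)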
class FileProcessor:
    """Handle uploaded file processing"""

    def __init__(self, max_file_size: int = 10 * 1024 * 1024):  # 10MB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Check if a file is text, by MIME type or supported extension"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            if mime_type and mime_type.startswith('text/'):
                return True
            # .json and .xml guess as application/* MIME types, so also
            # accept the explicitly supported extensions
            return Path(filepath).suffix.lower() in self.supported_text_extensions
        except Exception:
            return False
    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents"""
        results = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                if self.is_text_file(filepath):
                    try:
                        # Pin the encoding rather than relying on the platform default
                        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                            content = f.read()
                        if content.strip():
                            results.append({
                                "source": "file",
                                "filename": filename,
                                "content": content,
                                "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                            })
                    except Exception as e:
                        logger.error(f"Error reading file {filename}: {str(e)}")
        return results
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file"""
        results = []
        try:
            # Read from the path on disk: Gradio's file wrapper may already be
            # consumed or closed, so file.read() is not reliable here
            with open(file.name, 'rb') as f:
                content = f.read().decode('utf-8', errors='ignore')
            if content.strip():
                results.append({
                    "source": "file",
                    "filename": os.path.basename(file.name),
                    "content": content,
                    "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                })
        except Exception as e:
            logger.error(f"Error processing single file: {str(e)}")
        return results
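# Illustrative FileProcessor usage (a sketch; assumes a Gradio-style file
# object exposing a .name attribute that points at the upload on disk):
#
#   processor = FileProcessor(max_file_size=5 * 1024 * 1024)  # 5MB cap
#   records = processor.process_file(uploaded_file)
#   # ZIP archives are unpacked and each text member becomes one record.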
def preprocess_bulk_text(text: str) -> str:
    """Normalize loosely separated bulk text into a comma-separated list"""
    if not text:
        return ""
    # Normalize line endings
    text = text.replace('\r\n', '\n').replace('\r', '\n')
    # Separators to normalize, tried in order
    separators = ['\n', ' / ', '/', ';', ' - ', '|', ' ']
    # Replace separators with commas only if the text is not already comma-separated
    if ',' not in text:
        for separator in separators:
            text = text.replace(separator, ',')
    # Insert a comma after bare domain endings followed by whitespace
    domain_pattern = r'(\.[a-z]{2,})\s+'
    text = re.sub(domain_pattern, r'\1,', text)
    # Collapse repeated commas, trim, and re-space around commas
    text = re.sub(r',+', ',', text)
    text = text.strip(',' + string.whitespace)
    text = re.sub(r'\s*,\s*', ', ', text)
    return text
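# Example behaviour of preprocess_bulk_text (illustrative input):
#
#   preprocess_bulk_text("example.com\nfoo.org / bar.net")
#   # -> "example.com, foo.org, bar.net"
#   # Input that already contains commas keeps its other separators and is
#   # only re-spaced around the commas.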
def create_interface():
    """Create the Gradio interface"""
    # Custom CSS for better styling
    custom_css = """
    .container { max-width: 1200px; margin: auto; padding: 20px; }
    .output-panel { margin-top: 20px; }
    .warning { color: #856404; background-color: #fff3cd; padding: 10px; border-radius: 4px; }
    .error { color: #721c24; background-color: #f8d7da; padding: 10px; border-radius: 4px; }
    """

    with gr.Blocks(css=custom_css) as interface:
        gr.Markdown("# Advanced URL and Text Processing Tool")

        with gr.Tab("URL Input"):
            url_input = gr.Textbox(
                label="Enter URLs (comma-separated or one per line)",
                placeholder="https://example1.com, https://example2.com",
                lines=5
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Enter text directly",
                placeholder="Enter your text here...",
                lines=5
            )

        process_btn = gr.Button("Process Input", variant="primary")

        # Output components
        with gr.Row():
            output_file = gr.File(label="Processed Dataset")
            output_text = gr.Textbox(
                label="Processing Results",
                lines=3,
                interactive=False
            )
        def process_all_inputs(urls, file, text):
            """Process all input types and write the combined dataset"""
            try:
                dataset = []
                # Process URLs, splitting on commas or newlines per the input label
                if urls:
                    url_processor = URLProcessor()
                    url_list = [u.strip() for u in re.split(r'[,\n]+', urls) if u.strip()]
                    dataset.extend(url_processor.process_urls(url_list))
                # Process files
                if file:
                    file_processor = FileProcessor()
                    dataset.extend(file_processor.process_file(file))
                # Process text input
                if text:
                    processed_text = preprocess_bulk_text(text)
                    if processed_text:
                        dataset.append({
                            "source": "input",
                            "content": processed_text,
                            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S")
                        })
                if not dataset:
                    return [None, "No valid data to process. Please check your inputs."]
                # Save results (named output_path to avoid shadowing the output_file component)
                output_path = 'processed_dataset.json'
                with open(output_path, 'w', encoding='utf-8') as f:
                    json.dump(dataset, f, indent=2, ensure_ascii=False)
                # Generate summary
                summary = f"""
                Processing completed successfully!
                - URLs processed: {sum(1 for d in dataset if d['source'] == 'url')}
                - Files processed: {sum(1 for d in dataset if d['source'] == 'file')}
                - Text inputs processed: {sum(1 for d in dataset if d['source'] == 'input')}
                """
                return [output_path, summary]
            except Exception as e:
                error_msg = f"Error during processing: {str(e)}"
                logger.error(error_msg)
                return [None, error_msg]
        # Connect the interface
        process_btn.click(
            fn=process_all_inputs,
            inputs=[url_input, file_input, text_input],
            outputs=[output_file, output_text]
        )

        # Add comprehensive instructions
        gr.Markdown("""
        ## Instructions
        1. **URL Input**:
           - Enter URLs separated by commas or new lines
           - URLs must start with http:// or https://
           - Invalid URLs will be skipped
        2. **File Input**:
           - Upload text files or ZIP archives
           - Supported formats: .txt, .zip, .md, .csv, .json, .xml
           - Maximum file size: 10MB
        3. **Text Input**:
           - Directly enter or paste text
           - Text will be automatically formatted
        4. Click 'Process Input' to generate the dataset

        The tool will combine all valid inputs into a single JSON dataset file.
        """)

    return interface
if __name__ == "__main__":
    # Initialize mimetypes
    mimetypes.init()
    # Create and launch the interface
    interface = create_interface()
    interface.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        debug=True
    )
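
# The saved processed_dataset.json is a JSON array of records from all three
# sources, e.g. (illustrative values):
#
# [
#   {"source": "url", "url": "https://example.com", "content": "...", "timestamp": "..."},
#   {"source": "file", "filename": "notes.txt", "content": "...", "timestamp": "..."},
#   {"source": "input", "content": "...", "timestamp": "..."}
# ]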