# Spaces:
# Running
# Running
import json | |
import os | |
import re | |
import time | |
import logging | |
import mimetypes | |
import tempfile | |
from datetime import datetime | |
from pathlib import Path | |
from urllib.parse import urlparse | |
from typing import List, Dict, Tuple, Union, Optional | |
import requests | |
import validators | |
import gradio as gr | |
from diskcache import Cache | |
from bs4 import BeautifulSoup | |
from fake_useragent import UserAgent | |
from cleantext import clean | |
import qrcode | |
import PyPDF2 | |
from PIL import Image | |
import pytesseract | |
import cv2 | |
import numpy as np | |
import fitz # PyMuPDF | |
import zipfile | |
# Logging is configured once, at import time: each record goes both to the
# console and to app.log (UTF-8), tagged with its source file and line number.
_console_handler = logging.StreamHandler()
_file_handler = logging.FileHandler('app.log', encoding='utf-8')
logging.basicConfig(
    handlers=[_console_handler, _file_handler],
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.INFO,
)

# Module-level logger shared by everything defined in this file.
logger = logging.getLogger(__name__)
class URLProcessor:
    """Fetch a URL and convert its payload into cleaned text plus metadata.

    A single ``requests.Session`` with browser-like headers is shared by all
    requests.  ``fetch_content`` issues a HEAD request, reads the
    ``Content-Type`` header and dispatches to the handler registered in
    ``supported_content_types`` (matched by prefix).  Every handler returns a
    dict with at least ``content``, ``content_type`` and ``timestamp`` keys,
    or ``None`` when processing fails (the failure is logged).
    """

    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10  # seconds
        self.session.headers.update({
            'User-Agent': UserAgent().random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            # 'br' is deliberately not advertised: requests can only decode
            # Brotli bodies when an optional brotli package is installed, so
            # offering it risks receiving content that cannot be decoded.
            'Accept-Encoding': 'gzip, deflate',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
        # Content-Type prefix -> handler; the first matching prefix wins.
        self.supported_content_types = {
            'text/html': self._fetch_html_content,
            'application/pdf': self._fetch_pdf_content,
            'image': self._fetch_image_content,
            'application/json': self._fetch_json_content,
            'text/plain': self._fetch_text_content
        }

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fallback_clean(text: str) -> str:
        """Clean *text* with the standard library only.

        Whitespace is collapsed BEFORE control characters are removed: newline
        and tab are themselves control characters, so stripping them first
        would glue neighbouring words together.
        """
        text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
        text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)  # Remove control characters
        text = text.encode('ascii', 'ignore').decode('ascii')  # Remove non-ASCII characters
        # Removing characters can leave adjacent spaces behind; collapse again.
        return re.sub(r'\s+', ' ', text).strip()

    @staticmethod
    def _hostname(url: str) -> str:
        """Return the lower-cased host of *url*, or ``''`` if none can be parsed.

        Scheme-less input such as ``drive.google.com/file/d/<id>`` is parsed
        as a network-path reference so its host is still recognised.
        """
        try:
            parsed = urlparse(url)
            if not parsed.netloc:
                parsed = urlparse('//' + url)
            return (parsed.hostname or '').lower()
        except ValueError:
            return ''

    @staticmethod
    def _is_host(host: str, domain: str) -> bool:
        """Return True when *host* is *domain* or one of its sub-domains."""
        return host == domain or host.endswith('.' + domain)

    @staticmethod
    def _meta_content(soup, name: str) -> Optional[str]:
        """Return the ``content`` attribute of ``<meta name=...>``, if present."""
        tag = soup.find('meta', {'name': name})
        return tag.get('content') if tag else None

    @staticmethod
    def _serializable_exif(exif) -> Optional[Dict]:
        """Convert a raw EXIF mapping into JSON-friendly values.

        Tags become string keys; bytes are decoded (undecodable bytes are
        replaced); plain scalars are kept; anything else is stringified.
        Returns ``None`` when *exif* is ``None``.
        """
        if exif is None:
            return None
        converted = {}
        for tag, value in exif.items():
            if isinstance(value, bytes):
                value = value.decode('utf-8', errors='replace')
            elif not (value is None or isinstance(value, (str, int, float, bool))):
                value = str(value)
            converted[str(tag)] = value
        return converted

    def _resolve_handler(self, content_type: str):
        """Return the handler whose registered prefix matches *content_type*, else None."""
        for supported_type, type_handler in self.supported_content_types.items():
            if content_type.startswith(supported_type):
                return type_handler
        return None

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility.

        Uses ``clean`` from the cleantext package; if that call raises for any
        reason, a warning is logged and the regex-based fallback is used.
        """
        try:
            return clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
        except Exception as e:
            logger.warning("Text cleaning error: %s. Using fallback method.", e)
            return self._fallback_clean(text)

    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility.

        Returns a dict with ``is_valid`` (bool) and ``message`` (str).  The
        accessibility probe is a HEAD request that follows redirects.
        """
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}
            # requests does not follow redirects for HEAD unless asked to.
            response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
            response.raise_for_status()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Universal content fetcher with enhanced content type handling.

        Google Drive file links and Google Calendar iCal feeds are routed by
        host name to their dedicated handlers.  Everything else is dispatched
        on the Content-Type reported by a redirect-following HEAD request;
        unknown types are logged and processed as plain text.  Returns
        ``None`` on any failure.
        """
        try:
            # Special case handling (matched on the host, not on a substring
            # of the URL, so a host name inside a query string cannot misroute)
            host = self._hostname(url)
            if self._is_host(host, 'drive.google.com'):
                return self._handle_google_drive(url)
            if self._is_host(host, 'calendar.google.com') and 'ical' in url:
                return self._handle_google_calendar(url)

            # Get content type of the final resource, not of a redirect stub
            response = self.session.head(url, timeout=self.timeout, allow_redirects=True)
            content_type = response.headers.get('Content-Type', '').split(';')[0].strip().lower()

            # Find appropriate handler
            handler = self._resolve_handler(content_type)
            if handler is None:
                logger.warning("Unsupported content type: %s", content_type)
                handler = self._fetch_text_content
            return handler(url)
        except Exception as e:
            logger.error("Content fetch failed: %s", e)
            return None

    # ------------------------------------------------------------------
    # Handlers
    # ------------------------------------------------------------------
    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Process Google Drive file links.

        Extracts the file id from a ``/file/d/<id>`` URL and downloads it via
        the ``uc?export=download`` endpoint.  The body is returned as text.
        """
        try:
            match = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not match:
                logger.error("Invalid Google Drive URL: %s", url)
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={match.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("Google Drive processing failed: %s", e)
            return None

    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Process Google Calendar ICS feeds (the raw feed text is returned)."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("Calendar fetch failed: %s", e)
            return None

    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Enhanced HTML content processing with metadata extraction.

        Returns cleaned text of ``<main>``, ``<article>`` or ``<body>`` (first
        one found; the whole document as a last resort) together with title,
        description, keywords and author metadata.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            # Extract metadata FIRST: <meta> tags are among the elements
            # removed below, so reading them afterwards always yields None.
            metadata = {
                'title': soup.title.string if soup.title else None,
                'description': self._meta_content(soup, 'description'),
                'keywords': self._meta_content(soup, 'keywords'),
                'author': self._meta_content(soup, 'author')
            }

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()

            # Extract main content; fall back to the whole document when the
            # markup has no <body> (e.g. an HTML fragment).
            main_content = soup.find('main') or soup.find('article') or soup.body or soup

            # Clean and structure content
            text_content = main_content.get_text(separator='\n', strip=True)
            cleaned_content = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_content,
                'metadata': metadata,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("HTML processing failed: %s", e)
            return None

    def _fetch_pdf_content(self, url: str) -> Optional[Dict]:
        """Process PDF content with enhanced metadata extraction.

        The download is written to a temporary file, opened with PyMuPDF, and
        the text blocks of every page are concatenated (one per line) before
        cleaning.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(suffix='.pdf') as temp_file:
                temp_file.write(response.content)
                temp_file.flush()
                # Extract text and metadata using PyMuPDF
                doc = fitz.open(temp_file.name)
                try:
                    info = doc.metadata or {}
                    metadata = {
                        'title': info.get('title'),
                        'author': info.get('author'),
                        'subject': info.get('subject'),
                        'keywords': info.get('keywords'),
                        'creator': info.get('creator'),
                        'producer': info.get('producer'),
                        'page_count': len(doc),
                        'file_size': os.path.getsize(temp_file.name),
                        # Read defensively: fall back to the metadata 'format'
                        # entry when the document object has no `version`.
                        'version': getattr(doc, 'version', None) or info.get('format')
                    }
                    # Extract text with layout preservation
                    pieces = []
                    for page in doc:
                        for block in page.get_text("blocks"):
                            if block[6] == 0:  # Text block
                                pieces.append(block[4] + "\n")
                finally:
                    # Release the document even when extraction fails.
                    doc.close()
            cleaned_content = self.advanced_text_cleaning("".join(pieces))
            return {
                'content': cleaned_content,
                'metadata': metadata,
                'content_type': 'application/pdf',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("PDF processing failed: %s", e)
            return None

    def _fetch_image_content(self, url: str) -> Optional[Dict]:
        """Process image content with OCR and advanced image processing.

        The image is converted to grayscale, denoised and Otsu-thresholded
        before Tesseract OCR.  Metadata holds the PIL format/mode/size, the
        EXIF data in JSON-friendly form, and a few OpenCV-derived features.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_file:
                temp_file.write(response.content)
                temp_file.flush()
                # Load image with OpenCV
                img = cv2.imread(temp_file.name)
                if img is None:
                    raise ValueError("Failed to load image")

                # Image preprocessing for better OCR
                gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                denoised = cv2.fastNlMeansDenoising(gray)
                thresh = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]

                # Extract text using Tesseract
                text = pytesseract.image_to_string(thresh)
                cleaned_text = self.advanced_text_cleaning(text) if text else None

                # Extract metadata and additional image features
                with Image.open(temp_file.name) as pil_img:
                    try:
                        exif = pil_img._getexif() if hasattr(pil_img, '_getexif') else None
                    except Exception as exif_error:
                        # EXIF is optional; a damaged block must not discard the image.
                        logger.warning("EXIF extraction failed: %s", exif_error)
                        exif = None
                    metadata = {
                        'format': pil_img.format,
                        'mode': pil_img.mode,
                        'size': pil_img.size,
                        # Raw EXIF values (bytes, rationals) cannot be dumped to JSON.
                        'exif': self._serializable_exif(exif),
                        'image_features': {
                            'resolution': img.shape,
                            'channels': img.shape[2] if len(img.shape) > 2 else 1,
                            'mean_brightness': float(np.mean(gray)),
                            'has_text': bool(cleaned_text and cleaned_text.strip())
                        }
                    }
            return {
                'content': cleaned_text,
                'metadata': metadata,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("Image processing failed: %s", e)
            return None

    def _fetch_json_content(self, url: str) -> Optional[Dict]:
        """Process JSON content (parsed, then re-serialised with 2-space indent)."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            content = response.json()
            return {
                'content': json.dumps(content, indent=2),
                'content_type': 'application/json',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("JSON processing failed: %s", e)
            return None

    def _fetch_text_content(self, url: str) -> Optional[Dict]:
        """Process plain text content (also the fallback for unknown types)."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            cleaned_content = self.advanced_text_cleaning(response.text)
            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error("Text processing failed: %s", e)
            return None
class FileProcessor:
    """Read uploaded files (plain files or ZIP archives) into result dicts.

    ``process_file`` accepts either a filesystem path or an object exposing
    the path as ``.name``.  Files larger than ``max_file_size`` are rejected;
    ZIP archives are extracted to a temporary directory and only their text
    members are read.
    """

    # Files above this size are sampled (head + tail) instead of read fully.
    LARGE_FILE_THRESHOLD = 100 * 1024 * 1024  # 100MB
    # Number of bytes taken from each end of a large file.
    LARGE_FILE_SAMPLE_BYTES = 1 * 1024 * 1024  # 1MB

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    @staticmethod
    def _resolve_path(file) -> str:
        """Return the filesystem path for *file* (a path or an object with ``.name``)."""
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        return file.name

    def is_text_file(self, filepath: str) -> bool:
        """Check if file is a text file.

        True when the guessed MIME type starts with ``text/`` or the extension
        is one of ``supported_text_extensions``; False on any error.
        """
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            if mime_type and mime_type.startswith('text/'):
                return True
            return os.path.splitext(filepath)[1].lower() in self.supported_text_extensions
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process uploaded file with enhanced error handling.

        Returns a list of result dicts (empty when *file* is falsy, too large,
        or processing fails; failures are logged).
        """
        if not file:
            return []
        dataset = []
        try:
            path = self._resolve_path(file)
            file_size = os.path.getsize(path)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            if zipfile.is_zipfile(path):
                # The extraction directory is removed as soon as reading is done.
                with tempfile.TemporaryDirectory() as temp_dir:
                    dataset.extend(self._process_zip_file(path, temp_dir))
            else:
                dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset

    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents.

        Extracts the archive into *temp_dir* and returns one result per
        non-empty text member.  Archives whose declared uncompressed size
        exceeds ``max_file_size`` are skipped, because the size check on the
        archive itself only covers the compressed bytes.
        """
        results = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            total_size = sum(info.file_size for info in zip_ref.infolist())
            if total_size > self.max_file_size:
                logger.warning(
                    f"Archive uncompressed size ({total_size} bytes) exceeds maximum allowed size"
                )
                return results
            zip_ref.extractall(temp_dir)
        for root, _, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                if not self.is_text_file(filepath):
                    continue
                try:
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                    if content.strip():
                        results.append({
                            "source": "file",
                            "filename": filename,
                            "content": content,
                            "timestamp": datetime.now().isoformat()
                        })
                except Exception as e:
                    logger.error(f"Error reading file {filename}: {str(e)}")
        return results

    def _process_single_file(self, file) -> List[Dict]:
        """Read one file as UTF-8 text (undecodable bytes are ignored).

        Files above ``LARGE_FILE_THRESHOLD`` are not read completely: the
        first and last ``LARGE_FILE_SAMPLE_BYTES`` bytes are kept, joined by a
        truncation marker.  Returns a single-element list, or ``[]`` on error.
        """
        try:
            path = self._resolve_path(file)
            file_stat = os.stat(path)
            # For very large files, read in chunks and summarize
            if file_stat.st_size > self.LARGE_FILE_THRESHOLD:
                logger.info(f"Processing large file: {path} ({file_stat.st_size} bytes)")
                sample = self.LARGE_FILE_SAMPLE_BYTES
                # Binary mode: seeking to an arbitrary byte offset is only
                # well-defined on binary streams, not on text-mode handles.
                with open(path, 'rb') as f:
                    head = f.read(sample)
                    f.seek(max(0, file_stat.st_size - sample))
                    tail = f.read()
                content = (
                    head.decode('utf-8', errors='ignore')
                    + "\n...[Content truncated due to large file size]...\n"
                    + tail.decode('utf-8', errors='ignore')
                )
            else:
                # Regular file processing
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(path),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(path)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
def generate_qr_code(json_data, qr_path="output/qr_code.png"):
    """Generate a QR code from JSON data.

    Args:
        json_data: The payload handed to ``qrcode.make``.
        qr_path: Where the PNG is written; defaults to ``output/qr_code.png``.

    Returns:
        ``qr_path``, unchanged, after the image has been saved.

    Any exception raised by ``qrcode.make`` or by saving the image propagates
    to the caller.
    """
    # The target directory may not exist yet (e.g. when a QR code is requested
    # before any processing run has created it).
    directory = os.path.dirname(qr_path)
    if directory:
        os.makedirs(directory, exist_ok=True)
    qr = qrcode.make(json_data)
    qr.save(qr_path)
    return qr_path
def create_interface():
    """Create a comprehensive Gradio interface with advanced features.

    Builds a ``gr.Blocks`` app with one tab per input kind (URLs, file, raw
    text), a JSON editor, a scratchpad, and two buttons: one that processes
    all inputs into a JSON file, one that renders the JSON editor's content
    as a QR code.

    Returns:
        The assembled ``gr.Blocks`` object (not yet launched).
    """
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processor") as interface:
        gr.Markdown("# 🌐 Advanced URL & Text Processing Toolkit")
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )
        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # Optional: for custom styling
            )
        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )
        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # To display the generated QR code

        def process_all_inputs(urls, file, text, notes):
            """Process all input types and write the combined result to JSON.

            Returns a ``(output_path, summary, json_text)`` triple matching
            the three output components.  ``notes`` (the scratchpad) is
            accepted because it is wired as an input, but is not processed.
            URLs that fail validation or fetching are logged and listed in
            the summary instead of being dropped silently.
            """
            try:
                processor = URLProcessor()
                file_processor = FileProcessor()
                results = []
                skipped = []  # (url, reason) for every URL that produced nothing

                # Process URLs
                if urls:
                    url_list = [part.strip() for part in re.split(r'[,\n]', urls) if part.strip()]
                    for url in url_list:
                        validation = processor.validate_url(url)
                        if not validation.get('is_valid'):
                            skipped.append((url, validation.get('message', 'validation failed')))
                            continue
                        content = processor.fetch_content(url)
                        if not content:
                            skipped.append((url, 'content could not be fetched'))
                            continue
                        results.append({
                            'source': 'url',
                            'url': url,
                            'content': content,
                            'timestamp': datetime.now().isoformat()
                        })

                # Process files
                if file:
                    results.extend(file_processor.process_file(file))

                # Process text input
                if text:
                    cleaned_text = processor.advanced_text_cleaning(text)
                    results.append({
                        'source': 'direct_input',
                        'content': cleaned_text,
                        'timestamp': datetime.now().isoformat()
                    })

                # Tell the user (and the log) which URLs were left out and why.
                skipped_note = ""
                if skipped:
                    for url, reason in skipped:
                        logger.warning("Skipped URL %s: %s", url, reason)
                    details = "\n".join(f"- {url}: {reason}" for url, reason in skipped)
                    skipped_note = f"\nSkipped {len(skipped)} URL(s):\n{details}"

                # Generate output
                if results:
                    output_dir = Path('output') / datetime.now().strftime('%Y-%m-%d')
                    output_dir.mkdir(parents=True, exist_ok=True)
                    output_path = output_dir / f'processed_{int(time.time())}.json'
                    with open(output_path, 'w', encoding='utf-8') as f:
                        json.dump(results, f, ensure_ascii=False, indent=2)
                    summary = f"Processed {len(results)} items successfully!" + skipped_note
                    json_data = json.dumps(results, indent=2)  # Prepare JSON for QR code
                    return str(output_path), summary, json_data  # Return JSON for editor
                return None, "No valid content to process." + skipped_note, ""
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, f"Error: {str(e)}", ""

        def generate_qr(json_data):
            """Generate QR code from JSON data and return the file path."""
            if json_data:
                return generate_qr_code(json_data)
            return None

        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]  # Update outputs to include JSON editor
        )
        qr_btn.click(
            generate_qr,
            inputs=json_editor,
            outputs=qr_output
        )
        # Built from adjacent literals so the Markdown lines carry no leading
        # indentation (indented lines would render as a code block).
        gr.Markdown(
            "\n"
            "### Usage Guidelines\n"
            "- **URL Processing**: Enter valid HTTP/HTTPS URLs\n"
            "- **File Input**: Upload text files or ZIP archives\n"
            "- **Text Input**: Direct text processing\n"
            "- **JSON Editor**: View and edit your JSON data\n"
            "- **Scratchpad**: Quick notes or text collections\n"
            "- Advanced cleaning and validation included\n"
        )
    return interface
def main():
    """Initialise MIME type tables, build the Gradio interface and serve it."""
    # Configure system settings
    mimetypes.init()

    # Launch settings collected in one place: bind to 0.0.0.0 on port 7860,
    # show errors in the UI, no public share link, open a browser, debug on.
    launch_options = {
        "server_name": "0.0.0.0",
        "server_port": 7860,
        "show_error": True,
        "share": False,
        "inbrowser": True,
        "debug": True,
    }

    # Create and launch interface
    app = create_interface()
    app.launch(**launch_options)


if __name__ == "__main__":
    main()