import base64
import gradio as gr
import hashlib
import io
import json
import logging
import mimetypes
import os
import qrcode
import random
import re
import requests
import tempfile
import time
import validators
import zipfile
import zxing
from bs4 import BeautifulSoup
from datetime import datetime
from fake_useragent import UserAgent
from pathlib import Path
from PIL import Image
from selenium import webdriver
from typing import List, Dict, Optional, Union, Any
from urllib.parse import urlparse

# Note: URLProcessor, FileProcessor and QRProcessor are defined in this module,
# and cleantext is imported lazily inside advanced_text_cleaning(), so the
# corresponding top-level imports are not needed here.

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('App')

# URLProcessor class
# ===================
class URLProcessor:
    """Class to handle URL processing with advanced features"""

    def __init__(self, request_delay: float = 1.0, timeout: int = 30, max_retries: int = 3, respect_robots: bool = True):
        self.request_delay = request_delay
        self.timeout = timeout
        self.max_retries = max_retries
        self.respect_robots = respect_robots
        self.rate_limits = {}  # Domain -> (last_access_time, count)
        # Initialize session with rotating user agents
        self.session = requests.Session()
        self.update_user_agent()
        # Selenium driver (lazy initialization)
        self._driver = None

    def update_user_agent(self):
        """Rotate user agent to avoid detection"""
        user_agents = [
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36',
            'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0'
        ]
        self.session.headers.update({
            'User-Agent': random.choice(user_agents),
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
        })

    def get_selenium_driver(self):
        """Get or create Selenium WebDriver with proper settings"""
        if self._driver is not None:
            return self._driver
        try:
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.chrome.service import Service
            from webdriver_manager.chrome import ChromeDriverManager
            options = Options()
            options.add_argument('--headless')
            options.add_argument('--no-sandbox')
            options.add_argument('--disable-dev-shm-usage')
            options.add_argument('--disable-gpu')
            options.add_argument('--window-size=1920,1080')
            options.add_argument(f'user-agent={self.session.headers["User-Agent"]}')
            service = Service(ChromeDriverManager().install())
            self._driver = webdriver.Chrome(service=service, options=options)
            return self._driver
        except Exception as e:
            logger.error(f"Failed to initialize Selenium: {e}")
            return None

    def close(self):
        """Close resources"""
        if self._driver is not None:
            self._driver.quit()
            self._driver = None

    def handle_rate_limits(self, url: str):
        """Implement rate limiting per domain"""
        parsed_url = urlparse(url)
        parsed_domain = parsed_url.netloc
        current_time = time.time()
        if parsed_domain in self.rate_limits:
            last_access, count = self.rate_limits[parsed_domain]
            # Determine appropriate delay based on domain
            if "linkedin.com" in parsed_domain:
                min_delay = 5.0  # LinkedIn is sensitive to scraping
            elif "gov" in parsed_domain:
                min_delay = 2.0  # Be respectful with government sites
            else:
                min_delay = self.request_delay
            # Exponential backoff if we're making many requests
            if count > 10:
                min_delay *= 2
            # Wait if needed
            elapsed = current_time - last_access
            if elapsed < min_delay:
                time.sleep(min_delay - elapsed)
            # Update count
            self.rate_limits[parsed_domain] = (time.time(), count + 1)
        else:
            # First time accessing this domain
            self.rate_limits[parsed_domain] = (current_time, 1)

    def handle_interactive_site(self, url):
        """Handle sites that require interaction to bypass blocks"""
        driver = self.get_selenium_driver()
        if not driver:
            return None
        try:
            driver.get(url)
            # Wait for page to load
            time.sleep(3)
            # Handle different types of sites
            if "facebook.com" in url or "instagram.com" in url:
                self._handle_social_media_site(driver)
            elif "google.com" in url:
                self._handle_google_site(driver)
            # Get the page source after interaction
            page_source = driver.page_source
            return {
                'content': page_source,
                'content_type': 'text/html',
                'url': url,
                'title': driver.title
            }
        except Exception as e:
            logger.error(f"Error handling interactive site {url}: {e}")
            return None

    def _handle_social_media_site(self, driver):
        """Handle Facebook/Instagram login walls"""
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        try:
            # Try to find and close login popups
            close_buttons = driver.find_elements(By.XPATH, "//button[contains(@aria-label, 'Close')]")
            if close_buttons:
                close_buttons[0].click()
                time.sleep(1)
            # Press ESC key to dismiss popups
            webdriver.ActionChains(driver).send_keys(Keys.ESCAPE).perform()
            time.sleep(1)
            # Scroll down to load more content
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
            time.sleep(2)
        except Exception as e:
            logger.warning(f"Error handling social media site: {e}")

    def _handle_google_site(self, driver):
        """Handle Google authentication and consent pages"""
        from selenium.webdriver.common.by import By
        try:
            # Look for consent buttons
            consent_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'Accept all')]")
            if consent_buttons:
                consent_buttons[0].click()
                time.sleep(1)
            # Look for "I agree" buttons
            agree_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'I agree')]")
            if agree_buttons:
                agree_buttons[0].click()
                time.sleep(1)
        except Exception as e:
            logger.warning(f"Error handling Google site: {e}")

    def check_robots_txt(self, url: str) -> bool:
        """Check if URL is allowed by robots.txt"""
        if not self.respect_robots:
            return True
        try:
            from urllib.robotparser import RobotFileParser
            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            rp = RobotFileParser()
            rp.set_url(robots_url)
            rp.read()
            return rp.can_fetch(self.session.headers['User-Agent'], url)
        except Exception as e:
            logger.warning(f"Error checking robots.txt: {e}")
            return True

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Universal content fetcher with special case handling"""
        try:
            if 'drive.google.com' in url:
                return self._handle_google_drive(url)
            if 'calendar.google.com' in url and 'ical' in url:
                return self._handle_google_calendar(url)
            return self._fetch_html_content(url)
        except Exception as e:
            logger.error(f"Content fetch failed: {e}")
            return None

    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Process Google Drive file links"""
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None

    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Process Google Calendar ICS feeds"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None

    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Enhanced HTML content processing to extract everything"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # Store the original HTML
            original_html = response.text
            # Parse with BeautifulSoup
            soup = BeautifulSoup(response.text, 'html.parser')
            # Extract all text content
            text_content = soup.get_text(separator='\n', strip=True)
            # Extract all links
            links = []
            for link in soup.find_all('a', href=True):
                href = link['href']
                # Convert relative URLs to absolute
                if href.startswith('/'):
                    from urllib.parse import urljoin
                    parsed_url = urlparse(url)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    href = urljoin(base_url, href)
                link_text = link.get_text(strip=True)
                links.append({
                    'url': href,
                    'text': link_text if link_text else '[No text]'
                })
            # Extract all images
            images = []
            for img in soup.find_all('img', src=True):
                src = img['src']
                # Convert relative URLs to absolute
                if src.startswith('/'):
                    from urllib.parse import urljoin
                    parsed_url = urlparse(url)
                    base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                    src = urljoin(base_url, src)
                alt_text = img.get('alt', '')
                images.append({
                    'src': src,
                    'alt': alt_text if alt_text else '[No alt text]'
                })
            # Extract all scripts
            scripts = []
            for script in soup.find_all('script'):
                script_content = script.string
                if script_content:
                    scripts.append(script_content)
            # Extract all styles
            styles = []
            for style in soup.find_all('style'):
                style_content = style.string
                if style_content:
                    styles.append(style_content)
            # Extract metadata
            metadata = {}
            for meta in soup.find_all('meta'):
                if meta.get('name') and meta.get('content'):
                    metadata[meta['name']] = meta['content']
                elif meta.get('property') and meta.get('content'):
                    metadata[meta['property']] = meta['content']
            # Extract title
            title = soup.title.string if soup.title else ''
            # Return comprehensive data
            return {
                'url': url,
                'title': title,
                'metadata': metadata,
                'content': text_content,
                'html': original_html,
                'links': links,
                'images': images,
                'scripts': scripts,
                'styles': styles,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None

    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
        try:
            # Try to use cleantext if available
            import importlib.util
            if importlib.util.find_spec("cleantext") is not None:
                from cleantext import clean
                cleaned_text = clean(
                    text,
                    fix_unicode=True,
                    to_ascii=True,
                    lower=True,
                    no_line_breaks=True,
                    no_urls=True,
                    no_emails=True,
                    no_phone_numbers=True,
                    no_numbers=False,
                    no_digits=False,
                    no_currency_symbols=True,
                    no_punct=False
                ).strip()
                return cleaned_text
            else:
                # Fallback cleaning
                text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
                text = text.encode('ascii', 'ignore').decode('ascii')
                text = re.sub(r'\s+', ' ', text)
                return text.strip()
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}")
            return text.strip() if text else ""

    def process_urls(self, urls: List[str], mode: str = 'basic') -> List[Dict]:
        """Process a list of URLs with different modes"""
        results = []
        for url in urls:
            # Validate URL
            if not validators.url(url):
                results.append({
                    'url': url,
                    'error': 'Invalid URL format',
                    'timestamp': datetime.now().isoformat()
                })
                continue
            # Check robots.txt
            if not self.check_robots_txt(url):
                results.append({
                    'url': url,
                    'error': 'Access disallowed by robots.txt',
                    'timestamp': datetime.now().isoformat()
                })
                continue
            # Apply rate limiting
            self.handle_rate_limits(url)
            # Process based on mode
            try:
                if mode == 'basic':
                    content = self.fetch_content(url)
                    if content:
                        results.append(content)
                    else:
                        results.append({
                            'url': url,
                            'error': 'Failed to fetch content',
                            'timestamp': datetime.now().isoformat()
                        })
                elif mode == 'interactive':
                    content = self.handle_interactive_site(url)
                    if content:
                        results.append(content)
                    else:
                        # Fall back to basic mode
                        content = self.fetch_content(url)
                        if content:
                            results.append(content)
                        else:
                            results.append({
                                'url': url,
                                'error': 'Failed to fetch content in interactive mode',
                                'timestamp': datetime.now().isoformat()
                            })
                elif mode == 'deep':
                    # Deep mode: get main content and follow some links
                    main_content = self.fetch_content(url)
                    if not main_content:
                        results.append({
                            'url': url,
                            'error': 'Failed to fetch main content',
                            'timestamp': datetime.now().isoformat()
                        })
                        continue
                    results.append(main_content)
                    # Follow up to 5 same-domain links from the main page
                    if 'links' in main_content and main_content['links']:
                        followed_count = 0
                        for link_data in main_content['links'][:10]:  # Consider first 10 links
                            link_url = link_data['url']
                            # Skip non-http(s) links
                            if not link_url.startswith(('http://', 'https://')):
                                continue
                            # Skip links pointing to other domains
                            main_domain = urlparse(url).netloc
                            link_domain = urlparse(link_url).netloc
                            if main_domain != link_domain:
                                continue
                            # Apply rate limiting
                            self.handle_rate_limits(link_url)
                            # Fetch the linked content
                            link_content = self.fetch_content(link_url)
                            if link_content:
                                results.append(link_content)
                                followed_count += 1
                            # Limit to 5 followed links
                            if followed_count >= 5:
                                break
            except Exception as e:
                logger.error(f"Error processing URL {url}: {e}")
                results.append({
                    'url': url,
                    'error': f"Processing error: {str(e)}",
                    'timestamp': datetime.now().isoformat()
                })
        return results

# FileProcessor class
# ===================
class FileProcessor:
    """Class to handle file processing with enhanced capabilities"""

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.js', '.css', '.py',
            '.java', '.c', '.cpp', '.h', '.rb', '.php', '.sql', '.yaml', '.yml', '.ini',
            '.cfg', '.conf', '.log', '.sh', '.bat', '.ps1'
        }
        self.supported_binary_extensions = {
            '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.zip', '.tar', '.gz',
            '.rar', '.7z', '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff', '.mp3', '.mp4',
            '.avi', '.mov', '.wmv', '.flv', '.wav', '.ogg'
        }

    def is_text_file(self, filepath: str) -> bool:
        """Check if file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            ext = os.path.splitext(filepath)[1].lower()
            # Check by extension first
            if ext in self.supported_text_extensions:
                return True
            # Then check by mime type
            if mime_type and mime_type.startswith('text/'):
                return True
            # Try to read the file as text
            if os.path.exists(filepath) and os.path.getsize(filepath) < 1024 * 1024:  # Only try for files < 1MB
                try:
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        sample = f.read(1024)  # Read first 1KB
                    # Check if it's mostly printable ASCII
                    printable_ratio = sum(c.isprintable() for c in sample) / len(sample) if sample else 0
                    return printable_ratio > 0.8
                except Exception:
                    pass
            return False
        except Exception as e:
            logger.error(f"Error checking if file is text: {e}")
            return False

    def process_file(self, file) -> List[Dict]:
        """Process uploaded file with enhanced error handling and binary support"""
        if not file:
            return [{"error": "No file provided"}]
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return [{"error": f"File size ({file_size} bytes) exceeds maximum allowed size of {self.max_file_size} bytes"}]
            with tempfile.TemporaryDirectory() as temp_dir:
                # Check if it's an archive file
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                elif file.name.endswith('.tar.gz') or file.name.endswith('.tgz'):
                    dataset.extend(self._process_tar_file(file.name, temp_dir))
                elif file.name.endswith('.rar'):
                    dataset.extend(self._process_rar_file(file.name, temp_dir))
                elif file.name.endswith('.7z'):
                    dataset.extend(self._process_7z_file(file.name, temp_dir))
                # Check if it's a document file
                elif file.name.endswith(('.doc', '.docx')):
                    dataset.extend(self._process_word_file(file.name))
                elif file.name.endswith(('.xls', '.xlsx')):
                    dataset.extend(self._process_excel_file(file.name))
                elif file.name.endswith(('.ppt', '.pptx')):
                    dataset.extend(self._process_powerpoint_file(file.name))
                elif file.name.endswith('.pdf'):
                    dataset.extend(self._process_pdf_file(file.name))
                # Check if it's an image file
                elif file.name.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff')):
                    dataset.extend(self._process_image_file(file.name))
                # Check if it's an audio/video file
                elif file.name.endswith(('.mp3', '.wav', '.ogg', '.mp4', '.avi', '.mov', '.wmv', '.flv')):
                    dataset.extend(self._process_media_file(file.name))
                # Default to text file processing
                else:
                    dataset.extend(self._process_single_file(file))
            if not dataset:
                return [{"warning": "No extractable content found in the file"}]
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return [{"error": f"Error processing file: {str(e)}"}]
        return dataset

    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents with enhanced extraction"""
        results = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Get file list first
                file_list = zip_ref.namelist()
                total_files = len(file_list)
                # Extract all files
                zip_ref.extractall(temp_dir)
                # Process each file
                processed_count = 0
                for root, dirs, files in os.walk(temp_dir):
                    for filename in files:
                        filepath = os.path.join(root, filename)
                        rel_path = os.path.relpath(filepath, temp_dir)
                        # Get file info from zip
                        try:
                            zip_info = zip_ref.getinfo(rel_path.replace('\\', '/'))
                            file_size = zip_info.file_size
                            compressed_size = zip_info.compress_size
                            compression_ratio = (1 - compressed_size / file_size) * 100 if file_size > 0 else 0
                        except Exception:
                            file_size = os.path.getsize(filepath)
                            compressed_size = None
                            compression_ratio = None
                        # Process based on file type
                        if self.is_text_file(filepath):
                            try:
                                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                    content = f.read()
                                results.append({
                                    "source": "zip",
                                    "archive": os.path.basename(zip_path),
                                    "filename": filename,
                                    "path": rel_path,
                                    "size": file_size,
                                    "compressed_size": compressed_size,
                                    "compression_ratio": f"{compression_ratio:.2f}%" if compression_ratio is not None else None,
                                    "content": content,
                                    "timestamp": datetime.now().isoformat()
                                })
                                processed_count += 1
                            except Exception as e:
                                logger.error(f"Error reading file {filename}: {str(e)}")
                        else:
                            # For binary files, just record metadata
                            mime_type, _ = mimetypes.guess_type(filepath)
                            results.append({
                                "source": "zip",
                                "archive": os.path.basename(zip_path),
                                "filename": filename,
                                "path": rel_path,
                                "size": file_size,
                                "compressed_size": compressed_size,
                                "compression_ratio": f"{compression_ratio:.2f}%" if compression_ratio is not None else None,
                                "mime_type": mime_type,
                                "content": f"[Binary file: {mime_type or 'unknown type'}]",
                                "timestamp": datetime.now().isoformat()
                            })
                            processed_count += 1
            # Add summary
            results.append({
                "source": "zip_summary",
                "archive": os.path.basename(zip_path),
                "total_files": total_files,
                "processed_files": processed_count,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            logger.error(f"Error processing ZIP file: {str(e)}")
            results.append({"error": f"Error processing ZIP file: {str(e)}"})
        return results

    def _process_tar_file(self, tar_path: str, temp_dir: str) -> List[Dict]:
        """Process TAR/GZ file contents"""
        results = []
        try:
            import tarfile
            with tarfile.open(tar_path, 'r:*') as tar:
                # Get file list
                file_list = tar.getnames()
                total_files = len(file_list)
                # Extract all files
                tar.extractall(temp_dir)
            # Process each file
            processed_count = 0
            for root, dirs, files in os.walk(temp_dir):
                for filename in files:
                    filepath = os.path.join(root, filename)
                    rel_path = os.path.relpath(filepath, temp_dir)
                    # Process based on file type
                    if self.is_text_file(filepath):
                        try:
                            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                content = f.read()
                            results.append({
                                "source": "tar",
                                "archive": os.path.basename(tar_path),
                                "filename": filename,
                                "path": rel_path,
                                "size": os.path.getsize(filepath),
                                "content": content,
                                "timestamp": datetime.now().isoformat()
                            })
                            processed_count += 1
                        except Exception as e:
                            logger.error(f"Error reading file {filename}: {str(e)}")
                    else:
                        # For binary files, just record metadata
                        mime_type, _ = mimetypes.guess_type(filepath)
                        results.append({
                            "source": "tar",
                            "archive": os.path.basename(tar_path),
                            "filename": filename,
                            "path": rel_path,
                            "size": os.path.getsize(filepath),
                            "mime_type": mime_type,
                            "content": f"[Binary file: {mime_type or 'unknown type'}]",
                            "timestamp": datetime.now().isoformat()
                        })
                        processed_count += 1
            # Add summary
            results.append({
                "source": "tar_summary",
                "archive": os.path.basename(tar_path),
                "total_files": total_files,
                "processed_files": processed_count,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            logger.error(f"Error processing TAR file: {str(e)}")
            results.append({"error": f"Error processing TAR file: {str(e)}"})
        return results
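
    # process_file() dispatches to _process_rar_file() and _process_7z_file(), which are not
    # defined in this file. The methods below are minimal sketches, assuming the optional
    # third-party packages 'rarfile' and 'py7zr'; they only extract the archive and reuse the
    # same text/binary handling pattern as the ZIP/TAR processors via a shared helper.
    def _process_rar_file(self, rar_path: str, temp_dir: str) -> List[Dict]:
        """Process RAR file contents (sketch; requires the 'rarfile' package)"""
        results = []
        try:
            import importlib.util
            if importlib.util.find_spec("rarfile") is None:
                return [{"error": "RAR processing requires the 'rarfile' module. Install with 'pip install rarfile'."}]
            import rarfile
            with rarfile.RarFile(rar_path) as rar_ref:
                rar_ref.extractall(temp_dir)
            results.extend(self._collect_extracted_files("rar", rar_path, temp_dir))
        except Exception as e:
            logger.error(f"Error processing RAR file: {str(e)}")
            results.append({"error": f"Error processing RAR file: {str(e)}"})
        return results

    def _process_7z_file(self, archive_path: str, temp_dir: str) -> List[Dict]:
        """Process 7z file contents (sketch; requires the 'py7zr' package)"""
        results = []
        try:
            import importlib.util
            if importlib.util.find_spec("py7zr") is None:
                return [{"error": "7z processing requires the 'py7zr' module. Install with 'pip install py7zr'."}]
            import py7zr
            with py7zr.SevenZipFile(archive_path, mode='r') as archive:
                archive.extractall(path=temp_dir)
            results.extend(self._collect_extracted_files("7z", archive_path, temp_dir))
        except Exception as e:
            logger.error(f"Error processing 7z file: {str(e)}")
            results.append({"error": f"Error processing 7z file: {str(e)}"})
        return results

    def _collect_extracted_files(self, source: str, archive_path: str, temp_dir: str) -> List[Dict]:
        """Hypothetical shared helper: walk an extraction directory and record each file."""
        results = []
        for root, dirs, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                rel_path = os.path.relpath(filepath, temp_dir)
                entry = {
                    "source": source,
                    "archive": os.path.basename(archive_path),
                    "filename": filename,
                    "path": rel_path,
                    "size": os.path.getsize(filepath),
                    "timestamp": datetime.now().isoformat()
                }
                if self.is_text_file(filepath):
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        entry["content"] = f.read()
                else:
                    mime_type, _ = mimetypes.guess_type(filepath)
                    entry["mime_type"] = mime_type
                    entry["content"] = f"[Binary file: {mime_type or 'unknown type'}]"
                results.append(entry)
        return results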

    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file with enhanced metadata extraction"""
        try:
            file_stat = os.stat(file.name)
            file_path = file.name
            filename = os.path.basename(file_path)
            mime_type, _ = mimetypes.guess_type(file_path)
            # For text files
            if self.is_text_file(file_path):
                if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
                    logger.info(f"Processing large file: {file_path} ({file_stat.st_size} bytes)")
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read(1 * 1024 * 1024)  # First 1MB
                        content += "\n...[Content truncated due to large file size]...\n"
                        f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                        content += f.read()  # Last 1MB
                else:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
                return [{
                    'source': 'file',
                    'filename': filename,
                    'file_size': file_stat.st_size,
                    'mime_type': mime_type,
                    'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                    'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                    'content': content,
                    'timestamp': datetime.now().isoformat()
                }]
            else:
                # For binary files, extract metadata and try specialized extraction
                if file_path.endswith(('.pdf', '.doc', '.docx')):
                    return self._process_document_file(file_path)
                elif file_path.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp')):
                    return self._process_image_file(file_path)
                elif file_path.endswith(('.mp3', '.wav', '.ogg', '.mp4', '.avi', '.mov')):
                    return self._process_media_file(file_path)
                else:
                    # Generic binary file handling
                    return [{
                        'source': 'binary_file',
                        'filename': filename,
                        'file_size': file_stat.st_size,
                        'mime_type': mime_type,
                        'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                        'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                        'content': f"[Binary file: {mime_type or 'unknown type'}]",
                        'timestamp': datetime.now().isoformat()
                    }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return [{
                'source': 'error',
                'filename': os.path.basename(file.name) if file else 'unknown',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }]
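
    # _process_single_file() routes .pdf/.doc/.docx files to _process_document_file(), which is
    # not defined in this file. A minimal sketch follows, assuming it should simply delegate to
    # the existing PDF processor and the Word processor sketched further below.
    def _process_document_file(self, file_path: str) -> List[Dict]:
        """Dispatch document files to the appropriate specialized processor (sketch)"""
        if file_path.endswith('.pdf'):
            return self._process_pdf_file(file_path)
        if file_path.endswith(('.doc', '.docx')):
            return self._process_word_file(file_path)
        return [{
            "source": "error",
            "filename": os.path.basename(file_path),
            "error": "Unsupported document type",
            "timestamp": datetime.now().isoformat()
        }]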

    def _process_pdf_file(self, file_path: str) -> List[Dict]:
        """Extract text from PDF files"""
        try:
            # Try to import PyPDF2 module
            import importlib.util
            if importlib.util.find_spec("PyPDF2") is None:
                return [{
                    "error": "PDF processing requires the 'PyPDF2' module. Install with 'pip install PyPDF2'."
                }]
            import PyPDF2
            with open(file_path, 'rb') as file:
                reader = PyPDF2.PdfReader(file)
                num_pages = len(reader.pages)
                # Extract text from each page
                all_text = ""
                page_texts = []
                for i in range(num_pages):
                    page = reader.pages[i]
                    text = page.extract_text()
                    all_text += text + "\n\n"
                    page_texts.append({
                        "page_number": i + 1,
                        "content": text
                    })
            # Get file metadata
            file_stat = os.stat(file_path)
            return [{
                "source": "pdf",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "mime_type": "application/pdf",
                "created": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                "modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                "num_pages": num_pages,
                "content": all_text,
                "pages": page_texts,
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing PDF file: {str(e)}")
            return [{
                "source": "error",
                "filename": os.path.basename(file_path),
                "error": f"Error processing PDF file: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }]
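
    # process_file() also dispatches to _process_word_file(), _process_excel_file() and
    # _process_powerpoint_file(), none of which are defined in this file. The methods below
    # are minimal sketches following the same pattern as _process_pdf_file(), assuming the
    # optional 'python-docx', 'openpyxl' and 'python-pptx' packages.
    def _process_word_file(self, file_path: str) -> List[Dict]:
        """Extract text from Word documents (sketch; requires 'python-docx')"""
        try:
            import importlib.util
            if importlib.util.find_spec("docx") is None:
                return [{"error": "Word processing requires the 'python-docx' module. Install with 'pip install python-docx'."}]
            import docx
            document = docx.Document(file_path)
            all_text = "\n".join(paragraph.text for paragraph in document.paragraphs)
            file_stat = os.stat(file_path)
            return [{
                "source": "word",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "content": all_text,
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing Word file: {str(e)}")
            return [{"source": "error", "filename": os.path.basename(file_path),
                     "error": f"Error processing Word file: {str(e)}",
                     "timestamp": datetime.now().isoformat()}]

    def _process_excel_file(self, file_path: str) -> List[Dict]:
        """Extract cell values from Excel workbooks (sketch; requires 'openpyxl')"""
        try:
            import importlib.util
            if importlib.util.find_spec("openpyxl") is None:
                return [{"error": "Excel processing requires the 'openpyxl' module. Install with 'pip install openpyxl'."}]
            import openpyxl
            workbook = openpyxl.load_workbook(file_path, read_only=True, data_only=True)
            lines = []
            for sheet in workbook.worksheets:
                lines.append(f"[Sheet: {sheet.title}]")
                for row in sheet.iter_rows(values_only=True):
                    lines.append("\t".join("" if cell is None else str(cell) for cell in row))
            file_stat = os.stat(file_path)
            return [{
                "source": "excel",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "content": "\n".join(lines),
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing Excel file: {str(e)}")
            return [{"source": "error", "filename": os.path.basename(file_path),
                     "error": f"Error processing Excel file: {str(e)}",
                     "timestamp": datetime.now().isoformat()}]

    def _process_powerpoint_file(self, file_path: str) -> List[Dict]:
        """Extract slide text from PowerPoint files (sketch; requires 'python-pptx')"""
        try:
            import importlib.util
            if importlib.util.find_spec("pptx") is None:
                return [{"error": "PowerPoint processing requires the 'python-pptx' module. Install with 'pip install python-pptx'."}]
            from pptx import Presentation
            presentation = Presentation(file_path)
            lines = []
            for index, slide in enumerate(presentation.slides, start=1):
                lines.append(f"[Slide {index}]")
                for shape in slide.shapes:
                    if shape.has_text_frame:
                        lines.append(shape.text_frame.text)
            file_stat = os.stat(file_path)
            return [{
                "source": "powerpoint",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "content": "\n".join(lines),
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing PowerPoint file: {str(e)}")
            return [{"source": "error", "filename": os.path.basename(file_path),
                     "error": f"Error processing PowerPoint file: {str(e)}",
                     "timestamp": datetime.now().isoformat()}]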

    def _process_image_file(self, file_path: str) -> List[Dict]:
        """Extract metadata and attempt OCR on image files"""
        try:
            # Try to import PIL module
            import importlib.util
            if importlib.util.find_spec("PIL") is None:
                return [{
                    "error": "Image processing requires the 'Pillow' module. Install with 'pip install Pillow'."
                }]
            from PIL import Image
            # Open image and get basic metadata
            with Image.open(file_path) as img:
                width, height = img.size
                format_name = img.format
                mode = img.mode
                # Extract EXIF data if available
                exif_data = {}
                if hasattr(img, '_getexif') and img._getexif():
                    exif = img._getexif()
                    if exif:
                        for tag_id, value in exif.items():
                            tag_name = f"tag_{tag_id}"
                            exif_data[tag_name] = str(value)
                # Try OCR if pytesseract is available
                ocr_text = None
                if importlib.util.find_spec("pytesseract") is not None:
                    try:
                        import pytesseract
                        ocr_text = pytesseract.image_to_string(img)
                    except Exception as e:
                        logger.warning(f"OCR failed: {e}")
            # Get file metadata
            file_stat = os.stat(file_path)
            return [{
                "source": "image",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "mime_type": f"image/{format_name.lower()}" if format_name else "image/unknown",
                "created": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                "modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                "width": width,
                "height": height,
                "format": format_name,
                "mode": mode,
                "exif": exif_data,
                "ocr_text": ocr_text,
                "content": ocr_text if ocr_text else f"[Image: {width}x{height} {format_name}]",
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing image file: {str(e)}")
            return [{
                "source": "error",
                "filename": os.path.basename(file_path),
                "error": f"Error processing image file: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }]

    def _process_media_file(self, file_path: str) -> List[Dict]:
        """Extract metadata from audio/video files"""
        try:
            # Try to import mutagen module
            import importlib.util
            if importlib.util.find_spec("mutagen") is None:
                return [{
                    "error": "Media processing requires the 'mutagen' module. Install with 'pip install mutagen'."
                }]
            import mutagen
            # Get file metadata
            file_stat = os.stat(file_path)
            mime_type, _ = mimetypes.guess_type(file_path)
            # Extract media metadata
            media_info = mutagen.File(file_path)
            metadata = {}
            if media_info:
                # Extract common metadata
                if hasattr(media_info, 'info') and hasattr(media_info.info, 'length'):
                    metadata['duration'] = media_info.info.length
                # Extract tags
                for key, value in media_info.items():
                    if isinstance(value, list) and len(value) == 1:
                        metadata[key] = str(value[0])
                    else:
                        metadata[key] = str(value)
            return [{
                "source": "media",
                "filename": os.path.basename(file_path),
                "file_size": file_stat.st_size,
                "mime_type": mime_type,
                "created": datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                "modified": datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                "metadata": metadata,
                "content": f"[Media file: {mime_type or 'unknown type'}]",
                "timestamp": datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"Error processing media file: {str(e)}")
            return [{
                "source": "error",
                "filename": os.path.basename(file_path),
                "error": f"Error processing media file: {str(e)}",
                "timestamp": datetime.now().isoformat()
            }]

# QRProcessor class
# =================
class QRProcessor:
    """Class to handle QR code processing"""

    def __init__(self):
        # Check for required libraries
        self._check_dependencies()

    def _check_dependencies(self):
        """Check if required libraries are installed"""
        try:
            import importlib.util
            # Check for pyzbar
            if importlib.util.find_spec("pyzbar") is None:
                logger.warning("pyzbar library not found. QR code detection will not work. Install with 'pip install pyzbar'")
            # Check for qrcode
            if importlib.util.find_spec("qrcode") is None:
                logger.warning("qrcode library not found. QR code generation will not work. Install with 'pip install qrcode'")
        except ImportError as e:
            logger.error(f"Error checking dependencies: {e}")

    def detect_qr_codes(self, image_path: str) -> List[Dict]:
        """Detect QR codes in an image"""
        try:
            import importlib.util
            if importlib.util.find_spec("pyzbar") is None:
                return [{"error": "pyzbar library not found. Install with 'pip install pyzbar'"}]
            from pyzbar.pyzbar import decode
            from PIL import Image
            # Open the image
            image = Image.open(image_path)
            # Decode QR codes
            decoded_objects = decode(image)
            results = []
            for obj in decoded_objects:
                # Get the bounding box
                rect = obj.rect
                bbox = {
                    'left': rect.left,
                    'top': rect.top,
                    'width': rect.width,
                    'height': rect.height
                }
                # Get the data
                data = obj.data.decode('utf-8', errors='replace')
                # Get the type
                qr_type = obj.type
                results.append({
                    'type': qr_type,
                    'data': data,
                    'bbox': bbox,
                    'timestamp': datetime.now().isoformat()
                })
            if not results:
                results.append({
                    'warning': 'No QR codes detected in the image',
                    'timestamp': datetime.now().isoformat()
                })
            return results
        except Exception as e:
            logger.error(f"Error detecting QR codes: {e}")
            return [{"error": f"Error detecting QR codes: {str(e)}"}]

    def generate_qr_code(self, data: str, output_path: Optional[str] = None, size: int = 10) -> Dict:
        """Generate a QR code from data"""
        try:
            import importlib.util
            if importlib.util.find_spec("qrcode") is None:
                return {"error": "qrcode library not found. Install with 'pip install qrcode'"}
            import qrcode
            # Create QR code instance
            qr = qrcode.QRCode(
                version=1,
                error_correction=qrcode.constants.ERROR_CORRECT_L,
                box_size=size,
                border=4,
            )
            # Add data
            qr.add_data(data)
            qr.make(fit=True)
            # Create an image from the QR Code instance
            img = qr.make_image(fill_color="black", back_color="white")
            # Save the image if output path is provided
            if output_path:
                img.save(output_path)
                return {
                    'success': True,
                    'data': data,
                    'output_path': output_path,
                    'timestamp': datetime.now().isoformat()
                }
            else:
                # Save to a temporary file
                with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
                    temp_path = tmp.name
                img.save(temp_path)
                return {
                    'success': True,
                    'data': data,
                    'output_path': temp_path,
                    'timestamp': datetime.now().isoformat()
                }
        except Exception as e:
            logger.error(f"Error generating QR code: {e}")
            return {"error": f"Error generating QR code: {str(e)}"}

    def extract_qr_from_url(self, url_processor, url: str) -> List[Dict]:
        """Extract QR codes from an image URL"""
        try:
            # Fetch the image from the URL
            response = url_processor.session.get(url, stream=True)
            response.raise_for_status()
            # Save to a temporary file
            with tempfile.NamedTemporaryFile(suffix='.png', delete=False) as tmp:
                temp_path = tmp.name
                for chunk in response.iter_content(chunk_size=128):
                    tmp.write(chunk)
            # Process the image
            results = self.detect_qr_codes(temp_path)
            # Add source information
            for result in results:
                result['source_url'] = url
            # Clean up
            os.unlink(temp_path)
            return results
        except Exception as e:
            logger.error(f"Error extracting QR from URL: {e}")
            return [{"error": f"Error extracting QR from URL: {str(e)}"}]

    def batch_process_images(self, image_paths: List[str]) -> Dict[str, List[Dict]]:
        """Process multiple images for QR codes"""
        results = {}
        for image_path in image_paths:
            try:
                if os.path.exists(image_path):
                    image_results = self.detect_qr_codes(image_path)
                    results[image_path] = image_results
                else:
                    results[image_path] = [{"error": f"Image file not found: {image_path}"}]
            except Exception as e:
                logger.error(f"Error processing image {image_path}: {e}")
                results[image_path] = [{"error": f"Processing error: {str(e)}"}]
        return results
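
# create_interface() wires its buttons to process_all_inputs() and a module-level
# generate_qr_code(), neither of which is defined in this file. The functions below are
# minimal, assumed implementations: they combine the URL/file/text inputs into one JSON
# dataset and render a QR code from the editor contents via QRProcessor. Adjust them to
# match the intended behaviour of the app.
def process_all_inputs(urls, file, text, notepad):
    """Process URL, file and text inputs into a single downloadable JSON dataset (sketch)."""
    results = []
    try:
        url_processor = URLProcessor()
        file_processor = FileProcessor()
        if urls:
            url_list = [u.strip() for u in re.split(r'[,\n]+', urls) if u.strip()]
            results.extend(url_processor.process_urls(url_list, mode='basic'))
        if file:
            results.extend(file_processor.process_file(file))
        if text:
            results.append({
                'source': 'text_input',
                'content': url_processor.advanced_text_cleaning(text),
                'timestamp': datetime.now().isoformat()
            })
        if not results:
            return None, "No input provided.", ""
        # Write the combined dataset to a temporary JSON file for download
        output_path = os.path.join(tempfile.gettempdir(), f"processed_{int(time.time())}.json")
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(results, f, indent=2, ensure_ascii=False)
        summary = f"Processed {len(results)} item(s). Output saved to {output_path}."
        return output_path, summary, json.dumps(results, indent=2, ensure_ascii=False)
    except Exception as e:
        logger.error(f"Error processing inputs: {e}")
        return None, f"Error processing inputs: {e}", ""

def generate_qr_code(json_data):
    """Generate a QR code image from the JSON editor contents (sketch)."""
    if not json_data:
        return None
    qr_processor = QRProcessor()
    result = qr_processor.generate_qr_code(json_data)
    return result.get('output_path') if result.get('success') else None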

def create_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processor") as interface:
        gr.Markdown("# Advanced URL & Text Processing Toolkit")
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )
        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # Optional: for custom styling
            )
        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )
        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # Displays the generated QR code
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]  # Outputs include the JSON editor
        )
        qr_btn.click(
            generate_qr_code,
            inputs=json_editor,
            outputs=qr_output
        )
        gr.Markdown("""
        ### Usage Guidelines
        - **URL Processing**: Enter valid HTTP/HTTPS URLs
        - **File Input**: Upload text files or ZIP archives
        - **Text Input**: Direct text processing
        - **JSON Editor**: View and edit your JSON data
        - **Scratchpad**: Quick notes or text collections
        - Advanced cleaning and validation included
        """)
    return interface

def main():
    # Configure system settings
    mimetypes.init()
    # Create and launch interface
    interface = create_interface()
    # Launch with proper configuration
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )

if __name__ == "__main__":
    main()