import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import io # Needed for processing CSV from string
import csv # Needed for CSV
import xml.etree.ElementTree as ET # Needed for XML
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any # Added Any for extracted_data
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import math
# Setup enhanced logging with more detailed formatting
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log', encoding='utf-8')
])
logger = logging.getLogger(__name__) # logger is now defined here
# Conditional imports for document processing
try:
from PyPDF2 import PdfReader
PDF_SUPPORT = True
except ImportError:
PDF_SUPPORT = False
logger.warning("PyPDF2 not installed. PDF file processing will be limited.") # logger is available
try:
from docx import Document
DOCX_SUPPORT = True
except ImportError:
DOCX_SUPPORT = False
logger.warning("python-docx not installed. DOCX file processing will be limited.") # logger is available
try:
from pyth.plugins.rtf15.reader import Rtf15Reader
from pyth.plugins.plaintext.writer import PlaintextWriter
RTF_SUPPORT = True
except ImportError:
RTF_SUPPORT = False
logger.warning("pyth not installed. RTF file processing will be limited.")
try:
    from odf.opendocument import load as load_odf
    from odf import text as odftext, teletype
ODT_SUPPORT = True
except ImportError:
ODT_SUPPORT = False
logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor:
"""Advanced URL processing with enhanced content extraction and recursive link following."""
def __init__(self):
self.session = requests.Session()
self.timeout = 15 # Extended timeout for larger content
self.max_retries = 3
self.user_agent = UserAgent()
# Enhanced headers for better site compatibility
self.session.headers.update({
'User-Agent': self.user_agent.random,
'Accept': 'text/html, application/json, application/xml, text/plain, */*', # Request common types
'Accept-Language': 'en-US,en;q=0.9',
'Accept-Encoding': 'gzip, deflate, br',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1', # May be ignored for non-HTML
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'DNT': '1'
})
def validate_url(self, url: str) -> Dict[str, Any]:
"""Enhanced URL validation with detailed feedback"""
try:
if not validators.url(url):
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
parsed = urlparse(url)
if not all([parsed.scheme, parsed.netloc]):
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
# Try HEAD request first to check accessibility
try:
head_response = self.session.head(url, timeout=5)
head_response.raise_for_status()
final_url = head_response.url # Capture potential redirects
content_type = head_response.headers.get('Content-Type', 'unknown')
server = head_response.headers.get('Server', 'unknown')
size = head_response.headers.get('Content-Length', 'unknown')
except requests.exceptions.RequestException:
# If HEAD fails, try GET as some servers don't support HEAD
try:
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
final_url = response.url # Capture potential redirects
content_type = response.headers.get('Content-Type', 'unknown')
server = response.headers.get('Server', 'unknown')
size = response.headers.get('Content-Length', 'unknown') # May not be accurate for full content
except requests.exceptions.RequestException as get_e:
return {'is_valid': False, 'message': f'URL not accessible after HEAD/GET attempts: {str(get_e)}', 'details': str(get_e)}
except Exception as get_e:
return {'is_valid': False, 'message': f'Unexpected error during GET validation: {str(get_e)}', 'details': str(get_e)}
return {
'is_valid': True,
'message': 'URL is valid and accessible',
'details': {
'final_url': final_url,
'content_type': content_type,
'server': server,
'size': size
}
}
except Exception as e:
return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]:
"""Enhanced content fetcher with retry mechanism and complete character extraction"""
try:
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
# Update User-Agent randomly for each request
self.session.headers.update({'User-Agent': self.user_agent.random})
response = self.session.get(url, timeout=self.timeout)
response.raise_for_status()
final_url = response.url # Capture potential redirects
content_type = response.headers.get('Content-Type', '')
# Detect encoding
if response.encoding is None or response.encoding == 'ISO-8859-1': # chardet often better than default response.encoding for text
encoding_detection = chardet.detect(response.content)
encoding = encoding_detection['encoding'] or 'utf-8'
logger.debug(f"Detected encoding '{encoding}' with confidence {encoding_detection['confidence']:.2f} for {url}")
else:
encoding = response.encoding
logger.debug(f"Using response.encoding '{encoding}' for {url}")
# Decode content with fallback
try:
raw_content = response.content.decode(encoding, errors='replace')
except (UnicodeDecodeError, LookupError):
# Fallback to a more common encoding if the first attempt fails
try:
raw_content = response.content.decode('utf-8', errors='replace')
encoding = 'utf-8 (fallback)'
logger.warning(f"Decoding with {encoding} fallback for {url}")
except Exception:
raw_content = response.content.decode('latin-1', errors='replace') # Another common fallback
encoding = 'latin-1 (fallback)'
logger.warning(f"Decoding with {encoding} fallback for {url}")
# Extract metadata
metadata = {
'original_url': url,
'final_url': final_url,
'timestamp': datetime.now().isoformat(),
'detected_encoding': encoding,
'content_type': content_type,
'content_length': len(response.content),
'headers': dict(response.headers),
'status_code': response.status_code
}
# Process based on content type
processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url)
return {
'source': 'url',
'url': url, # Keep original URL as identifier for this step
'raw_content': raw_content,
'metadata': metadata,
'extracted_data': processed_extraction['data'],
'processing_notes': processed_extraction['notes']
}
except requests.exceptions.RequestException as e:
if retry_count < self.max_retries - 1:
logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
time.sleep(2 ** retry_count) # Exponential backoff
return self.fetch_content(url, retry_count + 1)
logger.error(f"Failed to fetch content after {self.max_retries} attempts from {url}: {e}")
return {
'source': 'url',
'url': url,
'raw_content': None,
'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, # Include basic metadata on failure
'extracted_data': None,
'processing_notes': [f"Failed to fetch content after {self.max_retries} attempts: {str(e)}"] # Ensure notes is a list
}
except Exception as e:
logger.error(f"Unexpected error while fetching or processing URL {url}: {e}")
return {
'source': 'url',
'url': url,
'raw_content': raw_content if 'raw_content' in locals() else None,
'metadata': metadata if 'metadata' in locals() else {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None},
'extracted_data': None,
'processing_notes': [f"Unexpected processing error: {str(e)}"]
}
def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]:
"""Process content based on detected content type"""
lower_content_type = content_type.lower()
notes = []
extracted_data: Any = None # Use Any to allow different types
try:
if 'text/html' in lower_content_type:
logger.debug(f"Processing HTML content from {base_url}")
extracted_data = self._process_html_content_enhanced(content, base_url)
notes.append("Processed as HTML")
elif 'application/json' in lower_content_type or 'text/json' in lower_content_type:
logger.debug(f"Processing JSON content from {base_url}")
try:
extracted_data = json.loads(content)
notes.append("Parsed as JSON")
except json.JSONDecodeError as e:
extracted_data = content # Keep raw text if invalid JSON
notes.append(f"Failed to parse as JSON: {e}")
logger.warning(f"Failed to parse JSON from {base_url}: {e}")
except Exception as e:
extracted_data = content
notes.append(f"Error processing JSON: {e}")
logger.error(f"Error processing JSON from {base_url}: {e}")
elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith('+xml'):
logger.debug(f"Processing XML content from {base_url}")
try:
# Try parsing XML. Convert to a string representation.
root = ET.fromstring(content)
xml_text = ET.tostring(root, encoding='unicode', method='xml')
extracted_data = xml_text # Store as string for now
notes.append("Parsed as XML (text representation)")
except ET.ParseError as e:
extracted_data = content
notes.append(f"Failed to parse as XML: {e}")
logger.warning(f"Failed to parse XML from {base_url}: {e}")
except Exception as e:
extracted_data = content
notes.append(f"Error processing XML: {e}")
logger.error(f"Error processing XML from {base_url}: {e}")
elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: # Catch other text types
logger.debug(f"Processing Plain Text content from {base_url}")
extracted_data = content
notes.append("Processed as Plain Text")
else:
logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.")
extracted_data = content # Store raw content for unknown types
notes.append(f"Unknown content type '{content_type}'. Stored raw text.")
except Exception as e:
logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}")
extracted_data = content # Fallback to raw content on error
notes.append(f"Unexpected processing error: {e}. Stored raw text.")
return {'data': extracted_data, 'notes': notes}
def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]:
"""Process HTML content, preserving text, and extracting metadata and links."""
extracted: Dict[str, Any] = {
'title': None,
'meta_description': None, # Add extraction for meta description
'full_text': "",
'links': [] # Add extraction for links
}
try:
soup = BeautifulSoup(content, 'html.parser')
# Extract Title
if soup.title and soup.title.string:
extracted['title'] = soup.title.string.strip()
# Extract Meta Description
meta_desc = soup.find('meta', attrs={'name': 'description'})
if meta_desc and meta_desc.get('content'):
extracted['meta_description'] = meta_desc['content'].strip()
# Extract and process links (convert relative to absolute)
# Use a set to avoid duplicate URLs in the links list
unique_links = set()
for a_tag in soup.find_all('a', href=True):
href = a_tag['href'].strip()
if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): # Basic filter
text = a_tag.get_text().strip()
try:
absolute_url = urljoin(base_url, href)
if absolute_url not in unique_links:
extracted['links'].append({'text': text, 'url': absolute_url})
unique_links.add(absolute_url)
except Exception:
# If urljoin fails, keep the original href if it looks like a valid potential URL part
if validators.url(href) and href not in unique_links:
extracted['links'].append({'text': text, 'url': href})
unique_links.add(href)
elif urlparse(href).netloc and href not in unique_links: # Maybe just a domain/path?
extracted['links'].append({'text': text, 'url': href})
unique_links.add(href)
# Extract all text content (similar to stripped_strings but ensures order)
# Use a more robust way to get visible text, including handling script/style tags
soup_copy = BeautifulSoup(content, 'html.parser') # Work on a copy to preserve soup for links
for script_or_style in soup_copy(["script", "style"]):
script_or_style.extract() # Remove script and style tags
text = soup_copy.get_text(separator='\n') # Get text with newlines
# Clean up whitespace and empty lines
lines = text.splitlines()
cleaned_lines = [line.strip() for line in lines if line.strip()]
extracted['full_text'] = '\n'.join(cleaned_lines)
except Exception as e:
logger.error(f"Enhanced HTML processing error for {base_url}: {e}")
# Fallback: Store raw text and indicate error
soup_copy = BeautifulSoup(content, 'html.parser')
for script_or_style in soup_copy(["script", "style"]):
script_or_style.extract()
extracted['full_text'] = soup_copy.get_text(separator='\n').strip()
extracted['processing_error'] = f"Enhanced HTML processing failed: {e}"
return extracted
def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]:
"""
Fetches content from a URL and recursively follows links up to a specified depth.
Args:
url: The initial URL to fetch.
max_steps: The maximum number of levels to follow links (0-3).
0: Only fetch the initial URL.
1: Fetch the initial URL and the links found on that page.
2: Fetch the initial URL, its links, and the links on those pages.
3: Fetch up to the third level of links.
Returns:
A dictionary containing the extraction result for the initial URL and
nested results for followed links.
"""
if not isinstance(max_steps, int) or not (0 <= max_steps <= 3):
logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 3.")
return {
'url': url,
'level': 0,
'fetch_result': None,
'linked_extractions': [],
'note': f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 3."
}
validation_result = self.validate_url(url)
if not validation_result['is_valid']:
logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}")
return {
'url': url,
'level': 0,
'fetch_result': None,
'linked_extractions': [],
'note': f"Initial URL validation failed: {validation_result['message']}"
}
return self._fetch_content_recursive(url, max_steps, current_step=0)
def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int) -> Dict[str, Any]:
"""Recursive helper to fetch content and follow links."""
if current_step > max_steps:
logger.debug(f"Depth limit reached for {url} at level {current_step}.")
return {
'url': url,
'level': current_step,
'fetch_result': None, # Indicate no fetch happened at this level
'linked_extractions': [],
'note': f"Depth limit ({max_steps}) reached."
}
logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}")
# Fetch content for the current URL
fetch_result = self.fetch_content(url)
linked_extractions: List[Dict[str, Any]] = []
# Only follow links if fetch was successful, content is HTML, and within depth limit
if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower():
extracted_data = fetch_result['extracted_data']
links = extracted_data.get('links', []) # Ensure links is a list even if missing
logger.info(f"Found {len(links)} links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.")
# Recursively fetch linked content if not at max depth
if current_step < max_steps:
for link_info in links:
linked_url = link_info.get('url')
if linked_url:
                        # Simple check to avoid immediately re-fetching the current page (ignoring fragments).
                        # More sophisticated cycle detection would be needed for complex link graphs.
                        if urlparse(linked_url)._replace(fragment='').geturl() != urlparse(url)._replace(fragment='').geturl():
# Recursively call for the linked URL
linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1)
linked_extractions.append(linked_result)
else:
logger.debug(f"Skipping self-referencing link: {linked_url}")
linked_extractions.append({
'url': linked_url,
'level': current_step + 1,
'fetch_result': None,
'linked_extractions': [],
'note': 'Skipped self-referencing link'
})
else:
linked_extractions.append({
'url': 'Invalid or missing link',
'level': current_step + 1,
'fetch_result': None,
'linked_extractions': [],
'note': 'Link URL not found or invalid'
})
else:
logger.info(f"Max depth ({max_steps}) reached. Not following links from {url}.")
return {
'url': url,
'level': current_step,
'fetch_result': fetch_result,
'linked_extractions': linked_extractions,
'note': f"Processed at level {current_step}"
}
# --- Example usage (manual demo) ---
# Wrapped in a function so the network demo below does not run at import time or before the rest
# of the module is defined; call run_url_processor_demo() explicitly to exercise the URL processor.
def run_url_processor_demo():
    processor = EnhancedURLProcessor()
# --- Test Cases ---
# Test with 0 steps (only initial URL)
print("\n--- Testing with max_steps = 0 ---")
result_0 = processor.fetch_content_with_depth("https://httpbin.org/html", max_steps=0)
# print(json.dumps(result_0, indent=2)) # Uncomment to see full structure
print(f"Initial URL ({result_0['url']}) fetched at level {result_0['level']}. Success: {result_0['fetch_result'] is not None}")
print(f"Number of linked extractions: {len(result_0['linked_extractions'])}") # Should be 0
# Test with 1 step (initial URL + its direct links)
# Note: Replace with a real website URL that has internal links for meaningful testing
# For demonstration, using a placeholder. A real site like a blog post or news article front page is better.
test_url_with_links = "https://quotes.toscrape.com/" # Example site with links
print(f"\n--- Testing with max_steps = 1 for {test_url_with_links} ---")
result_1 = processor.fetch_content_with_depth(test_url_with_links, max_steps=1)
# print(json.dumps(result_1, indent=2)) # Uncomment to see full structure
print(f"Initial URL ({result_1['url']}) fetched at level {result_1['level']}. Success: {result_1['fetch_result'] is not None}")
print(f"Number of direct links found and processed: {len(result_1['linked_extractions'])}")
if result_1['linked_extractions']:
print(f"First linked URL processed at level 1: {result_1['linked_extractions'][0]['url']}")
print(f"Number of links found on the first linked page: {len(result_1['linked_extractions'][0]['linked_extractions'])}") # Should be 0 for max_steps=1
# Test with 2 steps
print(f"\n--- Testing with max_steps = 2 for {test_url_with_links} ---")
result_2 = processor.fetch_content_with_depth(test_url_with_links, max_steps=2)
# print(json.dumps(result_2, indent=2)) # Uncomment to see full structure
print(f"Initial URL ({result_2['url']}) fetched at level {result_2['level']}. Success: {result_2['fetch_result'] is not None}")
print(f"Number of direct links found and processed (Level 1): {len(result_2['linked_extractions'])}")
if result_2['linked_extractions']:
print(f"First linked URL processed at level 1: {result_2['linked_extractions'][0]['url']}")
print(f"Number of links found on the first linked page and processed (Level 2): {len(result_2['linked_extractions'][0]['linked_extractions'])}")
if result_2['linked_extractions'][0]['linked_extractions']:
print(f"First level 2 linked URL: {result_2['linked_extractions'][0]['linked_extractions'][0]['url']}")
print(f"Number of links found on the first level 2 page: {len(result_2['linked_extractions'][0]['linked_extractions'][0]['linked_extractions'])}") # Should be 0 for max_steps=2
# Test with max_steps = 3 (will go one level deeper than 2)
# print(f"\n--- Testing with max_steps = 3 for {test_url_with_links} ---")
# result_3 = processor.fetch_content_with_depth(test_url_with_links, max_steps=3)
# print(json.dumps(result_3, indent=2)) # Uncomment to see full structure
# Add similar print statements for result_3 to show levels 1, 2, and 3 counts
# Test with invalid max_steps
print("\n--- Testing with invalid max_steps = 4 ---")
result_invalid = processor.fetch_content_with_depth("https://example.com", max_steps=4)
print(f"Result for invalid steps: {result_invalid.get('note')}")
# Test with invalid initial URL
print("\n--- Testing with invalid initial URL ---")
result_invalid_url = processor.fetch_content_with_depth("invalid-url", max_steps=1)
print(f"Result for invalid initial URL: {result_invalid_url.get('note')}")
# Test with a URL that might fail to fetch
print("\n--- Testing with a potentially failing URL ---")
# Use a non-existent subdomain or a port that's unlikely to be open
failing_url = "http://this-domain-does-not-exist-12345.com/"
result_fail = processor.fetch_content_with_depth(failing_url, max_steps=1)
print(f"Result for failing URL: {result_fail.get('note')}")
if result_fail.get('fetch_result'):
print(f"Fetch result notes for failing URL: {result_fail['fetch_result'].get('processing_notes')}")
class EnhancedFileProcessor:
"""Advanced file processing with enhanced content extraction"""
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default
self.max_file_size = max_file_size
# Expanded supported extensions to include common docs and structured formats
self.supported_extensions = {
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
'.pdf', '.doc', '.docx', '.rtf', '.odt',
# Archives are handled separately but listed for context
'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
}
self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'}
def process_file(self, file) -> List[Dict]:
"""Process uploaded file with enhanced error handling and complete extraction"""
if not file or not hasattr(file, 'name'):
logger.warning("Received invalid file object.")
return []
dataset = []
file_path = Path(file.name) # Use Path object for easier handling
try:
file_size = file_path.stat().st_size
if file_size > self.max_file_size:
logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).")
return [{
'source': 'file',
'filename': file_path.name,
'file_size': file_size,
'extracted_data': None,
'processing_notes': 'File size exceeds limit.'
}]
with tempfile.TemporaryDirectory() as temp_dir:
temp_dir_path = Path(temp_dir)
# Decide processing strategy
if file_path.suffix.lower() in self.archive_extensions:
dataset.extend(self._process_archive(file_path, temp_dir_path))
elif file_path.suffix.lower() in self.supported_extensions:
# Pass the path to the single file processor
dataset.extend(self._process_single_file(file_path))
else:
logger.warning(f"Unsupported file type for processing: '{file_path.name}'")
# Optionally process as raw text even if extension is unsupported
try:
# Read as text with error replacement
content_bytes = file_path.read_bytes()
encoding_detection = chardet.detect(content_bytes)
encoding = encoding_detection['encoding'] or 'utf-8'
raw_content = content_bytes.decode(encoding, errors='replace')
dataset.append({
'source': 'file',
'filename': file_path.name,
'file_size': file_size,
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown',
'extracted_data': {'plain_text': raw_content}, # Store raw text under a key
'processing_notes': 'Processed as plain text (unsupported extension).'
})
except Exception as e:
logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}")
dataset.append({
'source': 'file',
'filename': file_path.name,
'file_size': file_size,
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown',
'extracted_data': None,
'processing_notes': f'Unsupported file type and failed to read as text: {e}'
})
except Exception as e:
logger.error(f"Error processing file '{file_path.name}': {str(e)}")
dataset.append({
'source': 'file',
'filename': file_path.name,
'file_size': file_size if 'file_size' in locals() else None,
'extracted_data': None,
'processing_notes': f'Overall file processing error: {str(e)}'
})
return dataset
def _is_archive(self, filepath: Union[str, Path]) -> bool:
"""Check if file is an archive"""
p = Path(filepath) if isinstance(filepath, str) else filepath
return p.suffix.lower() in self.archive_extensions
def _process_single_file(self, file_path: Path) -> List[Dict]:
"""Process a single file with enhanced character extraction and format-specific handling"""
dataset_entries = []
filename = file_path.name
file_size = file_path.stat().st_size
mime_type, _ = mimetypes.guess_type(file_path)
mime_type = mime_type or 'unknown/unknown'
file_extension = file_path.suffix.lower()
logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)")
raw_content: Optional[str] = None
extracted_data: Any = None
processing_notes = []
try:
# Read content efficiently
content_bytes = file_path.read_bytes()
encoding_detection = chardet.detect(content_bytes)
encoding = encoding_detection['encoding'] or 'utf-8'
raw_content = content_bytes.decode(encoding, errors='replace')
# --- Attempt format-specific parsing ---
# 1. Attempt JSON parsing (explicit .json or application/json, OR if content looks like JSON)
is_explicit_json = mime_type == 'application/json' or file_extension == '.json'
looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[')
if is_explicit_json or looks_like_json:
try:
extracted_data = json.loads(raw_content)
processing_notes.append("Parsed as JSON.")
if not is_explicit_json:
processing_notes.append("Note: Content looked like JSON despite extension/mime.")
logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.")
mime_type = 'application/json' # Update mime_type if successfully parsed as JSON
except json.JSONDecodeError as e:
processing_notes.append(f"Failed to parse as JSON: {e}.")
if is_explicit_json:
logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}")
else:
logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}")
except Exception as e:
processing_notes.append(f"Error processing JSON: {e}.")
logger.error(f"Error processing JSON in '{filename}': {e}")
# 2. Attempt XML parsing (if not already parsed as JSON, and looks like XML)
# Add check if extracted_data is still None (meaning JSON parsing failed or wasn't attempted/relevant)
looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') # Simple heuristic
is_explicit_xml = extracted_data is None and (mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ('.xml', '.xsd'))
if extracted_data is None and (is_explicit_xml or looks_like_xml):
try:
root = ET.fromstring(raw_content)
# Convert XML element tree to a structured dictionary or string
# Simple string representation for QR code suitability
extracted_data = ET.tostring(root, encoding='unicode', method='xml')
processing_notes.append("Parsed as XML (text representation).")
if not is_explicit_xml:
processing_notes.append("Note: Content looked like XML despite extension/mime.")
# Update mime_type if successfully parsed as XML
if 'xml' not in mime_type: mime_type = 'application/xml'
except ET.ParseError as e:
processing_notes.append(f"Failed to parse as XML: {e}.")
if is_explicit_xml:
logger.error(f"Explicit XML file '{filename}' has invalid format: {e}")
else:
logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}")
except Exception as e:
processing_notes.append(f"Error processing XML: {e}.")
logger.error(f"Error processing XML in '{filename}': {e}")
# 3. Attempt CSV parsing (if not already parsed, and looks like CSV or is explicit CSV)
is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv')
# Heuristic: check for commas/semicolons and multiple lines
looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ('\n' in raw_content or len(raw_content.splitlines()) > 1)
if extracted_data is None and (is_explicit_csv or looks_like_csv):
try:
                    # Use Sniffer to guess the dialect for better compatibility
                    dialect = 'excel'  # Default dialect
                    try:
                        # Look at the first few lines to guess the dialect
                        sample = '\n'.join(raw_content.splitlines()[:10])
                        if sample:
                            # Sniffer returns a Dialect subclass, which csv.reader accepts directly
                            dialect = csv.Sniffer().sniff(sample)
                            logger.debug(f"Sniffer detected a CSV dialect for '{filename}'")
                    except csv.Error:
                        logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
                        dialect = 'excel'  # Fallback
# Read using the guessed or default dialect
csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect)
rows = list(csv_reader)
if rows:
# Limit the number of rows included for potentially huge CSVs
max_rows_preview = 100
extracted_data = {
'headers': rows[0] if rows[0] else None, # Assume first row is header
'rows': rows[1:max_rows_preview+1] # Get up to max_rows_preview data rows
}
if len(rows) > max_rows_preview + 1:
processing_notes.append(f"CSV truncated to {max_rows_preview} data rows.")
processing_notes.append("Parsed as CSV.")
if not is_explicit_csv:
processing_notes.append("Note: Content looked like CSV despite extension/mime.")
mime_type = 'text/csv' # Update mime_type
else:
extracted_data = "Empty CSV"
processing_notes.append("Parsed as empty CSV.")
if not is_explicit_csv:
processing_notes.append("Note: Content looked like CSV but was empty.")
except Exception as e:
processing_notes.append(f"Failed to parse as CSV: {e}.")
logger.warning(f"Failed to parse CSV from '{filename}': {e}")
# 4. Attempt Document Text Extraction (if not already parsed)
if extracted_data is None:
try:
extracted_text = None
if file_extension == '.pdf' and PDF_SUPPORT:
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file:
tmp_file.write(content_bytes) # Write bytes to temp file
temp_path = Path(tmp_file.name)
try:
reader = PdfReader(temp_path)
text_content = "".join(page.extract_text() or "" for page in reader.pages)
extracted_text = text_content
processing_notes.append("Extracted text from PDF.")
finally:
temp_path.unlink() # Clean up temp file
elif file_extension == '.docx' and DOCX_SUPPORT:
with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file:
tmp_file.write(content_bytes) # Write bytes to temp file
temp_path = Path(tmp_file.name)
try:
document = Document(temp_path)
text_content = "\n".join(paragraph.text for paragraph in document.paragraphs)
extracted_text = text_content
processing_notes.append("Extracted text from DOCX.")
finally:
temp_path.unlink() # Clean up temp file
elif file_extension == '.rtf' and RTF_SUPPORT:
# pyth can read directly from file-like object or string
try:
doc = Rtf15Reader.read(io.StringIO(raw_content))
text_content = PlaintextWriter.write(doc).getvalue()
extracted_text = text_content
processing_notes.append("Extracted text from RTF.")
except Exception as e:
processing_notes.append(f"RTF extraction error: {e}")
logger.warning(f"Failed to extract RTF text from '{filename}': {e}")
                    elif file_extension == '.odt' and ODT_SUPPORT:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file:
                            tmp_file.write(content_bytes)  # Write bytes to temp file
                            temp_path = Path(tmp_file.name)
                        try:
                            text_doc = load_odf(str(temp_path))  # odfpy loads an existing document via load()
                            paragraphs = text_doc.getElementsByType(odftext.P)
                            text_content = "\n".join(teletype.extractText(p) for p in paragraphs)
                            extracted_text = text_content
                            processing_notes.append("Extracted text from ODT.")
                        finally:
                            temp_path.unlink()  # Clean up temp file
elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']:
# These require more complex or platform-specific libraries (e.g. antiword, pandoc, COM objects on Windows)
processing_notes.append(f"Automatic text extraction for {file_extension.upper()} not fully implemented.")
logger.warning(f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.")
if extracted_text is not None:
# Limit extracted text size
max_extracted_text_size = 10000 # Limit text preview
extracted_data = {'text': extracted_text[:max_extracted_text_size]}
if len(extracted_text) > max_extracted_text_size:
extracted_data['text'] += "..."
processing_notes.append("Extracted text truncated.")
except ImportError as e:
processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.")
except Exception as e:
processing_notes.append(f"Error during document text extraction: {e}")
logger.warning(f"Error during document text extraction for '{filename}': {e}")
# 5. Fallback to Plain Text (if no specific extraction succeeded)
if extracted_data is None:
extracted_data = {'plain_text': raw_content}
processing_notes.append("Stored as plain text.")
# Re-guess mime type if it was something specific like application/octet-stream and we just got text
if mime_type in ['unknown/unknown', 'application/octet-stream']:
guessed_text_mime, _ = mimetypes.guess_type('dummy.txt') # Use a dummy file name to guess plain text
if guessed_text_mime: mime_type = guessed_text_mime
except Exception as e:
# Catch errors during initial read or other unexpected issues
logger.error(f"Fatal error processing single file '{filename}': {e}")
processing_notes.append(f"Fatal processing error: {e}")
raw_content = None # Ensure raw_content is None if reading failed
extracted_data = None
# Add file info to the entry
entry = {
'source': 'file',
'filename': filename,
'file_size': file_size,
'mime_type': mime_type,
'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None,
'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None,
'raw_content': raw_content, # Always include raw content if readable
'extracted_data': extracted_data, # Include the structured/extracted data
'processing_notes': processing_notes # Include any notes/errors encountered
}
dataset_entries.append(entry)
return dataset_entries
def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]:
"""Process an archive file with enhanced extraction"""
dataset = []
archive_extension = archive_path.suffix.lower()
logger.info(f"Processing archive: '{archive_path.name}'")
try:
if archive_extension == '.zip':
if zipfile.is_zipfile(archive_path):
with zipfile.ZipFile(archive_path, 'r') as zip_ref:
for file_info in zip_ref.infolist():
if file_info.file_size > 0 and not file_info.filename.endswith('/'):
try:
zip_ref.extract(file_info, path=extract_to)
extracted_file_path = extract_to / file_info.filename
# Recursively process the extracted file if it's supported and not an archive itself
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path):
dataset.extend(self._process_single_file(extracted_file_path))
elif extracted_file_path.suffix.lower() in self.archive_extensions:
# Recursively process nested archives (careful with depth!)
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.")
dataset.extend(self._process_archive(extracted_file_path, extract_to))
else:
logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'")
except Exception as e:
logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}")
else:
logger.error(f"'{archive_path.name}' is not a valid zip file.")
            elif archive_extension in ('.tar', '.tgz') or archive_path.name.lower().endswith('.tar.gz'):
                try:
                    # 'r:*' lets tarfile auto-detect the compression (plain, gzip or bzip2 tars).
                    with tarfile.open(archive_path, 'r:*') as tar_ref:
for member in tar_ref.getmembers():
if member.isfile():
try:
tar_ref.extract(member, path=extract_to)
extracted_file_path = extract_to / member.name
# Recursively process extracted file
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path):
dataset.extend(self._process_single_file(extracted_file_path))
elif extracted_file_path.suffix.lower() in self.archive_extensions:
logger.info(f"Found nested archive '{member.name}', processing recursively.")
dataset.extend(self._process_archive(extracted_file_path, extract_to))
else:
logger.debug(f"Skipping unsupported file in archive: '{member.name}'")
except Exception as e:
logger.warning(f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}")
except tarfile.TarError as e:
logger.error(f"Error processing TAR archive '{archive_path.name}': {e}")
elif archive_extension == '.gz':
# GZIP archives typically contain a single file. Extract it and process.
extracted_name = archive_path.stem # Get name without .gz
extracted_path = extract_to / extracted_name
try:
with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile:
outfile.write(gz_file.read())
# Process the extracted file if supported
if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_path):
dataset.extend(self._process_single_file(extracted_path))
elif extracted_path.suffix.lower() in self.archive_extensions:
logger.info(f"Found nested archive '{extracted_name}', processing recursively.")
dataset.extend(self._process_archive(extracted_path, extract_to))
else:
logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'")
                except (gzip.BadGzipFile, OSError) as e:
logger.error(f"Error processing GZIP file '{archive_path.name}': {e}")
except Exception as e:
logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}")
finally:
if extracted_path.exists(): extracted_path.unlink() # Clean up extracted file
# TODO: Add support for other archive types (.bz2, .7z, .rar)
elif archive_extension in ('.bz2', '.7z', '.rar'):
logger.warning(f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.")
except Exception as e:
logger.error(f"Overall archive processing error for '{archive_path.name}': {e}")
# Clean up extracted files in temp_dir after processing
# Handled by context manager 'with tempfile.TemporaryDirectory()'
return dataset
def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
"""Enhanced data chunking with sequence metadata"""
try:
# Convert data to JSON string
# Use separators=(',', ':') to remove unnecessary whitespace for maximum data density in QR code
json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':'))
total_length = len(json_str)
# Calculate overhead for metadata
# Metadata structure: {"idx":0,"tc":1,"tl":XXX,"hash":"YYYY","data":"..."}, shortened keys
metadata_template = {
"idx": 0, # chunk_index
"tc": 1, # total_chunks
"tl": total_length, # total_length
"hash": "", # chunk_hash
"data": "" # chunk_data
}
# Estimate overhead more accurately by dumping a sample metadata structure
# and adding some safety margin. Shortened keys reduce overhead.
overhead_estimate = len(json.dumps(metadata_template, separators=(',', ':'))) + 50 # Extra padding
# Calculate effective chunk size
effective_chunk_size = max_size - overhead_estimate
if effective_chunk_size <= 0:
logger.error(f"Max QR size ({max_size}) is too small for metadata overhead ({overhead_estimate}). Cannot chunk.")
return []
if total_length <= effective_chunk_size:
# Data fits in one chunk
chunk_data = json_str # Use the full string
chunk = {
"idx": 0,
"tc": 1,
"tl": total_length,
"hash": hash(chunk_data) & 0xFFFFFFFF, # 32-bit hash
"data": chunk_data
}
return [chunk]
# Calculate number of chunks needed
num_chunks = -(-total_length // effective_chunk_size) # Ceiling division
# Adjust chunk_size slightly to distribute evenly, maybe not strictly necessary
# chunk_size = -(-total_length // num_chunks) # Use this if perfect distribution is needed
chunks = []
current_pos = 0
for i in range(num_chunks):
# Find the end of the current chunk. Avoid splitting in the middle of escaped characters or surrogate pairs if possible,
# but simple slicing is usually okay for standard text that's already been errors='replace'.
# We'll use basic slicing for simplicity, as the JSON string is just text.
end_pos = min(current_pos + effective_chunk_size, total_length)
# Basic attempt to not break in the middle of a UTF-8 character if slicing bytes,
# but since we are slicing a *decoded string*, this is less of an issue.
# However, slicing in the middle of JSON structure is bad.
# For simplicity and robustness with arbitrary JSON structures, slicing the raw string is the easiest.
chunk_data_str = json_str[current_pos:end_pos]
chunk = {
"idx": i,
"tc": num_chunks,
"tl": total_length,
"hash": hash(chunk_data_str) & 0xFFFFFFFF,
"data": chunk_data_str
}
chunks.append(chunk)
current_pos = end_pos
# Final check: Ensure all data was chunked
if current_pos < total_length:
# This shouldn't happen with correct ceiling division and min()
logger.error(f"Chunking logic error: Only processed {current_pos} of {total_length} characters.")
return [] # Indicate failure
logger.info(f"Chunked data into {num_chunks} chunks for QR codes.")
return chunks
except Exception as e:
logger.error(f"Error chunking data: {e}")
return []
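# A minimal decoder-side sketch (not used by the app itself) showing how the chunks produced by
# EnhancedFileProcessor.chunk_data() could be reassembled after scanning the QR codes. It assumes
# the scanned payloads have already been parsed back into chunk dictionaries with the keys
# {"idx", "tc", "tl", "hash", "data"}. Note that Python's built-in hash() is salted per process
# (PYTHONHASHSEED), so the per-chunk "hash" values can only be verified inside the process that
# created them; a cross-process check would need a stable digest such as zlib.crc32 instead.
def reassemble_chunks(chunks: List[Dict]) -> Any:
    """Reassemble and parse the JSON payload split across QR-code chunks (sketch)."""
    if not chunks:
        raise ValueError("No chunks provided")
    ordered = sorted(chunks, key=lambda c: c["idx"])
    expected_total = ordered[0]["tc"]
    if len(ordered) != expected_total or [c["idx"] for c in ordered] != list(range(expected_total)):
        raise ValueError(f"Expected {expected_total} chunks, got indices {[c['idx'] for c in ordered]}")
    json_str = "".join(c["data"] for c in ordered)
    if len(json_str) != ordered[0]["tl"]:
        raise ValueError(f"Reassembled length {len(json_str)} does not match declared total {ordered[0]['tl']}")
    return json.loads(json_str)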
def generate_stylish_qr(data: Union[str, Dict],
filename: str,
size: int = 10,
border: int = 4,
fill_color: str = "#000000",
back_color: str = "#FFFFFF") -> str:
"""Generate a stylish QR code with enhanced visual appeal"""
try:
qr = qrcode.QRCode(
version=None,
error_correction=qrcode.constants.ERROR_CORRECT_M, # Increased error correction
box_size=size,
border=border
)
# Add data to QR code
if isinstance(data, dict):
# Use compact JSON representation
qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':')))
else:
qr.add_data(str(data)) # Ensure it's a string
qr.make(fit=True)
# Create QR code image with custom colors
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
# Convert to RGBA for transparency support and potential overlays
qr_image = qr_image.convert('RGBA')
# Optional: Add a small logo or icon in the center (requires design)
# logo = Image.open("logo.png").convert("RGBA")
# logo = logo.resize((logo.width // 4, logo.height // 4)) # Resize logo
# logo_pos = ((qr_image.width - logo.width) // 2, (qr_image.height - logo.height) // 2)
# qr_image.paste(logo, logo_pos, logo)
# Add subtle gradient overlay (optional visual enhancement)
try:
gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(gradient)
# Horizontal gradient for subtle effect
for i in range(qr_image.width):
# Fades from left (alpha=0) to right (max_alpha)
alpha = int(255 * (i/qr_image.width) * 0.05) # e.g., 5% maximum opacity fade-in
draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha))
# Combine images
final_image = Image.alpha_composite(qr_image, gradient)
except Exception as e:
logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.")
final_image = qr_image
# Save the image
output_path = QR_CODES_DIR / filename
        final_image.save(output_path)  # PNG output; JPEG-style 'quality' settings do not apply here
return str(output_path)
except Exception as e:
logger.error(f"QR generation error: {e}")
return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
"""Generate QR codes with enhanced visual appeal and metadata"""
# Assume 'data' here is the list of dictionaries produced by process_inputs
if not isinstance(data, list):
logger.error("generate_qr_codes received data that is not a list.")
return []
try:
file_processor = EnhancedFileProcessor() # Use the enhanced processor for chunking
paths = []
if combined:
# Process combined data
chunks = file_processor.chunk_data(data) # chunk_data works on the list of dicts
if not chunks:
logger.warning("No chunks generated for combined data.")
return []
for i, chunk in enumerate(chunks):
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
qr_path = generate_stylish_qr(
data=chunk, # Pass the chunk dictionary
filename=filename,
fill_color="#1a365d", # Deep blue
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
logger.warning(f"Failed to generate QR for chunk {i+1}/{len(chunks)}.")
else:
# Process individual items (each dictionary in the list)
if data: # Ensure data is not empty
for idx, item in enumerate(data):
chunks = file_processor.chunk_data(item) # chunk_data works on individual dict
if not chunks:
logger.warning(f"No chunks generated for item {idx+1}.")
continue
for chunk_idx, chunk in enumerate(chunks):
filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
qr_path = generate_stylish_qr(
data=chunk, # Pass the chunk dictionary
filename=filename,
fill_color="#1a365d", # Deep blue
back_color="#ffffff"
)
if qr_path:
paths.append(qr_path)
else:
logger.warning(f"Failed to generate QR for item {idx+1} chunk {chunk_idx+1}/{len(chunks)}.")
else:
logger.warning("No items in data list to process individually.")
logger.info(f"Generated {len(paths)} QR codes.")
return paths
except Exception as e:
logger.error(f"QR code generation error: {e}")
return []
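# A hedged sketch (not wired into the UI) of reading the generated PNGs back into chunk
# dictionaries, assuming the optional `opencv-python` package is installed (it is not a dependency
# of this app). Combine with reassemble_chunks() above to recover the original JSON. Dense QR codes
# may still fail to decode depending on image size and decoder; a dedicated scanner app is the more
# reliable path in practice.
def decode_qr_images(paths: List[str]) -> List[Dict]:
    """Decode QR code PNGs back into their chunk dictionaries (sketch; requires opencv-python)."""
    import cv2  # Optional dependency, imported lazily so the rest of the app works without it
    detector = cv2.QRCodeDetector()
    chunks = []
    for path in paths:
        img = cv2.imread(path)
        if img is None:
            logger.warning(f"Could not read image: {path}")
            continue
        payload, _, _ = detector.detectAndDecode(img)
        if not payload:
            logger.warning(f"No QR payload decoded from: {path}")
            continue
        try:
            chunks.append(json.loads(payload))
        except json.JSONDecodeError:
            logger.warning(f"Decoded payload from {path} is not valid JSON; skipping.")
    return chunks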
def create_modern_interface():
"""Create a modern and visually appealing Gradio interface"""
# Modern CSS styling
css = """
/* Modern color scheme */
:root {
--primary-color: #1a365d;
--secondary-color: #2d3748;
--accent-color: #4299e1;
--background-color: #f7fafc;
--success-color: #48bb78;
--error-color: #f56565;
--warning-color: #ed8936;
}
/* Container styling */
.container {
max-width: 1200px;
margin: auto;
padding: 2rem;
background-color: var(--background-color);
border-radius: 1rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
/* Component styling */
.input-container {
background-color: white;
padding: 1.5rem;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
margin-bottom: 1rem;
}
/* Button styling */
.primary-button {
background-color: var(--primary-color);
color: white;
padding: 0.75rem 1.5rem;
border-radius: 0.375rem;
border: none;
cursor: pointer;
transition: all 0.2s;
}
.primary-button:hover {
background-color: var(--accent-color);
transform: translateY(-1px);
}
/* Status messages */
.status {
padding: 1rem;
border-radius: 0.375rem;
margin: 1rem 0;
}
.status.success { background-color: #f0fff4; color: var(--success-color); }
.status.error { background-color: #fff5f5; color: var(--error-color); }
.status.warning { background-color: #fffaf0; color: var(--warning-color); }
/* Gallery styling */
.gallery {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
padding: 1rem;
background-color: white;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
}
.gallery img {
width: 100%;
height: auto;
border-radius: 0.375rem;
transition: transform 0.2s;
}
.gallery img:hover {
transform: scale(1.05);
}
/* QR Code Viewport Styling */
.viewport-container {
display: grid;
gap: 0.5rem;
padding: 1rem;
background-color: white;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
margin-top: 1rem;
}
.viewport-item {
display: flex;
flex-direction: column;
align-items: center;
}
.viewport-item img {
width: 100%;
height: auto;
border-radius: 0.375rem;
transition: transform 0.2s;
max-width: 150px; /* Adjust as needed */
max-height: 150px; /* Adjust as needed */
}
"""
    # Custom JS, injected via the Blocks `head` parameter, that tracks which QR codes are enabled
    # in the viewport and pushes the selection back to the `enabled_qr_codes` state component.
    custom_js = """
    <script>
    let enabledStates = [];
    function updateEnabledStates(checkbox) {
        const index = parseInt(checkbox.dataset.index);
        if (checkbox.checked) {
            if (!enabledStates.includes(index)) {
                enabledStates.push(index);
            }
        } else {
            enabledStates = enabledStates.filter(item => item !== index);
        }
        // Send the updated state back to Gradio (you might need to adjust the target component)
        const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]');
        if (enabled_qr_codes_component) {
            enabled_qr_codes_component.value = JSON.stringify(enabledStates);
            enabled_qr_codes_component.dispatchEvent(new Event('input')); // Trigger update
        }
        console.log("Enabled QR Code Indices:", enabledStates);
        // You might want to trigger an update of the viewport here if needed
    }
    </script>
    """
    # Create interface with modern design
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", head=custom_js) as interface:
with gr.Row():
crawl_depth_slider = gr.Slider(
label="Crawl Depth",
minimum=0,
maximum=3,
value=0,
step=1,
interactive=True,
info="Select the maximum depth for crawling links (0-3)."
)
qr_code_paths = gr.State([])
gr.Markdown("""
# 🌐 Advanced Data Processing & QR Code Generator
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
""")
with gr.Tab("πŸ“ URL Processing"):
url_input = gr.Textbox(
label="Enter URLs (comma or newline separated)",
lines=5,
placeholder="https://example1.com\nhttps://example2.com",
value=""
)
with gr.Tab("πŸ“ File Input"):
file_input = gr.File(
label="Upload Files",
file_types=None, # Accept all file types
file_count="multiple"
)
with gr.Tab("πŸ“‹ JSON Input"):
text_input = gr.TextArea(
label="Direct JSON Input",
lines=15,
placeholder="Paste your JSON data here...",
value=""
)
with gr.Row():
example_btn = gr.Button("πŸ“ Load Example", variant="secondary")
clear_btn = gr.Button("πŸ—‘οΈ Clear", variant="secondary")
with gr.Row():
combine_data = gr.Checkbox(
label="Combine all data into sequence",
value=True,
info="Generate sequential QR codes for combined data"
)
process_btn = gr.Button(
"πŸ”„ Process & Generate QR",
variant="primary"
)
# Output components
output_json = gr.JSON(label="Processed Data")
output_gallery = gr.Gallery(
label="Generated QR Codes",
columns=3,
height=400,
show_label=True
)
output_text = gr.Textbox(
label="Processing Status",
interactive=False
)
with gr.Tab("πŸ–ΌοΈ QR Code Viewport") as viewport_tab:
viewport_output = gr.HTML(label="QR Code Sequence Viewport")
enabled_qr_codes = gr.State([]) # To store the enabled/disabled state
# Load example data
def load_example():
example = {
"type": "product_catalog",
"items": [
{
"id": "123",
"name": "Premium Widget",
"description": "High-quality widget with advanced features",
"price": 299.99,
"category": "electronics",
"tags": ["premium", "featured", "new"]
},
{
"id": "456",
"name": "Basic Widget",
"description": "Reliable widget for everyday use",
"price": 149.99,
"category": "electronics",
"tags": ["basic", "popular"]
}
],
"metadata": {
"timestamp": datetime.now().isoformat(),
"version": "2.0",
"source": "example"
}
}
return json.dumps(example, indent=2)
def clear_input():
return "", None, "" # Clear url, files, text
def update_viewport(paths, enabled_states):
if not paths:
return "<p>No QR codes generated yet.</p>"
num_qr_codes = len(paths)
cols = math.ceil(math.sqrt(num_qr_codes)) # Calculate columns for a roughly square grid
cols = max(1, min(cols, 6)) # Limit max columns for small screens
rows = math.ceil(num_qr_codes / cols)
            viewport_html = f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">'
# Initialize enabledStates if it's empty (first load)
if not enabled_states and paths:
enabled_states = list(range(num_qr_codes)) # Enable all by default on first view
for i, path in enumerate(paths):
is_enabled = i in enabled_states
border = "border: 2px solid green;" if is_enabled else "border: 2px solid lightgray;"
opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;"
viewport_html += f'<div class="viewport-item" id="qr_item_{i}">'
viewport_html += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i+1}">' # Use /file= for Gradio to serve static files
viewport_html += f'<label><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>'
viewport_html += '</div>'
viewport_html += '</div>'
return viewport_html
        def process_inputs(urls, files, text, combine, crawl_depth=0):
            """Process all inputs and generate QR codes. `crawl_depth` comes from the slider but is not yet used for recursive crawling."""
            results = []
            processing_status_messages = []
            qr_paths = []
            final_json_output = None
url_processor = EnhancedURLProcessor()
file_processor = EnhancedFileProcessor()
try:
# Process JSON input
if text and text.strip():
try:
json_data = json.loads(text)
# Wrap direct JSON input in a dictionary for consistency with file/URL output structure
results.append({
'source': 'json_input',
'extracted_data': json_data,
'timestamp': datetime.now().isoformat(),
'processing_notes': ['Parsed from direct JSON input.']
})
processing_status_messages.append("βœ… Successfully parsed direct JSON input.")
except json.JSONDecodeError as e:
processing_status_messages.append(f"❌ Invalid JSON format in text input: {str(e)}")
except Exception as e:
processing_status_messages.append(f"❌ Error processing direct JSON input: {str(e)}")
# Process URLs
if urls and urls.strip():
url_list = re.split(r'[,\n]', urls)
url_list = [url.strip() for url in url_list if url.strip()]
for url in url_list:
validation = url_processor.validate_url(url)
if validation['is_valid']:
processing_status_messages.append(f"🌐 Fetching URL: {url}...")
content_result = url_processor.fetch_content(url)
if content_result:
results.append(content_result)
processing_status_messages.append(f"βœ… Fetched and processed URL: {url}")
else:
processing_status_messages.append(f"❌ Failed to fetch/process URL: {url}")
if validation['details'].get('final_url'):
processing_status_messages[-1] += f" (Redirected to {validation['details']['final_url']})"
else:
processing_status_messages.append(f"⚠️ Skipping invalid URL: {url} ({validation['message']})")
# Process files
if files:
for file in files:
processing_status_messages.append(f"πŸ“ Processing file: {file.name}...")
file_results = file_processor.process_file(file)
if file_results:
results.extend(file_results)
processing_status_messages.append(f"βœ… Processed file: {file.name}")
else:
processing_status_messages.append(f"❌ Failed to process file: {file.name}")
# Generate QR codes
qr_paths = []
final_json_output = None
if results:
# Use the collected results (list of dicts) for QR code generation
qr_paths = generate_qr_codes(results, combine)
final_json_output = results # Show the structured data in the JSON output box
if qr_paths:
processing_status_messages.append(f"βœ… Successfully generated {len(qr_paths)} QR codes.")
else:
processing_status_messages.append("❌ Failed to generate QR codes.")
else:
processing_status_messages.append("⚠️ No valid content collected from inputs.")
except Exception as e:
logger.error(f"Overall processing error in process_inputs: {e}")
processing_status_messages.append(f"❌ An unexpected error occurred during processing: {str(e)}")
return (
final_json_output,
[str(path) for path in qr_paths], # Gradio Gallery expects list of paths (strings)
"\n".join(processing_status_messages) # Join status messages
)
        def on_qr_generation(qr_paths_list):
            # When QR codes are generated, update the state with the list of paths
            # and initialize the enabled_qr_codes state with all indices enabled.
            if qr_paths_list is None:
                num_qrs = 0
            else:
                num_qrs = len(qr_paths_list)
            initial_enabled_states = list(range(num_qrs))
            return qr_paths_list, initial_enabled_states  # Return paths list and initial enabled state
# Link events
example_btn.click(load_example, inputs=[], outputs=text_input)
clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input]) # Clear all inputs
process_btn.click(
process_inputs,
inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider],
outputs=[output_json, output_gallery, output_text]
).then( # Chain a .then() to update the QR paths state and trigger viewport update
on_qr_generation,
inputs=[output_gallery], # Get the list of paths from the gallery output
outputs=[qr_code_paths, enabled_qr_codes] # Update the state variables
)
# The viewport tab's select event will trigger update_viewport to render the grid
viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output])
# Add helpful documentation
gr.Markdown("""
### πŸš€ Features
- **Enhanced URL Scraping**: Extracts HTML text, title, meta description, links, and attempts parsing JSON/XML from URLs based on content type.
- **Advanced File Processing**: Reads various text-based files (.txt, .md, .log etc.), HTML, XML, CSV, and attempts text extraction from common documents (.pdf, .docx, .rtf, .odt - *requires extra dependencies*).
- **Smart JSON Handling**: Parses valid JSON from direct input, files (.json or content), or URLs.
- **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives.
- **Robust Encoding Detection**: Uses `chardet` for reliable character encoding identification.
- **Structured Output**: Provides a consistent JSON output format containing raw content (if applicable), extracted data, and processing notes for each processed item.
- **Sequential QR Codes**: Maintains data integrity across multiple codes by chunking the combined/individual processed data.
- **QR Code Viewport**: Visualize generated QR codes in a sequenced square grid with options to enable/disable individual codes for selective scanning/sharing.
- **Modern Design**: Clean, responsive interface with visual feedback.
### πŸ’‘ Tips
1. **URLs**: Enter multiple URLs separated by commas or newlines. The processor will attempt to fetch and structure the content based on its type.
2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and specific document/structured formats.
3. **JSON**: Use the "Direct JSON Input" tab for pasting JSON data. The system also tries to detect JSON content in file uploads and URLs. Use the "Load Example" button to see a sample JSON structure.
4. **Dependencies**: Processing PDF, DOCX, RTF, and ODT files requires installing optional Python libraries. Check the console logs for warnings if a library is missing.
5. **QR Codes**: Choose whether to "Combine all data into sequence" or generate separate sequences for each input item.
6. **Processing**: Monitor the "Processing Status" box for real-time updates and notes about errors or processing steps.
7. **Output**: The "Processed Data" JSON box shows the structured data extracted from your inputs. The "Generated QR Codes" gallery shows the QR code images.
### 🎨 Output Details
- The "Processed Data" JSON will be a list of dictionaries. Each dictionary represents one processed input (URL or file).
- Each item will have keys like `source`, `filename` (for files), `url` (for URLs), `mime_type`, `raw_content` (if readable), `extracted_data`, and `processing_notes`.
- `extracted_data` will contain the parsed/extracted content, structured according to the input type (e.g., dictionary for JSON, text for documents, list of rows for CSV, dictionary with title/text/links for HTML).
- `processing_notes` will list any issues encountered during extraction.
- Generated QR codes are saved in the `output/qr_codes` directory.
### βš™οΈ QR Code Viewport Instructions
1. Navigate to the **QR Code Viewport** tab after generating QR codes.
2. The generated QR codes will be displayed in a grid based on their total count.
3. Use the checkboxes below each QR code to enable or disable it for visual selection. Enabled codes have a green border and full opacity.
4. This viewport is currently for visualization and selection *within the UI*; it doesn't change the generated files themselves. You would manually select which physical QR codes to scan based on this view.
""")
return interface
def main():
"""Initialize and launch the application"""
try:
# Configure system settings
mimetypes.init()
# Create and launch interface
interface = create_modern_interface()
# Launch with configuration
interface.launch(
share=False,
debug=False, # Set to True for more verbose Gradio logging
show_error=True,
show_api=False
)
except Exception as e:
logger.error(f"Application startup error: {e}")
# Optionally print a user-friendly message before exiting
print(f"\nFatal Error: {e}\nCheck the logs for details.")
raise # Re-raise the exception to ensure the process exits if launch fails
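# A small convenience sketch for reusing the processors outside the Gradio UI (assumption: you want
# to drive them from a script or REPL). process_file() only needs an object exposing a `.name`
# attribute that points at a real path, so a SimpleNamespace stands in for Gradio's upload object.
def process_local_file(path: str) -> List[Dict]:
    """Process a file on disk with EnhancedFileProcessor, bypassing the Gradio upload widget (sketch)."""
    from types import SimpleNamespace
    processor = EnhancedFileProcessor()
    return processor.process_file(SimpleNamespace(name=path))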
if __name__ == "__main__":
main()