import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import io   # Needed for processing CSV from string
import csv  # Needed for CSV
import xml.etree.ElementTree as ET  # Needed for XML
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any  # Any used for extracted_data
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import math
# Set up enhanced logging with detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ])
logger = logging.getLogger(__name__)
# Conditional imports for document processing
try:
    from PyPDF2 import PdfReader
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logger.warning("PyPDF2 not installed. PDF file processing will be limited.")
try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
    logger.warning("python-docx not installed. DOCX file processing will be limited.")
try:
    from pyth.plugins.rtf15.reader import Rtf15Reader
    from pyth.plugins.plaintext.writer import PlaintextWriter
    RTF_SUPPORT = True
except ImportError:
    RTF_SUPPORT = False
    logger.warning("pyth not installed. RTF file processing will be limited.")
try:
    from odf.opendocument import OpenDocumentText, load as load_odf
    from odf import text as odftext
    from odf import teletype
    ODT_SUPPORT = True
except ImportError:
    ODT_SUPPORT = False
    logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure output directories exist with a modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor: | |
"""Advanced URL processing with enhanced content extraction and recursive link following.""" | |
def __init__(self): | |
self.session = requests.Session() | |
self.timeout = 15 # Extended timeout for larger content | |
self.max_retries = 3 | |
self.user_agent = UserAgent() | |
# Enhanced headers for better site compatibility | |
self.session.headers.update({ | |
'User-Agent': self.user_agent.random, | |
'Accept': 'text/html, application/json, application/xml, text/plain, */*', # Request common types | |
'Accept-Language': 'en-US,en;q=0.9', | |
'Accept-Encoding': 'gzip, deflate, br', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1', # May be ignored for non-HTML | |
'Sec-Fetch-Dest': 'document', | |
'Sec-Fetch-Mode': 'navigate', | |
'Sec-Fetch-Site': 'none', | |
'Sec-Fetch-User': '?1', | |
'DNT': '1' | |
}) | |
def validate_url(self, url: str) -> Dict[str, Any]: | |
"""Enhanced URL validation with detailed feedback""" | |
try: | |
if not validators.url(url): | |
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} | |
parsed = urlparse(url) | |
if not all([parsed.scheme, parsed.netloc]): | |
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} | |
# Try HEAD request first to check accessibility | |
try: | |
head_response = self.session.head(url, timeout=5) | |
head_response.raise_for_status() | |
final_url = head_response.url # Capture potential redirects | |
content_type = head_response.headers.get('Content-Type', 'unknown') | |
server = head_response.headers.get('Server', 'unknown') | |
size = head_response.headers.get('Content-Length', 'unknown') | |
except requests.exceptions.RequestException: | |
# If HEAD fails, try GET as some servers don't support HEAD | |
try: | |
response = self.session.get(url, timeout=self.timeout) | |
response.raise_for_status() | |
final_url = response.url # Capture potential redirects | |
content_type = response.headers.get('Content-Type', 'unknown') | |
server = response.headers.get('Server', 'unknown') | |
size = response.headers.get('Content-Length', 'unknown') # May not be accurate for full content | |
except requests.exceptions.RequestException as get_e: | |
return {'is_valid': False, 'message': f'URL not accessible after HEAD/GET attempts: {str(get_e)}', 'details': str(get_e)} | |
except Exception as get_e: | |
return {'is_valid': False, 'message': f'Unexpected error during GET validation: {str(get_e)}', 'details': str(get_e)} | |
return { | |
'is_valid': True, | |
'message': 'URL is valid and accessible', | |
'details': { | |
'final_url': final_url, | |
'content_type': content_type, | |
'server': server, | |
'size': size | |
} | |
} | |
except Exception as e: | |
return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)} | |
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]: | |
"""Enhanced content fetcher with retry mechanism and complete character extraction""" | |
try: | |
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})") | |
# Update User-Agent randomly for each request | |
self.session.headers.update({'User-Agent': self.user_agent.random}) | |
response = self.session.get(url, timeout=self.timeout) | |
response.raise_for_status() | |
final_url = response.url # Capture potential redirects | |
content_type = response.headers.get('Content-Type', '') | |
# Detect encoding | |
if response.encoding is None or response.encoding == 'ISO-8859-1': # chardet often better than default response.encoding for text | |
encoding_detection = chardet.detect(response.content) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
logger.debug(f"Detected encoding '{encoding}' with confidence {encoding_detection['confidence']:.2f} for {url}") | |
else: | |
encoding = response.encoding | |
logger.debug(f"Using response.encoding '{encoding}' for {url}") | |
# Decode content with fallback | |
try: | |
raw_content = response.content.decode(encoding, errors='replace') | |
except (UnicodeDecodeError, LookupError): | |
# Fallback to a more common encoding if the first attempt fails | |
try: | |
raw_content = response.content.decode('utf-8', errors='replace') | |
encoding = 'utf-8 (fallback)' | |
logger.warning(f"Decoding with {encoding} fallback for {url}") | |
except Exception: | |
raw_content = response.content.decode('latin-1', errors='replace') # Another common fallback | |
encoding = 'latin-1 (fallback)' | |
logger.warning(f"Decoding with {encoding} fallback for {url}") | |
# Extract metadata | |
metadata = { | |
'original_url': url, | |
'final_url': final_url, | |
'timestamp': datetime.now().isoformat(), | |
'detected_encoding': encoding, | |
'content_type': content_type, | |
'content_length': len(response.content), | |
'headers': dict(response.headers), | |
'status_code': response.status_code | |
} | |
# Process based on content type | |
processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url) | |
return { | |
'source': 'url', | |
'url': url, # Keep original URL as identifier for this step | |
'raw_content': raw_content, | |
'metadata': metadata, | |
'extracted_data': processed_extraction['data'], | |
'processing_notes': processed_extraction['notes'] | |
} | |
except requests.exceptions.RequestException as e: | |
if retry_count < self.max_retries - 1: | |
logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}") | |
time.sleep(2 ** retry_count) # Exponential backoff | |
return self.fetch_content(url, retry_count + 1) | |
logger.error(f"Failed to fetch content after {self.max_retries} attempts from {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': None, | |
'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, # Include basic metadata on failure | |
'extracted_data': None, | |
'processing_notes': [f"Failed to fetch content after {self.max_retries} attempts: {str(e)}"] # Ensure notes is a list | |
} | |
except Exception as e: | |
logger.error(f"Unexpected error while fetching or processing URL {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': raw_content if 'raw_content' in locals() else None, | |
'metadata': metadata if 'metadata' in locals() else {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, | |
'extracted_data': None, | |
'processing_notes': [f"Unexpected processing error: {str(e)}"] | |
} | |
def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]: | |
"""Process content based on detected content type""" | |
lower_content_type = content_type.lower() | |
notes = [] | |
extracted_data: Any = None # Use Any to allow different types | |
try: | |
if 'text/html' in lower_content_type: | |
logger.debug(f"Processing HTML content from {base_url}") | |
extracted_data = self._process_html_content_enhanced(content, base_url) | |
notes.append("Processed as HTML") | |
elif 'application/json' in lower_content_type or 'text/json' in lower_content_type: | |
logger.debug(f"Processing JSON content from {base_url}") | |
try: | |
extracted_data = json.loads(content) | |
notes.append("Parsed as JSON") | |
except json.JSONDecodeError as e: | |
extracted_data = content # Keep raw text if invalid JSON | |
notes.append(f"Failed to parse as JSON: {e}") | |
logger.warning(f"Failed to parse JSON from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing JSON: {e}") | |
logger.error(f"Error processing JSON from {base_url}: {e}") | |
elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith('+xml'): | |
logger.debug(f"Processing XML content from {base_url}") | |
try: | |
# Try parsing XML. Convert to a string representation. | |
root = ET.fromstring(content) | |
xml_text = ET.tostring(root, encoding='unicode', method='xml') | |
extracted_data = xml_text # Store as string for now | |
notes.append("Parsed as XML (text representation)") | |
except ET.ParseError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as XML: {e}") | |
logger.warning(f"Failed to parse XML from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing XML: {e}") | |
logger.error(f"Error processing XML from {base_url}: {e}") | |
elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: # Catch other text types | |
logger.debug(f"Processing Plain Text content from {base_url}") | |
extracted_data = content | |
notes.append("Processed as Plain Text") | |
else: | |
logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.") | |
extracted_data = content # Store raw content for unknown types | |
notes.append(f"Unknown content type '{content_type}'. Stored raw text.") | |
except Exception as e: | |
logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}") | |
extracted_data = content # Fallback to raw content on error | |
notes.append(f"Unexpected processing error: {e}. Stored raw text.") | |
return {'data': extracted_data, 'notes': notes} | |
def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]: | |
"""Process HTML content, preserving text, and extracting metadata and links.""" | |
extracted: Dict[str, Any] = { | |
'title': None, | |
'meta_description': None, # Add extraction for meta description | |
'full_text': "", | |
'links': [] # Add extraction for links | |
} | |
try: | |
soup = BeautifulSoup(content, 'html.parser') | |
# Extract Title | |
if soup.title and soup.title.string: | |
extracted['title'] = soup.title.string.strip() | |
# Extract Meta Description | |
meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
if meta_desc and meta_desc.get('content'): | |
extracted['meta_description'] = meta_desc['content'].strip() | |
# Extract and process links (convert relative to absolute) | |
# Use a set to avoid duplicate URLs in the links list | |
unique_links = set() | |
for a_tag in soup.find_all('a', href=True): | |
href = a_tag['href'].strip() | |
if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): # Basic filter | |
text = a_tag.get_text().strip() | |
try: | |
absolute_url = urljoin(base_url, href) | |
if absolute_url not in unique_links: | |
extracted['links'].append({'text': text, 'url': absolute_url}) | |
unique_links.add(absolute_url) | |
except Exception: | |
# If urljoin fails, keep the original href if it looks like a valid potential URL part | |
if validators.url(href) and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
elif urlparse(href).netloc and href not in unique_links: # Maybe just a domain/path? | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
# Extract all text content (similar to stripped_strings but ensures order) | |
# Use a more robust way to get visible text, including handling script/style tags | |
soup_copy = BeautifulSoup(content, 'html.parser') # Work on a copy to preserve soup for links | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() # Remove script and style tags | |
text = soup_copy.get_text(separator='\n') # Get text with newlines | |
# Clean up whitespace and empty lines | |
lines = text.splitlines() | |
cleaned_lines = [line.strip() for line in lines if line.strip()] | |
extracted['full_text'] = '\n'.join(cleaned_lines) | |
except Exception as e: | |
logger.error(f"Enhanced HTML processing error for {base_url}: {e}") | |
# Fallback: Store raw text and indicate error | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
extracted['full_text'] = soup_copy.get_text(separator='\n').strip() | |
extracted['processing_error'] = f"Enhanced HTML processing failed: {e}" | |
return extracted | |
def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]: | |
""" | |
Fetches content from a URL and recursively follows links up to a specified depth. | |
Args: | |
url: The initial URL to fetch. | |
max_steps: The maximum number of levels to follow links (0-3). | |
0: Only fetch the initial URL. | |
1: Fetch the initial URL and the links found on that page. | |
2: Fetch the initial URL, its links, and the links on those pages. | |
3: Fetch up to the third level of links. | |
Returns: | |
A dictionary containing the extraction result for the initial URL and | |
nested results for followed links. | |
""" | |
if not isinstance(max_steps, int) or not (0 <= max_steps <= 3): | |
logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 3.") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'note': f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 3." | |
} | |
validation_result = self.validate_url(url) | |
if not validation_result['is_valid']: | |
logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'note': f"Initial URL validation failed: {validation_result['message']}" | |
} | |
return self._fetch_content_recursive(url, max_steps, current_step=0) | |
def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int) -> Dict[str, Any]: | |
"""Recursive helper to fetch content and follow links.""" | |
if current_step > max_steps: | |
logger.debug(f"Depth limit reached for {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, # Indicate no fetch happened at this level | |
'linked_extractions': [], | |
'note': f"Depth limit ({max_steps}) reached." | |
} | |
logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}") | |
# Fetch content for the current URL | |
fetch_result = self.fetch_content(url) | |
linked_extractions: List[Dict[str, Any]] = [] | |
# Only follow links if fetch was successful, content is HTML, and within depth limit | |
if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower(): | |
extracted_data = fetch_result['extracted_data'] | |
links = extracted_data.get('links', []) # Ensure links is a list even if missing | |
logger.info(f"Found {len(links)} links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") | |
# Recursively fetch linked content if not at max depth | |
if current_step < max_steps: | |
for link_info in links: | |
linked_url = link_info.get('url') | |
if linked_url: | |
                        # Simple check to avoid re-fetching the same URL in a chain.
                        # More sophisticated cycle detection would be needed for complex link graphs.
                        if urlparse(linked_url)._replace(fragment='').geturl() != urlparse(url)._replace(fragment='').geturl():  # Skip self-references, ignoring fragments on both sides
# Recursively call for the linked URL | |
linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1) | |
linked_extractions.append(linked_result) | |
else: | |
logger.debug(f"Skipping self-referencing link: {linked_url}") | |
linked_extractions.append({ | |
'url': linked_url, | |
'level': current_step + 1, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'note': 'Skipped self-referencing link' | |
}) | |
else: | |
linked_extractions.append({ | |
'url': 'Invalid or missing link', | |
'level': current_step + 1, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'note': 'Link URL not found or invalid' | |
}) | |
else: | |
logger.info(f"Max depth ({max_steps}) reached. Not following links from {url}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': fetch_result, | |
'linked_extractions': linked_extractions, | |
'note': f"Processed at level {current_step}" | |
} | |
# --- Example Usage --- | |
if __name__ == "__main__": | |
processor = EnhancedURLProcessor() | |
# --- Test Cases --- | |
# Test with 0 steps (only initial URL) | |
print("\n--- Testing with max_steps = 0 ---") | |
result_0 = processor.fetch_content_with_depth("https://httpbin.org/html", max_steps=0) | |
# print(json.dumps(result_0, indent=2)) # Uncomment to see full structure | |
print(f"Initial URL ({result_0['url']}) fetched at level {result_0['level']}. Success: {result_0['fetch_result'] is not None}") | |
print(f"Number of linked extractions: {len(result_0['linked_extractions'])}") # Should be 0 | |
# Test with 1 step (initial URL + its direct links) | |
# Note: Replace with a real website URL that has internal links for meaningful testing | |
# For demonstration, using a placeholder. A real site like a blog post or news article front page is better. | |
test_url_with_links = "https://quotes.toscrape.com/" # Example site with links | |
print(f"\n--- Testing with max_steps = 1 for {test_url_with_links} ---") | |
result_1 = processor.fetch_content_with_depth(test_url_with_links, max_steps=1) | |
# print(json.dumps(result_1, indent=2)) # Uncomment to see full structure | |
print(f"Initial URL ({result_1['url']}) fetched at level {result_1['level']}. Success: {result_1['fetch_result'] is not None}") | |
print(f"Number of direct links found and processed: {len(result_1['linked_extractions'])}") | |
if result_1['linked_extractions']: | |
print(f"First linked URL processed at level 1: {result_1['linked_extractions'][0]['url']}") | |
print(f"Number of links found on the first linked page: {len(result_1['linked_extractions'][0]['linked_extractions'])}") # Should be 0 for max_steps=1 | |
# Test with 2 steps | |
print(f"\n--- Testing with max_steps = 2 for {test_url_with_links} ---") | |
result_2 = processor.fetch_content_with_depth(test_url_with_links, max_steps=2) | |
# print(json.dumps(result_2, indent=2)) # Uncomment to see full structure | |
print(f"Initial URL ({result_2['url']}) fetched at level {result_2['level']}. Success: {result_2['fetch_result'] is not None}") | |
print(f"Number of direct links found and processed (Level 1): {len(result_2['linked_extractions'])}") | |
if result_2['linked_extractions']: | |
print(f"First linked URL processed at level 1: {result_2['linked_extractions'][0]['url']}") | |
print(f"Number of links found on the first linked page and processed (Level 2): {len(result_2['linked_extractions'][0]['linked_extractions'])}") | |
if result_2['linked_extractions'][0]['linked_extractions']: | |
print(f"First level 2 linked URL: {result_2['linked_extractions'][0]['linked_extractions'][0]['url']}") | |
print(f"Number of links found on the first level 2 page: {len(result_2['linked_extractions'][0]['linked_extractions'][0]['linked_extractions'])}") # Should be 0 for max_steps=2 | |
# Test with max_steps = 3 (will go one level deeper than 2) | |
# print(f"\n--- Testing with max_steps = 3 for {test_url_with_links} ---") | |
# result_3 = processor.fetch_content_with_depth(test_url_with_links, max_steps=3) | |
# print(json.dumps(result_3, indent=2)) # Uncomment to see full structure | |
# Add similar print statements for result_3 to show levels 1, 2, and 3 counts | |
# Test with invalid max_steps | |
print("\n--- Testing with invalid max_steps = 4 ---") | |
result_invalid = processor.fetch_content_with_depth("https://example.com", max_steps=4) | |
print(f"Result for invalid steps: {result_invalid.get('note')}") | |
# Test with invalid initial URL | |
print("\n--- Testing with invalid initial URL ---") | |
result_invalid_url = processor.fetch_content_with_depth("invalid-url", max_steps=1) | |
print(f"Result for invalid initial URL: {result_invalid_url.get('note')}") | |
# Test with a URL that might fail to fetch | |
print("\n--- Testing with a potentially failing URL ---") | |
# Use a non-existent subdomain or a port that's unlikely to be open | |
failing_url = "http://this-domain-does-not-exist-12345.com/" | |
result_fail = processor.fetch_content_with_depth(failing_url, max_steps=1) | |
print(f"Result for failing URL: {result_fail.get('note')}") | |
if result_fail.get('fetch_result'): | |
print(f"Fetch result notes for failing URL: {result_fail['fetch_result'].get('processing_notes')}") | |
class EnhancedFileProcessor: | |
"""Advanced file processing with enhanced content extraction""" | |
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default | |
self.max_file_size = max_file_size | |
# Expanded supported extensions to include common docs and structured formats | |
self.supported_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', | |
'.pdf', '.doc', '.docx', '.rtf', '.odt', | |
            # Archives are handled separately but listed for context
            '.zip', '.tar', '.gz', '.tgz', '.bz2', '.7z', '.rar',
        }
        self.archive_extensions = {'.zip', '.tar', '.gz', '.tgz', '.bz2', '.7z', '.rar'}
def process_file(self, file) -> List[Dict]: | |
"""Process uploaded file with enhanced error handling and complete extraction""" | |
if not file or not hasattr(file, 'name'): | |
logger.warning("Received invalid file object.") | |
return [] | |
dataset = [] | |
file_path = Path(file.name) # Use Path object for easier handling | |
try: | |
file_size = file_path.stat().st_size | |
if file_size > self.max_file_size: | |
logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).") | |
return [{ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'extracted_data': None, | |
'processing_notes': 'File size exceeds limit.' | |
}] | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_dir_path = Path(temp_dir) | |
# Decide processing strategy | |
if file_path.suffix.lower() in self.archive_extensions: | |
dataset.extend(self._process_archive(file_path, temp_dir_path)) | |
elif file_path.suffix.lower() in self.supported_extensions: | |
# Pass the path to the single file processor | |
dataset.extend(self._process_single_file(file_path)) | |
else: | |
logger.warning(f"Unsupported file type for processing: '{file_path.name}'") | |
# Optionally process as raw text even if extension is unsupported | |
try: | |
# Read as text with error replacement | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': {'plain_text': raw_content}, # Store raw text under a key | |
'processing_notes': 'Processed as plain text (unsupported extension).' | |
}) | |
except Exception as e: | |
logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': None, | |
'processing_notes': f'Unsupported file type and failed to read as text: {e}' | |
}) | |
except Exception as e: | |
logger.error(f"Error processing file '{file_path.name}': {str(e)}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size if 'file_size' in locals() else None, | |
'extracted_data': None, | |
'processing_notes': f'Overall file processing error: {str(e)}' | |
}) | |
return dataset | |
def _is_archive(self, filepath: Union[str, Path]) -> bool: | |
"""Check if file is an archive""" | |
p = Path(filepath) if isinstance(filepath, str) else filepath | |
return p.suffix.lower() in self.archive_extensions | |
def _process_single_file(self, file_path: Path) -> List[Dict]: | |
"""Process a single file with enhanced character extraction and format-specific handling""" | |
dataset_entries = [] | |
filename = file_path.name | |
file_size = file_path.stat().st_size | |
mime_type, _ = mimetypes.guess_type(file_path) | |
mime_type = mime_type or 'unknown/unknown' | |
file_extension = file_path.suffix.lower() | |
logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)") | |
raw_content: Optional[str] = None | |
extracted_data: Any = None | |
processing_notes = [] | |
try: | |
# Read content efficiently | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
# --- Attempt format-specific parsing --- | |
# 1. Attempt JSON parsing (explicit .json or application/json, OR if content looks like JSON) | |
is_explicit_json = mime_type == 'application/json' or file_extension == '.json' | |
looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[') | |
if is_explicit_json or looks_like_json: | |
try: | |
extracted_data = json.loads(raw_content) | |
processing_notes.append("Parsed as JSON.") | |
if not is_explicit_json: | |
processing_notes.append("Note: Content looked like JSON despite extension/mime.") | |
logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.") | |
mime_type = 'application/json' # Update mime_type if successfully parsed as JSON | |
except json.JSONDecodeError as e: | |
processing_notes.append(f"Failed to parse as JSON: {e}.") | |
if is_explicit_json: | |
logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing JSON: {e}.") | |
logger.error(f"Error processing JSON in '{filename}': {e}") | |
# 2. Attempt XML parsing (if not already parsed as JSON, and looks like XML) | |
# Add check if extracted_data is still None (meaning JSON parsing failed or wasn't attempted/relevant) | |
looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') # Simple heuristic | |
is_explicit_xml = extracted_data is None and (mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ('.xml', '.xsd')) | |
if extracted_data is None and (is_explicit_xml or looks_like_xml): | |
try: | |
root = ET.fromstring(raw_content) | |
# Convert XML element tree to a structured dictionary or string | |
# Simple string representation for QR code suitability | |
extracted_data = ET.tostring(root, encoding='unicode', method='xml') | |
processing_notes.append("Parsed as XML (text representation).") | |
if not is_explicit_xml: | |
processing_notes.append("Note: Content looked like XML despite extension/mime.") | |
# Update mime_type if successfully parsed as XML | |
if 'xml' not in mime_type: mime_type = 'application/xml' | |
except ET.ParseError as e: | |
processing_notes.append(f"Failed to parse as XML: {e}.") | |
if is_explicit_xml: | |
logger.error(f"Explicit XML file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing XML: {e}.") | |
logger.error(f"Error processing XML in '{filename}': {e}") | |
# 3. Attempt CSV parsing (if not already parsed, and looks like CSV or is explicit CSV) | |
is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv') | |
# Heuristic: check for commas/semicolons and multiple lines | |
looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ('\n' in raw_content or len(raw_content.splitlines()) > 1) | |
if extracted_data is None and (is_explicit_csv or looks_like_csv): | |
                try:
                    # Use Sniffer to guess the dialect for better compatibility
                    dialect = 'excel'  # Default dialect (string name, or a sniffed Dialect class below)
                    try:
                        # Look at the first few lines to guess the dialect
                        sample = '\n'.join(raw_content.splitlines()[:10])
                        if sample:
                            dialect = csv.Sniffer().sniff(sample)  # sniff() returns a Dialect class that csv.reader accepts directly
                            logger.debug(f"Sniffer detected a CSV dialect for '{filename}'")
                    except csv.Error:
                        logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
                        dialect = 'excel'  # Fallback
                    # Read using the guessed or default dialect
                    csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect)
rows = list(csv_reader) | |
if rows: | |
# Limit the number of rows included for potentially huge CSVs | |
max_rows_preview = 100 | |
extracted_data = { | |
'headers': rows[0] if rows[0] else None, # Assume first row is header | |
'rows': rows[1:max_rows_preview+1] # Get up to max_rows_preview data rows | |
} | |
if len(rows) > max_rows_preview + 1: | |
processing_notes.append(f"CSV truncated to {max_rows_preview} data rows.") | |
processing_notes.append("Parsed as CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV despite extension/mime.") | |
mime_type = 'text/csv' # Update mime_type | |
else: | |
extracted_data = "Empty CSV" | |
processing_notes.append("Parsed as empty CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV but was empty.") | |
except Exception as e: | |
processing_notes.append(f"Failed to parse as CSV: {e}.") | |
logger.warning(f"Failed to parse CSV from '{filename}': {e}") | |
# 4. Attempt Document Text Extraction (if not already parsed) | |
if extracted_data is None: | |
try: | |
extracted_text = None | |
if file_extension == '.pdf' and PDF_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(content_bytes) # Write bytes to temp file | |
temp_path = Path(tmp_file.name) | |
try: | |
reader = PdfReader(temp_path) | |
text_content = "".join(page.extract_text() or "" for page in reader.pages) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from PDF.") | |
finally: | |
temp_path.unlink() # Clean up temp file | |
elif file_extension == '.docx' and DOCX_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: | |
tmp_file.write(content_bytes) # Write bytes to temp file | |
temp_path = Path(tmp_file.name) | |
try: | |
document = Document(temp_path) | |
text_content = "\n".join(paragraph.text for paragraph in document.paragraphs) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from DOCX.") | |
finally: | |
temp_path.unlink() # Clean up temp file | |
elif file_extension == '.rtf' and RTF_SUPPORT: | |
# pyth can read directly from file-like object or string | |
try: | |
doc = Rtf15Reader.read(io.StringIO(raw_content)) | |
text_content = PlaintextWriter.write(doc).getvalue() | |
extracted_text = text_content | |
processing_notes.append("Extracted text from RTF.") | |
except Exception as e: | |
processing_notes.append(f"RTF extraction error: {e}") | |
logger.warning(f"Failed to extract RTF text from '{filename}': {e}") | |
                    elif file_extension == '.odt' and ODT_SUPPORT:
                        with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file:
                            tmp_file.write(content_bytes)  # Write bytes to temp file
                            temp_path = Path(tmp_file.name)
                        try:
                            # odfpy: load an existing document; OpenDocumentText() would create a new, empty one
                            text_doc = load_odf(temp_path)
                            paragraphs = text_doc.getElementsByType(odftext.P)
                            text_content = "\n".join(teletype.extractText(p) for p in paragraphs)
                            extracted_text = text_content
                            processing_notes.append("Extracted text from ODT.")
                        finally:
                            temp_path.unlink()  # Clean up temp file
elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']: | |
# These require more complex or platform-specific libraries (e.g. antiword, pandoc, COM objects on Windows) | |
processing_notes.append(f"Automatic text extraction for {file_extension.upper()} not fully implemented.") | |
logger.warning(f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.") | |
if extracted_text is not None: | |
# Limit extracted text size | |
max_extracted_text_size = 10000 # Limit text preview | |
extracted_data = {'text': extracted_text[:max_extracted_text_size]} | |
if len(extracted_text) > max_extracted_text_size: | |
extracted_data['text'] += "..." | |
processing_notes.append("Extracted text truncated.") | |
except ImportError as e: | |
processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.") | |
except Exception as e: | |
processing_notes.append(f"Error during document text extraction: {e}") | |
logger.warning(f"Error during document text extraction for '{filename}': {e}") | |
# 5. Fallback to Plain Text (if no specific extraction succeeded) | |
if extracted_data is None: | |
extracted_data = {'plain_text': raw_content} | |
processing_notes.append("Stored as plain text.") | |
                # Fall back to text/plain if the original mime type was generic or unknown
                if mime_type in ['unknown/unknown', 'application/octet-stream']:
                    mime_type = 'text/plain'
except Exception as e: | |
# Catch errors during initial read or other unexpected issues | |
logger.error(f"Fatal error processing single file '{filename}': {e}") | |
processing_notes.append(f"Fatal processing error: {e}") | |
raw_content = None # Ensure raw_content is None if reading failed | |
extracted_data = None | |
# Add file info to the entry | |
entry = { | |
'source': 'file', | |
'filename': filename, | |
'file_size': file_size, | |
'mime_type': mime_type, | |
'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None, | |
'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None, | |
'raw_content': raw_content, # Always include raw content if readable | |
'extracted_data': extracted_data, # Include the structured/extracted data | |
'processing_notes': processing_notes # Include any notes/errors encountered | |
} | |
dataset_entries.append(entry) | |
return dataset_entries | |
def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]: | |
"""Process an archive file with enhanced extraction""" | |
dataset = [] | |
archive_extension = archive_path.suffix.lower() | |
logger.info(f"Processing archive: '{archive_path.name}'") | |
try: | |
if archive_extension == '.zip': | |
if zipfile.is_zipfile(archive_path): | |
with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
for file_info in zip_ref.infolist(): | |
if file_info.file_size > 0 and not file_info.filename.endswith('/'): | |
try: | |
zip_ref.extract(file_info, path=extract_to) | |
extracted_file_path = extract_to / file_info.filename | |
# Recursively process the extracted file if it's supported and not an archive itself | |
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
# Recursively process nested archives (careful with depth!) | |
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'") | |
except Exception as e: | |
logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}") | |
else: | |
logger.error(f"'{archive_path.name}' is not a valid zip file.") | |
            elif archive_extension in ('.tar', '.tgz') or archive_path.name.lower().endswith(('.tar.gz', '.tar.bz2')):
                try:
                    # 'r:*' lets tarfile transparently detect the compression (none, gzip, bz2, xz)
                    with tarfile.open(archive_path, 'r:*') as tar_ref:
                        for member in tar_ref.getmembers():
                            if member.isfile():
                                try:
                                    tar_ref.extract(member, path=extract_to)
                                    extracted_file_path = extract_to / member.name
                                    # Recursively process extracted file
                                    if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path):
                                        dataset.extend(self._process_single_file(extracted_file_path))
                                    elif extracted_file_path.suffix.lower() in self.archive_extensions:
                                        logger.info(f"Found nested archive '{member.name}', processing recursively.")
                                        dataset.extend(self._process_archive(extracted_file_path, extract_to))
                                    else:
                                        logger.debug(f"Skipping unsupported file in archive: '{member.name}'")
                                except Exception as e:
                                    logger.warning(f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}")
                except tarfile.TarError as e:
                    logger.error(f"Error processing TAR archive '{archive_path.name}': {e}")
            elif archive_extension == '.gz':
                # Plain GZIP files (not .tar.gz) wrap a single file. Extract it and process.
                extracted_name = archive_path.stem  # Name without .gz
                extracted_path = extract_to / extracted_name
                try:
                    with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile:
                        outfile.write(gz_file.read())
                    # Process the extracted file if supported
                    if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_path):
                        dataset.extend(self._process_single_file(extracted_path))
                    elif extracted_path.suffix.lower() in self.archive_extensions:
                        logger.info(f"Found nested archive '{extracted_name}', processing recursively.")
                        dataset.extend(self._process_archive(extracted_path, extract_to))
                    else:
                        logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'")
                except (gzip.BadGzipFile, OSError) as e:
                    logger.error(f"Error processing GZIP file '{archive_path.name}': {e}")
                except Exception as e:
                    logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}")
                finally:
                    if extracted_path.exists(): extracted_path.unlink()  # Clean up extracted file
# TODO: Add support for other archive types (.bz2, .7z, .rar) | |
elif archive_extension in ('.bz2', '.7z', '.rar'): | |
logger.warning(f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.") | |
except Exception as e: | |
logger.error(f"Overall archive processing error for '{archive_path.name}': {e}") | |
# Clean up extracted files in temp_dir after processing | |
# Handled by context manager 'with tempfile.TemporaryDirectory()' | |
return dataset | |
def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]: | |
"""Enhanced data chunking with sequence metadata""" | |
try: | |
# Convert data to JSON string | |
# Use separators=(',', ':') to remove unnecessary whitespace for maximum data density in QR code | |
json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) | |
total_length = len(json_str) | |
# Calculate overhead for metadata | |
# Metadata structure: {"idx":0,"tc":1,"tl":XXX,"hash":"YYYY","data":"..."}, shortened keys | |
metadata_template = { | |
"idx": 0, # chunk_index | |
"tc": 1, # total_chunks | |
"tl": total_length, # total_length | |
"hash": "", # chunk_hash | |
"data": "" # chunk_data | |
} | |
# Estimate overhead more accurately by dumping a sample metadata structure | |
# and adding some safety margin. Shortened keys reduce overhead. | |
overhead_estimate = len(json.dumps(metadata_template, separators=(',', ':'))) + 50 # Extra padding | |
# Calculate effective chunk size | |
effective_chunk_size = max_size - overhead_estimate | |
if effective_chunk_size <= 0: | |
logger.error(f"Max QR size ({max_size}) is too small for metadata overhead ({overhead_estimate}). Cannot chunk.") | |
return [] | |
if total_length <= effective_chunk_size: | |
# Data fits in one chunk | |
chunk_data = json_str # Use the full string | |
chunk = { | |
"idx": 0, | |
"tc": 1, | |
"tl": total_length, | |
"hash": hash(chunk_data) & 0xFFFFFFFF, # 32-bit hash | |
"data": chunk_data | |
} | |
return [chunk] | |
# Calculate number of chunks needed | |
num_chunks = -(-total_length // effective_chunk_size) # Ceiling division | |
# Adjust chunk_size slightly to distribute evenly, maybe not strictly necessary | |
# chunk_size = -(-total_length // num_chunks) # Use this if perfect distribution is needed | |
chunks = [] | |
current_pos = 0 | |
for i in range(num_chunks): | |
# Find the end of the current chunk. Avoid splitting in the middle of escaped characters or surrogate pairs if possible, | |
# but simple slicing is usually okay for standard text that's already been errors='replace'. | |
# We'll use basic slicing for simplicity, as the JSON string is just text. | |
end_pos = min(current_pos + effective_chunk_size, total_length) | |
# Basic attempt to not break in the middle of a UTF-8 character if slicing bytes, | |
# but since we are slicing a *decoded string*, this is less of an issue. | |
# However, slicing in the middle of JSON structure is bad. | |
# For simplicity and robustness with arbitrary JSON structures, slicing the raw string is the easiest. | |
chunk_data_str = json_str[current_pos:end_pos] | |
chunk = { | |
"idx": i, | |
"tc": num_chunks, | |
"tl": total_length, | |
"hash": hash(chunk_data_str) & 0xFFFFFFFF, | |
"data": chunk_data_str | |
} | |
chunks.append(chunk) | |
current_pos = end_pos | |
# Final check: Ensure all data was chunked | |
if current_pos < total_length: | |
# This shouldn't happen with correct ceiling division and min() | |
logger.error(f"Chunking logic error: Only processed {current_pos} of {total_length} characters.") | |
return [] # Indicate failure | |
logger.info(f"Chunked data into {num_chunks} chunks for QR codes.") | |
return chunks | |
except Exception as e: | |
logger.error(f"Error chunking data: {e}") | |
return [] | |
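# Decoder-side sketch (an added illustration; nothing in this app calls it): shows how the
# chunk dictionaries produced by EnhancedFileProcessor.chunk_data() could be reassembled
# after scanning. It relies only on the short keys used above ('idx', 'tc', 'tl', 'data').
# Note that the 'hash' field is computed with Python's built-in hash(), which is salted per
# interpreter run, so it is only useful for integrity checks within the same process.
def reassemble_chunks(chunks: List[Dict]) -> Optional[Union[Dict, List]]:
    """Rebuild the original JSON payload from a complete set of chunk dictionaries."""
    if not chunks:
        return None
    ordered = sorted(chunks, key=lambda c: c.get('idx', 0))
    expected = ordered[0].get('tc', len(ordered))
    if len(ordered) != expected:
        logger.warning(f"Expected {expected} chunks but received {len(ordered)}; cannot reassemble.")
        return None
    json_str = ''.join(c.get('data', '') for c in ordered)
    if len(json_str) != ordered[0].get('tl', len(json_str)):
        logger.warning("Reassembled length does not match the 'tl' metadata; data may be incomplete.")
    try:
        return json.loads(json_str)
    except json.JSONDecodeError as e:
        logger.error(f"Reassembled chunk data is not valid JSON: {e}")
        return None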
def generate_stylish_qr(data: Union[str, Dict], | |
filename: str, | |
size: int = 10, | |
border: int = 4, | |
fill_color: str = "#000000", | |
back_color: str = "#FFFFFF") -> str: | |
"""Generate a stylish QR code with enhanced visual appeal""" | |
try: | |
qr = qrcode.QRCode( | |
version=None, | |
error_correction=qrcode.constants.ERROR_CORRECT_M, # Increased error correction | |
box_size=size, | |
border=border | |
) | |
# Add data to QR code | |
if isinstance(data, dict): | |
# Use compact JSON representation | |
qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':'))) | |
else: | |
qr.add_data(str(data)) # Ensure it's a string | |
qr.make(fit=True) | |
# Create QR code image with custom colors | |
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) | |
# Convert to RGBA for transparency support and potential overlays | |
qr_image = qr_image.convert('RGBA') | |
# Optional: Add a small logo or icon in the center (requires design) | |
# logo = Image.open("logo.png").convert("RGBA") | |
# logo = logo.resize((logo.width // 4, logo.height // 4)) # Resize logo | |
# logo_pos = ((qr_image.width - logo.width) // 2, (qr_image.height - logo.height) // 2) | |
# qr_image.paste(logo, logo_pos, logo) | |
# Add subtle gradient overlay (optional visual enhancement) | |
try: | |
gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) | |
draw = ImageDraw.Draw(gradient) | |
# Horizontal gradient for subtle effect | |
for i in range(qr_image.width): | |
# Fades from left (alpha=0) to right (max_alpha) | |
alpha = int(255 * (i/qr_image.width) * 0.05) # e.g., 5% maximum opacity fade-in | |
draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha)) | |
# Combine images | |
final_image = Image.alpha_composite(qr_image, gradient) | |
except Exception as e: | |
logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.") | |
final_image = qr_image | |
            # Save the image
            output_path = QR_CODES_DIR / filename
            final_image.save(output_path)  # PNG output; the JPEG-style 'quality' option does not apply here
return str(output_path) | |
except Exception as e: | |
logger.error(f"QR generation error: {e}") | |
return "" | |
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]: | |
"""Generate QR codes with enhanced visual appeal and metadata""" | |
# Assume 'data' here is the list of dictionaries produced by process_inputs | |
if not isinstance(data, list): | |
logger.error("generate_qr_codes received data that is not a list.") | |
return [] | |
try: | |
file_processor = EnhancedFileProcessor() # Use the enhanced processor for chunking | |
paths = [] | |
if combined: | |
# Process combined data | |
chunks = file_processor.chunk_data(data) # chunk_data works on the list of dicts | |
if not chunks: | |
logger.warning("No chunks generated for combined data.") | |
return [] | |
for i, chunk in enumerate(chunks): | |
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk, # Pass the chunk dictionary | |
filename=filename, | |
fill_color="#1a365d", # Deep blue | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for chunk {i+1}/{len(chunks)}.") | |
else: | |
# Process individual items (each dictionary in the list) | |
if data: # Ensure data is not empty | |
for idx, item in enumerate(data): | |
chunks = file_processor.chunk_data(item) # chunk_data works on individual dict | |
if not chunks: | |
logger.warning(f"No chunks generated for item {idx+1}.") | |
continue | |
for chunk_idx, chunk in enumerate(chunks): | |
filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk, # Pass the chunk dictionary | |
filename=filename, | |
fill_color="#1a365d", # Deep blue | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for item {idx+1} chunk {chunk_idx+1}/{len(chunks)}.") | |
else: | |
logger.warning("No items in data list to process individually.") | |
logger.info(f"Generated {len(paths)} QR codes.") | |
return paths | |
except Exception as e: | |
logger.error(f"QR code generation error: {e}") | |
return [] | |
def create_modern_interface(): | |
"""Create a modern and visually appealing Gradio interface""" | |
# Modern CSS styling | |
css = """ | |
/* Modern color scheme */ | |
:root { | |
--primary-color: #1a365d; | |
--secondary-color: #2d3748; | |
--accent-color: #4299e1; | |
--background-color: #f7fafc; | |
--success-color: #48bb78; | |
--error-color: #f56565; | |
--warning-color: #ed8936; | |
} | |
/* Container styling */ | |
.container { | |
max-width: 1200px; | |
margin: auto; | |
padding: 2rem; | |
background-color: var(--background-color); | |
border-radius: 1rem; | |
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
} | |
/* Component styling */ | |
.input-container { | |
background-color: white; | |
padding: 1.5rem; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-bottom: 1rem; | |
} | |
/* Button styling */ | |
.primary-button { | |
background-color: var(--primary-color); | |
color: white; | |
padding: 0.75rem 1.5rem; | |
border-radius: 0.375rem; | |
border: none; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.primary-button:hover { | |
background-color: var(--accent-color); | |
transform: translateY(-1px); | |
} | |
/* Status messages */ | |
.status { | |
padding: 1rem; | |
border-radius: 0.375rem; | |
margin: 1rem 0; | |
} | |
.status.success { background-color: #f0fff4; color: var(--success-color); } | |
.status.error { background-color: #fff5f5; color: var(--error-color); } | |
.status.warning { background-color: #fffaf0; color: var(--warning-color); } | |
/* Gallery styling */ | |
.gallery { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
gap: 1rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
} | |
.gallery img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
} | |
.gallery img:hover { | |
transform: scale(1.05); | |
} | |
/* QR Code Viewport Styling */ | |
.viewport-container { | |
display: grid; | |
gap: 0.5rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-top: 1rem; | |
} | |
.viewport-item { | |
display: flex; | |
flex-direction: column; | |
align-items: center; | |
} | |
.viewport-item img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
max-width: 150px; /* Adjust as needed */ | |
max-height: 150px; /* Adjust as needed */ | |
} | |
""" | |
    # JavaScript injected into the page head to track which QR codes are enabled in the viewport
    head_script = """
    <script>
    let enabledStates = [];
    function updateEnabledStates(checkbox) {
        const index = parseInt(checkbox.dataset.index);
        if (checkbox.checked) {
            if (!enabledStates.includes(index)) {
                enabledStates.push(index);
            }
        } else {
            enabledStates = enabledStates.filter(item => item !== index);
        }
        // Send the updated state back to Gradio (the target component selector may need adjusting)
        const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]');
        if (enabled_qr_codes_component) {
            enabled_qr_codes_component.value = JSON.stringify(enabledStates);
            enabled_qr_codes_component.dispatchEvent(new Event('input')); // Trigger update
        }
        console.log("Enabled QR Code Indices:", enabledStates);
        // You might want to trigger an update of the viewport here if needed
    }
    </script>
    """
    # Create interface with modern design; the script is passed via the Blocks 'head' parameter
    with gr.Blocks(css=css, head=head_script, title="Advanced Data Processor & QR Generator") as interface:
with gr.Row(): | |
crawl_depth_slider = gr.Slider( | |
label="Crawl Depth", | |
minimum=0, | |
maximum=3, | |
value=0, | |
step=1, | |
interactive=True, | |
info="Select the maximum depth for crawling links (0-3)." | |
) | |
qr_code_paths = gr.State([]) | |
gr.Markdown(""" | |
# π Advanced Data Processing & QR Code Generator | |
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. | |
""") | |
with gr.Tab("π URL Processing"): | |
url_input = gr.Textbox( | |
label="Enter URLs (comma or newline separated)", | |
lines=5, | |
placeholder="https://example1.com\nhttps://example2.com", | |
value="" | |
) | |
with gr.Tab("π File Input"): | |
file_input = gr.File( | |
label="Upload Files", | |
file_types=None, # Accept all file types | |
file_count="multiple" | |
) | |
with gr.Tab("π JSON Input"): | |
text_input = gr.TextArea( | |
label="Direct JSON Input", | |
lines=15, | |
placeholder="Paste your JSON data here...", | |
value="" | |
) | |
with gr.Row(): | |
example_btn = gr.Button("π Load Example", variant="secondary") | |
clear_btn = gr.Button("ποΈ Clear", variant="secondary") | |
with gr.Row(): | |
combine_data = gr.Checkbox( | |
label="Combine all data into sequence", | |
value=True, | |
info="Generate sequential QR codes for combined data" | |
) | |
process_btn = gr.Button( | |
"π Process & Generate QR", | |
variant="primary" | |
) | |
# Output components | |
output_json = gr.JSON(label="Processed Data") | |
output_gallery = gr.Gallery( | |
label="Generated QR Codes", | |
columns=3, | |
height=400, | |
show_label=True | |
) | |
output_text = gr.Textbox( | |
label="Processing Status", | |
interactive=False | |
) | |
with gr.Tab("πΌοΈ QR Code Viewport") as viewport_tab: | |
viewport_output = gr.HTML(label="QR Code Sequence Viewport") | |
enabled_qr_codes = gr.State([]) # To store the enabled/disabled state | |
# Load example data | |
def load_example(): | |
example = { | |
"type": "product_catalog", | |
"items": [ | |
{ | |
"id": "123", | |
"name": "Premium Widget", | |
"description": "High-quality widget with advanced features", | |
"price": 299.99, | |
"category": "electronics", | |
"tags": ["premium", "featured", "new"] | |
}, | |
{ | |
"id": "456", | |
"name": "Basic Widget", | |
"description": "Reliable widget for everyday use", | |
"price": 149.99, | |
"category": "electronics", | |
"tags": ["basic", "popular"] | |
} | |
], | |
"metadata": { | |
"timestamp": datetime.now().isoformat(), | |
"version": "2.0", | |
"source": "example" | |
} | |
} | |
return json.dumps(example, indent=2) | |
def clear_input(): | |
return "", None, "" # Clear url, files, text | |
def update_viewport(paths, enabled_states): | |
if not paths: | |
return "<p>No QR codes generated yet.</p>" | |
num_qr_codes = len(paths) | |
cols = math.ceil(math.sqrt(num_qr_codes)) # Calculate columns for a roughly square grid | |
cols = max(1, min(cols, 6)) # Limit max columns for small screens | |
rows = math.ceil(num_qr_codes / cols) | |
            viewport_html = f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">'
# Initialize enabledStates if it's empty (first load) | |
if not enabled_states and paths: | |
enabled_states = list(range(num_qr_codes)) # Enable all by default on first view | |
for i, path in enumerate(paths): | |
is_enabled = i in enabled_states | |
border = "border: 2px solid green;" if is_enabled else "border: 2px solid lightgray;" | |
opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;" | |
viewport_html += f'<div class="viewport-item" id="qr_item_{i}">' | |
viewport_html += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i+1}">' # Use /file= for Gradio to serve static files | |
viewport_html += f'<label><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>' | |
viewport_html += '</div>' | |
viewport_html += '</div>' | |
return viewport_html | |
def process_inputs(urls, files, text, combine, crawl_depth=0):
"""Process all inputs and generate QR codes. The crawl depth from the slider is accepted but not yet used here."""
results = []
processing_status_messages = []
# Initialize these before the try block so the return below cannot raise a NameError
# if processing fails before QR generation is reached.
qr_paths = []
final_json_output = None
url_processor = EnhancedURLProcessor() | |
file_processor = EnhancedFileProcessor() | |
try: | |
# Process JSON input | |
if text and text.strip(): | |
try: | |
json_data = json.loads(text) | |
# Wrap direct JSON input in a dictionary for consistency with file/URL output structure | |
results.append({ | |
'source': 'json_input', | |
'extracted_data': json_data, | |
'timestamp': datetime.now().isoformat(), | |
'processing_notes': ['Parsed from direct JSON input.'] | |
}) | |
processing_status_messages.append("✅ Successfully parsed direct JSON input.")
except json.JSONDecodeError as e:
processing_status_messages.append(f"❌ Invalid JSON format in text input: {str(e)}")
except Exception as e:
processing_status_messages.append(f"❌ Error processing direct JSON input: {str(e)}")
# Process URLs | |
if urls and urls.strip(): | |
url_list = re.split(r'[,\n]', urls) | |
url_list = [url.strip() for url in url_list if url.strip()] | |
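# Illustrative split: "https://a.example, https://b.example\nhttps://c.example"
# -> ["https://a.example", "https://b.example", "https://c.example"]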
for url in url_list: | |
validation = url_processor.validate_url(url) | |
if validation['is_valid']: | |
processing_status_messages.append(f"Fetching URL: {url}...")
content_result = url_processor.fetch_content(url) | |
if content_result: | |
results.append(content_result) | |
processing_status_messages.append(f"✅ Fetched and processed URL: {url}")
else:
processing_status_messages.append(f"❌ Failed to fetch/process URL: {url}")
if validation['details'].get('final_url'):
processing_status_messages[-1] += f" (Redirected to {validation['details']['final_url']})"
else:
processing_status_messages.append(f"⚠️ Skipping invalid URL: {url} ({validation['message']})")
# Process files | |
if files: | |
for file in files: | |
processing_status_messages.append(f"Processing file: {file.name}...")
file_results = file_processor.process_file(file) | |
if file_results: | |
results.extend(file_results) | |
processing_status_messages.append(f"✅ Processed file: {file.name}")
else:
processing_status_messages.append(f"❌ Failed to process file: {file.name}")
# Generate QR codes | |
qr_paths = [] | |
final_json_output = None | |
if results: | |
# Use the collected results (list of dicts) for QR code generation | |
qr_paths = generate_qr_codes(results, combine) | |
final_json_output = results # Show the structured data in the JSON output box | |
if qr_paths: | |
processing_status_messages.append(f"✅ Successfully generated {len(qr_paths)} QR codes.")
else:
processing_status_messages.append("❌ Failed to generate QR codes.")
else:
processing_status_messages.append("⚠️ No valid content collected from inputs.")
except Exception as e:
logger.error(f"Overall processing error in process_inputs: {e}")
processing_status_messages.append(f"❌ An unexpected error occurred during processing: {str(e)}")
return ( | |
final_json_output, | |
[str(path) for path in qr_paths], # Gradio Gallery expects list of paths (strings) | |
"\n".join(processing_status_messages) # Join status messages | |
) | |
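# Minimal usage sketch (hypothetical call, outside the Gradio event handlers):
#   data, qr_image_paths, status_log = process_inputs("https://example.com", None, "", True)
# data is the list of result dicts, qr_image_paths the QR image paths, status_log the joined status text.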
def on_qr_generation(qr_paths_list): | |
# When QR codes are generated, update the state with the list of paths | |
# and initialize the enabled_qr_codes state with all indices enabled | |
if qr_paths_list is None:
num_qrs = 0
else:
num_qrs = len(qr_paths_list)
initial_enabled_states = list(range(num_qrs)) | |
return qr_paths_list, initial_enabled_states # Return paths list and initial enabled state | |
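# Illustrative example: a gallery containing 4 QR images yields (paths, [0, 1, 2, 3]),
# so every code starts out enabled the first time the viewport is rendered.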
# Link events | |
example_btn.click(load_example, inputs=[], outputs=text_input) | |
clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input]) # Clear all inputs | |
process_btn.click( | |
process_inputs, | |
inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider], | |
outputs=[output_json, output_gallery, output_text] | |
).then( # Chain a .then() to update the QR paths state and trigger viewport update | |
on_qr_generation, | |
inputs=[output_gallery], # Get the list of paths from the gallery output | |
outputs=[qr_code_paths, enabled_qr_codes] # Update the state variables | |
) | |
# The viewport tab's select event will trigger update_viewport to render the grid | |
viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output]) | |
# Add helpful documentation | |
gr.Markdown(""" | |
### Features
- **Enhanced URL Scraping**: Extracts HTML text, title, meta description, links, and attempts parsing JSON/XML from URLs based on content type. | |
- **Advanced File Processing**: Reads various text-based files (.txt, .md, .log etc.), HTML, XML, CSV, and attempts text extraction from common documents (.pdf, .docx, .rtf, .odt - *requires extra dependencies*). | |
- **Smart JSON Handling**: Parses valid JSON from direct input, files (.json or content), or URLs. | |
- **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives. | |
- **Robust Encoding Detection**: Uses `chardet` for reliable character encoding identification. | |
- **Structured Output**: Provides a consistent JSON output format containing raw content (if applicable), extracted data, and processing notes for each processed item. | |
- **Sequential QR Codes**: Maintains data integrity across multiple codes by chunking the combined/individual processed data. | |
- **QR Code Viewport**: Visualize generated QR codes in a sequenced square grid with options to enable/disable individual codes for selective scanning/sharing. | |
- **Modern Design**: Clean, responsive interface with visual feedback. | |
### Tips
1. **URLs**: Enter multiple URLs separated by commas or newlines. The processor will attempt to fetch and structure the content based on its type. | |
2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and specific document/structured formats. | |
3. **JSON**: Use the "Direct JSON Input" tab for pasting JSON data. The system also tries to detect JSON content in file uploads and URLs. Use the "Load Example" button to see a sample JSON structure. | |
4. **Dependencies**: Processing PDF, DOCX, RTF, and ODT files requires installing optional Python libraries. Check the console logs for warnings if a library is missing. | |
5. **QR Codes**: Choose whether to "Combine all data into sequence" or generate separate sequences for each input item. | |
6. **Processing**: Monitor the "Processing Status" box for real-time updates and notes about errors or processing steps. | |
7. **Output**: The "Processed Data" JSON box shows the structured data extracted from your inputs. The "Generated QR Codes" gallery shows the QR code images. | |
### Output Details
- The "Processed Data" JSON will be a list of dictionaries. Each dictionary represents one processed input (URL or file). | |
- Each item will have keys like `source`, `filename` (for files), `url` (for URLs), `mime_type`, `raw_content` (if readable), `extracted_data`, and `processing_notes`; see the illustrative example after this list.
- `extracted_data` will contain the parsed/extracted content, structured according to the input type (e.g., dictionary for JSON, text for documents, list of rows for CSV, dictionary with title/text/links for HTML). | |
- `processing_notes` will list any issues encountered during extraction. | |
- Generated QR codes are saved in the `output/qr_codes` directory. | |
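For illustration only (the field values below are placeholders and the exact keys depend on the input type), a single processed URL item might look like:
```python
{
    "source": "url",
    "url": "https://example.com",
    "mime_type": "text/html",
    "raw_content": "<!doctype html> ...",
    "extracted_data": {
        "title": "Example Domain",
        "text": "Example Domain ...",
        "links": ["https://www.iana.org/domains/example"]
    },
    "processing_notes": []
}
```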
### QR Code Viewport Instructions
1. Navigate to the **QR Code Viewport** tab after generating QR codes. | |
2. The generated QR codes will be displayed in a grid based on their total count. | |
3. Use the checkboxes below each QR code to enable or disable it for visual selection. Enabled codes have a green border and full opacity. | |
4. This viewport is currently for visualization and selection *within the UI*; it doesn't change the generated files themselves. You would manually select which physical QR codes to scan based on this view. | |
""") | |
return interface | |
def main(): | |
"""Initialize and launch the application""" | |
try: | |
# Configure system settings | |
mimetypes.init() | |
# Create and launch interface | |
interface = create_modern_interface() | |
# Launch with configuration | |
interface.launch( | |
share=False, | |
debug=False, # Set to True for more verbose Gradio logging | |
show_error=True, | |
show_api=False | |
) | |
except Exception as e: | |
logger.error(f"Application startup error: {e}") | |
# Optionally print a user-friendly message before exiting | |
print(f"\nFatal Error: {e}\nCheck the logs for details.") | |
raise # Re-raise the exception to ensure the process exits if launch fails | |
if __name__ == "__main__": | |
main() |