import json | |
import os | |
import re | |
import time | |
import logging | |
import mimetypes | |
import zipfile | |
import tempfile | |
import chardet | |
from datetime import datetime | |
from typing import List, Dict, Optional, Union, Tuple | |
from pathlib import Path | |
from urllib.parse import urlparse, urljoin | |
import requests | |
import validators | |
import gradio as gr | |
from diskcache import Cache | |
from bs4 import BeautifulSoup | |
from fake_useragent import UserAgent | |
from cleantext import clean | |
import qrcode | |
from PIL import Image, ImageDraw, ImageFont | |
import numpy as np | |
import tarfile | |
import gzip | |
import networkx as nx | |
import matplotlib.pyplot as plt | |
from matplotlib.colors import to_rgba | |
import io | |
import math | |
# Setup enhanced logging with more detailed formatting | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', | |
handlers=[ | |
logging.StreamHandler(), | |
logging.FileHandler('app.log', encoding='utf-8') | |
]) | |
logger = logging.getLogger(__name__) | |
# Ensure output directories exist with modern structure | |
OUTPUTS_DIR = Path('output') | |
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes' | |
TEMP_DIR = OUTPUTS_DIR / 'temp' | |
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]: | |
directory.mkdir(parents=True, exist_ok=True) | |
class EnhancedURLProcessor: | |
"""Advanced URL processing with complete content extraction""" | |
def __init__(self): | |
self.session = requests.Session() | |
self.timeout = 15 # Extended timeout for larger content | |
self.max_retries = 3 | |
self.user_agent = UserAgent() | |
# Enhanced headers for better site compatibility | |
self.session.headers.update({ | |
'User-Agent': self.user_agent.random,
'Accept': '*/*', # Accept all content types | |
'Accept-Language': 'en-US,en;q=0.9', | |
'Accept-Encoding': 'gzip, deflate, br', | |
'Connection': 'keep-alive', | |
'Upgrade-Insecure-Requests': '1', | |
'Sec-Fetch-Dest': 'document', | |
'Sec-Fetch-Mode': 'navigate', | |
'Sec-Fetch-Site': 'none', | |
'Sec-Fetch-User': '?1',
'DNT': '1' | |
}) | |
def validate_url(self, url: str) -> Dict: | |
"""Enhanced URL validation with detailed feedback""" | |
try: | |
if not validators.url(url): | |
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} | |
parsed = urlparse(url) | |
if not all([parsed.scheme, parsed.netloc]): | |
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} | |
# Try HEAD request first to check accessibility | |
head_response = None # Initialize head_response | |
try: | |
head_response = self.session.head(url, timeout=5) | |
head_response.raise_for_status() | |
# Need details from head_response if successful | |
details = { | |
'content_type': head_response.headers.get('Content-Type', 'unknown'), | |
'server': head_response.headers.get('Server', 'unknown'), | |
'size': head_response.headers.get('Content-Length', 'unknown') | |
} | |
except requests.exceptions.RequestException: | |
# If HEAD fails, try GET as some servers don't support HEAD | |
logger.info(f"HEAD request failed for {url}, trying GET.") | |
response = self.session.get(url, timeout=self.timeout) | |
response.raise_for_status() | |
# Use details from GET response if HEAD failed | |
details = { | |
'content_type': response.headers.get('Content-Type', 'unknown'), | |
'server': response.headers.get('Server', 'unknown'), | |
'size': response.headers.get('Content-Length', 'unknown') # Might not be accurate for GET stream | |
} | |
return { | |
'is_valid': True, | |
'message': 'URL is valid and accessible', | |
'details': details | |
} | |
except Exception as e: | |
return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)} | |
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]: | |
"""Enhanced content fetcher with retry mechanism and complete character extraction""" | |
try: | |
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})") | |
# Update User-Agent randomly for each request | |
self.session.headers.update({'User-Agent': self.user_agent.random})
response = self.session.get(url, timeout=self.timeout) | |
response.raise_for_status() | |
# Detect encoding | |
if response.encoding is None: | |
encoding = chardet.detect(response.content)['encoding'] or 'utf-8' | |
else: | |
encoding = response.encoding | |
# Decode content with fallback | |
try: | |
raw_content = response.content.decode(encoding, errors='replace') | |
except (UnicodeDecodeError, LookupError): # Corrected error type | |
raw_content = response.content.decode('utf-8', errors='replace') | |
# Extract metadata | |
metadata = { | |
'url': url, | |
'timestamp': datetime.now().isoformat(), | |
'encoding': encoding, | |
'content_type': response.headers.get('Content-Type', ''), | |
'content_length': len(response.content), | |
'headers': dict(response.headers), | |
'status_code': response.status_code | |
} | |
# Process based on content type | |
content_type = response.headers.get('Content-Type', '').lower() | |
if 'text/html' in content_type: | |
processed_content = self._process_html_content(raw_content, url) | |
else: | |
processed_content = raw_content # Store raw non-html content as processed | |
return { | |
'content': processed_content, | |
'raw_content': raw_content, # Keep raw bytes if needed elsewhere | |
'metadata': metadata | |
} | |
except requests.exceptions.RequestException as e: | |
if retry_count < self.max_retries - 1: | |
logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}") | |
time.sleep(2 ** retry_count) # Exponential backoff | |
return self.fetch_content(url, retry_count + 1) | |
logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}") | |
return None | |
except Exception as e: | |
logger.error(f"Unexpected error while fetching content: {e}") | |
return None | |
def _process_html_content(self, content: str, base_url: str) -> str: | |
"""Process HTML content while preserving all characters""" | |
try: | |
soup = BeautifulSoup(content, 'html.parser') | |
# Convert relative URLs to absolute | |
for tag in soup.find_all(['a', 'img', 'link', 'script']): | |
for attr in ['href', 'src']: | |
if tag.get(attr): | |
try: | |
# Handle potential base tag | |
base = soup.find('base') | |
current_base_url = base['href'] if base and base.get('href') else base_url | |
tag[attr] = urljoin(current_base_url, tag[attr]) | |
except Exception as url_e: | |
# logger.warning(f"Could not absolutize URL {tag.get(attr)} in {base_url}: {url_e}") | |
pass # Keep original if conversion fails | |
# Extract all text content more cleanly | |
text_parts = [element for element in soup.stripped_strings] | |
# text_content = ' '.join(text_parts) # Join with space instead of newline? Depends on use case. | |
# Or keep newlines for structure: | |
text_content = '\n'.join(text_parts) | |
# Alternative: Get all text including scripts/styles if needed | |
# text_content = soup.get_text(separator='\n', strip=True) | |
return text_content | |
except Exception as e: | |
logger.error(f"HTML processing error: {e}") | |
# Return original content if parsing fails | |
return content | |
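# Illustrative usage sketch for EnhancedURLProcessor (not wired up here; the URL is a placeholder):
#
#     processor = EnhancedURLProcessor()
#     check = processor.validate_url("https://example.com")
#     if check['is_valid']:
#         fetched = processor.fetch_content("https://example.com")
#         page_text = fetched['content'] if fetched else None
#         response_headers = fetched['metadata']['headers'] if fetched else {}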
class EnhancedFileProcessor: | |
"""Advanced file processing with complete content extraction""" | |
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default | |
self.max_file_size = max_file_size | |
# Added more potential text/data formats | |
self.supported_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.css', '.js', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', '.toml', '.sql', '.py', '.java', '.c', '.cpp', '.h', # Code files | |
'.zip', '.tar', '.gz', '.bz2', # No .7z, .rar without external libs | |
# '.pdf', '.doc', '.docx', '.rtf', '.odt' # These require more specific libraries (PyPDF2, python-docx etc.) - keep commented unless implemented | |
} | |
# Define extensions that should be treated primarily as text | |
self.text_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.css', '.js', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', '.toml', '.sql', '.py', '.java', '.c', '.cpp', '.h' | |
} | |
def process_file(self, file) -> List[Dict]: | |
"""Process uploaded file with enhanced error handling and complete extraction""" | |
if not file or not hasattr(file, 'name'): | |
logger.warning("Invalid file object received in process_file.") | |
return [] | |
dataset = [] | |
file_path_obj = Path(file.name) | |
try: | |
# Use Gradio's temp file path directly | |
file_path = file_path_obj.resolve() | |
if not file_path.exists(): | |
logger.error(f"File path does not exist: {file_path}") | |
return [] | |
file_size = file_path.stat().st_size | |
if file_size > self.max_file_size: | |
logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes) for {file_path.name}") | |
# Optionally return a specific error message entry | |
# return [{'error': 'File too large', 'filename': file_path.name}] | |
return [] | |
file_suffix = file_path.suffix.lower() | |
# Check if supported at all | |
# if file_suffix not in self.supported_extensions and not self._is_archive(str(file_path)): | |
# logger.warning(f"Unsupported file type based on extension: {file_path.name}") | |
# # Decide if you want to try processing anyway or return | |
# # return [{'error': 'Unsupported file type', 'filename': file_path.name}] | |
# # Let's try processing anyway, _process_single_file will handle text reading | |
# pass # Continue to attempt processing | |
# Use a persistent temp directory if needed across calls, otherwise TemporaryDirectory is fine | |
with tempfile.TemporaryDirectory(dir=TEMP_DIR) as temp_dir: # Use configured temp dir | |
temp_dir_path = Path(temp_dir) | |
# Handle archives first | |
if self._is_archive(str(file_path)): | |
logger.info(f"Processing archive file: {file_path.name}") | |
dataset.extend(self._process_archive(str(file_path), temp_dir_path)) | |
else: | |
# Process as single file (might be text or something else) | |
logger.info(f"Processing single file: {file_path.name}") | |
# Pass the path string or Path object to _process_single_file | |
dataset.extend(self._process_single_file(file_path)) | |
except Exception as e: | |
logger.error(f"Error processing file '{file_path_obj.name}': {str(e)}", exc_info=True) # Log stack trace | |
# Optionally return error entry | |
# dataset.append({'error': f'Processing failed: {str(e)}', 'filename': file_path_obj.name}) | |
return [] # Return empty list on error for now | |
return dataset | |
def _is_archive(self, filepath: str) -> bool: | |
"""Check if file is a supported archive type""" | |
# Only include archive types we can handle | |
return filepath.lower().endswith(('.zip', '.tar', '.tar.gz', '.tgz', '.gz', '.bz2')) # Added bz2 if bz2 lib is imported | |
def _process_single_file(self, file_path: Union[str, Path]) -> List[Dict]: | |
"""Process a single file with enhanced character extraction and JSON handling""" | |
# Ensure file_path is a Path object | |
file_path = Path(file_path) | |
file_name = file_path.name | |
file_suffix = file_path.suffix.lower() | |
try: | |
file_stat = file_path.stat() | |
file_size = file_stat.st_size | |
mime_type, _ = mimetypes.guess_type(file_path) | |
mime_type = mime_type or 'application/octet-stream' # Default if guess fails | |
# Initialize content storage | |
complete_content = None | |
is_json_like = file_suffix == '.json' or 'json' in mime_type | |
# Try reading as text first if it's a text-like extension or potentially text mime type | |
# Increased chunk size for efficiency on larger text files | |
chunk_size = 10 * 1024 * 1024 # 10MB chunks | |
if file_suffix in self.text_extensions or (mime_type and mime_type.startswith('text/')): | |
content_parts = [] | |
detected_encoding = 'utf-8' # Default | |
try: | |
with open(file_path, 'rb') as f: | |
# Detect encoding from the first chunk for better accuracy | |
first_chunk = f.read(chunk_size) | |
if first_chunk: | |
detected_encoding = chardet.detect(first_chunk)['encoding'] or 'utf-8' | |
logger.info(f"Detected encoding for {file_name}: {detected_encoding}") | |
# Rewind or reopen might be cleaner if needed, but let's decode first chunk | |
try: | |
decoded_chunk = first_chunk.decode(detected_encoding, errors='replace') | |
content_parts.append(decoded_chunk) | |
except (UnicodeDecodeError, LookupError): | |
logger.warning(f"Failed to decode first chunk with {detected_encoding}, falling back to utf-8 for {file_name}") | |
detected_encoding = 'utf-8' # Fallback for subsequent reads | |
decoded_chunk = first_chunk.decode(detected_encoding, errors='replace') | |
content_parts.append(decoded_chunk) | |
# Read remaining chunks | |
while True: | |
chunk = f.read(chunk_size) | |
if not chunk: | |
break | |
try: | |
decoded_chunk = chunk.decode(detected_encoding, errors='replace') | |
content_parts.append(decoded_chunk) | |
except (UnicodeDecodeError, LookupError): | |
# Should not happen if fallback already occurred, but good practice | |
logger.warning(f"Decoding error in subsequent chunk for {file_name}, using replace.") | |
decoded_chunk = chunk.decode(detected_encoding, errors='replace') | |
content_parts.append(decoded_chunk) | |
complete_content = ''.join(content_parts) | |
logger.info(f"Successfully read text content from {file_name}") | |
except IOError as e: | |
logger.error(f"IOError reading file {file_name}: {e}") | |
return [] # Cannot process if read fails | |
except Exception as e: | |
logger.error(f"Error reading text file {file_name}: {e}", exc_info=True) | |
# Decide if we should return or try other methods | |
return [] | |
# Now, check if the read text content IS valid JSON | |
json_data = None | |
raw_json_content = None # Store the raw string if it was JSON | |
if complete_content is not None: | |
try: | |
json_data = json.loads(complete_content) | |
# It is JSON! Update metadata | |
raw_json_content = complete_content # Keep the original string | |
complete_content = json_data # Now content holds the parsed object | |
mime_type = 'application/json' # Correct mime type | |
source = 'json_content_detected' | |
if file_suffix == '.json': | |
source = 'json_file' | |
logger.info(f"Successfully parsed JSON content from {file_name}") | |
except json.JSONDecodeError: | |
# It looked like text, but wasn't valid JSON | |
if is_json_like: | |
logger.warning(f"File {file_name} has JSON extension/mime but failed to parse.") | |
# Keep complete_content as the string it was read as | |
source = 'text_file' | |
except Exception as e: | |
logger.error(f"Unexpected error during JSON parsing check for {file_name}: {e}") | |
# Keep complete_content as string, mark as text file | |
source = 'text_file' | |
else: | |
# File wasn't identified as text or failed to read | |
# Could attempt binary read here if needed, or just mark as non-text | |
logger.warning(f"Could not read {file_name} as text. Storing metadata only or treating as binary.") | |
source = 'binary_file' # Or 'unreadable_file' | |
complete_content = f"Binary or unreadable content ({file_size} bytes)" # Placeholder | |
# Structure the output | |
result = { | |
'source': source, | |
'filename': file_name, | |
'file_size': file_size, | |
'mime_type': mime_type, | |
'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), | |
'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), | |
'content': complete_content, # This is parsed JSON if successful, or text string, or placeholder | |
'timestamp': datetime.now().isoformat() | |
} | |
if raw_json_content: | |
result['raw_content'] = raw_json_content # Add raw string if it was JSON | |
return [result] | |
except FileNotFoundError: | |
logger.error(f"File not found during processing: {file_path}") | |
return [] | |
except Exception as e: | |
logger.error(f"File processing error for {file_path.name}: {e}", exc_info=True) | |
return [] | |
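# For reference, each record returned by _process_single_file has this shape (values illustrative):
#
#     {
#         'source': 'text_file' | 'json_file' | 'json_content_detected' | 'binary_file',
#         'filename': 'example.txt',
#         'file_size': 1234,
#         'mime_type': 'text/plain',
#         'created': '<ISO timestamp>',
#         'modified': '<ISO timestamp>',
#         'content': 'decoded text, parsed JSON, or a binary placeholder string',
#         'timestamp': '<ISO timestamp>',
#         'raw_content': 'original JSON string (present only when content was parsed as JSON)'
#     }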
def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]: | |
"""Process an archive file with enhanced extraction""" | |
dataset = [] | |
archive_path_obj = Path(archive_path) | |
logger.info(f"Attempting to extract archive: {archive_path_obj.name}") | |
try: | |
# Handle ZIP archives | |
if archive_path.lower().endswith('.zip') and zipfile.is_zipfile(archive_path): | |
logger.debug(f"Processing ZIP file: {archive_path_obj.name}") | |
with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
# Check for zip bomb potential (optional, basic check) | |
total_uncompressed_size = sum(file.file_size for file in zip_ref.infolist()) | |
# Add a limit, e.g., 10x the archive size or an absolute limit like 10GB | |
if total_uncompressed_size > self.max_file_size * 10: # Example limit | |
logger.warning(f"Potential zip bomb detected: {archive_path_obj.name}, uncompressed size {total_uncompressed_size}") | |
return [{'error': 'Archive potential bomb', 'filename': archive_path_obj.name}] | |
for file_info in zip_ref.infolist(): | |
# Avoid directory entries and potential path traversal issues | |
if not file_info.is_dir() and file_info.filename and not file_info.filename.startswith('/') and '..' not in file_info.filename: | |
try: | |
extracted_path = extract_to / file_info.filename | |
# Ensure parent directory exists | |
extracted_path.parent.mkdir(parents=True, exist_ok=True) | |
# Extract individual file safely | |
with zip_ref.open(file_info.filename) as source, open(extracted_path, "wb") as target: | |
target.write(source.read()) | |
logger.debug(f"Extracted {file_info.filename} from zip.") | |
# Now process the extracted file | |
dataset.extend(self._process_single_file(extracted_path)) | |
except Exception as extract_err: | |
logger.error(f"Failed to extract/process file {file_info.filename} from zip {archive_path_obj.name}: {extract_err}") | |
# Handle TAR archives (covers .tar, .tar.gz, .tgz, .tar.bz2) | |
# Need to import bz2 if supporting .bz2 | |
elif tarfile.is_tarfile(archive_path): | |
logger.debug(f"Processing TAR file: {archive_path_obj.name}") | |
# Mode 'r:*' auto-detects compression (gz, bz2, xz if libs available) | |
with tarfile.open(archive_path, 'r:*') as tar_ref: | |
# Add security checks for tar extraction if needed (e.g., checking paths) | |
for member in tar_ref.getmembers(): | |
if member.isfile() and member.name and not member.name.startswith('/') and '..' not in member.name: | |
try: | |
# Construct safe path | |
extracted_path = extract_to / member.name | |
extracted_path.parent.mkdir(parents=True, exist_ok=True) | |
# Extract safely | |
with tar_ref.extractfile(member) as source, open(extracted_path, "wb") as target: | |
target.write(source.read()) | |
logger.debug(f"Extracted {member.name} from tar.") | |
dataset.extend(self._process_single_file(extracted_path)) | |
except Exception as extract_err: | |
logger.error(f"Failed to extract/process member {member.name} from tar {archive_path_obj.name}: {extract_err}") | |
# Handle GZIP archives (single file compression) - check it's not a tar.gz | |
elif archive_path.lower().endswith('.gz') and not archive_path.lower().endswith('.tar.gz'): | |
logger.debug(f"Processing GZIP file: {archive_path_obj.name}") | |
# Need to determine the output filename (remove .gz) | |
extracted_filename = archive_path_obj.stem | |
# Handle cases like '.txt.gz' -> '.txt' | |
if '.' in extracted_filename: | |
extracted_path = extract_to / extracted_filename | |
else: | |
# If no inner extension (e.g., 'myfile.gz'), maybe add a default like '.bin' or leave as is? | |
extracted_path = extract_to / (extracted_filename + ".bin") # Example | |
try: | |
extracted_path.parent.mkdir(parents=True, exist_ok=True) | |
with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: | |
outfile.write(gz_file.read()) | |
logger.debug(f"Extracted {extracted_path.name} from gzip.") | |
dataset.extend(self._process_single_file(extracted_path)) | |
except gzip.BadGzipFile as e: | |
logger.error(f"Error processing GZIP archive {archive_path_obj.name}: Bad Gzip File - {e}") | |
except Exception as extract_err: | |
logger.error(f"Failed to extract/process gzip file {archive_path_obj.name}: {extract_err}") | |
# Add BZ2 single file support (requires bz2 import) | |
elif archive_path.lower().endswith('.bz2') and not archive_path.lower().endswith('.tar.bz2'): | |
logger.debug(f"Processing BZ2 file: {archive_path_obj.name}") | |
try: | |
import bz2 | |
extracted_filename = archive_path_obj.stem | |
extracted_path = extract_to / extracted_filename | |
if '.' not in extracted_filename: | |
extracted_path = extract_to / (extracted_filename + ".bin") | |
extracted_path.parent.mkdir(parents=True, exist_ok=True) | |
with bz2.open(archive_path, 'rb') as bz2_file, open(extracted_path, 'wb') as outfile: | |
outfile.write(bz2_file.read()) | |
logger.debug(f"Extracted {extracted_path.name} from bz2.") | |
dataset.extend(self._process_single_file(extracted_path)) | |
except ImportError: | |
logger.warning("bz2 library not available, cannot process .bz2 files.") | |
except Exception as extract_err: | |
logger.error(f"Failed to extract/process bz2 file {archive_path_obj.name}: {extract_err}") | |
# Placeholder for other types or if no specific handler matched | |
else: | |
logger.warning(f"Archive type not explicitly handled or not a recognized archive: {archive_path_obj.name}") | |
except FileNotFoundError: | |
logger.error(f"Archive file not found: {archive_path}") | |
except (zipfile.BadZipFile, tarfile.TarError, gzip.BadGzipFile) as archive_err: | |
logger.error(f"Invalid or corrupted archive file {archive_path_obj.name}: {archive_err}") | |
dataset.append({'error': f'Corrupted archive: {archive_err}', 'filename': archive_path_obj.name}) | |
except Exception as e: | |
logger.error(f"General archive processing error for {archive_path_obj.name}: {e}", exc_info=True) | |
dataset.append({'error': f'Archive processing failed: {e}', 'filename': archive_path_obj.name}) | |
return dataset | |
# Adjusted chunk_data with recommended max_size for QR codes | |
def chunk_data(self, data: Union[Dict, List, str], max_size: int = 1800) -> List[Dict]: | |
"""Enhanced data chunking with sequence metadata, sized for QR codes.""" | |
try: | |
if not isinstance(data, str): | |
# Convert complex data to JSON string first | |
# Use separators=(',', ':') for compact JSON | |
json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) | |
else: | |
json_str = data # Assume input string is already the data payload | |
# Data here is the raw string (or JSON string) payload for the QR code | |
total_length = len(json_str.encode('utf-8')) # Use byte length for QR capacity | |
logger.debug(f"Chunking data of total byte length: {total_length}") | |
# Simplified: If the data fits within max_size (bytes), return one chunk object | |
# The chunk object itself adds metadata, but the 'data' field is what matters for QR limit. | |
if total_length <= max_size: | |
chunk_meta = { | |
"chunk_index": 0, | |
"total_chunks": 1, | |
"total_length": total_length, # Store byte length | |
"chunk_hash": hash(json_str) & 0xFFFFFFFF, | |
"data": json_str # The actual string payload | |
} | |
logger.debug(f"Data fits in one chunk (payload size {total_length} bytes)") | |
return [chunk_meta] | |
# If data exceeds max_size, split the string payload | |
# We need to split the *string* representation carefully | |
# Aim for byte size chunks, which is tricky with UTF-8 variable char width | |
# Simple approach: estimate character chunk size based on bytes | |
# Estimate average bytes per character (crude but simple) | |
avg_bytes_per_char = total_length / len(json_str) if len(json_str) > 0 else 1 | |
# Calculate target character chunk size based on byte limit | |
target_char_chunk_size = int(max_size / avg_bytes_per_char) | |
if target_char_chunk_size < 1: target_char_chunk_size = 1 # Avoid zero chunk size | |
# Calculate number of chunks based on estimated character size | |
num_chunks = math.ceil(len(json_str) / target_char_chunk_size) | |
chunks = [] | |
start_char_idx = 0 | |
for i in range(num_chunks): | |
# Calculate end index, ensuring we don't overshoot | |
end_char_idx = min(start_char_idx + target_char_chunk_size, len(json_str)) | |
# Extract the character chunk | |
chunk_payload_str = json_str[start_char_idx:end_char_idx] | |
# Recalculate actual byte length for this specific chunk | |
current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) | |
# Adjust end_char_idx if current chunk exceeds max_size (rare if estimate is decent) | |
while current_chunk_byte_length > max_size and end_char_idx > start_char_idx: | |
end_char_idx -= 1 # Reduce characters | |
chunk_payload_str = json_str[start_char_idx:end_char_idx] | |
current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) | |
if not chunk_payload_str and start_char_idx < len(json_str): | |
# This should not happen with the logic above, but as a safeguard | |
logger.error("Chunking resulted in empty payload string unexpectedly.") | |
# Handle error: skip, break, or adjust logic | |
break # Avoid infinite loop | |
chunk_meta = { | |
"chunk_index": i, | |
"total_chunks": num_chunks, | |
"total_length": total_length, # Original total byte length | |
"chunk_byte_length": current_chunk_byte_length, # Actual byte length of this chunk's payload | |
"chunk_hash": hash(chunk_payload_str) & 0xFFFFFFFF, | |
"data": chunk_payload_str # The string payload for this chunk | |
} | |
chunks.append(chunk_meta) | |
logger.debug(f"Created chunk {i+1}/{num_chunks}, payload byte size: {current_chunk_byte_length}") | |
# Move to the next starting point | |
start_char_idx = end_char_idx | |
# Safety break if start index doesn't advance | |
if start_char_idx == len(json_str) and i + 1 < num_chunks: | |
logger.warning(f"Chunking finished early at index {i+1} of {num_chunks}. Check logic.") | |
# Adjust total_chunks if ending early? | |
for ch in chunks: ch['total_chunks'] = len(chunks) | |
break | |
# Final check if total chunks changed | |
if chunks and chunks[0]['total_chunks'] != len(chunks): | |
logger.warning(f"Adjusting total_chunks from {chunks[0]['total_chunks']} to {len(chunks)}") | |
final_num_chunks = len(chunks) | |
for i, chunk in enumerate(chunks): | |
chunk['total_chunks'] = final_num_chunks | |
chunk['chunk_index'] = i # Re-index just in case | |
return chunks | |
except Exception as e: | |
logger.error(f"Error chunking data: {e}", exc_info=True) | |
return [] | |
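# Illustrative example of chunk_data (byte counts are indicative, not exact):
#
#     processor = EnhancedFileProcessor()
#     chunks = processor.chunk_data({"note": "x" * 5000}, max_size=1800)
#     # -> roughly 3 chunk dicts, each shaped like:
#     #    {'chunk_index': 0, 'total_chunks': 3, 'total_length': 5012,
#     #     'chunk_byte_length': 1800, 'chunk_hash': 123456789, 'data': '{"note":"xxx...'}
#     qr_payload = chunks[0]['data']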
def generate_stylish_qr(data: str, # Expecting string data from chunking | |
filename: str, | |
size: int = 10, | |
border: int = 4, | |
fill_color: str = "#000000", | |
back_color: str = "#FFFFFF", | |
error_correction_level=qrcode.constants.ERROR_CORRECT_H) -> str: # Added param | |
"""Generate a stylish QR code with enhanced visual appeal""" | |
try: | |
qr = qrcode.QRCode( | |
version=None, # Auto-detect version | |
error_correction=error_correction_level, # Use parameter | |
box_size=size, | |
border=border | |
) | |
# Add string data directly (should be from chunker) | |
qr.add_data(data) | |
# Let the library figure out the best version and mode | |
qr.make(fit=True) | |
logger.info(f"Generating QR code version {qr.version} for {filename} (Payload size: {len(data.encode('utf-8'))} bytes)") | |
# Create QR code image with custom colors | |
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) | |
# Convert to RGBA for transparency support (optional gradient) | |
qr_image = qr_image.convert('RGBA') | |
# --- Optional: Add subtle gradient overlay --- | |
# gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) | |
# draw = ImageDraw.Draw(gradient) | |
# for i in range(qr_image.width): | |
# alpha = int(255 * (1 - i/qr_image.width) * 0.1) # 10% maximum opacity | |
# draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha)) | |
# final_image = Image.alpha_composite(qr_image, gradient) | |
# --- End Optional Gradient --- | |
final_image = qr_image # Use this line if gradient is commented out | |
# Save the image | |
output_path = QR_CODES_DIR / filename | |
# Ensure directory exists just before saving | |
output_path.parent.mkdir(parents=True, exist_ok=True) | |
final_image.save(output_path, quality=95) # PNG quality is lossless, but ok | |
return str(output_path) | |
# Catch specific data overflow error | |
except qrcode.exceptions.DataOverflowError as doe: | |
logger.error(f"QR DataOverflowError for {filename}: {doe}. Data length (bytes): {len(data.encode('utf-8'))}. Max capacity likely exceeded for ErrorLevel {error_correction_level}.") | |
return "" # Return empty string on failure | |
except Exception as e: | |
logger.error(f"QR generation error for {filename}: {e}", exc_info=True) | |
return "" | |
def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: bool = True) -> List[str]: | |
"""Generate QR codes, chunking data appropriately.""" | |
try: | |
file_processor = EnhancedFileProcessor() # Get chunking method | |
all_qr_paths = [] | |
qr_fill = "#1a365d" # Deep blue | |
qr_back = "#ffffff" | |
# Decide on error correction level. Chunks from chunk_data are up to 1800 bytes, which exceeds
# the ~1273-byte binary capacity of a version 40 QR code at level H, so default to level M
# (~2331-byte capacity). Level L (~2953 bytes) allows even more data with less redundancy.
error_level = qrcode.constants.ERROR_CORRECT_M  # Medium correction, medium capacity
# error_level = qrcode.constants.ERROR_CORRECT_H  # Max correction, lowest capacity (overflows on 1800-byte chunks)
# error_level = qrcode.constants.ERROR_CORRECT_L  # Low correction, max capacity
if combine_sources: | |
logger.info("Combining all input sources into a single QR sequence.") | |
# Combine all data into one large structure (e.g., a list) before chunking | |
# This assumes `data_to_encode` is already the combined list/dict from process_inputs | |
if not data_to_encode: | |
logger.warning("No data provided to generate combined QR codes.") | |
return [] | |
# Chunk the combined data structure | |
chunks = file_processor.chunk_data(data_to_encode) # Chunker expects dict/list/str | |
if not chunks: | |
logger.error("Chunking the combined data failed.") | |
return [] | |
num_chunks = len(chunks) | |
logger.info(f"Generating {num_chunks} QR codes for combined data.") | |
for i, chunk_info in enumerate(chunks): | |
# chunk_info contains {'chunk_index', 'total_chunks', 'data', etc.} | |
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{num_chunks}.png' | |
# Pass the actual payload string to the generator | |
qr_payload = chunk_info['data'] | |
qr_path = generate_stylish_qr( | |
data=qr_payload, | |
filename=filename, | |
fill_color=qr_fill, | |
back_color=qr_back, | |
error_correction_level=error_level # Pass level | |
) | |
if qr_path: | |
all_qr_paths.append(qr_path) | |
else: | |
logger.error(f"Failed to generate QR code for combined chunk {i+1}") | |
# Optionally stop or continue? | |
else: | |
# Process each item in the input list individually | |
logger.info("Generating separate QR code sequences for each input source.") | |
if not isinstance(data_to_encode, list): | |
logger.error("Input data must be a list when combine_sources is False.") | |
# Maybe wrap it? | |
if data_to_encode: | |
data_to_encode = [data_to_encode] | |
else: | |
return [] | |
total_items = len(data_to_encode) | |
for item_idx, item in enumerate(data_to_encode): | |
item_source_info = f"item {item_idx+1}/{total_items}" | |
# Try to get a better name (e.g., from filename if available) | |
if isinstance(item, dict) and 'filename' in item: | |
item_source_info = item['filename'] | |
elif isinstance(item, dict) and 'url' in item: | |
item_source_info = Path(urlparse(item['url']).path).name or f"url_item_{item_idx+1}" | |
logger.info(f"Processing source: {item_source_info}") | |
# Chunk the individual item | |
chunks = file_processor.chunk_data(item) | |
if not chunks: | |
logger.error(f"Chunking failed for item {item_idx+1} ({item_source_info})") | |
continue # Skip to next item | |
num_chunks = len(chunks) | |
logger.info(f"Generating {num_chunks} QR codes for {item_source_info}.") | |
for chunk_idx, chunk_info in enumerate(chunks): | |
# Sanitize source info for filename | |
safe_source_name = re.sub(r'[^\w\-]+', '_', item_source_info) | |
filename = f'{safe_source_name}_chunk_{chunk_idx+1}_of_{num_chunks}_{int(time.time())}.png' | |
qr_payload = chunk_info['data'] | |
qr_path = generate_stylish_qr( | |
data=qr_payload, | |
filename=filename, | |
fill_color=qr_fill, | |
back_color=qr_back, | |
error_correction_level=error_level # Pass level | |
) | |
if qr_path: | |
all_qr_paths.append(qr_path) | |
else: | |
logger.error(f"Failed to generate QR code for {item_source_info} chunk {chunk_idx+1}") | |
logger.info(f"Generated a total of {len(all_qr_paths)} QR codes.") | |
return all_qr_paths | |
except Exception as e: | |
logger.error(f"General QR code generation process error: {e}", exc_info=True) | |
return [] | |
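# Illustrative call to generate_qr_codes (the input mirrors what process_inputs_and_generate_qrs
# assembles further below; the function returns the paths of the generated PNG files):
#
#     processed_items = [
#         {'source': 'direct_text', 'content': 'hello world', 'timestamp': '...'},
#         {'source': 'url', 'url': 'https://example.com', 'content': 'page text ...'},
#     ]
#     qr_paths = generate_qr_codes(processed_items, combine_sources=True)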
def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Dict], title: str = "QR Code Sequence") -> Optional[io.BytesIO]: | |
""" | |
Generates a visual representation of the QR code sequence using NetworkX and Matplotlib. | |
Args: | |
qr_paths: List of file paths to the QR code images. | |
qr_data: List of decoded data dictionaries, hopefully containing 'chunk_index'. | |
title: The title for the visualization plot. | |
Returns: | |
A BytesIO buffer containing the PNG image of the visualization, or None if error. | |
""" | |
if not qr_paths or not qr_data or len(qr_paths) != len(qr_data): | |
logger.warning("Mismatch or empty data for visualization.") | |
return None | |
logger.info(f"Generating visualization for {len(qr_paths)} QR codes.") | |
try: | |
G = nx.DiGraph() | |
node_labels = {} | |
node_colors = [] | |
node_sizes = [] | |
# Assume data is pre-sorted by chunk_index during loading | |
num_nodes = len(qr_paths) | |
total_chunks_from_meta = qr_data[0].get('total_chunks', num_nodes) if qr_data else num_nodes | |
for i in range(num_nodes): | |
node_id = i | |
# Use chunk_index from metadata if possible, otherwise use list index | |
chunk_idx = qr_data[i].get('chunk_index', i) | |
label = f"{chunk_idx + 1}/{total_chunks_from_meta}" | |
node_labels[node_id] = label | |
G.add_node(node_id, path=qr_paths[i], data=qr_data[i]) | |
# Add edges between consecutive nodes | |
if i > 0: | |
G.add_edge(i - 1, i) | |
# Simple coloring/sizing (can be customized further) | |
node_colors.append('#4299e1') # Default blue color | |
node_sizes.append(1500) | |
if not G.nodes: | |
logger.warning("No nodes to visualize.") | |
return None | |
# --- Layout and Drawing --- | |
plt.figure(figsize=(max(10, num_nodes * 1.5), 5)) # Adjust figure size based on number of nodes | |
# Simple linear layout for sequences is often clearest | |
pos = {i: (i * 2, 0) for i in range(num_nodes)} # Horizontal layout | |
# For more complex graphs, consider other layouts: | |
# pos = nx.spring_layout(G, k=0.5, iterations=50) | |
# pos = nx.kamada_kawai_layout(G) | |
nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, alpha=0.9) | |
nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray', alpha=0.6) | |
nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=10, font_color='white') | |
plt.title(title, fontsize=16) | |
plt.xlabel("Sequence Index", fontsize=12) | |
plt.yticks([]) # Hide Y-axis ticks for linear layout | |
plt.xticks(range(0, num_nodes * 2, 2), [f"{i+1}" for i in range(num_nodes)]) # Label X-axis ticks | |
plt.box(False) # Remove frame box | |
plt.tight_layout() | |
# Save plot to a BytesIO buffer | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png', bbox_inches='tight', dpi=100) | |
plt.close() # Close the plot figure to free memory | |
buf.seek(0) | |
logger.info("Successfully generated visualization image buffer.") | |
return buf | |
except Exception as e: | |
logger.error(f"Error generating visualization image: {e}", exc_info=True) | |
plt.close() # Ensure plot is closed even on error | |
return None | |
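# The helper above returns the visualization as a PNG in a BytesIO buffer; callers open it with PIL:
#
#     buf = _generate_sequence_visualization_image(qr_paths, qr_data, title="Sequence")
#     preview_image = Image.open(buf) if buf else None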
# --- Gradio Interface Section --- | |
def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if needed later | |
"""Add QR sequence visualization capabilities to the application""" | |
with gr.Tab("π QR Sequence Visualizer"): | |
gr.Markdown(""" | |
## QR Code Sequence Visualizer | |
Upload a sequence of QR codes (e.g., those generated by this app) to decode them and visualize their order. | |
""") | |
# Store data globally within this tab's scope (alternative to Gradio State) | |
# This is simpler but not ideal for complex state management | |
shared_data = {'qr_paths': [], 'qr_data': []} | |
with gr.Row(): | |
with gr.Column(scale=1): | |
qr_input = gr.File( | |
label="Upload QR Code Images", | |
file_types=["image/png", "image/jpeg", ".png", ".jpg", ".jpeg"], # Be explicit | |
file_count="multiple" | |
) | |
visualize_btn = gr.Button("Decode & Visualize Sequence", variant="primary")
reset_btn = gr.Button("Reset Visualizer", variant="secondary")
visualization_status = gr.Textbox(label="Status", interactive=False, lines=3) | |
# Placeholder for interactive elements (future improvement) | |
# qr_toggles_container = gr.HTML(label="QR Code Controls (Future)") | |
with gr.Column(scale=2): | |
qr_visualization = gr.Image(label="QR Code Sequence Map", type="pil", height=400) # Use PIL type | |
qr_preview = gr.Gallery(label="Uploaded QR Codes (Sorted)", columns=4, height=400, object_fit="contain", preview=True) | |
def process_qr_codes_and_visualize(files): | |
"""Decodes QR files, sorts them, updates gallery, and generates visualization.""" | |
if not files: | |
shared_data['qr_paths'] = [] | |
shared_data['qr_data'] = [] | |
return "Please upload QR code images.", None, None, "β οΈ No QR codes uploaded." | |
logger.info(f"Processing {len(files)} uploaded QR files for visualization.") | |
qr_data_list = [] | |
qr_path_list = [] | |
decode_errors = 0 | |
# Use OpenCV's QR detector (the `qrcode` library only generates codes; it cannot decode them)
try:
    import cv2
    detector = cv2.QRCodeDetector()
except ImportError:
    logger.error("OpenCV (cv2) is not installed; QR decoding is unavailable.")
    return None, None, "❌ OpenCV (cv2) is required to decode QR codes. Install opencv-python."
except Exception as init_e:
    logger.error(f"Error initializing QR detector: {init_e}")
    return None, None, f"❌ Error initializing QR detector: {init_e}"
for file in files: | |
try: | |
img_path = file.name # Gradio File object path | |
img = Image.open(img_path) | |
img_np = np.array(img.convert('RGB')) # Detector often prefers RGB | |
# Try to decode QR code | |
data, bbox, straight_qrcode = detector.detectAndDecode(img_np) | |
if data: | |
logger.debug(f"Decoded data from {os.path.basename(img_path)}: {data[:50]}...") | |
# Try parsing the decoded data as JSON (expected format from generator) | |
try: | |
qr_metadata = json.loads(data) | |
# Check if it looks like our chunk format | |
if isinstance(qr_metadata, dict) and 'chunk_index' in qr_metadata and 'total_chunks' in qr_metadata: | |
qr_data_list.append(qr_metadata) | |
qr_path_list.append(img_path) | |
else: | |
# Valid JSON, but not the expected chunk structure | |
logger.warning(f"Decoded valid JSON, but not expected format from {os.path.basename(img_path)}") | |
qr_data_list.append({"data": qr_metadata, "chunk_index": -1}) # Assign default index | |
qr_path_list.append(img_path) | |
except json.JSONDecodeError: | |
# Data decoded, but not JSON - store raw data | |
logger.warning(f"Could not decode JSON from QR data in {os.path.basename(img_path)}. Storing raw.") | |
qr_data_list.append({"data": data, "chunk_index": -1}) # Assign default index | |
qr_path_list.append(img_path) | |
except Exception as json_e: | |
logger.error(f"Error processing decoded JSON from {os.path.basename(img_path)}: {json_e}") | |
qr_data_list.append({"data": f"Error: {json_e}", "chunk_index": -1}) | |
qr_path_list.append(img_path) | |
decode_errors += 1 | |
else: | |
# QR code detected, but no data decoded (or detection failed) | |
logger.warning(f"Could not decode data from QR image: {os.path.basename(img_path)}") | |
qr_data_list.append({"data": "[DECODE FAILED]", "chunk_index": -1}) | |
qr_path_list.append(img_path) | |
decode_errors += 1 | |
except Exception as e: | |
logger.error(f"Error processing QR image file {os.path.basename(getattr(file, 'name', 'N/A'))}: {e}", exc_info=True) | |
# Optionally add placeholder for failed file? | |
decode_errors += 1 | |
if not qr_path_list: | |
shared_data['qr_paths'] = [] | |
shared_data['qr_data'] = [] | |
return "No valid QR codes could be processed or decoded.", None, None, "β Failed to process/decode QR codes" | |
# Attempt to sort by chunk_index (handle missing index gracefully) | |
try: | |
# Create tuples (index, data, path) for sorting | |
indexed_items = [] | |
for i, (data, path) in enumerate(zip(qr_data_list, qr_path_list)): | |
# Use provided chunk_index, fallback to list index if missing or invalid (-1) | |
sort_key = data.get('chunk_index', i) | |
if not isinstance(sort_key, int) or sort_key < 0: | |
sort_key = i # Fallback to original order for this item | |
indexed_items.append((sort_key, data, path)) | |
# Sort based on the index key | |
indexed_items.sort(key=lambda x: x[0]) | |
# Unpack sorted lists | |
sorted_qr_data = [item[1] for item in indexed_items] | |
sorted_qr_paths = [item[2] for item in indexed_items] | |
# Update shared data | |
shared_data['qr_paths'] = sorted_qr_paths | |
shared_data['qr_data'] = sorted_qr_data | |
logger.info("Successfully sorted QR data based on chunk_index.") | |
except Exception as e: | |
logger.error(f"Error sorting QR data: {e}. Using original order.") | |
# Use original order if sorting fails | |
shared_data['qr_paths'] = qr_path_list | |
shared_data['qr_data'] = qr_data_list | |
# Generate the visualization image using the helper function | |
# Use the sorted data stored in shared_data | |
visualization_image_buffer = _generate_sequence_visualization_image( | |
shared_data['qr_paths'], | |
shared_data['qr_data'], | |
title=f"Visualized Sequence ({len(shared_data['qr_paths'])} Codes)" | |
) | |
# Convert buffer to PIL Image for Gradio output if necessary | |
vis_image_pil = None | |
if visualization_image_buffer: | |
try: | |
vis_image_pil = Image.open(visualization_image_buffer) | |
except Exception as img_e: | |
logger.error(f"Failed to load visualization buffer into PIL Image: {img_e}") | |
status_message = f"Processed {len(shared_data['qr_paths'])} QR codes." | |
if decode_errors > 0: | |
status_message += f" ({decode_errors} decode errors)" | |
status_message += "\nSequence visualized." if vis_image_pil else "\nVisualization generation failed." | |
final_status = "β Done" if vis_image_pil else "β οΈ Errors Occurred" | |
# Update outputs: Gallery with sorted paths, Image with visualization, Status text | |
# The gallery expects a list of image paths or PIL images | |
gallery_output = shared_data['qr_paths'] | |
return gallery_output, vis_image_pil, status_message, final_status | |
def reset_visualizer_state(): | |
shared_data['qr_paths'] = [] | |
shared_data['qr_data'] = [] | |
logger.info("Resetting QR visualizer state.") | |
return None, None, None, "Visualizer reset. Upload new QR codes."
# Event handlers | |
visualize_btn.click(
    process_qr_codes_and_visualize,
    inputs=[qr_input],
    # Output order matches the function's return value: (gallery paths, visualization image, status text)
    outputs=[qr_preview, qr_visualization, visualization_status]
).then(
    lambda: logger.info("Visualization process complete."), inputs=None, outputs=None
)
reset_btn.click( | |
reset_visualizer_state, | |
inputs=[], | |
outputs=[qr_preview, qr_visualization, qr_input, visualization_status] # Clear gallery, image, file input, status | |
) | |
def create_modern_interface(): | |
"""Create a modern and visually appealing Gradio interface""" | |
# Modern CSS styling
css = """ | |
/* Modern color scheme */ | |
:root { | |
--primary-color: #1a365d; | |
--secondary-color: #2d3748; | |
--accent-color: #4299e1; | |
--background-color: #f7fafc; | |
--success-color: #48bb78; | |
--error-color: #f56565; | |
--warning-color: #ed8936; | |
} | |
/* Container styling */ | |
.container { | |
max-width: 1200px; | |
margin: auto; | |
padding: 2rem; | |
background-color: var(--background-color); | |
border-radius: 1rem; | |
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
} | |
/* Component styling */ | |
.input-container { | |
background-color: white; | |
padding: 1.5rem; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-bottom: 1rem; | |
} | |
/* Button styling */ | |
.primary-button { | |
background-color: var(--primary-color); | |
color: white; | |
padding: 0.75rem 1.5rem; | |
border-radius: 0.375rem; | |
border: none; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.primary-button:hover { | |
background-color: var(--accent-color); | |
transform: translateY(-1px); | |
} | |
/* Status messages */ | |
.status { | |
padding: 1rem; | |
border-radius: 0.375rem; | |
margin: 1rem 0; | |
} | |
.status.success { background-color: #f0fff4; color: var(--success-color); } | |
.status.error { background-color: #fff5f5; color: var(--error-color); } | |
.status.warning { background-color: #fffaf0; color: var(--warning-color); } | |
/* Gallery styling */ | |
.gallery { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); /* Adjust minmax */ | |
gap: 1rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
min-height: 150px; /* Ensure gallery has some height */ | |
} | |
.gallery img { | |
width: 100%; | |
height: auto; | |
object-fit: contain; /* Use contain to avoid stretching */ | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
border: 1px solid #eee; /* Add subtle border */ | |
} | |
.gallery img:hover { | |
transform: scale(1.05); | |
box-shadow: 0 2px 4px rgba(0,0,0,0.1); /* Add hover shadow */ | |
} | |
""" | |
# Create interface with modern design | |
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: | |
gr.Markdown(""" | |
# Advanced Data Processing & QR Code Generator
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. | |
""") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
# Input Tabs | |
with gr.Tabs(): | |
with gr.TabItem("π URL Input"): | |
url_input = gr.Textbox( | |
label="Enter URLs (one per line or comma-separated)", | |
lines=5, | |
placeholder="https://example1.com\nhttps://example2.com", | |
elem_id="url-input" | |
) | |
with gr.TabItem("π File Input"): | |
file_input = gr.File( | |
label="Upload Files (Text, JSON, Archives: zip, tar, gz, bz2)", | |
file_count="multiple", | |
# Removed file_types="*" to rely on backend logic, or specify supported ones: | |
# file_types=[".txt", ".json", ".csv", ".md", ".xml", ".html", ".zip", ".tar", ".gz", ".bz2"] | |
elem_id="file-input" | |
) | |
with gr.TabItem("π Direct Input / JSON"): | |
text_input = gr.TextArea( | |
label="Direct Text/JSON Input", | |
lines=10, | |
placeholder="Paste your text or JSON data here...", | |
elem_id="text-input" | |
) | |
with gr.Row(): | |
example_btn = gr.Button("Load JSON Example")
clear_btn = gr.Button("Clear Input")
# Processing Options & Button | |
with gr.Row(): | |
combine_data = gr.Checkbox( | |
label="Combine all inputs into one sequence", | |
value=True, # Default to combined | |
info="If unchecked, each URL/File/Input generates its own QR sequence." | |
) | |
process_btn = gr.Button( | |
"π Process & Generate QR Codes", | |
variant="primary", | |
elem_id="process-button" | |
) | |
# Status Output | |
output_text = gr.Textbox( | |
label="Processing Status", | |
interactive=False, | |
lines=2, | |
elem_id="status-output" | |
) | |
with gr.Column(scale=3): | |
# Output Area | |
gr.Markdown("### Results") | |
with gr.Tabs(): | |
with gr.TabItem("πΌοΈ QR Codes"): | |
output_gallery = gr.Gallery( | |
label="Generated QR Codes", | |
columns=4, # Adjust columns as needed | |
height=500, # Adjust height | |
object_fit="contain", | |
preview=True, # Enable preview click | |
elem_id="qr-gallery" | |
) | |
with gr.TabItem("π Processed Data (JSON)"): | |
output_json = gr.JSON( | |
label="Processed Data Structure", | |
elem_id="json-output" | |
) | |
# Load example data | |
def load_example(): | |
example = { | |
"project": "Data Transfer Example", | |
"version": 1.1, | |
"items": [ | |
{"id": "A001", "name": "Item One", "value": 123.45, "tags": ["tag1", "tag2"]}, | |
{"id": "B002", "name": "Item Two", "value": 67.89, "enabled": True} | |
], | |
"timestamp": datetime.now().isoformat() | |
} | |
return json.dumps(example, indent=2) | |
def clear_input_area(): | |
# Clear only the direct text input area | |
return "" | |
# --- Main Processing Function --- | |
def process_inputs_and_generate_qrs(urls, files, text, combine): | |
"""Process all inputs, combine if requested, and generate QR codes.""" | |
start_time = time.time() | |
logger.info("Starting data processing...") | |
status_updates = [] | |
all_processed_data = [] # List to hold results from all sources | |
url_processor = EnhancedURLProcessor() | |
file_processor = EnhancedFileProcessor() | |
# 1. Process URLs | |
if urls and urls.strip(): | |
url_list = re.split(r'[,\n]+', urls) # Split by comma or newline, handle multiple newlines | |
url_list = [u.strip() for u in url_list if u.strip()] # Clean up | |
status_updates.append(f"Processing {len(url_list)} URLs...") | |
logger.info(f"Processing URLs: {url_list}") | |
for i, url in enumerate(url_list): | |
logger.info(f"Processing URL {i+1}/{len(url_list)}: {url}") | |
# Basic validation before fetching | |
if not validators.url(url): | |
logger.warning(f"Skipping invalid URL format: {url}") | |
status_updates.append(f"β οΈ Skipped invalid URL: {url[:50]}...") | |
all_processed_data.append({'error': 'Invalid URL format', 'url': url}) | |
continue | |
content_data = url_processor.fetch_content(url) | |
if content_data and 'content' in content_data: | |
logger.info(f"Successfully fetched content from {url} ({len(content_data.get('raw_content',''))} bytes)") | |
# Structure the result similarly to file processing output | |
processed_url_data = { | |
'source': 'url', | |
'url': url, | |
'content': content_data['content'], # Processed text content | |
'raw_content': content_data['raw_content'], # Raw response body | |
'metadata': content_data['metadata'], # Headers, status, etc. | |
'timestamp': datetime.now().isoformat() | |
} | |
all_processed_data.append(processed_url_data) | |
status_updates.append(f"β Fetched: {url[:60]}...") | |
else: | |
logger.error(f"Failed to fetch content from URL: {url}") | |
status_updates.append(f"β Failed fetch: {url[:60]}...") | |
all_processed_data.append({'error': 'Failed to fetch content', 'url': url}) | |
# 2. Process Files | |
if files: | |
status_updates.append(f"Processing {len(files)} uploaded files...") | |
logger.info(f"Processing {len(files)} files.") | |
for i, file_obj in enumerate(files): | |
logger.info(f"Processing file {i+1}/{len(files)}: {getattr(file_obj, 'name', 'N/A')}") | |
try: | |
# Pass the Gradio file object directly to process_file | |
file_results = file_processor.process_file(file_obj) | |
if file_results: | |
all_processed_data.extend(file_results) | |
# Get filename safely from results (might be multiple from archive) | |
processed_filenames = [res.get('filename', 'N/A') for res in file_results] | |
status_updates.append(f"β Processed file(s): {', '.join(processed_filenames)}") | |
logger.info(f"Successfully processed file(s): {', '.join(processed_filenames)}") | |
else: | |
status_updates.append(f"β οΈ No data extracted from file: {getattr(file_obj, 'name', 'N/A')}") | |
logger.warning(f"No data extracted from file: {getattr(file_obj, 'name', 'N/A')}") | |
# Add placeholder error if desired | |
# all_processed_data.append({'error': 'No data extracted', 'filename': getattr(file_obj, 'name', 'N/A')}) | |
except Exception as file_proc_err: | |
file_name = getattr(file_obj, 'name', 'N/A') | |
logger.error(f"Error processing file {file_name}: {file_proc_err}", exc_info=True) | |
status_updates.append(f"β Error processing file: {file_name}") | |
all_processed_data.append({'error': f'File processing error: {file_proc_err}', 'filename': file_name}) | |
# 3. Process Direct Text/JSON Input | |
if text and text.strip(): | |
status_updates.append("Processing direct input...") | |
logger.info("Processing direct text/JSON input.") | |
# Attempt to parse as JSON first | |
try: | |
json_data = json.loads(text) | |
logger.info("Direct input parsed as JSON.") | |
processed_text_data = { | |
'source': 'direct_json', | |
'content': json_data, # Parsed JSON object/list | |
'raw_content': text, # Original string | |
'timestamp': datetime.now().isoformat() | |
} | |
all_processed_data.append(processed_text_data) | |
status_updates.append("β Processed direct input as JSON.") | |
except json.JSONDecodeError: | |
# If not JSON, treat as plain text | |
logger.info("Direct input treated as plain text.") | |
processed_text_data = { | |
'source': 'direct_text', | |
'content': text, # Store as plain text | |
'timestamp': datetime.now().isoformat() | |
} | |
all_processed_data.append(processed_text_data) | |
status_updates.append("β Processed direct input as Text.") | |
except Exception as direct_input_err: | |
logger.error(f"Error processing direct input: {direct_input_err}", exc_info=True) | |
status_updates.append(f"β Error processing direct input.") | |
all_processed_data.append({'error': f'Direct input error: {direct_input_err}', 'source': 'direct_input'}) | |
# 4. Check if any data was processed | |
if not all_processed_data: | |
logger.warning("No valid data sources found or processed.") | |
status_updates.append("β οΈ No data to process. Please provide input.") | |
final_status = "\n".join(status_updates) | |
return None, [], final_status # Return empty results | |
logger.info(f"Total processed data items: {len(all_processed_data)}") | |
status_updates.append(f"Data processed ({len(all_processed_data)} items). Generating QR codes...") | |
# 5. Generate QR Codes | |
qr_paths = [] | |
try: | |
# Pass the list of processed data items | |
qr_paths = generate_qr_codes(all_processed_data, combine) | |
if qr_paths: | |
status_updates.append(f"β Generated {len(qr_paths)} QR codes.") | |
logger.info(f"Successfully generated {len(qr_paths)} QR codes.") | |
else: | |
status_updates.append("β QR code generation failed or produced no codes.") | |
logger.error("QR code generation returned no paths.") | |
# Keep processed data, but gallery will be empty | |
except Exception as qr_gen_err: | |
logger.error(f"Error during QR code generation step: {qr_gen_err}", exc_info=True) | |
status_updates.append(f"β Error generating QR codes: {qr_gen_err}") | |
# Keep processed data, gallery will be empty | |
# 6. Finalize and Return | |
end_time = time.time() | |
processing_time = end_time - start_time | |
status_updates.append(f"Total processing time: {processing_time:.2f} seconds.") | |
final_status = "\n".join(status_updates) | |
# Return processed data (for JSON view), QR paths (for Gallery), and status string | |
# Ensure qr_paths is a list of strings | |
qr_paths_str = [str(p) for p in qr_paths] if qr_paths else [] | |
# Return data for JSON output, gallery paths, and status text | |
return all_processed_data, qr_paths_str, final_status | |
# --- Event Handlers --- | |
example_btn.click(load_example, outputs=[text_input]) | |
clear_btn.click(clear_input_area, outputs=[text_input]) | |
process_btn.click( | |
process_inputs_and_generate_qrs, | |
inputs=[url_input, file_input, text_input, combine_data], | |
outputs=[output_json, output_gallery, output_text] # Match function return order | |
) | |
# Add helpful documentation
gr.Markdown(""" | |
### Features
- **Complete URL Scraping**: Extracts text content from web pages. | |
- **Advanced File Processing**: Handles text, JSON, and archives (.zip, .tar.*, .gz, .bz2). Attempts intelligent JSON detection. | |
- **Direct Input**: Paste text or JSON directly. | |
- **Sequential QR Codes**: Chunks large data and embeds sequencing info. Option to combine inputs. | |
- **Modern Design**: Clean, responsive interface. | |
### Tips
1. **Inputs**: Use any combination of URL, File, or Direct Input tabs. | |
2. **Combine**: Check 'Combine all inputs' to create one QR sequence from all sources. Uncheck to get separate QR sequences for each source. | |
3. **Files**: Upload text-based files, JSON, or supported archives. Content from archives is extracted and processed. | |
4. **JSON**: Use the example button or upload a `.json` file. The app also tries to parse `.txt` or other files as JSON if they contain valid JSON structure. | |
5. **Status**: Monitor the Processing Status box for feedback. | |
### Output
- Generated QR codes appear in the 'QR Codes' tab and are saved in the `output/qr_codes` directory. | |
- The structured data processed from all inputs is shown in the 'Processed Data (JSON)' tab. | |
- Hover over or click QR codes in the gallery for a larger preview. | |
""") | |
return interface | |
def main(): | |
"""Initialize and launch the application""" | |
try: | |
# Configure system settings if needed | |
mimetypes.init() # Ensure mime types are loaded | |
logger.info("Starting Gradio application...") | |
# Create and launch interface | |
interface = create_modern_interface() | |
# Add the QR sequence visualizer tab (if function is defined and needed) | |
# with interface: | |
# create_qr_sequence_visualizer(None) # Pass relevant components if needed | |
# Launch with configuration | |
interface.launch( | |
share=False, # Set to True for public link (use with caution) | |
debug=False, # Set to True for more verbose Gradio errors | |
show_error=True, # Show Python errors in browser console | |
# server_name="0.0.0.0", # Bind to all interfaces if needed for Docker/network access | |
# server_port=7860, # Specify port if needed | |
show_api=False # Disable default Gradio API endpoint unless needed | |
) | |
logger.info("Gradio application stopped.") | |
except Exception as e: | |
logger.error(f"Application startup or runtime error: {e}", exc_info=True) | |
raise | |
if __name__ == "__main__": | |
# Ensure output directories exist before starting | |
OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) | |
QR_CODES_DIR.mkdir(parents=True, exist_ok=True) | |
TEMP_DIR.mkdir(parents=True, exist_ok=True) | |
main() |