diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -14,39 +14,20 @@ from urllib.parse import urlparse, urljoin import requests import validators import gradio as gr -# from diskcache import Cache # Diskcache not used in the provided code, commented out +from diskcache import Cache from bs4 import BeautifulSoup from fake_useragent import UserAgent -from cleantext import clean # Keep import, even if direct use is commented, in case of future use or indirect dependency needs +from cleantext import clean import qrcode -import qrcode.constants # Explicit import for constants -import qrcode.exceptions # Explicit import for exceptions from PIL import Image, ImageDraw, ImageFont import numpy as np import tarfile import gzip -import bz2 # Ensure bz2 is imported for .bz2 file handling import networkx as nx import matplotlib.pyplot as plt from matplotlib.colors import to_rgba import io import math -import cv2 # Imported for QR code detection - -try: - import PyPDF2 # Added for PDF processing - from PyPDF2.errors import PdfReadError, DependencyError # Specific PDF errors - PDF_ENABLED = True -except ImportError: - PDF_ENABLED = False - # Define dummy classes/exceptions if PyPDF2 is not installed - class PdfReadError(Exception): pass - class DependencyError(Exception): pass - print("--------------------------------------------------------------------") - print("WARNING: PyPDF2 not installed. PDF processing will be disabled.") - print(" Install with: pip install pypdf2[crypto]") - print("--------------------------------------------------------------------") - # Setup enhanced logging with more detailed formatting logging.basicConfig( @@ -65,28 +46,18 @@ TEMP_DIR = OUTPUTS_DIR / 'temp' for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]: directory.mkdir(parents=True, exist_ok=True) -# --- Constants --- -# Reduced max size for QR payload to ensure fit within Version 40 with H ECC -# QR Code V40-H capacity (binary bytes): 1273 -# JSON overhead + chunk metadata needs buffer. 1000 bytes is safer. -QR_PAYLOAD_MAX_BYTES = 1000 -DEFAULT_MAX_FILE_SIZE_BYTES = 5 * 1024 * 1024 * 1024 # 5 GB - class EnhancedURLProcessor: - """Advanced URL processing with content extraction and basic validation.""" + """Advanced URL processing with complete content extraction""" def __init__(self): self.session = requests.Session() self.timeout = 15 # Extended timeout for larger content self.max_retries = 3 - try: - self.user_agent = UserAgent() - except Exception: # Handle potential errors fetching user agents list - logger.warning("Failed to initialize UserAgent. 
Using default.") - self.user_agent = None # Fallback + self.user_agent = UserAgent() # Enhanced headers for better site compatibility - headers = { - 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', + self.session.headers.update({ + 'User-Agent': self.user_agent.random, + 'Accept': '*/*', # Accept all content types 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', @@ -95,1254 +66,560 @@ class EnhancedURLProcessor: 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', - 'DNT': '1' # Do Not Track - } - if self.user_agent: - headers['User-Agent'] = self.user_agent.random - else: - headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' # Default fallback - - self.session.headers.update(headers) - + 'DNT': '1' + }) def validate_url(self, url: str) -> Dict: - """Enhanced URL validation with detailed feedback using HEAD and GET.""" + """Enhanced URL validation with detailed feedback""" try: if not validators.url(url): return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} parsed = urlparse(url) if not all([parsed.scheme, parsed.netloc]): return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} - - details = {} - is_valid = False - validation_message = "Validation failed" - - # Update User-Agent for this specific request - current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent'] - req_headers = {'User-Agent': current_ua} - - # Try HEAD request first + # Try HEAD request first to check accessibility try: - head_response = self.session.head(url, timeout=5, allow_redirects=True, headers=req_headers) + head_response = self.session.head(url, timeout=5) head_response.raise_for_status() - details = { - 'method': 'HEAD', + except requests.exceptions.RequestException: + # If HEAD fails, try GET as some servers don't support HEAD + response = self.session.get(url, timeout=self.timeout) + response.raise_for_status() + + return { + 'is_valid': True, + 'message': 'URL is valid and accessible', + 'details': { 'content_type': head_response.headers.get('Content-Type', 'unknown'), 'server': head_response.headers.get('Server', 'unknown'), - 'size': head_response.headers.get('Content-Length', 'unknown'), - 'final_url': head_response.url + 'size': head_response.headers.get('Content-Length', 'unknown') } - is_valid = True - validation_message = 'URL is valid and accessible (HEAD)' - logger.info(f"HEAD request successful for {url}") - - except requests.exceptions.RequestException as head_err: - logger.warning(f"HEAD request failed for {url}: {head_err}. 
Trying GET.") - # If HEAD fails, try GET (stream=True to avoid downloading full content) - try: - get_response = self.session.get(url, timeout=self.timeout, stream=True, allow_redirects=True, headers=req_headers) - get_response.raise_for_status() - details = { - 'method': 'GET', - 'content_type': get_response.headers.get('Content-Type', 'unknown'), - 'server': get_response.headers.get('Server', 'unknown'), - 'size': get_response.headers.get('Content-Length', 'unknown'), - 'final_url': get_response.url - } - get_response.close() # Close the stream immediately - is_valid = True - validation_message = 'URL is valid and accessible (GET)' - logger.info(f"GET request validation successful for {url}") - except requests.exceptions.RequestException as get_err: - logger.error(f"Both HEAD and GET requests failed for URL validation: {url}. Last error: {get_err}") - validation_message = f'URL validation failed (HEAD/GET): {get_err}' - details = {'error': str(get_err)} - - return { - 'is_valid': is_valid, - 'message': validation_message, - 'details': details } except Exception as e: - logger.error(f"Unexpected error during URL validation for {url}: {e}", exc_info=True) return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)} def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]: - """Fetches URL content with retries, handles various types (HTML, Text, JSON, PDF).""" + """Enhanced content fetcher with retry mechanism and complete character extraction""" try: logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})") - current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent'] - req_headers = {'User-Agent': current_ua} - response = self.session.get(url, timeout=self.timeout, allow_redirects=True, headers=req_headers) + # Update User-Agent randomly for each request + self.session.headers.update({'User-Agent': self.user_agent.random}) + + response = self.session.get(url, timeout=self.timeout) response.raise_for_status() - # --- Encoding Detection --- - detected_encoding = chardet.detect(response.content)['encoding'] - encoding = detected_encoding or response.apparent_encoding or 'utf-8' - logger.debug(f"Encoding for {url}: Detected={detected_encoding}, Apparent={response.apparent_encoding}, Using={encoding}") + # Detect encoding + if response.encoding is None: + encoding = chardet.detect(response.content)['encoding'] or 'utf-8' + else: + encoding = response.encoding - # --- Decode Content --- - raw_content_str = None + # Decode content with fallback try: - raw_content_str = response.content.decode(encoding, errors='replace') - except (UnicodeDecodeError, LookupError) as decode_err: - logger.warning(f"Decoding {url} with {encoding} failed ({decode_err}), falling back to utf-8") - encoding = 'utf-8' - raw_content_str = response.content.decode(encoding, errors='replace') + raw_content = response.content.decode(encoding, errors='replace') + except (UnicodeDecodeError, LookupError): + raw_content = response.content.decode('utf-8', errors='replace') - # --- Metadata --- + # Extract metadata metadata = { 'url': url, 'timestamp': datetime.now().isoformat(), - 'encoding_detected': detected_encoding, - 'encoding_used': encoding, + 'encoding': encoding, 'content_type': response.headers.get('Content-Type', ''), - 'content_length_bytes': len(response.content), + 'content_length': len(response.content), 'headers': dict(response.headers), - 'status_code': response.status_code, - 'final_url': 
response.url + 'status_code': response.status_code } - # --- Content Processing by Type --- - content_type = metadata['content_type'].lower() - processed_content = None - + # Process based on content type + content_type = response.headers.get('Content-Type', '').lower() if 'text/html' in content_type: - processed_content = self._process_html_content(raw_content_str, response.url) - logger.info(f"Fetched and processed HTML from {url}") - elif content_type.startswith('text/'): - processed_content = raw_content_str - logger.info(f"Fetched non-HTML text from {url}") - elif 'application/json' in content_type: - try: - processed_content = json.loads(raw_content_str) - logger.info(f"Fetched and parsed JSON from {url}") - except json.JSONDecodeError: - logger.warning(f"Content type JSON, but failed to parse {url}. Storing raw text.") - processed_content = raw_content_str - elif 'application/pdf' in content_type: - if PDF_ENABLED: - logger.info(f"Detected PDF from URL {url}. Attempting extraction.") - processed_content = self._extract_pdf_text_from_bytes(response.content, url) - else: - logger.warning(f"Detected PDF from URL {url}, but PyPDF2 is disabled. Skipping extraction.") - processed_content = "[PDF Content - PyPDF2 not installed]" + processed_content = self._process_html_content(raw_content, url) else: - logger.info(f"Fetched non-text/HTML/JSON/PDF content ({content_type}) from {url}. Storing raw string.") - # Limit storage of potentially large binary data represented as string - max_raw_str_len = 10000 - if raw_content_str and len(raw_content_str) > max_raw_str_len: - processed_content = raw_content_str[:max_raw_str_len] + f"... [truncated {len(raw_content_str) - max_raw_str_len} chars]" - elif raw_content_str: - processed_content = raw_content_str - else: - processed_content = "[Binary or Undecodable Content]" - - + processed_content = raw_content return { 'content': processed_content, - 'raw_bytes': response.content, # Keep raw bytes if needed for specific handling later + 'raw_content': raw_content, 'metadata': metadata } - - except requests.exceptions.Timeout: - logger.error(f"Timeout fetching {url} after {self.timeout}s.") - # Retry logic - if retry_count < self.max_retries - 1: - logger.warning(f"Retrying ({retry_count + 2}/{self.max_retries}) for URL: {url}") - time.sleep(1 * (retry_count + 1)) # Simple linear backoff - return self.fetch_content(url, retry_count + 1) - logger.error(f"Failed to fetch {url} after {self.max_retries} attempts due to timeout.") - return None except requests.exceptions.RequestException as e: - # Retry logic for general request errors - if retry_count < self.max_retries - 1: - logger.warning(f"Request failed ({e}), Retrying ({retry_count + 2}/{self.max_retries}) for URL: {url}") - time.sleep(1 * (retry_count + 1)) - return self.fetch_content(url, retry_count + 1) - logger.error(f"Failed to fetch content from {url} after {self.max_retries} attempts: {e}") - return None + if retry_count < self.max_retries - 1: + logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}") + time.sleep(2 ** retry_count) # Exponential backoff + return self.fetch_content(url, retry_count + 1) + logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}") + return None except Exception as e: - logger.error(f"Unexpected error fetching content from {url}: {e}", exc_info=True) + logger.error(f"Unexpected error while fetching content: {e}") return None def _process_html_content(self, content: str, base_url: str) -> str: - """Extracts text from HTML, 
attempts to absolutize links.""" + """Process HTML content while preserving all characters""" try: - soup = BeautifulSoup(content, 'lxml') # Use lxml parser - # Absolutize links (best effort) - for tag in soup.find_all(['a', 'img', 'link', 'script'], href=True) + soup.find_all(['img', 'script'], src=True): - attr = 'href' if tag.has_attr('href') else 'src' - if tag[attr]: - try: - # Handle cases where tag[attr] might be a list (rare, but possible) - attr_value = tag[attr] - if isinstance(attr_value, list): - attr_value = attr_value[0] # Take the first one - - if not isinstance(attr_value, str): continue # Skip if not a string - - base_tag = soup.find('base') - current_base = base_tag['href'] if base_tag and base_tag.get('href') else base_url - abs_url = urljoin(current_base, attr_value) - if validators.url(abs_url): # Check if the result is a valid URL - tag[attr] = abs_url - except Exception as url_e: - logger.debug(f"Ignoring error during URL absolutization: {url_e}") - pass # Ignore errors in URL joining/validation - - # Extract text content, joined by newlines - text_content = '\n'.join(soup.stripped_strings) - # Optional cleaning (use cautiously) - # text_content = clean(text_content, no_line_breaks=False, lower=False) - return text_content - except Exception as e: - logger.error(f"HTML processing error for base URL {base_url}: {e}", exc_info=True) - return content # Return original content on error + soup = BeautifulSoup(content, 'html.parser') - def _extract_pdf_text_from_bytes(self, pdf_bytes: bytes, source_desc: str) -> str: - """Helper to extract text from PDF bytes using PyPDF2.""" - if not PDF_ENABLED: return "[PDF Extraction Disabled]" - try: - pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_bytes)) - if pdf_reader.is_encrypted: - # Try decrypting with an empty password - try: - decrypt_status = pdf_reader.decrypt('') - # Check PyPDF2 version for return type (integer in older, enum in newer) - is_decrypted = False - if isinstance(decrypt_status, int): # Older PyPDF2 versions - is_decrypted = decrypt_status > 0 - elif hasattr(PyPDF2.PasswordType, 'OWNER_PASSWORD'): # Newer PyPDF2 versions (check attribute exists) - is_decrypted = decrypt_status in (PyPDF2.PasswordType.OWNER_PASSWORD, PyPDF2.PasswordType.USER_PASSWORD) - else: # Fallback check if PasswordType structure changes - logger.warning("Could not determine PyPDF2 decryption status type, assuming success if no error.") - is_decrypted = True # Assume success if decrypt didn't raise error - - - if is_decrypted: - logger.info(f"Successfully decrypted PDF from {source_desc} with empty password.") - else: # Decryption failed - logger.warning(f"PDF from {source_desc} is encrypted and could not be decrypted with empty password.") - return "[Encrypted PDF Content - Decryption Failed]" - except (NotImplementedError, DependencyError) as decrypt_err: - # PyPDF2 might need cryptography for some PDFs - logger.error(f"Decryption dependency error for PDF from {source_desc}: {decrypt_err}. 
Ensure 'cryptography' is installed (`pip install pypdf2[crypto]`).") - return f"[Encrypted PDF Content - Decryption Dependency Missing: {decrypt_err}]" - except Exception as decrypt_err: - logger.warning(f"Error during decryption attempt for PDF from {source_desc}: {decrypt_err}") - return "[Encrypted PDF Content - Decryption Error]" - - # Proceed with extraction if not encrypted or successfully decrypted - pdf_text_parts = [] - num_pages = len(pdf_reader.pages) - for page_num in range(num_pages): - try: - page = pdf_reader.pages[page_num] - extracted = page.extract_text() - pdf_text_parts.append(extracted if extracted else "") # Add empty string if extraction fails - except Exception as page_err: - logger.warning(f"Error extracting text from page {page_num+1}/{num_pages} of PDF from {source_desc}: {page_err}") - pdf_text_parts.append(f"[Error extracting page {page_num+1}]") - full_text = "\n".join(pdf_text_parts).strip() - logger.info(f"Successfully extracted text ({len(full_text)} chars) from {num_pages} pages of PDF from {source_desc}") - return full_text if full_text else "[PDF contains no extractable text]" - - except PdfReadError as pdf_err: - logger.error(f"Could not read PDF from {source_desc}: {pdf_err}") - return f"[Invalid or Corrupted PDF: {pdf_err}]" + # Convert relative URLs to absolute + for tag in soup.find_all(['a', 'img', 'link', 'script']): + for attr in ['href', 'src']: + if tag.get(attr): + try: + tag[attr] = urljoin(base_url, tag[attr]) + except Exception: + pass + # Extract all text content + text_parts = [] + for element in soup.stripped_strings: + text_parts.append(str(element)) + return '\n'.join(text_parts) except Exception as e: - logger.error(f"Unexpected error processing PDF from {source_desc}: {e}", exc_info=True) - return "[Error processing PDF]" - + logger.error(f"HTML processing error: {e}") + return content class EnhancedFileProcessor: - """Processes local files (text, json, pdf, archives) with content extraction.""" - def __init__(self, max_file_size: int = DEFAULT_MAX_FILE_SIZE_BYTES): + """Advanced file processing with complete content extraction""" + def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default self.max_file_size = max_file_size - self.text_extensions = { - '.txt', '.md', '.csv', '.xml', '.html', '.htm', '.css', '.js', - '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', '.toml', '.sql', '.py', '.java', '.c', '.cpp', '.h' + self.supported_extensions = { + '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', + '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', + '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar', + '.pdf', '.doc', '.docx', '.rtf', '.odt' } - self.json_extension = '.json' - self.pdf_extension = '.pdf' - self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.tgz', '.tar.gz', '.tar.bz2'} def process_file(self, file) -> List[Dict]: - """Processes a single uploaded Gradio file object.""" - if not file or not hasattr(file, 'name'): - logger.warning("Invalid file object received.") - return [] + """Process uploaded file with enhanced error handling and complete extraction""" + if not file: + return [] dataset = [] - # Use orig_name if available (better for Gradio temp files) - file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'Unknown File')) - file_path_obj = Path(file.name) if hasattr(file, 'name') else None - - if not file_path_obj: - logger.error("File object missing 'name' attribute.") - return [{'error': 'Invalid file object on server', 'filename': file_display_name}] - try: - # 
Gradio file objects might be temporary, ensure path exists - if not file_path_obj.exists(): - logger.error(f"Temporary file path does not exist: {file_path_obj}") - return [{'error': 'File not found on server', 'filename': file_display_name}] - - file_path = file_path_obj.resolve() # Get absolute path - file_size = file_path.stat().st_size - + file_size = os.path.getsize(file.name) if file_size > self.max_file_size: - logger.warning(f"File '{file_display_name}' size ({file_size} bytes) exceeds limit ({self.max_file_size} bytes).") - return [{'error': 'File too large', 'filename': file_display_name, 'file_size': file_size}] - if file_size == 0: - logger.warning(f"File '{file_display_name}' is empty.") - return [{'error': 'File is empty', 'filename': file_display_name, 'file_size': 0}] + logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size") + return [] - - # Use a temporary directory for extraction if needed - with tempfile.TemporaryDirectory(dir=TEMP_DIR, prefix="extract_") as temp_dir: + with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) - if self._is_archive(str(file_path)): - logger.info(f"Processing archive: {file_display_name}") - dataset.extend(self._process_archive(str(file_path), temp_dir_path, archive_display_name=file_display_name)) + # Handle different archive types + if self._is_archive(file.name): + dataset.extend(self._process_archive(file.name, temp_dir_path)) + elif Path(file.name).suffix.lower() in self.supported_extensions: + dataset.extend(self._process_single_file(file)) else: - logger.info(f"Processing single file: {file_display_name}") - dataset.extend(self._process_single_file(file_path, file_display_name)) + logger.warning(f"Unsupported file type: {file.name}") except Exception as e: - logger.error(f"Error processing file '{file_display_name}': {e}", exc_info=True) - dataset.append({'error': f'Processing failed: {e}', 'filename': file_display_name}) + logger.error(f"Error processing file: {str(e)}") + return [] return dataset def _is_archive(self, filepath: str) -> bool: - """Checks if the file extension is a supported archive type.""" - # Check suffix and double suffixes like .tar.gz - p = Path(filepath) - suffix = p.suffix.lower() - double_suffix = "".join(p.suffixes[-2:]).lower() - return suffix in self.archive_extensions or double_suffix in self.archive_extensions - - - def _process_single_file(self, file_path: Path, display_name: str) -> List[Dict]: - """Processes a single non-archive file (text, json, pdf, binary).""" - file_name = display_name # Use the potentially original name for reporting - file_suffix = file_path.suffix.lower() + """Check if file is an archive""" + return any(filepath.lower().endswith(ext) for ext in [ + '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar' + ]) + def _process_single_file(self, file) -> List[Dict]: + """Process a single file with enhanced character extraction and JSON handling""" try: - file_stat = file_path.stat() + file_stat = os.stat(file.name) file_size = file_stat.st_size - mime_type, _ = mimetypes.guess_type(file_path) - mime_type = mime_type or 'application/octet-stream' - - complete_content = None - raw_content_str = None # Store raw string if read as text - source = 'unknown_file' - - # --- Determine File Type and Process --- - # 1. 
PDF - if PDF_ENABLED and (file_suffix == self.pdf_extension or mime_type == 'application/pdf'): - source = 'pdf_file' - logger.info(f"Processing PDF file: {file_name}") - with open(file_path, 'rb') as f: - pdf_bytes = f.read() - # Use the same helper as URL processor - url_processor = EnhancedURLProcessor() # Create temporary instance - complete_content = url_processor._extract_pdf_text_from_bytes(pdf_bytes, f"file '{file_name}'") - - # 2. JSON - elif file_suffix == self.json_extension or mime_type == 'application/json': - source = 'json_file' - logger.info(f"Processing JSON file: {file_name}") - try: - # Read with UTF-8 first, fallback if needed + # Initialize content storage + content_parts = [] + # Process file in chunks for large files + chunk_size = 10 * 1024 * 1024 # 10MB chunks + with open(file.name, 'rb') as f: + while True: + chunk = f.read(chunk_size) + if not chunk: + break + # Detect encoding for each chunk + encoding = chardet.detect(chunk)['encoding'] or 'utf-8' try: - with open(file_path, 'r', encoding='utf-8') as f: - raw_content_str = f.read() - except UnicodeDecodeError: - logger.warning(f"UTF-8 decode failed for {file_name}, trying detected encoding.") - with open(file_path, 'rb') as fb: - raw_bytes = fb.read() - detected_encoding = chardet.detect(raw_bytes)['encoding'] or 'latin-1' # Fallback to latin-1 - logger.info(f"Detected encoding for {file_name}: {detected_encoding}") - raw_content_str = raw_bytes.decode(detected_encoding, errors='replace') - - complete_content = json.loads(raw_content_str) # Parse JSON - logger.info(f"Successfully parsed JSON from {file_name}") - except json.JSONDecodeError as json_err: - logger.warning(f"File {file_name} looks like JSON but failed to parse: {json_err}. Storing raw text.") - complete_content = raw_content_str # Store raw text - source = 'text_file_failed_json' - except IOError as e: - logger.error(f"IOError reading JSON file {file_name}: {e}") - return [{'error': f'IOError reading file: {e}', 'filename': file_name}] - except Exception as e: - logger.error(f"Error reading/parsing JSON file {file_name}: {e}", exc_info=True) - complete_content = f"[Error reading JSON file: {e}]" - raw_content_str = complete_content - - # 3. 
Text - elif file_suffix in self.text_extensions or (mime_type and mime_type.startswith('text/')): - source = 'text_file' - logger.info(f"Processing Text file: {file_name}") - try: - with open(file_path, 'rb') as f: - raw_bytes = f.read() - detected_encoding = chardet.detect(raw_bytes)['encoding'] or 'utf-8' - logger.info(f"Detected encoding for {file_name}: {detected_encoding}") - raw_content_str = raw_bytes.decode(detected_encoding, errors='replace') - complete_content = raw_content_str - logger.info(f"Successfully read text from {file_name}") - - # Optional: Try parsing as JSON + decoded_chunk = chunk.decode(encoding, errors='replace') + content_parts.append(decoded_chunk) + except (UnicodeDecodeError, LookupError): + decoded_chunk = chunk.decode('utf-8', errors='replace') + content_parts.append(decoded_chunk) + # Combine all chunks + complete_content = ''.join(content_parts) + # Check if the content is valid JSON regardless of file extension + try: + if mimetypes.guess_type(file.name)[0] == 'application/json' or file.name.lower().endswith('.json'): + # It's a JSON file by type or extension + json_data = json.loads(complete_content) + return [{ + 'source': 'json_file', + 'filename': os.path.basename(file.name), + 'file_size': file_size, + 'mime_type': 'application/json', + 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), + 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), + 'content': json_data, # Store the parsed JSON object + 'raw_content': complete_content, # Store the original JSON string + 'timestamp': datetime.now().isoformat() + }] + else: + # Try to parse as JSON anyway try: - # Basic check before attempting full parse - if raw_content_str.strip().startswith(("{", "[")) and raw_content_str.strip().endswith(("}", "]")): - json_data = json.loads(raw_content_str) - complete_content = json_data - source = 'json_content_detected_in_text' - logger.info(f"Detected and parsed JSON structure within text file {file_name}") + json_data = json.loads(complete_content) + # If we get here, it's valid JSON despite the extension + return [{ + 'source': 'json_content', + 'filename': os.path.basename(file.name), + 'file_size': file_size, + 'mime_type': 'application/json', + 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), + 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), + 'content': json_data, # Store the parsed JSON object + 'raw_content': complete_content, # Store the original JSON string + 'timestamp': datetime.now().isoformat() + }] except json.JSONDecodeError: - pass # It's just text - except IOError as e: - logger.error(f"IOError reading text file {file_name}: {e}") - return [{'error': f'IOError reading file: {e}', 'filename': file_name}] - except Exception as e: - logger.error(f"Error reading text file {file_name}: {e}", exc_info=True) - complete_content = f"[Error reading text file: {e}]" - raw_content_str = complete_content + logger.warning(f"File {file.name} is not valid JSON.") + except Exception as e: + logger.error(f"Error during JSON processing: {e}") - # 4. Other (Binary/Unsupported) - else: - source = 'binary_or_unsupported_file' - logger.warning(f"File {file_name} not processed as text/json/pdf (MIME: {mime_type}, Suffix: {file_suffix}). 
Treating as binary/other.") - complete_content = f"[Binary or unsupported content ({file_size} bytes)]" - - # --- Structure Output --- - result = { - 'source': source, - 'filename': file_name, # Use display name + return [{ + 'source': 'file', + 'filename': os.path.basename(file.name), 'file_size': file_size, - 'mime_type': mime_type, + 'mime_type': mimetypes.guess_type(file.name)[0], 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(), 'content': complete_content, 'timestamp': datetime.now().isoformat() - } - # Include raw string only if it's text and wasn't successfully parsed as JSON - if raw_content_str is not None and source in ['text_file', 'text_file_failed_json', 'json_content_detected_in_text']: - result['raw_content'] = raw_content_str - - return [result] - - except FileNotFoundError: - logger.error(f"File not found during processing: {file_path} (Display Name: {display_name})") - return [{'error': 'File not found', 'filename': display_name}] + }] except Exception as e: - logger.error(f"General file processing error for {display_name}: {e}", exc_info=True) - return [{'error': f'File processing failed: {e}', 'filename': display_name}] + logger.error(f"File processing error: {e}") + return [] - def _process_archive(self, archive_path_str: str, extract_to: Path, archive_display_name: str) -> List[Dict]: - """Extracts files from supported archives and processes them.""" + def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]: + """Process an archive file with enhanced extraction""" dataset = [] - archive_path = Path(archive_path_str) - archive_name = archive_display_name # Use display name for reporting - logger.info(f"Attempting to extract archive: {archive_name}") - extracted_something = False - try: - # --- ZIP --- - if archive_name.lower().endswith('.zip') and zipfile.is_zipfile(archive_path): - logger.debug(f"Processing ZIP: {archive_name}") + # Handle ZIP archives + if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path, 'r') as zip_ref: - # Basic zip bomb check (total size) - total_uncompressed = sum(f.file_size for f in zip_ref.infolist()) - # Allow larger uncompressed size, but cap relative to compressed size - max_allowed_uncompressed = max(self.max_file_size, 20 * archive_path.stat().st_size) - if total_uncompressed > max_allowed_uncompressed: - logger.warning(f"ZIP {archive_name} potentially too large (uncompressed: {total_uncompressed} vs limit {max_allowed_uncompressed}). Skipping.") - return [{'error': 'Archive potential bomb or excessively large', 'filename': archive_name}] - - for member in zip_ref.infolist(): - member_display_name = member.filename # Name inside archive - if member.is_dir() or member.file_size == 0: continue # Skip directories and empty files - if member.file_size > self.max_file_size: - logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.file_size} exceeds limit.") - dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name}) - continue - - # Sanitize and prevent path traversal - ensure target is within extract_to - member_path = Path(member_display_name) - # Remove leading slashes and '..' 
components - safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')] - if not safe_parts: continue # Skip if path becomes empty - target_path = extract_to.joinpath(*safe_parts).resolve() - - # Double check it's within the extraction directory - if not str(target_path).startswith(str(extract_to.resolve())): - logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}") - continue - - target_path.parent.mkdir(parents=True, exist_ok=True) - - try: - with zip_ref.open(member) as source, open(target_path, "wb") as target: - size_written = 0 - while True: - chunk = source.read(8192) - if not chunk: break - size_written += len(chunk) - if size_written > self.max_file_size: # Check during extraction - raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.") - target.write(chunk) - logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}") - # Process the extracted file, passing its name within the archive - results = self._process_single_file(target_path, member_display_name) - # Add archive context to results - for res in results: res['archive'] = archive_name - dataset.extend(results) - extracted_something = True - except OverflowError as oe: - logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name}) - if target_path.exists(): target_path.unlink() # Clean up partial file - except Exception as extract_err: - logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True) - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name}) - - # --- TAR (tar, tar.gz, tgz, tar.bz2) --- - elif tarfile.is_tarfile(archive_path): - logger.debug(f"Processing TAR: {archive_name}") - # Mode 'r:*' auto-detects compression - with tarfile.open(archive_path, 'r:*') as tar_ref: - for member in tar_ref.getmembers(): - member_display_name = member.name # Name inside archive - if not member.isfile() or member.size == 0: continue # Skip non-files and empty files - if member.size > self.max_file_size: - logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.size} exceeds limit.") - dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name}) - continue - - # Sanitize and prevent path traversal - member_path = Path(member_display_name) - safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')] - if not safe_parts: continue - target_path = extract_to.joinpath(*safe_parts).resolve() - - if not str(target_path).startswith(str(extract_to.resolve())): - logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}") - continue - - target_path.parent.mkdir(parents=True, exist_ok=True) - - try: - fileobj = tar_ref.extractfile(member) - if fileobj: - with open(target_path, "wb") as target: - size_written = 0 - while True: - chunk = fileobj.read(8192) - if not chunk: break - size_written += len(chunk) - if size_written > self.max_file_size: # Check during extraction - raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.") - target.write(chunk) - logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}") - # Process the extracted file - results = 
self._process_single_file(target_path, member_display_name) - for res in results: res['archive'] = archive_name - dataset.extend(results) - extracted_something = True - else: - logger.warning(f"Could not extract file object for '{member_display_name}' from {archive_name}") - except OverflowError as oe: - logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name}) - if target_path.exists(): target_path.unlink() - except Exception as extract_err: - logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True) - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name}) - - # --- GZIP (single file) --- - elif archive_name.lower().endswith('.gz') and not archive_name.lower().endswith('.tar.gz'): # Avoid double handling tar.gz - logger.debug(f"Processing GZIP: {archive_name}") - # Determine output filename (remove .gz suffix) - extracted_filename = Path(archive_name).stem - if not extracted_filename: extracted_filename = archive_name + "_extracted" # Handle cases like '.gz' - target_path = extract_to / extracted_filename - target_path.parent.mkdir(parents=True, exist_ok=True) - try: - with gzip.open(archive_path, 'rb') as gz_file, open(target_path, 'wb') as outfile: - size_written = 0 - while True: - chunk = gz_file.read(8192) - if not chunk: break - size_written += len(chunk) - if size_written > self.max_file_size: - raise OverflowError(f"Gzipped file '{archive_name}' exceeded size limit during decompression.") - outfile.write(chunk) - logger.debug(f"Extracted '{target_path.name}' from {archive_name}") - # Process the extracted file - results = self._process_single_file(target_path, extracted_filename) # Use extracted name - for res in results: res['archive'] = archive_name - dataset.extend(results) - extracted_something = True - except OverflowError as oe: - logger.error(f"Error extracting {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': extracted_filename, 'archive': archive_name}) - if target_path.exists(): target_path.unlink() - except gzip.BadGzipFile as e: - logger.error(f"Error processing GZIP {archive_name}: Bad Gzip File - {e}") - dataset.append({'error': f'Bad Gzip File: {e}', 'filename': archive_name}) - except Exception as extract_err: - logger.error(f"Failed to extract/process gzip {archive_name}: {extract_err}", exc_info=True) - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name}) - - # --- BZ2 (single file) --- - elif archive_name.lower().endswith('.bz2') and not archive_name.lower().endswith('.tar.bz2'): # Avoid double handling tar.bz2 - logger.debug(f"Processing BZ2: {archive_name}") - extracted_filename = Path(archive_name).stem - if not extracted_filename: extracted_filename = archive_name + "_extracted" - target_path = extract_to / extracted_filename - target_path.parent.mkdir(parents=True, exist_ok=True) - try: - with bz2.open(archive_path, 'rb') as bz2_file, open(target_path, 'wb') as outfile: - size_written = 0 - while True: - chunk = bz2_file.read(8192) - if not chunk: break - size_written += len(chunk) - if size_written > self.max_file_size: - raise OverflowError(f"Bzipped file '{archive_name}' exceeded size limit during decompression.") - outfile.write(chunk) - logger.debug(f"Extracted '{target_path.name}' from {archive_name}") - # Process the extracted file - 
results = self._process_single_file(target_path, extracted_filename) - for res in results: res['archive'] = archive_name - dataset.extend(results) - extracted_something = True - except OverflowError as oe: - logger.error(f"Error extracting {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': extracted_filename, 'archive': archive_name}) - if target_path.exists(): target_path.unlink() - except Exception as extract_err: - logger.error(f"Failed to extract/process bz2 {archive_name}: {extract_err}", exc_info=True) - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name}) + zip_ref.extractall(extract_to) + for file_info in zip_ref.infolist(): + if file_info.file_size > 0 and not file_info.filename.endswith('/'): + extracted_path = extract_to / file_info.filename + if extracted_path.suffix.lower() in self.supported_extensions: + with open(extracted_path, 'rb') as f: + dataset.extend(self._process_single_file(f)) + # Handle TAR archives + elif archive_path.lower().endswith(('.tar', '.tar.gz', '.tgz')): + try: + with tarfile.open(archive_path, 'r:*') as tar_ref: + for member in tar_ref.getmembers(): + if member.isfile(): + extracted_path = extract_to / member.name + tar_ref.extract(member, path=extract_to) + if extracted_path.suffix.lower() in self.supported_extensions: + with open(extracted_path, 'rb') as f: + dataset.extend(self._process_single_file(f)) + except tarfile.TarError as e: + logger.error(f"Error processing TAR archive: {e}") + # Handle GZIP archives (single file) + elif archive_path.lower().endswith('.gz'): + extracted_path = extract_to / Path(archive_path).stem + try: + with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: + outfile.write(gz_file.read()) + if extracted_path.suffix.lower() in self.supported_extensions: + with open(extracted_path, 'rb') as f: + dataset.extend(self._process_single_file(f)) + except gzip.GzipFile as e: + logger.error(f"Error processing GZIP archive: {e}") + # TODO: Add support for other archive types (.bz2, .7z, .rar) - may require external libraries + elif archive_path.lower().endswith(('.bz2', '.7z', '.rar')): + logger.warning(f"Support for {Path(archive_path).suffix} archives is not yet fully implemented.") - else: - # If it wasn't handled by tarfile (e.g., .tar.gz, .tar.bz2) or the single file handlers, log warning. 
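# --- Illustrative sketch (not part of the patch) ---------------------------------
# The archive handlers above combine three safeguards: skip oversized members, strip
# '.'/'..' path parts, and verify the resolved target stays inside the extraction
# directory. A minimal standalone version of that pattern for ZIPs is sketched below;
# `max_member_size` is an assumption chosen for the example (the real cap lives on
# EnhancedFileProcessor.max_file_size).
import zipfile
from pathlib import Path

def safe_extract_zip(archive, dest, max_member_size=100 * 1024 * 1024):
    """Extract a ZIP while rejecting path traversal and oversized members."""
    extracted = []
    dest = Path(dest).resolve()
    with zipfile.ZipFile(archive) as zf:
        for member in zf.infolist():
            if member.is_dir() or member.file_size == 0:
                continue  # skip directories and empty entries
            if member.file_size > max_member_size:
                continue  # skip members that exceed the per-file cap
            # Drop '', '.' and '..' parts so the target cannot escape dest
            parts = [p for p in Path(member.filename).parts if p not in ('', '.', '..')]
            if not parts:
                continue
            target = dest.joinpath(*parts).resolve()
            if not str(target).startswith(str(dest)):
                continue  # path traversal attempt, ignore the member
            target.parent.mkdir(parents=True, exist_ok=True)
            with zf.open(member) as src, open(target, 'wb') as out:
                out.write(src.read())
            extracted.append(target)
    return extracted
# ----------------------------------------------------------------------------------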
- if not tarfile.is_tarfile(archive_path): - logger.warning(f"Archive type not recognized or handled: {archive_name}") - dataset.append({'error': 'Unrecognized archive type', 'filename': archive_name}) - # If it *was* a tarfile but didn't extract anything (e.g., empty or only dirs) - elif not extracted_something and not any('error' in d for d in dataset): - logger.warning(f"Archive {archive_name} processed, but no valid files were extracted or processed.") - # Optionally add a note to the dataset - # dataset.append({'warning': 'No processable files found in archive', 'filename': archive_name}) - - - except FileNotFoundError: - logger.error(f"Archive file not found: {archive_path}") - dataset.append({'error': 'Archive file not found', 'filename': archive_name}) - except (zipfile.BadZipFile, tarfile.TarError, gzip.BadGzipFile, EOFError) as archive_err: # Added EOFError for tar issues - logger.error(f"Invalid or corrupted archive file {archive_name}: {archive_err}") - dataset.append({'error': f'Corrupted or invalid archive: {archive_err}', 'filename': archive_name}) except Exception as e: - logger.error(f"General archive processing error for {archive_name}: {e}", exc_info=True) - dataset.append({'error': f'Archive processing failed: {e}', 'filename': archive_name}) - + logger.error(f"Archive processing error: {e}") return dataset - - # Adjusted chunk_data with recommended max_size for QR codes - def chunk_data(self, data: Union[Dict, List, str], max_size: int = QR_PAYLOAD_MAX_BYTES) -> List[Dict]: - """Enhanced data chunking with sequence metadata, sized for QR codes.""" + def chunk_data(self, data: Union[Dict, List, str], max_size: int = 2953) -> List[Dict]: + """Enhanced data chunking with sequence metadata""" try: if not isinstance(data, str): - # Convert complex data to JSON string first - # Use separators=(',', ':') for compact JSON - json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) + # Convert data to JSON string + json_str = json.dumps(data, ensure_ascii=False) else: - json_str = data # Assume input string is already the data payload - - # Data here is the raw string (or JSON string) payload for the QR code - total_length = len(json_str.encode('utf-8')) # Use byte length for QR capacity - logger.debug(f"Chunking data of total byte length: {total_length}") + json_str = data + total_length = len(json_str) + + # Calculate overhead for metadata + metadata_template = { + "chunk_index": 0, + "total_chunks": 1, + "total_length": total_length, + "chunk_hash": "", + "data": "" + } + overhead = len(json.dumps(metadata_template)) + 20 # Extra padding for safety + # Calculate effective chunk size + effective_chunk_size = max_size - overhead - # Simplified: If the data fits within max_size (bytes), return one chunk object - # The chunk object itself adds metadata, but the 'data' field is what matters for QR limit. 
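# --- Illustrative sketch (not part of the patch) ---------------------------------
# The chunker below sizes pieces by UTF-8 *byte* length rather than character count,
# because QR capacity is a byte budget (QR_PAYLOAD_MAX_BYTES above). A minimal version
# of that splitting idea, without the chunk metadata, looks like this:
def split_by_byte_budget(payload, max_bytes=1000):
    """Split a string into pieces whose UTF-8 encoding stays within max_bytes each."""
    chunks, start = [], 0
    while start < len(payload):
        end = min(start + max_bytes, len(payload))   # optimistic: assume 1 byte per char
        # Shrink until the encoded piece fits; multi-byte characters cause overshoot
        while len(payload[start:end].encode('utf-8')) > max_bytes and end > start + 1:
            end -= 1
        chunks.append(payload[start:end])
        start = end
    return chunks
# Simple rather than fast: the inner loop re-encodes the slice while trimming, which
# is fine for ~1 KB budgets but not tuned for very large payloads.
# ----------------------------------------------------------------------------------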
- if total_length <= max_size: - chunk_meta = { + if total_length <= effective_chunk_size: + # Data fits in one chunk + chunk = { "chunk_index": 0, "total_chunks": 1, - "total_length": total_length, # Store byte length - "chunk_hash": hash(json_str) & 0xFFFFFFFF, - "data": json_str # The actual string payload + "total_length": total_length, + "chunk_hash": hash(json_str) & 0xFFFFFFFF, # 32-bit hash + "data": json_str } - logger.debug(f"Data fits in one chunk (payload size {total_length} bytes)") - return [chunk_meta] - - # If data exceeds max_size, split the string payload - # We need to split the *string* representation carefully - # Aim for byte size chunks, which is tricky with UTF-8 variable char width - # Simple approach: estimate character chunk size based on bytes - - # Estimate average bytes per character (crude but simple) - avg_bytes_per_char = total_length / len(json_str) if len(json_str) > 0 else 1 - # Calculate target character chunk size based on byte limit - target_char_chunk_size = int(max_size / avg_bytes_per_char) - - if target_char_chunk_size < 1: target_char_chunk_size = 1 # Avoid zero chunk size + return [chunk] - # Calculate number of chunks based on estimated character size - num_chunks = math.ceil(len(json_str) / target_char_chunk_size) + # Calculate number of chunks needed + num_chunks = -(-total_length // effective_chunk_size) # Ceiling division + chunk_size = -(-total_length // num_chunks) # Even distribution chunks = [] - start_char_idx = 0 for i in range(num_chunks): - # Calculate end index, ensuring we don't overshoot - end_char_idx = min(start_char_idx + target_char_chunk_size, len(json_str)) - - # Extract the character chunk - chunk_payload_str = json_str[start_char_idx:end_char_idx] - - # Recalculate actual byte length for this specific chunk - current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) - - # Adjust end_char_idx if current chunk exceeds max_size (rare if estimate is decent) - # This loop ensures the byte limit is strictly adhered to - while current_chunk_byte_length > max_size and end_char_idx > start_char_idx: - end_char_idx -= 1 # Reduce characters by 1 - # Re-slice the string with the adjusted end index - chunk_payload_str = json_str[start_char_idx:end_char_idx] - current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) - - # Handle edge case where adjustment makes chunk empty (shouldn't happen if max_size > 1) - if not chunk_payload_str and start_char_idx < len(json_str): - logger.error(f"Chunking resulted in empty payload string unexpectedly at index {i}. Max size: {max_size}, Start char: {start_char_idx}.") - # Option: break, or try to take at least one char if possible - if end_char_idx == start_char_idx and start_char_idx < len(json_str): - end_char_idx += 1 # Force at least one char - chunk_payload_str = json_str[start_char_idx:end_char_idx] - current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) - if current_chunk_byte_length > max_size: - logger.error(f"Cannot create chunk even with 1 char without exceeding max_size ({max_size} bytes). 
Aborting chunking.") - return [] # Cannot proceed - else: - break # Avoid potential infinite loop - - - chunk_meta = { + start_idx = i * chunk_size + end_idx = min(start_idx + chunk_size, total_length) + chunk_data = json_str[start_idx:end_idx] + + chunk = { "chunk_index": i, - "total_chunks": num_chunks, # Initial estimate, may be adjusted later - "total_length": total_length, # Original total byte length - "chunk_byte_length": current_chunk_byte_length, # Actual byte length of this chunk's payload - "chunk_hash": hash(chunk_payload_str) & 0xFFFFFFFF, - "data": chunk_payload_str # The string payload for this chunk + "total_chunks": num_chunks, + "total_length": total_length, + "chunk_hash": hash(chunk_data) & 0xFFFFFFFF, + "data": chunk_data } - chunks.append(chunk_meta) - logger.debug(f"Created chunk {i+1}/{num_chunks}, payload byte size: {current_chunk_byte_length}") - - # Move to the next starting point - start_char_idx = end_char_idx - - # Safety break if start index doesn't advance - if start_char_idx == len(json_str) and i + 1 < num_chunks: - logger.warning(f"Chunking finished early at index {i+1} of {num_chunks}. Adjusting total chunks.") - # Adjust total_chunks in already created chunks - final_num_chunks = len(chunks) - for ch_idx, ch in enumerate(chunks): - ch['total_chunks'] = final_num_chunks - ch['chunk_index'] = ch_idx # Re-index just in case - num_chunks = final_num_chunks # Update num_chunks for loop condition/logging - break - - - # Final check if total chunks changed due to adjustments or early finish - if chunks and chunks[-1]['total_chunks'] != len(chunks): - logger.warning(f"Adjusting total_chunks from {chunks[-1]['total_chunks']} to {len(chunks)} after loop completion.") - final_num_chunks = len(chunks) - for i, chunk in enumerate(chunks): - chunk['total_chunks'] = final_num_chunks - chunk['chunk_index'] = i # Re-index just in case - + chunks.append(chunk) return chunks except Exception as e: - logger.error(f"Error chunking data: {e}", exc_info=True) + logger.error(f"Error chunking data: {e}") return [] - -def generate_stylish_qr(data: str, # Expecting string data from chunking +def generate_stylish_qr(data: Union[str, Dict], filename: str, size: int = 10, border: int = 4, fill_color: str = "#000000", - back_color: str = "#FFFFFF", - error_correction_level=qrcode.constants.ERROR_CORRECT_H) -> str: # Added param + back_color: str = "#FFFFFF") -> str: """Generate a stylish QR code with enhanced visual appeal""" try: qr = qrcode.QRCode( - version=None, # Auto-detect version - error_correction=error_correction_level, # Use parameter + version=None, + error_correction=qrcode.constants.ERROR_CORRECT_H, box_size=size, border=border ) - # Add string data directly (should be from chunker) - qr.add_data(data) + # Add data to QR code + if isinstance(data, dict): + qr.add_data(json.dumps(data, ensure_ascii=False)) + else: + qr.add_data(data) - # Let the library figure out the best version and mode qr.make(fit=True) - payload_bytes = len(data.encode('utf-8')) - logger.info(f"Generating QR code version {qr.version} for {filename} (Payload size: {payload_bytes} bytes)") - - # Check if payload size exceeds capacity for the chosen version/ECC level - # This is a secondary check, as DataOverflowError should catch it, but good for logging - # Note: Capacities vary by mode (Numeric, Alphanumeric, Byte, Kanji) - # We assume Byte mode for JSON strings. 
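# --- Illustrative sketch (not part of the patch) ---------------------------------
# Instead of hard-coding capacity tables, one way to check "does this payload fit?"
# is to let the qrcode library size the symbol and catch its overflow error. This
# helper is an assumption-free probe built only on qrcode.QRCode and
# qrcode.exceptions.DataOverflowError, both already imported by this module.
def qr_version_for(payload, error_correction=qrcode.constants.ERROR_CORRECT_H):
    """Return the QR version (1-40) the payload needs, or None if it cannot fit."""
    probe = qrcode.QRCode(version=None, error_correction=error_correction)
    probe.add_data(payload)
    try:
        probe.make(fit=True)
        return probe.version
    except qrcode.exceptions.DataOverflowError:
        return None
# Example: qr_version_for("x" * 1000) returns a high version at ERROR_CORRECT_H,
# while qr_version_for("x" * 3000) returns None.
# ----------------------------------------------------------------------------------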
- # Example capacity for V40-H (Byte mode): 1273 bytes - # A more robust check would involve getting capacity from the library if possible. - # For now, rely on the DataOverflowError exception. - - # Create QR code image with custom colors qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) - # Convert to RGBA for transparency support (optional gradient) + # Convert to RGBA for transparency support qr_image = qr_image.convert('RGBA') - # --- Optional: Add subtle gradient overlay --- - # gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) - # draw = ImageDraw.Draw(gradient) - # for i in range(qr_image.width): - # alpha = int(255 * (1 - i/qr_image.width) * 0.1) # 10% maximum opacity - # draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha)) - # final_image = Image.alpha_composite(qr_image, gradient) - # --- End Optional Gradient --- + # Add subtle gradient overlay + gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) + draw = ImageDraw.Draw(gradient) + for i in range(qr_image.width): + alpha = int(255 * (1 - i/qr_image.width) * 0.1) # 10% maximum opacity + draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha)) - final_image = qr_image # Use this line if gradient is commented out + # Combine images + final_image = Image.alpha_composite(qr_image, gradient) # Save the image output_path = QR_CODES_DIR / filename - # Ensure directory exists just before saving - output_path.parent.mkdir(parents=True, exist_ok=True) - - final_image.save(output_path, quality=95) # PNG quality is lossless, but ok + final_image.save(output_path, quality=95) return str(output_path) - # Catch specific data overflow error - except qrcode.exceptions.DataOverflowError as doe: - payload_bytes = len(data.encode('utf-8')) - logger.error(f"QR DataOverflowError for {filename}: {doe}. Data length (bytes): {payload_bytes}. 
Max capacity likely exceeded for ErrorLevel {error_correction_level} and auto-detected version {getattr(qr, 'version', 'N/A')}.") - return "" # Return empty string on failure except Exception as e: - logger.error(f"QR generation error for {filename}: {e}", exc_info=True) + logger.error(f"QR generation error: {e}") return "" - -def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: bool = True) -> List[str]: - """Generate QR codes, chunking data appropriately.""" +def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]: + """Generate QR codes with enhanced visual appeal and metadata""" try: - file_processor = EnhancedFileProcessor() # Get chunking method - all_qr_paths = [] - qr_fill = "#1a365d" # Deep blue - qr_back = "#ffffff" - # Decide on error correction level - H is default, M or L allow more data - error_level = qrcode.constants.ERROR_CORRECT_H # Max correction, lowest capacity - # error_level = qrcode.constants.ERROR_CORRECT_M # Medium correction, medium capacity - # error_level = qrcode.constants.ERROR_CORRECT_L # Low correction, max capacity - - - if combine_sources: - logger.info("Combining all input sources into a single QR sequence.") - # Combine all data into one large structure (e.g., a list) before chunking - # This assumes `data_to_encode` is already the combined list/dict from process_inputs - if not data_to_encode: - logger.warning("No data provided to generate combined QR codes.") - return [] - - # Chunk the combined data structure - chunks = file_processor.chunk_data(data_to_encode, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant - if not chunks: - logger.error("Chunking the combined data failed.") - return [] - - num_chunks = len(chunks) - logger.info(f"Generating {num_chunks} QR codes for combined data.") - for i, chunk_info in enumerate(chunks): - # chunk_info contains {'chunk_index', 'total_chunks', 'data', etc.} - filename = f'combined_qr_{int(time.time())}_{i+1}_of_{num_chunks}.png' - # Pass the actual payload string to the generator - qr_payload = chunk_info['data'] + file_processor = EnhancedFileProcessor() + paths = [] + + if combined: + # Process combined data + chunks = file_processor.chunk_data(data) + for i, chunk in enumerate(chunks): + filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png' qr_path = generate_stylish_qr( - data=qr_payload, + data=chunk['data'], # Use the 'data' part of the chunk filename=filename, - fill_color=qr_fill, - back_color=qr_back, - error_correction_level=error_level # Pass level + fill_color="#1a365d", # Deep blue + back_color="#ffffff" ) if qr_path: - all_qr_paths.append(qr_path) - else: - logger.error(f"Failed to generate QR code for combined chunk {i+1}") - # Optionally stop or continue? Or add error marker? - + paths.append(qr_path) else: - # Process each item in the input list individually - logger.info("Generating separate QR code sequences for each input source.") - if not isinstance(data_to_encode, list): - logger.error("Input data must be a list when combine_sources is False.") - # Maybe wrap it? 
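# --- Illustrative sketch (not part of the patch) ---------------------------------
# The per-item branch a few lines further down derives a filesystem-safe stem from
# each source (filename, URL, or source type) before building QR filenames. The
# helper below isolates that naming logic; the dict keys are assumptions matching
# the records produced by the processors above.
def qr_name_stem(item, index):
    """Derive a short, filesystem-safe stem for an item's QR filenames."""
    if isinstance(item, dict):
        if 'filename' in item:
            stem = Path(item['filename']).stem
        elif 'url' in item:
            parsed = urlparse(item['url'])
            stem = f"url_{parsed.netloc.replace('.', '_')}_{Path(parsed.path).stem or 'index'}"
        else:
            stem = f"{item.get('source', 'item')}_{index + 1}"
    else:
        stem = f"item_{index + 1}"
    # Replace anything outside [word, '-', '.'] and cap the length, as the code below does
    return re.sub(r'[^\w\-\.]+', '_', stem)[:50]
# ----------------------------------------------------------------------------------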
- if data_to_encode: - data_to_encode = [data_to_encode] - else: - return [] - - - total_items = len(data_to_encode) - for item_idx, item in enumerate(data_to_encode): - item_source_info = f"item {item_idx+1}/{total_items}" - # Try to get a better name (e.g., from filename if available) - if isinstance(item, dict): - # Prioritize filename, then url, then source type - if 'filename' in item: - item_source_info = Path(item['filename']).stem # Use stem for cleaner name - elif 'url' in item: - # Clean up URL for filename use - parsed_url = urlparse(item['url']) - url_path_name = Path(parsed_url.path).stem - url_domain = parsed_url.netloc.replace('.', '_') - item_source_info = f"url_{url_domain}_{url_path_name}" if url_path_name else f"url_{url_domain}_index" - elif 'source' in item: - item_source_info = item['source'] + f"_{item_idx+1}" - - logger.info(f"Processing source: {item_source_info}") - - # Chunk the individual item - chunks = file_processor.chunk_data(item, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant - if not chunks: - logger.error(f"Chunking failed for item {item_idx+1} ({item_source_info})") - continue # Skip to next item - - num_chunks = len(chunks) - logger.info(f"Generating {num_chunks} QR codes for {item_source_info}.") - for chunk_idx, chunk_info in enumerate(chunks): - # Sanitize source info for filename - safe_source_name = re.sub(r'[^\w\-\.]+', '_', item_source_info)[:50] # Limit length - timestamp_short = str(int(time.time()))[-6:] # Shorter timestamp - filename = f'{safe_source_name}_chunk_{chunk_idx+1}_of_{num_chunks}_{timestamp_short}.png' - qr_payload = chunk_info['data'] + # Process individual items + if isinstance(data, list): + for idx, item in enumerate(data): + chunks = file_processor.chunk_data(item) + for chunk_idx, chunk in enumerate(chunks): + filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png' + qr_path = generate_stylish_qr( + data=chunk['data'], # Use the 'data' part of the chunk + filename=filename, + fill_color="#1a365d", # Deep blue + back_color="#ffffff" + ) + if qr_path: + paths.append(qr_path) + else: + chunks = file_processor.chunk_data(data) + for i, chunk in enumerate(chunks): + filename = f'single_qr_{i+1}_of_{len(chunks)}_{int(time.time())}.png' qr_path = generate_stylish_qr( - data=qr_payload, + data=chunk['data'], # Use the 'data' part of the chunk filename=filename, - fill_color=qr_fill, - back_color=qr_back, - error_correction_level=error_level # Pass level + fill_color="#1a365d", # Deep blue + back_color="#ffffff" ) if qr_path: - all_qr_paths.append(qr_path) - else: - logger.error(f"Failed to generate QR code for {item_source_info} chunk {chunk_idx+1}") - - - logger.info(f"Generated a total of {len(all_qr_paths)} QR codes.") - return all_qr_paths + paths.append(qr_path) + return paths except Exception as e: - logger.error(f"General QR code generation process error: {e}", exc_info=True) + logger.error(f"QR code generation error: {e}") return [] -def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Dict], title: str = "QR Code Sequence") -> Optional[io.BytesIO]: - """ - Generates a visual representation of the QR code sequence using NetworkX and Matplotlib. - Args: - qr_paths: List of file paths to the QR code images. - qr_data: List of decoded data dictionaries, hopefully containing 'chunk_index'. - title: The title for the visualization plot. - Returns: - A BytesIO buffer containing the PNG image of the visualization, or None if error. 
- """ - if not qr_paths or not qr_data or len(qr_paths) != len(qr_data): - logger.warning("Mismatch or empty data for visualization.") - return None - - logger.info(f"Generating visualization for {len(qr_paths)} QR codes.") - try: - G = nx.DiGraph() - node_labels = {} - node_colors = [] - node_sizes = [] - - # Assume data is pre-sorted by chunk_index during loading - num_nodes = len(qr_paths) - total_chunks_from_meta = qr_data[0].get('total_chunks', num_nodes) if qr_data else num_nodes - - for i in range(num_nodes): - node_id = i - # Use chunk_index from metadata if possible, otherwise use list index - chunk_idx = qr_data[i].get('chunk_index', i) - label = f"{chunk_idx + 1}/{total_chunks_from_meta}" - # Add indicator if decode failed - if qr_data[i].get('data') == "[DECODE FAILED]": - label += "\n(Decode Err)" - node_labels[node_id] = label - G.add_node(node_id, path=qr_paths[i], data=qr_data[i]) - - # Add edges between consecutive nodes - if i > 0: - G.add_edge(i - 1, i) - - # Simple coloring/sizing (can be customized further) - node_colors.append('#4299e1') # Default blue color - node_sizes.append(1500) - - if not G.nodes: - logger.warning("No nodes to visualize.") - return None - - # --- Layout and Drawing --- - plt.style.use('seaborn-v0_8-whitegrid') # Use a clean style - plt.figure(figsize=(max(10, num_nodes * 1.5), 6)) # Adjust figure size based on number of nodes, slightly taller - - # Simple linear layout for sequences is often clearest - pos = {i: (i * 2, 0) for i in range(num_nodes)} # Horizontal layout - - nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, alpha=0.9, edgecolors='grey') - nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray', alpha=0.6, node_size=node_sizes) - nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=9, font_color='white', font_weight='bold') - - plt.title(title, fontsize=16, pad=20) - plt.xlabel("Sequence Index", fontsize=12) - plt.yticks([]) # Hide Y-axis ticks for linear layout - plt.xticks(range(0, num_nodes * 2, 2), [f"{i+1}" for i in range(num_nodes)]) # Label X-axis ticks - plt.box(False) # Remove frame box - plt.margins(x=0.1) # Add some horizontal margin - plt.tight_layout() - - # Save plot to a BytesIO buffer - buf = io.BytesIO() - plt.savefig(buf, format='png', bbox_inches='tight', dpi=100) - plt.close() # Close the plot figure to free memory - buf.seek(0) - logger.info("Successfully generated visualization image buffer.") - return buf - - except Exception as e: - logger.error(f"Error generating visualization image: {e}", exc_info=True) - plt.close() # Ensure plot is closed even on error - return None - -# --- Gradio Interface Section --- - -def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if needed later +def create_qr_sequence_visualizer(output_gallery): """Add QR sequence visualization capabilities to the application""" + # Create a new tab for the QR code sequence visualization with gr.Tab("🔄 QR Sequence Visualizer"): gr.Markdown(""" ## QR Code Sequence Visualizer - Upload a sequence of QR codes (e.g., those generated by this app) to decode them and visualize their order. - Uses OpenCV for detection and decoding. Requires `opencv-python-headless`. + Arrange and visualize your QR code sequences. Enable or disable individual QR codes to see how they connect. 
""") - # Store data globally within this tab's scope (alternative to Gradio State) - # This is simpler but not ideal for complex state management - shared_data = {'qr_paths': [], 'qr_data': []} - + # Inputs for the visualizer with gr.Row(): - with gr.Column(scale=1): - qr_input = gr.File( - label="Upload QR Code Images", - file_types=["image/png", "image/jpeg", ".png", ".jpg", ".jpeg"], # Be explicit - file_count="multiple" - ) - visualize_btn = gr.Button("👁️ Decode & Visualize Sequence", variant="primary") - reset_btn = gr.Button("🗑️ Reset Visualizer", variant="secondary") - # Use a single Textbox for status messages (including indicator) - visualization_status = gr.Textbox(label="Status", interactive=False, lines=3) + qr_input = gr.File( + label="Upload QR Codes", + file_types=["image/png", "image/jpeg"], + file_count="multiple" + ) - with gr.Column(scale=2): - qr_visualization = gr.Image(label="QR Code Sequence Map", type="pil", height=400) # Use PIL type - qr_preview = gr.Gallery(label="Uploaded QR Codes (Sorted)", columns=4, height=400, object_fit="contain", preview=True) + with gr.Column(): + visualize_btn = gr.Button("🔄 Generate Visualization", variant="primary") + reset_btn = gr.Button("🗑️ Reset", variant="secondary") + # Container for QR code toggles + qr_toggles_container = gr.HTML(label="QR Code Controls") + # Output visualization + with gr.Row(): + qr_visualization = gr.Image(label="QR Code Sequence Map", height=600) + qr_preview = gr.Gallery(label="Selected QR Codes", columns=2, height=600) - def process_qr_codes_and_visualize(files): - """Decodes QR files, sorts them, updates gallery, and generates visualization.""" - if not files: - shared_data['qr_paths'] = [] - shared_data['qr_data'] = [] - return None, None, "⚠️ Please upload QR code images." # Return None for gallery/image, text for status + # Status output + visualization_status = gr.Textbox(label="Visualization Status", interactive=False) - logger.info(f"Processing {len(files)} uploaded QR files for visualization.") - qr_data_list = [] - qr_path_list = [] - decode_errors = 0 + # Function to process uploaded QR codes + def process_qr_codes(files): + if not files: + return "Please upload QR code images.", None, None, "⚠️ No QR codes uploaded" - # Use OpenCV detector try: - detector = cv2.QRCodeDetector() - except Exception as init_e: - logger.error(f"Error initializing OpenCV QR detector: {init_e}") - return None, None, f"❌ Error initializing QR detector: {init_e}" + # Load QR codes and extract metadata + qr_data = [] + qr_paths = [] + for file in files: + try: + img = Image.open(file.name) - for file in files: - file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'N/A')) - try: - img_path = file.name # Gradio File object path - # Read image using OpenCV - img_cv = cv2.imread(img_path) - if img_cv is None: - logger.warning(f"Could not read image file: {file_display_name}") - decode_errors += 1 - # Add placeholder for gallery consistency? 
- qr_data_list.append({"data": "[READ FAILED]", "chunk_index": -1, "filename": file_display_name}) - qr_path_list.append(img_path) # Still need path for gallery - continue - - # Try to detect and decode QR code - data, bbox, straight_qrcode = detector.detectAndDecode(img_cv) - - if data: - logger.debug(f"Decoded data from {file_display_name}: {data[:50]}...") - # Try parsing the decoded data as JSON (expected format from generator) + # Try to decode QR code try: - qr_metadata = json.loads(data) - # Check if it looks like our chunk format - if isinstance(qr_metadata, dict) and 'chunk_index' in qr_metadata and 'total_chunks' in qr_metadata: - qr_metadata['filename'] = file_display_name # Add filename for reference - qr_data_list.append(qr_metadata) - qr_path_list.append(img_path) + detector = qrcode.QRCodeDetector() + data, bbox, _ = detector.detectAndDecode(np.array(img)) + if data: + try: + qr_json = json.loads(data) + qr_data.append(qr_json) + qr_paths.append(file.name) + except json.JSONDecodeError: + logger.warning(f"Could not decode JSON from QR: {data}") + qr_data.append({"data": data}) # Store raw data if JSON fails + qr_paths.append(file.name) else: - # Valid JSON, but not the expected chunk structure - logger.warning(f"Decoded valid JSON, but not expected format from {file_display_name}") - qr_data_list.append({"data": qr_metadata, "chunk_index": -1, "filename": file_display_name}) # Assign default index - qr_path_list.append(img_path) - - except json.JSONDecodeError: - # Data decoded, but not JSON - store raw data - logger.warning(f"Could not decode JSON from QR data in {file_display_name}. Storing raw.") - qr_data_list.append({"data": data, "chunk_index": -1, "filename": file_display_name}) # Assign default index - qr_path_list.append(img_path) - except Exception as json_e: - logger.error(f"Error processing decoded JSON from {file_display_name}: {json_e}") - qr_data_list.append({"data": f"Error: {json_e}", "chunk_index": -1, "filename": file_display_name}) - qr_path_list.append(img_path) - decode_errors += 1 - else: - # QR code detected, but no data decoded (or detection failed) - logger.warning(f"Could not decode data from QR image: {file_display_name}") - qr_data_list.append({"data": "[DECODE FAILED]", "chunk_index": -1, "filename": file_display_name}) - qr_path_list.append(img_path) - decode_errors += 1 - + qr_data.append({"data": "Empty QR"}) + qr_paths.append(file.name) + except Exception as e: + logger.warning(f"Could not decode QR: {e}") + # Add with default metadata + qr_data.append({ + "chunk_index": len(qr_data), + "total_chunks": len(files), + "data": "Unknown" + }) + qr_paths.append(file.name) + except Exception as e: + logger.error(f"Error processing QR image {file.name}: {e}") + + if not qr_data: + return "No valid QR codes found.", None, None, "❌ Failed to process QR codes" + + # Sort by chunk_index if available + try: + sorted_data = sorted(zip(qr_data, qr_paths), key=lambda x: x[0].get("chunk_index", 0)) + qr_data = [d[0] for d in sorted_data] + qr_paths = [d[1] for d in sorted_data] except Exception as e: - logger.error(f"Error processing QR image file {file_display_name}: {e}", exc_info=True) - decode_errors += 1 - # Add placeholder if processing failed entirely - qr_data_list.append({"data": "[PROCESS ERROR]", "chunk_index": -1, "filename": file_display_name}) - qr_path_list.append(getattr(file, 'name', None)) # Need path for gallery if possible + logger.error(f"Error sorting QR data: {e}") + # Generate toggle controls HTML + toggle_html = '
<div class="qr-toggles">'
+                    toggle_html += '<h4>Enable/Disable QR Codes:</h4>'
+                    # Illustrative markup: one checkbox per uploaded QR code
+                    for i, path in enumerate(qr_paths):
+                        toggle_html += f'<div><input type="checkbox" id="qr_toggle_{i}" checked> <label for="qr_toggle_{i}">QR Code {i+1}</label></div>'
+                    toggle_html += '</div>
' - # Filter out entries where path couldn't be determined - valid_entries = [(data, path) for data, path in zip(qr_data_list, qr_path_list) if path is not None] - if not valid_entries: - shared_data['qr_paths'] = [] - shared_data['qr_data'] = [] - return None, None, "❌ No valid QR codes could be processed or decoded." + # Update the toggles container + qr_toggles_container.update(value=toggle_html) - qr_data_list = [item[0] for item in valid_entries] - qr_path_list = [item[1] for item in valid_entries] - - - # Attempt to sort by chunk_index (handle missing index gracefully) - try: - # Create tuples (index, data, path) for sorting - indexed_items = [] - for i, (data, path) in enumerate(zip(qr_data_list, qr_path_list)): - # Use provided chunk_index, fallback to list index if missing or invalid (-1) - sort_key = data.get('chunk_index', i) - if not isinstance(sort_key, int) or sort_key < 0: - sort_key = float('inf') # Put items without valid index at the end - indexed_items.append((sort_key, data, path)) - - # Sort based on the index key - indexed_items.sort(key=lambda x: x[0]) - - # Unpack sorted lists - sorted_qr_data = [item[1] for item in indexed_items] - sorted_qr_paths = [item[2] for item in indexed_items] - - # Update shared data - shared_data['qr_paths'] = sorted_qr_paths - shared_data['qr_data'] = sorted_qr_data - logger.info("Successfully sorted QR data based on chunk_index.") + # Create initial visualization (replace with actual visualization logic) + initial_visualization = "Visualization will appear here." # Replace with your composite image generation + qr_visualization.update(value=initial_visualization) + return "QR codes processed successfully.", qr_paths, qr_data, "✅ Visualization ready!" except Exception as e: - logger.error(f"Error sorting QR data: {e}. Using original order.") - # Use original order if sorting fails - shared_data['qr_paths'] = qr_path_list - shared_data['qr_data'] = qr_data_list - - # Generate the visualization image using the helper function - # Use the sorted data stored in shared_data - visualization_image_buffer = _generate_sequence_visualization_image( - shared_data['qr_paths'], - shared_data['qr_data'], - title=f"Visualized Sequence ({len(shared_data['qr_paths'])} Codes)" - ) - - # Convert buffer to PIL Image for Gradio output if necessary - vis_image_pil = None - if visualization_image_buffer: - try: - vis_image_pil = Image.open(visualization_image_buffer) - except Exception as img_e: - logger.error(f"Failed to load visualization buffer into PIL Image: {img_e}") - - - status_indicator = "✅" if vis_image_pil else "⚠️" - status_message = f"Processed {len(shared_data['qr_paths'])} QR codes." - if decode_errors > 0: - status_message += f" ({decode_errors} decode/read errors)" - status_message += "\nSequence visualized." if vis_image_pil else "\nVisualization generation failed." 
- final_status_message = f"{status_indicator} {status_message}" + logger.error(f"Error processing QR codes: {e}") + return "An error occurred while processing QR codes.", None, None, "❌ Error" - - # Update outputs: Gallery with sorted paths, Image with visualization, Status text - gallery_output = shared_data['qr_paths'] - - # Return gallery paths, visualization PIL image, combined status message - return gallery_output, vis_image_pil, final_status_message - - - def reset_visualizer_state(): - shared_data['qr_paths'] = [] - shared_data['qr_data'] = [] - logger.info("Resetting QR visualizer state.") - # Clear gallery, image, file input, status text - reset_status_message = "⚪ Visualizer Reset. Upload new QR codes." - return None, None, None, reset_status_message + # Function to generate visualization (replace with actual logic) + def generate_visualization(qr_paths): + enabled_indices = [i for i in range(len(qr_paths))] # Start with all enabled + composite_image = "Updated visualization will appear here." # Replace with your composite image generation based on enabled_indices + qr_visualization.update(value=composite_image) # Event handlers - visualize_btn.click( - process_qr_codes_and_visualize, - inputs=[qr_input], - # Map function outputs to Gradio components - outputs=[qr_preview, qr_visualization, visualization_status] # Gallery, Image, Status Text - ).then( - lambda: logger.info("Visualization process complete."), inputs=None, outputs=None - ) - - - reset_btn.click( - reset_visualizer_state, - inputs=[], - outputs=[qr_preview, qr_visualization, qr_input, visualization_status] # Clear gallery, image, file input, status text - ) + visualize_btn.click(process_qr_codes, inputs=qr_input, outputs=[visualization_status, qr_visualization, qr_preview]) + reset_btn.click(lambda: (None, None, None, "⚠️ Visualization reset."), outputs=[visualization_status, qr_visualization, qr_preview]) + +# Integrate the visualizer into the main application +def visualize_qr_codes(qr_paths): + """Visualize the generated QR codes with enable/disable functionality""" + # This function currently receives the output gallery content (list of file paths) + # You might need to adapt this based on how you want to visualize. + # For now, let's just log the paths. 
+ logger.info(f"Visualizing QR codes: {qr_paths}") + return "Visualization placeholder" # Replace with actual visualization logic def create_modern_interface(): """Create a modern and visually appealing Gradio interface""" @@ -1358,459 +635,272 @@ def create_modern_interface(): --success-color: #48bb78; --error-color: #f56565; --warning-color: #ed8936; - --text-color: #1a202c; - --border-color: #e2e8f0; } - body { font-family: sans-serif; color: var(--text-color); } /* Container styling */ - .gradio-container { /* Target the main container */ + .container { + max-width: 1200px; + margin: auto; + padding: 2rem; background-color: var(--background-color); border-radius: 1rem; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); - padding: 1rem; /* Add padding to container */ } /* Component styling */ - .input-container { /* Custom class if needed, otherwise target Gradio classes */ + .input-container { background-color: white; padding: 1.5rem; border-radius: 0.5rem; - border: 1px solid var(--border-color); + border: 1px solid #e2e8f0; margin-bottom: 1rem; } /* Button styling */ - .gradio-button { /* Target Gradio buttons */ - transition: all 0.2s; - border-radius: 0.375rem; + .primary-button { + background-color: var(--primary-color); + color: white; padding: 0.75rem 1.5rem; - } - .gradio-button.primary { /* Primary variant */ - background-color: var(--primary-color) !important; - color: white !important; - border: none; - } - .gradio-button.primary:hover { - background-color: var(--accent-color) !important; - transform: translateY(-1px); - } - .gradio-button.secondary { /* Secondary variant */ - background-color: var(--secondary-color) !important; - color: white !important; + border-radius: 0.375rem; border: none; + cursor: pointer; + transition: all 0.2s; } - .gradio-button.secondary:hover { - background-color: #4a5568 !important; /* Darker secondary */ + .primary-button:hover { + background-color: var(--accent-color); transform: translateY(-1px); } /* Status messages */ - .status { /* Custom class if needed */ + .status { padding: 1rem; border-radius: 0.375rem; margin: 1rem 0; - border: 1px solid transparent; } - .status.success { border-color: var(--success-color); background-color: #f0fff4; color: var(--success-color); } - .status.error { border-color: var(--error-color); background-color: #fff5f5; color: var(--error-color); } - .status.warning { border-color: var(--warning-color); background-color: #fffaf0; color: var(--warning-color); } + .status.success { background-color: #f0fff4; color: var(--success-color); } + .status.error { background-color: #fff5f5; color: var(--error-color); } + .status.warning { background-color: #fffaf0; color: var(--warning-color); } /* Gallery styling */ - .gradio-gallery { /* Target Gradio gallery */ + .gallery { + display: grid; + grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); gap: 1rem; padding: 1rem; background-color: white; border-radius: 0.5rem; - border: 1px solid var(--border-color); - min-height: 150px; /* Ensure gallery has some height */ + border: 1px solid #e2e8f0; } - /* Style gallery images */ - .gradio-gallery > div[data-testid="gallery-item"] > img { - object-fit: contain !important; /* Use contain to avoid stretching */ + .gallery img { + width: 100%; + height: auto; border-radius: 0.375rem; transition: transform 0.2s; - border: 1px solid #eee; /* Add subtle border */ - background-color: #f8f9fa; /* Light background for images */ } - .gradio-gallery > div[data-testid="gallery-item"] > img:hover { + .gallery img:hover { transform: 
scale(1.05); - box-shadow: 0 2px 4px rgba(0,0,0,0.1); /* Add hover shadow */ - } - /* Tab styling */ - .gradio-tabs > .tabs > .tab-nav > button { /* Target tab buttons */ - padding: 0.75rem 1rem; - border-radius: 0.375rem 0.375rem 0 0; - font-weight: 500; - } - .gradio-tabs > .tabs > .tab-nav > button.selected { /* Selected tab */ - background-color: white !important; - border-bottom: 2px solid var(--primary-color) !important; - } - /* Textbox/Textarea styling */ - .gradio-textbox, .gradio-textarea { - border-radius: 0.375rem !important; - border: 1px solid var(--border-color) !important; - } - .gradio-textbox:focus, .gradio-textarea:focus { - border-color: var(--accent-color) !important; - box-shadow: 0 0 0 1px var(--accent-color) !important; } """ # Create interface with modern design - with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", theme=gr.themes.Soft()) as interface: + with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: gr.Markdown(""" # 🌐 Advanced Data Processing & QR Code Generator - Transform URLs, files (Text, JSON, PDF, Archives), or direct input into sequenced QR codes. Visualize sequences. + Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. """) - - with gr.Tab("⚙️ Data Processor & QR Generator"): + with gr.Tab("📝 URL Processing"): + url_input = gr.Textbox( + label="Enter URLs (comma or newline separated)", + lines=5, + placeholder="https://example1.com\nhttps://example2.com", + value="" + ) + with gr.Tab("📁 File Input"): + file_input = gr.File( + label="Upload Files", + file_types=["*"], # Accept all file types + file_count="multiple" + ) + with gr.Tab("📋 JSON Input"): + text_input = gr.TextArea( + label="Direct JSON Input", + lines=15, + placeholder="Paste your JSON data here...", + value="" + ) with gr.Row(): - with gr.Column(scale=2): - # Input Tabs - with gr.Tabs(): - with gr.TabItem("📝 URL Input"): - url_input = gr.Textbox( - label="Enter URLs (one per line or comma-separated)", - lines=5, - placeholder="https://example1.com\nhttps://example2.com", - elem_id="url-input" - ) - with gr.TabItem("📁 File Input"): - file_input = gr.File( - label="Upload Files (Text, JSON, PDF, Archives: zip, tar, gz, bz2)", - file_count="multiple", - # Specify supported types explicitly for better UX - file_types=[".txt", ".json", ".csv", ".md", ".xml", ".html", ".pdf", ".zip", ".tar", ".gz", ".bz2", ".tgz", ".tar.gz", ".tar.bz2"], - elem_id="file-input" - ) - with gr.TabItem("📋 Direct Input / JSON"): - text_input = gr.TextArea( - label="Direct Text/JSON Input", - lines=10, - placeholder="Paste your text or JSON data here...", - elem_id="text-input" - ) - with gr.Row(): - example_btn = gr.Button("📝 Load JSON Example") - clear_btn = gr.Button("🗑️ Clear Input") - - # Processing Options & Button - with gr.Row(equal_height=True): - with gr.Column(scale=1, min_width=150): - combine_data = gr.Checkbox( - label="Combine all inputs", - value=True, # Default to combined - info="Create one sequence from all sources." 
- ) - with gr.Column(scale=2): - process_btn = gr.Button( - "🔄 Process & Generate QR Codes", - variant="primary", - elem_id="process-button" - ) - - # Status Output - output_text = gr.Textbox( - label="Processing Status", - interactive=False, - lines=4, # Increased lines for more status info - elem_id="status-output" - ) - - - with gr.Column(scale=3): - # Output Area - gr.Markdown("### Results") - with gr.Tabs(): - with gr.TabItem("🖼️ QR Codes"): - output_gallery = gr.Gallery( - label="Generated QR Codes", - columns=4, # Adjust columns as needed - height=500, # Adjust height - object_fit="contain", - preview=True, # Enable preview click - elem_id="qr-gallery" - ) - with gr.TabItem("📄 Processed Data (JSON)"): - output_json = gr.JSON( - label="Processed Data Structure", - elem_id="json-output" - ) - - # Load example data - def load_example(): - example = { - "project": "Data Transfer Example", - "version": 1.1, - "items": [ - {"id": "A001", "name": "Item One", "value": 123.45, "tags": ["tag1", "tag2"]}, - {"id": "B002", "name": "Item Two", "value": 67.89, "enabled": True} - ], - "timestamp": datetime.now().isoformat() - } - return json.dumps(example, indent=2) + example_btn = gr.Button("📝 Load Example", variant="secondary") + clear_btn = gr.Button("🗑️ Clear", variant="secondary") + with gr.Row(): + combine_data = gr.Checkbox( + label="Combine all data into sequence ", + value=True, + info="Generate sequential QR codes for combined data" + ) + process_btn = gr.Button( + "🔄 Process & Generate QR", + variant="primary" + ) + # Output components + output_json = gr.JSON(label="Processed Data") + output_gallery = gr.Gallery( + label="Generated QR Codes", + columns=3, + height=400, + show_label=True + ) + output_text = gr.Textbox( + label="Processing Status", + interactive=False + ) - def clear_input_area(): - # Clear only the direct text input area - return "" + # Load example data + def load_example(): + example = { + "type": "product_catalog", + "items": [ + { + "id": "123", + "name": "Premium Widget", + "description": "High-quality widget with advanced features", + "price": 299.99, + "category": "electronics", + "tags": ["premium", "featured", "new"] + }, + { + "id": "456", + "name": "Basic Widget", + "description": "Reliable widget for everyday use", + "price": 149.99, + "category": "electronics", + "tags": ["basic", "popular"] + } + ], + "metadata": { + "timestamp": datetime.now().isoformat(), + "version": "2.0", + "source": "example" + } + } + return json.dumps(example, indent=2) - # --- Main Processing Function --- - def process_inputs_and_generate_qrs(urls, files, text, combine, progress=gr.Progress(track_tqdm=True)): - """Process all inputs, combine if requested, and generate QR codes.""" - start_time = time.time() - logger.info("Starting data processing...") - status_updates = [] - all_processed_data = [] # List to hold results from all sources + def clear_input(): + return "" + def process_inputs(urls, files, text, combine): + """Process all inputs and generate QR codes""" + try: + results = [] url_processor = EnhancedURLProcessor() file_processor = EnhancedFileProcessor() - # Estimate total steps for progress bar - num_urls = len(re.split(r'[,\n]+', urls.strip())) if urls and urls.strip() else 0 - num_files = len(files) if files else 0 - has_text = 1 if text and text.strip() else 0 - total_steps = num_urls + num_files + has_text + 1 # +1 for QR generation step - - step_counter = 0 - - # Wrap iterables with tqdm for progress tracking - url_list = [] - if urls and urls.strip(): - url_list 
= [u.strip() for u in re.split(r'[,\n]+', urls.strip()) if u.strip()] # Clean up - - file_list = files if files else [] - text_input_present = text and text.strip() - - # 1. Process URLs - if url_list: - status_updates.append(f"Processing {len(url_list)} URLs...") - logger.info(f"Processing URLs: {url_list}") - # Use tqdm description for progress - desc = f"Processing URLs" - for i, url in enumerate(progress.tqdm(url_list, desc=desc, total=len(url_list))): - step_counter += 1 - # progress(step_counter / total_steps, desc=f"URL {i+1}/{len(url_list)}") - logger.info(f"Processing URL {i+1}/{len(url_list)}: {url}") - # Basic validation before fetching - if not validators.url(url): - logger.warning(f"Skipping invalid URL format: {url}") - status_updates.append(f"⚠️ Skipped invalid URL: {url[:50]}...") - all_processed_data.append({'error': 'Invalid URL format', 'url': url}) - continue - - content_data = url_processor.fetch_content(url) - if content_data and 'content' in content_data: - logger.info(f"Successfully fetched content from {url} (Size: {content_data['metadata'].get('content_length_bytes','N/A')} bytes)") - # Structure the result similarly to file processing output - processed_url_data = { - 'source': 'url', - 'url': content_data['metadata']['final_url'], # Use final URL after redirects - 'content': content_data['content'], # Processed text content - # 'raw_bytes': content_data['raw_bytes'], # Don't include raw bytes in final JSON unless needed - 'metadata': { # Select relevant metadata - 'original_url': url, - 'content_type': content_data['metadata']['content_type'], - 'status_code': content_data['metadata']['status_code'], - 'encoding_used': content_data['metadata']['encoding_used'], - 'content_length_bytes': content_data['metadata']['content_length_bytes'] - }, - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_url_data) - status_updates.append(f"✓ Fetched: {url[:60]}...") - else: - logger.error(f"Failed to fetch content from URL: {url}") - status_updates.append(f"❌ Failed fetch: {url[:60]}...") - all_processed_data.append({'error': 'Failed to fetch content', 'url': url}) - - # 2. 
Process Files - if file_list: - status_updates.append(f"Processing {len(file_list)} uploaded files...") - logger.info(f"Processing {len(file_list)} files.") - desc = f"Processing Files" - for i, file_obj in enumerate(progress.tqdm(file_list, desc=desc, total=len(file_list))): - step_counter += 1 - # progress(step_counter / total_steps, desc=f"File {i+1}/{len(file_list)}") - file_name_for_log = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A')) # Try orig_name first - logger.info(f"Processing file {i+1}/{len(file_list)}: {file_name_for_log}") - try: - # Pass the Gradio file object directly to process_file - file_results = file_processor.process_file(file_obj) - if file_results: - # Filter out results that are just errors before extending - valid_results = [res for res in file_results if 'error' not in res] - error_results = [res for res in file_results if 'error' in res] - - if valid_results: - all_processed_data.extend(valid_results) - processed_filenames = [res.get('filename', 'N/A') for res in valid_results] - status_updates.append(f"✓ Processed: {', '.join(processed_filenames)}") - logger.info(f"Successfully processed: {', '.join(processed_filenames)}") - if error_results: - all_processed_data.extend(error_results) # Keep errors for JSON output - error_filenames = [res.get('filename', 'N/A') for res in error_results] - status_updates.append(f"❌ Errors processing: {', '.join(error_filenames)}") - logger.warning(f"Errors processing: {', '.join(error_filenames)}") - - if not valid_results and not error_results: - status_updates.append(f"⚠️ No data extracted from: {file_name_for_log}") - logger.warning(f"No data extracted from: {file_name_for_log}") - all_processed_data.append({'error': 'No data extracted', 'filename': file_name_for_log}) - - except Exception as file_proc_err: - file_name = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A')) - logger.error(f"Error processing file {file_name}: {file_proc_err}", exc_info=True) - status_updates.append(f"❌ Error processing file: {file_name}") - all_processed_data.append({'error': f'File processing error: {file_proc_err}', 'filename': file_name}) - - - # 3. 
Process Direct Text/JSON Input - if text_input_present: - step_counter += 1 - progress(step_counter / total_steps, desc="Processing Direct Input") - status_updates.append("Processing direct input...") - logger.info("Processing direct text/JSON input.") - # Attempt to parse as JSON first + # Process JSON input + if text and text.strip(): try: json_data = json.loads(text) - logger.info("Direct input parsed as JSON.") - processed_text_data = { - 'source': 'direct_json', - 'content': json_data, # Parsed JSON object/list - 'raw_content': text, # Original string - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_text_data) - status_updates.append("✓ Processed direct input as JSON.") - except json.JSONDecodeError: - # If not JSON, treat as plain text - logger.info("Direct input treated as plain text.") - processed_text_data = { - 'source': 'direct_text', - 'content': text, # Store as plain text - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_text_data) - status_updates.append("✓ Processed direct input as Text.") - except Exception as direct_input_err: - logger.error(f"Error processing direct input: {direct_input_err}", exc_info=True) - status_updates.append(f"❌ Error processing direct input.") - all_processed_data.append({'error': f'Direct input error: {direct_input_err}', 'source': 'direct_input'}) - - - # 4. Check if any valid data was processed - valid_processed_data = [d for d in all_processed_data if 'error' not in d] - if not valid_processed_data: - logger.warning("No valid data sources found or processed.") - status_updates.append("⚠️ No valid data to process. Please provide input or check errors.") - final_status = "\n".join(status_updates) - # Return the error data for JSON view, empty gallery, and status - return all_processed_data, [], final_status - - logger.info(f"Total valid processed data items: {len(valid_processed_data)}") - status_updates.append(f"Data processed ({len(valid_processed_data)} valid items). Generating QR codes...") - - # 5. Generate QR Codes using only valid data - qr_paths = [] - progress(step_counter / total_steps, desc="Generating QR Codes") # Update progress before QR step - try: - # Pass the list of *valid* processed data items - data_for_qr = valid_processed_data if combine else valid_processed_data # Pass the list itself if not combining - qr_paths = generate_qr_codes(data_for_qr, combine) - if qr_paths: - status_updates.append(f"✓ Generated {len(qr_paths)} QR codes.") - logger.info(f"Successfully generated {len(qr_paths)} QR codes.") - else: - # Check if chunking failed or QR generation failed for all chunks - if not valid_processed_data: # Should have been caught earlier, but double-check - status_updates.append("❌ No valid data was available for QR generation.") - else: - status_updates.append("❌ QR code generation failed or produced no codes (check logs for details).") - logger.error("QR code generation returned no paths despite valid input data.") - # Keep processed data, but gallery will be empty - - except Exception as qr_gen_err: - logger.error(f"Error during QR code generation step: {qr_gen_err}", exc_info=True) - status_updates.append(f"❌ Error generating QR codes: {qr_gen_err}") - # Keep processed data, gallery will be empty - - - # 6. 
Finalize and Return - end_time = time.time() - processing_time = end_time - start_time - status_updates.append(f"Total processing time: {processing_time:.2f} seconds.") - final_status = "\n".join(status_updates) - - # Return processed data (including errors for JSON view), QR paths (for Gallery), and status string - # Ensure qr_paths is a list of strings - qr_paths_str = [str(p) for p in qr_paths] if qr_paths else [] - - # Return all data (including errors) for JSON output, gallery paths, and status text - return all_processed_data, qr_paths_str, final_status - - - # --- Event Handlers --- - example_btn.click(load_example, outputs=[text_input]) - clear_btn.click(clear_input_area, outputs=[text_input]) - - process_btn.click( - process_inputs_and_generate_qrs, - inputs=[url_input, file_input, text_input, combine_data], - outputs=[output_json, output_gallery, output_text] # Match function return order - ) - - # Add helpful documentation - gr.Markdown(""" - ### 🚀 Features - - **Complete URL Scraping**: Extracts text content from web pages (HTML, Text, JSON, PDF). Follows redirects. - - **Advanced File Processing**: Handles text, JSON, PDF, and archives (.zip, .tar.*, .gz, .bz2). Extracts archive contents. Attempts intelligent JSON detection. - - **Direct Input**: Paste text or JSON directly. - - **Sequential QR Codes**: Chunks large data and embeds sequencing info. Option to combine inputs. Uses Error Correction Level H for robustness. - - **Modern Design**: Clean, responsive interface with progress tracking. - ### 💡 Tips - 1. **Inputs**: Use any combination of URL, File, or Direct Input tabs. - 2. **Combine**: Check 'Combine all inputs' to create one QR sequence from all sources. Uncheck to get separate QR sequences for each source (URL, file within archive, direct input). - 3. **Files**: Upload text-based files, JSON, PDF, or supported archives. Content from archives is extracted and processed individually. Large files up to 5GB are supported (but QR generation may fail for very large content). - 4. **JSON**: Use the example button or upload a `.json` file. The app also tries to parse `.txt` or other files as JSON if they contain valid JSON structure. - 5. **Status**: Monitor the Processing Status box for feedback, including errors and progress. - ### 🎨 Output - - Generated QR codes appear in the 'QR Codes' tab and are saved in the `output/qr_codes` directory. - - The structured data processed from all inputs (including any errors) is shown in the 'Processed Data (JSON)' tab. - - Hover over or click QR codes in the gallery for a larger preview. - - Use the 'QR Sequence Visualizer' tab to decode and verify sequences. 
- """) + if isinstance(json_data, list): + results.extend(json_data) + else: + results.append(json_data) + except json.JSONDecodeError as e: + return None, [], f"❌ Invalid JSON format: {str(e)}" + + # Process URLs + if urls and urls.strip(): + url_list = re.split(r'[,\n]', urls) + url_list = [url.strip() for url in url_list if url.strip()] + for url in url_list: + validation = url_processor.validate_url(url) + if validation['is_valid']: + content_data = url_processor.fetch_content(url) + if content_data and 'content' in content_data: + # Chunk the content of each URL + chunks = file_processor.chunk_data(content_data['content']) + for i, chunk in enumerate(chunks): + results.append({ + 'source': 'url', + 'url': url, + 'chunk_index': i + 1, + 'total_chunks': len(chunks), + 'content': chunk['data'], # Store the chunked data + 'timestamp': datetime.now().isoformat() + }) + + # Process files + if files: + for file in files: + file_results = file_processor.process_file(file) + if file_results: + results.extend(file_results) + + # Generate QR codes + if results: + qr_paths = generate_qr_codes(results, combine) + if qr_paths: + return ( + results, + [str(path) for path in qr_paths], + f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!" + ) + else: + return None, [], "❌ Failed to generate QR codes" + else: + return None, [], "⚠️ No valid content to process" + except Exception as e: + logger.error(f"Processing error: {e}") + return None, [], f"❌ Error: {str(e)}" + + # Set up event handlers + example_btn.click(load_example, outputs=[text_input]) + clear_btn.click(clear_input, outputs=[text_input]) + process_btn.click( + process_inputs, + inputs=[url_input, file_input, text_input, combine_data], + outputs=[output_json, output_gallery, output_text] + ) - # Add the QR sequence visualizer tab - create_qr_sequence_visualizer(output_gallery) # Pass gallery if needed + # Add the visualization button and its click event within the interface scope + visualize_btn = gr.Button("🔍 Visualize QR Codes") + visualize_btn.click(visualize_qr_codes, inputs=output_gallery, outputs=None) - return interface + # Add helpful documentation + gr.Markdown(""" + ### 🚀 Features + - **Complete URL Scraping**: Extracts every character from web pages + - **Advanced File Processing**: Full content extraction from various text-based files and common archives. Supports flexible JSON handling. + - **Smart JSON Handling**: Processes any size JSON with automatic chunking, either via direct input or file upload. + - **Sequential QR Codes**: Maintains data integrity across multiple codes + - **Modern Design**: Clean, responsive interface with visual feedback + ### 💡 Tips + 1. **URLs**: Enter multiple URLs separated by commas or newlines + 2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and JSON files. + 3. **JSON**: Use the example button to see the expected format or upload a .json file. The system will also try to detect JSON content in other file types. + 4. **QR Codes**: Choose whether to combine data into sequential codes + 5. 
**Processing**: Monitor the status for real-time feedback + ### 🎨 Output + - Generated QR codes are saved in the `output/qr_codes` directory + - Each QR code contains metadata for proper sequencing + - Hover over QR codes in the gallery to see details + """) + return interface def main(): """Initialize and launch the application""" try: - # Configure system settings if needed - mimetypes.init() # Ensure mime types are loaded + # Configure system settings + mimetypes.init() - logger.info("Starting Gradio application...") # Create and launch interface interface = create_modern_interface() + # Add the QR sequence visualizer tab + with interface: + create_qr_sequence_visualizer(None) # output_gallery might not be relevant here + # Launch with configuration interface.launch( - share=os.getenv("GRADIO_SHARE", "false").lower() == "true", # Allow sharing via env var - debug=os.getenv("GRADIO_DEBUG", "false").lower() == "true", # Allow debug via env var - show_error=True, # Show Python errors in browser console - server_name="0.0.0.0", # Bind to all interfaces for container/network access - server_port=int(os.getenv("GRADIO_PORT", 7860)), # Allow port config via env var - show_api=False, # Disable default Gradio API endpoint unless needed - # enable_queue=True # Consider enabling queue for longer tasks + share=False, + debug=False, + show_error=True, + show_api=False ) - logger.info("Gradio application stopped.") except Exception as e: - logger.critical(f"Application startup or runtime error: {e}", exc_info=True) - # Optionally add a small delay or specific cleanup before exiting - time.sleep(1) - raise # Reraise critical errors + logger.error(f"Application startup error: {e}") + raise if __name__ == "__main__": - # Ensure output directories exist before starting - OUTPUTS_DIR.mkdir(parents=True, exist_ok=True) - QR_CODES_DIR.mkdir(parents=True, exist_ok=True) - TEMP_DIR.mkdir(parents=True, exist_ok=True) main() \ No newline at end of file
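For reference, a QR image written to `output/qr_codes` by the code above can be decoded back into its JSON chunk payload with OpenCV. A minimal sketch, assuming `opencv-python` is installed (the new version of app.py no longer imports cv2 at module level) and using an illustrative file name:

import json
import cv2  # OpenCV provides QR detection; the qrcode library used by app.py only generates codes

# Read one generated QR image and recover the JSON payload embedded by the generator.
img = cv2.imread("output/qr_codes/combined_qr_1700000000_1_of_3.png")  # illustrative file name
if img is None:
    raise FileNotFoundError("QR image not found")

data, points, _ = cv2.QRCodeDetector().detectAndDecode(img)
if data:
    chunk = json.loads(data)  # sequencing fields used by the app: chunk_index, total_chunks, data
    print(chunk.get("chunk_index"), chunk.get("total_chunks"))
else:
    print("No QR code detected in the image")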