diff --git "a/app.py" "b/app.py" --- "a/app.py" +++ "b/app.py" @@ -17,7 +17,7 @@ import gradio as gr # from diskcache import Cache # Diskcache not used in the provided code, commented out from bs4 import BeautifulSoup from fake_useragent import UserAgent -from cleantext import clean +from cleantext import clean # Keep import, even if direct use is commented, in case of future use or indirect dependency needs import qrcode import qrcode.constants # Explicit import for constants import qrcode.exceptions # Explicit import for exceptions @@ -31,6 +31,8 @@ import matplotlib.pyplot as plt from matplotlib.colors import to_rgba import io import math +import cv2 # Imported for QR code detection + try: import PyPDF2 # Added for PDF processing from PyPDF2.errors import PdfReadError, DependencyError # Specific PDF errors @@ -42,7 +44,7 @@ except ImportError: class DependencyError(Exception): pass print("--------------------------------------------------------------------") print("WARNING: PyPDF2 not installed. PDF processing will be disabled.") - print(" Install with: pip install pypdf2") + print(" Install with: pip install pypdf2[crypto]") print("--------------------------------------------------------------------") @@ -76,11 +78,14 @@ class EnhancedURLProcessor: self.session = requests.Session() self.timeout = 15 # Extended timeout for larger content self.max_retries = 3 - self.user_agent = UserAgent() + try: + self.user_agent = UserAgent() + except Exception: # Handle potential errors fetching user agents list + logger.warning("Failed to initialize UserAgent. Using default.") + self.user_agent = None # Fallback # Enhanced headers for better site compatibility - self.session.headers.update({ - 'User-Agent': self.user_agent.random, + headers = { 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.9', 'Accept-Language': 'en-US,en;q=0.9', 'Accept-Encoding': 'gzip, deflate, br', @@ -91,7 +96,14 @@ class EnhancedURLProcessor: 'Sec-Fetch-Site': 'none', 'Sec-Fetch-User': '?1', 'DNT': '1' # Do Not Track - }) + } + if self.user_agent: + headers['User-Agent'] = self.user_agent.random + else: + headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' # Default fallback + + self.session.headers.update(headers) + def validate_url(self, url: str) -> Dict: """Enhanced URL validation with detailed feedback using HEAD and GET.""" @@ -106,9 +118,13 @@ class EnhancedURLProcessor: is_valid = False validation_message = "Validation failed" + # Update User-Agent for this specific request + current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent'] + req_headers = {'User-Agent': current_ua} + # Try HEAD request first try: - head_response = self.session.head(url, timeout=5, allow_redirects=True) + head_response = self.session.head(url, timeout=5, allow_redirects=True, headers=req_headers) head_response.raise_for_status() details = { 'method': 'HEAD', @@ -125,7 +141,7 @@ class EnhancedURLProcessor: logger.warning(f"HEAD request failed for {url}: {head_err}. 
Trying GET.") # If HEAD fails, try GET (stream=True to avoid downloading full content) try: - get_response = self.session.get(url, timeout=self.timeout, stream=True, allow_redirects=True) + get_response = self.session.get(url, timeout=self.timeout, stream=True, allow_redirects=True, headers=req_headers) get_response.raise_for_status() details = { 'method': 'GET', @@ -156,9 +172,10 @@ class EnhancedURLProcessor: """Fetches URL content with retries, handles various types (HTML, Text, JSON, PDF).""" try: logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})") - self.session.headers.update({'User-Agent': self.user_agent.random}) + current_ua = self.user_agent.random if self.user_agent else self.session.headers['User-Agent'] + req_headers = {'User-Agent': current_ua} - response = self.session.get(url, timeout=self.timeout, allow_redirects=True) + response = self.session.get(url, timeout=self.timeout, allow_redirects=True, headers=req_headers) response.raise_for_status() # --- Encoding Detection --- @@ -214,11 +231,19 @@ class EnhancedURLProcessor: processed_content = "[PDF Content - PyPDF2 not installed]" else: logger.info(f"Fetched non-text/HTML/JSON/PDF content ({content_type}) from {url}. Storing raw string.") - processed_content = raw_content_str if raw_content_str else "[Binary or Undecodable Content]" + # Limit storage of potentially large binary data represented as string + max_raw_str_len = 10000 + if raw_content_str and len(raw_content_str) > max_raw_str_len: + processed_content = raw_content_str[:max_raw_str_len] + f"... [truncated {len(raw_content_str) - max_raw_str_len} chars]" + elif raw_content_str: + processed_content = raw_content_str + else: + processed_content = "[Binary or Undecodable Content]" + return { 'content': processed_content, - 'raw_bytes': response.content, # Keep raw bytes if needed + 'raw_bytes': response.content, # Keep raw bytes if needed for specific handling later 'metadata': metadata } @@ -246,18 +271,26 @@ class EnhancedURLProcessor: def _process_html_content(self, content: str, base_url: str) -> str: """Extracts text from HTML, attempts to absolutize links.""" try: - soup = BeautifulSoup(content, 'html.parser') + soup = BeautifulSoup(content, 'lxml') # Use lxml parser # Absolutize links (best effort) for tag in soup.find_all(['a', 'img', 'link', 'script'], href=True) + soup.find_all(['img', 'script'], src=True): attr = 'href' if tag.has_attr('href') else 'src' if tag[attr]: try: + # Handle cases where tag[attr] might be a list (rare, but possible) + attr_value = tag[attr] + if isinstance(attr_value, list): + attr_value = attr_value[0] # Take the first one + + if not isinstance(attr_value, str): continue # Skip if not a string + base_tag = soup.find('base') current_base = base_tag['href'] if base_tag and base_tag.get('href') else base_url - abs_url = urljoin(current_base, tag[attr]) + abs_url = urljoin(current_base, attr_value) if validators.url(abs_url): # Check if the result is a valid URL tag[attr] = abs_url - except Exception: + except Exception as url_e: + logger.debug(f"Ignoring error during URL absolutization: {url_e}") pass # Ignore errors in URL joining/validation # Extract text content, joined by newlines @@ -266,7 +299,7 @@ class EnhancedURLProcessor: # text_content = clean(text_content, no_line_breaks=False, lower=False) return text_content except Exception as e: - logger.error(f"HTML processing error for base URL {base_url}: {e}") + logger.error(f"HTML processing error for base URL {base_url}: {e}", 
exc_info=True) return content # Return original content on error def _extract_pdf_text_from_bytes(self, pdf_bytes: bytes, source_desc: str) -> str: @@ -282,8 +315,12 @@ class EnhancedURLProcessor: is_decrypted = False if isinstance(decrypt_status, int): # Older PyPDF2 versions is_decrypted = decrypt_status > 0 - elif hasattr(PyPDF2, 'PasswordType'): # Newer PyPDF2 versions + elif hasattr(PyPDF2.PasswordType, 'OWNER_PASSWORD'): # Newer PyPDF2 versions (check attribute exists) is_decrypted = decrypt_status in (PyPDF2.PasswordType.OWNER_PASSWORD, PyPDF2.PasswordType.USER_PASSWORD) + else: # Fallback check if PasswordType structure changes + logger.warning("Could not determine PyPDF2 decryption status type, assuming success if no error.") + is_decrypted = True # Assume success if decrypt didn't raise error + if is_decrypted: logger.info(f"Successfully decrypted PDF from {source_desc} with empty password.") @@ -340,25 +377,29 @@ class EnhancedFileProcessor: return [] dataset = [] - file_path_obj = Path(file.name) + # Use orig_name if available (better for Gradio temp files) + file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'Unknown File')) + file_path_obj = Path(file.name) if hasattr(file, 'name') else None + + if not file_path_obj: + logger.error("File object missing 'name' attribute.") + return [{'error': 'Invalid file object on server', 'filename': file_display_name}] try: # Gradio file objects might be temporary, ensure path exists if not file_path_obj.exists(): logger.error(f"Temporary file path does not exist: {file_path_obj}") - # Try accessing the original name if available (less reliable) - orig_name = getattr(file, 'orig_name', file_path_obj.name) - return [{'error': 'File not found on server', 'filename': orig_name}] + return [{'error': 'File not found on server', 'filename': file_display_name}] file_path = file_path_obj.resolve() # Get absolute path file_size = file_path.stat().st_size if file_size > self.max_file_size: - logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds limit ({self.max_file_size} bytes).") - return [{'error': 'File too large', 'filename': file_path.name, 'file_size': file_size}] + logger.warning(f"File '{file_display_name}' size ({file_size} bytes) exceeds limit ({self.max_file_size} bytes).") + return [{'error': 'File too large', 'filename': file_display_name, 'file_size': file_size}] if file_size == 0: - logger.warning(f"File '{file_path.name}' is empty.") - return [{'error': 'File is empty', 'filename': file_path.name, 'file_size': 0}] + logger.warning(f"File '{file_display_name}' is empty.") + return [{'error': 'File is empty', 'filename': file_display_name, 'file_size': 0}] # Use a temporary directory for extraction if needed @@ -366,15 +407,15 @@ class EnhancedFileProcessor: temp_dir_path = Path(temp_dir) if self._is_archive(str(file_path)): - logger.info(f"Processing archive: {file_path.name}") - dataset.extend(self._process_archive(str(file_path), temp_dir_path)) + logger.info(f"Processing archive: {file_display_name}") + dataset.extend(self._process_archive(str(file_path), temp_dir_path, archive_display_name=file_display_name)) else: - logger.info(f"Processing single file: {file_path.name}") - dataset.extend(self._process_single_file(file_path)) + logger.info(f"Processing single file: {file_display_name}") + dataset.extend(self._process_single_file(file_path, file_display_name)) except Exception as e: - logger.error(f"Error processing file '{file_path_obj.name}': {e}", exc_info=True) - dataset.append({'error': 
f'Processing failed: {e}', 'filename': file_path_obj.name}) + logger.error(f"Error processing file '{file_display_name}': {e}", exc_info=True) + dataset.append({'error': f'Processing failed: {e}', 'filename': file_display_name}) return dataset def _is_archive(self, filepath: str) -> bool: @@ -386,9 +427,9 @@ class EnhancedFileProcessor: return suffix in self.archive_extensions or double_suffix in self.archive_extensions - def _process_single_file(self, file_path: Path) -> List[Dict]: + def _process_single_file(self, file_path: Path, display_name: str) -> List[Dict]: """Processes a single non-archive file (text, json, pdf, binary).""" - file_name = file_path.name + file_name = display_name # Use the potentially original name for reporting file_suffix = file_path.suffix.lower() try: @@ -483,7 +524,7 @@ class EnhancedFileProcessor: # --- Structure Output --- result = { 'source': source, - 'filename': file_name, + 'filename': file_name, # Use display name 'file_size': file_size, 'mime_type': mime_type, 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(), @@ -492,23 +533,23 @@ class EnhancedFileProcessor: 'timestamp': datetime.now().isoformat() } # Include raw string only if it's text and wasn't successfully parsed as JSON - if raw_content_str is not None and source in ['text_file', 'text_file_failed_json']: + if raw_content_str is not None and source in ['text_file', 'text_file_failed_json', 'json_content_detected_in_text']: result['raw_content'] = raw_content_str return [result] except FileNotFoundError: - logger.error(f"File not found during processing: {file_path}") - return [{'error': 'File not found', 'filename': file_name}] + logger.error(f"File not found during processing: {file_path} (Display Name: {display_name})") + return [{'error': 'File not found', 'filename': display_name}] except Exception as e: - logger.error(f"General file processing error for {file_path.name}: {e}", exc_info=True) - return [{'error': f'File processing failed: {e}', 'filename': file_name}] + logger.error(f"General file processing error for {display_name}: {e}", exc_info=True) + return [{'error': f'File processing failed: {e}', 'filename': display_name}] - def _process_archive(self, archive_path_str: str, extract_to: Path) -> List[Dict]: + def _process_archive(self, archive_path_str: str, extract_to: Path, archive_display_name: str) -> List[Dict]: """Extracts files from supported archives and processes them.""" dataset = [] archive_path = Path(archive_path_str) - archive_name = archive_path.name + archive_name = archive_display_name # Use display name for reporting logger.info(f"Attempting to extract archive: {archive_name}") extracted_something = False @@ -526,14 +567,15 @@ class EnhancedFileProcessor: return [{'error': 'Archive potential bomb or excessively large', 'filename': archive_name}] for member in zip_ref.infolist(): + member_display_name = member.filename # Name inside archive if member.is_dir() or member.file_size == 0: continue # Skip directories and empty files if member.file_size > self.max_file_size: - logger.warning(f"Skipping '{member.filename}' in {archive_name}: size {member.file_size} exceeds limit.") - dataset.append({'error': 'File in archive too large', 'filename': member.filename, 'archive': archive_name}) + logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.file_size} exceeds limit.") + dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name}) continue # Sanitize and prevent path 
traversal - ensure target is within extract_to - member_path = Path(member.filename) + member_path = Path(member_display_name) # Remove leading slashes and '..' components safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')] if not safe_parts: continue # Skip if path becomes empty @@ -541,7 +583,7 @@ class EnhancedFileProcessor: # Double check it's within the extraction directory if not str(target_path).startswith(str(extract_to.resolve())): - logger.warning(f"Skipping potentially unsafe path '{member.filename}' in {archive_name}") + logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}") continue target_path.parent.mkdir(parents=True, exist_ok=True) @@ -554,18 +596,22 @@ class EnhancedFileProcessor: if not chunk: break size_written += len(chunk) if size_written > self.max_file_size: # Check during extraction - raise OverflowError(f"File '{member.filename}' exceeded size limit during extraction.") + raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.") target.write(chunk) - logger.debug(f"Extracted '{member.filename}' to '{target_path.relative_to(extract_to)}' from {archive_name}") - dataset.extend(self._process_single_file(target_path)) + logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}") + # Process the extracted file, passing its name within the archive + results = self._process_single_file(target_path, member_display_name) + # Add archive context to results + for res in results: res['archive'] = archive_name + dataset.extend(results) extracted_something = True except OverflowError as oe: - logger.error(f"Error extracting {member.filename} from {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': member.filename, 'archive': archive_name}) + logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}") + dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name}) if target_path.exists(): target_path.unlink() # Clean up partial file except Exception as extract_err: - logger.error(f"Failed to extract/process {member.filename} from {archive_name}: {extract_err}") - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member.filename, 'archive': archive_name}) + logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True) + dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name}) # --- TAR (tar, tar.gz, tgz, tar.bz2) --- elif tarfile.is_tarfile(archive_path): @@ -573,20 +619,21 @@ class EnhancedFileProcessor: # Mode 'r:*' auto-detects compression with tarfile.open(archive_path, 'r:*') as tar_ref: for member in tar_ref.getmembers(): + member_display_name = member.name # Name inside archive if not member.isfile() or member.size == 0: continue # Skip non-files and empty files if member.size > self.max_file_size: - logger.warning(f"Skipping '{member.name}' in {archive_name}: size {member.size} exceeds limit.") - dataset.append({'error': 'File in archive too large', 'filename': member.name, 'archive': archive_name}) + logger.warning(f"Skipping '{member_display_name}' in {archive_name}: size {member.size} exceeds limit.") + dataset.append({'error': 'File in archive too large', 'filename': member_display_name, 'archive': archive_name}) continue # Sanitize and prevent path traversal - member_path = Path(member.name) 
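(Aside: the sanitization applied here, for both ZIP and TAR members, reduces to the small helper sketched below; the app inlines this logic rather than defining a function, and `safe_extraction_path` is an illustrative name.)

```python
# Standalone sketch of the path-traversal guard used for archive members.
from pathlib import Path
from typing import Optional

def safe_extraction_path(member_name: str, extract_to: Path) -> Optional[Path]:
    """Map an archive member name to a path inside extract_to, or None if unsafe."""
    # Drop empty, '.' and '..' components so a name like 'a/../../etc/passwd' cannot escape.
    safe_parts = [p for p in Path(member_name).parts if p not in ('', '.', '..')]
    if not safe_parts:
        return None
    target = extract_to.joinpath(*safe_parts).resolve()
    # Final containment check: the resolved target must remain under extract_to.
    if not str(target).startswith(str(extract_to.resolve())):
        return None
    return target

# '../../etc/passwd' maps to <extract_to>/etc/passwd; a bare '..' or '/' yields None.
```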
+ member_path = Path(member_display_name) safe_parts = [p for p in member_path.parts if p not in ('', '.', '..')] if not safe_parts: continue target_path = extract_to.joinpath(*safe_parts).resolve() if not str(target_path).startswith(str(extract_to.resolve())): - logger.warning(f"Skipping potentially unsafe path '{member.name}' in {archive_name}") + logger.warning(f"Skipping potentially unsafe path '{member_display_name}' in {archive_name}") continue target_path.parent.mkdir(parents=True, exist_ok=True) @@ -601,26 +648,29 @@ class EnhancedFileProcessor: if not chunk: break size_written += len(chunk) if size_written > self.max_file_size: # Check during extraction - raise OverflowError(f"File '{member.name}' exceeded size limit during extraction.") + raise OverflowError(f"File '{member_display_name}' exceeded size limit during extraction.") target.write(chunk) - logger.debug(f"Extracted '{member.name}' to '{target_path.relative_to(extract_to)}' from {archive_name}") - dataset.extend(self._process_single_file(target_path)) + logger.debug(f"Extracted '{member_display_name}' to '{target_path.relative_to(extract_to)}' from {archive_name}") + # Process the extracted file + results = self._process_single_file(target_path, member_display_name) + for res in results: res['archive'] = archive_name + dataset.extend(results) extracted_something = True else: - logger.warning(f"Could not extract file object for '{member.name}' from {archive_name}") + logger.warning(f"Could not extract file object for '{member_display_name}' from {archive_name}") except OverflowError as oe: - logger.error(f"Error extracting {member.name} from {archive_name}: {oe}") - dataset.append({'error': str(oe), 'filename': member.name, 'archive': archive_name}) + logger.error(f"Error extracting {member_display_name} from {archive_name}: {oe}") + dataset.append({'error': str(oe), 'filename': member_display_name, 'archive': archive_name}) if target_path.exists(): target_path.unlink() except Exception as extract_err: - logger.error(f"Failed to extract/process {member.name} from {archive_name}: {extract_err}") - dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member.name, 'archive': archive_name}) + logger.error(f"Failed to extract/process {member_display_name} from {archive_name}: {extract_err}", exc_info=True) + dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': member_display_name, 'archive': archive_name}) # --- GZIP (single file) --- elif archive_name.lower().endswith('.gz') and not archive_name.lower().endswith('.tar.gz'): # Avoid double handling tar.gz logger.debug(f"Processing GZIP: {archive_name}") # Determine output filename (remove .gz suffix) - extracted_filename = archive_path.stem + extracted_filename = Path(archive_name).stem if not extracted_filename: extracted_filename = archive_name + "_extracted" # Handle cases like '.gz' target_path = extract_to / extracted_filename target_path.parent.mkdir(parents=True, exist_ok=True) @@ -635,7 +685,10 @@ class EnhancedFileProcessor: raise OverflowError(f"Gzipped file '{archive_name}' exceeded size limit during decompression.") outfile.write(chunk) logger.debug(f"Extracted '{target_path.name}' from {archive_name}") - dataset.extend(self._process_single_file(target_path)) + # Process the extracted file + results = self._process_single_file(target_path, extracted_filename) # Use extracted name + for res in results: res['archive'] = archive_name + dataset.extend(results) extracted_something = True except 
OverflowError as oe: logger.error(f"Error extracting {archive_name}: {oe}") @@ -645,13 +698,13 @@ class EnhancedFileProcessor: logger.error(f"Error processing GZIP {archive_name}: Bad Gzip File - {e}") dataset.append({'error': f'Bad Gzip File: {e}', 'filename': archive_name}) except Exception as extract_err: - logger.error(f"Failed to extract/process gzip {archive_name}: {extract_err}") + logger.error(f"Failed to extract/process gzip {archive_name}: {extract_err}", exc_info=True) dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name}) # --- BZ2 (single file) --- elif archive_name.lower().endswith('.bz2') and not archive_name.lower().endswith('.tar.bz2'): # Avoid double handling tar.bz2 logger.debug(f"Processing BZ2: {archive_name}") - extracted_filename = archive_path.stem + extracted_filename = Path(archive_name).stem if not extracted_filename: extracted_filename = archive_name + "_extracted" target_path = extract_to / extracted_filename target_path.parent.mkdir(parents=True, exist_ok=True) @@ -666,14 +719,17 @@ class EnhancedFileProcessor: raise OverflowError(f"Bzipped file '{archive_name}' exceeded size limit during decompression.") outfile.write(chunk) logger.debug(f"Extracted '{target_path.name}' from {archive_name}") - dataset.extend(self._process_single_file(target_path)) + # Process the extracted file + results = self._process_single_file(target_path, extracted_filename) + for res in results: res['archive'] = archive_name + dataset.extend(results) extracted_something = True except OverflowError as oe: logger.error(f"Error extracting {archive_name}: {oe}") dataset.append({'error': str(oe), 'filename': extracted_filename, 'archive': archive_name}) if target_path.exists(): target_path.unlink() except Exception as extract_err: - logger.error(f"Failed to extract/process bz2 {archive_name}: {extract_err}") + logger.error(f"Failed to extract/process bz2 {archive_name}: {extract_err}", exc_info=True) dataset.append({'error': f'Extraction/processing failed: {extract_err}', 'filename': archive_name}) else: @@ -695,92 +751,14 @@ class EnhancedFileProcessor: logger.error(f"Invalid or corrupted archive file {archive_name}: {archive_err}") dataset.append({'error': f'Corrupted or invalid archive: {archive_err}', 'filename': archive_name}) except Exception as e: - logger.error(f"Failed to extract/process file {file_info.filename} from zip {archive_path_obj.name}: {extract_err}") - - # Handle TAR archives (covers .tar, .tar.gz, .tgz, .tar.bz2) - # Need to import bz2 if supporting .bz2 - elif tarfile.is_tarfile(archive_path): - logger.debug(f"Processing TAR file: {archive_path_obj.name}") - # Mode 'r:*' auto-detects compression (gz, bz2, xz if libs available) - with tarfile.open(archive_path, 'r:*') as tar_ref: - # Add security checks for tar extraction if needed (e.g., checking paths) - for member in tar_ref.getmembers(): - if member.isfile() and member.name and not member.name.startswith('/') and '..' 
not in member.name: - try: - # Construct safe path - extracted_path = extract_to / member.name - extracted_path.parent.mkdir(parents=True, exist_ok=True) - # Extract safely - with tar_ref.extractfile(member) as source, open(extracted_path, "wb") as target: - target.write(source.read()) - - logger.debug(f"Extracted {member.name} from tar.") - dataset.extend(self._process_single_file(extracted_path)) - except Exception as extract_err: - logger.error(f"Failed to extract/process member {member.name} from tar {archive_path_obj.name}: {extract_err}") - - # Handle GZIP archives (single file compression) - check it's not a tar.gz - elif archive_path.lower().endswith('.gz') and not archive_path.lower().endswith('.tar.gz'): - logger.debug(f"Processing GZIP file: {archive_path_obj.name}") - # Need to determine the output filename (remove .gz) - extracted_filename = archive_path_obj.stem - # Handle cases like '.txt.gz' -> '.txt' - if '.' in extracted_filename: - extracted_path = extract_to / extracted_filename - else: - # If no inner extension (e.g., 'myfile.gz'), maybe add a default like '.bin' or leave as is? - extracted_path = extract_to / (extracted_filename + ".bin") # Example + logger.error(f"General archive processing error for {archive_name}: {e}", exc_info=True) + dataset.append({'error': f'Archive processing failed: {e}', 'filename': archive_name}) - try: - extracted_path.parent.mkdir(parents=True, exist_ok=True) - with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: - outfile.write(gz_file.read()) - logger.debug(f"Extracted {extracted_path.name} from gzip.") - dataset.extend(self._process_single_file(extracted_path)) - except gzip.BadGzipFile as e: - logger.error(f"Error processing GZIP archive {archive_path_obj.name}: Bad Gzip File - {e}") - except Exception as extract_err: - logger.error(f"Failed to extract/process gzip file {archive_path_obj.name}: {extract_err}") - - # Add BZ2 single file support (requires bz2 import) - elif archive_path.lower().endswith('.bz2') and not archive_path.lower().endswith('.tar.bz2'): - logger.debug(f"Processing BZ2 file: {archive_path_obj.name}") - try: - import bz2 - extracted_filename = archive_path_obj.stem - extracted_path = extract_to / extracted_filename - if '.' 
not in extracted_filename: - extracted_path = extract_to / (extracted_filename + ".bin") - - extracted_path.parent.mkdir(parents=True, exist_ok=True) - with bz2.open(archive_path, 'rb') as bz2_file, open(extracted_path, 'wb') as outfile: - outfile.write(bz2_file.read()) - logger.debug(f"Extracted {extracted_path.name} from bz2.") - dataset.extend(self._process_single_file(extracted_path)) - - except ImportError: - logger.warning("bz2 library not available, cannot process .bz2 files.") - except Exception as extract_err: - logger.error(f"Failed to extract/process bz2 file {archive_path_obj.name}: {extract_err}") - - - # Placeholder for other types or if no specific handler matched - else: - logger.warning(f"Archive type not explicitly handled or not a recognized archive: {archive_path_obj.name}") - - - except FileNotFoundError: - logger.error(f"Archive file not found: {archive_path}") - except (zipfile.BadZipFile, tarfile.TarError, gzip.BadGzipFile) as archive_err: - logger.error(f"Invalid or corrupted archive file {archive_path_obj.name}: {archive_err}") - dataset.append({'error': f'Corrupted archive: {archive_err}', 'filename': archive_path_obj.name}) - except Exception as e: - logger.error(f"General archive processing error for {archive_path_obj.name}: {e}", exc_info=True) - dataset.append({'error': f'Archive processing failed: {e}', 'filename': archive_path_obj.name}) return dataset + # Adjusted chunk_data with recommended max_size for QR codes - def chunk_data(self, data: Union[Dict, List, str], max_size: int = 1700) -> List[Dict]: + def chunk_data(self, data: Union[Dict, List, str], max_size: int = QR_PAYLOAD_MAX_BYTES) -> List[Dict]: """Enhanced data chunking with sequence metadata, sized for QR codes.""" try: if not isinstance(data, str): @@ -836,20 +814,31 @@ class EnhancedFileProcessor: current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) # Adjust end_char_idx if current chunk exceeds max_size (rare if estimate is decent) + # This loop ensures the byte limit is strictly adhered to while current_chunk_byte_length > max_size and end_char_idx > start_char_idx: - end_char_idx -= 1 # Reduce characters + end_char_idx -= 1 # Reduce characters by 1 + # Re-slice the string with the adjusted end index chunk_payload_str = json_str[start_char_idx:end_char_idx] current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) + # Handle edge case where adjustment makes chunk empty (shouldn't happen if max_size > 1) if not chunk_payload_str and start_char_idx < len(json_str): - # This should not happen with the logic above, but as a safeguard - logger.error("Chunking resulted in empty payload string unexpectedly.") - # Handle error: skip, break, or adjust logic - break # Avoid infinite loop + logger.error(f"Chunking resulted in empty payload string unexpectedly at index {i}. Max size: {max_size}, Start char: {start_char_idx}.") + # Option: break, or try to take at least one char if possible + if end_char_idx == start_char_idx and start_char_idx < len(json_str): + end_char_idx += 1 # Force at least one char + chunk_payload_str = json_str[start_char_idx:end_char_idx] + current_chunk_byte_length = len(chunk_payload_str.encode('utf-8')) + if current_chunk_byte_length > max_size: + logger.error(f"Cannot create chunk even with 1 char without exceeding max_size ({max_size} bytes). 
Aborting chunking.") + return [] # Cannot proceed + else: + break # Avoid potential infinite loop + chunk_meta = { "chunk_index": i, - "total_chunks": num_chunks, + "total_chunks": num_chunks, # Initial estimate, may be adjusted later "total_length": total_length, # Original total byte length "chunk_byte_length": current_chunk_byte_length, # Actual byte length of this chunk's payload "chunk_hash": hash(chunk_payload_str) & 0xFFFFFFFF, @@ -863,15 +852,19 @@ class EnhancedFileProcessor: # Safety break if start index doesn't advance if start_char_idx == len(json_str) and i + 1 < num_chunks: - logger.warning(f"Chunking finished early at index {i+1} of {num_chunks}. Check logic.") - # Adjust total_chunks if ending early? - for ch in chunks: ch['total_chunks'] = len(chunks) + logger.warning(f"Chunking finished early at index {i+1} of {num_chunks}. Adjusting total chunks.") + # Adjust total_chunks in already created chunks + final_num_chunks = len(chunks) + for ch_idx, ch in enumerate(chunks): + ch['total_chunks'] = final_num_chunks + ch['chunk_index'] = ch_idx # Re-index just in case + num_chunks = final_num_chunks # Update num_chunks for loop condition/logging break - # Final check if total chunks changed - if chunks and chunks[0]['total_chunks'] != len(chunks): - logger.warning(f"Adjusting total_chunks from {chunks[0]['total_chunks']} to {len(chunks)}") + # Final check if total chunks changed due to adjustments or early finish + if chunks and chunks[-1]['total_chunks'] != len(chunks): + logger.warning(f"Adjusting total_chunks from {chunks[-1]['total_chunks']} to {len(chunks)} after loop completion.") final_num_chunks = len(chunks) for i, chunk in enumerate(chunks): chunk['total_chunks'] = final_num_chunks @@ -906,7 +899,16 @@ def generate_stylish_qr(data: str, # Expecting string data from chunking # Let the library figure out the best version and mode qr.make(fit=True) - logger.info(f"Generating QR code version {qr.version} for {filename} (Payload size: {len(data.encode('utf-8'))} bytes)") + payload_bytes = len(data.encode('utf-8')) + logger.info(f"Generating QR code version {qr.version} for {filename} (Payload size: {payload_bytes} bytes)") + + # Check if payload size exceeds capacity for the chosen version/ECC level + # This is a secondary check, as DataOverflowError should catch it, but good for logging + # Note: Capacities vary by mode (Numeric, Alphanumeric, Byte, Kanji) + # We assume Byte mode for JSON strings. + # Example capacity for V40-H (Byte mode): 1273 bytes + # A more robust check would involve getting capacity from the library if possible. + # For now, rely on the DataOverflowError exception. # Create QR code image with custom colors @@ -936,7 +938,8 @@ def generate_stylish_qr(data: str, # Expecting string data from chunking return str(output_path) # Catch specific data overflow error except qrcode.exceptions.DataOverflowError as doe: - logger.error(f"QR DataOverflowError for {filename}: {doe}. Data length (bytes): {len(data.encode('utf-8'))}. Max capacity likely exceeded for ErrorLevel {error_correction_level}.") + payload_bytes = len(data.encode('utf-8')) + logger.error(f"QR DataOverflowError for {filename}: {doe}. Data length (bytes): {payload_bytes}. 
Max capacity likely exceeded for ErrorLevel {error_correction_level} and auto-detected version {getattr(qr, 'version', 'N/A')}.") return "" # Return empty string on failure except Exception as e: logger.error(f"QR generation error for {filename}: {e}", exc_info=True) @@ -965,7 +968,7 @@ def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: b return [] # Chunk the combined data structure - chunks = file_processor.chunk_data(data_to_encode) # Chunker expects dict/list/str + chunks = file_processor.chunk_data(data_to_encode, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant if not chunks: logger.error("Chunking the combined data failed.") return [] @@ -988,7 +991,7 @@ def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: b all_qr_paths.append(qr_path) else: logger.error(f"Failed to generate QR code for combined chunk {i+1}") - # Optionally stop or continue? + # Optionally stop or continue? Or add error marker? else: # Process each item in the input list individually @@ -1006,15 +1009,23 @@ def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: b for item_idx, item in enumerate(data_to_encode): item_source_info = f"item {item_idx+1}/{total_items}" # Try to get a better name (e.g., from filename if available) - if isinstance(item, dict) and 'filename' in item: - item_source_info = item['filename'] - elif isinstance(item, dict) and 'url' in item: - item_source_info = Path(urlparse(item['url']).path).name or f"url_item_{item_idx+1}" + if isinstance(item, dict): + # Prioritize filename, then url, then source type + if 'filename' in item: + item_source_info = Path(item['filename']).stem # Use stem for cleaner name + elif 'url' in item: + # Clean up URL for filename use + parsed_url = urlparse(item['url']) + url_path_name = Path(parsed_url.path).stem + url_domain = parsed_url.netloc.replace('.', '_') + item_source_info = f"url_{url_domain}_{url_path_name}" if url_path_name else f"url_{url_domain}_index" + elif 'source' in item: + item_source_info = item['source'] + f"_{item_idx+1}" logger.info(f"Processing source: {item_source_info}") # Chunk the individual item - chunks = file_processor.chunk_data(item) + chunks = file_processor.chunk_data(item, max_size=QR_PAYLOAD_MAX_BYTES) # Use constant if not chunks: logger.error(f"Chunking failed for item {item_idx+1} ({item_source_info})") continue # Skip to next item @@ -1023,8 +1034,9 @@ def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: b logger.info(f"Generating {num_chunks} QR codes for {item_source_info}.") for chunk_idx, chunk_info in enumerate(chunks): # Sanitize source info for filename - safe_source_name = re.sub(r'[^\w\-]+', '_', item_source_info) - filename = f'{safe_source_name}_chunk_{chunk_idx+1}_of_{num_chunks}_{int(time.time())}.png' + safe_source_name = re.sub(r'[^\w\-\.]+', '_', item_source_info)[:50] # Limit length + timestamp_short = str(int(time.time()))[-6:] # Shorter timestamp + filename = f'{safe_source_name}_chunk_{chunk_idx+1}_of_{num_chunks}_{timestamp_short}.png' qr_payload = chunk_info['data'] qr_path = generate_stylish_qr( data=qr_payload, @@ -1048,12 +1060,10 @@ def generate_qr_codes(data_to_encode: Union[str, Dict, List], combine_sources: b def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Dict], title: str = "QR Code Sequence") -> Optional[io.BytesIO]: """ Generates a visual representation of the QR code sequence using NetworkX and Matplotlib. 
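(Aside: the drawing approach this function takes, a DiGraph with one node per chunk, sequential edges, and a fixed horizontal layout, is sketched standalone below; `draw_sequence` is an illustrative name and the styling values are approximate.)

```python
# Minimal sketch of the linear sequence visualization (assumes networkx and matplotlib).
import io
import matplotlib
matplotlib.use('Agg')  # Render off-screen, no display needed
import matplotlib.pyplot as plt
import networkx as nx

def draw_sequence(labels):
    """Draw chunk labels as a left-to-right chain and return a PNG buffer."""
    g = nx.DiGraph()
    for i, label in enumerate(labels):
        g.add_node(i, label=label)
        if i > 0:
            g.add_edge(i - 1, i)  # Sequential edge: chunk i-1 -> chunk i
    pos = {i: (i * 2, 0) for i in range(len(labels))}  # Simple horizontal layout
    plt.figure(figsize=(max(10, len(labels) * 1.5), 5))
    nx.draw_networkx_nodes(g, pos, node_size=2000, node_color='#4299e1')
    nx.draw_networkx_edges(g, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray')
    nx.draw_networkx_labels(g, pos, labels=dict(enumerate(labels)),
                            font_size=9, font_color='white')
    plt.axis('off')
    buf = io.BytesIO()
    plt.savefig(buf, format='png', bbox_inches='tight')
    plt.close()
    buf.seek(0)
    return buf

# Example: draw_sequence(["1/3", "2/3", "3/3"])
```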
- Args: qr_paths: List of file paths to the QR code images. qr_data: List of decoded data dictionaries, hopefully containing 'chunk_index'. title: The title for the visualization plot. - Returns: A BytesIO buffer containing the PNG image of the visualization, or None if error. """ @@ -1077,6 +1087,9 @@ def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Di # Use chunk_index from metadata if possible, otherwise use list index chunk_idx = qr_data[i].get('chunk_index', i) label = f"{chunk_idx + 1}/{total_chunks_from_meta}" + # Add indicator if decode failed + if qr_data[i].get('data') == "[DECODE FAILED]": + label += "\n(Decode Err)" node_labels[node_id] = label G.add_node(node_id, path=qr_paths[i], data=qr_data[i]) @@ -1093,24 +1106,22 @@ def _generate_sequence_visualization_image(qr_paths: List[str], qr_data: List[Di return None # --- Layout and Drawing --- - plt.figure(figsize=(max(10, num_nodes * 1.5), 5)) # Adjust figure size based on number of nodes + plt.style.use('seaborn-v0_8-whitegrid') # Use a clean style + plt.figure(figsize=(max(10, num_nodes * 1.5), 6)) # Adjust figure size based on number of nodes, slightly taller # Simple linear layout for sequences is often clearest pos = {i: (i * 2, 0) for i in range(num_nodes)} # Horizontal layout - # For more complex graphs, consider other layouts: - # pos = nx.spring_layout(G, k=0.5, iterations=50) - # pos = nx.kamada_kawai_layout(G) + nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, alpha=0.9, edgecolors='grey') + nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray', alpha=0.6, node_size=node_sizes) + nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=9, font_color='white', font_weight='bold') - nx.draw_networkx_nodes(G, pos, node_size=node_sizes, node_color=node_colors, alpha=0.9) - nx.draw_networkx_edges(G, pos, arrowstyle='-|>', arrowsize=20, edge_color='gray', alpha=0.6) - nx.draw_networkx_labels(G, pos, labels=node_labels, font_size=10, font_color='white') - - plt.title(title, fontsize=16) + plt.title(title, fontsize=16, pad=20) plt.xlabel("Sequence Index", fontsize=12) plt.yticks([]) # Hide Y-axis ticks for linear layout plt.xticks(range(0, num_nodes * 2, 2), [f"{i+1}" for i in range(num_nodes)]) # Label X-axis ticks plt.box(False) # Remove frame box + plt.margins(x=0.1) # Add some horizontal margin plt.tight_layout() # Save plot to a BytesIO buffer @@ -1134,6 +1145,7 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee gr.Markdown(""" ## QR Code Sequence Visualizer Upload a sequence of QR codes (e.g., those generated by this app) to decode them and visualize their order. + Uses OpenCV for detection and decoding. Requires `opencv-python-headless`. 
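(Aside: a minimal standalone decode pass with `cv2.QRCodeDetector`, the detector used further down, looks like the sketch below; the filename is illustrative.)

```python
# Minimal QR decode pass with OpenCV (assumes opencv-python-headless is installed).
import json
import cv2

detector = cv2.QRCodeDetector()
img = cv2.imread("chunk_1_of_3.png")  # Returns None if the file cannot be read
if img is not None:
    data, points, _ = detector.detectAndDecode(img)
    if data:
        try:
            meta = json.loads(data)  # Chunks generated by this app carry JSON metadata
            print(meta.get("chunk_index"), "/", meta.get("total_chunks"))
        except json.JSONDecodeError:
            print("Decoded non-JSON payload:", data[:50])
    else:
        print("QR code not detected or empty payload")
```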
""") # Store data globally within this tab's scope (alternative to Gradio State) @@ -1149,9 +1161,8 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee ) visualize_btn = gr.Button("👁️ Decode & Visualize Sequence", variant="primary") reset_btn = gr.Button("🗑️ Reset Visualizer", variant="secondary") + # Use a single Textbox for status messages (including indicator) visualization_status = gr.Textbox(label="Status", interactive=False, lines=3) - # Placeholder for interactive elements (future improvement) - # qr_toggles_container = gr.HTML(label="QR Code Controls (Future)") with gr.Column(scale=2): qr_visualization = gr.Image(label="QR Code Sequence Map", type="pil", height=400) # Use PIL type @@ -1163,74 +1174,89 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee if not files: shared_data['qr_paths'] = [] shared_data['qr_data'] = [] - return "Please upload QR code images.", None, None, "⚠️ No QR codes uploaded." + return None, None, "⚠️ Please upload QR code images." # Return None for gallery/image, text for status logger.info(f"Processing {len(files)} uploaded QR files for visualization.") qr_data_list = [] qr_path_list = [] decode_errors = 0 - # Use OpenCV detector via qrcode library + # Use OpenCV detector try: - detector = qrcode.QRCodeDetector() - except AttributeError: - logger.error("qrcode.QRCodeDetector not found. Ensure correct library version or dependencies.") - return "Error initializing QR detector.", None, None, "❌ Library Error" + detector = cv2.QRCodeDetector() except Exception as init_e: - logger.error(f"Error initializing QR detector: {init_e}") - return f"Error initializing QR detector: {init_e}", None, None, "❌ Detector Init Error" + logger.error(f"Error initializing OpenCV QR detector: {init_e}") + return None, None, f"❌ Error initializing QR detector: {init_e}" for file in files: + file_display_name = getattr(file, 'orig_name', getattr(file, 'name', 'N/A')) try: img_path = file.name # Gradio File object path - img = Image.open(img_path) - img_np = np.array(img.convert('RGB')) # Detector often prefers RGB + # Read image using OpenCV + img_cv = cv2.imread(img_path) + if img_cv is None: + logger.warning(f"Could not read image file: {file_display_name}") + decode_errors += 1 + # Add placeholder for gallery consistency? 
+ qr_data_list.append({"data": "[READ FAILED]", "chunk_index": -1, "filename": file_display_name}) + qr_path_list.append(img_path) # Still need path for gallery + continue - # Try to decode QR code - data, bbox, straight_qrcode = detector.detectAndDecode(img_np) + # Try to detect and decode QR code + data, bbox, straight_qrcode = detector.detectAndDecode(img_cv) if data: - logger.debug(f"Decoded data from {os.path.basename(img_path)}: {data[:50]}...") + logger.debug(f"Decoded data from {file_display_name}: {data[:50]}...") # Try parsing the decoded data as JSON (expected format from generator) try: qr_metadata = json.loads(data) # Check if it looks like our chunk format if isinstance(qr_metadata, dict) and 'chunk_index' in qr_metadata and 'total_chunks' in qr_metadata: + qr_metadata['filename'] = file_display_name # Add filename for reference qr_data_list.append(qr_metadata) qr_path_list.append(img_path) else: # Valid JSON, but not the expected chunk structure - logger.warning(f"Decoded valid JSON, but not expected format from {os.path.basename(img_path)}") - qr_data_list.append({"data": qr_metadata, "chunk_index": -1}) # Assign default index + logger.warning(f"Decoded valid JSON, but not expected format from {file_display_name}") + qr_data_list.append({"data": qr_metadata, "chunk_index": -1, "filename": file_display_name}) # Assign default index qr_path_list.append(img_path) except json.JSONDecodeError: # Data decoded, but not JSON - store raw data - logger.warning(f"Could not decode JSON from QR data in {os.path.basename(img_path)}. Storing raw.") - qr_data_list.append({"data": data, "chunk_index": -1}) # Assign default index + logger.warning(f"Could not decode JSON from QR data in {file_display_name}. Storing raw.") + qr_data_list.append({"data": data, "chunk_index": -1, "filename": file_display_name}) # Assign default index qr_path_list.append(img_path) except Exception as json_e: - logger.error(f"Error processing decoded JSON from {os.path.basename(img_path)}: {json_e}") - qr_data_list.append({"data": f"Error: {json_e}", "chunk_index": -1}) + logger.error(f"Error processing decoded JSON from {file_display_name}: {json_e}") + qr_data_list.append({"data": f"Error: {json_e}", "chunk_index": -1, "filename": file_display_name}) qr_path_list.append(img_path) decode_errors += 1 else: # QR code detected, but no data decoded (or detection failed) - logger.warning(f"Could not decode data from QR image: {os.path.basename(img_path)}") - qr_data_list.append({"data": "[DECODE FAILED]", "chunk_index": -1}) + logger.warning(f"Could not decode data from QR image: {file_display_name}") + qr_data_list.append({"data": "[DECODE FAILED]", "chunk_index": -1, "filename": file_display_name}) qr_path_list.append(img_path) decode_errors += 1 except Exception as e: - logger.error(f"Error processing QR image file {os.path.basename(getattr(file, 'name', 'N/A'))}: {e}", exc_info=True) - # Optionally add placeholder for failed file? 
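(Aside: once every uploaded code decodes to a dict carrying 'chunk_index', 'total_chunks' and its 'data' payload slice, the original JSON can be rebuilt as sketched below; `reassemble_chunks` is an illustrative helper under those assumptions, not part of the app.)

```python
# Hedged sketch: reorder decoded chunk dicts and rejoin their payload slices.
import json
from typing import Any, Dict, List

def reassemble_chunks(decoded_chunks: List[Dict[str, Any]]) -> Any:
    """Rebuild the original JSON object from decoded chunk dictionaries."""
    if not decoded_chunks:
        raise ValueError("No chunks to reassemble")
    ordered = sorted(decoded_chunks, key=lambda c: c.get("chunk_index", 0))
    total = ordered[0].get("total_chunks", len(ordered))
    if len(ordered) != total:
        raise ValueError(f"Expected {total} chunks, got {len(ordered)}")
    # Each payload is a slice of one serialized JSON string, so concatenation restores it.
    joined = "".join(c.get("data", "") for c in ordered)
    return json.loads(joined)
```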
+ logger.error(f"Error processing QR image file {file_display_name}: {e}", exc_info=True) decode_errors += 1 + # Add placeholder if processing failed entirely + qr_data_list.append({"data": "[PROCESS ERROR]", "chunk_index": -1, "filename": file_display_name}) + qr_path_list.append(getattr(file, 'name', None)) # Need path for gallery if possible + - if not qr_path_list: + # Filter out entries where path couldn't be determined + valid_entries = [(data, path) for data, path in zip(qr_data_list, qr_path_list) if path is not None] + if not valid_entries: shared_data['qr_paths'] = [] shared_data['qr_data'] = [] - return "No valid QR codes could be processed or decoded.", None, None, "❌ Failed to process/decode QR codes" + return None, None, "❌ No valid QR codes could be processed or decoded." + + qr_data_list = [item[0] for item in valid_entries] + qr_path_list = [item[1] for item in valid_entries] + # Attempt to sort by chunk_index (handle missing index gracefully) try: @@ -1240,7 +1266,7 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee # Use provided chunk_index, fallback to list index if missing or invalid (-1) sort_key = data.get('chunk_index', i) if not isinstance(sort_key, int) or sort_key < 0: - sort_key = i # Fallback to original order for this item + sort_key = float('inf') # Put items without valid index at the end indexed_items.append((sort_key, data, path)) # Sort based on the index key @@ -1278,35 +1304,35 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee logger.error(f"Failed to load visualization buffer into PIL Image: {img_e}") + status_indicator = "✅" if vis_image_pil else "⚠️" status_message = f"Processed {len(shared_data['qr_paths'])} QR codes." if decode_errors > 0: - status_message += f" ({decode_errors} decode errors)" + status_message += f" ({decode_errors} decode/read errors)" status_message += "\nSequence visualized." if vis_image_pil else "\nVisualization generation failed." - final_status = "✅ Done" if vis_image_pil else "⚠️ Errors Occurred" + final_status_message = f"{status_indicator} {status_message}" # Update outputs: Gallery with sorted paths, Image with visualization, Status text - # The gallery expects a list of image paths or PIL images gallery_output = shared_data['qr_paths'] - return gallery_output, vis_image_pil, status_message, final_status + # Return gallery paths, visualization PIL image, combined status message + return gallery_output, vis_image_pil, final_status_message def reset_visualizer_state(): shared_data['qr_paths'] = [] shared_data['qr_data'] = [] logger.info("Resetting QR visualizer state.") - return None, None, None, "⚪ Visualizer Reset. Upload new QR codes." + # Clear gallery, image, file input, status text + reset_status_message = "⚪ Visualizer Reset. Upload new QR codes." + return None, None, None, reset_status_message # Event handlers visualize_btn.click( process_qr_codes_and_visualize, inputs=[qr_input], - outputs=[qr_preview, qr_visualization, visualization_status, visualization_status] # Update gallery, image, and status twice? Let's map correctly. - # Correct mapping: - # outputs=[qr_preview (Gallery), qr_visualization (Image), visualization_status (Textbox), visualization_status (Textbox again - maybe just need 3 outputs?)] - # Let's try mapping to the 4 defined outputs: - # outputs=[qr_preview, qr_visualization, visualization_status, visualization_status] # Seems redundant, but matches function signature needs. Let's adjust function signature later if needed. 
+ # Map function outputs to Gradio components + outputs=[qr_preview, qr_visualization, visualization_status] # Gallery, Image, Status Text ).then( lambda: logger.info("Visualization process complete."), inputs=None, outputs=None ) @@ -1315,13 +1341,13 @@ def create_qr_sequence_visualizer(output_gallery_ref): # Pass a reference if nee reset_btn.click( reset_visualizer_state, inputs=[], - outputs=[qr_preview, qr_visualization, qr_input, visualization_status] # Clear gallery, image, file input, status + outputs=[qr_preview, qr_visualization, qr_input, visualization_status] # Clear gallery, image, file input, status text ) def create_modern_interface(): """Create a modern and visually appealing Gradio interface""" - # Modern CSS styling (Seems intact) + # Modern CSS styling css = """ /* Modern color scheme */ :root { @@ -1332,343 +1358,427 @@ def create_modern_interface(): --success-color: #48bb78; --error-color: #f56565; --warning-color: #ed8936; + --text-color: #1a202c; + --border-color: #e2e8f0; } + body { font-family: sans-serif; color: var(--text-color); } /* Container styling */ - .container { - max-width: 1200px; - margin: auto; - padding: 2rem; + .gradio-container { /* Target the main container */ background-color: var(--background-color); border-radius: 1rem; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); + padding: 1rem; /* Add padding to container */ } /* Component styling */ - .input-container { + .input-container { /* Custom class if needed, otherwise target Gradio classes */ background-color: white; padding: 1.5rem; border-radius: 0.5rem; - border: 1px solid #e2e8f0; + border: 1px solid var(--border-color); margin-bottom: 1rem; } /* Button styling */ - .primary-button { - background-color: var(--primary-color); - color: white; - padding: 0.75rem 1.5rem; + .gradio-button { /* Target Gradio buttons */ + transition: all 0.2s; border-radius: 0.375rem; + padding: 0.75rem 1.5rem; + } + .gradio-button.primary { /* Primary variant */ + background-color: var(--primary-color) !important; + color: white !important; border: none; - cursor: pointer; - transition: all 0.2s; } - .primary-button:hover { - background-color: var(--accent-color); + .gradio-button.primary:hover { + background-color: var(--accent-color) !important; + transform: translateY(-1px); + } + .gradio-button.secondary { /* Secondary variant */ + background-color: var(--secondary-color) !important; + color: white !important; + border: none; + } + .gradio-button.secondary:hover { + background-color: #4a5568 !important; /* Darker secondary */ transform: translateY(-1px); } /* Status messages */ - .status { + .status { /* Custom class if needed */ padding: 1rem; border-radius: 0.375rem; margin: 1rem 0; + border: 1px solid transparent; } - .status.success { background-color: #f0fff4; color: var(--success-color); } - .status.error { background-color: #fff5f5; color: var(--error-color); } - .status.warning { background-color: #fffaf0; color: var(--warning-color); } + .status.success { border-color: var(--success-color); background-color: #f0fff4; color: var(--success-color); } + .status.error { border-color: var(--error-color); background-color: #fff5f5; color: var(--error-color); } + .status.warning { border-color: var(--warning-color); background-color: #fffaf0; color: var(--warning-color); } /* Gallery styling */ - .gallery { - display: grid; - grid-template-columns: repeat(auto-fit, minmax(150px, 1fr)); /* Adjust minmax */ + .gradio-gallery { /* Target Gradio gallery */ gap: 1rem; padding: 1rem; background-color: white; 
border-radius: 0.5rem; - border: 1px solid #e2e8f0; + border: 1px solid var(--border-color); min-height: 150px; /* Ensure gallery has some height */ } - .gallery img { - width: 100%; - height: auto; - object-fit: contain; /* Use contain to avoid stretching */ + /* Style gallery images */ + .gradio-gallery > div[data-testid="gallery-item"] > img { + object-fit: contain !important; /* Use contain to avoid stretching */ border-radius: 0.375rem; transition: transform 0.2s; border: 1px solid #eee; /* Add subtle border */ + background-color: #f8f9fa; /* Light background for images */ } - .gallery img:hover { + .gradio-gallery > div[data-testid="gallery-item"] > img:hover { transform: scale(1.05); box-shadow: 0 2px 4px rgba(0,0,0,0.1); /* Add hover shadow */ } + /* Tab styling */ + .gradio-tabs > .tabs > .tab-nav > button { /* Target tab buttons */ + padding: 0.75rem 1rem; + border-radius: 0.375rem 0.375rem 0 0; + font-weight: 500; + } + .gradio-tabs > .tabs > .tab-nav > button.selected { /* Selected tab */ + background-color: white !important; + border-bottom: 2px solid var(--primary-color) !important; + } + /* Textbox/Textarea styling */ + .gradio-textbox, .gradio-textarea { + border-radius: 0.375rem !important; + border: 1px solid var(--border-color) !important; + } + .gradio-textbox:focus, .gradio-textarea:focus { + border-color: var(--accent-color) !important; + box-shadow: 0 0 0 1px var(--accent-color) !important; + } """ # Create interface with modern design - with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: + with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator", theme=gr.themes.Soft()) as interface: gr.Markdown(""" # 🌐 Advanced Data Processing & QR Code Generator - Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. + Transform URLs, files (Text, JSON, PDF, Archives), or direct input into sequenced QR codes. Visualize sequences. """) - with gr.Row(): - with gr.Column(scale=2): - # Input Tabs - with gr.Tabs(): - with gr.TabItem("📝 URL Input"): - url_input = gr.Textbox( - label="Enter URLs (one per line or comma-separated)", - lines=5, - placeholder="https://example1.com\nhttps://example2.com", - elem_id="url-input" - ) - with gr.TabItem("📁 File Input"): - file_input = gr.File( - label="Upload Files (Text, JSON, Archives: zip, tar, gz, bz2)", - file_count="multiple", - # Removed file_types="*" to rely on backend logic, or specify supported ones: - # file_types=[".txt", ".json", ".csv", ".md", ".xml", ".html", ".zip", ".tar", ".gz", ".bz2"] - elem_id="file-input" - ) - with gr.TabItem("📋 Direct Input / JSON"): - text_input = gr.TextArea( - label="Direct Text/JSON Input", - lines=10, - placeholder="Paste your text or JSON data here...", - elem_id="text-input" - ) - with gr.Row(): - example_btn = gr.Button("📝 Load JSON Example") - clear_btn = gr.Button("🗑️ Clear Input") - - # Processing Options & Button - with gr.Row(): - combine_data = gr.Checkbox( - label="Combine all inputs into one sequence", - value=True, # Default to combined - info="If unchecked, each URL/File/Input generates its own QR sequence." 
- ) - process_btn = gr.Button( - "🔄 Process & Generate QR Codes", - variant="primary", - elem_id="process-button" - ) - - # Status Output - output_text = gr.Textbox( - label="Processing Status", - interactive=False, - lines=2, - elem_id="status-output" - ) - - - with gr.Column(scale=3): - # Output Area - gr.Markdown("### Results") - with gr.Tabs(): - with gr.TabItem("🖼️ QR Codes"): - output_gallery = gr.Gallery( - label="Generated QR Codes", - columns=4, # Adjust columns as needed - height=500, # Adjust height - object_fit="contain", - preview=True, # Enable preview click - elem_id="qr-gallery" - ) - with gr.TabItem("📄 Processed Data (JSON)"): - output_json = gr.JSON( - label="Processed Data Structure", - elem_id="json-output" - ) - - # Load example data - def load_example(): - example = { - "project": "Data Transfer Example", - "version": 1.1, - "items": [ - {"id": "A001", "name": "Item One", "value": 123.45, "tags": ["tag1", "tag2"]}, - {"id": "B002", "name": "Item Two", "value": 67.89, "enabled": True} - ], - "timestamp": datetime.now().isoformat() - } - return json.dumps(example, indent=2) - - def clear_input_area(): - # Clear only the direct text input area - return "" - - # --- Main Processing Function --- - def process_inputs_and_generate_qrs(urls, files, text, combine): - """Process all inputs, combine if requested, and generate QR codes.""" - start_time = time.time() - logger.info("Starting data processing...") - status_updates = [] - all_processed_data = [] # List to hold results from all sources - - url_processor = EnhancedURLProcessor() - file_processor = EnhancedFileProcessor() - - # 1. Process URLs - if urls and urls.strip(): - url_list = re.split(r'[,\n]+', urls) # Split by comma or newline, handle multiple newlines - url_list = [u.strip() for u in url_list if u.strip()] # Clean up - status_updates.append(f"Processing {len(url_list)} URLs...") - logger.info(f"Processing URLs: {url_list}") - for i, url in enumerate(url_list): - logger.info(f"Processing URL {i+1}/{len(url_list)}: {url}") - # Basic validation before fetching - if not validators.url(url): - logger.warning(f"Skipping invalid URL format: {url}") - status_updates.append(f"⚠️ Skipped invalid URL: {url[:50]}...") - all_processed_data.append({'error': 'Invalid URL format', 'url': url}) - continue - - content_data = url_processor.fetch_content(url) - if content_data and 'content' in content_data: - logger.info(f"Successfully fetched content from {url} ({len(content_data.get('raw_content',''))} bytes)") - # Structure the result similarly to file processing output - processed_url_data = { - 'source': 'url', - 'url': url, - 'content': content_data['content'], # Processed text content - 'raw_content': content_data['raw_content'], # Raw response body - 'metadata': content_data['metadata'], # Headers, status, etc. - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_url_data) - status_updates.append(f"✓ Fetched: {url[:60]}...") - else: - logger.error(f"Failed to fetch content from URL: {url}") - status_updates.append(f"❌ Failed fetch: {url[:60]}...") - all_processed_data.append({'error': 'Failed to fetch content', 'url': url}) - - # 2. 
Process Files - if files: - status_updates.append(f"Processing {len(files)} uploaded files...") - logger.info(f"Processing {len(files)} files.") - for i, file_obj in enumerate(files): - logger.info(f"Processing file {i+1}/{len(files)}: {getattr(file_obj, 'name', 'N/A')}") - try: - # Pass the Gradio file object directly to process_file - file_results = file_processor.process_file(file_obj) - if file_results: - all_processed_data.extend(file_results) - # Get filename safely from results (might be multiple from archive) - processed_filenames = [res.get('filename', 'N/A') for res in file_results] - status_updates.append(f"✓ Processed file(s): {', '.join(processed_filenames)}") - logger.info(f"Successfully processed file(s): {', '.join(processed_filenames)}") + with gr.Tab("⚙️ Data Processor & QR Generator"): + with gr.Row(): + with gr.Column(scale=2): + # Input Tabs + with gr.Tabs(): + with gr.TabItem("📝 URL Input"): + url_input = gr.Textbox( + label="Enter URLs (one per line or comma-separated)", + lines=5, + placeholder="https://example1.com\nhttps://example2.com", + elem_id="url-input" + ) + with gr.TabItem("📁 File Input"): + file_input = gr.File( + label="Upload Files (Text, JSON, PDF, Archives: zip, tar, gz, bz2)", + file_count="multiple", + # Specify supported types explicitly for better UX + file_types=[".txt", ".json", ".csv", ".md", ".xml", ".html", ".pdf", ".zip", ".tar", ".gz", ".bz2", ".tgz", ".tar.gz", ".tar.bz2"], + elem_id="file-input" + ) + with gr.TabItem("📋 Direct Input / JSON"): + text_input = gr.TextArea( + label="Direct Text/JSON Input", + lines=10, + placeholder="Paste your text or JSON data here...", + elem_id="text-input" + ) + with gr.Row(): + example_btn = gr.Button("📝 Load JSON Example") + clear_btn = gr.Button("🗑️ Clear Input") + + # Processing Options & Button + with gr.Row(equal_height=True): + with gr.Column(scale=1, min_width=150): + combine_data = gr.Checkbox( + label="Combine all inputs", + value=True, # Default to combined + info="Create one sequence from all sources." 
+ ) + with gr.Column(scale=2): + process_btn = gr.Button( + "🔄 Process & Generate QR Codes", + variant="primary", + elem_id="process-button" + ) + + # Status Output + output_text = gr.Textbox( + label="Processing Status", + interactive=False, + lines=4, # Increased lines for more status info + elem_id="status-output" + ) + + + with gr.Column(scale=3): + # Output Area + gr.Markdown("### Results") + with gr.Tabs(): + with gr.TabItem("🖼️ QR Codes"): + output_gallery = gr.Gallery( + label="Generated QR Codes", + columns=4, # Adjust columns as needed + height=500, # Adjust height + object_fit="contain", + preview=True, # Enable preview click + elem_id="qr-gallery" + ) + with gr.TabItem("📄 Processed Data (JSON)"): + output_json = gr.JSON( + label="Processed Data Structure", + elem_id="json-output" + ) + + # Load example data + def load_example(): + example = { + "project": "Data Transfer Example", + "version": 1.1, + "items": [ + {"id": "A001", "name": "Item One", "value": 123.45, "tags": ["tag1", "tag2"]}, + {"id": "B002", "name": "Item Two", "value": 67.89, "enabled": True} + ], + "timestamp": datetime.now().isoformat() + } + return json.dumps(example, indent=2) + + def clear_input_area(): + # Clear only the direct text input area + return "" + + # --- Main Processing Function --- + def process_inputs_and_generate_qrs(urls, files, text, combine, progress=gr.Progress(track_tqdm=True)): + """Process all inputs, combine if requested, and generate QR codes.""" + start_time = time.time() + logger.info("Starting data processing...") + status_updates = [] + all_processed_data = [] # List to hold results from all sources + + url_processor = EnhancedURLProcessor() + file_processor = EnhancedFileProcessor() + + # Estimate total steps for progress bar + num_urls = len(re.split(r'[,\n]+', urls.strip())) if urls and urls.strip() else 0 + num_files = len(files) if files else 0 + has_text = 1 if text and text.strip() else 0 + total_steps = num_urls + num_files + has_text + 1 # +1 for QR generation step + + step_counter = 0 + + # Wrap iterables with tqdm for progress tracking + url_list = [] + if urls and urls.strip(): + url_list = [u.strip() for u in re.split(r'[,\n]+', urls.strip()) if u.strip()] # Clean up + + file_list = files if files else [] + text_input_present = text and text.strip() + + # 1. 
Process URLs + if url_list: + status_updates.append(f"Processing {len(url_list)} URLs...") + logger.info(f"Processing URLs: {url_list}") + # Use tqdm description for progress + desc = f"Processing URLs" + for i, url in enumerate(progress.tqdm(url_list, desc=desc, total=len(url_list))): + step_counter += 1 + # progress(step_counter / total_steps, desc=f"URL {i+1}/{len(url_list)}") + logger.info(f"Processing URL {i+1}/{len(url_list)}: {url}") + # Basic validation before fetching + if not validators.url(url): + logger.warning(f"Skipping invalid URL format: {url}") + status_updates.append(f"⚠️ Skipped invalid URL: {url[:50]}...") + all_processed_data.append({'error': 'Invalid URL format', 'url': url}) + continue + + content_data = url_processor.fetch_content(url) + if content_data and 'content' in content_data: + logger.info(f"Successfully fetched content from {url} (Size: {content_data['metadata'].get('content_length_bytes','N/A')} bytes)") + # Structure the result similarly to file processing output + processed_url_data = { + 'source': 'url', + 'url': content_data['metadata']['final_url'], # Use final URL after redirects + 'content': content_data['content'], # Processed text content + # 'raw_bytes': content_data['raw_bytes'], # Don't include raw bytes in final JSON unless needed + 'metadata': { # Select relevant metadata + 'original_url': url, + 'content_type': content_data['metadata']['content_type'], + 'status_code': content_data['metadata']['status_code'], + 'encoding_used': content_data['metadata']['encoding_used'], + 'content_length_bytes': content_data['metadata']['content_length_bytes'] + }, + 'timestamp': datetime.now().isoformat() + } + all_processed_data.append(processed_url_data) + status_updates.append(f"✓ Fetched: {url[:60]}...") else: - status_updates.append(f"⚠️ No data extracted from file: {getattr(file_obj, 'name', 'N/A')}") - logger.warning(f"No data extracted from file: {getattr(file_obj, 'name', 'N/A')}") - # Add placeholder error if desired - # all_processed_data.append({'error': 'No data extracted', 'filename': getattr(file_obj, 'name', 'N/A')}) - - except Exception as file_proc_err: - file_name = getattr(file_obj, 'name', 'N/A') - logger.error(f"Error processing file {file_name}: {file_proc_err}", exc_info=True) - status_updates.append(f"❌ Error processing file: {file_name}") - all_processed_data.append({'error': f'File processing error: {file_proc_err}', 'filename': file_name}) - - - # 3. Process Direct Text/JSON Input - if text and text.strip(): - status_updates.append("Processing direct input...") - logger.info("Processing direct text/JSON input.") - # Attempt to parse as JSON first + logger.error(f"Failed to fetch content from URL: {url}") + status_updates.append(f"❌ Failed fetch: {url[:60]}...") + all_processed_data.append({'error': 'Failed to fetch content', 'url': url}) + + # 2. 
Process Files + if file_list: + status_updates.append(f"Processing {len(file_list)} uploaded files...") + logger.info(f"Processing {len(file_list)} files.") + desc = f"Processing Files" + for i, file_obj in enumerate(progress.tqdm(file_list, desc=desc, total=len(file_list))): + step_counter += 1 + # progress(step_counter / total_steps, desc=f"File {i+1}/{len(file_list)}") + file_name_for_log = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A')) # Try orig_name first + logger.info(f"Processing file {i+1}/{len(file_list)}: {file_name_for_log}") + try: + # Pass the Gradio file object directly to process_file + file_results = file_processor.process_file(file_obj) + if file_results: + # Filter out results that are just errors before extending + valid_results = [res for res in file_results if 'error' not in res] + error_results = [res for res in file_results if 'error' in res] + + if valid_results: + all_processed_data.extend(valid_results) + processed_filenames = [res.get('filename', 'N/A') for res in valid_results] + status_updates.append(f"✓ Processed: {', '.join(processed_filenames)}") + logger.info(f"Successfully processed: {', '.join(processed_filenames)}") + if error_results: + all_processed_data.extend(error_results) # Keep errors for JSON output + error_filenames = [res.get('filename', 'N/A') for res in error_results] + status_updates.append(f"❌ Errors processing: {', '.join(error_filenames)}") + logger.warning(f"Errors processing: {', '.join(error_filenames)}") + + if not valid_results and not error_results: + status_updates.append(f"⚠️ No data extracted from: {file_name_for_log}") + logger.warning(f"No data extracted from: {file_name_for_log}") + all_processed_data.append({'error': 'No data extracted', 'filename': file_name_for_log}) + + except Exception as file_proc_err: + file_name = getattr(file_obj, 'orig_name', getattr(file_obj, 'name', 'N/A')) + logger.error(f"Error processing file {file_name}: {file_proc_err}", exc_info=True) + status_updates.append(f"❌ Error processing file: {file_name}") + all_processed_data.append({'error': f'File processing error: {file_proc_err}', 'filename': file_name}) + + + # 3. Process Direct Text/JSON Input + if text_input_present: + step_counter += 1 + progress(step_counter / total_steps, desc="Processing Direct Input") + status_updates.append("Processing direct input...") + logger.info("Processing direct text/JSON input.") + # Attempt to parse as JSON first + try: + json_data = json.loads(text) + logger.info("Direct input parsed as JSON.") + processed_text_data = { + 'source': 'direct_json', + 'content': json_data, # Parsed JSON object/list + 'raw_content': text, # Original string + 'timestamp': datetime.now().isoformat() + } + all_processed_data.append(processed_text_data) + status_updates.append("✓ Processed direct input as JSON.") + except json.JSONDecodeError: + # If not JSON, treat as plain text + logger.info("Direct input treated as plain text.") + processed_text_data = { + 'source': 'direct_text', + 'content': text, # Store as plain text + 'timestamp': datetime.now().isoformat() + } + all_processed_data.append(processed_text_data) + status_updates.append("✓ Processed direct input as Text.") + except Exception as direct_input_err: + logger.error(f"Error processing direct input: {direct_input_err}", exc_info=True) + status_updates.append(f"❌ Error processing direct input.") + all_processed_data.append({'error': f'Direct input error: {direct_input_err}', 'source': 'direct_input'}) + + + # 4. 
Check if any valid data was processed + valid_processed_data = [d for d in all_processed_data if 'error' not in d] + if not valid_processed_data: + logger.warning("No valid data sources found or processed.") + status_updates.append("⚠️ No valid data to process. Please provide input or check errors.") + final_status = "\n".join(status_updates) + # Return the error data for JSON view, empty gallery, and status + return all_processed_data, [], final_status + + logger.info(f"Total valid processed data items: {len(valid_processed_data)}") + status_updates.append(f"Data processed ({len(valid_processed_data)} valid items). Generating QR codes...") + + # 5. Generate QR Codes using only valid data + qr_paths = [] + progress(step_counter / total_steps, desc="Generating QR Codes") # Update progress before QR step try: - json_data = json.loads(text) - logger.info("Direct input parsed as JSON.") - processed_text_data = { - 'source': 'direct_json', - 'content': json_data, # Parsed JSON object/list - 'raw_content': text, # Original string - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_text_data) - status_updates.append("✓ Processed direct input as JSON.") - except json.JSONDecodeError: - # If not JSON, treat as plain text - logger.info("Direct input treated as plain text.") - processed_text_data = { - 'source': 'direct_text', - 'content': text, # Store as plain text - 'timestamp': datetime.now().isoformat() - } - all_processed_data.append(processed_text_data) - status_updates.append("✓ Processed direct input as Text.") - except Exception as direct_input_err: - logger.error(f"Error processing direct input: {direct_input_err}", exc_info=True) - status_updates.append(f"❌ Error processing direct input.") - all_processed_data.append({'error': f'Direct input error: {direct_input_err}', 'source': 'direct_input'}) - - - # 4. Check if any data was processed - if not all_processed_data: - logger.warning("No valid data sources found or processed.") - status_updates.append("⚠️ No data to process. Please provide input.") + # Pass the list of *valid* processed data items + data_for_qr = valid_processed_data # generate_qr_codes handles per-item vs. combined output via the 'combine' flag + qr_paths = generate_qr_codes(data_for_qr, combine) + if qr_paths: + status_updates.append(f"✓ Generated {len(qr_paths)} QR codes.") + logger.info(f"Successfully generated {len(qr_paths)} QR codes.") + else: + # Check if chunking failed or QR generation failed for all chunks + if not valid_processed_data: # Should have been caught earlier, but double-check + status_updates.append("❌ No valid data was available for QR generation.") + else: + status_updates.append("❌ QR code generation failed or produced no codes (check logs for details).") + logger.error("QR code generation returned no paths despite valid input data.") + # Keep processed data, but gallery will be empty + + except Exception as qr_gen_err: + logger.error(f"Error during QR code generation step: {qr_gen_err}", exc_info=True) + status_updates.append(f"❌ Error generating QR codes: {qr_gen_err}") + # Keep processed data, gallery will be empty + + + # 6.
Finalize and Return + end_time = time.time() + processing_time = end_time - start_time + status_updates.append(f"Total processing time: {processing_time:.2f} seconds.") final_status = "\n".join(status_updates) - return None, [], final_status # Return empty results - logger.info(f"Total processed data items: {len(all_processed_data)}") - status_updates.append(f"Data processed ({len(all_processed_data)} items). Generating QR codes...") + # Return processed data (including errors for JSON view), QR paths (for Gallery), and status string + # Ensure qr_paths is a list of strings + qr_paths_str = [str(p) for p in qr_paths] if qr_paths else [] - # 5. Generate QR Codes - qr_paths = [] - try: - # Pass the list of processed data items - qr_paths = generate_qr_codes(all_processed_data, combine) - if qr_paths: - status_updates.append(f"✓ Generated {len(qr_paths)} QR codes.") - logger.info(f"Successfully generated {len(qr_paths)} QR codes.") - else: - status_updates.append("❌ QR code generation failed or produced no codes.") - logger.error("QR code generation returned no paths.") - # Keep processed data, but gallery will be empty - - except Exception as qr_gen_err: - logger.error(f"Error during QR code generation step: {qr_gen_err}", exc_info=True) - status_updates.append(f"❌ Error generating QR codes: {qr_gen_err}") - # Keep processed data, gallery will be empty - - - # 6. Finalize and Return - end_time = time.time() - processing_time = end_time - start_time - status_updates.append(f"Total processing time: {processing_time:.2f} seconds.") - final_status = "\n".join(status_updates) - - # Return processed data (for JSON view), QR paths (for Gallery), and status string - # Ensure qr_paths is a list of strings - qr_paths_str = [str(p) for p in qr_paths] if qr_paths else [] + # Return all data (including errors) for JSON output, gallery paths, and status text + return all_processed_data, qr_paths_str, final_status - # Return data for JSON output, gallery paths, and status text - return all_processed_data, qr_paths_str, final_status + # --- Event Handlers --- + example_btn.click(load_example, outputs=[text_input]) + clear_btn.click(clear_input_area, outputs=[text_input]) - # --- Event Handlers --- - example_btn.click(load_example, outputs=[text_input]) - clear_btn.click(clear_input_area, outputs=[text_input]) + process_btn.click( + process_inputs_and_generate_qrs, + inputs=[url_input, file_input, text_input, combine_data], + outputs=[output_json, output_gallery, output_text] # Match function return order + ) - process_btn.click( - process_inputs_and_generate_qrs, - inputs=[url_input, file_input, text_input, combine_data], - outputs=[output_json, output_gallery, output_text] # Match function return order - ) + # Add helpful documentation + gr.Markdown(""" + ### 🚀 Features + - **Complete URL Scraping**: Extracts text content from web pages (HTML, Text, JSON, PDF). Follows redirects. + - **Advanced File Processing**: Handles text, JSON, PDF, and archives (.zip, .tar.*, .gz, .bz2). Extracts archive contents. Attempts intelligent JSON detection. + - **Direct Input**: Paste text or JSON directly. + - **Sequential QR Codes**: Chunks large data and embeds sequencing info. Option to combine inputs. Uses Error Correction Level H for robustness. + - **Modern Design**: Clean, responsive interface with progress tracking. + ### 💡 Tips + 1. **Inputs**: Use any combination of URL, File, or Direct Input tabs. + 2. **Combine**: Check 'Combine all inputs' to create one QR sequence from all sources. 
Uncheck to get separate QR sequences for each source (URL, file within archive, direct input). + 3. **Files**: Upload text-based files, JSON, PDF, or supported archives. Content from archives is extracted and processed individually. Large files up to 5GB are supported (but QR generation may fail for very large content). + 4. **JSON**: Use the example button or upload a `.json` file. The app also tries to parse `.txt` or other files as JSON if they contain valid JSON structure. + 5. **Status**: Monitor the Processing Status box for feedback, including errors and progress. + ### 🎨 Output + - Generated QR codes appear in the 'QR Codes' tab and are saved in the `output/qr_codes` directory. + - The structured data processed from all inputs (including any errors) is shown in the 'Processed Data (JSON)' tab. + - Hover over or click QR codes in the gallery for a larger preview. + - Use the 'QR Sequence Visualizer' tab to decode and verify sequences. + """) + + # Add the QR sequence visualizer tab + create_qr_sequence_visualizer(output_gallery) # Pass gallery if needed - # Add helpful documentation (Seems intact) - gr.Markdown(""" - ### 🚀 Features - - **Complete URL Scraping**: Extracts text content from web pages. - - **Advanced File Processing**: Handles text, JSON, and archives (.zip, .tar.*, .gz, .bz2). Attempts intelligent JSON detection. - - **Direct Input**: Paste text or JSON directly. - - **Sequential QR Codes**: Chunks large data and embeds sequencing info. Option to combine inputs. - - **Modern Design**: Clean, responsive interface. - ### 💡 Tips - 1. **Inputs**: Use any combination of URL, File, or Direct Input tabs. - 2. **Combine**: Check 'Combine all inputs' to create one QR sequence from all sources. Uncheck to get separate QR sequences for each source. - 3. **Files**: Upload text-based files, JSON, or supported archives. Content from archives is extracted and processed. - 4. **JSON**: Use the example button or upload a `.json` file. The app also tries to parse `.txt` or other files as JSON if they contain valid JSON structure. - 5. **Status**: Monitor the Processing Status box for feedback. - ### 🎨 Output - - Generated QR codes appear in the 'QR Codes' tab and are saved in the `output/qr_codes` directory. - - The structured data processed from all inputs is shown in the 'Processed Data (JSON)' tab. - - Hover over or click QR codes in the gallery for a larger preview. 
- """) return interface def main(): @@ -1681,23 +1791,22 @@ def main(): # Create and launch interface interface = create_modern_interface() - # Add the QR sequence visualizer tab (if function is defined and needed) - # with interface: - # create_qr_sequence_visualizer(None) # Pass relevant components if needed - # Launch with configuration interface.launch( - share=False, # Set to True for public link (use with caution) - debug=False, # Set to True for more verbose Gradio errors + share=os.getenv("GRADIO_SHARE", "false").lower() == "true", # Allow sharing via env var + debug=os.getenv("GRADIO_DEBUG", "false").lower() == "true", # Allow debug via env var show_error=True, # Show Python errors in browser console - # server_name="0.0.0.0", # Bind to all interfaces if needed for Docker/network access - # server_port=7860, # Specify port if needed - show_api=False # Disable default Gradio API endpoint unless needed + server_name="0.0.0.0", # Bind to all interfaces for container/network access + server_port=int(os.getenv("GRADIO_PORT", 7860)), # Allow port config via env var + show_api=False, # Disable default Gradio API endpoint unless needed + # enable_queue=True # Consider enabling queue for longer tasks ) logger.info("Gradio application stopped.") except Exception as e: - logger.error(f"Application startup or runtime error: {e}", exc_info=True) - raise + logger.critical(f"Application startup or runtime error: {e}", exc_info=True) + # Optionally add a small delay or specific cleanup before exiting + time.sleep(1) + raise # Reraise critical errors if __name__ == "__main__": # Ensure output directories exist before starting