diff --git "a/app2.py" "b/app2.py" --- "a/app2.py" +++ "b/app2.py" @@ -7,11 +7,11 @@ import mimetypes import zipfile import tempfile import chardet -import io # Needed for processing CSV from string -import csv # Needed for CSV -import xml.etree.ElementTree as ET # Needed for XML +import io +import csv +import xml.etree.ElementTree as ET from datetime import datetime -from typing import List, Dict, Optional, Union, Tuple, Any # Added Any for extracted_data +from typing import List, Dict, Optional, Union, Tuple, Any from pathlib import Path from urllib.parse import urlparse, urljoin import requests @@ -27,8 +27,10 @@ import numpy as np import tarfile import gzip import math -import random # Added for dummy chatbot response variation -import pandas as pd # Added for potential data analysis +import random +import pandas as pd +from requests.adapters import HTTPAdapter +from urllib3.util.retry import Retry # Setup enhanced logging with more detailed formatting logging.basicConfig( @@ -38,7 +40,7 @@ logging.basicConfig( logging.StreamHandler(), logging.FileHandler('app.log', encoding='utf-8') ]) -logger = logging.getLogger(__name__) # logger is now defined here +logger = logging.getLogger(__name__) # Conditional imports for document processing try: @@ -46,18 +48,18 @@ try: PDF_SUPPORT = True except ImportError: PDF_SUPPORT = False - logger.warning("PyPDF2 not installed. PDF file processing will be limited.") # logger is available + logger.warning("PyPDF2 not installed. PDF file processing will be limited.") try: from docx import Document DOCX_SUPPORT = True except ImportError: DOCX_SUPPORT = False - logger.warning("python-docx not installed. DOCX file processing will be limited.") # logger is available + logger.warning("python-docx not installed. DOCX file processing will be limited.") try: from pyth.plugins.plaintext.writer import PlaintextWriter - from pyth.plugins.rtf15.reader import Rtf15Reader # Import Rtf15Reader + from pyth.plugins.rtf15.reader import Rtf15Reader RTF_SUPPORT = True except ImportError: RTF_SUPPORT = False @@ -71,17 +73,6 @@ except ImportError: ODT_SUPPORT = False logger.warning("odfpy not installed. ODT file processing will be limited.") - -# Setup enhanced logging with more detailed formatting -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s', - handlers=[ - logging.StreamHandler(), - logging.FileHandler('app.log', encoding='utf-8') - ]) -logger = logging.getLogger(__name__) - # Ensure output directories exist with modern structure OUTPUTS_DIR = Path('output') QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes' @@ -89,129 +80,91 @@ TEMP_DIR = OUTPUTS_DIR / 'temp' for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]: directory.mkdir(parents=True, exist_ok=True) -# Dummy EnhancedURLProcessor class for demonstration purposes if the actual class isn't provided. -# This dummy simulates fetching and creating a nested structure based on max_steps. 
class EnhancedURLProcessor: - """Simulates advanced URL processing with enhanced content extraction and recursive link following.""" + """Advanced URL processing with enhanced content extraction and recursive link following.""" def __init__(self): - # Dummy session and user agent for simulation - self.session = type('obj', (object,), {'get': self._dummy_get_request})() - self.user_agent = type('obj', (object,), {'random': 'SimulatedAgent/1.0'})() - self.timeout = 15 - self.max_retries = 3 - - def _dummy_get_request(self, url, timeout): - """Simulates a GET request response.""" - class MockResponse: - def __init__(self, url, status_code, content_type, content, encoding='utf-8'): - self.url = url - self.status_code = status_code - self.headers = {'Content-Type': content_type} - self._content = content.encode(encoding) - self.encoding = encoding - - def raise_for_status(self): - if 400 <= self.status_code < 600: - raise requests.exceptions.RequestException(f"Simulated HTTP error {self.status_code}") - - @property - def content(self): - return self._content - - # Simulate different responses based on URL - if "this-domain-does-not-exist" in url: - raise requests.exceptions.RequestException("Simulated network error: Could not resolve host.") - elif "httpbin.org/html" in url: - # Simulate a simple HTML response - html_content = """ - - -
- This is simulated HTML content.
- Link 1 - Link 2 - - - """ - return MockResponse(url, 200, 'text/html', html_content) - elif "quotes.toscrape.com" in url: - # Simulate a more complex HTML with more links for deeper testing - html_content = f""" - - -Some simulated quotes.
- Page 1 - Page 2 - Love Quotes - - - """ - return MockResponse(url, 200, 'text/html', html_content) - elif "/child" in url: - # Simulate nested HTML pages - html_content = f""" - - -Content for {url}.
- Grandchild 1 - - - """ - return MockResponse(url, 200, 'text/html', html_content) - else: - # Default simulated plain text response - return MockResponse(url, 200, 'text/plain', f"Simulated content for {url}") + # Use a real requests session with retry strategy + self.session = requests.Session() + retry_strategy = Retry( + total=3, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["HEAD", "GET"] + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("http://", adapter) + self.session.mount("https://", adapter) + self.user_agent = UserAgent() + self.timeout = 15 # seconds def validate_url(self, url: str) -> Dict[str, Any]: - """Enhanced URL validation with detailed feedback (Simulated)""" - # In a real implementation, this would perform actual network checks (HEAD/GET) - # For simulation, just check format + """Enhanced URL validation with accessibility check.""" if not validators.url(url): return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} + parsed = urlparse(url) if not all([parsed.scheme, parsed.netloc]): return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} - # Simulate accessibility check - if "this-domain-does-not-exist" in url: - return {'is_valid': False, 'message': 'Simulated: URL not accessible', 'details': 'Simulated network error'} + try: + # Use a HEAD request to check accessibility without downloading full content + headers = {'User-Agent': self.user_agent.random} + response = self.session.head(url, timeout=self.timeout, headers=headers, allow_redirects=True) + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + # Check content type if available in HEAD response + content_type = response.headers.get('Content-Type', '').split(';')[0].strip() + if not content_type or not (content_type.startswith('text/') or 'json' in content_type or 'xml' in content_type): + # Basic check if content type seems relevant for text extraction + logger.warning(f"URL {url} returned potentially irrelevant content type: {content_type}") + # Decide if this should invalidate the URL or just add a note + # For now, we'll allow fetching but add a note. 
- return { - 'is_valid': True, - 'message': 'Simulated: URL is valid and accessible', - 'details': { - 'final_url': url, # In simulation, final_url is same as original unless specifically handled - 'content_type': 'text/html', # Simulate HTML for most tests - 'server': 'SimulatedServer', - 'size': 'SimulatedSize' + return { + 'is_valid': True, + 'message': 'URL is valid and accessible', + 'details': { + 'final_url': response.url, # Capture final URL after redirects + 'content_type': content_type, + 'server': response.headers.get('Server', 'N/A'), + 'size': response.headers.get('Content-Length', 'N/A') + } } - } + except requests.exceptions.RequestException as e: + return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)} + except Exception as e: + logger.error(f"Unexpected error during URL validation for {url}: {e}") + return {'is_valid': False, 'message': 'Unexpected validation error', 'details': str(e)} + def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]: - """Enhanced content fetcher with retry mechanism and complete character extraction (Simulated)""" + """Enhanced content fetcher with retry mechanism and complete character extraction.""" try: - logger.info(f"Simulating fetch content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})") - # Simulate the request using the dummy get - response = self.session.get(url, timeout=self.timeout) - response.raise_for_status() - final_url = response.url # Capture potential redirects (simulated) - content_type = response.headers.get('Content-Type', '') - - # Simulate encoding detection (assuming utf-8 for simplicity in simulation) - encoding = 'utf-8' + logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})") + headers = {'User-Agent': self.user_agent.random} + response = self.session.get(url, timeout=self.timeout, headers=headers, allow_redirects=True) + response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) + + final_url = response.url # Capture potential redirects + content_type = response.headers.get('Content-Type', '').split(';')[0].strip() + + # Attempt to detect encoding if not specified in headers + encoding = response.encoding # requests attempts to guess encoding + if encoding is None or encoding == 'ISO-8859-1': # Fallback if requests guess is default/uncertain + try: + encoding_detection = chardet.detect(response.content) + encoding = encoding_detection['encoding'] or 'utf-8' + logger.debug(f"Chardet detected encoding: {encoding} for {url}") + except Exception as e: + logger.warning(f"Chardet detection failed for {url}: {e}. 
Falling back to utf-8.") + encoding = 'utf-8' + + raw_content = response.content.decode(encoding, errors='replace') - # Extract metadata (simulated) + # Extract metadata metadata = { 'original_url': url, 'final_url': final_url, @@ -223,7 +176,7 @@ class EnhancedURLProcessor: 'status_code': response.status_code } - # Process based on content type (using the actual _process_web_content) + # Process based on content type processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url) return { @@ -235,28 +188,24 @@ class EnhancedURLProcessor: 'processing_notes': processed_extraction['notes'] } except requests.exceptions.RequestException as e: - if retry_count < self.max_retries - 1: - logger.warning(f"Simulated Retry {retry_count + 1}/{self.max_retries} for URL: {url}") - time.sleep(0.1) # Shorter backoff for simulation - return self.fetch_content(url, retry_count + 1) - logger.error(f"Simulated: Failed to fetch content after {self.max_retries} attempts from {url}: {e}") + logger.error(f"Failed to fetch content from {url}: {e}") return { 'source': 'url', 'url': url, 'raw_content': None, - 'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, # Include basic metadata on failure + 'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': getattr(e.response, 'status_code', None)}, 'extracted_data': None, - 'processing_notes': [f"Simulated: Failed to fetch content after {self.max_retries} attempts: {str(e)}"] # Ensure notes is a list + 'processing_notes': [f"Failed to fetch content: {str(e)}"] } except Exception as e: - logger.error(f"Simulated: Unexpected error while fetching or processing URL {url}: {e}") + logger.error(f"Unexpected error while fetching or processing URL {url}: {e}") return { 'source': 'url', 'url': url, 'raw_content': raw_content if 'raw_content' in locals() else None, 'metadata': metadata if 'metadata' in locals() else {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, 'extracted_data': None, - 'processing_notes': [f"Simulated: Unexpected processing error: {str(e)}"] + 'processing_notes': [f"Unexpected processing error: {str(e)}"] } @@ -264,7 +213,7 @@ class EnhancedURLProcessor: """Process content based on detected content type""" lower_content_type = content_type.lower() notes = [] - extracted_data: Any = None # Use Any to allow different types + extracted_data: Any = None try: if 'text/html' in lower_content_type: logger.debug(f"Processing HTML content from {base_url}") @@ -276,7 +225,7 @@ class EnhancedURLProcessor: extracted_data = json.loads(content) notes.append("Parsed as JSON") except json.JSONDecodeError as e: - extracted_data = content # Keep raw text if invalid JSON + extracted_data = content notes.append(f"Failed to parse as JSON: {e}") logger.warning(f"Failed to parse JSON from {base_url}: {e}") except Exception as e: @@ -286,10 +235,9 @@ class EnhancedURLProcessor: elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith('+xml'): logger.debug(f"Processing XML content from {base_url}") try: - # Try parsing XML. Convert to a string representation. 
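The XML branch that follows keeps only a text round-trip of the parsed tree. If a structured representation were preferred later, a small recursive converter could look like this (a sketch under that assumption; `element_to_dict` is not part of the patch):

```python
import xml.etree.ElementTree as ET
from typing import Any, Dict

def element_to_dict(elem: ET.Element) -> Dict[str, Any]:
    """Recursively convert an Element into a plain dict (tag, attributes, text, children)."""
    return {
        "tag": elem.tag,
        "attrib": dict(elem.attrib),
        "text": (elem.text or "").strip() or None,
        "children": [element_to_dict(child) for child in list(elem)],
    }

# e.g. element_to_dict(ET.fromstring("<a x='1'><b>hi</b></a>"))
```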
root = ET.fromstring(content) xml_text = ET.tostring(root, encoding='unicode', method='xml') - extracted_data = xml_text # Store as string for now + extracted_data = xml_text notes.append("Parsed as XML (text representation)") except ET.ParseError as e: extracted_data = content @@ -299,17 +247,17 @@ class EnhancedURLProcessor: extracted_data = content notes.append(f"Error processing XML: {e}") logger.error(f"Error processing XML from {base_url}: {e}") - elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: # Catch other text types + elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: logger.debug(f"Processing Plain Text content from {base_url}") extracted_data = content notes.append("Processed as Plain Text") else: logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.") - extracted_data = content # Store raw content for unknown types + extracted_data = content notes.append(f"Unknown content type '{content_type}'. Stored raw text.") except Exception as e: logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}") - extracted_data = content # Fallback to raw content on error + extracted_data = content notes.append(f"Unexpected processing error: {e}. Stored raw text.") return {'data': extracted_data, 'notes': notes} @@ -317,28 +265,24 @@ class EnhancedURLProcessor: """Process HTML content, preserving text, and extracting metadata and links.""" extracted: Dict[str, Any] = { 'title': None, - 'meta_description': None, # Add extraction for meta description + 'meta_description': None, 'full_text': "", - 'links': [] # Add extraction for links + 'links': [] } try: soup = BeautifulSoup(content, 'html.parser') - # Extract Title if soup.title and soup.title.string: extracted['title'] = soup.title.string.strip() - # Extract Meta Description meta_desc = soup.find('meta', attrs={'name': 'description'}) if meta_desc and meta_desc.get('content'): extracted['meta_description'] = meta_desc['content'].strip() - # Extract and process links (convert relative to absolute) - # Use a set to avoid duplicate URLs in the links list unique_links = set() for a_tag in soup.find_all('a', href=True): href = a_tag['href'].strip() - if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): # Basic filter + if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): text = a_tag.get_text().strip() try: absolute_url = urljoin(base_url, href) @@ -346,29 +290,23 @@ class EnhancedURLProcessor: extracted['links'].append({'text': text, 'url': absolute_url}) unique_links.add(absolute_url) except Exception: - # If urljoin fails, keep the original href if it looks like a valid potential URL part if validators.url(href) and href not in unique_links: extracted['links'].append({'text': text, 'url': href}) unique_links.add(href) - elif urlparse(href).netloc and href not in unique_links: # Maybe just a domain/path? 
+ elif urlparse(href).netloc and href not in unique_links: extracted['links'].append({'text': text, 'url': href}) unique_links.add(href) - - # Extract all text content (similar to stripped_strings but ensures order) - # Use a more robust way to get visible text, including handling script/style tags - soup_copy = BeautifulSoup(content, 'html.parser') # Work on a copy to preserve soup for links + soup_copy = BeautifulSoup(content, 'html.parser') for script_or_style in soup_copy(["script", "style"]): - script_or_style.extract() # Remove script and style tags - text = soup_copy.get_text(separator='\n') # Get text with newlines - # Clean up whitespace and empty lines + script_or_style.extract() + text = soup_copy.get_text(separator='\n') lines = text.splitlines() cleaned_lines = [line.strip() for line in lines if line.strip()] extracted['full_text'] = '\n'.join(cleaned_lines) except Exception as e: logger.error(f"Enhanced HTML processing error for {base_url}: {e}") - # Fallback: Store raw text and indicate error soup_copy = BeautifulSoup(content, 'html.parser') for script_or_style in soup_copy(["script", "style"]): script_or_style.extract() @@ -379,8 +317,7 @@ class EnhancedURLProcessor: def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]: """Fetches content from a URL and recursively follows links up to max_steps depth.""" - # Validate max_steps first - if not isinstance(max_steps, int) or not (0 <= max_steps <= 10): # Changed max depth to 10 + if not isinstance(max_steps, int) or not (0 <= max_steps <= 10): logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.") return { 'url': url, @@ -390,7 +327,6 @@ class EnhancedURLProcessor: 'processing_notes': [f"Invalid max_steps value: {max_steps}. 
Must be an integer between 0 and 10."] } - # Validate the initial URL validation_result = self.validate_url(url) if not validation_result['is_valid']: logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}") @@ -402,12 +338,12 @@ class EnhancedURLProcessor: 'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"] } - # Start the recursive fetching process - return self._fetch_content_recursive(url, max_steps, current_step=0) + # Use a set to keep track of visited URLs during the crawl to avoid infinite loops + visited_urls = set() + return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls) - def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int) -> Dict[str, Any]: + def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int, visited_urls: set) -> Dict[str, Any]: """Recursive helper function to fetch content and follow links.""" - # Base case: Stop if current depth exceeds max_steps if current_step > max_steps: logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.") return { @@ -418,43 +354,59 @@ class EnhancedURLProcessor: 'processing_notes': [f"Depth limit ({max_steps}) reached."] } + # Normalize URL before checking visited set + normalized_url = url.rstrip('/') # Simple normalization + + if normalized_url in visited_urls: + logger.debug(f"Skipping already visited URL: {url} at level {current_step}.") + return { + 'url': url, + 'level': current_step, + 'fetch_result': None, # Indicate not fetched in this run + 'linked_extractions': [], + 'processing_notes': ["URL already visited in this crawl."] + } + + visited_urls.add(normalized_url) # Mark as visited + logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}") fetch_result = self.fetch_content(url) linked_extractions: List[Dict[str, Any]] = [] - # Only attempt to extract and follow links if fetch was successful and content is HTML if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower(): extracted_data = fetch_result['extracted_data'] links = extracted_data.get('links', []) - logger.info(f"Found {len(links)} links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") - # Recursively process linked URLs if more steps are allowed + logger.info(f"Found {len(links)} potential links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") if current_step < max_steps: for link_info in links: linked_url = link_info.get('url') if linked_url: - # Add a check to prevent processing the same URL repeatedly in a single crawl path - # (More sophisticated de-duplication across the *entire* crawl would require a visited set passed down) - # For simplicity here, we just prevent immediate cycles. 
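The `visited_urls` set threaded through `_fetch_content_recursive` above supersedes the per-link self-reference check removed just below. In isolation, the de-duplication pattern looks roughly like this (stubbed link extraction, hypothetical names):

```python
from typing import Callable, Dict, List, Optional, Set

def crawl(url: str, get_links: Callable[[str], List[str]], max_depth: int,
          depth: int = 0, visited: Optional[Set[str]] = None) -> Dict:
    """Depth-limited crawl that skips any URL already seen anywhere in the crawl."""
    if visited is None:
        visited = set()
    normalized = url.rstrip("/")
    if depth > max_depth or normalized in visited:
        return {"url": url, "level": depth, "children": [], "skipped": True}
    visited.add(normalized)
    children = []
    if depth < max_depth:
        for link in get_links(url):
            children.append(crawl(link, get_links, max_depth, depth + 1, visited))
    return {"url": url, "level": depth, "children": children, "skipped": False}
```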
- if linked_url != url: - linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1) - # Only append results if the recursive call returned something valid - if linked_result: + # Ensure linked URL is absolute and potentially within the same domain + # Simple same-domain check (can be made more sophisticated) + try: + base_domain = urlparse(url).netloc + linked_domain = urlparse(linked_url).netloc + if linked_domain and linked_domain != base_domain: + logger.debug(f"Skipping external link: {linked_url}") + continue # Skip external links + + # Recursively call for linked URLs + linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1, visited_urls) + if linked_result: linked_extractions.append(linked_result) - else: - logger.debug(f"Skipping self-referencing link: {linked_url}") + except Exception as e: + logger.warning(f"Error processing linked URL {linked_url} from {url}: {e}") - # Add processing notes from the fetch_result to the current level's notes current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.'] if f"Processed at level {current_step}" not in current_notes: current_notes.append(f"Processed at level {current_step}") - return { 'url': url, 'level': current_step, - 'fetch_result': fetch_result, # Include the full fetch result for details + 'fetch_result': fetch_result, 'linked_extractions': linked_extractions, 'processing_notes': current_notes } @@ -464,17 +416,14 @@ class EnhancedFileProcessor: """Advanced file processing with enhanced content extraction""" def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default self.max_file_size = max_file_size - # Expanded supported extensions to include common docs and structured formats self.supported_extensions = { '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', '.pdf', '.doc', '.docx', '.rtf', '.odt', - # Archives are handled separately but listed for context '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar', } self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'} - def process_file(self, file) -> List[Dict]: """Process uploaded file with enhanced error handling and complete extraction""" if not file or not hasattr(file, 'name'): @@ -482,11 +431,8 @@ class EnhancedFileProcessor: return [] dataset = [] - # Use Path object for easier handling. Note: Gradio file object might not be a standard file path, - # but rather an object with a 'name' attribute pointing to a temp file path. file_path = Path(file.name) - # Ensure file exists before trying to get size/stats if not file_path.exists(): logger.error(f"File path does not exist: {file_path}") return [{ @@ -494,10 +440,9 @@ class EnhancedFileProcessor: 'filename': file.name if hasattr(file, 'name') else 'unknown', 'file_size': None, 'extracted_data': None, - 'processing_notes': 'File path does not exist.' + 'processing_notes': ['File path does not exist.'] }] - try: file_size = file_path.stat().st_size if file_size > self.max_file_size: @@ -507,24 +452,19 @@ class EnhancedFileProcessor: 'filename': file_path.name, 'file_size': file_size, 'extracted_data': None, - 'processing_notes': 'File size exceeds limit.' 
+ 'processing_notes': ['File size exceeds limit.'] }] - # Use a temporary directory for extracting archives with tempfile.TemporaryDirectory() as temp_dir: temp_dir_path = Path(temp_dir) - # Decide processing strategy based on extension if file_path.suffix.lower() in self.archive_extensions: dataset.extend(self._process_archive(file_path, temp_dir_path)) elif file_path.suffix.lower() in self.supported_extensions: - # Pass the path to the single file processor dataset.extend(self._process_single_file(file_path)) else: logger.warning(f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.") - # Attempt to process as raw text even if extension is unsupported try: - # Read as text with error replacement content_bytes = file_path.read_bytes() encoding_detection = chardet.detect(content_bytes) encoding = encoding_detection['encoding'] or 'utf-8' @@ -534,8 +474,8 @@ class EnhancedFileProcessor: 'filename': file_path.name, 'file_size': file_size, 'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', - 'extracted_data': {'plain_text': raw_content}, # Store raw text under a key - 'processing_notes': ['Processed as plain text (unsupported extension).'] # Ensure notes is a list + 'extracted_data': {'plain_text': raw_content}, + 'processing_notes': ['Processed as plain text (unsupported extension).'] }) except Exception as e: logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}") @@ -545,10 +485,9 @@ class EnhancedFileProcessor: 'file_size': file_size, 'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', 'extracted_data': None, - 'processing_notes': [f'Unsupported file type and failed to read as text: {e}'] # Ensure notes is a list + 'processing_notes': [f'Unsupported file type and failed to read as text: {e}'] }) - except Exception as e: logger.error(f"Error processing file '{file_path.name}': {str(e)}") dataset.append({ @@ -556,7 +495,7 @@ class EnhancedFileProcessor: 'filename': file_path.name, 'file_size': file_size if 'file_size' in locals() else None, 'extracted_data': None, - 'processing_notes': [f'Overall file processing error: {str(e)}'] # Ensure notes is a list + 'processing_notes': [f'Overall file processing error: {str(e)}'] }) return dataset @@ -578,18 +517,14 @@ class EnhancedFileProcessor: raw_content: Optional[str] = None extracted_data: Any = None - processing_notes: List[str] = [] # Initialize notes as a list + processing_notes: List[str] = [] try: - # Read content efficiently content_bytes = file_path.read_bytes() encoding_detection = chardet.detect(content_bytes) encoding = encoding_detection['encoding'] or 'utf-8' raw_content = content_bytes.decode(encoding, errors='replace') - # --- Attempt format-specific parsing --- - - # 1. 
Attempt JSON parsing (explicit .json or application/json, OR if content looks like JSON) is_explicit_json = mime_type == 'application/json' or file_extension == '.json' looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[') @@ -600,7 +535,7 @@ class EnhancedFileProcessor: if not is_explicit_json: processing_notes.append("Note: Content looked like JSON despite extension/mime.") logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.") - mime_type = 'application/json' # Update mime_type if successfully parsed as JSON + mime_type = 'application/json' except json.JSONDecodeError as e: processing_notes.append(f"Failed to parse as JSON: {e}.") if is_explicit_json: @@ -611,21 +546,16 @@ class EnhancedFileProcessor: processing_notes.append(f"Error processing JSON: {e}.") logger.error(f"Error processing JSON in '{filename}': {e}") - # 2. Attempt XML parsing (if not already parsed as JSON, and looks like XML) - # Add check if extracted_data is still None (meaning JSON parsing failed or wasn't attempted/relevant) - looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') # Simple heuristic + looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') is_explicit_xml = extracted_data is None and (mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ('.xml', '.xsd')) if extracted_data is None and (is_explicit_xml or looks_like_xml): try: root = ET.fromstring(raw_content) - # Convert XML element tree to a structured dictionary or string - # Simple string representation for QR code suitability extracted_data = ET.tostring(root, encoding='unicode', method='xml') processing_notes.append("Parsed as XML (text representation).") if not is_explicit_xml: processing_notes.append("Note: Content looked like XML despite extension/mime.") - # Update mime_type if successfully parsed as XML if 'xml' not in mime_type: mime_type = 'application/xml' except ET.ParseError as e: processing_notes.append(f"Failed to parse as XML: {e}.") @@ -637,43 +567,36 @@ class EnhancedFileProcessor: processing_notes.append(f"Error processing XML: {e}.") logger.error(f"Error processing XML in '{filename}': {e}") - - # 3. 
Attempt CSV parsing (if not already parsed, and looks like CSV or is explicit CSV) is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv') - # Heuristic: check for commas/semicolons and multiple lines looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ('\n' in raw_content or len(raw_content.splitlines()) > 1) if extracted_data is None and (is_explicit_csv or looks_like_csv): try: - # Use Sniffer to guess dialect for better compatibility - dialect = 'excel' # Default dialect + dialect = 'excel' try: - # Look at first few lines to guess dialect sample = '\n'.join(raw_content.splitlines()[:10]) if sample: dialect = csv.Sniffer().sniff(sample).name logger.debug(f"Sniffer detected CSV dialect: {dialect} for '{filename}'") except csv.Error: logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.") - dialect = 'excel' # Fallback + dialect = 'excel' - # Read using the guessed or default dialect csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect) rows = list(csv_reader) if rows: - # Limit the number of rows included for potentially huge CSVs - max_rows_preview = 100 # Limit text preview + max_rows_preview = 100 extracted_data = { - 'headers': rows[0] if rows and rows[0] else None, # Assume first row is header if exists - 'rows': rows[1:max_rows_preview+1] if len(rows) > 1 else [] # Get up to max_rows_preview data rows, if any + 'headers': rows[0] if rows and rows[0] else None, + 'rows': rows[1:max_rows_preview+1] if len(rows) > 1 else [] } if len(rows) > max_rows_preview + 1: processing_notes.append(f"CSV data rows truncated to {max_rows_preview}.") processing_notes.append("Parsed as CSV.") if not is_explicit_csv: processing_notes.append("Note: Content looked like CSV despite extension/mime.") - mime_type = 'text/csv' # Update mime_type + mime_type = 'text/csv' else: extracted_data = "Empty CSV" @@ -685,14 +608,12 @@ class EnhancedFileProcessor: processing_notes.append(f"Failed to parse as CSV: {e}.") logger.warning(f"Failed to parse CSV from '{filename}': {e}") - - # 4. 
Attempt Document Text Extraction (if not already parsed) if extracted_data is None: try: extracted_text = None if file_extension == '.pdf' and PDF_SUPPORT: with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: - tmp_file.write(content_bytes) # Write bytes to temp file + tmp_file.write(content_bytes) temp_path = Path(tmp_file.name) try: reader = PdfReader(temp_path) @@ -700,10 +621,10 @@ class EnhancedFileProcessor: extracted_text = text_content processing_notes.append("Extracted text from PDF.") finally: - if temp_path.exists(): temp_path.unlink() # Clean up temp file + if temp_path.exists(): temp_path.unlink() elif file_extension == '.docx' and DOCX_SUPPORT: with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: - tmp_file.write(content_bytes) # Write bytes to temp file + tmp_file.write(content_bytes) temp_path = Path(tmp_file.name) try: document = Document(temp_path) @@ -711,11 +632,9 @@ class EnhancedFileProcessor: extracted_text = text_content processing_notes.append("Extracted text from DOCX.") finally: - if temp_path.exists(): temp_path.unlink() # Clean up temp file + if temp_path.exists(): temp_path.unlink() elif file_extension == '.rtf' and RTF_SUPPORT: - # pyth can read directly from file-like object or string try: - # Rtf15Reader expects a file-like object or string doc = Rtf15Reader.read(io.StringIO(raw_content)) text_content = PlaintextWriter.write(doc).getvalue() extracted_text = text_content @@ -725,7 +644,7 @@ class EnhancedFileProcessor: logger.warning(f"Failed to extract RTF text from '{filename}': {e}") elif file_extension == '.odt' and ODT_SUPPORT: with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file: - tmp_file.write(content_bytes) # Write bytes to temp file + tmp_file.write(content_bytes) temp_path = Path(tmp_file.name) try: text_doc = OpenDocumentText(temp_path) @@ -734,15 +653,13 @@ class EnhancedFileProcessor: extracted_text = text_content processing_notes.append("Extracted text from ODT.") finally: - if temp_path.exists(): temp_path.unlink() # Clean up temp file + if temp_path.exists(): temp_path.unlink() elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']: - # These require more complex or platform-specific libraries (e.g. antiword, pandoc, COM objects on Windows) processing_notes.append(f"Automatic text extraction for {file_extension.upper()} not fully implemented.") logger.warning(f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.") if extracted_text is not None: - # Limit extracted text size - max_extracted_text_size = 10000 # Limit text preview + max_extracted_text_size = 10000 extracted_data = {'text': extracted_text[:max_extracted_text_size]} if len(extracted_text) > max_extracted_text_size: extracted_data['text'] += "..." @@ -754,26 +671,19 @@ class EnhancedFileProcessor: processing_notes.append(f"Error during document text extraction: {e}") logger.warning(f"Error during document text extraction for '{filename}': {e}") - - # 5. 
Fallback to Plain Text (if no specific extraction succeeded) if extracted_data is None: extracted_data = {'plain_text': raw_content} processing_notes.append("Stored as plain text.") - # Re-guess mime type if it was something specific like application/octet-stream and we just got text if mime_type in ['unknown/unknown', 'application/octet-stream']: - guessed_text_mime, _ = mimetypes.guess_type('dummy.txt') # Use a dummy file name to guess plain text + guessed_text_mime, _ = mimetypes.guess_type('dummy.txt') if guessed_text_mime: mime_type = guessed_text_mime - except Exception as e: - # Catch errors during initial read or other unexpected issues logger.error(f"Fatal error processing single file '{filename}': {e}") processing_notes.append(f"Fatal processing error: {e}") - raw_content = None # Ensure raw_content is None if reading failed + raw_content = None extracted_data = None - - # Add file info to the entry entry = { 'source': 'file', 'filename': filename, @@ -781,15 +691,14 @@ class EnhancedFileProcessor: 'mime_type': mime_type, 'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None, 'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None, - 'raw_content': raw_content, # Always include raw content if readable - 'extracted_data': extracted_data, # Include the structured/extracted data - 'processing_notes': processing_notes # Include any notes/errors encountered + 'raw_content': raw_content, + 'extracted_data': extracted_data, + 'processing_notes': processing_notes } dataset_entries.append(entry) return dataset_entries - def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]: """Process an archive file with enhanced extraction""" dataset = [] @@ -801,21 +710,16 @@ class EnhancedFileProcessor: if zipfile.is_zipfile(archive_path): with zipfile.ZipFile(archive_path, 'r') as zip_ref: for file_info in zip_ref.infolist(): - # Skip directories and empty files if file_info.file_size > 0 and not file_info.filename.endswith('/'): - # Sanitize filename to prevent directory traversal issues - sanitized_filename = Path(file_info.filename).name # Takes only the base name + sanitized_filename = Path(file_info.filename).name extracted_file_path = extract_to / sanitized_filename try: - # Extract file to the temporary directory with zip_ref.open(file_info) as zf, open(extracted_file_path, 'wb') as outfile: outfile.write(zf.read()) - # Recursively process the extracted file if it's supported and not an archive itself if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): dataset.extend(self._process_single_file(extracted_file_path)) elif extracted_file_path.suffix.lower() in self.archive_extensions: - # Recursively process nested archives (careful with depth!) 
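The zip branch above writes each entry under `Path(file_info.filename).name` only, which is the usual guard against zip-slip path traversal. A self-contained sketch of that extraction step (illustrative helper, not the patch's code):

```python
import zipfile
from pathlib import Path
from typing import List

def extract_zip_flat(archive: Path, dest: Path) -> List[Path]:
    """Extract regular zip entries, keeping only each entry's base name so hostile
    paths such as '../../etc/passwd' cannot escape the destination directory."""
    extracted: List[Path] = []
    with zipfile.ZipFile(archive, "r") as zf:
        for info in zf.infolist():
            if info.file_size == 0 or info.filename.endswith("/"):
                continue  # skip directories and empty entries
            target = dest / Path(info.filename).name
            with zf.open(info) as src, open(target, "wb") as out:
                out.write(src.read())
            extracted.append(target)
    return extracted
```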
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.") dataset.extend(self._process_archive(extracted_file_path, extract_to)) else: @@ -823,45 +727,36 @@ class EnhancedFileProcessor: except Exception as e: logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}") finally: - # Clean up the extracted file immediately after processing if extracted_file_path.exists(): try: extracted_file_path.unlink() except OSError as e: logger.warning(f"Failed to clean up extracted file {extracted_file_path}: {e}") - else: logger.error(f"'{archive_path.name}' is not a valid zip file.") elif archive_extension in ('.tar', '.gz', '.tgz'): try: - # Determine mode: 'r' for tar, 'r:gz' for tar.gz, 'r:bz2' for tar.bz2 (bz2 not fully supported yet) mode = 'r' if archive_extension in ('.tar.gz', '.tgz'): mode = 'r:gz' - # elif archive_extension == '.tar.bz2': mode = 'r:bz2' # Needs bz2 support - # Note: 'r:*' attempts to guess compression, safer to be explicit with tarfile.open(archive_path, mode) as tar_ref: for member in tar_ref.getmembers(): if member.isfile(): - # Sanitize member name sanitized_filename = Path(member.name).name extracted_file_path = extract_to / sanitized_filename try: - # Extract member to the temporary directory - # Ensure the target path is within the extraction directory if not str(extracted_file_path).startswith(str(extract_to)): logger.warning(f"Skipping potentially malicious path in tar: {member.name}") - continue # Skip if path is outside the temp dir + continue with tar_ref.extractfile(member) as tf, open(extracted_file_path, 'wb') as outfile: - if tf: # extractfile can return None for special file types + if tf: outfile.write(tf.read()) else: logger.warning(f"Could not extract file-like object for {member.name} from tar.") - continue # Skip this member + continue - # Recursively process extracted file if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): dataset.extend(self._process_single_file(extracted_file_path)) elif extracted_file_path.suffix.lower() in self.archive_extensions: @@ -872,7 +767,6 @@ class EnhancedFileProcessor: except Exception as e: logger.warning(f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}") finally: - # Clean up the extracted file immediately after processing if extracted_file_path.exists(): try: extracted_file_path.unlink() @@ -883,13 +777,11 @@ class EnhancedFileProcessor: logger.error(f"Error processing TAR archive '{archive_path.name}': {e}") elif archive_extension == '.gz': - # GZIP archives typically contain a single file. Extract it and process. 
- extracted_name = archive_path.stem # Get name without .gz + extracted_name = archive_path.stem extracted_path = extract_to / extracted_name try: with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: outfile.write(gz_file.read()) - # Process the extracted file if supported if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_path): dataset.extend(self._process_single_file(extracted_path)) elif extracted_path.suffix.lower() in self.archive_extensions: @@ -905,48 +797,34 @@ class EnhancedFileProcessor: finally: if extracted_path.exists(): try: - extracted_path.unlink() # Clean up extracted file + extracted_path.unlink() except OSError as e: logger.warning(f"Failed to clean up extracted file {extracted_path}: {e}") - - # TODO: Add support for other archive types (.bz2, .7z, .rar) elif archive_extension in ('.bz2', '.7z', '.rar'): logger.warning(f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.") except Exception as e: logger.error(f"Overall archive processing error for '{archive_path.name}': {e}") - # Clean up extracted files in temp_dir after processing - # Handled by context manager 'with tempfile.TemporaryDirectory()' - - return dataset def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]: """Enhanced data chunking with sequence metadata""" try: - # Convert data to JSON string - # Use separators=(',', ':') to remove unnecessary whitespace for maximum data density in QR code json_str = json.dumps(data, ensure_ascii=False, separators=(',', ':')) total_length = len(json_str) - # Calculate overhead for metadata - # Metadata structure: {"idx":0,"tc":1,"tl":XXX,"hash":"YYYY","data":"..."}, shortened keys metadata_template = { - "idx": 0, # chunk_index - "tc": 1, # total_chunks - "tl": total_length, # total_length - "hash": 0, # chunk_hash (using int for hash) - "data": "" # chunk_data + "idx": 0, + "tc": 1, + "tl": total_length, + "hash": 0, + "data": "" } - # Estimate overhead more accurately by dumping a sample metadata structure - # and adding some safety margin. Shortened keys reduce overhead. - # Use a dummy hash value (e.g., 1234567890) to get a realistic length estimate metadata_template_with_hash = {**metadata_template, "hash": 1234567890} - overhead_estimate = len(json.dumps(metadata_template_with_hash, separators=(',', ':'))) + 50 # Extra padding + overhead_estimate = len(json.dumps(metadata_template_with_hash, separators=(',', ':'))) + 50 - # Calculate effective chunk size effective_chunk_size = max_size - overhead_estimate if effective_chunk_size <= 0: @@ -954,35 +832,22 @@ class EnhancedFileProcessor: return [] if total_length <= effective_chunk_size: - # Data fits in one chunk - chunk_data = json_str # Use the full string + chunk_data = json_str chunk = { "idx": 0, "tc": 1, "tl": total_length, - "hash": hash(chunk_data) & 0xFFFFFFFF, # 32-bit hash + "hash": hash(chunk_data) & 0xFFFFFFFF, "data": chunk_data } return [chunk] - # Calculate number of chunks needed - num_chunks = -(-total_length // effective_chunk_size) # Ceiling division - # Adjust chunk_size slightly to distribute evenly, maybe not strictly necessary - # chunk_size = -(-total_length // num_chunks) # Use this if perfect distribution is needed - + num_chunks = -(-total_length // effective_chunk_size) chunks = [] current_pos = 0 for i in range(num_chunks): - # Find the end of the current chunk. 
Avoid splitting in the middle of escaped characters or surrogate pairs if possible, - # but simple slicing is usually okay for standard text that's already been errors='replace'. - # We'll use basic slicing for simplicity, as the JSON string is just text. end_pos = min(current_pos + effective_chunk_size, total_length) - - # Basic attempt to not break in the middle of a UTF-8 character if slicing bytes, - # but since we are slicing a *decoded string*, this is less of an issue. - # However, slicing in the middle of JSON structure is bad. - # For simplicity and robustness with arbitrary JSON structures, slicing the raw string is the easiest. chunk_data_str = json_str[current_pos:end_pos] chunk = { @@ -995,12 +860,9 @@ class EnhancedFileProcessor: chunks.append(chunk) current_pos = end_pos - # Final check: Ensure all data was chunked if current_pos < total_length: - # This shouldn't happen with correct ceiling division and min() logger.error(f"Chunking logic error: Only processed {current_pos} of {total_length} characters.") - return [] # Indicate failure - + return [] logger.info(f"Chunked data into {num_chunks} chunks for QR codes.") return chunks @@ -1019,51 +881,34 @@ def generate_stylish_qr(data: Union[str, Dict], try: qr = qrcode.QRCode( version=None, - error_correction=qrcode.constants.ERROR_CORRECT_M, # Increased error correction + error_correction=qrcode.constants.ERROR_CORRECT_M, box_size=size, border=border ) - # Add data to QR code if isinstance(data, dict): - # Use compact JSON representation qr.add_data(json.dumps(data, ensure_ascii=False, separators=(',', ':'))) else: - qr.add_data(str(data)) # Ensure it's a string + qr.add_data(str(data)) qr.make(fit=True) - # Create QR code image with custom colors qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) - - # Convert to RGBA for transparency support and potential overlays qr_image = qr_image.convert('RGBA') - # Optional: Add a small logo or icon in the center (requires design) - # logo = Image.open("logo.png").convert("RGBA") - # logo = logo.resize((logo.width // 4, logo.height // 4)) # Resize logo - # logo_pos = ((qr_image.width - logo.width) // 2, (qr_image.height - logo.height) // 2) - # qr_image.paste(logo, logo_pos, logo) - - # Add subtle gradient overlay (optional visual enhancement) try: gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) draw = ImageDraw.Draw(gradient) - # Horizontal gradient for subtle effect for i in range(qr_image.width): - # Fades from left (alpha=0) to right (max_alpha) - alpha = int(255 * (i/qr_image.width) * 0.05) # e.g., 5% maximum opacity fade-in + alpha = int(255 * (i/qr_image.width) * 0.05) draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha)) - # Combine images final_image = Image.alpha_composite(qr_image, gradient) except Exception as e: logger.warning(f"Failed to add gradient overlay to QR code: {e}. 
Using plain QR.") final_image = qr_image - - # Save the image output_path = QR_CODES_DIR / filename - final_image.save(output_path, quality=90) # Save with slightly lower quality for smaller file size + final_image.save(output_path, quality=90) return str(output_path) except Exception as e: @@ -1072,27 +917,25 @@ def generate_stylish_qr(data: Union[str, Dict], def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]: """Generate QR codes with enhanced visual appeal and metadata""" - # Assume 'data' here is the list of dictionaries produced by process_inputs if not isinstance(data, list): logger.error("generate_qr_codes received data that is not a list.") return [] try: - file_processor = EnhancedFileProcessor() # Use the enhanced processor for chunking + file_processor = EnhancedFileProcessor() paths = [] if combined: - # Process combined data - chunks = file_processor.chunk_data(data) # chunk_data works on the list of dicts + chunks = file_processor.chunk_data(data) if not chunks: logger.warning("No chunks generated for combined data.") return [] for i, chunk in enumerate(chunks): filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png' qr_path = generate_stylish_qr( - data=chunk, # Pass the chunk dictionary + data=chunk, filename=filename, - fill_color="#1a365d", # Deep blue + fill_color="#1a365d", back_color="#ffffff" ) if qr_path: @@ -1100,19 +943,18 @@ def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> Li else: logger.warning(f"Failed to generate QR for chunk {i+1}/{len(chunks)}.") else: - # Process individual items (each dictionary in the list) - if data: # Ensure data is not empty + if data: for idx, item in enumerate(data): - chunks = file_processor.chunk_data(item) # chunk_data works on individual dict + chunks = file_processor.chunk_data(item) if not chunks: logger.warning(f"No chunks generated for item {idx+1}.") continue for chunk_idx, chunk in enumerate(chunks): filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png' qr_path = generate_stylish_qr( - data=chunk, # Pass the chunk dictionary + data=chunk, filename=filename, - fill_color="#1a365d", # Deep blue + fill_color="#1a365d", back_color="#ffffff" ) if qr_path: @@ -1129,11 +971,6 @@ def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> Li logger.error(f"QR code generation error: {e}") return [] -# Keep the Gradio UI definition and main function as they are, -# as the changes are internal to the processing classes and the -# process_inputs function already handles calling them and getting -# the combined list of results. 
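`chunk_data` above emits chunks shaped as `{'idx', 'tc', 'tl', 'hash', 'data'}`. A sketch of the matching consumer-side reassembly (hypothetical helper; note that Python's built-in `hash()` is salted per process, so the stored hash can only be re-verified in the process that produced it):

```python
import json
from typing import Any, Dict, List

def reassemble_chunks(chunks: List[Dict[str, Any]]) -> Any:
    """Rebuild the original JSON payload from chunk_data-style chunks."""
    ordered = sorted(chunks, key=lambda c: c["idx"])
    total = ordered[0]["tc"]
    if len(ordered) != total or [c["idx"] for c in ordered] != list(range(total)):
        raise ValueError("missing or duplicate chunks")
    payload = "".join(c["data"] for c in ordered)
    if len(payload) != ordered[0]["tl"]:
        raise ValueError("reassembled length does not match 'tl'")
    return json.loads(payload)
```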
- # --- Chatbot Logic --- def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_data: Optional[List[Dict]]) -> Tuple[List[Tuple[str, str]], List[Dict]]: """Responds to user chat messages based on the loaded JSON data.""" @@ -1141,65 +978,85 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d chat_history.append((message, "Please process some data first using the other tabs before chatting.")) return chat_history, chatbot_data - # Add user message to history - chat_history.append((message, "")) # Add empty string for bot response initially + chat_history.append((message, "")) response = "" lower_message = message.lower().strip() try: - # Attempt to flatten the data structure for easier querying if it's nested - # This is a simplified approach; a real implementation might need a more - # sophisticated data traversal/query engine. + # Attempt to flatten the data structure for easier querying flat_data = [] - for item in chatbot_data: - # Recursively flatten dictionaries and lists within the item - def flatten_item(d, parent_key='', sep='_'): - items = [] + def flatten_item(d, parent_key='', sep='_'): + items = [] + if isinstance(d, dict): for k, v in d.items(): new_key = parent_key + sep + k if parent_key else k - if isinstance(v, dict): + if isinstance(v, (dict, list)): items.extend(flatten_item(v, new_key, sep=sep).items()) - elif isinstance(v, list): - # Flatten lists by processing each element - for i, elem in enumerate(v): - if isinstance(elem, (dict, list)): - items.extend(flatten_item({f'{new_key}_{i}': elem}, sep=sep).items()) - else: - items.append((f'{new_key}_{i}', elem)) else: items.append((new_key, v)) - return dict(items) + elif isinstance(d, list): + for i, elem in enumerate(d): + if isinstance(elem, (dict, list)): + items.extend(flatten_item(elem, f'{parent_key}_{i}' if parent_key else str(i), sep=sep).items()) + else: + items.append((f'{parent_key}_{i}' if parent_key else str(i), elem)) # Handle lists of non-dicts + # Note: If the top-level chatbot_data is NOT a list of dicts, this flattening might need adjustment. + # Assuming chatbot_data is a list of results, where each result is a dict. 
+ + return dict(items) + # Process each top-level item in chatbot_data + for i, item in enumerate(chatbot_data): if isinstance(item, dict): - flat_data.append(flatten_item(item)) + # Flatten the 'extracted_data' part if it exists and is a dict/list + extracted_data_part = item.get('extracted_data') + if isinstance(extracted_data_part, (dict, list)): + flat_item_data = flatten_item(extracted_data_part, parent_key=f'item_{i}_extracted_data') + # Include some top-level metadata if useful + metadata_part = {k: v for k, v in item.items() if k not in ['extracted_data', 'raw_content', 'linked_extractions']} + flat_data.append({**metadata_part, **flat_item_data}) + else: + # If extracted_data is not dict/list, just include top-level keys + flat_data.append({k: v for k, v in item.items() if k != 'raw_content'}) # Exclude raw_content + elif isinstance(item, list): - # If a top-level item is a list, try flattening its elements - for sub_item in item: - if isinstance(sub_item, dict): - flat_data.append(flatten_item(sub_item)) - else: - # Handle non-dict items in the top-level list if necessary - flat_data.append({'value': sub_item}) # Wrap in dict + # If a top-level item is a list itself (less common for single file/URL results but possible), flatten it + flat_data.extend(flatten_item(item, parent_key=f'item_{i}')) + else: + # Handle cases where top-level item is not a dict or list + flat_data.append({f'item_{i}_value': item}) - # Use pandas DataFrame for easier querying and analysis if flat_data is suitable df = None if flat_data: try: - # Attempt to create a DataFrame. This might fail if structures are highly inconsistent. df = pd.DataFrame(flat_data) logger.debug(f"Created DataFrame with shape: {df.shape}") logger.debug(f"DataFrame columns: {list(df.columns)}") except Exception as e: logger.warning(f"Could not create pandas DataFrame from processed data: {e}. Falling back to manual processing.") - df = None # Ensure df is None if creation failed - + df = None # --- Complex Queries and Analysis --- if df is not None: - # Example: How many unique values in a column? + # List available columns + if "what columns are available" in lower_message or "list columns" in lower_message: + response = f"The available columns in the data are: {', '.join(df.columns)}" + + # Describe a specific column + match = re.search(r'describe column (\w+)', lower_message) + if match: + column_name = match.group(1) + if column_name in df.columns: + description = df[column_name].describe().to_string() + response = f"Description for column '{column_name}':\n```\n{description}\n```" + else: + response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" + + + # How many unique values in a column? match = re.search(r'how many unique values in (\w+)', lower_message) if match: column_name = match.group(1) @@ -1209,15 +1066,13 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d else: response = f"I couldn't find a column named '{column_name}' in the data. Available columns are: {', '.join(df.columns)}" - # Example: What is the average/sum/min/max of a numeric column? + # What is the average/sum/min/max of a numeric column? 
-        # Example: What is the average/sum/min/max of a numeric column?
+        # What is the average/sum/min/max of a numeric column?
        match = re.search(r'what is the (average|sum|min|max) of (\w+)', lower_message)
        if match:
            operation, column_name = match.groups()
            if column_name in df.columns:
                try:
-                    # Attempt to convert column to numeric, coercing errors
                    numeric_col = pd.to_numeric(df[column_name], errors='coerce')
-                    # Drop NaNs that resulted from coercion
                    numeric_col = numeric_col.dropna()

                    if not numeric_col.empty:
@@ -1243,55 +1098,72 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d
            else:
                response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}"

-        # Example: Filter data based on a simple condition (e.g., price > 100)
-        match = re.search(r'show me items where (\w+)\s*([<>=!]+)\s*(\w+)', lower_message)
+        # Filter data based on a simple condition
+        match = re.search(r'show me items where (\w+)\s*([<>=!]+)\s*([-\w."\']+)', lower_message) # Allow quoted, negative, and decimal values
        if match:
            column_name, operator, value_str = match.groups()
            try:
-                # Attempt to infer value type (numeric, string)
-                try:
-                    value = float(value_str)
-                    is_numeric_comparison = True
-                except ValueError:
-                    value = value_str.strip("'\"") # Remove quotes for string comparison
-                    is_numeric_comparison = False
+                # Attempt to infer value type (numeric, string, boolean)
+                value: Any
+                value_str = value_str.strip("'\"")
+                is_numeric_comparison = False
+                is_boolean_comparison = False
+
+                if value_str.lower() in ['true', 'false']:
+                    value = value_str.lower() == 'true'
+                    is_boolean_comparison = True
+                else:
+                    try:
+                        value = float(value_str)
+                        is_numeric_comparison = True
+                    except ValueError:
+                        value = value_str
+
                if column_name in df.columns:
                    if is_numeric_comparison:
-                        # Ensure the column is numeric for comparison
                        numeric_col = pd.to_numeric(df[column_name], errors='coerce')
-                        filtered_df = df.loc[pd.notna(numeric_col)] # Filter out rows where conversion failed
-
-                        if operator == '>':
-                            filtered_results = filtered_df[numeric_col > value]
-                        elif operator == '<':
-                            filtered_results = filtered_df[numeric_col < value]
-                        elif operator == '>=':
-                            filtered_results = filtered_df[numeric_col >= value]
-                        elif operator == '<=':
-                            filtered_results = filtered_df[numeric_col <= value]
-                        elif operator == '==':
-                            filtered_results = filtered_df[numeric_col == value]
-                        elif operator == '!=':
-                            filtered_results = filtered_df[numeric_col != value]
+                        filtered_df = df.loc[pd.notna(numeric_col)]
+
+                        if operator == '>': filtered_results = filtered_df[numeric_col > value]
+                        elif operator == '<': filtered_results = filtered_df[numeric_col < value]
+                        elif operator == '>=': filtered_results = filtered_df[numeric_col >= value]
+                        elif operator == '<=': filtered_results = filtered_df[numeric_col <= value]
+                        elif operator == '==': filtered_results = filtered_df[numeric_col == value]
+                        elif operator == '!=': filtered_results = filtered_df[numeric_col != value]
                        else:
-                            filtered_results = pd.DataFrame() # Empty if operator is not recognized
+                            filtered_results = pd.DataFrame()
                            response = f"Unsupported numeric operator: {operator}. Try >, <, >=, <=, ==, !=."

                        if not filtered_results.empty:
                            preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2)
                            response = f"Here are the items where '{column_name}' {operator} {value_str}:\n```json\n{preview}\n```"
-                        elif 'response' not in locals(): # Only set if not already set by unsupported operator
+                        elif not response: # Don't overwrite an earlier "Unsupported operator" message
                            response = f"No items found where '{column_name}' {operator} {value_str}."

-                    elif operator == '==': # Simple string equality
+                    elif is_boolean_comparison:
+                        # Map common textual/numeric representations to booleans explicitly;
+                        # astype(bool) would treat any non-empty string (including "false") as True.
+                        boolean_col = df[column_name].map(lambda v: str(v).strip().lower() in ('true', '1', 'yes'))
+                        if operator == '==': filtered_results = df[boolean_col == value]
+                        elif operator == '!=': filtered_results = df[boolean_col != value]
+                        else:
+                            filtered_results = pd.DataFrame()
+                            response = f"Unsupported boolean operator: {operator}. Try == or !=."
+
+                        if not filtered_results.empty:
+                            preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2)
+                            response = f"Here are the items where '{column_name}' is {value_str}:\n```json\n{preview}\n```"
+                        elif not response:
+                            response = f"No items found where '{column_name}' is {value_str}."
+
+                    elif operator == '==':
                        filtered_results = df[df[column_name] == value]
                        if not filtered_results.empty:
                            preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2)
                            response = f"Here are the items where '{column_name}' is '{value}':\n```json\n{preview}\n```"
                        else:
                            response = f"No items found where '{column_name}' is '{value}'."
-                    elif operator == '!=': # Simple string inequality
+                    elif operator == '!=':
                        filtered_results = df[df[column_name] != value]
                        if not filtered_results.empty:
                            preview = filtered_results.to_json(orient='records', indent=2)[:500] + "..." if len(filtered_results.to_json()) > 500 else filtered_results.to_json(orient='records', indent=2)
@@ -1301,7 +1173,6 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d
                    else:
                        response = f"Unsupported operator for string comparison: {operator}. Try == or !=."

-
                else:
                    response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}"

@@ -1309,10 +1180,25 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d
                response = f"An error occurred while filtering data: {e}"
                logger.error(f"Error filtering data based on condition: {e}")

+        # Request structured output (e.g., as CSV or simplified JSON)
+        if "output as csv" in lower_message or "export as csv" in lower_message:
+            if df is not None and not df.empty:
+                csv_output = df.to_csv(index=False)
+                response = f"Here is the data in CSV format:\n```csv\n{csv_output[:1000]}...\n```\n(Output truncated for chat display)"
+            else:
+                response = "There is no data available to output as CSV."
+
+        elif "output as json" in lower_message or "export as json" in lower_message:
+            if df is not None and not df.empty:
+                json_output = df.to_json(orient='records', indent=2)
+                response = f"Here is the data in JSON format:\n```json\n{json_output[:1000]}...\n```\n(Output truncated for chat display)"
+            else:
+                response = "There is no data available to output as JSON."
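# ---------------------------------------------------------------------------
# Aside (illustrative, not part of the change above): the same "infer the value
# type, then compare against a coerced column" technique as the filter handler,
# condensed into a standalone helper. `filter_rows` and the demo frame are
# assumptions for demonstration, not names from app2.py.
import operator as op_mod
import pandas as pd

_OPS = {'>': op_mod.gt, '<': op_mod.lt, '>=': op_mod.ge,
        '<=': op_mod.le, '==': op_mod.eq, '!=': op_mod.ne}

def filter_rows(df: pd.DataFrame, column: str, op: str, raw_value: str) -> pd.DataFrame:
    """Filter rows by `column <op> raw_value`, inferring boolean/numeric/string values."""
    value_str = raw_value.strip("'\"")
    compare = _OPS[op]  # raises KeyError for unsupported operators (the handler responds with a message instead)
    if value_str.lower() in ('true', 'false'):                 # boolean comparison
        col = df[column].map(lambda v: str(v).strip().lower() in ('true', '1', 'yes'))
        return df[compare(col, value_str.lower() == 'true')]
    try:                                                        # numeric comparison
        target = float(value_str)
        col = pd.to_numeric(df[column], errors='coerce')
        return df[col.notna() & compare(col, target)]
    except ValueError:                                          # plain string comparison
        return df[compare(df[column].astype(str), value_str)]

if __name__ == '__main__':
    demo = pd.DataFrame({'name': ['a', 'b', 'c'],
                         'price': ['50', '150', 'n/a'],
                         'in_stock': ['true', 'false', 'true']})
    print(filter_rows(demo, 'price', '>', '100'))       # row 'b' only
    print(filter_rows(demo, 'in_stock', '==', 'true'))  # rows 'a' and 'c'
    print(demo.to_csv(index=False))                     # the CSV preview used by "output as csv"
    print(demo.to_json(orient='records', indent=2))     # the JSON preview used by "output as json"
# ---------------------------------------------------------------------------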
+ - # --- General Queries --- + # --- General Queries (if no DataFrame or specific query matched) --- - if not response: # If no specific analysis/query matched yet + if not response: if "how many items" in lower_message or "number of items" in lower_message: if isinstance(chatbot_data, list): response = f"There are {len(chatbot_data)} top-level items in the processed data." @@ -1331,7 +1217,6 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d response = "The processed data is not a standard list or dictionary structure that I can easily describe." elif "show me" in lower_message or "get me" in lower_message or "extract" in lower_message: - # Basic extraction if DataFrame wasn't created or query wasn't complex parts = lower_message.split("show me") if len(parts) > 1: key_request = parts[1].strip().split(" ")[0] @@ -1379,10 +1264,10 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d response = random.choice(["You're welcome!", "Glad I could help.", "No problem! Let me know if you have more questions about the data."]) elif "clear chat" in lower_message: - chat_history = [] # Clear history + chat_history = [] response = "Chat history cleared." - elif not response: # If still no specific response matched + elif not response: response = random.choice([ "I can analyze the data you've processed. What would you like to know?", "Ask me about the number of items, the structure, or values of specific fields.", @@ -1396,23 +1281,17 @@ def respond_to_chat(message: str, chat_history: List[Tuple[str, str]], chatbot_d response = f"An internal error occurred while processing your request: {e}" response += "\nPlease try rephrasing your question or clear the chat history." - - # Update the last message in history with the bot's response if chat_history and chat_history[-1][1] == "": chat_history[-1] = (chat_history[-1][0], response) else: - # This case should ideally not happen if the initial append was done correctly - chat_history.append(("", response)) # Append bot response if something went wrong with initial append - - - return chat_history, chatbot_data # Return updated history and data state + chat_history.append(("", response)) + return chat_history, chatbot_data # --- Gradio Interface Definition --- def create_modern_interface(): """Create a modern and visually appealing Gradio interface""" - # Modern CSS styling css = """ /* Modern color scheme */ :root { @@ -1503,11 +1382,10 @@ def create_modern_interface(): height: auto; border-radius: 0.375rem; transition: transform 0.2s; - max-width: 150px; /* Adjust as needed */ - max-height: 150px; /* Adjust as needed */ + max-width: 150px; + max-height: 150px; } """ - # Create interface with modern design with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: interface.head += """ """ with gr.Row(): - # Adjusted crawl depth slider to match the max_steps limit in the code crawl_depth_slider = gr.Slider( label="Crawl Depth", minimum=0, - maximum=10, # Changed max depth to 10 + maximum=10, value=0, step=1, interactive=True, - info="Select the maximum depth for crawling links (0-10)." # Updated info + info="Select the maximum depth for crawling links (0-10)." 
) qr_code_paths = gr.State([]) - chatbot_data = gr.State(None) # State to hold the processed JSON data for the chatbot + chatbot_data = gr.State(None) gr.Markdown(""" # 🌐 Advanced Data Processing & QR Code Generator Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. @@ -1560,7 +1435,7 @@ def create_modern_interface(): with gr.Tab("📁 File Input"): file_input = gr.File( label="Upload Files", - file_types=None, # Accept all file types + file_types=None, file_count="multiple" ) with gr.Tab("📋 JSON Input"): @@ -1583,7 +1458,6 @@ def create_modern_interface(): "🔄 Process & Generate QR", variant="primary" ) - # Output components output_json = gr.JSON(label="Processed Data") output_gallery = gr.Gallery( label="Generated QR Codes", @@ -1598,18 +1472,16 @@ def create_modern_interface(): with gr.Tab("🖼️ QR Code Viewport") as viewport_tab: viewport_output = gr.HTML(label="QR Code Sequence Viewport") - enabled_qr_codes = gr.State([]) # To store the enabled/disabled state + enabled_qr_codes = gr.State([]) with gr.Tab("🤖 Chat with Data") as chat_tab: - chat_history = gr.State([]) # State to hold chat history + chat_history = gr.State([]) chatbot = gr.Chatbot(label="Data Chatbot") with gr.Row(): chat_input = gr.Textbox(label="Your Message", placeholder="Ask me about the processed data...") send_msg_btn = gr.Button("Send") clear_chat_btn = gr.Button("Clear Chat History") - - # Load example data def load_example(): example = { "type": "product_catalog", @@ -1640,37 +1512,34 @@ def create_modern_interface(): return json.dumps(example, indent=2) def clear_input(): - return "", None, "", None # Clear url, files, text, and chatbot data state + return "", None, "", None def update_viewport(paths, enabled_states): if not paths: return "No QR codes generated yet.
" num_qr_codes = len(paths) - cols = math.ceil(math.sqrt(num_qr_codes)) # Calculate columns for a roughly square grid - cols = max(1, min(cols, 6)) # Limit max columns for small screens - # rows = math.ceil(num_qr_codes / cols) # Not used in HTML generation + cols = math.ceil(math.sqrt(num_qr_codes)) + cols = max(1, min(cols, 6)) - viewport_html = f'