import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
import io
import csv
import xml.etree.ElementTree as ET
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple, Any
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache  # Unused in provided code, kept for completeness
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean  # Unused in provided code, kept for completeness
import qrcode
from PIL import Image, ImageDraw, ImageFont  # ImageFont may require pillow[extra]
import numpy as np  # Unused in provided code, kept for completeness
import tarfile
import gzip
import math
import random
import pandas as pd
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
# Setup enhanced logging with more detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ])
logger = logging.getLogger(__name__)
# Conditional imports for document processing
try:
    from PyPDF2 import PdfReader
    PDF_SUPPORT = True
except ImportError:
    PDF_SUPPORT = False
    logger.warning("PyPDF2 not installed. PDF file processing will be limited.")
try:
    from docx import Document
    DOCX_SUPPORT = True
except ImportError:
    DOCX_SUPPORT = False
    logger.warning("python-docx not installed. DOCX file processing will be limited.")
try:
    from pyth.plugins.plaintext.writer import PlaintextWriter
    from pyth.plugins.rtf15.reader import Rtf15Reader
    RTF_SUPPORT = True
except ImportError:
    RTF_SUPPORT = False
    logger.warning("pyth not installed. RTF file processing will be limited.")
try:
    # odfpy: load() opens an existing document; teletype.extractText() pulls text out of elements
    from odf.opendocument import load as load_odf
    from odf import text as odftext, teletype
    ODT_SUPPORT = True
except ImportError:
    ODT_SUPPORT = False
    logger.warning("odfpy not installed. ODT file processing will be limited.")
# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor: | |
"""Advanced URL processing with enhanced content extraction and recursive link following.""" | |
def __init__(self): | |
# Use a real requests session with retry strategy | |
self.session = requests.Session() | |
retry_strategy = Retry( | |
total=3, | |
backoff_factor=1, | |
status_forcelist=[429, 500, 502, 503, 504], | |
allowed_methods=["HEAD", "GET"] | |
) | |
adapter = HTTPAdapter(max_retries=retry_strategy) | |
self.session.mount("http://", adapter) | |
self.session.mount("https://", adapter) | |
self.user_agent = UserAgent() | |
self.timeout = 15 # seconds | |
def validate_url(self, url: str) -> Dict[str, Any]: | |
"""Enhanced URL validation with accessibility check.""" | |
if not validators.url(url): | |
return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'} | |
parsed = urlparse(url) | |
if not all([parsed.scheme, parsed.netloc]): | |
return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'} | |
try: | |
# Use a HEAD request to check accessibility without downloading full content | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.head(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
# Check content type if available in HEAD response | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
if not content_type or not (content_type.startswith('text/') or 'json' in content_type or 'xml' in content_type): | |
# Basic check whether the content type is relevant for text extraction.
logger.warning(f"URL {url} returned potentially irrelevant content type: {content_type}")
# The URL is still reported as valid; only a warning is logged here.
return { | |
'is_valid': True, | |
'message': 'URL is valid and accessible', | |
'details': { | |
'final_url': response.url, # Capture final URL after redirects | |
'content_type': content_type, | |
'server': response.headers.get('Server', 'N/A'), | |
'size': response.headers.get('Content-Length', 'N/A') | |
} | |
} | |
except requests.exceptions.RequestException as e: | |
return {'is_valid': False, 'message': 'URL not accessible', 'details': str(e)} | |
except Exception as e: | |
logger.error(f"Unexpected error during URL validation for {url}: {e}") | |
return {'is_valid': False, 'message': 'Unexpected validation error', 'details': str(e)} | |
def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict[str, Any]]: | |
"""Enhanced content fetcher with retry mechanism and complete character extraction.""" | |
try: | |
logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1})") | |
headers = {'User-Agent': self.user_agent.random} | |
response = self.session.get(url, timeout=self.timeout, headers=headers, allow_redirects=True) | |
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx) | |
final_url = response.url # Capture potential redirects | |
content_type = response.headers.get('Content-Type', '').split(';')[0].strip() | |
# Attempt to detect encoding if not specified in headers | |
encoding = response.encoding # requests attempts to guess encoding | |
if encoding is None or encoding == 'ISO-8859-1': # Fallback if requests guess is default/uncertain | |
try: | |
encoding_detection = chardet.detect(response.content) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
logger.debug(f"Chardet detected encoding: {encoding} for {url}") | |
except Exception as e: | |
logger.warning(f"Chardet detection failed for {url}: {e}. Falling back to utf-8.") | |
encoding = 'utf-8' | |
raw_content = response.content.decode(encoding, errors='replace') | |
# Extract metadata | |
metadata = { | |
'original_url': url, | |
'final_url': final_url, | |
'timestamp': datetime.now().isoformat(), | |
'detected_encoding': encoding, | |
'content_type': content_type, | |
'content_length': len(response.content), | |
'headers': dict(response.headers), | |
'status_code': response.status_code | |
} | |
# Process based on content type | |
processed_extraction = self._process_web_content(raw_content, metadata['content_type'], final_url) | |
return { | |
'source': 'url', | |
'url': url, # Keep original URL as identifier for this step | |
'raw_content': raw_content, | |
'metadata': metadata, | |
'extracted_data': processed_extraction['data'], | |
'processing_notes': processed_extraction['notes'] | |
} | |
except requests.exceptions.RequestException as e: | |
logger.error(f"Failed to fetch content from {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': None, | |
'metadata': {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': getattr(e.response, 'status_code', None)}, | |
'extracted_data': None, | |
'processing_notes': [f"Failed to fetch content: {str(e)}"] | |
} | |
except Exception as e: | |
logger.error(f"Unexpected error while fetching or processing URL {url}: {e}") | |
return { | |
'source': 'url', | |
'url': url, | |
'raw_content': raw_content if 'raw_content' in locals() else None, | |
'metadata': metadata if 'metadata' in locals() else {'original_url': url, 'timestamp': datetime.now().isoformat(), 'status_code': None}, | |
'extracted_data': None, | |
'processing_notes': [f"Unexpected processing error: {str(e)}"] | |
} | |
def _process_web_content(self, content: str, content_type: str, base_url: str) -> Dict[str, Any]: | |
"""Process content based on detected content type""" | |
lower_content_type = content_type.lower() | |
notes = [] | |
extracted_data: Any = None | |
try: | |
if 'text/html' in lower_content_type: | |
logger.debug(f"Processing HTML content from {base_url}") | |
extracted_data = self._process_html_content_enhanced(content, base_url) | |
notes.append("Processed as HTML") | |
elif 'application/json' in lower_content_type or 'text/json' in lower_content_type: | |
logger.debug(f"Processing JSON content from {base_url}") | |
try: | |
extracted_data = json.loads(content) | |
notes.append("Parsed as JSON") | |
except json.JSONDecodeError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as JSON: {e}") | |
logger.warning(f"Failed to parse JSON from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing JSON: {e}") | |
logger.error(f"Error processing JSON from {base_url}: {e}") | |
elif 'application/xml' in lower_content_type or 'text/xml' in lower_content_type or lower_content_type.endswith('+xml'): | |
logger.debug(f"Processing XML content from {base_url}") | |
try: | |
root = ET.fromstring(content) | |
xml_text = ET.tostring(root, encoding='unicode', method='xml') | |
extracted_data = xml_text | |
notes.append("Parsed as XML (text representation)") | |
except ET.ParseError as e: | |
extracted_data = content | |
notes.append(f"Failed to parse as XML: {e}") | |
logger.warning(f"Failed to parse XML from {base_url}: {e}") | |
except Exception as e: | |
extracted_data = content | |
notes.append(f"Error processing XML: {e}") | |
logger.error(f"Error processing XML from {base_url}: {e}") | |
elif 'text/plain' in lower_content_type or 'text/' in lower_content_type: | |
logger.debug(f"Processing Plain Text content from {base_url}") | |
extracted_data = content | |
notes.append("Processed as Plain Text") | |
else: | |
logger.debug(f"Unknown content type '{content_type}' from {base_url}. Storing raw content.") | |
extracted_data = content | |
notes.append(f"Unknown content type '{content_type}'. Stored raw text.") | |
except Exception as e: | |
logger.error(f"Unexpected error in _process_web_content for {base_url} ({content_type}): {e}") | |
extracted_data = content | |
notes.append(f"Unexpected processing error: {e}. Stored raw text.") | |
return {'data': extracted_data, 'notes': notes} | |
def _process_html_content_enhanced(self, content: str, base_url: str) -> Dict[str, Any]: | |
"""Process HTML content, preserving text, and extracting metadata and links.""" | |
extracted: Dict[str, Any] = { | |
'title': None, | |
'meta_description': None, | |
'full_text': "", | |
'links': [] | |
} | |
try: | |
soup = BeautifulSoup(content, 'html.parser') | |
if soup.title and soup.title.string: | |
extracted['title'] = soup.title.string.strip() | |
meta_desc = soup.find('meta', attrs={'name': 'description'}) | |
if meta_desc and meta_desc.get('content'): | |
extracted['meta_description'] = meta_desc['content'].strip() | |
unique_links = set() | |
for a_tag in soup.find_all('a', href=True): | |
href = a_tag['href'].strip() | |
if href and not href.startswith(('#', 'mailto:', 'tel:', 'javascript:')): | |
text = a_tag.get_text().strip() | |
try: | |
absolute_url = urljoin(base_url, href) | |
if absolute_url not in unique_links: | |
extracted['links'].append({'text': text, 'url': absolute_url}) | |
unique_links.add(absolute_url) | |
except Exception: | |
if validators.url(href) and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
elif urlparse(href).netloc and href not in unique_links: | |
extracted['links'].append({'text': text, 'url': href}) | |
unique_links.add(href) | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
text = soup_copy.get_text(separator='\n') | |
lines = text.splitlines() | |
cleaned_lines = [line.strip() for line in lines if line.strip()] | |
extracted['full_text'] = '\n'.join(cleaned_lines) | |
except Exception as e: | |
logger.error(f"Enhanced HTML processing error for {base_url}: {e}") | |
soup_copy = BeautifulSoup(content, 'html.parser') | |
for script_or_style in soup_copy(["script", "style"]): | |
script_or_style.extract() | |
extracted['full_text'] = soup_copy.get_text(separator='\n').strip() | |
extracted['processing_error'] = f"Enhanced HTML processing failed: {e}" | |
return extracted | |
def fetch_content_with_depth(self, url: str, max_steps: int = 0) -> Dict[str, Any]: | |
"""Fetches content from a URL and recursively follows links up to max_steps depth.""" | |
if not isinstance(max_steps, int) or not (0 <= max_steps <= 10): | |
logger.error(f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10.") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Invalid max_steps value: {max_steps}. Must be an integer between 0 and 10."] | |
} | |
validation_result = self.validate_url(url) | |
if not validation_result['is_valid']: | |
logger.error(f"Initial URL validation failed for {url}: {validation_result['message']}") | |
return { | |
'url': url, | |
'level': 0, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Initial URL validation failed: {validation_result['message']}"] | |
} | |
# Use a set to keep track of visited URLs during the crawl to avoid infinite loops | |
visited_urls = set() | |
return self._fetch_content_recursive(url, max_steps, current_step=0, visited_urls=visited_urls) | |
def _fetch_content_recursive(self, url: str, max_steps: int, current_step: int, visited_urls: set) -> Dict[str, Any]: | |
"""Recursive helper function to fetch content and follow links.""" | |
if current_step > max_steps: | |
logger.debug(f"Depth limit ({max_steps}) reached for {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, | |
'linked_extractions': [], | |
'processing_notes': [f"Depth limit ({max_steps}) reached."] | |
} | |
# Normalize URL before checking visited set | |
normalized_url = url.rstrip('/') # Simple normalization | |
if normalized_url in visited_urls: | |
logger.debug(f"Skipping already visited URL: {url} at level {current_step}.") | |
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': None, # Indicate not fetched in this run | |
'linked_extractions': [], | |
'processing_notes': ["URL already visited in this crawl."] | |
} | |
visited_urls.add(normalized_url) # Mark as visited | |
logger.info(f"Processing URL: {url} at level {current_step}/{max_steps}") | |
fetch_result = self.fetch_content(url) | |
linked_extractions: List[Dict[str, Any]] = [] | |
if fetch_result and fetch_result.get('extracted_data') and 'text/html' in fetch_result.get('metadata', {}).get('content_type', '').lower(): | |
extracted_data = fetch_result['extracted_data'] | |
links = extracted_data.get('links', []) | |
logger.info(f"Found {len(links)} potential links on {url} at level {current_step}. Proceeding to depth {current_step + 1}.") | |
if current_step < max_steps: | |
for link_info in links: | |
linked_url = link_info.get('url') | |
if linked_url: | |
# Ensure linked URL is absolute and potentially within the same domain | |
# Simple same-domain check (can be made more sophisticated) | |
try: | |
base_domain = urlparse(url).netloc | |
linked_domain = urlparse(linked_url).netloc | |
# Allow processing if domains match OR if linked_domain is empty (relative link) | |
if linked_domain and linked_domain != base_domain: | |
logger.debug(f"Skipping external link: {linked_url}") | |
continue # Skip external links | |
# Recursively call for linked URLs | |
linked_result = self._fetch_content_recursive(linked_url, max_steps, current_step + 1, visited_urls) | |
if linked_result: | |
linked_extractions.append(linked_result) | |
except Exception as e: | |
logger.warning(f"Error processing linked URL {linked_url} from {url}: {e}") | |
current_notes = fetch_result.get('processing_notes', []) if fetch_result else ['Fetch failed.']
if fetch_result is not None:  # Only add the level note if a fetch was actually attempted
    if f"Processed at level {current_step}" not in current_notes:
        current_notes.append(f"Processed at level {current_step}")
return { | |
'url': url, | |
'level': current_step, | |
'fetch_result': fetch_result, | |
'linked_extractions': linked_extractions, | |
'processing_notes': current_notes | |
} | |
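# Illustrative usage of the crawler above (a hedged sketch, not invoked by the app itself;
# the URL is a placeholder):
#     url_processor = EnhancedURLProcessor()
#     crawl = url_processor.fetch_content_with_depth("https://example.com", max_steps=1)
#     # crawl['fetch_result']['extracted_data'] holds the page title/text/links,
#     # crawl['linked_extractions'] holds one nested result per same-domain link followed.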
class EnhancedFileProcessor: | |
"""Advanced file processing with enhanced content extraction""" | |
def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024): # 5GB default | |
self.max_file_size = max_file_size | |
self.supported_extensions = { | |
'.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', | |
'.log', '.yml', '.yaml', '.ini', '.conf', '.cfg', | |
'.pdf', '.doc', '.docx', '.rtf', '.odt', | |
'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar', | |
} | |
self.archive_extensions = {'.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'} | |
def process_file(self, file) -> List[Dict]: | |
"""Process uploaded file with enhanced error handling and complete extraction""" | |
if not file or not hasattr(file, 'name'): | |
logger.warning("Received invalid file object.") | |
return [] | |
dataset = [] | |
# Gradio file object has a 'name' attribute which is the temporary path | |
file_path = Path(file.name) | |
if not file_path.exists(): | |
logger.error(f"File path does not exist: {file_path}") | |
return [{ | |
'source': 'file', | |
'filename': file.name if hasattr(file, 'name') else 'unknown', | |
'file_size': None, | |
'extracted_data': None, | |
'processing_notes': ['File path does not exist.'] | |
}] | |
try: | |
file_size = file_path.stat().st_size | |
if file_size > self.max_file_size: | |
logger.warning(f"File '{file_path.name}' size ({file_size} bytes) exceeds maximum allowed size ({self.max_file_size} bytes).") | |
return [{ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'extracted_data': None, | |
'processing_notes': ['File size exceeds limit.'] | |
}] | |
# Use a temporary directory for archive extraction | |
with tempfile.TemporaryDirectory() as temp_dir: | |
temp_dir_path = Path(temp_dir) | |
if file_path.suffix.lower() in self.archive_extensions: | |
dataset.extend(self._process_archive(file_path, temp_dir_path)) | |
elif file_path.suffix.lower() in self.supported_extensions: | |
dataset.extend(self._process_single_file(file_path)) | |
else: | |
logger.warning(f"Unsupported file type for processing: '{file_path.name}'. Attempting to read as plain text.") | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': {'plain_text': raw_content}, | |
'processing_notes': ['Processed as plain text (unsupported extension).'] | |
}) | |
except Exception as e: | |
logger.error(f"Error reading or processing unsupported file '{file_path.name}' as text: {e}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size, | |
'mime_type': mimetypes.guess_type(file_path.name)[0] or 'unknown/unknown', | |
'extracted_data': None, | |
'processing_notes': [f'Unsupported file type and failed to read as text: {e}'] | |
}) | |
except Exception as e: | |
logger.error(f"Error processing file '{file_path.name}': {str(e)}") | |
dataset.append({ | |
'source': 'file', | |
'filename': file_path.name, | |
'file_size': file_size if 'file_size' in locals() else None, | |
'extracted_data': None, | |
'processing_notes': [f'Overall file processing error: {str(e)}'] | |
}) | |
return dataset | |
def _is_archive(self, filepath: Union[str, Path]) -> bool: | |
"""Check if file is an archive""" | |
p = Path(filepath) if isinstance(filepath, str) else filepath | |
return p.suffix.lower() in self.archive_extensions | |
def _process_single_file(self, file_path: Path) -> List[Dict]: | |
"""Process a single file with enhanced character extraction and format-specific handling""" | |
dataset_entries = [] | |
filename = file_path.name | |
file_size = file_path.stat().st_size | |
mime_type, _ = mimetypes.guess_type(file_path) | |
mime_type = mime_type or 'unknown/unknown' | |
file_extension = file_path.suffix.lower() | |
logger.info(f"Processing single file: '{filename}' ({mime_type}, {file_size} bytes)") | |
raw_content: Optional[str] = None | |
extracted_data: Any = None | |
processing_notes: List[str] = [] | |
try: | |
content_bytes = file_path.read_bytes() | |
encoding_detection = chardet.detect(content_bytes) | |
encoding = encoding_detection['encoding'] or 'utf-8' | |
raw_content = content_bytes.decode(encoding, errors='replace') | |
is_explicit_json = mime_type == 'application/json' or file_extension == '.json' | |
looks_like_json = raw_content.strip().startswith('{') or raw_content.strip().startswith('[') | |
if is_explicit_json or looks_like_json: | |
try: | |
extracted_data = json.loads(raw_content) | |
processing_notes.append("Parsed as JSON.") | |
if not is_explicit_json: | |
processing_notes.append("Note: Content looked like JSON despite extension/mime.") | |
logger.warning(f"File '{filename}' identified as JSON content despite extension/mime.") | |
mime_type = 'application/json' | |
except json.JSONDecodeError as e: | |
processing_notes.append(f"Failed to parse as JSON: {e}.") | |
if is_explicit_json: | |
logger.error(f"Explicit JSON file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like JSON but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing JSON: {e}.") | |
logger.error(f"Error processing JSON in '{filename}': {e}") | |
looks_like_xml = extracted_data is None and raw_content.strip().startswith('<') and raw_content.strip().endswith('>') | |
is_explicit_xml = extracted_data is None and (mime_type in ('application/xml', 'text/xml') or mime_type.endswith('+xml') or file_extension in ('.xml', '.xsd')) | |
if extracted_data is None and (is_explicit_xml or looks_like_xml): | |
try: | |
root = ET.fromstring(raw_content) | |
extracted_data = ET.tostring(root, encoding='unicode', method='xml') | |
processing_notes.append("Parsed as XML (text representation).") | |
if not is_explicit_xml: | |
processing_notes.append("Note: Content looked like XML despite extension/mime.") | |
if 'xml' not in mime_type: mime_type = 'application/xml' | |
except ET.ParseError as e: | |
processing_notes.append(f"Failed to parse as XML: {e}.") | |
if is_explicit_xml: | |
logger.error(f"Explicit XML file '{filename}' has invalid format: {e}") | |
else: | |
logger.warning(f"Content of '{filename}' looks like XML but failed to parse: {e}") | |
except Exception as e: | |
processing_notes.append(f"Error processing XML: {e}.") | |
logger.error(f"Error processing XML in '{filename}': {e}") | |
is_explicit_csv = extracted_data is None and (mime_type == 'text/csv' or file_extension == '.csv') | |
looks_like_csv = extracted_data is None and (',' in raw_content or ';' in raw_content) and ('\n' in raw_content or len(raw_content.splitlines()) > 1) | |
if extracted_data is None and (is_explicit_csv or looks_like_csv): | |
try: | |
dialect = 'excel'
try:
    sample = '\n'.join(raw_content.splitlines()[:10])
    if sample:
        # Sniffer returns a Dialect subclass that csv.reader accepts directly
        dialect = csv.Sniffer().sniff(sample)
        logger.debug(f"Sniffer detected a CSV dialect for '{filename}'")
except csv.Error:
    logger.debug(f"Sniffer failed to detect dialect for '{filename}', using 'excel'.")
    dialect = 'excel'
csv_reader = csv.reader(io.StringIO(raw_content), dialect=dialect)
rows = list(csv_reader) | |
if rows: | |
max_rows_preview = 100 | |
extracted_data = { | |
'headers': rows[0] if rows and rows[0] else None, | |
'rows': rows[1:max_rows_preview+1] if len(rows) > 1 else [] | |
} | |
if len(rows) > max_rows_preview + 1: | |
processing_notes.append(f"CSV data rows truncated to {max_rows_preview}.") | |
processing_notes.append("Parsed as CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV despite extension/mime.") | |
mime_type = 'text/csv' | |
else: | |
extracted_data = "Empty CSV" | |
processing_notes.append("Parsed as empty CSV.") | |
if not is_explicit_csv: | |
processing_notes.append("Note: Content looked like CSV but was empty.") | |
except Exception as e: | |
processing_notes.append(f"Failed to parse as CSV: {e}.") | |
logger.warning(f"Failed to parse CSV from '{filename}': {e}") | |
if extracted_data is None: | |
try: | |
extracted_text = None | |
if file_extension == '.pdf' and PDF_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
reader = PdfReader(temp_path) | |
text_content = "".join(page.extract_text() or "" for page in reader.pages) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from PDF.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.docx' and DOCX_SUPPORT: | |
with tempfile.NamedTemporaryFile(delete=False, suffix='.docx') as tmp_file: | |
tmp_file.write(content_bytes) | |
temp_path = Path(tmp_file.name) | |
try: | |
document = Document(temp_path) | |
text_content = "\n".join(paragraph.text for paragraph in document.paragraphs) | |
extracted_text = text_content | |
processing_notes.append("Extracted text from DOCX.") | |
finally: | |
if temp_path.exists(): temp_path.unlink() | |
elif file_extension == '.rtf' and RTF_SUPPORT: | |
try: | |
# Need to read RTF content as text, not bytes, for pyth's Rtf15Reader | |
doc = Rtf15Reader.read(io.StringIO(raw_content)) | |
text_content = PlaintextWriter.write(doc).getvalue() | |
extracted_text = text_content | |
processing_notes.append("Extracted text from RTF.") | |
except Exception as e: | |
processing_notes.append(f"RTF extraction error: {e}") | |
logger.warning(f"Failed to extract RTF text from '{filename}': {e}") | |
elif file_extension == '.odt' and ODT_SUPPORT:
    with tempfile.NamedTemporaryFile(delete=False, suffix='.odt') as tmp_file:
        tmp_file.write(content_bytes)
        temp_path = Path(tmp_file.name)
    try:
        # load() opens the existing document; OpenDocumentText() would create a new, empty one
        text_doc = load_odf(str(temp_path))
        paragraphs = text_doc.getElementsByType(odftext.P)
        text_content = "\n".join(teletype.extractText(p) for p in paragraphs)
        extracted_text = text_content
        processing_notes.append("Extracted text from ODT.")
    finally:
        if temp_path.exists(): temp_path.unlink()
elif file_extension in ['.doc', '.ppt', '.pptx', '.xls', '.xlsx']: | |
processing_notes.append(f"Automatic text extraction for {file_extension.upper()} not fully implemented.") | |
logger.warning(f"Automatic text extraction for {file_extension.upper()} not fully implemented for '{filename}'.") | |
if extracted_text is not None: | |
max_extracted_text_size = 10000 | |
extracted_data = {'text': extracted_text[:max_extracted_text_size]} | |
if len(extracted_text) > max_extracted_text_size: | |
extracted_data['text'] += "..." | |
processing_notes.append("Extracted text truncated.") | |
except ImportError as e: | |
processing_notes.append(f"Missing dependency for document type ({e}). Cannot extract text.") | |
except Exception as e: | |
processing_notes.append(f"Error during document text extraction: {e}") | |
logger.warning(f"Error during document text extraction for '{filename}': {e}") | |
if extracted_data is None: | |
extracted_data = {'plain_text': raw_content} | |
processing_notes.append("Stored as plain text.") | |
if mime_type in ['unknown/unknown', 'application/octet-stream']:
    mime_type = 'text/plain'  # content was stored as plain text, so report it as such
except Exception as e: | |
logger.error(f"Fatal error processing single file '{filename}': {e}") | |
processing_notes.append(f"Fatal processing error: {e}") | |
raw_content = None | |
extracted_data = None | |
entry = { | |
'source': 'file', | |
'filename': filename, | |
'file_size': file_size, | |
'mime_type': mime_type, | |
'created': datetime.fromtimestamp(file_path.stat().st_ctime).isoformat() if file_path.exists() else None, | |
'modified': datetime.fromtimestamp(file_path.stat().st_mtime).isoformat() if file_path.exists() else None, | |
'raw_content': raw_content, | |
'extracted_data': extracted_data, | |
'processing_notes': processing_notes | |
} | |
dataset_entries.append(entry) | |
return dataset_entries | |
def _process_archive(self, archive_path: Path, extract_to: Path) -> List[Dict]: | |
"""Process an archive file with enhanced extraction""" | |
dataset = [] | |
archive_extension = archive_path.suffix.lower() | |
logger.info(f"Processing archive: '{archive_path.name}'") | |
try: | |
if archive_extension == '.zip': | |
if zipfile.is_zipfile(archive_path): | |
with zipfile.ZipFile(archive_path, 'r') as zip_ref: | |
for file_info in zip_ref.infolist(): | |
# zipfile.extract() strips absolute paths and ".." components itself,
# which confines extraction to extract_to (mitigating Zip Slip).
extracted_file_path = extract_to / file_info.filename
if file_info.file_size > 0 and not file_info.filename.endswith('/'):
    try:
        zip_ref.extract(file_info, path=extract_to)
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{file_info.filename}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file in archive: '{file_info.filename}'") | |
except Exception as e: | |
logger.warning(f"Error extracting/processing file '{file_info.filename}' from zip '{archive_path.name}': {e}") | |
finally: | |
# Clean up the extracted file immediately | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
else: | |
logger.error(f"'{archive_path.name}' is not a valid zip file.") | |
elif archive_extension in ('.tar', '.tgz') or archive_path.name.lower().endswith('.tar.gz'):
    try:
        # 'r:*' lets tarfile auto-detect compression (.tar, .tar.gz, .tgz);
        # Path.suffix never equals '.tar.gz', and standalone .gz files are handled below.
        with tarfile.open(archive_path, 'r:*') as tar_ref:
for member in tar_ref.getmembers(): | |
if member.isfile(): | |
# Prevent Tar Slip vulnerability | |
sanitized_filename = Path(member.name).name # Takes only the base name | |
extracted_file_path = extract_to / sanitized_filename | |
try: | |
# Use extractfile method and write manually for better control/safety | |
member_file = tar_ref.extractfile(member) | |
if member_file: | |
with open(extracted_file_path, 'wb') as outfile: | |
outfile.write(member_file.read()) | |
member_file.close() # Close the BytesIO object | |
if extracted_file_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_file_path): | |
dataset.extend(self._process_single_file(extracted_file_path)) | |
elif extracted_file_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{member.name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_file_path, extract_to)) | |
else: | |
logger.warning(f"Could not get file-like object for {member.name} from tar.") | |
except Exception as e: | |
logger.warning(f"Error extracting/processing file '{member.name}' from tar '{archive_path.name}': {e}") | |
finally: | |
# Clean up the extracted file immediately | |
if extracted_file_path.exists(): | |
try: | |
extracted_file_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_file_path}: {e}") | |
except tarfile.TarError as e: | |
logger.error(f"Error processing TAR archive '{archive_path.name}': {e}") | |
elif archive_extension == '.gz': # Handle standalone .gz (single file compression) | |
extracted_name = archive_path.stem # Get filename without .gz | |
extracted_path = extract_to / extracted_name | |
try: | |
with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile: | |
outfile.write(gz_file.read()) | |
# Process the extracted file | |
if extracted_path.suffix.lower() in self.supported_extensions and not self._is_archive(extracted_path): | |
dataset.extend(self._process_single_file(extracted_path)) | |
elif extracted_path.suffix.lower() in self.archive_extensions: | |
logger.info(f"Found nested archive '{extracted_name}', processing recursively.") | |
dataset.extend(self._process_archive(extracted_path, extract_to)) | |
else: | |
logger.debug(f"Skipping unsupported file (from gz): '{extracted_name}'") | |
except gzip.BadGzipFile as e: | |
logger.error(f"Error processing GZIP file '{archive_path.name}': {e}") | |
except Exception as e: | |
logger.error(f"Error extracting/processing from GZIP '{archive_path.name}': {e}") | |
finally: | |
# Clean up the extracted file immediately | |
if extracted_path.exists(): | |
try: | |
extracted_path.unlink() | |
except OSError as e: | |
logger.warning(f"Failed to clean up extracted file {extracted_path}: {e}") | |
elif archive_extension in ('.bz2', '.7z', '.rar'): | |
logger.warning(f"Support for {archive_extension} archives is not yet fully implemented and requires external tools/libraries.") | |
except Exception as e: | |
logger.error(f"Overall archive processing error for '{archive_path.name}': {e}") | |
return dataset | |
def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]: | |
"""Enhanced data chunking with sequence metadata""" | |
try: | |
# Ensure data is a list of items for consistent chunking | |
if not isinstance(data, list): | |
logger.warning("Data for chunking is not a list. Wrapping it in a list.") | |
data_list = [data] | |
else: | |
data_list = data | |
# JSON dump the entire list first | |
json_str = json.dumps(data_list, ensure_ascii=False, separators=(',', ':')) | |
total_length = len(json_str) | |
# Estimate per-chunk metadata overhead by serializing a worst-case metadata dict
# ({"idx":..,"tc":..,"tl":..,"hash":..,"data":""}) and adding a safety buffer.
overhead_estimate = len(json.dumps({"idx": 99999, "tc": 99999, "tl": 9999999999, "hash": 9999999999, "data": ""}, separators=(',', ':'))) + 50
# QR capacity: 2953 bytes in binary mode (alphanumeric mode allows 4296, but the JSON
# payload can contain non-alphanumeric characters, so the byte limit is the safe choice).
effective_chunk_size = max_size - overhead_estimate
if effective_chunk_size <= 0: | |
logger.error(f"Max QR size ({max_size}) is too small for metadata overhead ({overhead_estimate}). Cannot chunk.") | |
return [] | |
if total_length <= effective_chunk_size: | |
# Single chunk case | |
chunk_data = json_str | |
chunk = { | |
"idx": 0, | |
"tc": 1, | |
"tl": total_length, | |
"hash": hash(chunk_data) & 0xFFFFFFFF, # Use a simple hash | |
"data": chunk_data | |
} | |
return [chunk] | |
# Multi-chunk case | |
num_chunks = math.ceil(total_length / effective_chunk_size) | |
chunks = [] | |
current_pos = 0 | |
for i in range(num_chunks): | |
end_pos = min(current_pos + effective_chunk_size, total_length) | |
chunk_data_str = json_str[current_pos:end_pos] | |
chunk = { | |
"idx": i, | |
"tc": num_chunks, | |
"tl": total_length, | |
"hash": hash(chunk_data_str) & 0xFFFFFFFF, # Hash each chunk | |
"data": chunk_data_str | |
} | |
chunks.append(chunk) | |
current_pos = end_pos | |
if current_pos < total_length: | |
logger.error(f"Chunking logic error: Only processed {current_pos} of {total_length} characters.") | |
# This should not happen with ceil and min, but as a safeguard | |
return [] # Indicate failure | |
logger.info(f"Chunked data into {num_chunks} chunks for QR codes.") | |
return chunks | |
except Exception as e: | |
logger.error(f"Error chunking data: {e}") | |
return [] | |
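# A minimal reassembly sketch for the chunks produced by chunk_data() above (hedged:
# this helper is illustrative and not called anywhere in the app). It assumes every
# chunk dict was recovered intact, e.g. after scanning the generated QR codes.
def reassemble_chunks(chunks: List[Dict]) -> Union[Dict, List]:
    """Recombine chunk dicts (ordered by 'idx') and parse the original JSON payload."""
    ordered = sorted(chunks, key=lambda c: c["idx"])
    json_str = "".join(c["data"] for c in ordered)
    return json.loads(json_str)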
def generate_stylish_qr(data: Union[str, Dict], | |
filename: str, | |
size: int = 10, | |
border: int = 4, | |
fill_color: str = "#000000", | |
back_color: str = "#FFFFFF") -> str: | |
"""Generate a stylish QR code with enhanced visual appeal""" | |
try: | |
qr = qrcode.QRCode( | |
version=None, # Let the library determine the optimal version | |
error_correction=qrcode.constants.ERROR_CORRECT_M, # Medium error correction | |
box_size=size, | |
border=border | |
) | |
# Data to encode should be a string, typically the JSON chunk | |
if isinstance(data, dict): | |
# Ensure it's dumped to a string if it's a dict chunk | |
data_to_encode = json.dumps(data, ensure_ascii=False, separators=(',', ':')) | |
else: | |
# Assume it's already the string data chunk | |
data_to_encode = str(data) | |
qr.add_data(data_to_encode) | |
qr.make(fit=True) # Fit the QR code size to the data | |
qr_image = qr.make_image(fill_color=fill_color, back_color=back_color) | |
# qr_image = qr_image.convert('RGBA') # Conversion might not be needed for simple fill/back colors | |
# Optional: Add a simple gradient overlay for style (can be resource intensive) | |
# try: | |
# gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0)) | |
# draw = ImageDraw.Draw(gradient) | |
# # Example: slight horizontal fade | |
# for i in range(qr_image.width): | |
# alpha = int(255 * (i/qr_image.width) * 0.05) # 5% fade | |
# draw.line([(i, 0), (i, qr_image.height)], fill=(0, 0, 0, alpha)) | |
# final_image = Image.alpha_composite(qr_image, gradient) | |
# except Exception as e: | |
# logger.warning(f"Failed to add gradient overlay to QR code: {e}. Using plain QR.") | |
# final_image = qr_image | |
# Using the plain image for simplicity and performance unless gradient is crucial | |
final_image = qr_image | |
output_path = QR_CODES_DIR / filename | |
# Use PNG for lossless quality, 90 quality is for JPEGs but harmless here | |
final_image.save(output_path, format='PNG') | |
return str(output_path) | |
except Exception as e: | |
logger.error(f"QR generation error: {e}") | |
return "" | |
def generate_qr_codes(data: List[Dict], combined: bool = True) -> List[str]: | |
"""Generate QR codes with enhanced visual appeal and metadata""" | |
# Ensure data is a list of dictionaries as expected | |
if not isinstance(data, list): | |
logger.error("generate_qr_codes received data that is not a list.") | |
return [] | |
if not all(isinstance(item, dict) for item in data): | |
logger.error("generate_qr_codes received a list containing non-dictionary items.") | |
return [] | |
try: | |
file_processor = EnhancedFileProcessor() # Use the processor for chunking | |
paths = [] | |
if combined: | |
# Chunk the entire list of data dictionaries | |
chunks = file_processor.chunk_data(data) | |
if not chunks: | |
logger.warning("No chunks generated for combined data.") | |
return [] | |
for i, chunk in enumerate(chunks): | |
# Filename includes chunk index and total chunks | |
filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk, # Pass the chunk dictionary | |
filename=filename, | |
fill_color="#1a365d", | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for combined chunk {i+1}/{len(chunks)}.") | |
else: | |
# Chunk each item individually | |
if data: | |
for idx, item in enumerate(data): | |
# Chunk the single item (wrapped in a list for chunk_data consistency) | |
chunks = file_processor.chunk_data([item]) # Pass item as a list | |
if not chunks: | |
logger.warning(f"No chunks generated for item {idx+1}.") | |
continue | |
for chunk_idx, chunk in enumerate(chunks): | |
# Filename includes item index, chunk index, and total chunks for this item | |
filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png' | |
qr_path = generate_stylish_qr( | |
data=chunk, # Pass the chunk dictionary | |
filename=filename, | |
fill_color="#1a365d", | |
back_color="#ffffff" | |
) | |
if qr_path: | |
paths.append(qr_path) | |
else: | |
logger.warning(f"Failed to generate QR for item {idx+1} chunk {chunk_idx+1}/{len(chunks)}.") | |
else: | |
logger.warning("No items in data list to process individually for QR codes.") | |
logger.info(f"Generated {len(paths)} QR codes.") | |
return paths | |
except Exception as e: | |
logger.error(f"Error generating QR codes: {e}") | |
return [] | |
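# End-to-end sketch of the processing-to-QR pipeline (hedged; `uploaded_file` is a
# placeholder for the file object Gradio passes in, and nothing here runs at import time):
#     file_processor = EnhancedFileProcessor()
#     dataset = file_processor.process_file(uploaded_file)   # list of extraction dicts
#     qr_paths = generate_qr_codes(dataset, combined=True)   # PNG paths under output/qr_codes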
# --- Chatbot Logic --- | |
def respond_to_chat( | |
message: str, | |
chat_history: List[Tuple[str, str]], | |
chatbot_data: Optional[List[Dict]], | |
current_filtered_df_state: Optional[pd.DataFrame] | |
) -> Tuple[List[Tuple[str, str]], Optional[List[Dict]], Optional[pd.DataFrame]]: | |
""" | |
Responds to user chat messages based on the loaded JSON data. | |
Manages and returns the state of the filtered DataFrame. | |
""" | |
# Initialize chat_history if it's None (Gradio might pass None initially) | |
if chat_history is None: | |
chat_history = [] | |
if chatbot_data is None or not chatbot_data: | |
chat_history.append((message, "Please process some data first using the other tabs before chatting.")) | |
return chat_history, chatbot_data, current_filtered_df_state # Return existing state | |
# Append user message to history immediately | |
chat_history.append((message, None)) # Use None as a placeholder for the assistant's response | |
response = "" | |
lower_message = message.lower().strip() | |
# Initialize new_filtered_df_state with the current state to preserve it unless a filter changes it | |
new_filtered_df_state = current_filtered_df_state | |
df = None | |
try: | |
# Attempt to create a DataFrame from the full chatbot_data for analysis | |
# This flattens the structure for easier querying with pandas | |
flat_data = [] | |
def flatten_item(d, parent_key='', sep='_'): | |
items = {} | |
if isinstance(d, dict): | |
for k, v in d.items(): | |
new_key = parent_key + sep + k if parent_key else k | |
if isinstance(v, (dict, list)): | |
# Recursively flatten nested dicts/lists | |
nested_items = flatten_item(v, new_key, sep=sep) | |
items.update(nested_items) | |
else: | |
# Add primitive values directly | |
items[new_key] = v | |
elif isinstance(d, list): | |
# Flatten list items, creating keys like parent_key_0, parent_key_1, etc. | |
for i, elem in enumerate(d): | |
nested_items = flatten_item(elem, f'{parent_key}{sep}{i}' if parent_key else str(i), sep=sep) | |
items.update(nested_items) | |
# If d is a primitive (int, str, bool, None), it won't add anything here, which is fine | |
# as primitives are handled in the dict/list branches. | |
return items | |
# Process each top-level item in chatbot_data | |
for i, item in enumerate(chatbot_data): | |
if isinstance(item, dict): | |
# Flatten the entire dictionary item | |
flat_item = flatten_item(item) | |
flat_data.append(flat_item) | |
# If chatbot_data contains non-dict top-level items, flatten them too | |
elif isinstance(item, (list, str, int, float, bool, type(None))): | |
flat_data.append({'item_value': item}) # Wrap primitives in a dict | |
except Exception as e: | |
# Handle exceptions that may occur during processing | |
response = f"An error occurred: {str(e)}" | |
chat_history.append((message, response)) # Append error message to chat history | |
if flat_data: | |
try: | |
# Create DataFrame. Use errors='ignore' for columns with mixed types that can't be coerced | |
df = pd.DataFrame(flat_data) | |
# Convert object columns to string type explicitly to avoid future warnings/errors | |
for col in df.columns: | |
if df[col].dtype == 'object': | |
df[col] = df[col].astype(str) | |
logger.debug(f"Created DataFrame with shape: {df.shape}") | |
logger.debug(f"DataFrame columns: {list(df.columns)}") | |
except Exception as e: | |
logger.warning(f"Could not create pandas DataFrame from processed data: {e}. Falling back to manual processing.") | |
df = None | |
else: | |
logger.warning("Flattened data is empty. Cannot create DataFrame.") | |
df = None | |
except Exception as e: | |
logger.error(f"Error during DataFrame creation from chatbot_data: {e}") | |
df = None | |
response = f"An error occurred while preparing data for analysis: {e}" | |
# --- Complex Queries and Analysis --- | |
# These operations should primarily act on the FULL dataframe 'df' | |
# unless the user explicitly asks about the 'filtered' data. | |
# The filter command itself updates `new_filtered_df_state`. | |
if df is not None and not response: # Proceed with analysis if DataFrame exists and no error yet | |
# List available columns (from the full DataFrame) | |
if "what columns are available" in lower_message or "list columns" in lower_message: | |
response = f"The available columns in the full dataset are: {', '.join(df.columns)}" | |
# Describe a specific column (from the full DataFrame) | |
match = re.search(r'describe column (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
# Handle non-numeric describe gracefully | |
try: | |
description = df[column_name].describe().to_string() | |
response = f"Description for column '{column_name}':\n```\n{description}\n```" | |
except Exception as e: | |
response = f"Could not generate description for column '{column_name}': {e}" | |
logger.warning(f"Error describing column '{column_name}': {e}") | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
# How many unique values in a column? (from the full DataFrame) | |
match = re.search(r'how many unique values in (\w+)', lower_message) | |
if match: | |
column_name = match.group(1) | |
if column_name in df.columns: | |
try: | |
unique_count = df[column_name].nunique() | |
response = f"There are {unique_count} unique values in the '{column_name}' column (in the full dataset)." | |
except Exception as e: | |
response = f"Could not count unique values for column '{column_name}': {e}" | |
logger.warning(f"Error counting unique values for column '{column_name}': {e}") | |
else: | |
response = f"I couldn't find a column named '{column_name}' in the data. Available columns are: {', '.join(df.columns)}" | |
# What is the average/sum/min/max of a numeric column? (from the full DataFrame) | |
match = re.search(r'what is the (average|sum|min|max) of (\w+)', lower_message) | |
if match: | |
operation, column_name = match.groups() | |
if column_name in df.columns: | |
try: | |
# Attempt to convert to numeric, coercing errors to NaN, then drop NaNs | |
numeric_col = pd.to_numeric(df[column_name], errors='coerce').dropna() | |
if not numeric_col.empty: | |
if operation == 'average': | |
result = numeric_col.mean() | |
response = f"The average of '{column_name}' is {result:.2f}." | |
elif operation == 'sum': | |
result = numeric_col.sum() | |
response = f"The sum of '{column_name}' is {result:.2f}." | |
elif operation == 'min': | |
result = numeric_col.min() | |
response = f"The minimum of '{column_name}' is {result}." | |
elif operation == 'max': | |
result = numeric_col.max() | |
response = f"The maximum of '{column_name}' is {result}." | |
else: | |
response = "I can calculate average, sum, min, or max." # Should not reach here due to regex | |
else: | |
response = f"The column '{column_name}' does not contain numeric values that I can analyze." | |
except Exception as e: | |
response = f"An error occurred while calculating the {operation} of '{column_name}': {e}" | |
logger.error(f"Error calculating {operation} for column '{column_name}': {e}") | |
else: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
# Enhanced Filter data based on more complex conditions | |
# This section *updates* `new_filtered_df_state` based on the filter command. | |
# It should filter from the *full* dataframe (`df`). | |
filter_match = re.search( | |
r'(?:filter|show items|show me items|find entries|select items|get items)\s+' # Optional action phrases | |
r'(?:where|by|for|with|if)\s+' # Keyword indicating condition | |
r'(\w+)\s+' # Column name | |
r'(is|equals?|==|!=|>=?|<=?|contains?|starts with|ends with)\s+' # Operator | |
r'([\'"]?[\w\s.-]+[\'"]?)', # Value (allows spaces, dots, hyphens if quoted, or single words) | |
lower_message | |
) | |
if filter_match: | |
column_name, operator, value_str = filter_match.groups() | |
column_name = column_name.strip() | |
operator = operator.strip().lower() | |
value_str = value_str.strip().strip("'\"") | |
logger.info(f"Filter request: Column='{column_name}', Operator='{operator}', Value='{value_str}'") | |
if df is None: | |
response = "No data available to filter. Please process data first." | |
new_filtered_df_state = None # Ensure state is None if no data | |
elif column_name not in df.columns: | |
response = f"I couldn't find a column named '{column_name}'. Available columns are: {', '.join(df.columns)}" | |
new_filtered_df_state = None # Clear previous filter if column not found | |
else: | |
# Always filter from the original full dataframe 'df' | |
active_df_to_filter = df.copy() | |
col_series_original = active_df_to_filter[column_name] # Use original series for type checks | |
try: | |
# Attempt to infer value type for comparison and prepare column series | |
target_value: Any | |
condition = None # Initialize condition | |
# Handle numeric comparisons | |
if operator in ['>', '>=', '<', '<=', '==', '!=']: | |
try: | |
# Try converting *both* column and value to numeric | |
col_series_numeric = pd.to_numeric(col_series_original, errors='coerce') | |
target_value = float(value_str) | |
# Apply numeric condition only where conversion was successful (not NaN) | |
if operator == '==': condition = col_series_numeric == target_value | |
elif operator == '!=': condition = col_series_numeric != target_value | |
elif operator == '>': condition = col_series_numeric > target_value | |
elif operator == '>=': condition = col_series_numeric >= target_value | |
elif operator == '<': condition = col_series_numeric < target_value | |
elif operator == '<=': condition = col_series_numeric <= target_value | |
# Ensure condition is a boolean Series of the same index as the DataFrame | |
if condition is not None: | |
condition = condition.fillna(False) # Treat NaNs in numeric column as not matching | |
except ValueError: | |
response = f"For numeric comparison on column '{column_name}', '{value_str}' is not a valid number." | |
target_value = None # Error case | |
condition = None # Clear condition on error | |
# Handle string comparisons (includes 'is', 'equals', '!=', 'contains', 'starts with', 'ends with') | |
elif operator in ['is', 'equals', '==', '!=', 'contains', 'contain', 'starts with', 'ends with']: | |
# Ensure column is treated as string for these operations | |
col_series_string = col_series_original.astype(str).str.lower() | |
target_value = str(value_str).lower() # Case-insensitive comparison | |
if operator in ['is', 'equals', '==']: | |
condition = col_series_string == target_value | |
elif operator == '!=': | |
condition = col_series_string != target_value | |
elif operator in ['contains', 'contain']: | |
condition = col_series_string.str.contains(target_value, na=False) # na=False treats NaN strings as not containing | |
elif operator == 'starts with': | |
condition = col_series_string.str.startswith(target_value, na=False) | |
elif operator == 'ends with': | |
condition = col_series_string.str.endswith(target_value, na=False) | |
# else: condition remains None for unsupported string ops (should be caught by regex) | |
# Handle boolean comparisons (if column type is bool or value looks like bool) | |
elif operator in ['is', 'equals', '==', '!='] and (pd.api.types.is_bool_dtype(col_series_original) or value_str.lower() in ['true', 'false']): | |
try: | |
col_series_bool = col_series_original.astype(bool) # Attempt to convert column to bool | |
target_value = value_str.lower() == 'true' # Convert value string to bool | |
if operator in ['is', 'equals', '==']: | |
condition = col_series_bool == target_value | |
elif operator == '!=': | |
condition = col_series_bool != target_value | |
# Ensure condition is boolean Series | |
if condition is not None: | |
condition = condition.fillna(False) # Treat NaNs/errors in bool conversion as not matching | |
except ValueError: | |
response = f"For boolean comparison on column '{column_name}', '{value_str}' is not a valid boolean value (true/false)." | |
target_value = None | |
condition = None | |
else: | |
# If none of the above types matched, the operator is likely invalid for the column type | |
response = f"Unsupported operator '{operator}' for column '{column_name}'. Please check column type or operator." | |
condition = None | |
if condition is not None: | |
# Apply condition to the active_df_to_filter (which is a copy of the full df) | |
filtered_results_df = active_df_to_filter[condition] | |
if not filtered_results_df.empty: | |
new_filtered_df_state = filtered_results_df # Update state with new filter result | |
num_results = len(filtered_results_df) | |
preview_rows = min(num_results, 5) | |
preview_cols = min(len(filtered_results_df.columns), 5) | |
# Select only the first `preview_cols` columns for the preview | |
preview_df = filtered_results_df.head(preview_rows).iloc[:, :preview_cols] | |
preview_str = preview_df.to_string(index=False) | |
response = (f"Found {num_results} items where '{column_name}' {operator} '{value_str}'.\n" | |
f"Here's a preview (first {preview_rows} rows, first {preview_cols} columns):\n```\n{preview_str}\n```\n" | |
f"The full filtered dataset ({num_results} items) is now available for download using the 'Download Filtered JSON' button.") | |
else: | |
new_filtered_df_state = pd.DataFrame() # Store empty DF for "no results" | |
response = f"No items found where '{column_name}' {operator} '{value_str}'." | |
# If condition is None (e.g. bad operator or type mismatch error) and response not already set, set generic invalid op message. | |
elif not response: # Avoid overwriting specific error from type check | |
response = f"Unsupported operator '{operator}' for column '{column_name}'. Please check column type or operator." | |
new_filtered_df_state = None | |
except ValueError as ve: # Specifically catch ValueError for target_value conversion | |
response = f"Invalid value '{value_str}' for comparison on column '{column_name}'. {ve}" | |
new_filtered_df_state = None # Clear on value error | |
logger.warning(f"ValueError during filter: {ve}") | |
except Exception as e: | |
new_filtered_df_state = None # Clear on other errors | |
response = f"An error occurred while applying the filter: {e}" | |
logger.error(f"Error applying filter (column='{column_name}', op='{operator}', val='{value_str}'): {e}") | |
# If the message was a filter, new_filtered_df_state is now set (or None/empty on error/no results). | |
# --- End of Enhanced Filter Logic --- | |
# If `response` is still empty here, the message did not match the filter regex and | |
# `new_filtered_df_state` keeps the value it was initialised with (current_filtered_df_state). | |
# The CSV/JSON export branches below act on the *original* df; exporting filtered data is | |
# handled separately by the download buttons. The general queries further down must not | |
# reset `new_filtered_df_state` except on an explicit "clear" command. | |
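# Example phrases the filter branch above is intended to handle (taken from the chatbot's | |
# own fallback hints; the exact coverage depends on the filter_match regex defined earlier): | |
#   "show items where status is active" | |
#   "filter by price > 100" | |
#   "find entries where name contains widget" | |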
elif "output as csv" in lower_message or "export as csv" in lower_message: | |
if df is not None and not df.empty: | |
csv_output = df.to_csv(index=False) | |
response = f"Here is the data in CSV format:\n```csv\n{csv_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as CSV." | |
elif "output as json" in lower_message or "export as json" in lower_message: # Note: "export as json" is different from download buttons | |
if df is not None and not df.empty: | |
json_output = df.to_json(orient='records', indent=2) | |
response = f"Here is the data in JSON format:\n```json\n{json_output[:1000]}...\n```\n(Output truncated for chat display)" | |
else: | |
response = "There is no data available to output as JSON." | |
# --- General Queries (if no DataFrame or specific query matched AND no filter was applied in this turn) --- | |
# These should not clear new_filtered_df_state unless it's a "clear chat" | |
if not response: # Only enter if no response has been generated by DataFrame/filter logic | |
if "how many items" in lower_message or "number of items" in lower_message: | |
# Check filtered state first, then full df, then raw chatbot_data list | |
if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
response = f"The currently filtered dataset has {len(new_filtered_df_state)} items." | |
if df is not None: | |
response += f" The original dataset has {len(df)} items." | |
elif df is not None: # Check df from original chatbot_data | |
response = f"There are {len(df)} items in the processed data." | |
elif isinstance(chatbot_data, list): # Fallback if df creation failed but chatbot_data is list | |
response = f"There are {len(chatbot_data)} top-level items in the processed data (not in DataFrame)." | |
elif isinstance(chatbot_data, dict): | |
response = "The processed data is a single dictionary, not a list of items." | |
else: | |
response = "The processed data is not a standard list or dictionary structure." | |
elif "what is the structure" in lower_message or "tell me about the data" in lower_message: | |
# Describe filtered data structure if available, otherwise full data structure | |
if new_filtered_df_state is not None and not new_filtered_df_state.empty: | |
response = f"The filtered data is a table with {len(new_filtered_df_state)} rows and columns: {', '.join(new_filtered_df_state.columns)}. " | |
if df is not None: | |
response += f"The original data has columns: {', '.join(df.columns)}." | |
else: | |
response += "Original data structure is not tabular." | |
elif df is not None: | |
response = f"The data is a table with {len(df)} rows and columns: {', '.join(df.columns)}." | |
elif isinstance(chatbot_data, list) and chatbot_data: | |
sample_item = chatbot_data[0] | |
response = f"The data is a list containing {len(chatbot_data)} items. The first item has the following top-level keys: {list(sample_item.keys())}." | |
elif isinstance(chatbot_data, dict): | |
response = f"The data is a dictionary with the following top-level keys: {list(chatbot_data.keys())}." | |
else: | |
response = "The processed data is not a standard list or dictionary structure that I can easily describe." | |
# "show me" without a filter condition might be ambiguous. | |
# Let's assume it refers to the original data or provide guidance. | |
elif "show me" in lower_message or "get me" in lower_message or "extract" in lower_message: | |
# This specific 'show me' without 'where' should not trigger a filter or clear existing filter state. | |
# It's a general request for data, which is too broad. Guide the user. | |
response = "If you want to filter the data, please use a phrase like 'show items where column_name is value'. If you want to see the raw data, consider using the download buttons." | |
# --- Speculation about Modifications --- | |
# These responses are purely informative and do not modify data or state. | |
elif "how can i modify" in lower_message or "how to change" in lower_message or "can i add" in lower_message or "can i remove" in lower_message: | |
response = "I cannot directly modify the data here, but I can tell you how you *could* modify it programmatically. What kind of change are you considering (e.g., adding an item, changing a value, removing a field)?" | |
elif "add a field" in lower_message or "add a column" in lower_message: | |
response = "To add a field (or column if the data is tabular), you would typically iterate through each item (or row) in the data and add the new key-value pair. For example, adding a 'status' field with a default value." | |
elif "change a value" in lower_message or "update a field" in lower_message: | |
response = "To change a value, you would need to identify the specific item(s) and the field you want to update. You could use a condition (like filtering) to find the right items and then assign a new value to the field." | |
elif "remove a field" in lower_message or "delete a column" in lower_message: | |
response = "To remove a field, you would iterate through each item and delete the specified key. Be careful, as this is irreversible." | |
elif "restructure" in lower_message or "change the format" in lower_message: | |
response = "Restructuring data involves transforming it into a different shape. This could mean flattening nested objects, grouping items, or pivoting data. This often requires writing custom code to map the old structure to the new one." | |
elif "what if i" in lower_message or "if i changed" in lower_message: | |
response = "Tell me what specific change you're contemplating, and I can speculate on the potential impact or how you might approach it programmatically." | |
# --- General Conversation / Fallback --- | |
elif "hello" in lower_message or "hi" in lower_message: | |
response = random.choice(["Hello! How can I help you understand the processed data?", "Hi there! What's on your mind about this data?", "Hey! Ask me anything about the data you've loaded."]) | |
elif "thank you" in lower_message or "thanks" in lower_message: | |
response = random.choice(["You're welcome!", "Glad I could help.", "No problem! Let me know if you have more questions about the data."]) | |
elif "clear chat" in lower_message: # This should be caught by button, but as text too | |
# Gradio handles clearing the chatbot component state via the button action. | |
# We just need to clear the filtered data state here. | |
response = "Chat history cleared." # Respond that chat is cleared | |
new_filtered_df_state = None # Also clear filtered data on "clear chat" command by text | |
elif not response: # Fallback if nothing else matched | |
response = random.choice([ | |
"I can analyze the data you've processed. What would you like to know? Try asking to filter data, e.g., 'show items where status is active'.", | |
"Ask me about the number of items, the structure, or values of specific fields. You can also filter data.", | |
"I can perform basic analysis or filter the data. For example: 'filter by price > 100'.", | |
"Tell me what you want to extract or filter from the data. Use phrases like 'show items where ...'.", | |
"I'm equipped to filter your data. Try 'find entries where name contains widget'." | |
]) | |
# --- End of main try block --- | |
except Exception as e: | |
logger.error(f"Chatbot runtime error: {e}") | |
response = f"An internal error occurred while processing your request: {e}" | |
response += "\nPlease try rephrasing your question or clear the chat history." | |
# On unexpected error, preserve the current_filtered_df_state rather than clearing or modifying it. | |
# new_filtered_df_state = current_filtered_df_state # This line is effectively already done by initialization | |
# --- Finally block (optional, but good practice if cleanup is needed) --- | |
# finally: | |
# # Any cleanup code can go here | |
# pass | |
if not response: # Final safety net for response, if it's somehow still empty | |
response = "I'm not sure how to respond to that. Please try rephrasing or ask for help on available commands." | |
# Update the last message in chat history with the generated response | |
# Find the last entry where the assistant's response is None | |
for i in reversed(range(len(chat_history))): | |
if chat_history[i][1] is None: | |
chat_history[i] = (chat_history[i][0], response) | |
break | |
# If no None placeholder was found (shouldn't happen with current logic), append as new entry | |
# else: | |
# chat_history.append((message, response)) | |
# Ensure chat_history is in the format the Gradio Chatbot expects. With the default | |
# tuple format used for the Chatbot component, that is a list of (user_msg, bot_msg) | |
# pairs, which is exactly what this List[Tuple[str, str]] provides. | |
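# Returns: the updated chat history (rendered by the Chatbot component), the chatbot_data | |
# state (passed through unchanged here), and the possibly-updated filtered-DataFrame state, | |
# matching the outputs wired up for send_msg_btn.click / chat_input.submit in create_modern_interface(). | |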
return chat_history, chatbot_data, new_filtered_df_state | |
# --- Gradio Interface Definition --- | |
def create_modern_interface(): | |
"""Create a modern and visually appealing Gradio interface""" | |
css = """ | |
/* Modern color scheme */ | |
:root { | |
--primary-color: #1a365d; | |
--secondary-color: #2d3748; | |
--accent-color: #4299e1; | |
--background-color: #f7fafc; | |
--success-color: #48bb78; | |
--error-color: #f56565; | |
--warning-color: #ed8936; | |
} | |
/* Container styling */ | |
.container { | |
max-width: 1200px; | |
margin: auto; | |
padding: 2rem; | |
background-color: var(--background-color); | |
border-radius: 1rem; | |
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); | |
} | |
/* Component styling */ | |
.input-container { | |
background-color: white; | |
padding: 1.5rem; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-bottom: 1rem; | |
} | |
/* Button styling */ | |
.primary-button { | |
background-color: var(--primary-color); | |
color: white; | |
padding: 0.75rem 1.5rem; | |
border-radius: 0.375rem; | |
border: none; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.primary-button:hover { | |
background-color: var(--accent-color); | |
transform: translateY(-1px); | |
} | |
/* Status messages */ | |
.status { | |
padding: 1rem; | |
border-radius: 0.375rem; | |
margin: 1rem 0; | |
} | |
.status.success { background-color: #f0fff4; color: var(--success-color); } | |
.status.error { background-color: #fff5f5; color: var(--error-color); } | |
.status.warning { background-color: #fffaf0; color: var(--warning-color); } | |
/* Gallery styling */ | |
.gallery { | |
display: grid; | |
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr)); | |
gap: 1rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
} | |
.gallery img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
} | |
.gallery img:hover { | |
transform: scale(1.05); | |
} | |
/* QR Code Viewport Styling */ | |
.viewport-container { | |
display: grid; | |
gap: 0.5rem; | |
padding: 1rem; | |
background-color: white; | |
border-radius: 0.5rem; | |
border: 1px solid #e2e8f0; | |
margin-top: 1rem; | |
} | |
.viewport-item { | |
display: flex; | |
flex-direction: column; | |
align-items: center; | |
} | |
.viewport-item img { | |
width: 100%; | |
height: auto; | |
border-radius: 0.375rem; | |
transition: transform 0.2s; | |
max-width: 150px; | |
max-height: 150px; | |
} | |
""" | |
with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface: | |
interface.head = (interface.head or "") + """ | |
<script> | |
let enabledStates = []; | |
function updateEnabledStates(checkbox) { | |
const index = parseInt(checkbox.dataset.index); | |
if (checkbox.checked) { | |
if (!enabledStates.includes(index)) { | |
enabledStates.push(index); | |
} | |
} else { | |
enabledStates = enabledStates.filter(item => item !== index); | |
} | |
const enabled_qr_codes_component = document.querySelector('[data-component-type="state"][data-state-name="enabled_qr_codes"]'); | |
if (enabled_qr_codes_component) { | |
enabled_qr_codes_component.value = JSON.stringify(enabledStates); | |
enabled_qr_codes_component.dispatchEvent(new Event('input')); | |
} | |
console.log("Enabled QR Code Indices:", enabledStates); | |
} | |
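// NOTE: the selector below assumes Gradio exposes gr.State components in the DOM with | |
// data-component-type/data-state-name attributes; that is not guaranteed across Gradio | |
// versions, so the checkbox toggling may remain purely visual in the viewport. | |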
</script> | |
""" | |
with gr.Row(): | |
crawl_depth_slider = gr.Slider( | |
label="Crawl Depth", | |
minimum=0, | |
maximum=10, | |
value=0, | |
step=1, | |
interactive=True, | |
info="Select the maximum depth for crawling links (0-10)." | |
) | |
qr_code_paths = gr.State([]) | |
chatbot_data = gr.State(None) | |
gr.Markdown(""" | |
# 🚀 Advanced Data Processing & QR Code Generator | |
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor. | |
""") | |
with gr.Tab("π URL Processing"): | |
url_input = gr.Textbox( | |
label="Enter URLs (comma or newline separated)", | |
lines=5, | |
placeholder="https://example1.com\nhttps://example2.com", | |
value="" | |
) | |
with gr.Tab("π File Input"): | |
file_input = gr.File( | |
label="Upload Files", | |
file_types=None, | |
file_count="multiple" | |
) | |
with gr.Tab("π JSON Input"): | |
text_input = gr.TextArea( | |
label="Direct JSON Input", | |
lines=15, | |
placeholder="Paste your JSON data here...", | |
value="" | |
) | |
with gr.Row(): | |
example_btn = gr.Button("📋 Load Example", variant="secondary") | |
clear_btn = gr.Button("🗑️ Clear", variant="secondary") | |
with gr.Row(): | |
combine_data = gr.Checkbox( | |
label="Combine all data into sequence", | |
value=True, | |
info="Generate sequential QR codes for combined data" | |
) | |
generate_qr_toggle = gr.Checkbox( | |
label="Generate QR Codes", | |
value=False, # QR generation is disabled by default | |
info="Enable to generate QR codes for the processed data." | |
) | |
process_btn = gr.Button( | |
"🚀 Process & Generate QR", | |
variant="primary" | |
) | |
output_json = gr.JSON(label="Processed Data") | |
output_gallery = gr.Gallery( | |
label="Generated QR Codes", | |
columns=3, | |
height=400, | |
show_label=True | |
) | |
output_text = gr.Textbox( | |
label="Processing Status", | |
interactive=False | |
) | |
with gr.Tab("πΌοΈ QR Code Viewport") as viewport_tab: | |
viewport_output = gr.HTML(label="QR Code Sequence Viewport") | |
enabled_qr_codes = gr.State([]) | |
with gr.Tab("π€ Chat with Data") as chat_tab: | |
chat_history = gr.State([]) | |
chatbot = gr.Chatbot(label="Data Chatbot") # History is built as (user, bot) tuples, so keep the default tuple format | |
filtered_chatbot_df_state = gr.State(None) # To store the filtered DataFrame | |
with gr.Row(): | |
chat_input = gr.Textbox(label="Your Message", placeholder="Ask me about the processed data...") | |
send_msg_btn = gr.Button("Send") | |
with gr.Row(): | |
download_full_json_btn = gr.Button("Download Full JSON") | |
download_filtered_json_btn = gr.Button("Download Filtered JSON") | |
download_file_output = gr.File(label="Download Data", interactive=False) # For triggering download | |
clear_chat_btn = gr.Button("Clear Chat History") | |
# Event handlers must be defined within the Blocks context | |
def load_example(): | |
example = { | |
"type": "product_catalog", | |
"items": [ | |
{ | |
"id": "123", | |
"name": "Premium Widget", | |
"description": "High-quality widget with advanced features", | |
"price": 299.99, | |
"category": "electronics", | |
"tags": ["premium", "featured", "new"] | |
}, | |
{ | |
"id": "456", | |
"name": "Basic Widget", | |
"description": "Reliable widget for everyday use", | |
"price": 149.99, | |
"category": "electronics", | |
"tags": ["basic", "popular"] | |
} | |
], | |
"metadata": { | |
"timestamp": datetime.now().isoformat(), | |
"version": "2.0", | |
"source": "example" | |
} | |
} | |
return json.dumps(example, indent=2) | |
def clear_input(): | |
# Clear all input fields and the chatbot data state | |
return "", None, "", None | |
def update_viewport(paths, enabled_states): | |
if not paths: | |
return "<p>No QR codes generated yet.</p>" | |
num_qr_codes = len(paths) | |
# Determine grid columns based on the number of QRs, aiming for a roughly square layout | |
cols = math.ceil(math.sqrt(num_qr_codes)) | |
cols = max(1, min(cols, 6)) # Clamp columns between 1 and 6 | |
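# e.g. 7 QR codes -> ceil(sqrt(7)) = 3 columns; 40 QR codes -> ceil(sqrt(40)) = 7, clamped to 6 columns | |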
viewport_html = f'<div class="viewport-container" style="grid-template-columns: repeat({cols}, 1fr);">' | |
# Ensure enabled_states is a list of indices if it's None or doesn't match current paths | |
if enabled_states is None or len(enabled_states) != num_qr_codes: | |
enabled_states = list(range(num_qr_codes)) | |
for i, path in enumerate(paths): | |
is_enabled = i in enabled_states | |
border = "border: 2px solid green;" if is_enabled else "border: 2px solid lightgray;" | |
opacity = "opacity: 1.0;" if is_enabled else "opacity: 0.5;" | |
# Use /file= prefix for Gradio to serve local files | |
viewport_html += f'<div class="viewport-item" id="qr_item_{i}">' | |
viewport_html += f'<img src="/file={path}" style="{border} {opacity}" alt="QR Code {i+1}">' | |
# Add checkbox with data-index for JS to identify which QR it controls | |
viewport_html += f'<label><input type="checkbox" data-index="{i}" {"checked" if is_enabled else ""} onchange="updateEnabledStates(this)"> Enable</label>' | |
viewport_html += '</div>' | |
viewport_html += '</div>' | |
return viewport_html | |
def on_qr_generation(qr_paths_list): | |
"""Handler to initialize enabled_qr_codes state after QR generation.""" | |
if qr_paths_list is None: | |
num_qrs = 0 | |
else: | |
num_qrs = len(qr_paths_list) | |
# Initially enable all generated QR codes | |
initial_enabled_states = list(range(num_qrs)) | |
# Return the paths list and the initial enabled states | |
return qr_paths_list, initial_enabled_states | |
def process_inputs(urls, files, text, combine, crawl_depth, generate_qr_enabled): | |
"""Process all inputs and generate QR codes based on toggle""" | |
results = [] | |
processing_status_messages = [] | |
url_processor = EnhancedURLProcessor() | |
file_processor = EnhancedFileProcessor() | |
try: | |
if text and text.strip(): | |
try: | |
json_data = json.loads(text) | |
results.append({ | |
'source': 'json_input', | |
'extracted_data': json_data, | |
'timestamp': datetime.now().isoformat(), | |
'processing_notes': ['Parsed from direct JSON input.'] | |
}) | |
processing_status_messages.append("✅ Successfully parsed direct JSON input.") | |
except json.JSONDecodeError as e: | |
processing_status_messages.append(f"❌ Invalid JSON format in text input: {str(e)}") | |
logger.error(f"Invalid JSON format in text input: {e}") | |
except Exception as e: | |
processing_status_messages.append(f"❌ Error processing direct JSON input: {str(e)}") | |
logger.error(f"Error processing direct JSON input: {e}") | |
if urls and urls.strip(): | |
url_list = re.split(r'[,\n]', urls) | |
url_list = [url.strip() for url in url_list if url.strip()] | |
for url in url_list: | |
processing_status_messages.append(f"🌐 Processing URL: {url} with crawl depth {crawl_depth}...") | |
# Call fetch_content_with_depth which handles recursion | |
content_result = url_processor.fetch_content_with_depth(url, max_steps=crawl_depth) | |
# The result from fetch_content_with_depth is already structured | |
# It includes the main fetch_result and linked_extractions | |
if content_result: # Check if a result dictionary was returned | |
results.append(content_result) | |
# Provide status based on the fetch_result within the recursive structure | |
main_fetch_status = content_result.get('fetch_result', {}).get('status_code') | |
if main_fetch_status is not None and 200 <= main_fetch_status < 300: | |
processing_status_messages.append(f"✅ Processed URL: {url} (Level 0, Status: {main_fetch_status})") | |
if content_result.get('processing_notes'): | |
processing_status_messages.append(f" Notes for {url}: {'; '.join(content_result['processing_notes'])}") | |
# Count successfully processed linked pages | |
def count_successful_fetches(crawl_result): | |
count = 0 | |
if crawl_result and crawl_result.get('fetch_result') is not None: | |
status = crawl_result['fetch_result'].get('status_code') | |
if status is not None and 200 <= status < 300: | |
count += 1 | |
for linked_result in crawl_result.get('linked_extractions', []): | |
count += count_successful_fetches(linked_result) | |
return count | |
total_attempted_links = len(content_result.get('linked_extractions', [])) | |
total_successful_linked = count_successful_fetches({'linked_extractions': content_result.get('linked_extractions', [])}) # Wrap to match expected structure | |
if total_attempted_links > 0: | |
processing_status_messages.append(f" Processed {total_successful_linked}/{total_attempted_links} linked pages up to depth {crawl_depth}.") | |
else: | |
processing_status_messages.append(f"❌ Failed to fetch or process URL: {url} (Status: {main_fetch_status})") | |
if content_result.get('processing_notes'): | |
processing_status_messages.append(f" Notes for {url}: {'; '.join(content_result['processing_notes'])}") | |
else: | |
processing_status_messages.append(f"❌ Failed to process URL: {url} (No result returned)") | |
if files: | |
for file in files: | |
processing_status_messages.append(f"📄 Processing file: {file.name}...") | |
file_results = file_processor.process_file(file) | |
if file_results: | |
results.extend(file_results) | |
processing_status_messages.append(f"✅ Processed file: {file.name}") | |
for res in file_results: | |
if res.get('processing_notes'): | |
processing_status_messages.append(f" Notes for {res.get('filename', 'item')}: {'; '.join(res['processing_notes'])}") | |
else: | |
processing_status_messages.append(f"❌ Failed to process file: {file.name}") | |
# Add a default note if process_file returned empty list without notes | |
if not file_results and file and hasattr(file, 'name'): | |
processing_status_messages.append(f" No results returned for file: {file.name}") | |
qr_paths = [] | |
final_json_output = None | |
if results: | |
final_json_output = results # Assign processed data regardless of QR generation | |
if generate_qr_enabled: | |
processing_status_messages.append("⚙️ Generating QR codes as requested...") | |
# generate_qr_codes expects a List[Dict] | |
qr_paths = generate_qr_codes(results, combine) | |
if qr_paths: | |
processing_status_messages.append(f"✅ Successfully generated {len(qr_paths)} QR codes.") | |
else: | |
processing_status_messages.append("❌ Failed to generate QR codes (empty result or error). Check logs.") | |
else: | |
processing_status_messages.append("ℹ️ QR code generation was disabled. Processed data is available.") | |
qr_paths = [] # Ensure it's empty | |
else: | |
processing_status_messages.append("⚠️ No valid content collected from inputs.") | |
final_json_output = [] # Ensure output_json is cleared if no results | |
except Exception as e: | |
logger.error(f"Overall processing error in process_inputs: {e}") | |
processing_status_messages.append(f"❌ An unexpected error occurred during processing: {str(e)}") | |
final_json_output = [] # Clear output on unexpected error | |
qr_paths = [] # Clear qrs on unexpected error | |
# Return the processed data, QR paths, status messages, and update chatbot_data state | |
return ( | |
final_json_output, | |
[str(path) for path in qr_paths], # Return paths as strings for Gradio Gallery | |
"\n".join(processing_status_messages), | |
final_json_output # Update chatbot_data state | |
) | |
# --- Download Logic --- | |
def download_json_data(data_df: Optional[pd.DataFrame], filename_prefix: str) -> Optional[str]: | |
"""Helper function to convert DataFrame to JSON file for download.""" | |
if data_df is None or data_df.empty: | |
logger.info(f"No data provided for download with prefix '{filename_prefix}'.") | |
return None | |
try: | |
# Convert DataFrame to list of dictionaries | |
data_list = data_df.to_dict(orient='records') | |
json_str = json.dumps(data_list, indent=2, ensure_ascii=False) | |
timestamp = int(time.time()) | |
filename = f"{filename_prefix}_{timestamp}.json" | |
file_path = TEMP_DIR / filename | |
# Ensure temp directory exists (already done at startup, but good practice) | |
TEMP_DIR.mkdir(parents=True, exist_ok=True) | |
with open(file_path, 'w', encoding='utf-8') as f: | |
f.write(json_str) | |
logger.info(f"Successfully created JSON file for download: {file_path}") | |
# Return the path to the temporary file | |
return str(file_path) | |
except Exception as e: | |
logger.error(f"Error creating JSON file for {filename_prefix}: {e}") | |
return None | |
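# The file is written into TEMP_DIR as e.g. "filtered_data_<unix_timestamp>.json"; the | |
# gr.File component wired to these handlers then offers that path for download. | |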
def handle_download_full_json(current_chatbot_data_state: Optional[List[Dict]]) -> Optional[str]: | |
"""Handler for the 'Download Full JSON' button.""" | |
# This function receives the full processed data (List[Dict]) from the chatbot_data state | |
if not current_chatbot_data_state: | |
logger.info("No full data available to download.") | |
return None | |
try: | |
# Attempt to create a DataFrame from the full data state for consistent output structure | |
# This uses the same flattening logic as the chatbot | |
flat_data = [] | |
def flatten_item_for_download(d, parent_key='', sep='_'): | |
items = {} | |
if isinstance(d, dict): | |
for k, v in d.items(): | |
new_key = parent_key + sep + k if parent_key else k | |
if isinstance(v, (dict, list)): | |
nested_items = flatten_item_for_download(v, new_key, sep=sep) | |
items.update(nested_items) | |
else: | |
items[new_key] = v | |
elif isinstance(d, list): | |
for i, elem in enumerate(d): | |
elem_key = f'{parent_key}{sep}{i}' if parent_key else str(i) | |
if isinstance(elem, (dict, list)): | |
items.update(flatten_item_for_download(elem, elem_key, sep=sep)) | |
else: | |
items[elem_key] = elem # Keep scalar list elements instead of silently dropping them | |
return items | |
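# Example: {"id": "123", "meta": {"v": 2}, "tags": [{"t": "a"}]} | |
#   flattens to {"id": "123", "meta_v": 2, "tags_0_t": "a"} | |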
for item in current_chatbot_data_state: | |
if isinstance(item, dict): | |
flat_data.append(flatten_item_for_download(item)) | |
# Handle cases where top-level items might not be dicts, wrap them | |
elif isinstance(item, (list, str, int, float, bool, type(None))): | |
flat_data.append({'item_value': item}) | |
if not flat_data: | |
logger.info("Full data flattened to empty list. Nothing to download.") | |
return None | |
df_to_download = pd.DataFrame(flat_data) | |
if df_to_download.empty: | |
logger.info("Full data resulted in an empty DataFrame. Nothing to download.") | |
return None | |
except Exception as e: | |
logger.error(f"Error converting full chatbot_data to DataFrame for download: {e}") | |
return None | |
# Pass the DataFrame to the generic download function | |
return download_json_data(df_to_download, "full_data") | |
def handle_download_filtered_json(current_filtered_df_state: Optional[pd.DataFrame]) -> Optional[str]: | |
"""Handler for the 'Download Filtered JSON' button.""" | |
# This function receives the already filtered DataFrame from the state | |
if current_filtered_df_state is None or current_filtered_df_state.empty: | |
logger.info("No filtered data available to download.") | |
return None | |
# Pass the DataFrame directly to the generic download function | |
return download_json_data(current_filtered_df_state, "filtered_data") | |
# Connect event handlers within the Blocks context | |
example_btn.click(load_example, inputs=[], outputs=text_input) | |
clear_btn.click(clear_input, inputs=[], outputs=[url_input, file_input, text_input, chatbot_data]) | |
process_btn.click( | |
process_inputs, | |
inputs=[url_input, file_input, text_input, combine_data, crawl_depth_slider, generate_qr_toggle], | |
outputs=[output_json, output_gallery, output_text, chatbot_data] | |
).then( | |
# This .then() is triggered after process_inputs completes and updates output_gallery | |
on_qr_generation, | |
inputs=[output_gallery], # Pass the list of QR paths from the gallery output | |
outputs=[qr_code_paths, enabled_qr_codes] # Update the state variables | |
) | |
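# NOTE: this wiring assumes the Gallery value round-trips as a flat list of file paths. | |
# Some Gradio versions return (image, caption) tuples or dicts from a Gallery, in which | |
# case on_qr_generation would need to unpack the paths before they reach update_viewport. | |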
# When the viewport tab is selected, update the viewport HTML | |
viewport_tab.select(update_viewport, inputs=[qr_code_paths, enabled_qr_codes], outputs=[viewport_output]) | |
# Chatbot send button and text input submit events | |
send_msg_btn.click( | |
respond_to_chat, | |
inputs=[chat_input, chat_history, chatbot_data, filtered_chatbot_df_state], | |
outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] | |
).then( | |
# Clear the chat input box after sending message | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
chat_input.submit( # Allow submitting by pressing Enter in the text box | |
respond_to_chat, | |
inputs=[chat_input, chat_history, chatbot_data, filtered_chatbot_df_state], # Pass filtered_chatbot_df_state here too | |
outputs=[chatbot, chatbot_data, filtered_chatbot_df_state] # And return it | |
).then( | |
# Clear the chat input box after submitting | |
lambda: "", | |
inputs=None, | |
outputs=chat_input | |
) | |
# Clear chat history button | |
clear_chat_btn.click( | |
# Clear chat history component and the filtered data state | |
lambda: ([], None), | |
inputs=None, | |
outputs=[chatbot, filtered_chatbot_df_state] | |
) | |
# Download buttons | |
download_full_json_btn.click( | |
fn=handle_download_full_json, | |
inputs=[chatbot_data], # chatbot_data is the gr.State holding the full dataset (List[Dict]) | |
outputs=[download_file_output] # The File component acts as the download trigger | |
) | |
download_filtered_json_btn.click( | |
fn=handle_download_filtered_json, | |
inputs=[filtered_chatbot_df_state], # This state holds the filtered DataFrame | |
outputs=[download_file_output] # The File component acts as the download trigger | |
) | |
gr.Markdown(""" | |
### 🚀 Features | |
- **Enhanced URL Scraping**: Extracts HTML text, title, meta description, links, and attempts parsing JSON/XML from URLs based on content type. Supports crawling links up to a specified depth. **(Now performs real fetching)** | |
- **Advanced File Processing**: Reads various text-based files (.txt, .md, .log etc.), HTML, XML, CSV, and attempts text extraction from common documents (.pdf, .docx, .rtf, .odt - *requires extra dependencies*). **(Now performs real file processing)** | |
- **Smart JSON Handling**: Parses valid JSON from direct input, files (.json or content), or URLs. | |
- **Archive Support**: Extracts and processes supported files from .zip, .tar, .gz archives. **(Now performs real extraction)** | |
- **Robust Encoding Detection**: Uses `chardet` for reliable character encoding identification. | |
- **Structured Output**: Provides a consistent JSON output format containing raw content (if applicable), extracted data, and processing notes for each processed item. | |
- **Sequential QR Codes**: Maintains data integrity across multiple codes by chunking the combined/individual processed data. | |
- **QR Code Viewport**: Visualize generated QR codes in a sequenced square grid with options to enable/disable individual codes for selective scanning/sharing. | |
- **Modern Design**: Clean, responsive interface with visual feedback. | |
- **Data Chatbot**: Interact conversationally with the processed JSON data to ask questions about its structure, content, or request specific information. | |
### 💡 Tips | |
1. **URLs**: Enter multiple URLs separated by commas or newlines. The processor will attempt to fetch and structure the content based on its type, following links up to the specified **Crawl Depth**. | |
2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and specific document/structured formats. | |
3. **JSON**: Use the "Direct JSON Input" tab for pasting JSON data. The system also tries to detect JSON content in file uploads and URLs. Use the "Load Example" button to see a sample JSON structure. | |
4. **Dependencies**: Processing PDF, DOCX, RTF, and ODT files requires installing optional Python libraries (`PyPDF2`, `python-docx`, `pyth`, `odfpy`). Check the console logs for warnings if a library is missing. | |
5. **QR Codes**: Choose whether to "Combine all data into sequence" or generate separate sequences for each input item. | |
6. **Processing**: Monitor the "Processing Status" box for real-time updates and notes about errors or processing steps. | |
7. **Output**: The "Processed Data" JSON box shows the structured data extracted from your inputs. The "Generated QR Codes" gallery shows the QR code images. | |
8. **Chatbot**: After processing data, go to the "Chat with Data" tab to ask questions about the JSON output. | |
### ⚙️ QR Code Viewport Instructions | |
1. Navigate to the **QR Code Viewport** tab after generating QR codes. | |
2. The generated QR codes will be displayed in a grid based on their total count. | |
3. Use the checkboxes below each QR code to enable or disable it for visual selection. Enabled codes have a green border and full opacity. | |
4. This viewport is currently for visualization and selection *within the UI*; it doesn't change the generated files themselves. You would manually select which physical QR codes to scan based on this view. | |
""") | |
return interface | |
def main(): | |
"""Initialize and launch the application""" | |
try: | |
mimetypes.init() | |
interface = create_modern_interface() | |
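# NOTE: depending on the Gradio version, serving the QR images in the viewport via | |
# "/file=" may require passing allowed_paths=[str(QR_CODES_DIR)] to launch() so that | |
# files under the output directory are permitted to be served. | |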
interface.launch( | |
share=False, # Set to True to create a temporary public gradio.live share link | |
debug=False, # Set to True for detailed debug output | |
show_error=True, # Show errors in the UI | |
show_api=False # Hide API endpoint details | |
) | |
except Exception as e: | |
logger.error(f"Application startup error: {e}") | |
print(f"\nFatal Error: {e}\nCheck the logs for details.") | |
raise | |
if __name__ == "__main__": | |
# Ensure the script is run directly (not imported) | |
main() |