import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import zlib  # For stable CRC32 chunk hashes
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
import qrcode
from PIL import Image, ImageDraw
import numpy as np
import tarfile
import gzip
# Setup enhanced logging with more detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ])
logger = logging.getLogger(__name__)

# Ensure output directories exist
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor:
    """Advanced URL processing with complete content extraction"""
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 15  # Extended timeout for larger content
        self.max_retries = 3
        self.user_agent = UserAgent()
        # Enhanced headers for better site compatibility
        self.session.headers.update({
            'User-Agent': self.user_agent.random,
            'Accept': '*/*',  # Accept all content types
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'DNT': '1'
        })

    def validate_url(self, url: str) -> Dict:
        """Enhanced URL validation with detailed feedback"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
            # Try a HEAD request first to check accessibility; fall back to GET,
            # since some servers don't support HEAD. Bind whichever succeeds to
            # `response` so its headers can be reported below.
            try:
                response = self.session.head(url, timeout=5)
                response.raise_for_status()
            except requests.exceptions.RequestException:
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'content_type': response.headers.get('Content-Type', 'unknown'),
                    'server': response.headers.get('Server', 'unknown'),
                    'size': response.headers.get('Content-Length', 'unknown')
                }
            }
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}
    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
        """Enhanced content fetcher with retry mechanism and complete character extraction"""
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
            # Update User-Agent randomly for each request
            self.session.headers.update({'User-Agent': self.user_agent.random})
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # Detect encoding
            if response.encoding is None:
                encoding = chardet.detect(response.content)['encoding'] or 'utf-8'
            else:
                encoding = response.encoding
            # Decode content with fallback
            try:
                raw_content = response.content.decode(encoding, errors='replace')
            except (UnicodeDecodeError, LookupError):
                raw_content = response.content.decode('utf-8', errors='replace')
            # Extract metadata
            metadata = {
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'encoding': encoding,
                'content_type': response.headers.get('Content-Type', ''),
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code
            }
            # Process based on content type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' in content_type:
                processed_content = self._process_html_content(raw_content, url)
            else:
                processed_content = raw_content
            return {
                'content': processed_content,
                'raw_content': raw_content,
                'metadata': metadata
            }
        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries - 1:
                logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
                time.sleep(2 ** retry_count)  # Exponential backoff
                return self.fetch_content(url, retry_count + 1)
            logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while fetching content: {e}")
            return None
    def _process_html_content(self, content: str, base_url: str) -> str:
        """Process HTML content while preserving all characters"""
        try:
            soup = BeautifulSoup(content, 'html.parser')
            # Convert relative URLs to absolute
            for tag in soup.find_all(['a', 'img', 'link', 'script']):
                for attr in ['href', 'src']:
                    if tag.get(attr):
                        try:
                            tag[attr] = urljoin(base_url, tag[attr])
                        except Exception:
                            pass
            # Extract all text content
            text_parts = []
            for element in soup.stripped_strings:
                text_parts.append(str(element))
            return '\n'.join(text_parts)
        except Exception as e:
            logger.error(f"HTML processing error: {e}")
            return content
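# Usage sketch (illustrative only; assumes network access to the target URL):
#   processor = EnhancedURLProcessor()
#   check = processor.validate_url("https://example.com")
#   if check['is_valid']:
#       result = processor.fetch_content("https://example.com")
#       if result:
#           print(result['metadata']['content_type'], len(result['content']))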
class EnhancedFileProcessor:
    """Advanced file processing with complete content extraction"""
    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):  # 5GB default
        self.max_file_size = max_file_size
        # Note: binary formats such as .pdf and .docx are currently read as
        # text with replacement characters; full extraction would require
        # format-specific parsers.
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
            '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
            '.pdf', '.doc', '.docx', '.rtf', '.odt'
        }

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling and complete extraction"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                temp_dir_path = Path(temp_dir)
                # Handle different archive types
                if self._is_archive(file.name):
                    dataset.extend(self._process_archive(file.name, temp_dir_path))
                elif Path(file.name).suffix.lower() in self.supported_extensions:
                    dataset.extend(self._process_single_file(file))
                else:
                    logger.warning(f"Unsupported file type: {file.name}")
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset

    def _is_archive(self, filepath: str) -> bool:
        """Check if a file is an archive"""
        return any(filepath.lower().endswith(ext) for ext in [
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'
        ])
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file with enhanced character extraction and JSON handling"""
        try:
            file_stat = os.stat(file.name)
            file_size = file_stat.st_size
            # Initialize content storage
            content_parts = []
            # Process the file in chunks for large files
            chunk_size = 10 * 1024 * 1024  # 10MB chunks
            encoding = None
            with open(file.name, 'rb') as f:
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break
                    # Detect the encoding once, from the first chunk; per-chunk
                    # detection can disagree across chunks of the same file.
                    # (Multi-byte characters split at a chunk boundary are
                    # handled lossily by errors='replace'.)
                    if encoding is None:
                        encoding = chardet.detect(chunk)['encoding'] or 'utf-8'
                    try:
                        content_parts.append(chunk.decode(encoding, errors='replace'))
                    except (UnicodeDecodeError, LookupError):
                        content_parts.append(chunk.decode('utf-8', errors='replace'))
            # Combine all chunks
            complete_content = ''.join(content_parts)
            # Check whether the content is valid JSON, regardless of file extension
            is_json_file = (mimetypes.guess_type(file.name)[0] == 'application/json'
                            or file.name.lower().endswith('.json'))
            try:
                json_data = json.loads(complete_content)
                return [{
                    'source': 'json_file' if is_json_file else 'json_content',
                    'filename': os.path.basename(file.name),
                    'file_size': file_size,
                    'mime_type': 'application/json',
                    'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                    'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                    'content': json_data,  # Store the parsed JSON object
                    'raw_content': complete_content,  # Store the original JSON string
                    'timestamp': datetime.now().isoformat()
                }]
            except json.JSONDecodeError:
                if is_json_file:
                    logger.warning(f"File {file.name} has a JSON extension but is not valid JSON.")
            # Fall back to treating the content as plain text
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': complete_content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
    def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]:
        """Process an archive file with enhanced extraction"""
        dataset = []
        try:
            # Handle ZIP archives
            if zipfile.is_zipfile(archive_path):
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_to)
                    for file_info in zip_ref.infolist():
                        if file_info.file_size > 0 and not file_info.filename.endswith('/'):
                            extracted_path = extract_to / file_info.filename
                            if extracted_path.suffix.lower() in self.supported_extensions:
                                with open(extracted_path, 'rb') as f:
                                    dataset.extend(self._process_single_file(f))
            # Handle TAR archives (checked before .gz so .tar.gz lands here)
            elif archive_path.lower().endswith(('.tar', '.tar.gz', '.tgz')):
                try:
                    with tarfile.open(archive_path, 'r:*') as tar_ref:
                        for member in tar_ref.getmembers():
                            if member.isfile():
                                extracted_path = extract_to / member.name
                                tar_ref.extract(member, path=extract_to)
                                if extracted_path.suffix.lower() in self.supported_extensions:
                                    with open(extracted_path, 'rb') as f:
                                        dataset.extend(self._process_single_file(f))
                except tarfile.TarError as e:
                    logger.error(f"Error processing TAR archive: {e}")
            # Handle GZIP archives (single compressed file)
            elif archive_path.lower().endswith('.gz'):
                extracted_path = extract_to / Path(archive_path).stem
                try:
                    with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile:
                        outfile.write(gz_file.read())
                    if extracted_path.suffix.lower() in self.supported_extensions:
                        with open(extracted_path, 'rb') as f:
                            dataset.extend(self._process_single_file(f))
                # gzip raises OSError/BadGzipFile (gzip.GzipFile is not an exception class)
                except (OSError, gzip.BadGzipFile) as e:
                    logger.error(f"Error processing GZIP archive: {e}")
            # TODO: Add support for other archive types (.bz2, .7z, .rar) - may require external libraries
            elif archive_path.lower().endswith(('.bz2', '.7z', '.rar')):
                logger.warning(f"Support for {Path(archive_path).suffix} archives is not yet fully implemented.")
        except Exception as e:
            logger.error(f"Archive processing error: {e}")
        return dataset
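    # Note (assumption, not a current dependency): .bz2 single-file archives
    # could be handled like .gz via the standard-library bz2 module, while
    # .7z and .rar would need third-party packages such as py7zr or rarfile;
    # see the TODO in _process_archive above.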
    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
        """Enhanced data chunking with sequence metadata.

        The default max_size of 2953 bytes matches the binary capacity of a
        version-40 QR code at the lowest error-correction level."""
        try:
            # Convert data to a JSON string
            json_str = json.dumps(data, ensure_ascii=False)
            total_length = len(json_str)
            # Calculate overhead for metadata
            metadata_template = {
                "chunk_index": 0,
                "total_chunks": 1,
                "total_length": total_length,
                "chunk_hash": "",
                "data": ""
            }
            overhead = len(json.dumps(metadata_template)) + 20  # Extra padding for safety
            # Calculate the effective chunk size
            effective_chunk_size = max_size - overhead
            if total_length <= effective_chunk_size:
                # Data fits in one chunk
                chunk = {
                    "chunk_index": 0,
                    "total_chunks": 1,
                    "total_length": total_length,
                    # CRC32 is stable across processes, unlike Python's hash()
                    "chunk_hash": zlib.crc32(json_str.encode('utf-8')),
                    "data": json_str
                }
                return [chunk]
            # Calculate the number of chunks needed
            num_chunks = -(-total_length // effective_chunk_size)  # Ceiling division
            chunk_size = -(-total_length // num_chunks)  # Even distribution
            chunks = []
            for i in range(num_chunks):
                start_idx = i * chunk_size
                end_idx = min(start_idx + chunk_size, total_length)
                chunk_data = json_str[start_idx:end_idx]
                chunks.append({
                    "chunk_index": i,
                    "total_chunks": num_chunks,
                    "total_length": total_length,
                    "chunk_hash": zlib.crc32(chunk_data.encode('utf-8')),
                    "data": chunk_data
                })
            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
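    # Illustrative helper (a sketch, not part of the original application):
    # chunk_data() above is invertible by sorting on "chunk_index" and
    # concatenating "data"; the CRC32 check assumes chunk_hash was produced
    # with zlib.crc32 as in chunk_data().
    @staticmethod
    def reassemble_chunks(chunks: List[Dict]) -> Union[Dict, List]:
        """Sketch of the inverse of chunk_data(): verify each chunk and rebuild the object."""
        ordered = sorted(chunks, key=lambda c: c["chunk_index"])
        for c in ordered:
            if zlib.crc32(c["data"].encode('utf-8')) != c["chunk_hash"]:
                raise ValueError(f"Chunk {c['chunk_index']} failed its integrity check")
        return json.loads(''.join(c["data"] for c in ordered))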
def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Generate a stylish QR code with enhanced visual appeal"""
    try:
        qr = qrcode.QRCode(
            version=None,
            # The lowest error-correction level maximizes data capacity
            # (the qrcode library has no ERROR_CORRECT_S constant)
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=size,
            border=border
        )
        # Add data to the QR code
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False))
        else:
            qr.add_data(data)
        qr.make(fit=True)
        # Create the QR code image with custom colors
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        # Convert to RGBA for transparency support
        qr_image = qr_image.convert('RGBA')
        # Add a subtle gradient overlay
        gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(gradient)
        for i in range(qr_image.width):
            alpha = int(255 * (1 - i / qr_image.width) * 0.1)  # 10% maximum opacity
            draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))
        # Combine the images
        final_image = Image.alpha_composite(qr_image, gradient)
        # Save the image (PNG is lossless, so no quality parameter is needed)
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Generate QR codes with enhanced visual appeal and metadata"""
    try:
        file_processor = EnhancedFileProcessor()
        paths = []
        if combined:
            # Process combined data
            chunks = file_processor.chunk_data(data)
            for i, chunk in enumerate(chunks):
                filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
                qr_path = generate_stylish_qr(
                    data=chunk,
                    filename=filename,
                    fill_color="#1a365d",  # Deep blue
                    back_color="#ffffff"
                )
                if qr_path:
                    paths.append(qr_path)
        else:
            # Process individual items
            if isinstance(data, list):
                for idx, item in enumerate(data):
                    chunks = file_processor.chunk_data(item)
                    for chunk_idx, chunk in enumerate(chunks):
                        filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
                        qr_path = generate_stylish_qr(
                            data=chunk,
                            filename=filename,
                            fill_color="#1a365d",  # Deep blue
                            back_color="#ffffff"
                        )
                        if qr_path:
                            paths.append(qr_path)
            else:
                chunks = file_processor.chunk_data(data)
                for i, chunk in enumerate(chunks):
                    filename = f'single_qr_{i+1}_of_{len(chunks)}_{int(time.time())}.png'
                    qr_path = generate_stylish_qr(
                        data=chunk,
                        filename=filename,
                        fill_color="#1a365d",  # Deep blue
                        back_color="#ffffff"
                    )
                    if qr_path:
                        paths.append(qr_path)
        return paths
    except Exception as e:
        logger.error(f"QR code generation error: {e}")
        return []
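# Usage sketch (illustrative): large payloads are chunked automatically, so a
# long document yields a numbered sequence such as combined_qr_<ts>_1_of_3.png,
# combined_qr_<ts>_2_of_3.png, combined_qr_<ts>_3_of_3.png.
#   paths = generate_qr_codes({"doc": "some long text..."}, combined=True)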
def create_qr_sequence_visualizer(output_gallery):
    """Add QR sequence visualization capabilities to the application"""
    # Create a new tab for the QR code sequence visualization
    with gr.Tab("🔍 QR Sequence Visualizer"):
        gr.Markdown("""
        ## QR Code Sequence Visualizer
        Arrange and visualize your QR code sequences. Enable or disable individual QR codes to see how they connect.
        """)
        # Inputs for the visualizer
        with gr.Row():
            qr_input = gr.File(
                label="Upload QR Codes",
                file_types=["image/png", "image/jpeg"],
                file_count="multiple"
            )
            with gr.Column():
                visualize_btn = gr.Button("🔄 Generate Visualization", variant="primary")
                reset_btn = gr.Button("🗑️ Reset", variant="secondary")
        # Container for QR code toggles
        qr_toggles_container = gr.HTML(label="QR Code Controls")
        # Output visualization
        with gr.Row():
            qr_visualization = gr.Image(label="QR Code Sequence Map", height=600)
            qr_preview = gr.Gallery(label="Selected QR Codes", columns=2, height=600)
        # Status output
        visualization_status = gr.Textbox(label="Visualization Status", interactive=False)

        # Function to process uploaded QR codes
        def process_qr_codes(files):
            if not files:
                return "", None, None, "⚠️ No QR codes uploaded - please upload QR code images"
            try:
                # Load QR codes and extract metadata
                qr_data = []
                qr_paths = []
                for file in files:
                    try:
                        img = Image.open(file.name)
                        # Try to decode the QR code. Decoding requires OpenCV
                        # (cv2); the qrcode library only *generates* codes.
                        try:
                            import cv2
                            detector = cv2.QRCodeDetector()
                            data, bbox, _ = detector.detectAndDecode(np.array(img.convert('RGB')))
                            if data:
                                try:
                                    qr_json = json.loads(data)
                                    qr_data.append(qr_json)
                                    qr_paths.append(file.name)
                                except json.JSONDecodeError:
                                    logger.warning(f"Could not decode JSON from QR: {data}")
                                    qr_data.append({"data": data})  # Store raw data if JSON fails
                                    qr_paths.append(file.name)
                            else:
                                qr_data.append({"data": "Empty QR"})
                                qr_paths.append(file.name)
                        except Exception as e:
                            logger.warning(f"Could not decode QR: {e}")
                            # Add with default metadata
                            qr_data.append({
                                "chunk_index": len(qr_data),
                                "total_chunks": len(files),
                                "data": "Unknown"
                            })
                            qr_paths.append(file.name)
                    except Exception as e:
                        logger.error(f"Error processing QR image {file.name}: {e}")
                if not qr_data:
                    return "", None, None, "❌ Failed to process QR codes"
                # Sort by chunk_index if available
                try:
                    sorted_data = sorted(zip(qr_data, qr_paths), key=lambda x: x[0].get("chunk_index", 0))
                    qr_data = [d[0] for d in sorted_data]
                    qr_paths = [d[1] for d in sorted_data]
                except Exception as e:
                    logger.error(f"Error sorting QR data: {e}")
                # Generate the toggle controls HTML
                toggle_html = '<div style="max-height: 500px; overflow-y: auto; padding: 10px;">'
                toggle_html += '<h3>Enable/Disable QR Codes:</h3>'
                for i, path in enumerate(qr_paths):
                    toggle_html += f'<div><input type="checkbox" id="qr_toggle_{i}" checked> <label for="qr_toggle_{i}">{os.path.basename(path)}</label></div>'
                toggle_html += '</div>'
                # Return updated values; Gradio applies them to the outputs
                # wired below (rather than calling component.update(), which
                # does not work inside handlers). The sequence-map image is a
                # placeholder until real composite-image generation is added.
                return toggle_html, None, qr_paths, "✅ Visualization ready!"
            except Exception as e:
                logger.error(f"Error processing QR codes: {e}")
                return "", None, None, "❌ Error processing QR codes"

        # Function to generate a visualization (placeholder; replace with
        # actual composite-image generation based on the enabled toggles)
        def generate_visualization(qr_paths):
            enabled_indices = list(range(len(qr_paths)))  # Start with all enabled
            composite_image = None  # Replace with a composite built from enabled_indices
            return composite_image

        # Event handlers
        visualize_btn.click(
            process_qr_codes,
            inputs=qr_input,
            outputs=[qr_toggles_container, qr_visualization, qr_preview, visualization_status]
        )
        reset_btn.click(
            lambda: ("", None, None, "⚠️ Visualization reset."),
            outputs=[qr_toggles_container, qr_visualization, qr_preview, visualization_status]
        )
# Integrate the visualizer into the main application
def visualize_qr_codes(qr_paths):
    """Visualize the generated QR codes with enable/disable functionality"""
    # This function currently receives the output gallery content (a list of
    # file paths). Adapt it to the desired visualization; for now, just log
    # the paths.
    logger.info(f"Visualizing QR codes: {qr_paths}")
    return "Visualization placeholder"  # Replace with actual visualization logic
def create_modern_interface():
    """Create a modern and visually appealing Gradio interface"""
    # Modern CSS styling
    css = """
    /* Modern color scheme */
    :root {
        --primary-color: #1a365d;
        --secondary-color: #2d3748;
        --accent-color: #4299e1;
        --background-color: #f7fafc;
        --success-color: #48bb78;
        --error-color: #f56565;
        --warning-color: #ed8936;
    }
    /* Container styling */
    .container {
        max-width: 1200px;
        margin: auto;
        padding: 2rem;
        background-color: var(--background-color);
        border-radius: 1rem;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }
    /* Component styling */
    .input-container {
        background-color: white;
        padding: 1.5rem;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
        margin-bottom: 1rem;
    }
    /* Button styling */
    .primary-button {
        background-color: var(--primary-color);
        color: white;
        padding: 0.75rem 1.5rem;
        border-radius: 0.375rem;
        border: none;
        cursor: pointer;
        transition: all 0.2s;
    }
    .primary-button:hover {
        background-color: var(--accent-color);
        transform: translateY(-1px);
    }
    /* Status messages */
    .status {
        padding: 1rem;
        border-radius: 0.375rem;
        margin: 1rem 0;
    }
    .status.success { background-color: #f0fff4; color: var(--success-color); }
    .status.error { background-color: #fff5f5; color: var(--error-color); }
    .status.warning { background-color: #fffaf0; color: var(--warning-color); }
    /* Gallery styling */
    .gallery {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
        gap: 1rem;
        padding: 1rem;
        background-color: white;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
    }
    .gallery img {
        width: 100%;
        height: auto;
        border-radius: 0.375rem;
        transition: transform 0.2s;
    }
    .gallery img:hover {
        transform: scale(1.05);
    }
    """
    # Create the interface with a modern design
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("""
        # 🚀 Advanced Data Processing & QR Code Generator
        Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
        """)
        with gr.Tab("🌐 URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )
        with gr.Tab("📁 File Input"):
            file_input = gr.File(
                label="Upload Files",
                file_types=None,  # None accepts all file types
                file_count="multiple"
            )
        with gr.Tab("📝 JSON Input"):
            text_input = gr.TextArea(
                label="Direct JSON Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )
            with gr.Row():
                example_btn = gr.Button("📋 Load Example", variant="secondary")
                clear_btn = gr.Button("🗑️ Clear", variant="secondary")
        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into sequence",
                value=True,
                info="Generate sequential QR codes for combined data"
            )
            process_btn = gr.Button(
                "🚀 Process & Generate QR",
                variant="primary"
            )
        # Output components
        output_json = gr.JSON(label="Processed Data")
        output_gallery = gr.Gallery(
            label="Generated QR Codes",
            columns=3,
            height=400,
            show_label=True
        )
        output_text = gr.Textbox(
            label="Processing Status",
            interactive=False
        )
        # Load example data
        def load_example():
            example = {
                "type": "product_catalog",
                "items": [
                    {
                        "id": "123",
                        "name": "Premium Widget",
                        "description": "High-quality widget with advanced features",
                        "price": 299.99,
                        "category": "electronics",
                        "tags": ["premium", "featured", "new"]
                    },
                    {
                        "id": "456",
                        "name": "Basic Widget",
                        "description": "Reliable widget for everyday use",
                        "price": 149.99,
                        "category": "electronics",
                        "tags": ["basic", "popular"]
                    }
                ],
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "version": "2.0",
                    "source": "example"
                }
            }
            return json.dumps(example, indent=2)

        def clear_input():
            return ""
        def process_inputs(urls, files, text, combine):
            """Process all inputs and generate QR codes"""
            try:
                results = []
                url_processor = EnhancedURLProcessor()
                file_processor = EnhancedFileProcessor()
                # Process JSON input
                if text and text.strip():
                    try:
                        json_data = json.loads(text)
                        if isinstance(json_data, list):
                            results.extend(json_data)
                        else:
                            results.append(json_data)
                    except json.JSONDecodeError as e:
                        return None, [], f"❌ Invalid JSON format: {str(e)}"
                # Process URLs
                if urls and urls.strip():
                    url_list = re.split(r'[,\n]', urls)
                    url_list = [url.strip() for url in url_list if url.strip()]
                    for url in url_list:
                        validation = url_processor.validate_url(url)
                        if validation['is_valid']:
                            content = url_processor.fetch_content(url)
                            if content:
                                results.append({
                                    'source': 'url',
                                    'url': url,
                                    'content': content,
                                    'timestamp': datetime.now().isoformat()
                                })
                # Process files
                if files:
                    for file in files:
                        file_results = file_processor.process_file(file)
                        if file_results:
                            results.extend(file_results)
                # Generate QR codes
                if results:
                    qr_paths = generate_qr_codes(results, combine)
                    if qr_paths:
                        return (
                            results,
                            [str(path) for path in qr_paths],
                            f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!"
                        )
                    else:
                        return None, [], "❌ Failed to generate QR codes"
                else:
                    return None, [], "⚠️ No valid content to process"
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, [], f"❌ Error: {str(e)}"
        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )
        # Add the visualization button and its click event within the interface scope
        visualize_btn = gr.Button("🔍 Visualize QR Codes")
        visualize_btn.click(visualize_qr_codes, inputs=output_gallery, outputs=None)
        # Add helpful documentation
        gr.Markdown("""
        ### 🌟 Features
        - **Complete URL Scraping**: Extracts every character from web pages
        - **Advanced File Processing**: Full content extraction from various text-based files and common archives, with flexible JSON handling.
        - **Smart JSON Handling**: Processes JSON of any size with automatic chunking, via direct input or file upload.
        - **Sequential QR Codes**: Maintains data integrity across multiple codes
        - **Modern Design**: Clean, responsive interface with visual feedback
        ### 💡 Tips
        1. **URLs**: Enter multiple URLs separated by commas or newlines
        2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and JSON files.
        3. **JSON**: Use the example button to see the expected format, or upload a .json file. The system will also try to detect JSON content in other file types.
        4. **QR Codes**: Choose whether to combine data into sequential codes
        5. **Processing**: Monitor the status for real-time feedback
        ### 🎨 Output
        - Generated QR codes are saved in the `output/qr_codes` directory
        - Each QR code contains metadata for proper sequencing
        - Hover over QR codes in the gallery to see details
        """)
        return interface
def main():
    """Initialize and launch the application"""
    try:
        # Configure system settings
        mimetypes.init()
        # Create and launch the interface
        interface = create_modern_interface()
        # Add the QR sequence visualizer tab
        with interface:
            create_qr_sequence_visualizer(None)  # output_gallery is not needed here
        # Launch with configuration
        interface.launch(
            share=False,
            debug=False,
            show_error=True,
            show_api=False
        )
    except Exception as e:
        logger.error(f"Application startup error: {e}")
        raise

if __name__ == "__main__":
    main()