urld / app.py
acecalisto3's picture
Update app.py
26f8314 verified
raw
history blame
38.6 kB
import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tempfile
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from urllib.parse import urlparse, urljoin
import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np
import tarfile
import gzip
import networkx as nx
import matplotlib.pyplot as plt
from matplotlib.colors import to_rgba
import io
import math
# Logging setup: every record is written both to the console and to a UTF-8
# encoded 'app.log' file, tagged with timestamp, level and source location.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
_LOG_HANDLERS = [
    logging.StreamHandler(),
    logging.FileHandler('app.log', encoding='utf-8'),
]
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=_LOG_HANDLERS)
logger = logging.getLogger(__name__)
# Output locations: generated QR images and scratch files live under ./output.
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR.joinpath('qr_codes')
TEMP_DIR = OUTPUTS_DIR.joinpath('temp')
# Create the whole tree at import time so later writes never hit a missing directory.
for directory in (OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR):
    directory.mkdir(parents=True, exist_ok=True)
class EnhancedURLProcessor:
    """Validate URLs and download their content through one shared HTTP session.

    Instance attributes (all set in ``__init__``):
        session: ``requests.Session`` reused for every request.
        timeout: Timeout in seconds applied to GET requests.
        max_retries: Total number of attempts ``fetch_content`` makes per URL.
        user_agent: ``fake_useragent.UserAgent`` used to pick User-Agent strings.
    """

    def __init__(self):
        """Create the session and install browser-like default headers."""
        self.session = requests.Session()
        self.timeout = 15  # Extended timeout for larger content
        self.max_retries = 3
        self.user_agent = UserAgent()
        # Enhanced headers for better site compatibility.
        # Accept-Encoding is intentionally NOT overridden: the session default
        # only advertises encodings the installed HTTP stack can decode.
        # Hard-coding "br" makes servers send brotli bodies that come back
        # undecoded when no brotli decoder is installed.
        self.session.headers.update({
            'User-Agent': self.user_agent.random,
            'Accept': '*/*',  # Accept all content types
            'Accept-Language': 'en-US,en;q=0.9',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'DNT': '1'
        })

    def validate_url(self, url: str) -> Dict:
        """Check that *url* is well formed and reachable.

        Args:
            url: The URL to check.

        Returns:
            A dict with keys ``is_valid`` (bool), ``message`` (str) and
            ``details``. On success ``details`` is a dict holding the
            ``content_type``, ``server`` and ``size`` response headers
            ('unknown' when absent); on failure it is an explanatory string.
            Never raises: every exception is converted into a failure dict.
        """
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format', 'details': 'URL must begin with http:// or https://'}
            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                return {'is_valid': False, 'message': 'Incomplete URL', 'details': 'Missing scheme or domain'}
            # Try HEAD first: it is cheap. Redirects are followed so the
            # reported headers describe the resource that will be fetched.
            try:
                response = self.session.head(url, timeout=5, allow_redirects=True)
                response.raise_for_status()
            except requests.exceptions.RequestException:
                # Some servers don't support HEAD; fall back to GET.
                # stream=True avoids downloading the body just to read headers.
                response = self.session.get(url, timeout=self.timeout, stream=True)
                response.close()
                response.raise_for_status()
            # `response` is whichever request succeeded, so the details below
            # are always taken from a successful response.
            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'content_type': response.headers.get('Content-Type', 'unknown'),
                    'server': response.headers.get('Server', 'unknown'),
                    'size': response.headers.get('Content-Length', 'unknown')
                }
            }
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}', 'details': str(e)}

    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
        """Download *url*, retrying on request errors with exponential backoff.

        Args:
            url: The URL to download.
            retry_count: Zero-based index of the current attempt; callers
                normally leave it at 0 (it is advanced by the retry logic).

        Returns:
            A dict with ``content`` (extracted text for HTML responses, the
            decoded body otherwise), ``raw_content`` (the decoded body) and
            ``metadata``; or ``None`` when all attempts fail or an unexpected
            error occurs.
        """
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")
            # Update User-Agent randomly for each request
            self.session.headers.update({'User-Agent': self.user_agent.random})
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # Use the declared encoding when there is one, otherwise detect it.
            if response.encoding is None:
                encoding = chardet.detect(response.content)['encoding'] or 'utf-8'
            else:
                encoding = response.encoding
            # Decode content; an unknown codec name falls back to UTF-8.
            try:
                raw_content = response.content.decode(encoding, errors='replace')
            except (UnicodeDecodeError, LookupError):
                raw_content = response.content.decode('utf-8', errors='replace')
            metadata = {
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'encoding': encoding,
                'content_type': response.headers.get('Content-Type', ''),
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code
            }
            # Only HTML is post-processed; every other type is returned as decoded.
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' in content_type:
                processed_content = self._process_html_content(raw_content, url)
            else:
                processed_content = raw_content
            return {
                'content': processed_content,
                'raw_content': raw_content,
                'metadata': metadata
            }
        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries - 1:
                logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
                time.sleep(2 ** retry_count)  # Exponential backoff
                return self.fetch_content(url, retry_count + 1)
            logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while fetching content: {e}")
            return None

    def _process_html_content(self, content: str, base_url: str) -> str:
        """Return the text of an HTML document, one stripped string per line.

        Args:
            content: The HTML markup.
            base_url: URL the document came from. Currently unused: only text
                nodes are returned, so rewriting link attributes would have no
                effect on the result. Kept for interface compatibility.

        Returns:
            The newline-joined stripped strings of the document, or *content*
            unchanged if parsing fails.
        """
        try:
            soup = BeautifulSoup(content, 'html.parser')
            return '\n'.join(str(element) for element in soup.stripped_strings)
        except Exception as e:
            logger.error(f"HTML processing error: {e}")
            return content
class EnhancedFileProcessor:
    """Turn uploaded files and archives into record dicts, and split data into
    QR-code-sized chunks.

    Instance attributes (set in ``__init__``):
        max_file_size: Largest file size in bytes that ``process_file`` accepts.
        supported_extensions: Lower-case file suffixes that are processed.
    """

    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):  # 5GB default
        """Store the size limit and the set of supported file suffixes."""
        self.max_file_size = max_file_size
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm',
            '.log', '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
            '.pdf', '.doc', '.docx', '.rtf', '.odt'
        }

    def process_file(self, file) -> List[Dict]:
        """Process one uploaded file.

        Args:
            file: Object whose ``name`` attribute is the path of the file on disk.

        Returns:
            A list of record dicts (see ``_process_single_file``); empty when
            *file* is falsy, too large, unsupported, or processing fails.
        """
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            # Archives are checked first because their suffixes are also
            # listed in supported_extensions.
            if self._is_archive(file.name):
                # Archive members are unpacked into a throw-away directory.
                with tempfile.TemporaryDirectory() as temp_dir:
                    dataset.extend(self._process_archive(file.name, Path(temp_dir)))
            elif Path(file.name).suffix.lower() in self.supported_extensions:
                dataset.extend(self._process_single_file(file))
            else:
                logger.warning(f"Unsupported file type: {file.name}")
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset

    def _is_archive(self, filepath: str) -> bool:
        """Return True when *filepath* ends with a known archive suffix."""
        # '.tgz' is included because _process_archive handles it as a tarball.
        return filepath.lower().endswith(
            ('.zip', '.tar', '.tgz', '.gz', '.bz2', '.7z', '.rar')
        )

    def _read_text(self, path: str, chunk_size: int = 10 * 1024 * 1024) -> str:
        """Read *path* in binary chunks and decode it to text.

        The encoding is detected once, from the first chunk, and an
        incremental decoder carries state across chunk boundaries so a
        multi-byte character split between two chunks is still decoded
        correctly. Undecodable bytes become U+FFFD.
        """
        import codecs  # Local import: only needed here.

        parts = []
        decoder = None
        with open(path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                if decoder is None:
                    encoding = chardet.detect(chunk)['encoding'] or 'utf-8'
                    try:
                        decoder_factory = codecs.getincrementaldecoder(encoding)
                    except LookupError:
                        # Detected name is not a codec Python knows.
                        decoder_factory = codecs.getincrementaldecoder('utf-8')
                    decoder = decoder_factory(errors='replace')
                parts.append(decoder.decode(chunk))
        if decoder is not None:
            # Flush any bytes still buffered from an incomplete trailing sequence.
            parts.append(decoder.decode(b'', final=True))
        return ''.join(parts)

    def _process_single_file(self, file) -> List[Dict]:
        """Read one file and wrap its content in a record dict.

        Args:
            file: Object whose ``name`` attribute is the path of the file.

        Returns:
            A one-element list. If the content parses as JSON the record has
            ``source`` 'json_file' (JSON by MIME type or '.json' suffix) or
            'json_content' (JSON despite its name), the parsed value in
            ``content`` and the text in ``raw_content``. Otherwise ``source``
            is 'file' and ``content`` is the decoded text. Returns ``[]`` on
            any error.
        """
        try:
            file_stat = os.stat(file.name)
            file_size = file_stat.st_size
            complete_content = self._read_text(file.name)
            guessed_mime = mimetypes.guess_type(file.name)[0]
            named_as_json = guessed_mime == 'application/json' or file.name.lower().endswith('.json')

            # Try to parse as JSON regardless of the file extension.
            parsed_ok = False  # separate flag: the parsed value itself may be None
            json_data = None
            try:
                json_data = json.loads(complete_content)
                parsed_ok = True
            except json.JSONDecodeError as e:
                if named_as_json:
                    logger.error(f"Error during JSON processing: {e}")
                else:
                    logger.warning(f"File {file.name} is not valid JSON.")
            except Exception as e:
                logger.error(f"Error during JSON processing: {e}")

            if parsed_ok:
                return [{
                    'source': 'json_file' if named_as_json else 'json_content',
                    'filename': os.path.basename(file.name),
                    'file_size': file_size,
                    'mime_type': 'application/json',
                    'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                    'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                    'content': json_data,  # Store the parsed JSON object
                    'raw_content': complete_content,  # Store the original JSON string
                    'timestamp': datetime.now().isoformat()
                }]
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_size,
                'mime_type': guessed_mime,
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': complete_content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []

    def _process_extracted(self, path: Path) -> List[Dict]:
        """Process one extracted archive member if its suffix is supported."""
        if path.suffix.lower() not in self.supported_extensions:
            return []
        with open(path, 'rb') as f:
            return self._process_single_file(f)

    def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]:
        """Extract an archive into *extract_to* and process its supported members.

        ZIP, TAR (plain or compressed) and single-file GZIP archives are
        handled; '.bz2', '.7z' and '.rar' only produce a warning.

        Args:
            archive_path: Path of the archive file.
            extract_to: Existing directory that receives the extracted files.

        Returns:
            The records of all processed members; errors are logged and
            yield whatever was collected so far.
        """
        dataset = []
        try:
            # Handle ZIP archives
            if zipfile.is_zipfile(archive_path):
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    for file_info in zip_ref.infolist():
                        if file_info.is_dir() or file_info.file_size == 0:
                            continue
                        if Path(file_info.filename).suffix.lower() not in self.supported_extensions:
                            continue
                        # extract() sanitises the member name and returns the
                        # real destination. Re-deriving the path from the raw
                        # member name could point outside extract_to.
                        extracted_path = Path(zip_ref.extract(file_info, path=extract_to))
                        dataset.extend(self._process_extracted(extracted_path))
            # Handle TAR archives
            elif archive_path.lower().endswith(('.tar', '.tar.gz', '.tgz')):
                try:
                    base_dir = extract_to.resolve()
                    with tarfile.open(archive_path, 'r:*') as tar_ref:
                        for member in tar_ref.getmembers():
                            # Only regular files are extracted; links and
                            # devices are never written to disk.
                            if not member.isfile():
                                continue
                            # Refuse members that would land outside the
                            # extraction directory ("../" or absolute names).
                            target = (base_dir / member.name).resolve()
                            if base_dir not in target.parents:
                                logger.warning(f"Skipping unsafe archive member: {member.name}")
                                continue
                            tar_ref.extract(member, path=extract_to)
                            dataset.extend(self._process_extracted(extract_to / member.name))
                except tarfile.TarError as e:
                    logger.error(f"Error processing TAR archive: {e}")
            # Handle GZIP archives (single file)
            elif archive_path.lower().endswith('.gz'):
                # 'data.json.gz' decompresses to 'data.json'.
                extracted_path = extract_to / Path(archive_path).stem
                try:
                    with gzip.open(archive_path, 'rb') as gz_file, open(extracted_path, 'wb') as outfile:
                        # Copy in blocks so large files are never held in memory whole.
                        while True:
                            block = gz_file.read(1024 * 1024)
                            if not block:
                                break
                            outfile.write(block)
                    dataset.extend(self._process_extracted(extracted_path))
                except (OSError, EOFError) as e:
                    # Invalid or truncated gzip data surfaces as OSError/EOFError.
                    logger.error(f"Error processing GZIP archive: {e}")
            # TODO: Add support for other archive types (.bz2, .7z, .rar) - may require external libraries
            elif archive_path.lower().endswith(('.bz2', '.7z', '.rar')):
                logger.warning(f"Support for {Path(archive_path).suffix} archives is not yet fully implemented.")
        except Exception as e:
            logger.error(f"Archive processing error: {e}")
        return dataset

    @staticmethod
    def _split_by_encoded_size(text: str, budget: int) -> List[str]:
        """Split *text* into pieces whose JSON-escaped UTF-8 size is <= *budget*.

        The size of a piece is the number of bytes it occupies inside a JSON
        string written with ``ensure_ascii=False`` (quotes and backslashes
        count double, control characters as their escape sequence). A single
        character that alone exceeds the budget still forms its own piece.
        """
        def escaped_size(ch: str) -> int:
            code = ord(ch)
            if ch in '"\\':
                return 2
            if code < 0x20:
                return len(json.dumps(ch)) - 2  # '\n' -> 2, '\x01' -> 6
            if code < 0x80:
                return 1
            if code < 0x800:
                return 2
            if code < 0x10000:
                return 3
            return 4

        pieces = []
        start = 0
        used = 0
        for pos, ch in enumerate(text):
            cost = escaped_size(ch)
            if used + cost > budget and pos > start:
                pieces.append(text[start:pos])
                start = pos
                used = 0
            used += cost
        pieces.append(text[start:])
        return pieces

    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
        """Serialize *data* to JSON and split it into sequenced chunks.

        Args:
            data: JSON-serializable value.
            max_size: Upper bound, in bytes, for one chunk once the chunk dict
                itself is serialized with ``json.dumps(..., ensure_ascii=False)``
                and UTF-8 encoded.

        Returns:
            A list of dicts with ``chunk_index``, ``total_chunks``,
            ``total_length`` (characters of the full JSON text),
            ``chunk_hash`` (CRC-32 of the chunk's UTF-8 bytes) and ``data``
            (the chunk text). Concatenating ``data`` in index order restores
            the JSON text. Returns ``[]`` on error or when *max_size* leaves
            no room for data.
        """
        import zlib  # Local import: only needed for the chunk checksum.

        try:
            # Convert data to JSON string
            json_str = json.dumps(data, ensure_ascii=False)
            total_length = len(json_str)
            # Calculate overhead for metadata
            metadata_template = {
                "chunk_index": 0,
                "total_chunks": 1,
                "total_length": total_length,
                "chunk_hash": "",
                "data": ""
            }
            # The padding covers the extra digits of the real index, count
            # and checksum compared with the template values.
            overhead = len(json.dumps(metadata_template)) + 20
            effective_chunk_size = max_size - overhead
            if effective_chunk_size <= 0:
                logger.error(f"Error chunking data: max_size {max_size} is too small for chunk metadata")
                return []
            # Split by *escaped byte size*, not character count: the chunk
            # text is embedded in JSON again, so quotes, backslashes and
            # non-ASCII characters take more than one byte each.
            pieces = self._split_by_encoded_size(json_str, effective_chunk_size)
            num_chunks = len(pieces)
            return [
                {
                    "chunk_index": index,
                    "total_chunks": num_chunks,
                    "total_length": total_length,
                    # CRC-32 is stable across processes; the built-in hash()
                    # of a str is salted per interpreter run and cannot be
                    # re-checked by whoever reads the QR codes back.
                    "chunk_hash": zlib.crc32(piece.encode('utf-8')) & 0xFFFFFFFF,
                    "data": piece
                }
                for index, piece in enumerate(pieces)
            ]
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Render *data* as a QR code image and save it under ``QR_CODES_DIR``.

    Args:
        data: Text to encode; a dict is serialized to JSON first.
        filename: File name (not path) of the image inside ``QR_CODES_DIR``.
        size: Pixel size of one QR module (``box_size``).
        border: Width of the quiet zone in modules.
        fill_color: Colour of the dark modules.
        back_color: Background colour.

    Returns:
        The path of the saved image as a string, or ``""`` if generation
        failed (the error is logged).
    """
    try:
        qr = qrcode.QRCode(
            version=None,
            # The library defines only the L, M, Q and H correction levels;
            # the previously used ERROR_CORRECT_S does not exist, so every
            # call failed. L gives the largest payload capacity.
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=size,
            border=border
        )
        # Add data to QR code
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False))
        else:
            qr.add_data(data)
        qr.make(fit=True)
        # Create QR code image with custom colors
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)
        # Convert to RGBA so the overlay can be alpha-composited
        qr_image = qr_image.convert('RGBA')
        # Subtle left-to-right white gradient overlay
        gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(gradient)
        for i in range(qr_image.width):
            alpha = int(255 * (1 - i / qr_image.width) * 0.1)  # 10% maximum opacity
            draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))
        # Combine images
        final_image = Image.alpha_composite(qr_image, gradient)
        # Save the image
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path, quality=95)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""
def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Chunk *data* and render every chunk as a QR code image.

    With ``combined`` true the whole of *data* is chunked as one sequence.
    Otherwise each element of a list is chunked separately, and non-list
    data is chunked as a single item.

    Returns:
        Paths of the images that were written; ``[]`` if an error occurs.
    """
    try:
        processor = EnhancedFileProcessor()
        saved_paths = []

        def render(chunks, name_for):
            # Render one chunk sequence; name_for(position, total) gives the file name.
            total = len(chunks)
            for position, chunk in enumerate(chunks, start=1):
                image_path = generate_stylish_qr(
                    data=chunk,
                    filename=name_for(position, total),
                    fill_color="#1a365d",  # Deep blue
                    back_color="#ffffff"
                )
                if image_path:
                    saved_paths.append(image_path)

        if combined:
            render(
                processor.chunk_data(data),
                lambda position, total: f'combined_qr_{int(time.time())}_{position}_of_{total}.png',
            )
        elif isinstance(data, list):
            for item_no, item in enumerate(data, start=1):
                render(
                    processor.chunk_data(item),
                    # item_no is bound as a default so each lambda keeps its own value.
                    lambda position, total, item_no=item_no:
                        f'item_{item_no}_chunk_{position}_of_{total}_{int(time.time())}.png',
                )
        else:
            render(
                processor.chunk_data(data),
                lambda position, total: f'single_qr_{position}_of_{total}_{int(time.time())}.png',
            )
        return saved_paths
    except Exception as e:
        logger.error(f"QR code generation error: {e}")
        return []
def create_qr_sequence_visualizer(output_gallery):
    """Add the "QR Sequence Visualizer" tab to the current Gradio layout.

    The tab lets the user upload QR code images, lists them with enable or
    disable checkboxes, and shows them in a preview gallery ordered by their
    ``chunk_index`` metadata.

    Args:
        output_gallery: Not used by this function; kept so existing callers
            keep working.
    """
    import html  # Local import: only used to escape file names in the toggle markup.

    # Create a new tab for the QR code sequence visualization
    with gr.Tab("🔄 QR Sequence Visualizer"):
        gr.Markdown("""
## QR Code Sequence Visualizer
Arrange and visualize your QR code sequences. Enable or disable individual QR codes to see how they connect.
""")
        # Inputs for the visualizer
        with gr.Row():
            qr_input = gr.File(
                label="Upload QR Codes",
                # Gradio expects file extensions (or type names such as
                # "image") here, not MIME strings.
                file_types=[".png", ".jpg", ".jpeg"],
                file_count="multiple"
            )
            with gr.Column():
                visualize_btn = gr.Button("🔄 Generate Visualization", variant="primary")
                reset_btn = gr.Button("🗑️ Reset", variant="secondary")
        # Container for QR code toggles
        qr_toggles_container = gr.HTML(label="QR Code Controls")
        # Output visualization
        with gr.Row():
            qr_visualization = gr.Image(label="QR Code Sequence Map", height=600)
            qr_preview = gr.Gallery(label="Selected QR Codes", columns=2, height=600)
        # Status output
        visualization_status = gr.Textbox(label="Visualization Status", interactive=False)

        def process_qr_codes(files):
            """Read the uploaded images and build the values for the four outputs.

            Returns a tuple ``(status text, toggle HTML, visualization image,
            gallery paths)`` matching the ``outputs`` list of the click handler.
            """
            if not files:
                return "Please upload QR code images.", "", None, None
            try:
                # Load QR codes and extract metadata
                qr_data = []
                qr_paths = []
                for file in files:
                    # Uploaded files may arrive as objects with .name or as plain paths.
                    path = getattr(file, "name", file)
                    try:
                        with Image.open(path) as img:
                            pixels = np.array(img)
                    except Exception as e:
                        logger.error(f"Error processing QR image {path}: {e}")
                        continue
                    # Try to decode QR code
                    try:
                        # NOTE(review): the `qrcode` package only generates
                        # codes and has no QRCodeDetector, so this currently
                        # always raises and the default metadata below is
                        # used. Real decoding needs a decoder library.
                        detector = qrcode.QRCodeDetector()
                        data, bbox, _ = detector.detectAndDecode(pixels)
                        if data:
                            try:
                                qr_json = json.loads(data)
                                if not isinstance(qr_json, dict):
                                    # Sorting below needs dict metadata.
                                    qr_json = {"data": qr_json}
                                qr_data.append(qr_json)
                            except json.JSONDecodeError:
                                logger.warning(f"Could not decode JSON from QR: {data}")
                                qr_data.append({"data": data})  # Store raw data if JSON fails
                        else:
                            qr_data.append({"data": "Empty QR"})
                    except Exception as e:
                        logger.warning(f"Could not decode QR: {e}")
                        # Add with default metadata: keep upload order.
                        qr_data.append({
                            "chunk_index": len(qr_data),
                            "total_chunks": len(files),
                            "data": "Unknown"
                        })
                    qr_paths.append(path)
                if not qr_data:
                    return "No valid QR codes found.", "", None, None
                # Sort by chunk_index if available
                try:
                    sorted_data = sorted(zip(qr_data, qr_paths), key=lambda x: x[0].get("chunk_index", 0))
                    qr_data = [d[0] for d in sorted_data]
                    qr_paths = [d[1] for d in sorted_data]
                except Exception as e:
                    logger.error(f"Error sorting QR data: {e}")
                # Generate toggle controls HTML. File names come from the
                # upload, so they are escaped before being put into markup.
                toggle_rows = [
                    f'<div><input type="checkbox" id="qr_toggle_{i}" checked> '
                    f'<label for="qr_toggle_{i}">{html.escape(os.path.basename(path))}</label></div>'
                    for i, path in enumerate(qr_paths)
                ]
                toggle_html = (
                    '<div style="max-height: 500px; overflow-y: auto; padding: 10px;">'
                    '<h3>Enable/Disable QR Codes:</h3>'
                    + ''.join(toggle_rows)
                    + '</div>'
                )
                # Components are updated through the returned values. No
                # composite image is generated yet, so the image stays empty.
                return "QR codes processed successfully.", toggle_html, None, qr_paths
            except Exception as e:
                logger.error(f"Error processing QR codes: {e}")
                return "An error occurred while processing QR codes.", "", None, None

        # Event handlers: each callback returns one value per listed output.
        visualizer_outputs = [visualization_status, qr_toggles_container, qr_visualization, qr_preview]
        visualize_btn.click(process_qr_codes, inputs=qr_input, outputs=visualizer_outputs)
        reset_btn.click(lambda: ("⚠️ Visualization reset.", "", None, None), outputs=visualizer_outputs)
# Integrate the visualizer into the main application
def visualize_qr_codes(qr_paths):
    """Placeholder hook for visualizing the generated QR codes.

    Receives the content of the output gallery, logs it, and returns a fixed
    placeholder string; no visualization is produced yet.
    """
    log_line = f"Visualizing QR codes: {qr_paths}"
    logger.info(log_line)
    placeholder = "Visualization placeholder"  # Replace with actual visualization logic
    return placeholder
def create_modern_interface():
    """Build the main Gradio interface and return it (not yet launched).

    The interface offers three input tabs (URLs, file upload, direct JSON),
    a "combine" option, and outputs for the processed data, the generated
    QR code images and a status line.

    Returns:
        The ``gr.Blocks`` instance.
    """
    # Modern CSS styling
    css = """
/* Modern color scheme */
:root {
--primary-color: #1a365d;
--secondary-color: #2d3748;
--accent-color: #4299e1;
--background-color: #f7fafc;
--success-color: #48bb78;
--error-color: #f56565;
--warning-color: #ed8936;
}
/* Container styling */
.container {
max-width: 1200px;
margin: auto;
padding: 2rem;
background-color: var(--background-color);
border-radius: 1rem;
box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
}
/* Component styling */
.input-container {
background-color: white;
padding: 1.5rem;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
margin-bottom: 1rem;
}
/* Button styling */
.primary-button {
background-color: var(--primary-color);
color: white;
padding: 0.75rem 1.5rem;
border-radius: 0.375rem;
border: none;
cursor: pointer;
transition: all 0.2s;
}
.primary-button:hover {
background-color: var(--accent-color);
transform: translateY(-1px);
}
/* Status messages */
.status {
padding: 1rem;
border-radius: 0.375rem;
margin: 1rem 0;
}
.status.success { background-color: #f0fff4; color: var(--success-color); }
.status.error { background-color: #fff5f5; color: var(--error-color); }
.status.warning { background-color: #fffaf0; color: var(--warning-color); }
/* Gallery styling */
.gallery {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
gap: 1rem;
padding: 1rem;
background-color: white;
border-radius: 0.5rem;
border: 1px solid #e2e8f0;
}
.gallery img {
width: 100%;
height: auto;
border-radius: 0.375rem;
transition: transform 0.2s;
}
.gallery img:hover {
transform: scale(1.05);
}
"""
    # Create interface with modern design
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("""
# 🌐 Advanced Data Processing & QR Code Generator
Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
""")
        with gr.Tab("📝 URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )
        with gr.Tab("📁 File Input"):
            file_input = gr.File(
                label="Upload Files",
                file_types=["*"],  # Accept all file types
                file_count="multiple"
            )
        with gr.Tab("📋 JSON Input"):
            text_input = gr.TextArea(
                label="Direct JSON Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )
            with gr.Row():
                example_btn = gr.Button("📝 Load Example", variant="secondary")
                clear_btn = gr.Button("🗑️ Clear", variant="secondary")
        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into sequence ",
                value=True,
                info="Generate sequential QR codes for combined data"
            )
            process_btn = gr.Button(
                "🔄 Process & Generate QR",
                variant="primary"
            )
        # Output components
        output_json = gr.JSON(label="Processed Data")
        output_gallery = gr.Gallery(
            label="Generated QR Codes",
            columns=3,
            height=400,
            show_label=True
        )
        output_text = gr.Textbox(
            label="Processing Status",
            interactive=False
        )

        def load_example():
            """Return a sample product-catalog JSON document as indented text."""
            example = {
                "type": "product_catalog",
                "items": [
                    {
                        "id": "123",
                        "name": "Premium Widget",
                        "description": "High-quality widget with advanced features",
                        "price": 299.99,
                        "category": "electronics",
                        "tags": ["premium", "featured", "new"]
                    },
                    {
                        "id": "456",
                        "name": "Basic Widget",
                        "description": "Reliable widget for everyday use",
                        "price": 149.99,
                        "category": "electronics",
                        "tags": ["basic", "popular"]
                    }
                ],
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "version": "2.0",
                    "source": "example"
                }
            }
            return json.dumps(example, indent=2)

        def clear_input():
            """Return an empty string to clear the JSON text area."""
            return ""

        def process_inputs(urls, files, text, combine):
            """Collect data from all three inputs and generate QR codes.

            Returns a tuple ``(processed data or None, list of image paths,
            status message)`` for the JSON view, the gallery and the status box.
            """
            try:
                results = []
                skipped_urls = []  # URLs that failed validation or download
                url_processor = EnhancedURLProcessor()
                file_processor = EnhancedFileProcessor()
                # Process JSON input
                if text and text.strip():
                    try:
                        json_data = json.loads(text)
                        if isinstance(json_data, list):
                            results.extend(json_data)
                        else:
                            results.append(json_data)
                    except json.JSONDecodeError as e:
                        return None, [], f"❌ Invalid JSON format: {str(e)}"
                # Process URLs
                if urls and urls.strip():
                    url_list = [url.strip() for url in re.split(r'[,\n]', urls) if url.strip()]
                    for url in url_list:
                        validation = url_processor.validate_url(url)
                        if not validation['is_valid']:
                            # Record the failure instead of dropping the URL silently.
                            logger.warning(f"Skipping URL {url}: {validation['message']}")
                            skipped_urls.append(url)
                            continue
                        content = url_processor.fetch_content(url)
                        if not content:
                            skipped_urls.append(url)
                            continue
                        results.append({
                            'source': 'url',
                            'url': url,
                            'content': content,
                            'timestamp': datetime.now().isoformat()
                        })
                # Process files
                if files:
                    for file in files:
                        file_results = file_processor.process_file(file)
                        if file_results:
                            results.extend(file_results)
                # Tell the user which URLs contributed nothing.
                skipped_note = ""
                if skipped_urls:
                    skipped_note = f" (skipped {len(skipped_urls)} URL(s): {', '.join(skipped_urls)})"
                # Generate QR codes
                if not results:
                    return None, [], f"⚠️ No valid content to process{skipped_note}"
                qr_paths = generate_qr_codes(results, combine)
                if not qr_paths:
                    return None, [], f"❌ Failed to generate QR codes{skipped_note}"
                return (
                    results,
                    [str(path) for path in qr_paths],
                    f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!{skipped_note}"
                )
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, [], f"❌ Error: {str(e)}"

        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )
        # Add the visualization button and its click event within the interface scope
        visualize_btn = gr.Button("🔍 Visualize QR Codes")
        visualize_btn.click(visualize_qr_codes, inputs=output_gallery, outputs=None)
        # Add helpful documentation
        gr.Markdown("""
### 🚀 Features
- **Complete URL Scraping**: Extracts every character from web pages
- **Advanced File Processing**: Full content extraction from various text-based files and common archives. Supports flexible JSON handling.
- **Smart JSON Handling**: Processes any size JSON with automatic chunking, either via direct input or file upload.
- **Sequential QR Codes**: Maintains data integrity across multiple codes
- **Modern Design**: Clean, responsive interface with visual feedback
### 💡 Tips
1. **URLs**: Enter multiple URLs separated by commas or newlines
2. **Files**: Upload any type of file. The processor will attempt to handle supported text-based files, archives (.zip, .tar, .gz), and JSON files.
3. **JSON**: Use the example button to see the expected format or upload a .json file. The system will also try to detect JSON content in other file types.
4. **QR Codes**: Choose whether to combine data into sequential codes
5. **Processing**: Monitor the status for real-time feedback
### 🎨 Output
- Generated QR codes are saved in the `output/qr_codes` directory
- Each QR code contains metadata for proper sequencing
- Hover over QR codes in the gallery to see details
""")
    return interface
def main():
    """Build the interface, attach the QR sequence visualizer tab and launch.

    Any startup error is logged and then re-raised.
    """
    try:
        # Load the system MIME type tables before any file is inspected.
        mimetypes.init()
        app = create_modern_interface()
        # Re-enter the Blocks context so the extra tab is added to this app.
        with app:
            create_qr_sequence_visualizer(None)  # output_gallery might not be relevant here
        launch_options = {
            'share': False,
            'debug': False,
            'show_error': True,
            'show_api': False,
        }
        app.launch(**launch_options)
    except Exception as e:
        logger.error(f"Application startup error: {e}")
        raise
# Start the application only when this file is run as a script, not when imported.
if __name__ == "__main__":
    main()