import json
import os
import re
import time
import logging
import mimetypes
import zipfile
import tarfile
import tempfile
import chardet
from datetime import datetime
from typing import List, Dict, Optional, Union, Tuple
from pathlib import Path
from urllib.parse import urlparse, urljoin

import requests
import validators
import gradio as gr
from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
from PIL import Image, ImageDraw, ImageFont
import numpy as np

# Setup enhanced logging with more detailed formatting
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Ensure output directories exist with modern structure
OUTPUTS_DIR = Path('output')
QR_CODES_DIR = OUTPUTS_DIR / 'qr_codes'
TEMP_DIR = OUTPUTS_DIR / 'temp'
for directory in [OUTPUTS_DIR, QR_CODES_DIR, TEMP_DIR]:
    directory.mkdir(parents=True, exist_ok=True)


class EnhancedURLProcessor:
    """Advanced URL processing with complete content extraction."""

    def __init__(self):
        self.session = requests.Session()
        self.timeout = 15  # Extended timeout for larger content
        self.max_retries = 3
        self.user_agent = UserAgent()

        # Enhanced headers for better site compatibility
        self.session.headers.update({
            'User-Agent': self.user_agent.random,
            'Accept': '*/*',  # Accept all content types
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
            'Sec-Fetch-Dest': 'document',
            'Sec-Fetch-Mode': 'navigate',
            'Sec-Fetch-Site': 'none',
            'Sec-Fetch-User': '?1',
            'DNT': '1'
        })

    def validate_url(self, url: str) -> Dict:
        """Enhanced URL validation with detailed feedback."""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format',
                        'details': 'URL must begin with http:// or https://'}

            parsed = urlparse(url)
            if not all([parsed.scheme, parsed.netloc]):
                return {'is_valid': False, 'message': 'Incomplete URL',
                        'details': 'Missing scheme or domain'}

            # Try HEAD request first to check accessibility
            try:
                response = self.session.head(url, timeout=5)
                response.raise_for_status()
            except requests.exceptions.RequestException:
                # If HEAD fails, try GET as some servers don't support HEAD
                response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()

            return {
                'is_valid': True,
                'message': 'URL is valid and accessible',
                'details': {
                    'content_type': response.headers.get('Content-Type', 'unknown'),
                    'server': response.headers.get('Server', 'unknown'),
                    'size': response.headers.get('Content-Length', 'unknown')
                }
            }
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}',
                    'details': str(e)}

    def fetch_content(self, url: str, retry_count: int = 0) -> Optional[Dict]:
        """Enhanced content fetcher with retry mechanism and complete character extraction."""
        try:
            logger.info(f"Fetching content from URL: {url} (Attempt {retry_count + 1}/{self.max_retries})")

            # Update User-Agent randomly for each request
            self.session.headers.update({'User-Agent': self.user_agent.random})

            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()

            # Detect encoding
            if response.encoding is None:
                encoding = chardet.detect(response.content)['encoding'] or 'utf-8'
            else:
                encoding = response.encoding

            # Decode content with fallback
            try:
                raw_content = response.content.decode(encoding, errors='replace')
            except (UnicodeDecodeError, LookupError):
                raw_content = response.content.decode('utf-8', errors='replace')
            # Extract metadata
            metadata = {
                'url': url,
                'timestamp': datetime.now().isoformat(),
                'encoding': encoding,
                'content_type': response.headers.get('Content-Type', ''),
                'content_length': len(response.content),
                'headers': dict(response.headers),
                'status_code': response.status_code
            }

            # Process based on content type
            content_type = response.headers.get('Content-Type', '').lower()
            if 'text/html' in content_type:
                processed_content = self._process_html_content(raw_content, url)
            else:
                processed_content = raw_content

            return {
                'content': processed_content,
                'raw_content': raw_content,
                'metadata': metadata
            }
        except requests.exceptions.RequestException as e:
            if retry_count < self.max_retries - 1:
                logger.warning(f"Retry {retry_count + 1}/{self.max_retries} for URL: {url}")
                time.sleep(2 ** retry_count)  # Exponential backoff
                return self.fetch_content(url, retry_count + 1)
            logger.error(f"Failed to fetch content after {self.max_retries} attempts: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while fetching content: {e}")
            return None

    def _process_html_content(self, content: str, base_url: str) -> str:
        """Process HTML content while preserving all characters."""
        try:
            soup = BeautifulSoup(content, 'html.parser')

            # Convert relative URLs to absolute
            for tag in soup.find_all(['a', 'img', 'link', 'script']):
                for attr in ['href', 'src']:
                    if tag.get(attr):
                        try:
                            tag[attr] = urljoin(base_url, tag[attr])
                        except Exception:
                            pass

            # Extract all text content
            text_parts = []
            for element in soup.stripped_strings:
                text_parts.append(str(element))

            return '\n'.join(text_parts)
        except Exception as e:
            logger.error(f"HTML processing error: {e}")
            return content


class EnhancedFileProcessor:
    """Advanced file processing with complete content extraction."""

    def __init__(self, max_file_size: int = 5 * 1024 * 1024 * 1024):  # 5 GB cap
        self.max_file_size = max_file_size
        self.supported_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.log',
            '.yml', '.yaml', '.ini', '.conf', '.cfg',
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar',
            '.pdf', '.doc', '.docx', '.rtf', '.odt'
        }

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling and complete extraction."""
        if not file:
            return []

        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []

            with tempfile.TemporaryDirectory() as temp_dir:
                temp_dir_path = Path(temp_dir)

                # Handle different archive types
                if self._is_archive(file.name):
                    dataset.extend(self._process_archive(file.name, temp_dir_path))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []

        return dataset

    def _is_archive(self, filepath: str) -> bool:
        """Check if file is an archive."""
        return any(filepath.lower().endswith(ext) for ext in [
            '.zip', '.tar', '.gz', '.bz2', '.7z', '.rar'
        ])

    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file with enhanced character extraction."""
        try:
            file_stat = os.stat(file.name)
            file_size = file_stat.st_size

            # Initialize content storage
            content_parts = []

            # Process file in chunks for large files
            chunk_size = 10 * 1024 * 1024  # 10MB chunks
            with open(file.name, 'rb') as f:
                while True:
                    chunk = f.read(chunk_size)
                    if not chunk:
                        break

                    # Detect encoding for each chunk
                    encoding = chardet.detect(chunk)['encoding'] or 'utf-8'
                    try:
                        decoded_chunk = chunk.decode(encoding, errors='replace')
                        content_parts.append(decoded_chunk)
                    except (UnicodeDecodeError, LookupError):
                        decoded_chunk = chunk.decode('utf-8', errors='replace')
                        content_parts.append(decoded_chunk)

            # Combine all chunks
            complete_content = ''.join(content_parts)

            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': complete_content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []

    def _process_archive(self, archive_path: str, extract_to: Path) -> List[Dict]:
        """Process an archive file with enhanced extraction."""
        dataset = []
        try:
            if zipfile.is_zipfile(archive_path):
                with zipfile.ZipFile(archive_path, 'r') as zip_ref:
                    zip_ref.extractall(extract_to)
                    for file_info in zip_ref.infolist():
                        if file_info.file_size > 0 and not file_info.filename.endswith('/'):
                            extracted_path = extract_to / file_info.filename
                            if extracted_path.suffix.lower() in self.supported_extensions:
                                with open(extracted_path, 'rb') as f:
                                    dataset.extend(self._process_single_file(f))
            elif tarfile.is_tarfile(archive_path):
                with tarfile.open(archive_path, 'r') as tar_ref:
                    tar_ref.extractall(extract_to)
                    for member in tar_ref.getmembers():
                        if member.isfile():
                            extracted_path = extract_to / member.name
                            if extracted_path.suffix.lower() in self.supported_extensions:
                                with open(extracted_path, 'rb') as f:
                                    dataset.extend(self._process_single_file(f))
        except Exception as e:
            logger.error(f"Archive processing error: {e}")
        return dataset

    def chunk_data(self, data: Union[Dict, List], max_size: int = 2953) -> List[Dict]:
        """Split JSON-serialized data into QR-sized chunks.

        The default max_size of 2953 bytes matches the byte-mode capacity of a
        version 40 QR code at error correction level L.
        """
        try:
            # Convert data to JSON bytes
            json_str = json.dumps(data, ensure_ascii=False)
            json_bytes = json_str.encode('utf-8')
            total_length = len(json_bytes)

            # Calculate metadata overhead in bytes
            metadata_template = {
                "chunk_index": 0,
                "total_chunks": 1,
                "total_length": total_length,
                "chunk_hash": "",
                "data": ""
            }
            overhead_bytes = len(json.dumps(metadata_template).encode('utf-8')) + 20  # Add padding

            effective_chunk_size = max_size - overhead_bytes
            if effective_chunk_size <= 0:
                raise ValueError("Max size is too small after accounting for metadata overhead")

            chunks = []
            start = 0
            while start < total_length:
                end = start + effective_chunk_size
                # Ensure valid Unicode by decoding
                chunk_str = json_bytes[start:end].decode('utf-8', errors='replace')
                chunk = {
                    "chunk_index": len(chunks),
                    "total_chunks": -1,  # To be set later
                    "total_length": total_length,
                    "chunk_hash": hash(chunk_str) & 0xFFFFFFFF,
                    "data": chunk_str
                }
                chunks.append(chunk)
                start = end

            # Update total_chunks in each chunk
            for chunk in chunks:
                chunk["total_chunks"] = len(chunks)

            return chunks
        except Exception as e:
            logger.error(f"Error chunking data: {e}")
            return []
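
# A minimal reassembly sketch, not part of the original application: it shows
# how the chunk dictionaries emitted by EnhancedFileProcessor.chunk_data()
# could be recombined once the QR payloads have been scanned and parsed back
# into dicts with json.loads(). The function name and signature are
# hypothetical. Note that chunk_data() slices on byte boundaries and decodes
# with errors='replace', so a multi-byte character split across chunks may not
# survive the round trip, and chunk_hash uses Python's per-process salted
# hash(), so it is not re-verified here.
def reassemble_chunks(chunks: List[Dict]) -> Optional[Union[Dict, List]]:
    """Rebuild the original JSON payload from a complete set of chunk dicts."""
    if not chunks:
        return None
    # Restore the original order recorded by chunk_data()
    ordered = sorted(chunks, key=lambda c: c["chunk_index"])
    # Every chunk of one payload advertises the same total_chunks value
    if len(ordered) != ordered[0]["total_chunks"]:
        logger.warning("Incomplete chunk set; cannot reassemble payload")
        return None
    json_str = ''.join(chunk["data"] for chunk in ordered)
    return json.loads(json_str)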

def generate_stylish_qr(data: Union[str, Dict],
                        filename: str,
                        size: int = 10,
                        border: int = 4,
                        fill_color: str = "#000000",
                        back_color: str = "#FFFFFF") -> str:
    """Generate a stylish QR code with enhanced visual appeal."""
    try:
        qr = qrcode.QRCode(
            version=None,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=size,
            border=border
        )

        # Add data to QR code
        if isinstance(data, dict):
            qr.add_data(json.dumps(data, ensure_ascii=False))
        else:
            qr.add_data(data)
        qr.make(fit=True)

        # Create QR code image with custom colors
        qr_image = qr.make_image(fill_color=fill_color, back_color=back_color)

        # Convert to RGBA for transparency support
        qr_image = qr_image.convert('RGBA')

        # Add subtle gradient overlay
        gradient = Image.new('RGBA', qr_image.size, (0, 0, 0, 0))
        draw = ImageDraw.Draw(gradient)
        for i in range(qr_image.width):
            alpha = int(255 * (1 - i / qr_image.width) * 0.1)  # 10% maximum opacity
            draw.line([(i, 0), (i, qr_image.height)], fill=(255, 255, 255, alpha))

        # Combine images
        final_image = Image.alpha_composite(qr_image, gradient)

        # Save the image
        output_path = QR_CODES_DIR / filename
        final_image.save(output_path, quality=95)
        return str(output_path)
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return ""


def generate_qr_codes(data: Union[str, Dict, List], combined: bool = True) -> List[str]:
    """Generate QR codes with enhanced visual appeal and metadata."""
    try:
        file_processor = EnhancedFileProcessor()
        paths = []

        if combined:
            # Process combined data
            chunks = file_processor.chunk_data(data)
            for i, chunk in enumerate(chunks):
                filename = f'combined_qr_{int(time.time())}_{i+1}_of_{len(chunks)}.png'
                qr_path = generate_stylish_qr(
                    data=chunk,
                    filename=filename,
                    fill_color="#1a365d",  # Deep blue
                    back_color="#ffffff"
                )
                if qr_path:
                    paths.append(qr_path)
        else:
            # Process individual items
            if isinstance(data, list):
                for idx, item in enumerate(data):
                    chunks = file_processor.chunk_data(item)
                    for chunk_idx, chunk in enumerate(chunks):
                        filename = f'item_{idx+1}_chunk_{chunk_idx+1}_of_{len(chunks)}_{int(time.time())}.png'
                        qr_path = generate_stylish_qr(
                            data=chunk,
                            filename=filename,
                            fill_color="#1a365d",  # Deep blue
                            back_color="#ffffff"
                        )
                        if qr_path:
                            paths.append(qr_path)
            else:
                chunks = file_processor.chunk_data(data)
                for i, chunk in enumerate(chunks):
                    filename = f'single_qr_{i+1}_of_{len(chunks)}_{int(time.time())}.png'
                    qr_path = generate_stylish_qr(
                        data=chunk,
                        filename=filename,
                        fill_color="#1a365d",  # Deep blue
                        back_color="#ffffff"
                    )
                    if qr_path:
                        paths.append(qr_path)

        return paths
    except Exception as e:
        logger.error(f"QR code generation error: {e}")
        return []


def create_modern_interface():
    """Create a modern and visually appealing Gradio interface."""

    # Modern CSS styling
    css = """
    /* Modern color scheme */
    :root {
        --primary-color: #1a365d;
        --secondary-color: #2d3748;
        --accent-color: #4299e1;
        --background-color: #f7fafc;
        --success-color: #48bb78;
        --error-color: #f56565;
        --warning-color: #ed8936;
    }

    /* Container styling */
    .container {
        max-width: 1200px;
        margin: auto;
        padding: 2rem;
        background-color: var(--background-color);
        border-radius: 1rem;
        box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1);
    }

    /* Component styling */
    .input-container {
        background-color: white;
        padding: 1.5rem;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
        margin-bottom: 1rem;
    }

    /* Button styling */
    .primary-button {
        background-color: var(--primary-color);
        color: white;
        padding: 0.75rem 1.5rem;
        border-radius: 0.375rem;
        border: none;
        cursor: pointer;
        transition: all 0.2s;
    }
    .primary-button:hover {
        background-color: var(--accent-color);
        transform: translateY(-1px);
    }

    /* Status messages */
    .status {
        padding: 1rem;
        border-radius: 0.375rem;
        margin: 1rem 0;
    }
    .status.success { background-color: #f0fff4; color: var(--success-color); }
    .status.error { background-color: #fff5f5; color: var(--error-color); }
    .status.warning { background-color: #fffaf0; color: var(--warning-color); }

    /* Gallery styling */
    .gallery {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(200px, 1fr));
        gap: 1rem;
        padding: 1rem;
        background-color: white;
        border-radius: 0.5rem;
        border: 1px solid #e2e8f0;
    }
    .gallery img {
        width: 100%;
        height: auto;
        border-radius: 0.375rem;
        transition: transform 0.2s;
    }
    .gallery img:hover {
        transform: scale(1.05);
    }
    """

    # Create interface with modern design
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("""
        # 🌐 Advanced Data Processing & QR Code Generator

        Transform your data into beautifully designed, sequenced QR codes with our cutting-edge processor.
        """)

        with gr.Tab("📝 URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )

        with gr.Tab("📁 File Input"):
            file_input = gr.File(
                label="Upload Files",
                file_types=["*"],  # Allow all file types
                file_count="multiple"
            )

        with gr.Tab("📋 JSON Input"):
            text_input = gr.TextArea(
                label="Direct JSON Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )
            with gr.Row():
                example_btn = gr.Button("📝 Load Example", variant="secondary")
                clear_btn = gr.Button("🗑️ Clear", variant="secondary")

        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into sequence",
                value=True,
                info="Generate sequential QR codes for combined data"
            )
            process_btn = gr.Button(
                "🔄 Process & Generate QR",
                variant="primary"
            )

        # Output components
        output_json = gr.JSON(label="Processed Data")
        output_gallery = gr.Gallery(
            label="Generated QR Codes",
            columns=3,
            height=400,
            show_label=True
        )
        output_text = gr.Textbox(
            label="Processing Status",
            interactive=False
        )

        # Load example data
        def load_example():
            example = {
                "type": "product_catalog",
                "items": [
                    {
                        "id": "123",
                        "name": "Premium Widget",
                        "description": "High-quality widget with advanced features",
                        "price": 299.99,
                        "category": "electronics",
                        "tags": ["premium", "featured", "new"]
                    },
                    {
                        "id": "456",
                        "name": "Basic Widget",
                        "description": "Reliable widget for everyday use",
                        "price": 149.99,
                        "category": "electronics",
                        "tags": ["basic", "popular"]
                    }
                ],
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "version": "2.0",
                    "source": "example"
                }
            }
            return json.dumps(example, indent=2)

        def clear_input():
            return ""

        def process_inputs(urls, files, text, combine):
            """Process all inputs and generate QR codes."""
            try:
                results = []
                url_processor = EnhancedURLProcessor()
                file_processor = EnhancedFileProcessor()

                # Process JSON input
                if text and text.strip():
                    try:
                        json_data = json.loads(text)
                        if isinstance(json_data, list):
                            results.extend(json_data)
                        else:
                            results.append(json_data)
                    except json.JSONDecodeError as e:
                        return None, [], f"❌ Invalid JSON format: {str(e)}"

                # Process URLs
                if urls and urls.strip():
                    url_list = re.split(r'[,\n]', urls)
                    url_list = [url.strip() for url in url_list if url.strip()]
                    for url in url_list:
                        validation = url_processor.validate_url(url)
                        if validation['is_valid']:
                            content = url_processor.fetch_content(url)
                            if content:
                                results.append({
                                    'source': 'url',
                                    'url': url,
                                    'content': content,
                                    'timestamp': datetime.now().isoformat()
                                })

                # Process files
                if files:
                    for file in files:
                        file_results = file_processor.process_file(file)
                        if file_results:
                            results.extend(file_results)

                # Generate QR codes
                if results:
                    qr_paths = generate_qr_codes(results, combine)
                    if qr_paths:
                        return (
                            results,
                            [str(path) for path in qr_paths],
                            f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR codes!"
                        )
                    else:
                        return None, [], "❌ Failed to generate QR codes"
                else:
                    return None, [], "⚠️ No valid content to process"
            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, [], f"❌ Error: {str(e)}"

        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )

        # Add helpful documentation
        gr.Markdown("""
        ### 🚀 Features
        - **Complete URL Scraping**: Extracts every character from web pages
        - **Advanced File Processing**: Full content extraction from text files and archives
        - **Smart JSON Handling**: Processes any size JSON with automatic chunking
        - **Sequential QR Codes**: Maintains data integrity across multiple codes
        - **Modern Design**: Clean, responsive interface with visual feedback

        ### 💡 Tips
        1. **URLs**: Enter multiple URLs separated by commas or newlines
        2. **Files**: Upload text files or ZIP archives containing text files
        3. **JSON**: Use the example button to see the expected format
        4. **QR Codes**: Choose whether to combine data into sequential codes
        5. **Processing**: Monitor the status for real-time feedback

        ### 🎨 Output
        - Generated QR codes are saved in the `output/qr_codes` directory
        - Each QR code contains metadata for proper sequencing
        - Hover over QR codes in the gallery to see details
        """)

    return interface


def main():
    """Initialize and launch the application."""
    try:
        # Configure system settings
        mimetypes.init()

        # Create and launch interface
        interface = create_modern_interface()

        # Launch with configuration
        interface.launch(
            share=False,
            debug=False,
            show_error=True,
            show_api=False
        )
    except Exception as e:
        logger.error(f"Application startup error: {e}")
        raise


if __name__ == "__main__":
    main()
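
# Programmatic usage sketch (hedged): the commented lines below only
# illustrate how this module's pieces could be driven without the Gradio UI.
# The sample payload is hypothetical, and the lines are left commented out so
# that importing the module stays side-effect free.
#
#   processor = EnhancedFileProcessor()
#   chunks = processor.chunk_data({"type": "note", "body": "hello world"})
#   paths = generate_qr_codes({"type": "note", "body": "hello world"}, combined=True)
#   print(f"Wrote {len(paths)} QR code(s) to {QR_CODES_DIR}")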