import json
import os
import re
import sys
import io
import time
import base64
import random
import logging
import argparse
import mimetypes
import zipfile
import tempfile
from datetime import datetime
from typing import List, Dict, Optional, Union, Any
from pathlib import Path

import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
from PIL import Image
import zxing
import qrcode
import cv2  # Used by decode_qr_code

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Ensure output directories exist
Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
class URLProcessor:
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10
        self.max_retries = 3
        self.request_delay = 1.0
        self.respect_robots = True
        self.use_proxy = False
        self.proxy_url = None
        self.rate_limits = {}  # Track rate limits per domain
        self.selenium_driver = None
        # Update session headers with rotating user agents
        self.update_user_agent()
        if self.use_proxy and self.proxy_url:
            self.session.proxies = {
                'http': self.proxy_url,
                'https': self.proxy_url
            }
    def update_user_agent(self):
        """Rotate user agents to avoid detection"""
        try:
            self.session.headers.update({
                'User-Agent': UserAgent().random,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Cache-Control': 'max-age=0'
            })
        except Exception as e:
            logger.warning(f"Failed to update user agent: {e}")
            # Fall back to a common user agent
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            })
    def get_selenium_driver(self):
        """Initialize a headless Selenium WebDriver for interactive sites"""
        if self.selenium_driver is not None:
            return self.selenium_driver
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.service import Service
            from selenium.webdriver.chrome.options import Options
            from webdriver_manager.chrome import ChromeDriverManager

            options = Options()
            options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument(f"user-agent={self.session.headers['User-Agent']}")
            options.add_argument("--disable-notifications")
            options.add_argument("--disable-popup-blocking")
            options.add_argument("--disable-extensions")
            service = Service(ChromeDriverManager().install())
            self.selenium_driver = webdriver.Chrome(service=service, options=options)
            return self.selenium_driver
        except Exception as e:
            logger.error(f"Failed to initialize Selenium: {e}")
            return None
    def handle_rate_limits(self, domain):
        """Per-domain rate limiting with a simple backoff"""
        from urllib.parse import urlparse

        # Extract the domain from the URL
        parsed_domain = urlparse(domain).netloc
        current_time = time.time()
        if parsed_domain in self.rate_limits:
            last_access, count = self.rate_limits[parsed_domain]
            # Different delay strategies for different domains
            if "facebook" in parsed_domain or "instagram" in parsed_domain:
                min_delay = 5.0  # Longer delay for social media sites
            elif "gov" in parsed_domain:
                min_delay = 2.0  # Be respectful with government sites
            else:
                min_delay = self.request_delay
            # Double the delay once we have made many requests to this domain
            if count > 10:
                min_delay *= 2
            # Wait if needed
            elapsed = current_time - last_access
            if elapsed < min_delay:
                time.sleep(min_delay - elapsed)
            # Update the access count
            self.rate_limits[parsed_domain] = (time.time(), count + 1)
        else:
            # First time accessing this domain
            self.rate_limits[parsed_domain] = (current_time, 1)
    def handle_interactive_site(self, url):
        """Handle sites that require interaction to bypass blocks"""
        driver = self.get_selenium_driver()
        if not driver:
            return None
        try:
            driver.get(url)
            # Wait for the page to load
            time.sleep(3)
            # Site-specific handling
            if "facebook.com" in url or "instagram.com" in url:
                self._handle_social_media_site(driver)
            elif "google.com" in url:
                self._handle_google_site(driver)
            # Get the page source after interaction
            page_source = driver.page_source
            return {
                'content': page_source,
                'content_type': 'text/html',
                'url': url,
                'title': driver.title
            }
        except Exception as e:
            logger.error(f"Error handling interactive site {url}: {e}")
            return None
    def _handle_social_media_site(self, driver):
        """Handle Facebook/Instagram login walls"""
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        from selenium.webdriver.common.action_chains import ActionChains
        try:
            # Try to find and close login popups
            close_buttons = driver.find_elements(By.XPATH, "//button[contains(@aria-label, 'Close')]")
            if close_buttons:
                close_buttons[0].click()
                time.sleep(1)
            # Press ESC to dismiss any remaining popups
            ActionChains(driver).send_keys(Keys.ESCAPE).perform()
            time.sleep(1)
            # Scroll down to load more content
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
            time.sleep(2)
        except Exception as e:
            logger.warning(f"Error handling social media site: {e}")
    def _handle_google_site(self, driver):
        """Handle Google authentication and consent pages"""
        from selenium.webdriver.common.by import By
        try:
            # Look for consent buttons
            consent_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'Accept all')]")
            if consent_buttons:
                consent_buttons[0].click()
                time.sleep(1)
            # Look for "I agree" buttons
            agree_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'I agree')]")
            if agree_buttons:
                agree_buttons[0].click()
                time.sleep(1)
        except Exception as e:
            logger.warning(f"Error handling Google site: {e}")
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch content with smart handling for different sites"""
        # Check whether the URL is allowed by robots.txt
        if self.respect_robots and not self.check_robots_txt(url):
            logger.warning(f"URL {url} is disallowed by robots.txt")
            return None
        # Apply rate limiting
        self.handle_rate_limits(url)
        # Rotate the user agent occasionally
        if random.random() < 0.3:  # 30% chance to rotate
            self.update_user_agent()
        # Determine whether the site needs special handling
        needs_selenium = any(domain in url.lower() for domain in [
            'facebook.com', 'instagram.com', 'linkedin.com',
            'google.com/search', 'twitter.com', 'x.com'
        ])
        for attempt in range(self.max_retries):
            try:
                # Special cases for Google services
                if 'drive.google.com' in url:
                    return self._handle_google_drive(url)
                if 'calendar.google.com' in url and 'ical' in url:
                    return self._handle_google_calendar(url)
                if needs_selenium:
                    return self.handle_interactive_site(url)
                # Try cloudscraper first for sites with anti-bot measures
                if any(domain in url.lower() for domain in ['cloudflare', '.gov']):
                    import cloudscraper
                    scraper = cloudscraper.create_scraper(
                        browser={'browser': 'chrome', 'platform': 'darwin', 'mobile': False}
                    )
                    response = scraper.get(url, timeout=self.timeout)
                else:
                    # Standard request for most sites
                    response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
                return {
                    'content': response.text,
                    'content_type': response.headers.get('Content-Type', ''),
                    'url': url,
                    'status_code': response.status_code
                }
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    time.sleep(self.request_delay * (2 ** attempt))
        logger.error(f"All attempts failed for {url}")
        return None
    def check_robots_txt(self, url: str) -> bool:
        """Check whether a URL is allowed by robots.txt"""
        if not self.respect_robots:
            return True
        try:
            from urllib.parse import urlparse
            from urllib.robotparser import RobotFileParser

            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            rp = RobotFileParser()
            rp.set_url(robots_url)
            rp.read()
            return rp.can_fetch(self.session.headers['User-Agent'], url)
        except Exception as e:
            logger.warning(f"Error checking robots.txt: {e}")
            return True
    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
        try:
            cleaned_text = clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
            return cleaned_text
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
            text = text.encode('ascii', 'ignore').decode('ascii')
            text = re.sub(r'\s+', ' ', text)
            return text.strip()
    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}
            response = self.session.head(url, timeout=self.timeout)
            response.raise_for_status()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}
    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Process Google Drive file links"""
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None
    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Process Google Calendar ICS feeds"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None
    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Standard HTML content processing"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()
            main_content = soup.find('main') or soup.find('article') or soup.body
            if main_content is None:
                logger.warning(f"No main content found for URL: {url}")
                return {
                    'content': '',
                    'content_type': response.headers.get('Content-Type', ''),
                    'timestamp': datetime.now().isoformat()
                }
            text_content = main_content.get_text(separator='\n', strip=True)
            cleaned_content = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
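
# Minimal usage sketch (illustrative only, not called elsewhere in this file):
# fetch and clean a single page with URLProcessor. The URL is just an example.
#
#     processor = URLProcessor()
#     result = processor.fetch_content("https://example.com")
#     if result:
#         print(result['content_type'])
#         print(result['content'][:200])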
class FileProcessor:
    """Class to handle file processing with enhanced capabilities"""

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2 GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {
            '.txt', '.md', '.csv', '.json', '.xml', '.html', '.htm', '.js', '.css',
            '.py', '.java', '.c', '.cpp', '.h', '.rb', '.php', '.sql', '.yaml', '.yml',
            '.ini', '.cfg', '.conf', '.log', '.sh', '.bat', '.ps1'
        }
        self.supported_binary_extensions = {
            '.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx',
            '.zip', '.tar', '.gz', '.rar', '.7z',
            '.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff',
            '.mp3', '.mp4', '.avi', '.mov', '.wmv', '.flv', '.wav', '.ogg'
        }

    def is_text_file(self, filepath: str) -> bool:
        """Check whether a file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            ext = os.path.splitext(filepath)[1].lower()
            # Check by extension first
            if ext in self.supported_text_extensions:
                return True
            # Then check by MIME type
            if mime_type and mime_type.startswith('text/'):
                return True
            # Finally, sample the file and check that it is mostly printable
            if os.path.exists(filepath) and os.path.getsize(filepath) < 1024 * 1024:  # Only try for files < 1 MB
                try:
                    with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                        sample = f.read(1024)  # Read the first 1 KB
                    printable_ratio = sum(c.isprintable() for c in sample) / len(sample) if sample else 0
                    return printable_ratio > 0.8
                except Exception:
                    pass
            return False
        except Exception as e:
            logger.error(f"Error checking if file is text: {e}")
            return False
    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling and binary support"""
        if not file:
            return [{"error": "No file provided"}]
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return [{"error": f"File size ({file_size} bytes) exceeds maximum allowed size of {self.max_file_size} bytes"}]
            with tempfile.TemporaryDirectory() as temp_dir:
                # Archive files
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                elif file.name.endswith('.tar.gz') or file.name.endswith('.tgz'):
                    dataset.extend(self._process_tar_file(file.name, temp_dir))
                elif file.name.endswith('.rar'):
                    dataset.extend(self._process_rar_file(file.name, temp_dir))
                elif file.name.endswith('.7z'):
                    dataset.extend(self._process_7z_file(file.name, temp_dir))
                # Document files
                elif file.name.endswith(('.doc', '.docx')):
                    dataset.extend(self._process_word_file(file.name))
                elif file.name.endswith(('.xls', '.xlsx')):
                    dataset.extend(self._process_excel_file(file.name))
                elif file.name.endswith(('.ppt', '.pptx')):
                    dataset.extend(self._process_powerpoint_file(file.name))
                elif file.name.endswith('.pdf'):
                    dataset.extend(self._process_pdf_file(file.name))
                # Image files
                elif file.name.endswith(('.jpg', '.jpeg', '.png', '.gif', '.bmp', '.tiff')):
                    dataset.extend(self._process_image_file(file.name))
                # Audio/video files
                elif file.name.endswith(('.mp3', '.wav', '.ogg', '.mp4', '.avi', '.mov', '.wmv', '.flv')):
                    dataset.extend(self._process_media_file(file.name))
                # Default to plain text processing
                else:
                    dataset.extend(self._process_single_file(file))
            if not dataset:
                return [{"warning": "No extractable content found in the file"}]
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return [{"error": f"Error processing file: {str(e)}"}]
        return dataset
    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents with enhanced extraction"""
        results = []
        try:
            with zipfile.ZipFile(zip_path, 'r') as zip_ref:
                # Get the file list first
                file_list = zip_ref.namelist()
                total_files = len(file_list)
                # Extract all files
                zip_ref.extractall(temp_dir)
                # Process each extracted file
                processed_count = 0
                for root, dirs, files in os.walk(temp_dir):
                    for filename in files:
                        filepath = os.path.join(root, filename)
                        rel_path = os.path.relpath(filepath, temp_dir)
                        # Get file info from the archive
                        try:
                            zip_info = zip_ref.getinfo(rel_path.replace('\\', '/'))
                            file_size = zip_info.file_size
                            compressed_size = zip_info.compress_size
                            compression_ratio = (1 - compressed_size / file_size) * 100 if file_size > 0 else 0
                        except Exception:
                            file_size = os.path.getsize(filepath)
                            compressed_size = None
                            compression_ratio = None
                        # Process based on file type
                        if self.is_text_file(filepath):
                            try:
                                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                    content = f.read()
                                results.append({
                                    "source": "zip",
                                    "archive": os.path.basename(zip_path),
                                    "filename": filename,
                                    "path": rel_path,
                                    "size": file_size,
                                    "compressed_size": compressed_size,
                                    "compression_ratio": f"{compression_ratio:.2f}%" if compression_ratio is not None else None,
                                    "content": content,
                                    "timestamp": datetime.now().isoformat()
                                })
                                processed_count += 1
                            except Exception as e:
                                logger.error(f"Error reading file {filename}: {str(e)}")
                        else:
                            # For binary files, just record metadata
                            mime_type, _ = mimetypes.guess_type(filepath)
                            results.append({
                                "source": "zip",
                                "archive": os.path.basename(zip_path),
                                "filename": filename,
                                "path": rel_path,
                                "size": file_size,
                                "compressed_size": compressed_size,
                                "compression_ratio": f"{compression_ratio:.2f}%" if compression_ratio is not None else None,
                                "mime_type": mime_type,
                                "content": f"[Binary file: {mime_type or 'unknown type'}]",
                                "timestamp": datetime.now().isoformat()
                            })
                            processed_count += 1
            # Add a summary record
            results.append({
                "source": "zip_summary",
                "archive": os.path.basename(zip_path),
                "total_files": total_files,
                "processed_files": processed_count,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            logger.error(f"Error processing ZIP file: {str(e)}")
            results.append({"error": f"Error processing ZIP file: {str(e)}"})
        return results
    def _process_tar_file(self, tar_path: str, temp_dir: str) -> List[Dict]:
        """Process TAR/GZ file contents"""
        results = []
        try:
            import tarfile
            with tarfile.open(tar_path, 'r:*') as tar:
                # Get the file list
                file_list = tar.getnames()
                total_files = len(file_list)
                # Extract all files
                tar.extractall(temp_dir)
                # Process each extracted file
                processed_count = 0
                for root, dirs, files in os.walk(temp_dir):
                    for filename in files:
                        filepath = os.path.join(root, filename)
                        rel_path = os.path.relpath(filepath, temp_dir)
                        # Process based on file type
                        if self.is_text_file(filepath):
                            try:
                                with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                    content = f.read()
                                results.append({
                                    "source": "tar",
                                    "archive": os.path.basename(tar_path),
                                    "filename": filename,
                                    "path": rel_path,
                                    "size": os.path.getsize(filepath),
                                    "content": content,
                                    "timestamp": datetime.now().isoformat()
                                })
                                processed_count += 1
                            except Exception as e:
                                logger.error(f"Error reading file {filename}: {str(e)}")
                        else:
                            # For binary files, just record metadata
                            mime_type, _ = mimetypes.guess_type(filepath)
                            results.append({
                                "source": "tar",
                                "archive": os.path.basename(tar_path),
                                "filename": filename,
                                "path": rel_path,
                                "size": os.path.getsize(filepath),
                                "mime_type": mime_type,
                                "content": f"[Binary file: {mime_type or 'unknown type'}]",
                                "timestamp": datetime.now().isoformat()
                            })
                            processed_count += 1
            # Add a summary record
            results.append({
                "source": "tar_summary",
                "archive": os.path.basename(tar_path),
                "total_files": total_files,
                "processed_files": processed_count,
                "timestamp": datetime.now().isoformat()
            })
        except Exception as e:
            logger.error(f"Error processing TAR file: {str(e)}")
            results.append({"error": f"Error processing TAR file: {str(e)}"})
        return results
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file with enhanced metadata extraction"""
        try:
            file_stat = os.stat(file.name)
            file_path = file.name
            filename = os.path.basename(file_path)
            mime_type, _ = mimetypes.guess_type(file_path)
            # Text files
            if self.is_text_file(file_path):
                if file_stat.st_size > 100 * 1024 * 1024:  # 100 MB
                    logger.info(f"Processing large file: {file_path} ({file_stat.st_size} bytes)")
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read(1 * 1024 * 1024)  # First 1 MB
                        content += "\n...[Content truncated due to large file size]...\n"
                        f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                        content += f.read()  # Last 1 MB
                else:
                    with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        content = f.read()
            else:
                # For binary files, just record metadata
                content = f"[Binary file: {mime_type or 'unknown type'}]"
            return [{
                'source': 'file',
                'filename': filename,
                'file_size': file_stat.st_size,
                'mime_type': mime_type,
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
    def clean_json(self, data: Union[str, Dict]) -> Optional[Dict]:
        """Clean and validate JSON data"""
        try:
            if isinstance(data, str):
                data = data.strip()
                data = json.loads(data)
            cleaned = json.loads(json.dumps(data))
            return cleaned
        except json.JSONDecodeError as e:
            logger.error(f"JSON cleaning error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error while cleaning JSON: {e}")
            return None
    def generate_qr_code(self, data: Union[str, Dict], combined: bool = True) -> List[str]:
        """Generate QR code(s) from data"""
        try:
            output_dir = Path('output/qr_codes')
            output_dir.mkdir(parents=True, exist_ok=True)
            if combined:
                cleaned_data = self.clean_json(data)
                if cleaned_data:
                    qr = qrcode.QRCode(
                        version=None,
                        error_correction=qrcode.constants.ERROR_CORRECT_L,
                        box_size=10,
                        border=4,
                    )
                    json_str = json.dumps(cleaned_data, ensure_ascii=False)
                    qr.add_data(json_str)
                    qr.make(fit=True)
                    img = qr.make_image(fill_color="black", back_color="white")
                    output_path = output_dir / f'combined_qr_{int(time.time())}.png'
                    img.save(str(output_path))
                    return [str(output_path)]
            else:
                if isinstance(data, list):
                    paths = []
                    for idx, item in enumerate(data):
                        cleaned_item = self.clean_json(item)
                        if cleaned_item:
                            qr = qrcode.QRCode(
                                version=None,
                                error_correction=qrcode.constants.ERROR_CORRECT_L,
                                box_size=10,
                                border=4,
                            )
                            json_str = json.dumps(cleaned_item, ensure_ascii=False)
                            qr.add_data(json_str)
                            qr.make(fit=True)
                            img = qr.make_image(fill_color="black", back_color="white")
                            output_path = output_dir / f'item_{idx}_qr_{int(time.time())}.png'
                            img.save(str(output_path))
                            paths.append(str(output_path))
                    return paths
                else:
                    cleaned_item = self.clean_json(data)
                    if cleaned_item:
                        qr = qrcode.QRCode(
                            version=None,
                            error_correction=qrcode.constants.ERROR_CORRECT_L,
                            box_size=10,
                            border=4,
                        )
                        json_str = json.dumps(cleaned_item, ensure_ascii=False)
                        qr.add_data(json_str)
                        qr.make(fit=True)
                        img = qr.make_image(fill_color="black", back_color="white")
                        output_path = output_dir / f'single_qr_{int(time.time())}.png'
                        img.save(str(output_path))
                        return [str(output_path)]
            return []
        except Exception as e:
            logger.error(f"QR generation error: {e}")
            return []
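
# Minimal usage sketch (illustrative only): FileProcessor expects any object with a
# `.name` attribute pointing at a path on disk, so a small wrapper is enough.
# The path below is a placeholder.
#
#     class _Upload:
#         def __init__(self, path):
#             self.name = path
#
#     fp = FileProcessor()
#     records = fp.process_file(_Upload("data/archive.zip"))
#     qr_paths = fp.generate_qr_code(records, combined=True)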
def decode_qr_code(image_path: str) -> Optional[str]:
    """Decode a QR code from an image file using OpenCV, with basic binary handling"""
    try:
        # Read the image with OpenCV
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"Failed to read image: {image_path}")
            return None
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Detect and decode with the built-in QR code detector
        detector = cv2.QRCodeDetector()
        data, vertices, _ = detector.detectAndDecode(gray)
        if vertices is not None and data:
            # Check whether this might be binary data (like a PDF)
            if data.startswith("%PDF") or not all(ord(c) < 128 for c in data):
                # Likely binary data: encode it as base64
                try:
                    # detectAndDecode returns a str; use latin-1 to preserve byte values
                    data_bytes = data.encode('latin-1')
                    base64_data = base64.b64encode(data_bytes).decode('ascii')
                    return f"base64:{base64_data}"
                except Exception as e:
                    logger.error(f"Error encoding binary data: {e}")
            return data
        logger.warning("No QR code found in image")
        return None
    except Exception as e:
        logger.error(f"QR decoding error: {e}")
        return None
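
# Round-trip sketch (illustrative only): generate a QR code from a small JSON payload
# with FileProcessor.generate_qr_code, then read it back with decode_qr_code.
#
#     fp = FileProcessor()
#     paths = fp.generate_qr_code({"hello": "world"}, combined=True)
#     if paths:
#         print(decode_qr_code(paths[0]))  # expected to print the JSON string back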
# The DataChat interface also handles base64-encoded payloads produced by decode_qr_code
def datachat_interface(mode: str, data_source: str, json_input: str, qr_image: str, query: str) -> str:
    """Interface for DataChat functionality with binary data support"""
    data = None
    if data_source == "JSON Input":
        data = json_input
    elif data_source == "QR Code":
        try:
            decoded_data = decode_qr_code(qr_image)
            # Handle base64-encoded data
            if decoded_data and decoded_data.startswith("base64:"):
                base64_part = decoded_data[7:]  # Remove the "base64:" prefix
                try:
                    # For PDFs and other binary data, report what the content is
                    binary_data = base64.b64decode(base64_part)
                    if binary_data.startswith(b"%PDF"):
                        data = "The QR code contains a PDF document. Binary data cannot be processed directly."
                    else:
                        # Fall back to decoding as text
                        data = binary_data.decode('utf-8', errors='replace')
                except Exception as e:
                    logger.error(f"Error processing base64 data: {e}")
                    data = "The QR code contains binary data that cannot be processed directly."
            else:
                data = decoded_data
            if not data:
                return "No QR code found in the provided image."
        except Exception as e:
            return f"Invalid QR code data provided: {e}"
    else:
        return "No valid data source selected."
    if mode == "Trained with Data":
        return datachat_trained(data, query)
    elif mode == "Chat about Data":
        return datachat_simple(data, query)
    else:
        return "Invalid mode selected."
def create_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
    .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
    .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
    """
    # Use gr.Interface rather than gr.Blocks
    demo = gr.Interface(
        fn=datachat_interface,
        inputs=[
            gr.Radio(["Trained with Data", "Chat about Data"], label="Mode"),
            gr.Radio(["JSON Input", "QR Code"], label="Data Source"),
            gr.Textbox(lines=8, label="JSON Data"),
            gr.Image(label="QR Code Image", type="filepath"),
            gr.Textbox(label="Query")
        ],
        outputs=gr.Textbox(label="Response"),
        title="Advanced Data Processor & QR Code Generator",
        description="# Advanced Data Processing & QR Code Generator",
        css=css
    )
    return demo
def main():
    """Main entry point for the application"""
    parser = argparse.ArgumentParser(description='URL and File Processor')
    parser.add_argument('--mode', choices=['web', 'cli'], default='web', help='Run mode (web interface or CLI)')
    parser.add_argument('--url', help='URL to process (CLI mode)')
    parser.add_argument('--file', help='File to process (CLI mode)')
    parser.add_argument('--output', help='Output directory for results (CLI mode)')
    parser.add_argument('--share', action='store_true', help='Share the web interface publicly (web mode)')
    parser.add_argument('--check-deps', action='store_true', help='Check dependencies and install missing ones')
    args = parser.parse_args()

    # Check dependencies if requested
    if args.check_deps:
        from utils import check_dependencies, install_missing_dependencies
        logger.info("Checking dependencies...")
        deps = check_dependencies()
        missing = [pkg for pkg, installed in deps.items() if not installed]
        if missing:
            logger.info(f"Missing dependencies: {', '.join(missing)}")
            if input("Install missing dependencies? (y/n): ").lower() == 'y':
                install_missing_dependencies(missing)
            else:
                logger.warning("Some features may not work without required dependencies.")
        else:
            logger.info("All dependencies are installed.")

    # Run in web mode
    if args.mode == 'web':
        try:
            import gradio
        except ImportError:
            logger.error("Gradio is required for web mode. Install with 'pip install gradio'")
            sys.exit(1)
        logger.info("Starting web interface...")
        demo = create_interface()
        demo.launch(share=args.share)
    # Run in CLI mode
    elif args.mode == 'cli':
        if not args.url and not args.file:
            logger.error("In CLI mode, you must provide either --url or --file")
            sys.exit(1)
        results = []
        # Process a URL if provided
        if args.url:
            logger.info(f"Processing URL: {args.url}")
            url_processor = URLProcessor()
            url_result = url_processor.fetch_content(args.url)
            if url_result:
                results.append(url_result)
        # Process a file if provided
        if args.file:
            if not os.path.exists(args.file):
                logger.error(f"File not found: {args.file}")
                sys.exit(1)
            logger.info(f"Processing file: {args.file}")
            file_processor = FileProcessor()

            # Create a file-like object with a name attribute
            class FileObj:
                def __init__(self, path):
                    self.name = path

            file_results = file_processor.process_file(FileObj(args.file))
            results.extend(file_results)
        # Save results
        if results:
            from utils import save_results
            output_dir = args.output or os.getcwd()
            filepath = save_results(results, output_dir)
            if filepath:
                logger.info(f"Results saved to: {filepath}")
            else:
                logger.error("Failed to save results")
        else:
            logger.warning("No results to save")


if __name__ == "__main__":
    main()