import json
import os
import re
import logging
import mimetypes
import time
import random
import base64
import io
import zipfile
import tempfile
from datetime import datetime
from typing import List, Dict, Optional, Union
from pathlib import Path

import requests
import validators
import gradio as gr
import qrcode
import cv2
from PIL import Image
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)

# Ensure output directories exist
Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
class URLProcessor:
    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10
        self.max_retries = 3
        self.request_delay = 1.0
        self.respect_robots = True
        self.use_proxy = False
        self.proxy_url = None
        self.rate_limits = {}  # Track rate limits per domain
        self.selenium_driver = None
        # Update session headers with rotating user agents
        self.update_user_agent()
        if self.use_proxy and self.proxy_url:
            self.session.proxies = {
                'http': self.proxy_url,
                'https': self.proxy_url
            }
    def update_user_agent(self):
        """Rotate user agents to avoid detection"""
        try:
            self.session.headers.update({
                'User-Agent': UserAgent().random,
                'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.5',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive',
                'Upgrade-Insecure-Requests': '1',
                'Cache-Control': 'max-age=0'
            })
        except Exception as e:
            logger.warning(f"Failed to update user agent: {e}")
            # Fall back to a common user agent
            self.session.headers.update({
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36'
            })
    def get_selenium_driver(self):
        """Initialize a Selenium WebDriver for interactive sites"""
        if self.selenium_driver is not None:
            return self.selenium_driver
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.service import Service
            from selenium.webdriver.chrome.options import Options
            from webdriver_manager.chrome import ChromeDriverManager

            options = Options()
            options.add_argument("--headless")
            options.add_argument("--no-sandbox")
            options.add_argument("--disable-dev-shm-usage")
            options.add_argument(f"user-agent={self.session.headers['User-Agent']}")
            options.add_argument("--disable-notifications")
            options.add_argument("--disable-popup-blocking")
            options.add_argument("--disable-extensions")
            service = Service(ChromeDriverManager().install())
            self.selenium_driver = webdriver.Chrome(service=service, options=options)
            return self.selenium_driver
        except Exception as e:
            logger.error(f"Failed to initialize Selenium: {e}")
            return None
    def handle_rate_limits(self, domain):
        """Smart rate limiting based on domain"""
        from urllib.parse import urlparse

        # Extract domain from URL
        parsed_domain = urlparse(domain).netloc
        # Check if we've accessed this domain recently
        current_time = time.time()
        if parsed_domain in self.rate_limits:
            last_access, count = self.rate_limits[parsed_domain]
            # Different delay strategies for different domains
            if "facebook" in parsed_domain or "instagram" in parsed_domain:
                min_delay = 5.0  # Longer delay for social media sites
            elif "gov" in parsed_domain:
                min_delay = 2.0  # Be respectful with government sites
            else:
                min_delay = self.request_delay
            # Exponential backoff if we're making many requests
            if count > 10:
                min_delay *= 2
            # Wait if needed
            elapsed = current_time - last_access
            if elapsed < min_delay:
                time.sleep(min_delay - elapsed)
            # Update count
            self.rate_limits[parsed_domain] = (time.time(), count + 1)
        else:
            # First time accessing this domain
            self.rate_limits[parsed_domain] = (current_time, 1)
    def handle_interactive_site(self, url):
        """Handle sites that require interaction to bypass blocks"""
        driver = self.get_selenium_driver()
        if not driver:
            return None
        try:
            driver.get(url)
            # Wait for the page to load
            time.sleep(3)
            # Handle different types of sites
            if "facebook.com" in url or "instagram.com" in url:
                self._handle_social_media_site(driver)
            elif "google.com" in url:
                self._handle_google_site(driver)
            # Get the page source after interaction
            page_source = driver.page_source
            return {
                'content': page_source,
                'content_type': 'text/html',
                'url': url,
                'title': driver.title
            }
        except Exception as e:
            logger.error(f"Error handling interactive site {url}: {e}")
            return None
    def _handle_social_media_site(self, driver):
        """Handle Facebook/Instagram login walls"""
        from selenium.webdriver.common.by import By
        from selenium.webdriver.common.keys import Keys
        from selenium.webdriver.common.action_chains import ActionChains
        try:
            # Try to find and close login popups
            close_buttons = driver.find_elements(By.XPATH, "//button[contains(@aria-label, 'Close')]")
            if close_buttons:
                close_buttons[0].click()
                time.sleep(1)
            # Press ESC key to dismiss popups
            ActionChains(driver).send_keys(Keys.ESCAPE).perform()
            time.sleep(1)
            # Scroll down to load more content
            driver.execute_script("window.scrollTo(0, document.body.scrollHeight/2);")
            time.sleep(2)
        except Exception as e:
            logger.warning(f"Error handling social media site: {e}")
    def _handle_google_site(self, driver):
        """Handle Google authentication and consent pages"""
        from selenium.webdriver.common.by import By
        try:
            # Look for consent buttons
            consent_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'Accept all')]")
            if consent_buttons:
                consent_buttons[0].click()
                time.sleep(1)
            # Look for "I agree" buttons
            agree_buttons = driver.find_elements(By.XPATH, "//button[contains(text(), 'I agree')]")
            if agree_buttons:
                agree_buttons[0].click()
                time.sleep(1)
        except Exception as e:
            logger.warning(f"Error handling Google site: {e}")
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch content with smart handling for different sites"""
        # Check if URL is allowed by robots.txt
        if self.respect_robots and not self.check_robots_txt(url):
            logger.warning(f"URL {url} is disallowed by robots.txt")
            return None
        # Apply rate limiting
        self.handle_rate_limits(url)
        # Rotate user agent occasionally
        if random.random() < 0.3:  # 30% chance to rotate
            self.update_user_agent()
        # Determine if the site needs special handling
        needs_selenium = any(domain in url.lower() for domain in [
            'facebook.com', 'instagram.com', 'linkedin.com',
            'google.com/search', 'twitter.com', 'x.com'
        ])
        for attempt in range(self.max_retries):
            try:
                if needs_selenium:
                    return self.handle_interactive_site(url)
                # Try cloudscraper first for sites with anti-bot measures
                if any(domain in url.lower() for domain in ['cloudflare', '.gov']):
                    import cloudscraper
                    scraper = cloudscraper.create_scraper(
                        browser={'browser': 'chrome', 'platform': 'darwin', 'mobile': False}
                    )
                    response = scraper.get(url, timeout=self.timeout)
                else:
                    # Standard request for most sites
                    response = self.session.get(url, timeout=self.timeout)
                response.raise_for_status()
                return {
                    'content': response.text,
                    'content_type': response.headers.get('Content-Type', ''),
                    'url': url,
                    'status_code': response.status_code
                }
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}")
                if attempt < self.max_retries - 1:
                    # Exponential backoff
                    time.sleep(self.request_delay * (2 ** attempt))
        logger.error(f"All attempts failed for {url}")
        return None
    def check_robots_txt(self, url: str) -> bool:
        """Check if URL is allowed by robots.txt"""
        if not self.respect_robots:
            return True
        try:
            from urllib.parse import urlparse
            from urllib.robotparser import RobotFileParser

            parsed_url = urlparse(url)
            robots_url = f"{parsed_url.scheme}://{parsed_url.netloc}/robots.txt"
            rp = RobotFileParser()
            rp.set_url(robots_url)
            rp.read()
            return rp.can_fetch(self.session.headers['User-Agent'], url)
        except Exception as e:
            logger.warning(f"Error checking robots.txt: {e}")
            return True
    # NOTE: the two fetch_content definitions below redefine the one above;
    # in Python, the last definition in the class body is the one that takes effect.
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch content with built-in rate limiting and robots.txt checking"""
        if not self.check_robots_txt(url):
            logger.warning(f"URL {url} is disallowed by robots.txt")
            return None
        time.sleep(self.request_delay)  # Basic rate limiting
        for attempt in range(self.max_retries):
            try:
                if 'drive.google.com' in url:
                    return self._handle_google_drive(url)
                if 'calendar.google.com' in url:
                    return self._handle_google_calendar(url)
                return self._fetch_html_content(url)
            except Exception as e:
                logger.error(f"Attempt {attempt + 1} failed: {e}")
                if attempt < self.max_retries - 1:
                    time.sleep(self.request_delay * (attempt + 1))
        return None
    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
        try:
            cleaned_text = clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
            return cleaned_text
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)
            text = text.encode('ascii', 'ignore').decode('ascii')
            text = re.sub(r'\s+', ' ', text)
            return text.strip()
    def validate_url(self, url: str) -> Dict:
        """Validate URL format and accessibility"""
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}
            response = self.session.head(url, timeout=self.timeout)
            response.raise_for_status()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Universal content fetcher with special case handling"""
        try:
            if 'drive.google.com' in url:
                return self._handle_google_drive(url)
            if 'calendar.google.com' in url and 'ical' in url:
                return self._handle_google_calendar(url)
            return self._fetch_html_content(url)
        except Exception as e:
            logger.error(f"Content fetch failed: {e}")
            return None
    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Process Google Drive file links"""
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None
            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None
    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Process Google Calendar ICS feeds"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None
    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Standard HTML content processing"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()
            main_content = soup.find('main') or soup.find('article') or soup.body
            if main_content is None:
                logger.warning(f"No main content found for URL: {url}")
                return {
                    'content': '',
                    'content_type': response.headers.get('Content-Type', ''),
                    'timestamp': datetime.now().isoformat()
                }
            text_content = main_content.get_text(separator='\n', strip=True)
            cleaned_content = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
class FileProcessor:
    """Class to handle file processing"""
    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Check if a file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            return (mime_type and mime_type.startswith('text/')) or \
                   (os.path.splitext(filepath)[1].lower() in self.supported_text_extensions)
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Process ZIP file contents"""
        results = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            for root, _, files in os.walk(temp_dir):
                for filename in files:
                    filepath = os.path.join(root, filename)
                    if self.is_text_file(filepath):
                        try:
                            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                                content = f.read()
                            if content.strip():
                                results.append({
                                    "source": "file",
                                    "filename": filename,
                                    "content": content,
                                    "timestamp": datetime.now().isoformat()
                                })
                        except Exception as e:
                            logger.error(f"Error reading file {filename}: {str(e)}")
        return results
    def _process_single_file(self, file) -> List[Dict]:
        """Process a single file"""
        try:
            file_stat = os.stat(file.name)
            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
                content = ""
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read(1 * 1024 * 1024)  # First 1MB
                    content += "\n...[Content truncated due to large file size]...\n"
                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                    content += f.read()  # Last 1MB
            else:
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
    """Clean and validate JSON data"""
    try:
        if isinstance(data, str):
            data = data.strip()
            data = json.loads(data)
        cleaned = json.loads(json.dumps(data))
        return cleaned
    except json.JSONDecodeError as e:
        logger.error(f"JSON cleaning error: {e}")
        return None
    except Exception as e:
        logger.error(f"Unexpected error while cleaning JSON: {e}")
        return None
def generate_qr(json_data):
    """Clean the JSON input and generate QR code image(s) for it."""
    data = clean_json(json_data)
    if data:
        return generate_qr_code(data)
    return None

def generate_qr_code(data, combined: bool = True) -> List[str]:
    """Generate QR code image(s) for the given data and return the saved file paths."""
    output_dir = Path('output/qr_codes')
    output_dir.mkdir(parents=True, exist_ok=True)
    try:
        if combined:
            # Encode the whole dataset into a single QR code
            cleaned_data = clean_json(data)
            if cleaned_data:
                qr = qrcode.QRCode(
                    version=None,
                    error_correction=qrcode.constants.ERROR_CORRECT_L,
                    box_size=10,
                    border=4,
                )
                json_str = json.dumps(cleaned_data, ensure_ascii=False)
                qr.add_data(json_str)
                qr.make(fit=True)
                img = qr.make_image(fill_color="black", back_color="white")
                output_path = output_dir / f'combined_qr_{int(time.time())}.png'
                img.save(str(output_path))
                return [str(output_path)]
        else:
            if isinstance(data, list):
                # One QR code per list item
                paths = []
                for idx, item in enumerate(data):
                    cleaned_item = clean_json(item)
                    if cleaned_item:
                        qr = qrcode.QRCode(
                            version=None,
                            error_correction=qrcode.constants.ERROR_CORRECT_L,
                            box_size=10,
                            border=4,
                        )
                        json_str = json.dumps(cleaned_item, ensure_ascii=False)
                        qr.add_data(json_str)
                        qr.make(fit=True)
                        img = qr.make_image(fill_color="black", back_color="white")
                        output_path = output_dir / f'item_{idx}_qr_{int(time.time())}.png'
                        img.save(str(output_path))
                        paths.append(str(output_path))
                return paths
            else:
                cleaned_item = clean_json(data)
                if cleaned_item:
                    qr = qrcode.QRCode(
                        version=None,
                        error_correction=qrcode.constants.ERROR_CORRECT_L,
                        box_size=10,
                        border=4,
                    )
                    json_str = json.dumps(cleaned_item, ensure_ascii=False)
                    qr.add_data(json_str)
                    qr.make(fit=True)
                    img = qr.make_image(fill_color="black", back_color="white")
                    output_path = output_dir / f'single_qr_{int(time.time())}.png'
                    img.save(str(output_path))
                    return [str(output_path)]
        return []
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return []
def decode_qr_code(image_path: str) -> Optional[str]:
    """Decode a QR code from an image file using OpenCV, with improved binary handling"""
    try:
        # Read image using OpenCV
        img = cv2.imread(image_path)
        if img is None:
            logger.error(f"Failed to read image: {image_path}")
            return None
        # Convert to grayscale
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        # Initialize the QR code detector
        detector = cv2.QRCodeDetector()
        # Detect and decode
        data, vertices, _ = detector.detectAndDecode(gray)
        if vertices is not None and data:
            # Check whether this might be binary data (like a PDF)
            if data.startswith("%PDF") or not all(ord(c) < 128 for c in data):
                # Likely binary data; encode it as base64
                try:
                    # If it's already a string representation, convert to bytes first
                    if isinstance(data, str):
                        data_bytes = data.encode('latin-1')  # latin-1 preserves byte values
                    else:
                        data_bytes = data
                    # Encode as base64
                    base64_data = base64.b64encode(data_bytes).decode('ascii')
                    return f"base64:{base64_data}"
                except Exception as e:
                    logger.error(f"Error encoding binary data: {e}")
            return data
        logger.warning("No QR code found in image")
        return None
    except Exception as e:
        logger.error(f"QR decoding error: {e}")
        return None
# DataChat interface, extended to handle base64-encoded (binary) QR payloads
def datachat_interface(mode: str, data_source: str, json_input: str, qr_image: str, query: str) -> str:
    """Interface for DataChat functionality with binary data support"""
    data = None
    if data_source == "JSON Input":
        data = json_input
    elif data_source == "QR Code":
        try:
            decoded_data = decode_qr_code(qr_image)
            # Handle base64-encoded data
            if decoded_data and decoded_data.startswith("base64:"):
                base64_part = decoded_data[7:]  # Remove the "base64:" prefix
                try:
                    # For PDFs and other binary data, report what the content is
                    binary_data = base64.b64decode(base64_part)
                    if binary_data.startswith(b"%PDF"):
                        data = "The QR code contains a PDF document. Binary data cannot be processed directly."
                    else:
                        # Try to decode as text as a fallback
                        data = binary_data.decode('utf-8', errors='replace')
                except Exception as e:
                    logger.error(f"Error processing base64 data: {e}")
                    data = "The QR code contains binary data that cannot be processed directly."
            else:
                data = decoded_data
            if not data:
                return "No QR code found in the provided image."
        except Exception as e:
            return f"Invalid QR code data provided: {e}"
    else:
        return "No valid data source selected."
    if mode == "Trained with Data":
        return datachat_trained(data, query)
    elif mode == "Chat about Data":
        return datachat_simple(data, query)
    else:
        return "Invalid mode selected."
def create_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
    .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
    .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
    """
    with gr.Blocks(css=css, title="Advanced Data Processor & QR Code Generator") as interface:
        gr.Markdown("# 🌐 Advanced Data Processing & QR Code Generator")

        # URL Extraction Tab
        with gr.Tab("URL Extraction"):
            url_input = gr.Textbox(label="URL to Process", placeholder="https://example.com")
            depth_slider = gr.Slider(minimum=1, maximum=5, value=1, step=1,
                                     label="Crawl Depth (Higher values may affect performance)")
            respect_robots = gr.Checkbox(label="Respect robots.txt", value=True)
            extract_btn = gr.Button("Extract Content")
            url_output = gr.JSON(label="Extracted Data")
            download_btn = gr.Button("Download Results as ZIP")
            download_output = gr.File(label="Download")
            # Warning about depth
            gr.Markdown("""
            <div class="warning">
            ⚠️ <strong>Warning:</strong> Higher depth values (>2) may significantly increase processing time and resource usage.
            </div>
            """)
            # URL processor instance
            url_processor = URLProcessor()
            def process_url(url, depth, respect_robots):
                url_processor.respect_robots = respect_robots
                results = []
                try:
                    # Validate URL
                    validation = url_processor.validate_url(url)
                    if not validation['is_valid']:
                        return {"error": validation['message']}
                    # Process URLs breadth-first up to the requested depth
                    processed_urls = set()
                    urls_to_process = [(url, 0)]  # (url, current_depth)
                    while urls_to_process:
                        current_url, current_depth = urls_to_process.pop(0)
                        if current_url in processed_urls:
                            continue
                        processed_urls.add(current_url)
                        content = url_processor.fetch_content(current_url)
                        if content:
                            results.append({
                                "url": current_url,
                                "content": content.get('content', ''),
                                "content_type": content.get('content_type', ''),
                                "timestamp": datetime.now().isoformat()
                            })
                            # If we haven't reached max depth, extract and queue more URLs
                            if current_depth < depth:
                                soup = BeautifulSoup(content.get('content', ''), 'html.parser')
                                for link in soup.find_all('a', href=True):
                                    next_url = link['href']
                                    if next_url.startswith('/'):
                                        # Convert relative URL to absolute
                                        from urllib.parse import urlparse, urljoin
                                        parsed_url = urlparse(current_url)
                                        base_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                                        next_url = urljoin(base_url, next_url)
                                    if validators.url(next_url) and next_url not in processed_urls:
                                        urls_to_process.append((next_url, current_depth + 1))
                    return results
                except Exception as e:
                    logger.error(f"URL processing error: {e}")
                    return {"error": str(e)}
            def create_download_zip(results):
                if not results or (isinstance(results, dict) and 'error' in results):
                    return None
                try:
                    # Create a temporary zip file
                    with tempfile.NamedTemporaryFile(suffix='.zip', delete=False) as tmp:
                        with zipfile.ZipFile(tmp.name, 'w') as zipf:
                            # Add JSON data
                            zipf.writestr('extracted_data.json', json.dumps(results, indent=2))
                            # Add an individual text file for each URL
                            for idx, item in enumerate(results):
                                if 'content' in item:
                                    zipf.writestr(f'content_{idx}_{int(time.time())}.txt', item['content'])
                        return tmp.name
                except Exception as e:
                    logger.error(f"Error creating ZIP file: {e}")
                    return None

            extract_btn.click(process_url, [url_input, depth_slider, respect_robots], url_output)
            download_btn.click(create_download_zip, [url_output], download_output)
        # ZIP File Extractor Tab
        with gr.Tab("ZIP File Extractor"):
            zip_file_input = gr.File(label="Upload ZIP File")
            extract_zip_btn = gr.Button("Extract and Process")
            zip_output = gr.JSON(label="Extracted Data")
            zip_qr_btn = gr.Button("Generate QR Code")
            zip_qr_output = gr.Image(label="QR Code")
            file_processor = FileProcessor()

            def process_zip_file(file):
                if not file:
                    return {"error": "No file uploaded"}
                try:
                    results = file_processor.process_file(file)
                    return results
                except Exception as e:
                    logger.error(f"ZIP processing error: {e}")
                    return {"error": str(e)}

            def generate_zip_qr(data):
                if not data or (isinstance(data, dict) and 'error' in data):
                    return None
                try:
                    # Use the module-level QR generator (FileProcessor has no QR method)
                    paths = generate_qr_code(data, combined=True)
                    return paths[0] if paths else None
                except Exception as e:
                    logger.error(f"QR generation error: {e}")
                    return None

            extract_zip_btn.click(process_zip_file, [zip_file_input], zip_output)
            zip_qr_btn.click(generate_zip_qr, [zip_output], zip_qr_output)
        # Raw Text to JSON Tab
        with gr.Tab("Text to JSON"):
            text_input = gr.Textbox(lines=10, label="Raw Text Input")
            json_structure = gr.Dropdown(
                choices=["Simple", "Structured", "Key-Value Pairs"],
                label="JSON Structure",
                value="Simple"
            )
            convert_btn = gr.Button("Convert to JSON")
            json_output = gr.JSON(label="JSON Output")
            combine_json_btn = gr.Button("Combine with Previous JSON")
            previous_json = gr.Textbox(lines=5, label="Previous JSON (Optional)")
            combined_output = gr.JSON(label="Combined JSON")
            text_qr_btn = gr.Button("Generate QR Code")
            text_qr_output = gr.Image(label="QR Code")

            def convert_text_to_json(text, structure):
                if not text.strip():
                    return {"error": "No text provided"}
                try:
                    if structure == "Simple":
                        return {
                            "text": text,
                            "timestamp": datetime.now().isoformat()
                        }
                    elif structure == "Structured":
                        lines = text.split('\n')
                        paragraphs = []
                        current_para = []
                        for line in lines:
                            if line.strip():
                                current_para.append(line)
                            elif current_para:
                                paragraphs.append(' '.join(current_para))
                                current_para = []
                        if current_para:
                            paragraphs.append(' '.join(current_para))
                        return {
                            "title": paragraphs[0] if paragraphs else "",
                            "paragraphs": paragraphs[1:] if len(paragraphs) > 1 else [],
                            "timestamp": datetime.now().isoformat()
                        }
                    elif structure == "Key-Value Pairs":
                        pairs = {}
                        lines = text.split('\n')
                        for line in lines:
                            if ':' in line:
                                key, value = line.split(':', 1)
                                pairs[key.strip()] = value.strip()
                        pairs["timestamp"] = datetime.now().isoformat()
                        return pairs
                    return {"error": "Invalid structure selected"}
                except Exception as e:
                    logger.error(f"Text to JSON conversion error: {e}")
                    return {"error": str(e)}
            def combine_json_data(current, previous):
                if not current or (isinstance(current, dict) and 'error' in current):
                    return {"error": "No valid current JSON"}
                try:
                    if not previous.strip():
                        return current
                    prev_json = json.loads(previous)
                    # Decide how to combine based on the types involved
                    if isinstance(prev_json, list) and isinstance(current, list):
                        return prev_json + current
                    elif isinstance(prev_json, list):
                        return prev_json + [current]
                    elif isinstance(current, list):
                        return [prev_json] + current
                    else:
                        # Both are objects, merge them
                        combined = {**prev_json, **current}
                        # Add a combined timestamp
                        combined["combined_timestamp"] = datetime.now().isoformat()
                        return combined
                except json.JSONDecodeError:
                    return {"error": "Previous JSON is invalid"}
                except Exception as e:
                    logger.error(f"JSON combination error: {e}")
                    return {"error": str(e)}

            convert_btn.click(convert_text_to_json, [text_input, json_structure], json_output)
            combine_json_btn.click(combine_json_data, [json_output, previous_json], combined_output)
            text_qr_btn.click(generate_zip_qr, [json_output], text_qr_output)
        # DataChat Tab (existing)
        with gr.Tab("DataChat"):
            mode = gr.Radio(["Trained with Data", "Chat about Data"], label="Mode")
            data_source = gr.Radio(["JSON Input", "QR Code"], label="Data Source")
            json_input = gr.Textbox(lines=8, label="JSON Data")
            qr_image = gr.Image(label="QR Code Image", type="filepath")
            query = gr.Textbox(label="Query")
            submit_btn = gr.Button("Submit")
            output = gr.Textbox(label="Response")

            submit_btn.click(datachat_interface, [mode, data_source, json_input, qr_image, query], output)
        # QR Generator Tab (existing)
        with gr.Tab("QR Generator"):
            qr_input = gr.Textbox(lines=8, label="Input JSON for QR")
            generate_btn = gr.Button("Generate QR")
            qr_output = gr.Image(label="Generated QR Code")

            def generate_qr(json_data):
                # Use the module-level helpers: clean the JSON, then generate a combined QR code
                data = clean_json(json_data)
                if data:
                    paths = generate_qr_code(data)
                    return paths[0] if paths else None
                return None

            generate_btn.click(generate_qr, qr_input, qr_output)
    return interface

def main():
    mimetypes.init()
    Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
    # Create and launch the interface
    interface = create_interface()
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )

if __name__ == "__main__":
    main()