import json
import sys
import os
import re
import time
import logging
import mimetypes
import tempfile
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, List, Dict, Tuple, Union, Optional
import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
import zipfile
# Setup logging with detailed configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ]
)
logger = logging.getLogger(__name__)
# Additional imports used by the async fetching and robots.txt handling below.
# Note: Config is defined locally in this file, so it is not imported from a separate module.
from robots_handler import RobotsHandler
import asyncio
import aiohttp
from tqdm import tqdm
class Config:
    def __init__(self):
        self.settings = {
            'TIMEOUT': int(os.getenv('URLD_TIMEOUT', 10)),
            'MAX_FILE_SIZE': int(os.getenv('URLD_MAX_FILE_SIZE', 2 * 1024 * 1024 * 1024)),
            'RESPECT_ROBOTS': os.getenv('URLD_RESPECT_ROBOTS', 'True').lower() == 'true',
            'USE_PROXY': os.getenv('URLD_USE_PROXY', 'False').lower() == 'true',
            'PROXY_URL': os.getenv('URLD_PROXY_URL', ''),
            'REQUEST_DELAY': float(os.getenv('URLD_REQUEST_DELAY', 1.0)),
            'MAX_RETRIES': int(os.getenv('URLD_MAX_RETRIES', 3)),
            'OUTPUT_FORMAT': os.getenv('URLD_OUTPUT_FORMAT', 'json'),
            'CHROME_DRIVER_PATH': os.getenv('URLD_CHROME_DRIVER_PATH', '/usr/local/bin/chromedriver'),
        }

    def get(self, key: str) -> Any:
        return self.settings.get(key)

    def update(self, settings: Dict[str, Any]) -> None:
        self.settings.update(settings)
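

# URLProcessor below instantiates a ProxyHandler, but no definition is included in this
# file. The following is a minimal sketch, assuming the handler only needs to turn a
# proxy URL into a requests-style proxies mapping; the real class may do more.
class ProxyHandler:
    def __init__(self, proxy_url: str = ''):
        self.proxy_url = proxy_url

    def get_proxy_config(self) -> Dict[str, str]:
        # Route both HTTP and HTTPS traffic through the configured proxy, if one is set.
        if self.proxy_url:
            return {'http': self.proxy_url, 'https': self.proxy_url}
        return {}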
class URLProcessor:
    def __init__(self):
        self.config = Config()
        self.proxy_handler = ProxyHandler(self.config.get('PROXY_URL'))
        self.robots_handler = RobotsHandler()
        self.timeout = self.config.get('TIMEOUT')  # used by the fetch methods below
        self.session = self._create_session()

    def _create_session(self):
        session = requests.Session()
        if self.config.get('USE_PROXY'):
            session.proxies = self.proxy_handler.get_proxy_config()
        session.headers.update({
            'User-Agent': UserAgent().random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
        return session
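
    # The methods below (validate_url, fetch_content, advanced_text_cleaning) are called
    # elsewhere in this file but are not defined in it. These are minimal sketches,
    # assuming the validators package for URL checks and simple regex-based cleaning;
    # the real implementations may differ.
    def validate_url(self, url: str) -> Dict:
        """Sketch: basic format and scheme validation for a URL."""
        if not validators.url(url):
            return {'is_valid': False, 'message': 'Invalid URL format', 'url': url}
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return {'is_valid': False, 'message': 'Only HTTP/HTTPS URLs are supported', 'url': url}
        return {'is_valid': True, 'message': 'URL is valid', 'url': url}

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Sketch: dispatch special-case URLs, otherwise fetch HTML with a Selenium fallback."""
        if 'drive.google.com' in url and '/d/' in url:
            return self._handle_google_drive(url)
        if 'calendar.google.com' in url:
            return self._handle_google_calendar(url)
        result = self._fetch_html_content(url)
        if result is None:
            # Fall back to Selenium for JavaScript-heavy or overlay-protected pages.
            page_source = self._fetch_with_selenium(url)
            if page_source:
                text = BeautifulSoup(page_source, 'html.parser').get_text(separator='\n', strip=True)
                result = {
                    'content': self.advanced_text_cleaning(text),
                    'content_type': 'text/html',
                    'timestamp': datetime.now().isoformat(),
                    'url': url
                }
        return result

    def advanced_text_cleaning(self, text: str) -> str:
        """Sketch: strip control characters and collapse repeated whitespace."""
        cleaned = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]', '', text)
        cleaned = re.sub(r'[ \t]+', ' ', cleaned)
        cleaned = re.sub(r'\n{3,}', '\n\n', cleaned)
        return cleaned.strip()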
    async def fetch_urls_async(self, urls: List[str]) -> List[Dict]:
        async with aiohttp.ClientSession() as session:
            tasks = []
            for url in urls:
                if self.config.get('RESPECT_ROBOTS'):
                    if not self.robots_handler.can_fetch(url, self.session.headers['User-Agent']):
                        logger.warning(f"Skipping {url} due to robots.txt restrictions")
                        continue
                tasks.append(self.fetch_content_async(session, url))
            return await asyncio.gather(*tasks)
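
    # fetch_urls_async above relies on a fetch_content_async coroutine that is not defined
    # in this file. A minimal sketch, assuming it only needs to return the same shape of
    # dict as the synchronous path; error handling is intentionally simple.
    async def fetch_content_async(self, session: aiohttp.ClientSession, url: str) -> Dict:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response:
                text = await response.text()
                soup = BeautifulSoup(text, 'html.parser')
                return {
                    'url': url,
                    'content': self.advanced_text_cleaning(soup.get_text(separator='\n', strip=True)),
                    'content_type': response.headers.get('Content-Type', ''),
                    'timestamp': datetime.now().isoformat()
                }
        except Exception as e:
            logger.error(f"Async fetch failed for {url}: {e}")
            return {'url': url, 'content': '', 'error': str(e), 'timestamp': datetime.now().isoformat()}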
    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Standard HTML content processing"""
        try:
            # Try with a different user agent if it's a social media site
            if any(domain in url for domain in
                   ['facebook.com', 'instagram.com', 'twitter.com', 'x.com', 'huggingface.co']):
                # Use a more realistic browser user agent instead of a random one
                self.session.headers.update({
                    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    # Add cookie consent headers to bypass some login walls
                    'Cookie': 'c_user=0; xs=0; datr=0; locale=en_US; wd=1920x1080; consent_accepted=true; cookie_consent=accepted',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
                    'sec-ch-ua': '"Not_A Brand";v="8", "Chromium";v="120", "Google Chrome";v="120"',
                    'sec-ch-ua-mobile': '?0',
                    'sec-ch-ua-platform': '"macOS"',
                    'Sec-Fetch-Dest': 'document',
                    'Sec-Fetch-Mode': 'navigate',
                    'Sec-Fetch-Site': 'none',
                    'Sec-Fetch-User': '?1',
                    'Upgrade-Insecure-Requests': '1'
                })
                # For Facebook, try the mobile version, which often has fewer restrictions
                if 'facebook.com' in url and 'm.facebook.com' not in url:
                    url = url.replace('www.facebook.com', 'm.facebook.com')
                    logger.info(f"Switched to mobile Facebook URL: {url}")
            # Add a delay to simulate human browsing
            time.sleep(1)
            # Try to get the page with multiple attempts
            max_attempts = 3
            for attempt in range(max_attempts):
                try:
                    response = self.session.get(url, timeout=self.timeout)
                    response.raise_for_status()
                    break
                except Exception as e:
                    if attempt < max_attempts - 1:
                        logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying...")
                        time.sleep(2)  # Wait longer between retries
                    else:
                        raise
            logger.info(f"Response status: {response.status_code}, Content-Type: {response.headers.get('Content-Type')}")
            # Save the raw HTML for debugging if needed (use a temp dir rather than a hard-coded user path)
            debug_path = os.path.join(tempfile.gettempdir(), f"debug_raw_{int(time.time())}.html")
            with open(debug_path, "w", encoding="utf-8") as f:
                f.write(response.text)
            logger.info(f"Saved raw HTML to {debug_path}")
            # Check if we got a valid response with content
            if not response.text or len(response.text) < 100:
                logger.error(f"Empty or very short response from {url}")
                return None
            soup = BeautifulSoup(response.text, 'html.parser')
            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()
            # Simulate the "ESC key" by removing login walls and overlays common on social media sites
            login_wall_selectors = [
                '.login-wall', '.signup-wall', '.overlay', '.modal',
                '[role="dialog"]', '[aria-modal="true"]', '.login-overlay',
                '.signup-overlay', '.uiLayer', '.fb_overlay', '.ReactModalPortal',
                '[data-testid="login_dialog"]', '[data-testid="signup_dialog"]',
                '.login-signup-modal', '.onboarding-modal', '.signup-wrapper',
                '.login-wrapper', '.login-container', '.signup-container',
                '.login-modal', '.signup-modal', '.auth-modal', '.auth-wall'
            ]
            for selector in login_wall_selectors:
                for element in soup.select(selector):
                    logger.info(f"Removing login wall element: {selector}")
                    element.decompose()
            # Enhanced removal for social media sites
            if 'facebook.com' in url:
                # Facebook-specific elements - simulating the ESC key
                fb_selectors = [
                    '[data-testid="cookie-policy-manage-dialog"]',
                    '[role="banner"]', '[role="complementary"]',
                    '.login_form_container', '.login_form', '#login_form',
                    '.uiLayer', '.pluginConnectButton', '.fbPageBanner',
                    '._5hn6', '._67m7', '.nonLoggedInSignUp',
                    '#headerArea', '.uiContextualLayer', '.uiContextualLayerPositioner'
                ]
                for selector in fb_selectors:
                    for element in soup.select(selector):
                        element.decompose()
                # Look for the main content in the mobile version
                main_content = soup.select_one('#m_story_permalink_view') or soup.select_one(
                    '#mobile_injected_video_feed_pagelet')
                if main_content:
                    logger.info("Found Facebook mobile main content")
            elif 'instagram.com' in url:
                # Instagram-specific elements - simulating the ESC key
                ig_selectors = [
                    '[role="presentation"]', '[role="banner"]', '[role="complementary"]',
                    '.RnEpo', '._acb3', '._ab8w', '._abn5', '.x1n2onr6',
                    '.x78zum5', '.x1q0g3np', '.xieb3on', '._a9-z', '._a9_1',
                    '._aa4b', '.x1i10hfl', '.x9f619', '.xnz67gz', '.x78zum5',
                    '.x1q0g3np', '.x1gslohp', '.xieb3on', '.x1lcm9me'
                ]
                for selector in ig_selectors:
                    for element in soup.select(selector):
                        element.decompose()
                # Try to find the main content
                insta_content = soup.select_one('main article') or soup.select_one('._aagv') or soup.select_one(
                    '._ab1y')
                if insta_content:
                    logger.info("Found Instagram main content")
            elif 'twitter.com' in url or 'x.com' in url:
                # X/Twitter already works well for public content, but clean up any remaining overlays
                x_selectors = [
                    '[data-testid="LoginForm"]', '[data-testid="SignupForm"]',
                    '[data-testid="sheetDialog"]', '[data-testid="mask"]',
                    '.r-zchlnj', '.r-1xcajam', '.r-1d2f490', '.r-1p0dtai',
                    '.r-1pi2tsx', '.r-u8s1d', '.css-175oi2r', '.css-1dbjc4n',
                    '.r-kemksi', '[data-testid="BottomBar"]'
                ]
                for selector in x_selectors:
                    for element in soup.select(selector):
                        element.decompose()
            elif 'huggingface.co' in url:
                # Special handling for Hugging Face
                logger.info("Applying special handling for Hugging Face")
                # Try to find the main content
                hf_selectors = ['.prose', '.space-content', '.model-description',
                                '.dataset-description', 'article', '.markdown']
                for selector in hf_selectors:
                    elements = soup.select(selector)
                    if elements:
                        logger.info(f"Found Hugging Face content with selector: {selector}")
                        break
            # Extract content using a general approach - try multiple strategies
            # Strategy 1: Look for semantic HTML5 elements
            main_content = None
            for selector in ['main', 'article', 'section', '.content', '.main', '.body', '.post', '.entry',
                             '.page']:
                elements = soup.select(selector)
                if elements:
                    main_content = elements[0]
                    logger.info(f"Found content with selector: {selector}")
                    break
            # Strategy 2: If no semantic elements, try common class names
            if not main_content or not main_content.get_text(strip=True):
                for div in soup.find_all('div'):
                    class_name = div.get('class', [])
                    id_name = div.get('id', '')
                    if any(term in ' '.join(class_name).lower() for term in
                           ['content', 'main', 'body', 'article', 'post']):
                        main_content = div
                        logger.info(f"Found content with div class: {class_name}")
                        break
                    if any(term in id_name.lower() for term in ['content', 'main', 'body', 'article', 'post']):
                        main_content = div
                        logger.info(f"Found content with div id: {id_name}")
                        break
            # Strategy 3: Fall back to body
            if not main_content or not main_content.get_text(strip=True):
                logger.info(f"No main content container found for {url}, using body")
                main_content = soup.body if soup.body else soup
            # Extract text with proper spacing
            text_content = main_content.get_text(separator='\n', strip=True)
            # Strategy 4: If content is too short, extract all visible text
            if len(text_content) < 100:
                logger.info(f"Content too short for {url} ({len(text_content)} chars), using all visible text")
                visible_text = []
                for element in soup.find_all(['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li', 'span', 'div']):
                    if element.get_text(strip=True):
                        visible_text.append(element.get_text(strip=True))
                text_content = '\n'.join(visible_text)
            # Strategy 5: Last resort - get all text from the page
            if len(text_content) < 50:
                logger.info(f"Still insufficient content for {url} ({len(text_content)} chars), using entire page text")
                text_content = soup.get_text(separator='\n', strip=True)
            # Clean and structure content
            cleaned_content = self.advanced_text_cleaning(text_content)
            logger.info(f"Final content length: {len(cleaned_content)} chars")
            # If we still have no content, this is a failure
            if len(cleaned_content) < 20:
                logger.error(f"Failed to extract meaningful content from {url}")
                return None
            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat(),
                'url': url  # Include the URL in the returned data for reference
            }
        except Exception as e:
            logger.error(f"HTML processing failed for {url}: {e}")
            return None
    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Handle Google Drive document URLs"""
        try:
            # Construct the direct download URL
            file_id = url.split("/d/")[1].split("/")[0]
            download_url = f"https://drive.google.com/uc?export=download&id={file_id}"
            response = self.session.get(download_url, stream=True, timeout=self.timeout)
            response.raise_for_status()
            # Read content (limit to the first 1MB)
            content = b""
            for chunk in response.iter_content(chunk_size=8192):  # 8KB chunks
                content += chunk
                if len(content) > 1024 * 1024:  # 1MB limit
                    content = content[:1024 * 1024]
                    logger.warning("Truncated Google Drive file after 1MB")
                    break
            text_content = content.decode('utf-8', errors='ignore')
            cleaned_text = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_text,
                'content_type': 'text/plain',  # Assume plain text for simplicity
                'timestamp': datetime.now().isoformat(),
                'url': url,
                'source': 'google_drive'
            }
        except Exception as e:
            logger.error(f"Error handling Google Drive URL {url}: {e}")
            return None
    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Handle Google Calendar ICS URLs"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            text_content = response.text
            cleaned_text = self.advanced_text_cleaning(text_content)
            return {
                'content': cleaned_text,
                'content_type': 'text/calendar',  # Correct MIME type
                'timestamp': datetime.now().isoformat(),
                'url': url,
                'source': 'google_calendar'
            }
        except Exception as e:
            logger.error(f"Error handling Google Calendar URL {url}: {e}")
            return None
    def _fetch_with_selenium(self, url: str) -> Optional[str]:
        """Use Selenium as a fallback for difficult sites"""
        try:
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.common.by import By
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC
            from selenium.common.exceptions import TimeoutException
            from selenium.webdriver.common.keys import Keys
            logger.info(f"Attempting to fetch {url} with Selenium")
            # Set up Chrome options
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument(
                "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
            # Initialize the driver
            driver = webdriver.Chrome(options=chrome_options)
            try:
                # Navigate to the URL
                driver.get(url)
                # Wait for the page to load
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                # Simulate pressing the ESC key to dismiss overlays
                action_chains = webdriver.ActionChains(driver)
                action_chains.send_keys(Keys.ESCAPE).perform()
                time.sleep(1)  # give it a moment to take effect
                action_chains.reset_actions()  # Clear actions
                # Try again
                action_chains.send_keys(Keys.ESCAPE).perform()
                time.sleep(1)  # give it a moment to take effect
                action_chains.reset_actions()
                # Get the page source
                page_source = driver.page_source
                # Save the Selenium HTML for debugging (use a temp dir rather than a hard-coded user path)
                debug_path = os.path.join(tempfile.gettempdir(), f"debug_selenium_{int(time.time())}.html")
                with open(debug_path, "w", encoding="utf-8") as f:
                    f.write(page_source)
                logger.info(f"Saved Selenium HTML to {debug_path}")
                return page_source
            finally:
                driver.quit()
        except ImportError:
            logger.error("Selenium is not installed. Cannot use browser automation.")
            return None
        except Exception as e:
            logger.error(f"Selenium processing failed for {url}: {e}")
            return None
class FileProcessor:
    """Class to handle file processing"""

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Check if the file is a text file"""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            return (mime_type and mime_type.startswith('text/')) or \
                   (os.path.splitext(filepath)[1].lower() in self.supported_text_extensions)
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process an uploaded file with enhanced error handling"""
        if not file:
            return []
        dataset = []
        try:
            file_size = os.path.getsize(file.name)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []
            with tempfile.TemporaryDirectory() as temp_dir:
                if zipfile.is_zipfile(file.name):
                    dataset.extend(self._process_zip_file(file.name, temp_dir))
                else:
                    dataset.extend(self._process_single_file(file))
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []
        return dataset
    def _process_zip_file(self, zip_path, temp_dir):
        """Extract and process files within a ZIP archive."""
        result = []
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            for extracted_file in os.listdir(temp_dir):
                extracted_file_path = os.path.join(temp_dir, extracted_file)
                if os.path.isfile(extracted_file_path):
                    with open(extracted_file_path, 'r', encoding='utf-8', errors='ignore') as f:
                        result.append({
                            'source': 'file_from_zip',
                            'filename': extracted_file,
                            'content': f.read(),
                            'timestamp': datetime.now().isoformat()
                        })
        return result

    def _process_single_file(self, file) -> List[Dict]:
        try:
            file_stat = os.stat(file.name)
            # For very large files, read in chunks and summarize
            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
                # Read the first and last 1MB for extremely large files
                content = ""
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read(1 * 1024 * 1024)  # First 1MB
                    content += "\n...[Content truncated due to large file size]...\n"
                    # Seek to the last 1MB
                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
                    content += f.read()  # Last 1MB
            else:
                # Regular file processing
                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()
            return [{
                'source': 'file',
                'filename': os.path.basename(file.name),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(file.name)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
# Module-level helper (intentionally not part of FileProcessor)
def process_all_inputs(urls, file, text, notes):
    """Process all input types with progress tracking"""
    try:
        processor = URLProcessor()
        file_processor = FileProcessor()
        results = []
        # Process URLs
        if urls:
            url_list = re.split(r'[,\n]', urls)
            url_list = [url.strip() for url in url_list if url.strip()]
            for url in url_list:
                validation = processor.validate_url(url)
                if validation.get('is_valid'):
                    content = processor.fetch_content(url)
                    if content:
                        results.append({
                            'source': 'url',
                            'url': url,
                            'content': content,
                            'timestamp': datetime.now().isoformat()
                        })
        # Process files
        if file:
            results.extend(file_processor.process_file(file))
        # Process text input
        if text:
            cleaned_text = processor.advanced_text_cleaning(text)
            results.append({
                'source': 'direct_input',
                'content': cleaned_text,
                'timestamp': datetime.now().isoformat()
            })
        # Generate output
        if results:
            output_dir = Path('output') / datetime.now().strftime('%Y-%m-%d')
            output_dir.mkdir(parents=True, exist_ok=True)
            output_path = output_dir / f'processed_{int(time.time())}.json'
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(results, f, ensure_ascii=False, indent=2)
            summary = f"Processed {len(results)} items successfully!"
            json_data = json.dumps(results, indent=2)  # Prepare JSON for the editor and QR code
            return str(output_path), summary, json_data
        else:
            return None, "No valid content to process.", ""
    except Exception as e:
        logger.error(f"Processing error: {e}")
        return None, f"Error: {str(e)}", ""
def generate_qr_code(json_data):
    """Generate a QR code from JSON data and return the file path."""
    if json_data:
        return generate_qr(json_data)
    return None


def generate_qr(json_data):
    """Generate a QR code from JSON data and return the file path."""
    try:
        # Try first with automatic version selection
        qr = qrcode.QRCode(
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(json_data)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        img.save(temp_file.name)
        return temp_file.name
    except Exception as e:
        # The data may be too large for a QR code
        logger.error(f"QR generation error: {e}")
        # Create a simple QR code with an error message instead
        qr = qrcode.QRCode(
            version=1,
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data("Error: Data too large for QR code")
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".png")
        img.save(temp_file.name)
        return temp_file.name
def create_interface():
    """Create a comprehensive Gradio interface with advanced features"""
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processing") as interface:
        gr.Markdown("# 🌐 Advanced URL & Text Processing Toolkit")
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )
        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # Optional: for custom styling
            )
        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )
        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # Displays the generated QR code
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]  # Include the JSON editor in the outputs
        )
        qr_btn.click(
            generate_qr_code,
            inputs=json_editor,
            outputs=qr_output
        )
        gr.Markdown("""
        ### Usage Guidelines
        - **URL Processing**: Enter valid HTTP/HTTPS URLs
        - **File Input**: Upload text files or ZIP archives
        - **Text Input**: Direct text processing
        - **JSON Editor**: View and edit your JSON data
        - **Scratchpad**: Quick notes or text collections
        - Advanced cleaning and validation included
        """)
    return interface
def check_network_connectivity():
    """Check if the network is working properly by testing connections to common sites"""
    test_sites = ["https://www.google.com", "https://www.cloudflare.com", "https://www.amazon.com"]
    results = []
    for site in test_sites:
        try:
            response = requests.get(site, timeout=5)
            results.append({
                "site": site,
                "status": "OK" if response.status_code == 200 else f"Error: {response.status_code}",
                "response_time": response.elapsed.total_seconds()
            })
        except Exception as e:
            results.append({
                "site": site,
                "status": f"Error: {str(e)}",
                "response_time": None
            })
    # If all sites failed, there might be a network issue
    if all(result["status"].startswith("Error") for result in results):
        logger.error("Network connectivity issue detected. All test sites failed.")
        return False, results
    return True, results
def main():
    # Configure system settings
    mimetypes.init()
    # Check network connectivity
    network_ok, network_results = check_network_connectivity()
    if not network_ok:
        logger.warning("Network connectivity issues detected. Some features may not work properly.")
        for result in network_results:
            logger.warning(f"Test site {result['site']}: {result['status']}")
    # Create and launch the interface
    interface = create_interface()
    # Launch with proper configuration
    interface.launch(
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
        share=False,
        inbrowser=True,
        debug=True
    )


if __name__ == "__main__":
    main()