import json
import sys

sys.path.append('./config')

import os
import re
import time
import logging
import mimetypes
import tempfile
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from typing import Any, Dict, List, Optional, Tuple, Union

import requests
import validators
import gradio as gr
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
import zipfile
# Setup logging with detailed configuration
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8')
    ])
logger = logging.getLogger(__name__)
# Local helpers plus async fetching, rate limiting, and testing dependencies
from config import Config
from proxy_handler import ProxyHandler
from robots_handler import RobotsHandler
import asyncio
import aiohttp
from tqdm import tqdm
from ratelimit import limits, sleep_and_retry
import pytest
from urllib.robotparser import RobotFileParser
import concurrent.futures
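
# The local config, proxy_handler, and robots_handler modules are not shown in
# this section. From the call sites below, they are assumed to provide:
#   Config:        .get(key, default=None), .get_all() -> dict, .update(dict)
#   ProxyHandler:  ProxyHandler(proxy_url).get_proxy_config() -> dict suitable
#                  for requests.Session.proxies
#   RobotsHandler: .can_fetch(url) -> bool (robots.txt permission check)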
class URLProcessor:
    def __init__(self):
        self.config = Config()
        self.proxy_handler = ProxyHandler(self.config.get('PROXY_URL'))
        self.robots_handler = RobotsHandler()
        self.session = self._create_session()
        self.rate_limit = self.config.get('RATE_LIMIT', 60)  # requests per minute
        self.timeout = self.config.get('TIMEOUT', 10)
    @sleep_and_retry
    @limits(calls=60, period=60)  # NOTE: these limits are fixed at definition time and do not track self.rate_limit
    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch content with rate limiting and an optional robots.txt check"""
        if self.config.get('RESPECT_ROBOTS', True):
            if not self.robots_handler.can_fetch(url):
                logger.warning(f"Skipping {url} - robots.txt disallowed")
                return None
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            # Result record shape is an assumption, shared with fetch_content_async below
            return {'url': url, 'content': response.text, 'status': response.status_code}
        except requests.RequestException as e:
            logger.error(f"Failed to fetch {url}: {e}")
            return None
    def _create_session(self):
        session = requests.Session()
        if self.config.get('USE_PROXY'):
            session.proxies = self.proxy_handler.get_proxy_config()
        session.headers.update({
            'User-Agent': UserAgent().random,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
        return session
    def _fetch_with_selenium(self, url: str) -> Optional[str]:
        try:
            # Imports must precede first use of Options()
            from selenium import webdriver
            from selenium.webdriver.chrome.options import Options
            from selenium.webdriver.common.by import By
            from selenium.webdriver.common.keys import Keys
            from selenium.webdriver.support.ui import WebDriverWait
            from selenium.webdriver.support import expected_conditions as EC

            logger.info(f"Attempting to fetch {url} with Selenium")
            # Set up Chrome options
            chrome_options = Options()
            chrome_options.add_argument("--headless")
            chrome_options.add_argument("--no-sandbox")
            chrome_options.add_argument("--disable-dev-shm-usage")
            chrome_options.add_argument("--disable-gpu")
            chrome_options.add_argument("--window-size=1920,1080")
            chrome_options.add_argument(
                "user-agent=Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36")
            # Initialize the driver
            driver = webdriver.Chrome(options=chrome_options)
            try:
                # Navigate to the URL and wait for the page body to load
                driver.get(url)
                WebDriverWait(driver, 10).until(
                    EC.presence_of_element_located((By.TAG_NAME, "body"))
                )
                # Press ESC twice to dismiss overlays such as cookie banners or modals
                action_chains = webdriver.ActionChains(driver)
                for _ in range(2):
                    action_chains.send_keys(Keys.ESCAPE).perform()
                    time.sleep(1)  # give it a moment to take effect
                    action_chains.reset_actions()  # clear queued actions
                # Get the page source
                page_source = driver.page_source
                # Save the Selenium HTML for debugging (temp dir rather than a user-specific path)
                debug_path = os.path.join(tempfile.gettempdir(), f"debug_selenium_{int(time.time())}.html")
                with open(debug_path, "w", encoding="utf-8") as f:
                    f.write(page_source)
                logger.info(f"Saved Selenium HTML to {debug_path}")
                return page_source
            finally:
                driver.quit()
        except ImportError:
            logger.error("Selenium is not installed. Cannot use browser automation.")
            return None
        except Exception as e:
            logger.error(f"Selenium processing failed for {url}: {e}")
            return None
    async def fetch_urls_async(self, urls: List[str]) -> List[Dict]:
        """Asynchronous URL fetching with batch-based rate limiting"""
        results: List[Dict] = []
        async with aiohttp.ClientSession() as session:
            # Fetch in batches of self.rate_limit, pausing between batches so the
            # overall rate stays within the configured requests-per-minute budget
            for start in range(0, len(urls), self.rate_limit):
                if start > 0:
                    await asyncio.sleep(60)  # rate limiting between batches
                batch = urls[start:start + self.rate_limit]
                tasks = [self.fetch_content_async(session, url) for url in batch]
                results.extend(await asyncio.gather(*tasks))
        return results
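
    # Minimal sketch of the async counterpart used by fetch_urls_async above:
    # it mirrors fetch_content, and the result record shape is an assumption.
    async def fetch_content_async(self, session: aiohttp.ClientSession, url: str) -> Optional[Dict]:
        """Fetch a single URL asynchronously, honoring the robots.txt setting"""
        if self.config.get('RESPECT_ROBOTS', True) and not self.robots_handler.can_fetch(url):
            logger.warning(f"Skipping {url} - robots.txt disallowed")
            return None
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=self.timeout)) as response:
                response.raise_for_status()
                return {'url': url, 'content': await response.text(), 'status': response.status}
        except aiohttp.ClientError as e:
            logger.error(f"Async fetch failed for {url}: {e}")
            return None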
def create_interface():
    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; }
    .error { background-color: #f8d7da; color: #721c24; }
    """
    with gr.Blocks(css=css, title="Advanced Text & URL Processor") as interface:
        gr.Markdown("# 🌐 Advanced URL & Text Processing Toolkit")
        # Reuse the wired-up settings tab rather than duplicating its controls here
        create_settings_tab()
        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com"
            )
        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )
        with gr.Tab("Text Input"):
            text_input = gr.Textbox(
                label="Raw Text Input",
                lines=5,
                placeholder="Paste your text here..."
            )
        with gr.Tab("JSON Editor"):
            json_editor = gr.Textbox(
                label="JSON Editor",
                lines=20,
                placeholder="View and edit your JSON data here...",
                interactive=True,
                elem_id="json-editor"  # optional hook for custom styling
            )
        with gr.Tab("Scratchpad"):
            scratchpad = gr.Textbox(
                label="Scratchpad",
                lines=10,
                placeholder="Quick notes or text collections...",
                interactive=True
            )
        process_btn = gr.Button("Process Input", variant="primary")
        qr_btn = gr.Button("Generate QR Code", variant="secondary")
        output_text = gr.Textbox(label="Processing Results", interactive=False)
        output_file = gr.File(label="Processed Output")
        qr_output = gr.Image(label="QR Code", type="filepath")  # displays the generated QR code
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, scratchpad],
            outputs=[output_file, output_text, json_editor]
        )
        qr_btn.click(
            generate_qr_code,
            inputs=json_editor,
            outputs=qr_output
        )
        gr.Markdown("""
        ### Usage Guidelines
        - **URL Processing**: Enter valid HTTP/HTTPS URLs
        - **File Input**: Upload text files or ZIP archives
        - **Text Input**: Direct text processing
        - **JSON Editor**: View and edit your JSON data
        - **Scratchpad**: Quick notes or text collections
        - Advanced cleaning and validation included
        """)
    return interface
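
# Minimal sketches of the two handlers wired to the buttons in create_interface.
# Their output shapes are assumptions inferred from the Gradio wiring:
# process_all_inputs must return values for (output_file, output_text,
# json_editor); generate_qr_code must return an image filepath.
def process_all_inputs(urls: str, file_obj, text: str, notes: str) -> Tuple[str, str, str]:
    """Collect URL, file, and text inputs into a single JSON document (sketch)"""
    results: Dict[str, Any] = {
        'timestamp': datetime.now().isoformat(),
        'urls': [],
        'text': text or '',
        'notes': notes or ''
    }
    if urls:
        candidates = [u.strip() for u in re.split(r'[,\n]+', urls) if u.strip()]
        results['urls'] = [u for u in candidates if validators.url(u)]
    if file_obj is not None:
        # Gradio file inputs expose a .name attribute holding the temp path
        results['file'] = getattr(file_obj, 'name', str(file_obj))
    json_text = json.dumps(results, indent=2)
    out_path = os.path.join(tempfile.gettempdir(), f"processed_{int(time.time())}.json")
    with open(out_path, 'w', encoding='utf-8') as f:
        f.write(json_text)
    return out_path, f"Processed {len(results['urls'])} URL(s)", json_text

def generate_qr_code(data: str) -> Optional[str]:
    """Render the JSON editor contents as a QR code image and return its path (sketch)"""
    if not data:
        return None
    img = qrcode.make(data)  # returns a PIL-compatible image
    path = os.path.join(tempfile.gettempdir(), f"qr_{int(time.time())}.png")
    img.save(path)
    return path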
def check_network_connectivity():
    """Check if the network is working properly by testing connections to common sites"""
    test_sites = ["https://www.google.com", "https://www.cloudflare.com", "https://www.amazon.com"]
    results = []
    for site in test_sites:
        try:
            response = requests.get(site, timeout=5)
            results.append({
                "site": site,
                "status": "OK" if response.status_code == 200 else f"Error: {response.status_code}",
                "response_time": response.elapsed.total_seconds()
            })
        except Exception as e:
            results.append({
                "site": site,
                "status": f"Error: {str(e)}",
                "response_time": None
            })
    # If all sites failed, there is likely a network issue
    if all(result["status"].startswith("Error") for result in results):
        logger.error("Network connectivity issue detected. All test sites failed.")
        return False, results
    return True, results
def validate_config(config: Dict[str, Any]) -> Dict[str, str]:
    """Validate configuration settings"""
    errors = {}
    if config.get('RATE_LIMIT', 0) < 1:
        errors['rate_limit'] = "Rate limit must be positive"
    if config.get('TIMEOUT', 0) < 1:
        errors['timeout'] = "Timeout must be positive"
    if config.get('USE_PROXY') and not config.get('PROXY_URL'):
        errors['proxy'] = "Proxy URL required when proxy is enabled"
    return errors
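
# pytest is imported above for testing, but no tests appear in this section.
# A minimal illustrative test for validate_config (run with pytest):
def test_validate_config_flags_bad_settings():
    errors = validate_config({'RATE_LIMIT': 0, 'TIMEOUT': 5, 'USE_PROXY': True, 'PROXY_URL': ''})
    assert 'rate_limit' in errors
    assert 'proxy' in errors
    assert 'timeout' not in errors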
def update_settings(respect_robots: bool, use_proxy: bool, proxy_url: str,
                    request_delay: float, output_format: str) -> str:
    """Update application settings"""
    config = Config()
    new_settings = {
        'RESPECT_ROBOTS': respect_robots,
        'USE_PROXY': use_proxy,
        'PROXY_URL': proxy_url,
        'REQUEST_DELAY': request_delay,
        'OUTPUT_FORMAT': output_format
    }
    # Validate the merged settings: keys not edited here (e.g. RATE_LIMIT,
    # TIMEOUT) keep their current values instead of defaulting to 0 and
    # failing validation
    errors = validate_config({**config.get_all(), **new_settings})
    if errors:
        return f"Configuration error: {', '.join(errors.values())}"
    config.update(new_settings)
    return "Configuration updated successfully"
def create_settings_tab() -> gr.Tab:
    """Create settings tab with configuration controls"""
    with gr.Tab("Settings") as settings_tab:
        respect_robots = gr.Checkbox(label="Respect robots.txt", value=True)
        use_proxy = gr.Checkbox(label="Use Proxy", value=False)
        proxy_url = gr.Textbox(label="Proxy URL", placeholder="http://proxy:port")
        request_delay = gr.Slider(minimum=0, maximum=10, value=1, label="Request Delay (seconds)")
        output_format = gr.Dropdown(choices=["json", "csv", "txt"], value="json", label="Output Format")
        settings_btn = gr.Button("Update Settings")
        settings_output = gr.Textbox(label="Settings Status")
        settings_btn.click(
            update_settings,
            inputs=[respect_robots, use_proxy, proxy_url, request_delay, output_format],
            outputs=settings_output
        )
    return settings_tab
def main():
    """Main application entry point"""
    try:
        # Initialize system settings
        mimetypes.init()
        # Validate initial configuration
        config = Config()
        errors = validate_config(config.get_all())
        if errors:
            logger.error(f"Configuration errors found: {errors}")
            sys.exit(1)
        # Check network connectivity
        network_ok, network_results = check_network_connectivity()
        if not network_ok:
            logger.warning("Network connectivity issues detected. Some features may not work properly.")
            for result in network_results:
                logger.warning(f"Test site {result['site']}: {result['status']}")
        # Create and launch the interface
        interface = create_interface()
        interface.launch(
            server_name="0.0.0.0",
            server_port=7860,
            show_error=True,
            share=False,
            inbrowser=True,
            debug=True
        )
    except Exception as e:
        logger.error(f"Application startup failed: {str(e)}")
        sys.exit(1)

if __name__ == "__main__":
    main()