import gradio as gr import requests from bs4 import BeautifulSoup from urllib.parse import urljoin, urlparse from fpdf import FPDF import tempfile import re import logging import asyncio import aiohttp from aiolimiter import AsyncLimiter import sqlite3 from contextlib import contextmanager from threading import local logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) # Thread-local storage for database connections thread_local = local() # Rate limiter: 10 requests per second rate_limiter = AsyncLimiter(10, 1) @contextmanager def get_db_connection(): if not hasattr(thread_local, "connection"): thread_local.connection = sqlite3.connect('crawl_cache.db') try: yield thread_local.connection finally: pass # We'll keep the connection open for reuse def init_db(): with get_db_connection() as conn: c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS pages (url TEXT PRIMARY KEY, content TEXT, depth INTEGER)''') conn.commit() init_db() def clean_text(text): text = ''.join(char for char in text if char.isprintable()) text = re.sub(r'[^\x00-\x7F]+', ' ', text) return text async def get_page_content(session, url): try: async with rate_limiter: async with session.get(url, timeout=30) as response: if response.status == 200: text = await response.text() soup = BeautifulSoup(text, 'html.parser') content = [] main_content = soup.find('article') or soup.find('main') or soup if main_content: for tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']: for element in main_content.find_all(tag): text = clean_text(element.get_text(strip=True)) if text: content.append(text) logger.info(f"Found {len(content)} content items for {url}") return content else: logger.error(f"Error fetching {url}: HTTP {response.status}") return [f"Error fetching {url}: HTTP {response.status}"] except Exception as e: logger.error(f"Error processing {url}: {str(e)}") return [f"Error processing {url}: {str(e)}"] async def get_links(session, url, base_url): try: async with rate_limiter: async with session.get(url, timeout=30) as response: if response.status == 200: text = await response.text() soup = BeautifulSoup(text, 'html.parser') links = soup.find_all('a', href=True) valid_links = [] for link in links: full_url = urljoin(url, link['href']) if full_url.startswith(base_url) and full_url != url: valid_links.append(full_url) return valid_links else: logger.error(f"Error fetching links from {url}: HTTP {response.status}") return [] except Exception as e: logger.error(f"Error getting links from {url}: {str(e)}") return [] async def crawl_pages(base_url, max_depth): visited = set() to_visit = [(base_url, 0)] all_pages = [] async with aiohttp.ClientSession() as session: while to_visit: current_url, depth = to_visit.pop(0) if current_url in visited or depth > max_depth: continue visited.add(current_url) with get_db_connection() as conn: c = conn.cursor() c.execute("SELECT content FROM pages WHERE url = ?", (current_url,)) result = c.fetchone() if result: content = eval(result[0]) # Convert string back to list else: content = await get_page_content(session, current_url) with get_db_connection() as conn: c = conn.cursor() c.execute("INSERT INTO pages VALUES (?, ?, ?)", (current_url, str(content), depth)) conn.commit() all_pages.append((current_url, content)) logger.info(f"Processed page: {current_url} at depth {depth}") if depth < max_depth: links = await get_links(session, current_url, base_url) for link in links: if link not in visited: to_visit.append((link, depth + 1)) return all_pages def website_to_pdf(all_pages): logger.info(f"Starting PDF generation for {len(all_pages)} pages") pdf = FPDF() pdf.set_auto_page_break(auto=True, margin=15) pdf.add_page() pdf.set_font("Arial", size=12) for page_url, content in all_pages: pdf.cell(0, 10, txt=page_url, ln=True) pdf.ln(5) for text in content: try: pdf.multi_cell(0, 10, txt=text[:200]) # Limit text length to avoid issues except Exception as e: logger.error(f"Error writing text to PDF: {str(e)}") pdf.add_page() with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp: pdf_path = tmp.name pdf.output(pdf_path) logger.info(f"PDF saved to: {pdf_path}") return pdf_path async def process_url(url, depth): try: all_pages = await crawl_pages(url, depth) pdf_file = website_to_pdf(all_pages) return pdf_file except Exception as e: logger.error(f"Error in process_url: {str(e)}") return f"An error occurred: {str(e)}" def run_async(url, depth): return asyncio.run(process_url(url, depth)) iface = gr.Interface( fn=run_async, inputs=[ gr.Textbox(label="Enter website URL (e.g., https://www.gradio.app/docs)"), gr.Slider(minimum=1, maximum=10, value=3, step=1, label="Crawl Depth") ], outputs=gr.File(label="Download PDF"), title="Website to PDF Converter", description="Enter docs URL and crawl depth to convert documentation pages into a PDF. Be responsible for sites you have permission to do this" ) if __name__ == "__main__": iface.launch()