# Website-to-PDF converter: a Dash app (Hugging Face Spaces) that crawls a
# documentation site and renders its pages into a downloadable PDF.
import ast
import asyncio
import base64
import logging
import math
import os
import re
import sqlite3
import ssl
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from io import BytesIO
from threading import local
from urllib.parse import urljoin, urlparse

import aiohttp
import dash
import dash_bootstrap_components as dbc
import requests
from aiolimiter import AsyncLimiter
from bs4 import BeautifulSoup
from dash import dcc, html, Input, Output, State
from dash.exceptions import PreventUpdate
from fpdf import FPDF
from PyPDF2 import PdfMerger
# Initialize Dash app
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
server = app.server  # exposed for WSGI servers (e.g. gunicorn on Spaces)
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# Thread-local storage for database connections (sqlite3 connections must not
# be shared across threads)
thread_local = local()
# Rate limiter: 10 requests per second
rate_limiter = AsyncLimiter(10, 1)
# Create an SSL context that ignores certificate verification
# SECURITY NOTE(review): disabling certificate verification exposes the crawler
# to man-in-the-middle attacks; acceptable only for scraping sites with
# broken/self-signed certs — consider making this opt-in.
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
@contextmanager
def get_db_connection():
    """Yield a per-thread SQLite connection to the crawl cache.

    BUG FIX: this generator is consumed via ``with get_db_connection() as
    conn:`` throughout the file, so it must be wrapped with
    ``@contextmanager`` (imported at the top of the file); without the
    decorator every ``with`` statement raises at runtime.

    The connection is created lazily on first use in each thread and kept
    open afterwards for reuse.
    """
    if not hasattr(thread_local, "connection"):
        thread_local.connection = sqlite3.connect('crawl_cache.db')
    try:
        yield thread_local.connection
    finally:
        pass  # We'll keep the connection open for reuse
def init_db():
    """Create the page-cache table and its URL index if they do not exist."""
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute('''CREATE TABLE IF NOT EXISTS pages
                 (url TEXT PRIMARY KEY, content TEXT, depth INTEGER)''')
        cursor.execute('''CREATE INDEX IF NOT EXISTS idx_url ON pages(url)''')
        conn.commit()


init_db()
def clean_text(text):
    """Drop non-printable characters, then collapse each run of non-ASCII
    characters into a single space. Keeps the text FPDF/latin-1 safe."""
    printable_only = ''.join(ch for ch in text if ch.isprintable())
    return re.sub(r'[^\x00-\x7F]+', ' ', printable_only)
async def get_page_content(session, url):
    """Fetch *url* and extract its readable text fragments.

    Prefers the <article> or <main> element when present, otherwise scans the
    whole document for headings, paragraphs, and list items. Returns a list
    of cleaned text strings; on HTTP or network failure, returns a
    single-element list containing the error message.
    """
    try:
        async with rate_limiter:
            async with session.get(url, timeout=30) as response:
                if response.status != 200:
                    logger.error(f"Error fetching {url}: HTTP {response.status}")
                    return [f"Error fetching {url}: HTTP {response.status}"]
                page_html = await response.text()
                soup = BeautifulSoup(page_html, 'html.parser')
                root = soup.find('article') or soup.find('main') or soup
                fragments = []
                if root:
                    for tag_name in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']:
                        for node in root.find_all(tag_name):
                            cleaned = clean_text(node.get_text(strip=True))
                            if cleaned:
                                fragments.append(cleaned)
                logger.info(f"Found {len(fragments)} content items for {url}")
                return fragments
    except Exception as e:
        logger.error(f"Error processing {url}: {str(e)}")
        return [f"Error processing {url}: {str(e)}"]
async def get_links(session, url, base_url):
    """Collect same-site hyperlinks found on *url*.

    Each <a href> is resolved against *url*; only links under *base_url*
    (and different from *url* itself) are kept. Returns [] on any error.
    """
    try:
        async with rate_limiter:
            async with session.get(url, timeout=30) as response:
                if response.status != 200:
                    logger.error(f"Error fetching links from {url}: HTTP {response.status}")
                    return []
                page_html = await response.text()
                soup = BeautifulSoup(page_html, 'html.parser')
                resolved = (urljoin(url, anchor['href'])
                            for anchor in soup.find_all('a', href=True))
                return [link for link in resolved
                        if link.startswith(base_url) and link != url]
    except Exception as e:
        logger.error(f"Error getting links from {url}: {str(e)}")
        return []
async def crawl_pages(base_url, max_depth, progress_callback):
    """Breadth-first crawl starting at *base_url*, up to *max_depth* levels.

    Page text is cached in SQLite so repeated runs skip re-fetching.
    *progress_callback* receives human-readable progress strings.

    Returns:
        list of (url, content) tuples, where content is a list of text
        fragments produced by get_page_content().

    Fixes vs. original:
    - cached content was deserialized with ``eval()``, which executes
      arbitrary code if the cache DB is ever tampered with; replaced with
      ``ast.literal_eval`` (the cache is written via ``str(list)``, which
      literal_eval parses safely).
    - the BFS queue used ``list.pop(0)`` (O(n) per pop); replaced with
      ``collections.deque.popleft()`` (O(1)), same visit order.
    """
    visited = set()
    to_visit = deque([(base_url, 0)])  # FIFO queue -> breadth-first order
    all_pages = []
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        while to_visit:
            current_url, depth = to_visit.popleft()
            if current_url in visited or depth > max_depth:
                continue
            visited.add(current_url)
            start_time = time.time()
            try:
                with get_db_connection() as conn:
                    c = conn.cursor()
                    c.execute("SELECT content FROM pages WHERE url = ?", (current_url,))
                    result = c.fetchone()
                if result:
                    # Safe parse of the str(list) stored below (was eval()).
                    content = ast.literal_eval(result[0])
                else:
                    content = await get_page_content(session, current_url)
                    with get_db_connection() as conn:
                        c = conn.cursor()
                        c.execute("INSERT INTO pages VALUES (?, ?, ?)",
                                  (current_url, str(content), depth))
                        conn.commit()
                all_pages.append((current_url, content))
                logger.info(f"Processed page: {current_url} at depth {depth} in {time.time() - start_time:.2f} seconds")
                # Rough progress estimate; the true page count is unknown up front.
                progress = len(all_pages) / (max_depth * 10)
                progress_callback(f"Crawling pages... {progress:.0%}")
                if depth < max_depth:
                    for link in await get_links(session, current_url, base_url):
                        if link not in visited:
                            to_visit.append((link, depth + 1))
            except Exception as e:
                logger.error(f"Error processing {current_url}: {str(e)}")
                # Continue with the next URL even if this one fails
    return all_pages
def create_pdf_chunk(chunk, start_index):
    """Render one slice of crawled pages into a standalone PDF.

    Args:
        chunk: list of (url, content) tuples for this slice.
        start_index: absolute index of the first page, used for numbering.

    Returns:
        The rendered PDF as latin-1 encoded bytes.
    """
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", size=12)
    for page_number, (page_url, content) in enumerate(chunk, start=start_index):
        pdf.cell(0, 10, txt=f"Page {page_number + 1}: {page_url}", ln=True)
        pdf.ln(5)
        for fragment in content:
            try:
                # Limit text length to avoid issues
                pdf.multi_cell(0, 10, txt=fragment[:200])
            except Exception as e:
                logger.error(f"Error writing text to PDF: {str(e)}")
        pdf.add_page()
    return pdf.output(dest='S').encode('latin-1')
async def website_to_pdf(all_pages, progress_callback):
    """Render all crawled pages into a single PDF, 100 pages per chunk.

    Each chunk is rendered by create_pdf_chunk() on a worker thread; the
    resulting chunk PDFs are then concatenated in order with PyPDF2.
    A chunk that fails is logged and omitted from the merged document.

    Returns:
        The merged PDF as bytes.
    """
    logger.info(f"Starting PDF generation for {len(all_pages)} pages")
    chunk_size = 100
    num_chunks = math.ceil(len(all_pages) / chunk_size)
    pdf_chunks = []
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(create_pdf_chunk, all_pages[start:start + chunk_size], start)
            for start in range(0, len(all_pages), chunk_size)
        ]
        for index, future in enumerate(futures):
            try:
                pdf_chunks.append(await asyncio.wrap_future(future))
                progress = (index + 1) / num_chunks
                progress_callback(f"Generating PDF... {progress:.0%}")
            except Exception as e:
                logger.error(f"Error generating PDF chunk {index}: {str(e)}")
    # Combine PDF chunks using PyPDF2
    merger = PdfMerger()
    for chunk_bytes in pdf_chunks:
        merger.append(BytesIO(chunk_bytes))
    output = BytesIO()
    merger.write(output)
    merger.close()
    return output.getvalue()
async def process_url(url, depth, progress_callback):
    """Crawl *url* to *depth*, then build the PDF; each phase capped at 1 hour.

    Returns:
        PDF bytes on success, or a human-readable error string on
        timeout/failure — callers distinguish the two cases by type.
    """
    try:
        crawl = crawl_pages(url, depth, progress_callback)
        all_pages = await asyncio.wait_for(crawl, timeout=3600)  # 1 hour timeout
        if not all_pages:
            return "No pages were successfully crawled. Please check the URL and try again."
        render = website_to_pdf(all_pages, progress_callback)
        return await asyncio.wait_for(render, timeout=3600)  # 1 hour timeout for PDF generation
    except asyncio.TimeoutError:
        logger.error("Process timed out after 1 hour")
        return "The process timed out after 1 hour. Please try again with a smaller depth or a more specific URL."
    except Exception as e:
        logger.error(f"Error in process_url: {str(e)}")
        return f"An error occurred: {str(e)}"
# App layout
# Two dcc.Store components hold the generated PDF and the progress text;
# the dcc.Interval polls for progress while a conversion is running.
app.layout = dbc.Container([
    dcc.Store(id='pdf-store'),       # generated PDF (or an error string)
    dcc.Store(id='progress-store'),  # latest progress message
    # Top navigation bar: logo + brand name linking back to "/".
    dbc.Navbar(
        dbc.Container([
            html.A(
                dbc.Row([
                    dbc.Col(html.Img(src="/assets/logo.png", height="30px")),
                    dbc.Col(dbc.NavbarBrand("Website to PDF Converter", className="ms-2")),
                ],
                align="center",
                className="g-0",
                ),
                href="/",
                style={"textDecoration": "none"},
            )
        ]),
        color="#116F70",
        dark=True,
    ),
    # Main card: URL input, crawl-depth slider (1-10), submit button,
    # progress bar, and the output/download area wrapped in a spinner.
    dbc.Card(
        dbc.CardBody([
            html.H1("Website to PDF Converter", className="text-center mb-4"),
            html.P("Enter docs URL and crawl depth to convert documentation pages into a PDF. Be responsible for sites you have permission to do this", className="text-center mb-4"),
            dbc.Input(id="url-input", type="text", placeholder="Enter website URL (e.g., https://www.gradio.app/docs)", className="mb-3"),
            dcc.Slider(id="depth-slider", min=1, max=10, step=1, value=3, marks={i: str(i) for i in range(1, 11)}, className="mb-3"),
            dbc.Button("Convert to PDF", id="submit-button", color="primary", className="mb-3 w-100"),
            dbc.Progress(id="progress-bar", animated=True, striped=True, className="mb-3"),
            dbc.Spinner(html.Div(id="output-area"), color="primary", type="grow"),
            # Fires every second to refresh progress; disabled until a run starts.
            dcc.Interval(id="progress-interval", interval=1000, n_intervals=0, disabled=True),
        ]),
        className="mt-4"
    )
], fluid=True)
def update_output(n_clicks, n_intervals, url, depth, progress):
    """Dash callback body driving the output area and progress display.

    Returns a 4-tuple: (output-area children, interval-disabled flag,
    progress-bar value, progress label string).

    NOTE(review): no @app.callback decorator is visible in this file, so as
    written this function is never registered with Dash — confirm whether the
    decorator was lost in a paste or registration happens elsewhere.
    """
    ctx = dash.callback_context
    if not ctx.triggered:
        raise PreventUpdate
    # Identify which input fired: the submit button or the polling interval.
    triggered_id = ctx.triggered[0]['prop_id'].split('.')[0]
    if triggered_id == "submit-button":
        if not url:
            return "Please enter a valid URL.", True, 0, ""
        # Kick off: show busy text and enable the progress-polling interval.
        return "Processing... Please wait.", False, 0, "0%"
    elif triggered_id == "progress-interval":
        # Poll path: check the PDF store to see whether the run finished.
        store = dash.callback_context.inputs.get('pdf-store', None)
        if store is None:
            # Still running: echo the last reported percentage, if any.
            if progress:
                return "Processing... Please wait.", False, int(progress.split('%')[0]), progress
            return "Processing... Please wait.", False, 0, "0%"
        if isinstance(store, str) and store.startswith("Error"):
            # Pipeline reported a failure message: surface it, stop polling.
            return store, True, 100, "100%"
        try:
            # Success: expose the PDF bytes as a base64 data-URI download link.
            encoded = base64.b64encode(store).decode()
            return html.Div([
                html.H4("PDF Generated Successfully"),
                html.A(
                    dbc.Button("Download PDF", color="success", className="mt-2"),
                    href=f"data:application/pdf;base64,{encoded}",
                    download="website_content.pdf"
                )
            ]), True, 100, "100%"
        except Exception as e:
            logger.error(f"Error creating download link: {str(e)}")
            return f"An error occurred while creating the download link: {str(e)}", True, 100, "100%"
    raise PreventUpdate
def generate_pdf(n_clicks, url, depth):
    """Run the full crawl-and-convert pipeline synchronously.

    Returns:
        (result, progress_string) where result is either the PDF bytes or a
        human-readable error message (both are forwarded unchanged).

    NOTE(review): looks like a Dash callback body, but no @app.callback
    decorator is visible in this file — confirm it is registered elsewhere.
    """
    if not url:
        return "Please enter a valid URL.", "0%"
    progress_store = {'progress': "0%"}

    def progress_callback(message):
        progress_store['progress'] = message

    # process_url() yields PDF bytes on success or an error string on failure;
    # either way the caller receives it with a final 100% progress marker.
    result = asyncio.run(process_url(url, depth, progress_callback))
    return result, "100%"
if __name__ == '__main__':
    # Entry point: serve on all interfaces, port 7860 (Hugging Face Spaces default).
    print("Starting the Dash application...")
    app.run(debug=True, host='0.0.0.0', port=7860)
    print("Dash application has finished running.")