website-to-pdf / app.py
bluenevus's picture
Update app.py
78a3c51 verified
raw
history blame
4.86 kB
import gradio as gr
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from fpdf import FPDF
import tempfile
import re
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def clean_text(text):
text = ''.join(char for char in text if char.isprintable())
text = re.sub(r'[^\x00-\x7F]+', ' ', text)
return text
def get_page_content(url):
try:
logger.info(f"Fetching content from: {url}")
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
content = []
main_content = soup.find('article') or soup.find('main') or soup
if main_content:
for tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']:
for element in main_content.find_all(tag):
text = clean_text(element.get_text(strip=True))
if text:
content.append(text)
logger.info(f"Found {len(content)} content items for {url}")
return content
except Exception as e:
logger.error(f"Error processing {url}: {str(e)}")
return [f"Error processing {url}: {str(e)}"]
def get_links(url, base_url):
try:
response = requests.get(url, timeout=10)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
links = soup.find_all('a', href=True)
valid_links = []
for link in links:
full_url = urljoin(url, link['href'])
if full_url.startswith(base_url) and full_url != url:
valid_links.append(full_url)
return valid_links
except Exception as e:
logger.error(f"Error getting links from {url}: {str(e)}")
return []
def crawl_pages(base_url, max_depth):
visited = set()
to_visit = [(base_url, 0)]
all_pages = []
def process_page(url, depth):
content = get_page_content(url)
logger.info(f"Processed page: {url} at depth {depth}")
return url, content, depth
with ThreadPoolExecutor(max_workers=10) as executor: # Adjust max_workers as needed
futures = []
while to_visit:
current_url, depth = to_visit.pop(0)
if current_url in visited or depth > max_depth:
continue
visited.add(current_url)
futures.append(executor.submit(process_page, current_url, depth))
if depth < max_depth:
links = get_links(current_url, base_url)
for link in links:
if link not in visited:
to_visit.append((link, depth + 1))
for future in as_completed(futures):
url, content, depth = future.result()
all_pages.append((url, content))
return all_pages
def website_to_pdf(url, max_depth):
logger.info(f"Starting to process: {url} with max depth: {max_depth}")
all_pages = crawl_pages(url, max_depth)
logger.info(f"Found {len(all_pages)} pages to process")
pdf = FPDF()
pdf.set_auto_page_break(auto=True, margin=15)
pdf.add_page()
pdf.set_font("Arial", size=12)
for page_url, content in all_pages:
pdf.cell(0, 10, txt=page_url, ln=True)
pdf.ln(5)
for text in content:
try:
pdf.multi_cell(0, 10, txt=text[:200]) # Limit text length to avoid issues
except Exception as e:
logger.error(f"Error writing text to PDF: {str(e)}")
pdf.add_page()
with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
pdf_path = tmp.name
pdf.output(pdf_path)
logger.info(f"PDF saved to: {pdf_path}")
return pdf_path
def process_url(url, depth):
try:
pdf_file = website_to_pdf(url, depth)
return pdf_file
except Exception as e:
logger.error(f"Error in process_url: {str(e)}")
return f"An error occurred: {str(e)}"
# Add this new function
def threaded_process_url(url, depth):
with ThreadPoolExecutor() as executor:
future = executor.submit(process_url, url, depth)
return future.result()
iface = gr.Interface(
fn=threaded_process_url, # Use the new threaded function
inputs=[
gr.Textbox(label="Enter website URL (e.g., https://www.gradio.app/docs)"),
gr.Slider(minimum=1, maximum=5, value=3, step=1, label="Crawl Depth")
],
outputs=gr.File(label="Download PDF"),
title="Website to PDF Converter",
description="Enter docs URL and crawl depth to convert documentation pages into a PDF. Be responsible for sites you have permission to do this"
)
if __name__ == "__main__":
iface.launch()