File size: 4,991 Bytes
6ecf729
 
 
 
9f222f2
56c5685
9f222f2
1748e66
78a3c51
 
1748e66
 
9f222f2
 
 
 
 
6ecf729
12928b4
 
1748e66
 
 
12928b4
 
cb8ca6c
04286a4
 
 
 
 
 
1748e66
12928b4
 
1748e66
af244f7
12928b4
d0bb3ed
6ecf729
1748e66
 
6ecf729
cb8ca6c
d0bb3ed
 
 
 
 
 
6ecf729
04286a4
 
6ecf729
cb8ca6c
 
 
 
 
81f7834
 
 
 
cb8ca6c
af244f7
81f7834
 
 
 
 
cb8ca6c
81f7834
 
 
 
 
 
 
 
 
 
 
af244f7
 
cb8ca6c
 
 
 
 
 
 
6ecf729
9f222f2
13c4089
9f222f2
d6fec81
de0ffde
cb8ca6c
13c4089
 
12928b4
2ad1f6f
d6fec81
2ad1f6f
 
9f222f2
0f462a3
56c5685
1748e66
 
 
6ecf729
1748e66
6ecf729
cb8ca6c
6ecf729
cb8ca6c
6ecf729
 
1748e66
af244f7
6ecf729
733d87e
 
 
af244f7
 
 
 
733d87e
6ecf729
af244f7
cb8ca6c
 
 
 
6ecf729
31c9130
 
6ecf729
 
 
af244f7
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import gradio as gr
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from fpdf import FPDF
import tempfile
import re
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed

# Configure the root logger so INFO-level (and more severe) records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the crawling and PDF functions below.
logger = logging.getLogger(__name__)

def clean_text(text):
    """Reduce *text* to printable ASCII.

    Whitespace characters (newlines, tabs, non-breaking spaces, ...) are
    turned into plain spaces so that words separated only by a line break
    in the source are not glued together. Any remaining non-printable
    characters are dropped, and each run of non-ASCII characters is
    replaced by a single space.

    Args:
        text: The raw string to sanitise.

    Returns:
        The sanitised string; may be empty.
    """
    # Map whitespace to a space *before* filtering: '\n' and '\t' are not
    # printable, so filtering first would delete them and merge the words.
    spaced = ''.join(' ' if char.isspace() else char for char in text)
    printable = ''.join(char for char in spaced if char.isprintable())
    return re.sub(r'[^\x00-\x7F]+', ' ', printable)

def get_page_content(url):
    """Fetch *url* and return its text blocks in document order.

    The first ``<article>`` element is preferred, then the first ``<main>``
    element, and finally the whole document. Within that container every
    paragraph, heading (h1-h6) and list item is passed through
    ``clean_text``; items that end up empty are skipped.

    Args:
        url: Address of the page to download.

    Returns:
        A list of cleaned text strings. An empty list is returned when the
        request or the parsing fails; the error is logged, not raised.
    """
    try:
        logger.info(f"Fetching content from: {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        main_content = soup.find('article') or soup.find('main') or soup

        # A single find_all with a list of names walks the tree once and
        # yields matches in document order, so headings stay next to the
        # paragraphs they introduce instead of being grouped by tag type.
        wanted_tags = ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']
        content = []
        for element in main_content.find_all(wanted_tags):
            text = clean_text(element.get_text(strip=True))
            if text:
                content.append(text)

        logger.info(f"Found {len(content)} content items for {url}")
        return content
    except Exception as e:
        # Best-effort crawl: one broken page must not abort the whole run.
        logger.error(f"Error processing {url}: {str(e)}")
        return []  # Return an empty list instead of error message

def get_links(url, base_url):
    """Return the distinct in-scope links found on the page at *url*.

    Every ``<a href=...>`` is resolved against *url*. Fragments
    (``#section``) are stripped because they address the same page, and a
    link is kept only if it starts with *base_url*, does not point back to
    *url* itself, and has not already been seen on this page.

    Args:
        url: Page whose anchors are inspected.
        base_url: Prefix a link must start with to be kept.

    Returns:
        A list of absolute URLs in order of first appearance. An empty list
        is returned when the request or the parsing fails; the error is
        logged, not raised.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')

        # The page itself, with and without its own fragment, is never a link
        # worth following.
        own_urls = {url, urlparse(url)._replace(fragment='').geturl()}
        seen = set()
        valid_links = []
        for anchor in soup.find_all('a', href=True):
            absolute = urljoin(url, anchor['href'])
            full_url = urlparse(absolute)._replace(fragment='').geturl()
            if not full_url.startswith(base_url):
                continue
            if full_url in own_urls or full_url in seen:
                continue
            seen.add(full_url)
            valid_links.append(full_url)
        return valid_links
    except Exception as e:
        # Best-effort crawl: a page whose links cannot be read yields none.
        logger.error(f"Error getting links from {url}: {str(e)}")
        return []

def crawl_pages(base_url, max_depth):
    """Crawl breadth-first from *base_url* and collect page contents.

    Link discovery (``get_links``) runs sequentially in the calling thread,
    while the content of each discovered page is fetched concurrently on a
    thread pool via ``get_page_content``.

    Args:
        base_url: Start page; also the prefix passed to ``get_links`` to
            decide which links are followed.
        max_depth: Maximum link distance from *base_url* to visit. Links are
            only expanded on pages whose depth is below this value.

    Returns:
        A list of ``(url, content)`` tuples in breadth-first discovery
        order, containing only the pages for which content was found.
    """
    # Local import keeps this block self-contained; deque gives O(1) pops
    # from the left, unlike list.pop(0).
    from collections import deque

    def process_page(url, depth):
        content = get_page_content(url)
        logger.info(f"Processed page: {url} at depth {depth}")
        return url, content, depth

    # URLs are recorded when they are queued, so each one enters the queue
    # at most once (at the shallowest depth it was discovered).
    queued = {base_url}
    to_visit = deque([(base_url, 0)])
    futures = []

    with ThreadPoolExecutor(max_workers=10) as executor:
        while to_visit:
            current_url, depth = to_visit.popleft()
            if depth > max_depth:
                continue

            futures.append(executor.submit(process_page, current_url, depth))

            if depth < max_depth:
                for link in get_links(current_url, base_url):
                    if link not in queued:
                        queued.add(link)
                        to_visit.append((link, depth + 1))

    # Read results in submission order (not completion order) so the output
    # is deterministic and follows the crawl order.
    all_pages = []
    for future in futures:
        url, content, _depth = future.result()
        if content:  # Only add pages with content
            all_pages.append((url, content))

    return all_pages

def website_to_pdf(url, max_depth):
    """Crawl *url* and write the collected text into a temporary PDF file.

    Each crawled page starts on a fresh PDF page with its URL as the first
    line, followed by its text items. If the crawl finds no pages, the PDF
    consists of a single empty page.

    Args:
        url: Start page of the crawl.
        max_depth: Maximum crawl depth, forwarded to ``crawl_pages``.

    Returns:
        Filesystem path of the generated ``.pdf`` file. The file is created
        with ``delete=False``, so it is not removed automatically.
    """
    logger.info(f"Starting to process: {url} with max depth: {max_depth}")
    all_pages = crawl_pages(url, max_depth)
    logger.info(f"Found {len(all_pages)} pages to process")

    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    for index, (page_url, content) in enumerate(all_pages):
        # Break *between* site pages only; adding a page after every one
        # would leave a blank page at the end of the document.
        if index:
            pdf.add_page()
        # Sanitise the heading the same way as the body text so an unusual
        # character in a URL cannot break the PDF encoding.
        pdf.cell(0, 10, txt=clean_text(page_url), ln=True)
        pdf.ln(5)
        for text in content:
            try:
                pdf.multi_cell(0, 10, txt=text[:200])  # Limit text length to avoid issues
            except Exception as e:
                # Skip the offending item but keep building the document.
                logger.error(f"Error writing text to PDF: {str(e)}")

    # Reserve a unique path, then close the handle *before* writing: a file
    # that is still open cannot be reopened by name on Windows.
    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        pdf_path = tmp.name
    pdf.output(pdf_path)
    logger.info(f"PDF saved to: {pdf_path}")

    return pdf_path

def process_url(url, depth):
    """Build the PDF for *url*, shielding the caller from any failure.

    Args:
        url: Start page of the crawl.
        depth: Maximum crawl depth.

    Returns:
        The path returned by ``website_to_pdf``, or ``None`` if it raised
        (the error is logged).
    """
    try:
        generated_path = website_to_pdf(url, depth)
    except Exception as e:
        logger.error(f"Error in process_url: {str(e)}")
        # Signal failure with None rather than an error message.
        return None
    else:
        return generated_path

def threaded_process_url(url, depth):
    """Run ``process_url`` on a worker thread and wrap the result for Gradio.

    Args:
        url: Start page of the crawl.
        depth: Maximum crawl depth.

    Returns:
        A ``gr.update`` that shows the generated file when a path was
        produced, or hides the output (value ``None``) when it was not.
    """
    with ThreadPoolExecutor() as pool:
        # Submit the single job and block until it has finished.
        pdf_path = pool.submit(process_url, url, depth).result()
        if pdf_path is not None:
            return gr.update(value=pdf_path, visible=True)
        return gr.update(value=None, visible=False)

# Gradio UI definition: a URL textbox and a depth slider feed
# threaded_process_url, whose result is bound to a file output component.
iface = gr.Interface(
    fn=threaded_process_url,
    inputs=[
        gr.Textbox(label="Enter website URL (e.g., https://www.gradio.app/docs)"),
        # Depth is limited to the range 1-5 in whole steps, defaulting to 3.
        gr.Slider(minimum=1, maximum=5, value=3, step=1, label="Crawl Depth")
    ],
    outputs=gr.File(label="Download PDF"),
    title="Website to PDF Converter",
    description="Enter docs URL and crawl depth to convert documentation pages into a PDF. Be responsible for sites you have permission to do this"
)

# Launch the interface only when this file is executed as a script.
if __name__ == "__main__":
    iface.launch()