File size: 4,856 Bytes
6ecf729
 
 
 
9f222f2
56c5685
9f222f2
1748e66
78a3c51
 
1748e66
 
 
9f222f2
 
 
 
 
6ecf729
12928b4
 
1748e66
 
 
12928b4
 
cb8ca6c
04286a4
 
 
 
 
 
1748e66
12928b4
 
1748e66
12928b4
 
d0bb3ed
6ecf729
1748e66
 
6ecf729
cb8ca6c
d0bb3ed
 
 
 
 
 
6ecf729
04286a4
 
6ecf729
cb8ca6c
 
 
 
 
81f7834
 
 
 
cb8ca6c
81f7834
 
 
 
 
 
cb8ca6c
81f7834
 
 
 
 
 
 
 
 
 
 
 
cb8ca6c
 
 
 
 
 
 
6ecf729
9f222f2
13c4089
9f222f2
d6fec81
de0ffde
cb8ca6c
13c4089
 
12928b4
2ad1f6f
d6fec81
2ad1f6f
 
9f222f2
0f462a3
56c5685
1748e66
 
 
6ecf729
1748e66
6ecf729
cb8ca6c
6ecf729
cb8ca6c
6ecf729
 
1748e66
6ecf729
 
733d87e
 
 
 
 
 
6ecf729
733d87e
cb8ca6c
 
 
 
6ecf729
31c9130
 
6ecf729
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
import gradio as gr
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from fpdf import FPDF
import tempfile
import re
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed


logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def clean_text(text):
    text = ''.join(char for char in text if char.isprintable())
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)
    return text

def get_page_content(url):
    try:
        logger.info(f"Fetching content from: {url}")
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        content = []
        main_content = soup.find('article') or soup.find('main') or soup
        if main_content:
            for tag in ['p', 'h1', 'h2', 'h3', 'h4', 'h5', 'h6', 'li']:
                for element in main_content.find_all(tag):
                    text = clean_text(element.get_text(strip=True))
                    if text:
                        content.append(text)
        logger.info(f"Found {len(content)} content items for {url}")
        return content
    except Exception as e:
        logger.error(f"Error processing {url}: {str(e)}")
        return [f"Error processing {url}: {str(e)}"]

def get_links(url, base_url):
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        soup = BeautifulSoup(response.text, 'html.parser')
        links = soup.find_all('a', href=True)
        valid_links = []
        for link in links:
            full_url = urljoin(url, link['href'])
            if full_url.startswith(base_url) and full_url != url:
                valid_links.append(full_url)
        return valid_links
    except Exception as e:
        logger.error(f"Error getting links from {url}: {str(e)}")
        return []

def crawl_pages(base_url, max_depth):
    visited = set()
    to_visit = [(base_url, 0)]
    all_pages = []

    def process_page(url, depth):
        content = get_page_content(url)
        logger.info(f"Processed page: {url} at depth {depth}")
        return url, content, depth

    with ThreadPoolExecutor(max_workers=10) as executor:  # Adjust max_workers as needed
        futures = []
        while to_visit:
            current_url, depth = to_visit.pop(0)
            if current_url in visited or depth > max_depth:
                continue

            visited.add(current_url)
            futures.append(executor.submit(process_page, current_url, depth))

            if depth < max_depth:
                links = get_links(current_url, base_url)
                for link in links:
                    if link not in visited:
                        to_visit.append((link, depth + 1))

        for future in as_completed(futures):
            url, content, depth = future.result()
            all_pages.append((url, content))

    return all_pages

def website_to_pdf(url, max_depth):
    logger.info(f"Starting to process: {url} with max depth: {max_depth}")
    all_pages = crawl_pages(url, max_depth)
    logger.info(f"Found {len(all_pages)} pages to process")
    
    pdf = FPDF()
    pdf.set_auto_page_break(auto=True, margin=15)
    pdf.add_page()
    pdf.set_font("Arial", size=12)

    for page_url, content in all_pages:
        pdf.cell(0, 10, txt=page_url, ln=True)
        pdf.ln(5)
        for text in content:
            try:
                pdf.multi_cell(0, 10, txt=text[:200])  # Limit text length to avoid issues
            except Exception as e:
                logger.error(f"Error writing text to PDF: {str(e)}")
        pdf.add_page()

    with tempfile.NamedTemporaryFile(delete=False, suffix='.pdf') as tmp:
        pdf_path = tmp.name
        pdf.output(pdf_path)
        logger.info(f"PDF saved to: {pdf_path}")
    
    return pdf_path

def process_url(url, depth):
    try:
        pdf_file = website_to_pdf(url, depth)
        return pdf_file
    except Exception as e:
        logger.error(f"Error in process_url: {str(e)}")
        return f"An error occurred: {str(e)}"

# Add this new function
def threaded_process_url(url, depth):
    with ThreadPoolExecutor() as executor:
        future = executor.submit(process_url, url, depth)
        return future.result()

iface = gr.Interface(
    fn=threaded_process_url,  # Use the new threaded function
    inputs=[
        gr.Textbox(label="Enter website URL (e.g., https://www.gradio.app/docs)"),
        gr.Slider(minimum=1, maximum=5, value=3, step=1, label="Crawl Depth")
    ],
    outputs=gr.File(label="Download PDF"),
    title="Website to PDF Converter",
    description="Enter docs URL and crawl depth to convert documentation pages into a PDF. Be responsible for sites you have permission to do this"
)

if __name__ == "__main__":
    iface.launch()