|
import dash |
|
import dash_bootstrap_components as dbc |
|
from dash import html, dcc, Input, Output, State, ctx, ALL |
|
import flask |
|
import uuid |
|
import os |
|
import tempfile |
|
import shutil |
|
import logging |
|
from flask import send_file |
|
import threading |
|
from PyPDF2 import PdfReader, PdfWriter |
|
import re |
|
import zipfile |
|
import base64 |
|
|
|
# Root logger: INFO level, each record prefixed with timestamp and level name.
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
)

# Per-session in-memory state keyed by session id; clean_session() drops entries.
SESSION_DATA = {}

# One threading.Lock per session id; get_session_lock() fills this on demand.
SESSION_LOCKS = {}
|
|
|
def get_session_id():
    """Return the session id for the current request.

    Resolution order:

    1. An id already stored on ``flask.g`` during this request, so repeated
       calls within one request agree with each other.
    2. The ``session-id`` cookie, but only if it parses as a UUID. The id is
       later embedded in filesystem paths that get created and recursively
       deleted, so an arbitrary client-supplied string must never be used.
    3. A freshly generated UUID4.

    The chosen id is stored on ``flask.g.session_id`` before returning.

    Returns:
        str: canonical (lower-case, hyphenated) UUID string.
    """
    cached = flask.g.get('session_id')
    if cached:
        return cached

    session_id = None
    raw_cookie = flask.request.cookies.get('session-id')
    if raw_cookie:
        try:
            # Round-trip through uuid.UUID to reject anything that is not a
            # UUID (path separators, "..", etc.) and to normalise the form.
            session_id = str(uuid.UUID(raw_cookie))
        except ValueError:
            logging.warning("Ignoring malformed session-id cookie")

    if session_id is None:
        session_id = str(uuid.uuid4())

    flask.g.session_id = session_id
    return session_id
|
|
|
def get_session_dir(session_id):
    """Return the working directory for ``session_id``, creating it if needed.

    The directory is ``dash_pdfsplit_<session_id>`` inside the system
    temporary directory.
    """
    session_dir = os.path.join(
        tempfile.gettempdir(),
        f'dash_pdfsplit_{session_id}',
    )
    os.makedirs(session_dir, exist_ok=True)
    return session_dir
|
|
|
def clean_session(session_id):
    """Delete a session's working directory and forget its in-memory state.

    Any exception raised along the way is logged and swallowed, so callers
    never see a failure from this function.
    """
    try:
        target_dir = get_session_dir(session_id)
        if os.path.exists(target_dir):
            shutil.rmtree(target_dir)
        # Drop both the stored data and the lock belonging to this session.
        for registry in (SESSION_DATA, SESSION_LOCKS):
            registry.pop(session_id, None)
    except Exception as exc:
        logging.error(f"Error cleaning session {session_id}: {exc}")
|
|
|
def get_session_lock(session_id):
    """Return the lock guarding ``session_id``, creating it on first use.

    The original check-then-insert sequence let two threads that asked for the
    same new session at the same time each create and receive a *different*
    lock, which defeats the purpose of locking. ``dict.setdefault`` performs
    the lookup and the insert as a single dictionary operation, so every
    caller gets the same lock object (the spare ``Lock`` built for the default
    argument is simply discarded when an entry already exists).

    Returns:
        threading.Lock: the lock stored in ``SESSION_LOCKS`` for this session.
    """
    return SESSION_LOCKS.setdefault(session_id, threading.Lock())
|
|
|
def allowed_file(filename):
    """Return True when ``filename`` is a string ending in ``.pdf``.

    The comparison is case-insensitive. Non-string input (for example
    ``None`` when no file name was supplied) yields False instead of raising.
    """
    # endswith('.pdf') already implies a dot is present, so no separate check.
    return isinstance(filename, str) and filename.lower().endswith('.pdf')
|
|
|
def extract_text_headers(reader, page_num):
    """Return the first non-blank line of text on a page, stripped.

    Returns an empty string when the page has no extractable text or when
    anything goes wrong while reading it (the failure is logged as a warning).
    """
    try:
        raw_text = reader.pages[page_num].extract_text() or ""
        for candidate in raw_text.split('\n'):
            stripped = candidate.strip()
            if stripped:
                return stripped
        return ""
    except Exception as exc:
        logging.warning(f"Failed extracting header from page {page_num}: {exc}")
        return ""
|
|
|
def is_blank_page(reader, page_num):
    """Return True when a page yields no text other than whitespace.

    If the page cannot be read, the failure is logged as a warning and the
    page is reported as not blank.
    """
    try:
        extracted = reader.pages[page_num].extract_text()
        if not extracted:
            return True
        return not extracted.strip()
    except Exception as exc:
        logging.warning(f"Failed to check blank page at {page_num}: {exc}")
        return False
|
|
|
def is_chapter_header(header):
    """Return True when ``header`` looks like the start of a document section.

    The line is matched, case-insensitively and ignoring leading whitespace,
    against: the words chapter / section / part / appendix / introduction,
    a run of roman-numeral letters followed by a dot, or a dotted decimal
    number followed by whitespace.
    """
    heading_patterns = (
        r'^\s*chapter\b',
        r'^\s*section\b',
        r'^\s*part\b',
        r'^\s*appendix\b',
        r'^\s*[ivxlcdm]+\.',
        r'^\s*\d+(\.\d+)*\s',
        r'^\s*introduction\b',
    )
    return any(
        re.match(pattern, header, re.IGNORECASE) is not None
        for pattern in heading_patterns
    )
|
|
|
def estimate_writer_size(writer):
    """Return the stream position, in bytes, after writing ``writer`` to memory.

    The writer is serialised into a throwaway in-memory buffer; nothing is
    written to disk.
    """
    import io

    with io.BytesIO() as scratch:
        writer.write(scratch)
        return scratch.tell()
|
|
|
def intelligent_pdf_split(input_path, session_dir, max_mb=5, min_split_mb=4):
    """Split a PDF into consecutive parts, preferring natural break points.

    Pages are accumulated into a chunk whose serialised size is re-measured
    after every page:

    * If adding a page brings the chunk to ``max_mb`` or more, the chunk is
      closed *before* that page and the page starts the next chunk. Previously
      the offending page was kept in the chunk, so parts routinely exceeded
      the limit. A single page that is itself at or over ``max_mb`` cannot be
      divided further and is emitted as its own part.
    * Once a chunk is at least ``min_split_mb``, it is closed after the
      current page if that page is blank, starts with a chapter-like heading,
      or has a non-empty first line different from the previous page's.

    Args:
        input_path: path of the PDF to split.
        session_dir: directory that receives ``split_part_<n>.pdf`` files.
        max_mb: size (MiB) a multi-page part must stay below.
        min_split_mb: size (MiB) from which break-point heuristics apply.

    Returns:
        list[dict]: one entry per part with keys ``filename``, ``size``
        (MiB, measured on disk) and ``path``.
    """
    bytes_per_mb = 1024 * 1024
    reader = PdfReader(input_path)
    n_pages = len(reader.pages)

    splits = []  # half-open (start, end) page index ranges
    chunk_start = 0
    current_writer = PdfWriter()
    last_header = None

    # NOTE: the chunk is re-serialised after every page to measure it, so the
    # cost grows with chunk length; acceptable for chunks capped at max_mb.
    for i in range(n_pages):
        page = reader.pages[i]
        current_writer.add_page(page)
        size = estimate_writer_size(current_writer) / bytes_per_mb

        if size >= max_mb and i > chunk_start:
            # Page i does not fit: close the chunk without it and restart
            # the measurement with page i alone.
            splits.append((chunk_start, i))
            chunk_start = i
            current_writer = PdfWriter()
            current_writer.add_page(page)
            size = estimate_writer_size(current_writer) / bytes_per_mb

        header = extract_text_headers(reader, i)

        if size >= max_mb:
            # Only reachable for a single oversized page.
            split_here = True
        elif size >= min_split_mb:
            # Evaluated lazily so text is only re-extracted near a boundary.
            split_here = (
                is_blank_page(reader, i)
                or is_chapter_header(header)
                or bool(header and header != last_header)
            )
        else:
            split_here = False

        if split_here:
            splits.append((chunk_start, i + 1))
            chunk_start = i + 1
            current_writer = PdfWriter()

        last_header = header

    # Whatever is left after the last split point forms the final part.
    if chunk_start < n_pages:
        splits.append((chunk_start, n_pages))

    split_files = []
    for idx, (start, end) in enumerate(splits):
        writer = PdfWriter()
        for page_index in range(start, end):
            writer.add_page(reader.pages[page_index])
        out_path = os.path.join(session_dir, f'split_part_{idx+1}.pdf')
        with open(out_path, 'wb') as f:
            writer.write(f)
        size = os.path.getsize(out_path) / bytes_per_mb
        split_files.append({
            'filename': os.path.basename(out_path),
            'size': size,
            'path': out_path,
        })
    return split_files
|
|
|
def make_zip_of_splits(split_files, session_dir):
    """Bundle the split parts into ``split_files.zip`` inside ``session_dir``.

    Each entry of ``split_files`` supplies the file to read (``path``) and the
    name it gets inside the archive (``filename``). Returns the archive path.
    """
    archive_path = os.path.join(session_dir, "split_files.zip")
    with zipfile.ZipFile(archive_path, mode='w') as archive:
        for part in split_files:
            archive.write(part['path'], arcname=part['filename'])
    return archive_path
|
|
|
# The Bootstrap theme is the only external stylesheet passed to Dash.
external_stylesheets = [dbc.themes.BOOTSTRAP]

# suppress_callback_exceptions is enabled because the Delete button that
# handle_upload listens to is created by a callback, not by the initial layout.
app = dash.Dash(
    __name__,
    suppress_callback_exceptions=True,
    external_stylesheets=external_stylesheets,
)
app.title = "Intelligent PDF Splitter"

# Flask application behind the Dash app; used below to register plain routes.
server = app.server
|
|
|
def get_split_results_placeholder():
    """Return the empty ``split-results-inner`` container shown before a split."""
    placeholder = html.Div(children="", id="split-results-inner")
    return placeholder
|
|
|
def _build_layout():
    """Assemble the page: a single card with upload, actions and results."""
    intro_text = html.P(
        "Upload your PDF. The tool will split it into context-preserving sections, each under 5MB."
    )

    upload_area = dcc.Upload(
        id='upload-pdf',
        children=html.Div([
            'Drag and Drop or ',
            html.A('Select PDF File'),
        ]),
        style={
            'width': '100%', 'height': '80px', 'lineHeight': '80px',
            'borderWidth': '1px', 'borderStyle': 'dashed', 'borderRadius': '5px',
            'textAlign': 'center', 'margin': '10px 0',
        },
        multiple=False,
        accept='.pdf',
    )

    # Starts disabled; handle_upload enables it once a PDF has been stored.
    split_button = dbc.Button(
        "Split PDF",
        id='split-btn',
        color='primary',
        className='mb-3 mt-2',
        n_clicks=0,
        style={'width': '180px', 'fontWeight': 'bold'},
        disabled=True,
    )

    clear_button = dbc.Button(
        "Clear Session",
        id='clear-session',
        color='secondary',
        className='mt-2 mb-2',
    )

    results_area = dcc.Loading(
        id="loading",
        type="default",
        children=[
            html.Div(id='split-results', children=get_split_results_placeholder()),
        ],
    )

    card = dbc.Card(
        [
            dbc.CardHeader(html.H2("Intelligent PDF Splitter")),
            dbc.CardBody([
                intro_text,
                upload_area,
                html.Div(id='file-info', className='mb-4'),
                split_button,
                clear_button,
                results_area,
            ]),
        ],
        className="mt-4",
    )

    return dbc.Container(
        [
            dcc.Store(id='session-store', storage_type='session'),
            html.Div(id='dummy-div', style={'display': 'none'}),
            dbc.Row([dbc.Col(card, width=12)]),
        ],
        fluid=True,
        className="p-4",
    )


app.layout = _build_layout()
|
|
|
def _safe_upload_name(filename):
    """Reduce a client-supplied file name to a bare final path component.

    Returns an empty string for non-string input or when nothing usable is
    left (for example ``..``), so the result can be joined onto the session
    directory without escaping it.
    """
    if not isinstance(filename, str):
        return ""
    name = os.path.basename(filename.replace('\\', '/'))
    return "" if name in ('.', '..') else name


def _render_file_info(label):
    """Build the row showing ``label`` next to the upload's Delete button."""
    return dbc.Row([
        dbc.Col(html.Div(label), width=9, style={'display': 'flex', 'alignItems': 'center'}),
        dbc.Col(
            dbc.Button("Delete", id={'type': 'delete-upload-btn', 'index': 0}, color='danger', n_clicks=0, className='ms-5'),
            width=3, style={'display': 'flex', 'justifyContent': 'end'}
        )
    ], className='mb-3', align='center', style={'marginTop': "15px", 'marginBottom': '25px'})


def _render_split_results(split_files, session_id):
    """Build the list of split parts plus the ZIP download button."""
    split_files_list = html.Ul([
        html.Li([f"{fi['filename']} ({fi['size']:.2f} MB)"])
        for fi in split_files
    ])
    download_zip_btn = dbc.Button(
        "Download All (ZIP)", color="primary", size="lg", className='mb-3 mt-4',
        href=f"/download_zip/{session_id}/split_files.zip"
    )
    return html.Div([
        html.H5("Split Files:"),
        split_files_list,
        html.Div(download_zip_btn, style={'marginTop': '30px'})
    ], id="split-results-inner")


@app.callback(
    Output('file-info', 'children'),
    Output('split-btn', 'disabled'),
    Output('split-results', 'children'),
    Output('session-store', 'data'),
    Input('upload-pdf', 'contents'),
    State('upload-pdf', 'filename'),
    Input('clear-session', 'n_clicks'),
    Input({'type': 'delete-upload-btn', 'index': ALL}, 'n_clicks'),
    Input('split-btn', 'n_clicks'),
    State('session-store', 'data'),
    prevent_initial_call=True
)
def handle_upload(contents, filename, clear_n, delete_upload_n_list, split_n, session_data):
    """Single callback driving upload, delete, split and session clearing.

    Returns a 4-tuple matching the declared outputs: the file-info content,
    whether the Split button is disabled, the split-results content, and the
    data to keep in ``session-store``.

    Branches are evaluated in this order: clear session, delete upload, new
    upload, split request, and finally two passive branches that just
    re-render whatever state ``session_data`` describes. The split request is
    handled *before* the passive branches; with the passive "file uploaded but
    not split yet" branch first, a click on Split returned early and the
    split never ran.
    """
    trigger = ctx.triggered_id
    session_id = get_session_id()
    flask.g.session_id = session_id
    session_dir = get_session_dir(session_id)
    lock = get_session_lock(session_id)

    if session_data is None:
        session_data = {}

    # --- Clear session -----------------------------------------------------
    if trigger == 'clear-session':
        clean_session(session_id)
        return "", True, get_split_results_placeholder(), {}

    # --- Delete upload -----------------------------------------------------
    # Require a real click count. The Delete button is created by this
    # callback with n_clicks=0, and a pattern-matching Input may fire when the
    # component is inserted; that must not be mistaken for a delete request.
    delete_pressed = any(
        n is not None and n > 0 for n in (delete_upload_n_list or [])
    )
    if delete_pressed:
        if os.path.exists(session_dir):
            for entry in os.listdir(session_dir):
                try:
                    os.remove(os.path.join(session_dir, entry))
                except OSError as exc:
                    # Best effort: keep going so one stuck file does not
                    # leave the rest of the session behind.
                    logging.error(f"Could not remove {entry} for session {session_id}: {exc}")
        return "", True, get_split_results_placeholder(), {}

    # --- New upload --------------------------------------------------------
    if trigger == 'upload-pdf':
        if not contents:
            return "", True, get_split_results_placeholder(), {}

        # The name comes from the client; strip any directory part before it
        # is joined onto the session directory.
        safe_name = _safe_upload_name(filename)
        if not safe_name or not allowed_file(safe_name):
            return html.Div("Only .pdf files are allowed.", style={'color': 'red'}), True, get_split_results_placeholder(), {}

        try:
            # contents is a data URL: "<header>,<base64 payload>".
            _, b64data = contents.split(',', 1)
            pdf_bytes = base64.b64decode(b64data)
            pdf_path = os.path.join(session_dir, safe_name)
            with open(pdf_path, 'wb') as f:
                f.write(pdf_bytes)
            logging.info(f"PDF uploaded and saved to {pdf_path} for session {session_id}")

            session_data = {
                'orig_filename': safe_name,
                'split_files': None,
                'zip_ready': False,
            }
            file_info = _render_file_info(
                f"Uploaded: {safe_name} ({len(pdf_bytes)/1024/1024:.2f} MB)"
            )
            return file_info, False, get_split_results_placeholder(), session_data
        except Exception as e:
            logging.error(f"Error processing PDF: {e}")
            return html.Div(f"Error: {e}", style={'color': 'red'}), True, get_split_results_placeholder(), {}

    # --- Split request -----------------------------------------------------
    if trigger == 'split-btn':
        # session_data round-trips through the browser, so re-sanitise.
        orig_filename = _safe_upload_name(session_data.get('orig_filename'))
        if not orig_filename:
            return html.Div("No file to split.", style={'color': 'red'}), True, get_split_results_placeholder(), session_data
        pdf_path = os.path.join(session_dir, orig_filename)
        if not os.path.exists(pdf_path):
            return html.Div("Uploaded file not found. Please upload again.", style={'color': 'red'}), True, get_split_results_placeholder(), {}
        try:
            logging.info(f"Splitting PDF for session {session_id} on user request.")
            # Serialise splits within one session so parts are not overwritten
            # by a concurrent run.
            with lock:
                split_files = intelligent_pdf_split(pdf_path, session_dir)
                make_zip_of_splits(split_files, session_dir)
            session_data['split_files'] = split_files
            session_data['zip_ready'] = True
            file_info = _render_file_info(f"Uploaded: {orig_filename}")
            results = _render_split_results(split_files, session_id)
            logging.info(f"PDF split into {len(split_files)} chunks for session {session_id}, zip ready.")
            return file_info, False, results, session_data
        except Exception as e:
            logging.error(f"Error splitting PDF: {e}")
            return html.Div(f"Error: {e}", style={'color': 'red'}), False, get_split_results_placeholder(), session_data

    # --- Passive re-render: file uploaded, not split yet ---------------------
    if session_data.get('orig_filename') and not session_data.get('split_files'):
        file_info = _render_file_info(f"Uploaded: {session_data['orig_filename']}")
        return file_info, False, get_split_results_placeholder(), session_data

    # --- Passive re-render: split results already available ------------------
    if session_data.get('split_files'):
        file_info = _render_file_info(f"Uploaded: {session_data.get('orig_filename', '')}")
        results = _render_split_results(session_data['split_files'], session_id)
        return file_info, False, results, session_data

    return "", True, get_split_results_placeholder(), session_data
|
|
|
@app.server.route('/download_zip/<session_id>/<filename>')
def download_zip_file(session_id, filename):
    """Serve a file from a session's working directory as a ZIP attachment.

    Both URL parts are untrusted, so before touching the filesystem:

    * ``session_id`` must parse as a UUID (the form ``uuid.uuid4()`` produces
      for real sessions);
    * ``filename`` must be a bare name with no directory component.

    The directory path is computed without creating it; going through
    ``get_session_dir`` would create a temp directory for every id anyone
    requests.

    Returns the file via ``send_file``, or ``("File not found", 404)``.
    """
    try:
        canonical_id = str(uuid.UUID(session_id))
    except ValueError:
        logging.error(f"Rejected download with malformed session id: {session_id!r}")
        return "File not found", 404

    if filename in ('.', '..') or filename != os.path.basename(filename):
        logging.error(f"Rejected download with unsafe file name: {filename!r}")
        return "File not found", 404

    # Same naming scheme as get_session_dir(), minus the makedirs() call.
    session_dir = os.path.join(tempfile.gettempdir(), f'dash_pdfsplit_{canonical_id}')
    file_path = os.path.join(session_dir, filename)

    # isfile (not exists) so a directory is never handed to send_file.
    if os.path.isfile(file_path):
        logging.info(f"Serving zip file {file_path} for session {session_id}")
        return send_file(file_path, mimetype='application/zip', as_attachment=True, download_name=filename)

    logging.error(f"ZIP file not found for download: {file_path}")
    return "File not found", 404
|
|
|
@app.callback(
    Output('dummy-div', 'children'),
    Input('session-store', 'data'),
    prevent_initial_call=True
)
def set_cookie_on_load(session_data):
    """Store the session id in the ``session-id`` cookie (3-day lifetime).

    Runs whenever ``session-store`` changes. The cookie is set on
    ``ctx.response``, the response Dash sends back for this callback.
    The earlier version set it on a response object it created itself and
    then discarded, so the browser never received the cookie.

    Returns an empty string for the hidden ``dummy-div`` output.
    """
    session_id = get_session_id()
    ctx.response.set_cookie('session-id', session_id, max_age=60*60*24*3)
    return ""
|
|
|
@app.server.before_request
def persist_session_cookie():
    """Resolve the session id at the start of every request.

    The id is stored on ``flask.g.session_id`` for the rest of the request.
    """
    session_id = get_session_id()
    flask.g.session_id = session_id


@app.server.after_request
def _attach_session_cookie(response):
    """Send the request's session id back as the ``session-id`` cookie.

    Without this nothing ever writes the cookie, so each request would be
    given a brand-new session id and could not find files stored by the
    previous one. The value is read from ``flask.g`` at the end of the request
    so it reflects the id the request handler actually used. The cookie is
    only re-sent when the browser does not already hold that exact value.
    """
    session_id = flask.g.get('session_id')
    if session_id and flask.request.cookies.get('session-id') != session_id:
        response.set_cookie(
            'session-id',
            session_id,
            max_age=60*60*24*3,
            httponly=True,  # only the server reads this cookie
            samesite='Lax',
        )
    return response
|
|
|
if __name__ == '__main__':
    print("Starting the Dash application...")
    # The server binds to every interface (0.0.0.0). Running it with the
    # debugger always on would expose the interactive debugger to anyone who
    # can reach the port, so debug mode is now opt-in via DASH_DEBUG=1.
    debug_enabled = os.environ.get('DASH_DEBUG', '').strip().lower() in ('1', 'true', 'yes', 'on')
    app.run(debug=debug_enabled, host='0.0.0.0', port=7860, threaded=True)
    print("Dash application has finished running.")