# Spaces: Running
import logging
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import List, Tuple, Union

import pdfplumber
# Set up logging: request INFO-level output on the root logger (this runs as a
# side effect of importing the module) and create the module-level logger that
# the functions below report errors and progress through.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def extract_page_content(args: Tuple[str, int]) -> Union[str, None]:
    """
    Extract the tables and text of one PDF page as a single string.

    The PDF is opened inside this function instead of being shared between
    callers; the intent is that each worker thread uses its own file handle.

    Args:
        args: A ``(pdf_path, page_number)`` pair. ``page_number`` is the
            zero-based index into ``pdf.pages``.

    Returns:
        The page's tables followed by its stripped text, joined with
        newlines. Each table is wrapped in ``[TABLE]`` / ``[/TABLE]``
        markers with one line per row and tab-separated cells. Returns
        ``None`` when the page yields no content, or when any error occurs
        (the error is logged rather than raised).
    """
    pdf_path, page_number = args
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page = pdf.pages[page_number]
            content = []

            # Tables come first in the output, each rendered as TSV-like text.
            for table in page.extract_tables():
                if not table:
                    continue
                # Iterate over every row of the table; a missing cell (None)
                # is rendered as an empty string so columns stay aligned.
                table_str = "\n".join(
                    "\t".join("" if cell is None else str(cell) for cell in row)
                    for row in table
                )
                content.append(f"[TABLE]\n{table_str}\n[/TABLE]")

            # Plain page text follows the tables; whitespace-only text is skipped.
            text = page.extract_text()
            if text and text.strip():
                content.append(text.strip())

            return "\n".join(content) if content else None
    except Exception as e:
        # Best-effort extraction: a failing page is logged and reported as
        # None so that the caller can continue with the remaining pages.
        logger.error(f"Error processing page {page_number} of {pdf_path}: {str(e)}")
        return None
def extract_pdf_content(pdf_path: str, max_workers: int = 4) -> List[str]:
    """
    Extract all pages of a PDF in parallel using threads.

    Args:
        pdf_path (str): Path to the PDF file
        max_workers (int): Number of threads to use

    Returns:
        List[str]: Extracted content chunks, one per page, in ascending page
        order. Pages that produced no content or failed to process are
        omitted, so list positions do not necessarily equal page numbers.
        An empty list is returned when the file does not exist or when the
        extraction as a whole fails (the error is logged).
    """
    if not os.path.exists(pdf_path):
        logger.error(f"PDF file not found: {pdf_path}")
        return []

    try:
        # Open the document once just to learn how many pages it has; each
        # worker re-opens the file on its own.
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
        logger.info(f"Processing {total_pages} pages from {pdf_path} in parallel.")

        # Futures finish in arbitrary order, so results are keyed by page
        # index and put back into page order before returning.
        results_by_page = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_page = {
                executor.submit(extract_page_content, (pdf_path, page_num)): page_num
                for page_num in range(total_pages)
            }
            for future in as_completed(future_to_page):
                page_num = future_to_page[future]
                try:
                    result = future.result()
                except Exception as exc:
                    logger.error(f"Page {page_num} generated an exception: {exc}")
                    continue
                if result:
                    results_by_page[page_num] = result

        # Maintain page order based on index
        return [results_by_page[page_num] for page_num in sorted(results_by_page)]
    except Exception as e:
        logger.error(f"Error opening {pdf_path}: {str(e)}")
        return []