rag-medical / chroma_operations /pdf_processing.py
baderanas's picture
Update chroma_operations/pdf_processing.py
8436d23 verified
import pdfplumber
import logging
from typing import List, Union, Tuple
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
# Set up logging: configure the root logger at INFO level when this module is
# imported, and create the module-level logger used by the functions below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def extract_page_content(args) -> Union[str, None]:
    """
    Extract the tables and text of one PDF page as a single string.

    The PDF is opened independently inside this call, so concurrent callers
    never share a pdfplumber handle.

    Args:
        args: A ``(pdf_path, page_number)`` tuple, where ``page_number`` is
            the zero-based index into ``pdf.pages``.

    Returns:
        The page's tables followed by its stripped text, joined with
        newlines. Each table is wrapped in ``[TABLE]`` / ``[/TABLE]`` markers
        with one line per row and tab-separated cells. Returns ``None`` when
        the page has no content or when any error occurs (the error is
        logged rather than raised).
    """
    pdf_path, page_number = args
    try:
        with pdfplumber.open(pdf_path) as pdf:
            page = pdf.pages[page_number]

            # Extract tables
            table_strings = []
            for table in page.extract_tables():
                if not table:
                    continue
                # One line per row, cells separated by tabs; missing cells
                # (None) are rendered as empty strings. Iterating the rows
                # of the table here is essential: without it ``row`` is
                # unbound and every page containing a table fails.
                table_str = "\n".join(
                    "\t".join("" if cell is None else str(cell) for cell in row)
                    for row in table
                )
                table_strings.append(f"[TABLE]\n{table_str}\n[/TABLE]")

            # Extract text
            text = page.extract_text()

            # Tables come first, then the page's plain text (if any).
            content = list(table_strings)
            if text and text.strip():
                content.append(text.strip())

            return "\n".join(content) if content else None
    except Exception as e:
        # Best-effort extraction: a failing page is logged and skipped so
        # the remaining pages can still be processed.
        logger.error(f"Error processing page {page_number} of {pdf_path}: {str(e)}")
        return None
def extract_pdf_content(pdf_path: str, max_workers: int = 4) -> List[str]:
    """
    Extract all pages of a PDF in parallel using threads.

    Args:
        pdf_path (str): Path to the PDF file
        max_workers (int): Number of threads to use

    Returns:
        List[str]: Extracted content chunks, one per page, in page order.
        Pages that produce no content or fail to process are omitted. An
        empty list is returned when the file does not exist or an error
        occurs while opening or processing it (the error is logged).
    """
    if not os.path.exists(pdf_path):
        logger.error(f"PDF file not found: {pdf_path}")
        return []

    try:
        # Open once only to learn the page count; each worker re-opens the
        # file itself inside extract_page_content.
        with pdfplumber.open(pdf_path) as pdf:
            total_pages = len(pdf.pages)
        logger.info(f"Processing {total_pages} pages from {pdf_path} in parallel.")

        # One slot per page, filled by page index as futures finish.
        # Futures complete in arbitrary order, so appending results as they
        # arrive would scramble the pages.
        page_results: List[Union[str, None]] = [None] * total_pages

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_page = {
                executor.submit(extract_page_content, (pdf_path, page_index)): page_index
                for page_index in range(total_pages)
            }
            for future in as_completed(future_to_page):
                page_num = future_to_page[future]
                try:
                    page_results[page_num] = future.result()
                except Exception as exc:
                    logger.error(f"Page {page_num} generated an exception: {exc}")

        # Maintain page order based on index, dropping empty/failed pages.
        return [content for content in page_results if content]
    except Exception as e:
        logger.error(f"Error opening {pdf_path}: {str(e)}")
        return []