from langchain_community.document_loaders import PyPDFLoader, PDFPlumberLoader, UnstructuredPDFLoader, PyMuPDFLoader
from langchain_community.document_loaders import UnstructuredPowerPointLoader
from langchain_community.document_loaders import UnstructuredWordDocumentLoader
from langchain_community.document_loaders import TextLoader
from langchain.schema import Document
import os
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import io
import numpy as np
import cv2
from pdf2image import convert_from_path
import tempfile
import shutil


class DocumentLoader:
    """Load PDF, PPTX, DOCX and TXT files as LangChain ``Document`` objects.

    PDFs go through a cascade: PyMuPDF text extraction, then Tesseract OCR
    for scanned pages, then the alternative LangChain PDF loaders, and
    finally a placeholder document when nothing yields text.
    """

    # Windows install locations that are used only when they exist on disk;
    # otherwise the tools are resolved through PATH. Override on the class or
    # on an instance to point at a different installation.
    DEFAULT_TESSERACT_CMD = r"C:\Program Files\Tesseract-OCR\tesseract.exe"
    DEFAULT_POPPLER_PATH = r"C:\poppler\poppler-23.11.0\Library\bin"

    # Tesseract configurations tried for every preprocessed image. The "name"
    # values are only labels printed in the log output; the comments give the
    # page segmentation mode (PSM) each entry actually selects.
    _OCR_CONFIGS = (
        # PSM 6: assume a single uniform block of text
        {"config": "--oem 3 --psm 6", "name": "default"},
        # PSM 6 restricted to a character whitelist.
        # NOTE(review): the unbalanced quote characters in this whitelist look
        # like they could break argument splitting inside pytesseract, making
        # this entry fail every time - confirm before relying on it.
        {"config": "--oem 3 --psm 6 -c tessedit_char_whitelist=ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789.,!?;:()[]{}'\"- ", "name": "alphanumeric"},
        # PSM 3: fully automatic page segmentation, no OSD
        {"config": "--oem 3 --psm 3", "name": "sparse_text"},
        # PSM 7: single text line
        {"config": "--oem 3 --psm 7", "name": "single_line"},
        # PSM 8: single word
        {"config": "--oem 3 --psm 8", "name": "single_word"},
        # PSM 9: single word in a circle
        {"config": "--oem 3 --psm 9", "name": "circular_text"},
        # PSM 10: single character
        {"config": "--oem 3 --psm 10", "name": "single_char"},
        # PSM 11: sparse text
        {"config": "--oem 3 --psm 11", "name": "sparse_text_alt"},
        # PSM 12: sparse text with OSD
        {"config": "--oem 3 --psm 12", "name": "raw_line"},
        # PSM 13: raw line
        {"config": "--oem 3 --psm 13", "name": "uniform_block"},
    )

    def __init__(self, file_path: str):
        """Remember the path and its lower-cased extension (e.g. ``".pdf"``)."""
        self.file_path = file_path
        self.extension = os.path.splitext(file_path)[1].lower()

    @staticmethod
    def _joined_text(documents):
        """Return the page contents of *documents* joined by single spaces."""
        return " ".join(doc.page_content for doc in documents)

    def load(self):
        """Load the file and return a list of ``Document`` objects.

        Never raises: any failure (including an unsupported extension) is
        printed and reported as a single error ``Document`` whose metadata
        carries the message under the ``"error"`` key.
        """
        try:
            if self.extension == ".pdf":
                return self._load_pdf_with_ocr()
            elif self.extension == ".pptx":
                return UnstructuredPowerPointLoader(self.file_path).load()
            elif self.extension == ".docx":
                return UnstructuredWordDocumentLoader(self.file_path).load()
            elif self.extension == ".txt":
                return TextLoader(self.file_path).load()
            else:
                raise ValueError(f"Unsupported file type: {self.extension}")
        except Exception as e:
            print(f"[ERROR] Document loading failed for {self.file_path}: {e}")
            # Return a basic error document instead of propagating
            return [Document(
                page_content=(
                    f"Error loading document: {str(e)}. "
                    "Please ensure the file is not corrupted and is in a supported format."
                ),
                metadata={"page": 1, "source": self.file_path, "error": str(e)}
            )]

    def _load_pdf_with_ocr(self):
        """Load a PDF, falling back to OCR and then to alternative loaders.

        Returns the first non-trivial result of: PyMuPDF (more than 50
        characters in total), OCR, the alternative loaders. If all of them
        come back empty a placeholder ``Document`` is returned. If the
        cascade itself raises, ``PyPDFLoader`` is used as a last resort.
        """
        try:
            # First, try PyMuPDF (works for text-based PDFs)
            print("[INFO] Attempting to extract text using PyMuPDF...")
            documents = self._extract_text_with_pymupdf()

            total_text = self._joined_text(documents)
            if len(total_text.strip()) > 50:
                print(f"[INFO] Successfully extracted {len(total_text)} characters using PyMuPDF")
                return documents

            # Little or no embedded text: probably a scanned document
            print(f"[INFO] Text extraction returned minimal content ({len(total_text)} chars). Attempting OCR...")
            documents = self._extract_text_with_ocr()
            if documents:
                total_text = self._joined_text(documents)
                print(f"[INFO] Successfully extracted {len(total_text)} characters using OCR")
                return documents

            print("[INFO] OCR failed. Trying alternative PDF loaders...")
            documents = self._try_alternative_pdf_loaders()
            if documents:
                total_text = self._joined_text(documents)
                print(f"[INFO] Successfully extracted {len(total_text)} characters using alternative loaders")
                return documents

            print("[WARNING] All text extraction methods failed. Creating placeholder document.")
            return [Document(
                page_content=(
                    "This appears to be a scanned document or image-based PDF. "
                    "To enable full text extraction, please install Tesseract OCR. "
                    "For now, you can still use the document for basic operations."
                ),
                metadata={"page": 1, "source": self.file_path, "method": "placeholder"}
            )]
        except Exception as e:
            print(f"[ERROR] PDF processing failed: {e}")
            # Final fallback to basic PDF loader (may raise; load() handles it)
            return PyPDFLoader(self.file_path).load()

    @staticmethod
    def _text_from_page_dict(page_dict):
        """Join every span text found in a PyMuPDF ``get_text("dict")`` result.

        Returns an empty string when the structure has no text blocks, so the
        caller always receives a ``str``.
        """
        if not isinstance(page_dict, dict):
            return ""
        spans = []
        for block in page_dict.get("blocks", []):
            # Blocks without a "lines" entry carry no text spans
            for line in block.get("lines", []):
                for span in line.get("spans", []):
                    spans.append(span.get("text", ""))
        return " ".join(spans)

    def _extract_text_with_pymupdf(self):
        """Extract text page by page with PyMuPDF.

        Returns one ``Document`` per page that has any text (1-based
        ``"page"`` metadata), or an empty list if extraction fails.
        """
        try:
            documents = []
            # The context manager closes the file even if a page raises
            with fitz.open(self.file_path) as doc:
                for page_num in range(len(doc)):
                    page = doc.load_page(page_num)
                    text = page.get_text()

                    # If the plain extraction is (nearly) empty, rebuild the
                    # text from the structured block/line/span output
                    if not text or len(text.strip()) < 10:
                        text = self._text_from_page_dict(page.get_text("dict"))

                    if text and text.strip():
                        documents.append(Document(
                            page_content=text.strip(),
                            metadata={"page": page_num + 1, "source": self.file_path}
                        ))
            return documents
        except Exception as e:
            print(f"[WARNING] PyMuPDF extraction failed: {e}")
            return []

    def _configure_tesseract(self):
        """Locate Tesseract and verify it runs; return True when usable.

        Prefers ``DEFAULT_TESSERACT_CMD`` when that file exists, otherwise
        keeps whatever command pytesseract is already configured with and
        checks that it resolves to an executable.
        """
        try:
            if os.path.exists(self.DEFAULT_TESSERACT_CMD):
                pytesseract.pytesseract.tesseract_cmd = self.DEFAULT_TESSERACT_CMD
                print(f"[INFO] Tesseract found at: {self.DEFAULT_TESSERACT_CMD}")
            elif shutil.which(pytesseract.pytesseract.tesseract_cmd):
                print("[INFO] Tesseract found in PATH")
            else:
                raise RuntimeError("Tesseract executable not found")

            # Confirm the binary actually runs
            version = pytesseract.get_tesseract_version()
            print(f"[INFO] Tesseract version: {version}")
            return True
        except Exception as e:
            print(f"[WARNING] Tesseract not available: {e}")
            print("[INFO] Skipping OCR - Tesseract needs to be installed for OCR functionality")
            return False

    def _ocr_best_text(self, image):
        """Run OCR on one PIL page image and return the longest cleaned result.

        Every preprocessed variant of the page is tried with every entry of
        ``_OCR_CONFIGS``; a failing configuration is logged and skipped.
        """
        # Convert PIL image to OpenCV (BGR) format for preprocessing
        img_cv = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2BGR)

        candidates = []
        for processed_img in self._preprocess_image_for_ocr(img_cv):
            try:
                candidates.append(Image.fromarray(processed_img))
            except Exception:
                # If conversion fails, fall back to the original page image
                candidates.append(image)

        best_text = ""
        best_length = 0
        for img_idx, pil_image in enumerate(candidates, start=1):
            for config in self._OCR_CONFIGS:
                try:
                    text = pytesseract.image_to_string(
                        pil_image,
                        config=config["config"],
                        lang='eng'
                    )
                except Exception as config_error:
                    print(f"[DEBUG] OCR config {config['name']} failed for image {img_idx}: {config_error}")
                    continue

                cleaned_text = self._clean_ocr_text(text) or ""
                length = len(cleaned_text.strip())
                if length > best_length:
                    best_text = cleaned_text
                    best_length = length
                    print(f"[INFO] Better OCR result with image {img_idx}, config {config['name']}: {best_length} characters")
        return best_text

    def _extract_text_with_ocr(self):
        """Extract text from a scanned PDF by rendering pages and running OCR.

        Returns one ``Document`` (metadata ``"method": "OCR"``) per page whose
        best OCR result has more than 10 characters. Returns an empty list
        when Tesseract is unavailable or the PDF cannot be rendered.
        """
        if not self._configure_tesseract():
            return []

        try:
            print("[INFO] Converting PDF to images for OCR...")
            convert_kwargs = {"dpi": 300}
            # Only force the bundled Poppler location when it is really there;
            # otherwise let pdf2image find Poppler through PATH
            if os.path.isdir(self.DEFAULT_POPPLER_PATH):
                convert_kwargs["poppler_path"] = self.DEFAULT_POPPLER_PATH
            images = convert_from_path(self.file_path, **convert_kwargs)

            documents = []
            for page_num, image in enumerate(images):
                print(f"[INFO] Processing page {page_num + 1} with OCR...")
                try:
                    best_text = self._ocr_best_text(image)
                except Exception as e:
                    # A single bad page must not discard the other pages
                    print(f"[WARNING] OCR failed for page {page_num + 1}: {e}")
                    continue

                best_length = len(best_text.strip())
                if best_length > 10:
                    documents.append(Document(
                        page_content=best_text.strip(),
                        metadata={"page": page_num + 1, "source": self.file_path, "method": "OCR"}
                    ))
                    print(f"[INFO] OCR extracted {len(best_text)} characters from page {page_num + 1}")
                else:
                    print(f"[WARNING] OCR returned minimal text for page {page_num + 1} (best: {best_length} chars)")

            return documents
        except Exception as e:
            print(f"[ERROR] OCR processing failed: {e}")
            return []

    def _preprocess_image_for_ocr(self, image):
        """Build several cleaned-up grayscale variants of a BGR page image.

        The denoised grayscale image and up to three thresholded versions
        (adaptive, Otsu, fixed at 127) are each passed through a
        morphological close with a 1x1 and a 2x2 kernel. Returns the list of
        resulting arrays, or ``[image]`` if preprocessing fails as a whole.
        """
        try:
            gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
            denoised = cv2.fastNlMeansDenoising(gray)

            processed_images = []

            # Approach 1: Adaptive thresholding
            try:
                thresh1 = cv2.adaptiveThreshold(
                    denoised, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                    cv2.THRESH_BINARY, 11, 2
                )
                processed_images.append(thresh1)
            except Exception as e:
                print(f"[DEBUG] Adaptive thresholding failed: {e}")

            # Approach 2: Otsu thresholding
            try:
                _, thresh2 = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
                processed_images.append(thresh2)
            except Exception as e:
                print(f"[DEBUG] Otsu thresholding failed: {e}")

            # Approach 3: Simple thresholding
            try:
                _, thresh3 = cv2.threshold(denoised, 127, 255, cv2.THRESH_BINARY)
                processed_images.append(thresh3)
            except Exception as e:
                print(f"[DEBUG] Simple thresholding failed: {e}")

            # Approach 4: Denoised grayscale without thresholding
            processed_images.append(denoised)

            # Kernels do not depend on the image, so build them once
            kernel_small = cv2.getStructuringElement(cv2.MORPH_RECT, (1, 1))
            kernel_medium = cv2.getStructuringElement(cv2.MORPH_RECT, (2, 2))

            cleaned_images = []
            for img in processed_images:
                try:
                    cleaned_small = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel_small)
                    cleaned_medium = cv2.morphologyEx(img, cv2.MORPH_CLOSE, kernel_medium)
                except Exception as e:
                    print(f"[DEBUG] Morphological cleanup failed: {e}")
                    cleaned_images.append(img)
                else:
                    cleaned_images.append(cleaned_small)
                    cleaned_images.append(cleaned_medium)

            return cleaned_images
        except Exception as e:
            print(f"[WARNING] Image preprocessing failed: {e}")
            return [image]

    def _clean_ocr_text(self, text):
        """Normalise whitespace in OCR output and drop noise lines.

        Each line has its runs of whitespace collapsed to single spaces.
        Lines of two characters or fewer, or made only of punctuation, are
        removed. Falsy input is returned unchanged.

        Only the ``|`` -> ``I`` substitution is applied. Blanket replacements
        of digits and letters (``0``/``O``, ``1``/``l``/``I``, ``rn``/``m``,
        ...) are deliberately not performed because they rewrite correctly
        recognised characters as well.
        """
        if not text:
            return text

        text = text.replace('|', 'I')  # Common OCR error

        cleaned_lines = []
        for raw_line in text.splitlines():
            # Collapse whitespace per line so line boundaries survive
            line = ' '.join(raw_line.split())
            if len(line) > 2 and not all(c in '.,!?;:()[]{}' for c in line):
                cleaned_lines.append(line)

        return '\n'.join(cleaned_lines)

    def _try_alternative_pdf_loaders(self):
        """Try the other LangChain PDF loaders in turn.

        Returns the documents of the first loader that yields more than 10
        characters of text, or an empty list if none does.
        """
        for loader_cls in (PDFPlumberLoader, UnstructuredPDFLoader, PyPDFLoader):
            loader_name = loader_cls.__name__
            try:
                print(f"[INFO] Trying {loader_name}...")
                documents = loader_cls(self.file_path).load()
                total_text = self._joined_text(documents)
                if len(total_text.strip()) > 10:
                    print(f"[INFO] {loader_name} successfully extracted {len(total_text)} characters")
                    return documents
            except Exception as e:
                print(f"[WARNING] {loader_name} failed: {e}")
                continue
        return []

    def get_page_count(self):
        """Return the page count of the file, or ``None`` if it is unknown.

        PDF: real page count (PyMuPDF, then PyPDF2). PPTX: number of slides.
        DOCX: estimate of one page per 30 paragraphs (at least 1). TXT:
        estimate of one page per 500 words (at least 1). Any other extension,
        or any failure, yields ``None``.
        """
        if self.extension == ".pdf":
            try:
                # Try PyMuPDF first; the context manager closes the file
                with fitz.open(self.file_path) as doc:
                    return len(doc)
            except Exception:
                try:
                    # Fallback to PyPDF2
                    import PyPDF2
                    with open(self.file_path, "rb") as f:
                        reader = PyPDF2.PdfReader(f)
                        return len(reader.pages)
                except Exception:
                    return None
        elif self.extension == ".pptx":
            try:
                from pptx import Presentation
                prs = Presentation(self.file_path)
                return len(prs.slides)
            except Exception:
                return None
        elif self.extension == ".docx":
            try:
                from docx import Document as DocxDocument
                doc = DocxDocument(self.file_path)
                # DOCX has no fixed pages; rough estimate: 30 paragraphs per page
                return max(1, len(doc.paragraphs) // 30)
            except Exception:
                return None
        elif self.extension == ".txt":
            try:
                with open(self.file_path, "r", encoding="utf-8") as f:
                    words = f.read().split()
                # Rough estimate: 500 words per page
                return max(1, len(words) // 500)
            except Exception:
                return None
        else:
            return None