# rag_hydro / file_processing.py
# Anas Bader
# redo
# 4cbe4e9
import pdfplumber
from docx import Document
from openpyxl import load_workbook
import pdfplumber
import logging
from typing import List, Union, Tuple
import os
# Set up logging
# NOTE(review): basicConfig runs at import time, so merely importing this
# module configures the root logger at INFO level (when it has no handlers
# yet) — confirm this is intended rather than leaving it to the application.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module; used by the extractors below.
logger = logging.getLogger(__name__)
def extract_pdf_content(pdf_path: str) -> List[str]:
    """
    Extract tables and text from a PDF, page by page.

    For every page, each non-empty table is emitted first as a chunk of
    tab-separated cells / newline-separated rows wrapped in
    [TABLE]...[/TABLE], followed by the page's stripped plain text as a
    single chunk. No positional processing is done, so tables and text are
    not interleaved according to their position on the page.

    NOTE(review): the text chunk comes from the whole page, so the words of
    a table presumably also appear in that page's text chunk — confirm
    whether downstream consumers need de-duplication.

    Args:
        pdf_path (str): Path to the PDF file

    Returns:
        List[str]: List of extracted content chunks (tables and text).
        An empty list is returned when the file does not exist or when
        extraction fails; both cases are logged instead of raised.
    """
    if not os.path.exists(pdf_path):
        logger.error(f"PDF file not found: {pdf_path}")
        return []
    try:
        with pdfplumber.open(pdf_path) as pdf:
            content = []
            for page in pdf.pages:
                # Tables first, then the page text.
                for table in page.extract_tables():
                    if not table:
                        continue
                    # Empty cells are reported as None; render them as an
                    # empty field instead of the literal string "None".
                    table_str = "\n".join(
                        "\t".join("" if cell is None else str(cell) for cell in row)
                        for row in table
                    )
                    content.append(f"[TABLE]\n{table_str}\n[/TABLE]")
                text = page.extract_text()
                if text and text.strip():
                    content.append(text.strip())
            logger.info(f"Successfully extracted content from {pdf_path}")
            return content
    except Exception as e:
        # Best-effort extraction: keep returning [] on failure, but record
        # the traceback so the cause is not lost.
        logger.exception(f"Error processing {pdf_path}: {str(e)}")
        return []
from docx import Document
from typing import List
import os
def extract_docx_content(docx_path: str) -> List[str]:
    """
    Read a DOCX file and return its text as a list of chunks.

    All non-blank paragraphs come first (one stripped chunk each), followed
    by every non-blank table rendered as tab-separated cells and
    newline-separated rows between [TABLE] and [/TABLE] markers.

    Args:
        docx_path (str): Path to the DOCX file

    Returns:
        List[str]: Paragraph chunks followed by table chunks

    Raises:
        FileNotFoundError: If docx_path does not exist
    """
    if not os.path.exists(docx_path):
        raise FileNotFoundError(f"DOCX file not found: {docx_path}")

    document = Document(docx_path)

    # Paragraph pass: keep only paragraphs with visible text.
    stripped_paragraphs = (para.text.strip() for para in document.paragraphs)
    chunks = [body for body in stripped_paragraphs if body]

    # Table pass: tables are appended after all paragraphs.
    for tbl in document.tables:
        row_lines = []
        for row in tbl.rows:
            row_lines.append("\t".join(cell.text.strip() for cell in row.cells))
        rendered = "\n".join(row_lines)
        if not rendered.strip():
            # Table contains nothing but whitespace; skip it.
            continue
        chunks.append(f"[TABLE]\n{rendered}\n[/TABLE]")

    return chunks
def extract_xlsx_content(file_path: str) -> List[str]:
    """
    Extract the cell contents of every worksheet in an XLSX workbook.

    Each sheet becomes one string: a "--- Sheet: <title> ---" header line
    followed by one line per row, with the cell values joined by tabs.
    Empty cells (value None) are rendered as empty fields; every other
    value, including 0 and False, is rendered with str().

    Args:
        file_path (str): Path to the XLSX file

    Returns:
        List[str]: One text chunk per worksheet, in workbook order, with
        leading and trailing whitespace stripped.
    """
    wb = load_workbook(file_path)
    sheets_text = []
    for sheet in wb:
        lines = [f"--- Sheet: {sheet.title} ---"]
        for row in sheet.iter_rows():
            # Compare against None explicitly: a plain truthiness test would
            # also blank out legitimate falsy values such as 0 or False.
            lines.append(
                "\t".join(
                    "" if cell.value is None else str(cell.value) for cell in row
                )
            )
        sheets_text.append("\n".join(lines).strip())
    return sheets_text