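"""Ingest PDF documents into a local ChromaDB collection using OpenAI embeddings.

PDFs found in docs/unprocessed are extracted, chunked, embedded, and stored;
successfully processed files are moved to docs/processed.
"""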
import os
import shutil
import logging
import hashlib
from datetime import datetime
from typing import List

import chromadb
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction
from dotenv import load_dotenv

# Load environment variables (e.g. OPENAI_API_KEY) before local modules are imported
load_dotenv()

from semantic_chunking import semantic_chunking
from chroma_operations.pdf_processing import extract_pdf_content
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler("certification_processing.log"),
logging.StreamHandler(),
],
)
logger = logging.getLogger(__name__)
def generate_chunk_id(file: str, chunk: str, position: int) -> str:
    """Derive a deterministic chunk ID from the source file, chunk position, and chunk text."""
    unique_str = f"{file}_{position}_{chunk}"
    return hashlib.sha256(unique_str.encode()).hexdigest()
def move_processed_file(source_path: str, destination_dir: str) -> bool:
    """Move a processed file to the destination directory."""
    try:
        os.makedirs(destination_dir, exist_ok=True)
        destination_path = os.path.join(destination_dir, os.path.basename(source_path))
        # shutil.move also works across filesystems, unlike os.rename
        shutil.move(source_path, destination_path)
        return True
    except Exception as e:
        logger.error(f"Error moving file {source_path}: {str(e)}")
        return False
def get_chroma_client():
"""Initialize ChromaDB client with OpenAI embeddings."""
try:
# Initialize embedding function
embedding_function = OpenAIEmbeddingFunction(
api_key=os.getenv("OPENAI_API_KEY"), model_name="text-embedding-3-small"
)
client = chromadb.PersistentClient(path="./chroma_db")
logger.info("Successfully connected to ChromaDB with OpenAI embeddings")
return client, embedding_function
except Exception as e:
logger.error(f"Error connecting to ChromaDB: {str(e)}")
logger.exception("Detailed stack trace:")
return None, None
def create_chroma_collection(
client, embedding_function, collection_name="rag_collection"
):
"""Create or get a ChromaDB collection with proper embedding function."""
try:
collection = client.get_or_create_collection(
name=collection_name,
embedding_function=embedding_function,
metadata={"description": "medical documents"},
)
logger.info(f"Initialized collection: {collection_name}")
return collection
except Exception as e:
logger.error(f"Error creating ChromaDB collection: {str(e)}")
logger.exception("Detailed stack trace:")
return None
def process_file(file_path: str, collection) -> bool:
"""Process a file and add to ChromaDB collection."""
try:
if file_path.endswith(".pdf"):
contents = extract_pdf_content(file_path)
else:
logger.warning(f"Skipping unsupported file type: {file_path}")
return False
if not contents:
logger.warning(f"No content extracted from file: {file_path}")
return False
chunks: List[str] = []
for content in contents:
if not content.strip():
continue
if "\t" in content or "[TABLE]" in content:
chunks.append(content)
else:
try:
chunks.extend(semantic_chunking(content))
except Exception as e:
logger.error(
f"Error during chunking for file {file_path}: {str(e)}"
)
continue
if not chunks:
logger.warning(f"No valid chunks created for file: {file_path}")
return False
documents = []
metadatas = []
ids = []
source_file = os.path.basename(file_path)
if source_file.endswith(".pdf"):
source_file = source_file[:-4].strip()
for i, chunk in enumerate(chunks):
if not chunk.strip():
continue
try:
chunk_id = generate_chunk_id(file_path, chunk, i)
documents.append(chunk)
metadatas.append(
{
"chunk_id": chunk_id,
"source_file": source_file,
"position": i,
"processed_at": datetime.now().isoformat(),
}
)
ids.append(chunk_id)
except Exception as e:
logger.error(f"Error processing chunk from file {file_path}: {str(e)}")
continue
if documents:
try:
# Chroma will automatically generate embeddings using the collection's embedding function
collection.add(documents=documents, metadatas=metadatas, ids=ids)
logger.info(
f"Added {len(documents)} chunks from {file_path} to ChromaDB"
)
return True
except Exception as e:
logger.error(f"Error adding documents to ChromaDB: {str(e)}")
return False
return False
except Exception as e:
logger.error(f"Error processing file {file_path}: {str(e)}")
return False
def ingest():
try:
# Get client and embedding function together
chroma_client, embedding_function = get_chroma_client()
if not chroma_client or not embedding_function:
logger.error("Failed to initialize ChromaDB with embeddings")
return False
collection = create_chroma_collection(chroma_client, embedding_function)
if not collection:
logger.error("Failed to create or get ChromaDB collection")
return False
logger.info(f"Collection ready: {collection.name}")
unprocessed_dir = "docs/unprocessed"
processed_dir = "docs/processed"
if not os.path.exists(unprocessed_dir):
logger.error(f"Directory not found: {unprocessed_dir}")
return False
for file in os.listdir(unprocessed_dir):
file_path = os.path.join(unprocessed_dir, file)
if not os.path.isfile(file_path) or not file.lower().endswith(".pdf"):
continue
logger.info(f"Processing file: {file_path}")
if process_file(file_path, collection):
if not move_processed_file(file_path, processed_dir):
logger.error(f"Failed to move processed file: {file_path}")
else:
logger.error(f"Failed to process file: {file_path}")
logger.info("Processing completed")
return True
except Exception as e:
logger.error(f"Fatal error in ingestion: {str(e)}")
logger.exception("Detailed stack trace:")
return False
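# A minimal sketch of an entry point, assuming this module is also meant to be run
# directly; the original upload does not show how ingest() is invoked.
if __name__ == "__main__":
    success = ingest()
    logger.info(f"Ingestion finished: {'success' if success else 'failure'}")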