# Ingestion pipeline: extracts PDF content, chunks it, and indexes it in ChromaDB.
import os | |
import shutil | |
import logging | |
from datetime import datetime | |
import hashlib | |
from typing import List | |
import chromadb | |
import openai | |
import os | |
import shutil | |
import logging | |
from datetime import datetime | |
import hashlib | |
from typing import List, Optional | |
import chromadb | |
from chromadb.utils.embedding_functions import OpenAIEmbeddingFunction | |
from dotenv import load_dotenv | |
load_dotenv() | |
from semantic_chunking import semantic_chunking | |
from chroma_operations.pdf_processing import extract_pdf_content | |
# Logging setup: INFO-level messages are written both to a rotating-free
# log file and to the console.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    handlers=[
        logging.FileHandler("certification_processing.log"),
        logging.StreamHandler(),
    ],
)

# Module-level logger shared by every function in this file.
logger = logging.getLogger(__name__)
def generate_chunk_id(file: str, chunk: str, position: int) -> str:
    """Return a deterministic SHA-256 hex digest identifying one chunk.

    The id is derived from the source file path, the chunk's position,
    and the chunk text itself, so the same content at the same position
    always maps to the same id.
    """
    fingerprint = "_".join((file, str(position), chunk))
    return hashlib.sha256(fingerprint.encode()).hexdigest()
def move_processed_file(source_path: str, destination_dir: str) -> bool:
    """Move a processed file into *destination_dir*, creating it if needed.

    Args:
        source_path: Path of the file to relocate.
        destination_dir: Directory the file should end up in.

    Returns:
        True on success, False on any failure (logged, never raised).
    """
    try:
        # exist_ok avoids the check-then-create race of the old
        # os.path.exists() + os.makedirs() pair.
        os.makedirs(destination_dir, exist_ok=True)
        destination_path = os.path.join(destination_dir, os.path.basename(source_path))
        # shutil.move works across filesystems; os.rename raises OSError
        # when source and destination live on different devices.
        shutil.move(source_path, destination_path)
        return True
    except Exception as e:
        logger.error(f"Error moving file {source_path}: {str(e)}")
        return False
def get_chroma_client():
    """Build a persistent ChromaDB client plus an OpenAI embedding function.

    Returns:
        A ``(client, embedding_function)`` pair on success, or
        ``(None, None)`` when initialization fails (errors are logged).
    """
    try:
        # Embedding function is created first so a bad API key fails fast.
        embedder = OpenAIEmbeddingFunction(
            model_name="text-embedding-3-small",
            api_key=os.getenv("OPENAI_API_KEY"),
        )
        store = chromadb.PersistentClient(path="./chroma_db")
        logger.info("Successfully connected to ChromaDB with OpenAI embeddings")
        return store, embedder
    except Exception as e:
        logger.error(f"Error connecting to ChromaDB: {str(e)}")
        logger.exception("Detailed stack trace:")
        return None, None
def create_chroma_collection(
    client, embedding_function, collection_name="rag_collection"
):
    """Fetch the named ChromaDB collection, creating it if it does not exist.

    Args:
        client: An initialized ChromaDB client.
        embedding_function: Embedding function attached to the collection.
        collection_name: Name of the collection (default ``rag_collection``).

    Returns:
        The collection on success, or None when ChromaDB raises.
    """
    try:
        result = client.get_or_create_collection(
            name=collection_name,
            metadata={"description": "medical documents"},
            embedding_function=embedding_function,
        )
    except Exception as e:
        logger.error(f"Error creating ChromaDB collection: {str(e)}")
        logger.exception("Detailed stack trace:")
        return None
    logger.info(f"Initialized collection: {collection_name}")
    return result
def process_file(file_path: str, collection) -> bool:
    """Extract, chunk, and index a single PDF into the ChromaDB collection.

    Args:
        file_path: Path of the document to ingest.
        collection: ChromaDB collection the chunks are added to.

    Returns:
        True when at least one chunk was added, False otherwise.
    """
    try:
        # Case-insensitive extension check: ingest() admits files via
        # name.lower().endswith(".pdf"), so e.g. "REPORT.PDF" must not be
        # rejected here (the old check compared against lowercase only).
        if not file_path.lower().endswith(".pdf"):
            logger.warning(f"Skipping unsupported file type: {file_path}")
            return False
        contents = extract_pdf_content(file_path)
        if not contents:
            logger.warning(f"No content extracted from file: {file_path}")
            return False

        chunks = _chunk_contents(contents, file_path)
        if not chunks:
            logger.warning(f"No valid chunks created for file: {file_path}")
            return False

        documents, metadatas, ids = _build_records(chunks, file_path)
        if documents:
            try:
                # Chroma will automatically generate embeddings using the
                # collection's embedding function.
                collection.add(documents=documents, metadatas=metadatas, ids=ids)
                logger.info(
                    f"Added {len(documents)} chunks from {file_path} to ChromaDB"
                )
                return True
            except Exception as e:
                logger.error(f"Error adding documents to ChromaDB: {str(e)}")
                return False
        return False
    except Exception as e:
        logger.error(f"Error processing file {file_path}: {str(e)}")
        return False


def _chunk_contents(contents, file_path: str) -> List[str]:
    """Split extracted text segments into chunks; table-like text passes through whole.

    Segments containing tabs or a "[TABLE]" marker are kept intact, because
    semantic chunking would break table structure. Chunking errors are
    logged per segment and skipped, never raised.
    """
    chunks: List[str] = []
    for content in contents:
        if not content.strip():
            continue
        if "\t" in content or "[TABLE]" in content:
            chunks.append(content)
        else:
            try:
                chunks.extend(semantic_chunking(content))
            except Exception as e:
                logger.error(
                    f"Error during chunking for file {file_path}: {str(e)}"
                )
                continue
    return chunks


def _build_records(chunks: List[str], file_path: str):
    """Build the parallel (documents, metadatas, ids) lists for collection.add().

    Blank chunks are skipped; per-chunk failures are logged and skipped so
    one bad chunk does not abort the whole file.
    """
    documents: List[str] = []
    metadatas: List[dict] = []
    ids: List[str] = []
    source_file = os.path.basename(file_path)
    # Strip the ".pdf" extension regardless of case (old code only handled
    # a lowercase suffix via endswith + [:-4]).
    root, ext = os.path.splitext(source_file)
    if ext.lower() == ".pdf":
        source_file = root.strip()
    for i, chunk in enumerate(chunks):
        if not chunk.strip():
            continue
        try:
            chunk_id = generate_chunk_id(file_path, chunk, i)
            documents.append(chunk)
            metadatas.append(
                {
                    "chunk_id": chunk_id,
                    "source_file": source_file,
                    "position": i,
                    "processed_at": datetime.now().isoformat(),
                }
            )
            ids.append(chunk_id)
        except Exception as e:
            logger.error(f"Error processing chunk from file {file_path}: {str(e)}")
            continue
    return documents, metadatas, ids
def ingest():
    """Sweep docs/unprocessed: index every PDF, then move successes to docs/processed.

    Returns:
        True when the sweep ran to completion (individual files may still
        have failed and been logged), False on a setup problem or a fatal
        unexpected error.
    """
    try:
        # Client and embedding function are created together; both are
        # required before any collection work can happen.
        client, embedder = get_chroma_client()
        if not client or not embedder:
            logger.error("Failed to initialize ChromaDB with embeddings")
            return False

        collection = create_chroma_collection(client, embedder)
        if not collection:
            logger.error("Failed to create or get ChromaDB collection")
            return False
        logger.info(f"Collection ready: {collection.name}")

        unprocessed_dir = "docs/unprocessed"
        processed_dir = "docs/processed"
        if not os.path.exists(unprocessed_dir):
            logger.error(f"Directory not found: {unprocessed_dir}")
            return False

        for name in os.listdir(unprocessed_dir):
            path = os.path.join(unprocessed_dir, name)
            # Only regular files with a .pdf extension (any case) are ingested.
            if not (os.path.isfile(path) and name.lower().endswith(".pdf")):
                continue
            logger.info(f"Processing file: {path}")
            if not process_file(path, collection):
                logger.error(f"Failed to process file: {path}")
            elif not move_processed_file(path, processed_dir):
                logger.error(f"Failed to move processed file: {path}")

        logger.info("Processing completed")
        return True
    except Exception as e:
        logger.error(f"Fatal error in ingestion: {str(e)}")
        logger.exception("Detailed stack trace:")
        return False