# src/query_service/api.py
from fastapi import FastAPI, HTTPException, UploadFile, File
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from src.retrieval_handler.retriever import RetrievalHandler
from src.llm_integrator.llm import LLMIntegrator
from src.embedding_generator.embedder import EmbeddingGenerator
from src.vector_store_manager.chroma_manager import ChromaManager
import logging
from typing import Literal, Optional, Dict, Any, List
from langchain_core.messages import HumanMessage, SystemMessage, AIMessage
import os
import shutil
import uuid
logger = logging.getLogger(__name__)
# Initialize core components (these should ideally be dependency injected in a larger app)
# For simplicity in this example, we initialize them globally.
embedding_generator: Optional[EmbeddingGenerator] = None
vector_store_manager: Optional[ChromaManager] = None
retrieval_handler: Optional[RetrievalHandler] = None
llm_integrator: Optional[LLMIntegrator] = None
try:
embedding_generator = EmbeddingGenerator()
vector_store_manager = ChromaManager(embedding_generator)
retrieval_handler = RetrievalHandler(embedding_generator, vector_store_manager)
llm_integrator = LLMIntegrator()
logger.info("Initialized core RAG components.")
except Exception as e:
    logger.critical(f"Failed to initialize core RAG components: {e}")
    # Depending on severity you could re-raise here to abort startup; instead,
    # the app starts and endpoints that depend on these components return a
    # 503 rather than crashing. A dependency-injected alternative is sketched below.
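# A minimal sketch of that alternative: build the components in a lifespan
# handler and store them on app.state (assumes a FastAPI version with lifespan
# support; the attribute names are illustrative, not part of this codebase):
#
#   from contextlib import asynccontextmanager
#
#   @asynccontextmanager
#   async def lifespan(app: FastAPI):
#       embedder = EmbeddingGenerator()
#       store = ChromaManager(embedder)
#       app.state.retrieval_handler = RetrievalHandler(embedder, store)
#       app.state.llm_integrator = LLMIntegrator()
#       yield  # add teardown here if the components expose close() hooks
#
#   app = FastAPI(lifespan=lifespan)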
app = FastAPI(
title="Insight AI API",
description="API for querying financial information.",
version="1.0.0"
)
# --- CORS Middleware ---
# Add CORSMiddleware to allow cross-origin requests from your frontend.
# For development, you can allow all origins (*).
# For production, restrict this to your frontend's specific origin(s); see the sketch below.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins. Change this to your frontend's URL in production.
allow_credentials=True,
allow_methods=["*"], # Allows all methods (GET, POST, OPTIONS, etc.)
allow_headers=["*"], # Allows all headers
)
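# A production-oriented sketch (the origin below is a placeholder for your
# frontend's URL; note that browsers reject the wildcard "*" origin when
# requests include credentials, so explicit origins are required with
# allow_credentials=True):
#
#   app.add_middleware(
#       CORSMiddleware,
#       allow_origins=["https://app.example.com"],
#       allow_credentials=True,
#       allow_methods=["GET", "POST"],
#       allow_headers=["*"],
#   )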
# -----------------------
class Message(BaseModel):
role: Literal['user', 'assistant', 'system']
content: str
class QueryRequest(BaseModel):
query: str
chat_history: Optional[List[Message]] = []
filters: Optional[Dict[str, Any]] = None # Allow passing metadata filters
# Response models mirroring the structure returned to the frontend
# (an illustrative payload follows the model definitions).
class SourceMetadata(BaseModel):
source: Optional[str] = None
ruling_date: Optional[str] = None
# Add other expected metadata fields here
# Example: topic: Optional[str] = None
class RetrievedSource(BaseModel):
content_snippet: str
metadata: Optional[SourceMetadata] = None
class QueryResponse(BaseModel):
answer: str
retrieved_sources: Optional[List[RetrievedSource]] = None
class TitleResponse(BaseModel):
title: str
class TitleRequest(BaseModel):
query: str
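# Illustrative /query payload and response, matching the models above
# (all values are examples only):
#
#   request:  {"query": "What does the 2023 ruling say about VAT?",
#              "chat_history": [{"role": "user", "content": "Hello"}],
#              "filters": {"source": "ruling_2023.pdf"}}
#   response: {"answer": "The ruling states ...",
#              "retrieved_sources": [{"content_snippet": "...",
#                                     "metadata": {"source": "ruling_2023.pdf",
#                                                  "ruling_date": "2023-05-01"}}]}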
@app.post("/query", response_model=QueryResponse)
async def query_rulings(request: QueryRequest):
"""
Receives a user query and returns a generated answer based on retrieved rulings.
"""
logger.info(f"Received query: {request.query}")
if request.filters:
logger.info(f"Received filters: {request.filters}")
# Check if RAG components were initialized successfully
if not retrieval_handler or not llm_integrator:
logger.error("RAG components not initialized.")
        raise HTTPException(status_code=503, detail="System components not ready.")
try:
        # 1. Retrieve relevant documents for the query. Filters are passed through,
        #    but the current RetrievalHandler implementation does not apply them
        #    during search; extend RetrievalHandler.retrieve_documents if metadata
        #    filtering is needed (see the sketch below).
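        # A minimal sketch of what filter support could look like inside
        # RetrievalHandler (assumes it wraps a LangChain Chroma vector store as
        # `self.vector_store`; names are illustrative, not the actual code):
        #
        #   def retrieve_documents(self, query, filters=None):
        #       # Chroma accepts a metadata filter dict, e.g. {"source": "ruling_2023.pdf"}
        #       return self.vector_store.similarity_search(query, k=5, filter=filters)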
retrieved_docs = retrieval_handler.retrieve_documents(request.query, filters=request.filters)
if not retrieved_docs:
logger.warning("No relevant documents retrieved for query.")
return QueryResponse(answer="Could not find relevant rulings for your query.")
# Convert chat_history to appropriate LangChain message types
chat_history = []
logger.debug(f"Raw chat history input: {request.chat_history}")
        for msg in (request.chat_history or []):  # tolerate an explicit null history
logger.debug(f"Processing message - Role: {msg.role}, Content: {msg.content[:50]}...")
if msg.role == "user":
new_msg = HumanMessage(content=msg.content)
elif msg.role == "assistant":
new_msg = AIMessage(content=msg.content)
elif msg.role == "system":
new_msg = SystemMessage(content=msg.content)
else:
logger.warning(f"Invalid message role: {msg.role}. Skipping message.")
continue
logger.debug(f"Converted to: {type(new_msg).__name__}")
chat_history.append(new_msg)
logger.debug(f"Final chat history types: {[type(m).__name__ for m in chat_history]}")
# 2. Generate response using the LLM based on the query, retrieved context, and chat history
answer = llm_integrator.generate_response(request.query, retrieved_docs, chat_history)
# 3. Prepare retrieved source information for the response
retrieved_sources = []
        for doc in retrieved_docs:
            # Map each document onto the RetrievedSource Pydantic model.
            source_metadata = SourceMetadata(**doc.metadata) if doc.metadata else None
            retrieved_sources.append(RetrievedSource(
                # Truncate long documents to a 500-character snippet.
                content_snippet=(doc.page_content[:500] + "...") if len(doc.page_content) > 500 else doc.page_content,
                # Only the fields declared on SourceMetadata are kept; Pydantic
                # ignores extra metadata keys by default.
                metadata=source_metadata
            ))
logger.info("Successfully processed query and generated response.")
return QueryResponse(answer=answer, retrieved_sources=retrieved_sources)
    except Exception as e:
        # Log the full traceback server-side, but return a generic message so
        # internal details are not leaked to the client.
        logger.exception(f"An error occurred during query processing: {e}")
        raise HTTPException(status_code=500, detail="An internal error occurred while processing your query.")
@app.post("/generate-title", response_model=TitleResponse)
async def generate_chat_title(request: TitleRequest):
try:
title = llm_integrator.generate_chat_title(request.query)
return {"title": title}
except Exception as e:
logger.error(f"Title generation error: {e}")
return {"title": "New Chat"}
@app.post("/upload-docs")
async def upload_docs(files: List[UploadFile] = File(...)):
"""
Upload new documents and trigger ingestion for them.
"""
    # Imported lazily so ingestion dependencies are only loaded when needed.
    from src.ingestion_orchestrator.orchestrator import IngestionOrchestrator
    # Save the uploads into a unique temporary folder.
    upload_id = str(uuid.uuid4())
    upload_dir = f"/tmp/ingest_{upload_id}"
    os.makedirs(upload_dir, exist_ok=True)
    saved_files = []
    for file in files:
        if not file.filename:
            continue
        # basename() strips any client-supplied path components (path traversal guard).
        safe_name = os.path.basename(file.filename)
        file_path = os.path.join(upload_dir, safe_name)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        saved_files.append(safe_name)
    # Run the ingestion pipeline over the uploaded folder.
    try:
        orchestrator = IngestionOrchestrator()
        orchestrator.run_ingestion_pipeline(docs_folder=upload_dir)
        logger.info(f"Ingested files: {saved_files}")
        return {"status": "success", "files": saved_files}
    except Exception as e:
        logger.exception(f"Ingestion failed: {e}")
        raise HTTPException(status_code=500, detail="Ingestion failed.")
    finally:
        # Clean up the temporary upload folder regardless of outcome.
        shutil.rmtree(upload_dir, ignore_errors=True)
# You can add more endpoints here, e.g., /health for health checks
# @app.get("/health")
# async def health_check():
# # Check connectivity to ChromaDB, LLM service, etc.
# # This requires adding health check methods to your ChromaManager and LLMIntegrator
# chroma_status = vector_store_manager.check_health() if vector_store_manager else "uninitialized"
# llm_status = llm_integrator.check_health() if llm_integrator else "uninitialized"
# return {"chroma": chroma_status, "llm": llm_status}