rag_hydro / app.py
Anas Bader
yeees
df95e65
raw
history blame
3.76 kB
import os
from fastapi import FastAPI, HTTPException
from langchain.prompts import PromptTemplate
from pydantic import BaseModel
from typing import Optional
from dotenv import load_dotenv
from embeddings.embeddings import generate_embeddings
from elastic.retrieval import search_certification_chunks
from prompting.rewrite_question import classify_certification, initialize_llms, process_query
load_dotenv()
app = FastAPI(
title="Hydrogen Certification RAG System",
description="API for querying hydrogen certification documents using RAG",
version="0.1.0"
)
# Initialize LLMs and Elasticsearch client
llms = initialize_llms()
# Request models
class QueryRequest(BaseModel):
query: str
llm = initialize_llms()["rewrite_llm"]
# Endpoints
@app.post("/query")
async def handle_query(request: QueryRequest):
"""
Process a query through the full RAG pipeline:
1. Classify certification (if not provided)
2. Optimize query based on specificity
3. Search relevant chunks
"""
try:
# Step 1: Determine certification
query = request.query
certification = classify_certification(request.query, llms["rewrite_llm"])
if "no certification mentioned" in certification :
raise HTTPException(
status_code=400,
detail="No certification specified in query and none provided"
)
# Step 2: Process query
processed_query = process_query(request.query, llms)
question_vector = generate_embeddings(processed_query)
# Step 3: Search
results = search_certification_chunks(
index_name="certif_index",
certification_name=certification,
text_query=processed_query,
vector_query=question_vector,
)
results_ = search_certification_chunks(
index_name="certification_index",
certification_name=certification,
text_query=processed_query,
vector_query=question_vector,
)
results_merged = ". ".join([result["text"] for result in results])
results_merged_ = ". ".join([result["text"] for result in results_])
template = """
You are an AI assistant tasked with providing answers based on the given context about a specific hydrogen certification.
Provide a clear, concise response that directly addresses the question without unnecessary information.
Question: {question}
Certification: {certification}
Context: {context}
Answer:
"""
prompt = PromptTemplate(
input_variables=["question", "certification", "context"],
template=template
)
chain = prompt | llm
answer = chain.invoke({"question": processed_query, "certification": certification, "context": results_merged}).content
answer_ = chain.invoke({"question": processed_query, "certification": certification, "context": results_merged_}).content
return {
"certification": certification,
"certif_index": answer,
"certification_index": answer_,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/certifications", response_model=list[str])
async def list_certifications():
"""List all available certifications"""
try:
certs_dir = "docs/processed"
return [f for f in os.listdir(certs_dir) if os.path.isdir(os.path.join(certs_dir, f))]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)