# churnsight-ai / main.py
from fastapi import FastAPI, Request, Header, HTTPException, Query
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.openapi.docs import get_swagger_ui_html
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from datetime import datetime
import uuid
import os
import logging
import traceback
from typing import Optional, List, Union

from openai import OpenAI
from transformers import pipeline

from model import (
    summarize_review, smart_summarize, detect_industry,
    detect_product_category, detect_emotion, answer_followup, answer_only,
    assess_churn_risk, extract_pain_points,  # ✅ extract_pain_points powers aspect analysis
)

# The OpenAI client picks up the OPENAI_API_KEY environment variable
# (e.g. a Hugging Face Space secret) automatically.
client = OpenAI()
app = FastAPI(
    title="🧠 ChurnSight AI",
    description="Multilingual GenAI for smarter feedback — summarization, sentiment, emotion, aspects, Q&A and tags.",
    version="2025.1.0",
    openapi_url="/openapi.json",
    docs_url=None,
    redoc_url="/redoc"
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
logging.basicConfig(level=logging.INFO)
# Prefer an environment variable for the API key; the hardcoded value is a
# development fallback only (assumed env var name: VALID_API_KEY).
VALID_API_KEY = os.getenv("VALID_API_KEY", "my-secret-key")
log_store = []  # ✅ Shared in-memory churn log
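# Each log entry is a dict of the form
#   {"timestamp": datetime, "product": ..., "churn_risk": ..., "user_id": str(uuid4)}
# and the endpoints below cap the list at the most recent 1000 entries.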
@app.get("/", response_class=HTMLResponse)
def root():
return "<h1>ChurnSight AI Backend is Running</h1>"
@app.get("/docs", include_in_schema=False)
def custom_swagger_ui():
return get_swagger_ui_html(
openapi_url=app.openapi_url,
        title="🧠 Swagger UI - ChurnSight AI",
swagger_favicon_url="https://cdn-icons-png.flaticon.com/512/3794/3794616.png",
swagger_js_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui-bundle.js",
swagger_css_url="https://cdn.jsdelivr.net/npm/[email protected]/swagger-ui.css",
)
@app.exception_handler(Exception)
async def exception_handler(request: Request, exc: Exception):
logging.error(f"Unhandled Exception: {traceback.format_exc()}")
return JSONResponse(status_code=500, content={"detail": "Internal Server Error. Please contact support."})
# ==== SCHEMAS ====
class ReviewInput(BaseModel):
text: str
model: str = "distilbert-base-uncased-finetuned-sst-2-english"
industry: Optional[str] = None
aspects: bool = False
follow_up: Optional[Union[str, List[str]]] = None
product_category: Optional[str] = None
device: Optional[str] = None
intelligence: Optional[bool] = False
verbosity: Optional[str] = "detailed"
class BulkReviewInput(BaseModel):
reviews: List[str]
model: str = "distilbert-base-uncased-finetuned-sst-2-english"
industry: Optional[List[str]] = None
aspects: bool = False
product_category: Optional[List[str]] = None
device: Optional[List[str]] = None
follow_up: Optional[List[Union[str, List[str]]]] = None
intelligence: Optional[bool] = False
explain_bulk: Optional[bool] = False
class FollowUpRequest(BaseModel):
text: str
question: str
verbosity: Optional[str] = "brief"
# ==== HELPERS ====
def auto_fill(value: Optional[str], fallback: str) -> str:
if not value or value.lower() == "auto-detect":
return fallback
return value
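# Loading a transformers pipeline is expensive, so cache one instance per model
# name instead of rebuilding it on every request. This is a minimal sketch; a
# production service might bound the cache or preload known models at startup.
_sentiment_pipelines = {}

def get_sentiment_pipeline(model_name: str):
    """Return a cached sentiment-analysis pipeline for the given model."""
    if model_name not in _sentiment_pipelines:
        _sentiment_pipelines[model_name] = pipeline("sentiment-analysis", model=model_name)
    return _sentiment_pipelines[model_name]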
# ==== ENDPOINTS ====
@app.post("/analyze/")
async def analyze(data: ReviewInput, x_api_key: str = Header(None)):
    # Key is optional here: requests without a key pass; only a wrong key is rejected.
    if x_api_key and x_api_key != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="❌ Invalid API key")
    if len(data.text.split()) < 20:
        raise HTTPException(status_code=400, detail="⚠️ Review too short for analysis (min. 20 words).")
global log_store
try:
# === Generate Summary ===
        summary = (
            summarize_review(data.text, max_len=40, min_len=8)
            if (data.verbosity or "detailed").lower() == "brief"
            else smart_summarize(data.text, n_clusters=2 if data.intelligence else 1)
        )
# === Sentiment + Emotion ===
        sentiment_pipeline = get_sentiment_pipeline(data.model)
sentiment = sentiment_pipeline(data.text)[0]
emotion = detect_emotion(data.text)
churn_risk = assess_churn_risk(sentiment["label"], emotion)
# === Auto-detect metadata ===
        industry = (
            detect_industry(data.text)
            if not data.industry or "auto" in data.industry.lower()
            else data.industry
        )
        product_category = (
            detect_product_category(data.text)
            if not data.product_category or "auto" in data.product_category.lower()
            else data.product_category
        )
# === Optional: Pain Points ===
pain_points = extract_pain_points(data.text) if data.aspects else []
# === Log entry ===
log_store.append({
"timestamp": datetime.now(),
"product": product_category,
"churn_risk": churn_risk,
"user_id": str(uuid.uuid4())
})
if len(log_store) > 1000:
log_store = log_store[-1000:]
# === Final API Response ===
response = {
"summary": summary,
"sentiment": sentiment,
"emotion": emotion,
"product_category": product_category,
"device": "Web",
"industry": industry,
"churn_risk": churn_risk,
"pain_points": pain_points
}
if data.follow_up:
response["follow_up"] = answer_followup(data.text, data.follow_up, verbosity=data.verbosity)
return response
    except Exception:
        logging.error(f"🔥 Unexpected analysis failure: {traceback.format_exc()}")
        raise HTTPException(status_code=500, detail="Internal Server Error during analysis.")
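# Example request (illustrative values; assumes the server runs on localhost:8000):
#   curl -X POST http://localhost:8000/analyze/ \
#     -H "Content-Type: application/json" -H "x-api-key: my-secret-key" \
#     -d '{"text": "<a review of at least 20 words>", "aspects": true, "verbosity": "brief"}'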
@app.post("/followup/")
async def followup(request: FollowUpRequest, x_api_key: str = Header(None)):
if x_api_key and x_api_key != VALID_API_KEY:
raise HTTPException(status_code=401, detail="Invalid API key")
    # Validate before the try block so this 400 isn't converted into a 500 below.
    if not request.question or len(request.text.split()) < 10:
        raise HTTPException(status_code=400, detail="Question or text is too short.")
    try:
        return {"answer": answer_only(request.text, request.question)}
    except Exception:
        logging.error(f"❌ Follow-up failed: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail="Follow-up generation failed.")
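# Example request (illustrative values; server address assumed):
#   curl -X POST http://localhost:8000/followup/ \
#     -H "Content-Type: application/json" -H "x-api-key: my-secret-key" \
#     -d '{"text": "<review of at least 10 words>", "question": "What frustrated the customer?"}'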
@app.get("/log/")
async def get_churn_log(x_api_key: str = Header(None)):
if x_api_key and x_api_key != VALID_API_KEY:
raise HTTPException(status_code=401, detail="Unauthorized")
return {"log": log_store}
@app.post("/bulk/")
async def bulk_analyze(data: BulkReviewInput, token: str = Query(None)):
    # Unlike the header-based endpoints above, the bulk token is strictly required.
    if token != VALID_API_KEY:
        raise HTTPException(status_code=401, detail="❌ Unauthorized: Invalid API token")
global log_store
try:
results = []
        sentiment_pipeline = get_sentiment_pipeline(data.model)
for i, review_text in enumerate(data.reviews):
if not review_text.strip():
continue # Skip empty reviews
if len(review_text.split()) < 20:
results.append({
"review": review_text,
"error": "Too short to analyze"
})
continue
summary = smart_summarize(review_text, n_clusters=2 if data.intelligence else 1)
sentiment = sentiment_pipeline(review_text)[0]
emotion = detect_emotion(review_text)
churn = assess_churn_risk(sentiment["label"], emotion)
pain = extract_pain_points(review_text) if data.aspects else []
            # Guard against optional metadata lists that are shorter than the review list.
            ind = auto_fill(data.industry[i] if data.industry and i < len(data.industry) else None, detect_industry(review_text))
            prod = auto_fill(data.product_category[i] if data.product_category and i < len(data.product_category) else None, detect_product_category(review_text))
            dev = auto_fill(data.device[i] if data.device and i < len(data.device) else None, "Web")
result = {
"review": review_text,
"summary": summary,
"sentiment": sentiment["label"],
"score": sentiment["score"],
"emotion": emotion,
"industry": ind,
"product_category": prod,
"device": dev,
"churn_risk": churn,
"pain_points": pain
}
            # ✅ Optional follow-up
if data.follow_up and i < len(data.follow_up):
follow_q = data.follow_up[i]
result["follow_up"] = answer_followup(review_text, follow_q)
            # ✅ Log churn entry
log_store.append({
"timestamp": datetime.now(),
"product": prod,
"churn_risk": churn,
"user_id": str(uuid.uuid4())
})
results.append(result)
        # ✅ Cap log size
if len(log_store) > 1000:
log_store = log_store[-1000:]
return {"results": results}
    except Exception:
        logging.error(f"🔥 Bulk processing failed: {traceback.format_exc()}")
raise HTTPException(status_code=500, detail="Failed to analyze bulk reviews")
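# Example request (illustrative; note the token is passed as a query parameter):
#   curl -X POST "http://localhost:8000/bulk/?token=my-secret-key" \
#     -H "Content-Type: application/json" \
#     -d '{"reviews": ["<review one, 20+ words>", "<review two, 20+ words>"], "aspects": true}'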
# OPENAI_API_KEY is read from the environment by the OpenAI client created above.
@app.post("/rootcause/")
async def root_cause_analysis(payload: dict, x_api_key: str = Header(None)):
if x_api_key and x_api_key != VALID_API_KEY:
raise HTTPException(status_code=401, detail="Invalid API key")
    text = payload.get("text", "").strip()
    # Validate before the try block so this 400 isn't converted into a 500 below.
    if not text or len(text.split()) < 5:
        raise HTTPException(status_code=400, detail="Insufficient input for root cause analysis.")
    try:
prompt = f"""
Analyze the following customer feedback and extract:
1. The main problem
2. The possible root cause
3. A suggested fix or which team might need to handle it
Feedback: '''{text}'''
Format your answer as:
Problem: ...
Cause: ...
Suggestion: ...
"""
response = client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": prompt}]
)
output = response.choices[0].message.content
lines = output.splitlines()
def extract_line(tag):
for line in lines:
if line.lower().startswith(tag.lower()):
return line.split(":", 1)[-1].strip()
return "β€”"
return {
"problem": extract_line("Problem"),
"cause": extract_line("Cause"),
"suggestion": extract_line("Suggestion")
}
except Exception as e:
logging.error(f"Root cause analysis failed: {traceback.format_exc()}")
return JSONResponse(status_code=500, content={"detail": f"Root cause generation failed: {str(e)}"})
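# Example request (illustrative values; requires OPENAI_API_KEY to be set):
#   curl -X POST http://localhost:8000/rootcause/ \
#     -H "Content-Type: application/json" -H "x-api-key: my-secret-key" \
#     -d '{"text": "<customer feedback of at least 5 words>"}'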