Prathamesh1420's picture
Update app.py
04771f5 verified
import torch
import mauve
from sacrebleu import corpus_bleu
from rouge_score import rouge_scorer
from bert_score import score
from transformers import GPT2LMHeadModel, GPT2Tokenizer, pipeline, AutoTokenizer, AutoModelForCausalLM
import re
from mauve import compute_mauve
import os
import gradio as gr
import requests
import mlflow
import dagshub
from pinecone import Pinecone
from langchain.prompts import PromptTemplate
from langchain.chains.llm import LLMChain
from langchain.llms.base import LLM
from typing import Optional, List, Mapping, Any
import time
from langchain_community.embeddings import HuggingFaceEmbeddings
from dotenv import load_dotenv
from datetime import datetime
# ------------------ Load Environment ------------------
load_dotenv()
pinecone_api_key = os.environ.get("PINECONE_API_KEY")
mlflow_tracking_uri = os.environ.get("MLFLOW_TRACKING_URI")
# ------------------ DagsHub & MLflow Setup ------------------
try:
dagshub.init(
repo_owner='prathamesh.khade20',
repo_name='Maintenance_AI_website',
mlflow=True
)
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment("Maintenance-RAG-Chatbot")
mlflow.langchain.autolog()
except Exception as e:
print(f"MLflow/DagsHub initialization failed: {e}")
# ------------------ RAG Evaluator ------------------
class RAGEvaluator:
def __init__(self):
try:
self.gpt2_model, self.gpt2_tokenizer = self.load_gpt2_model()
self.bias_pipeline = pipeline("zero-shot-classification", model="Hate-speech-CNERG/dehatebert-mono-english")
self.tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
except Exception as e:
print(f"Evaluator initialization failed: {e}")
def load_gpt2_model(self):
model = GPT2LMHeadModel.from_pretrained('gpt2')
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
return model, tokenizer
def evaluate_bleu_rouge(self, candidates, references):
try:
bleu_score = corpus_bleu(candidates, [references]).score
scorer = rouge_scorer.RougeScorer(['rouge1', 'rouge2', 'rougeL'], use_stemmer=True)
rouge_scores = [scorer.score(ref, cand) for ref, cand in zip(references, candidates)]
rouge1 = sum([score['rouge1'].fmeasure for score in rouge_scores]) / len(rouge_scores)
rouge2 = sum([score['rouge2'].fmeasure for score in rouge_scores]) / len(rouge_scores)
rougeL = sum([score['rougeL'].fmeasure for score in rouge_scores]) / len(rouge_scores)
return bleu_score, rouge1, rouge2, rougeL
except Exception as e:
print(f"BLEU/ROUGE evaluation failed: {e}")
return 0, 0, 0, 0
def evaluate_bert_score(self, candidates, references):
try:
P, R, F1 = score(candidates, references, lang="en", model_type='bert-base-multilingual-cased')
return P.mean().item(), R.mean().item(), F1.mean().item()
except Exception as e:
print(f"BERT score evaluation failed: {e}")
return 0, 0, 0
def evaluate_perplexity(self, text):
try:
encodings = self.gpt2_tokenizer(text, return_tensors='pt')
max_length = self.gpt2_model.config.n_positions
stride = 512
lls = []
for i in range(0, encodings.input_ids.size(1), stride):
begin_loc = max(i + stride - max_length, 0)
end_loc = min(i + stride, encodings.input_ids.size(1))
trg_len = end_loc - i
input_ids = encodings.input_ids[:, begin_loc:end_loc]
target_ids = input_ids.clone()
target_ids[:, :-trg_len] = -100
with torch.no_grad():
outputs = self.gpt2_model(input_ids, labels=target_ids)
log_likelihood = outputs[0] * trg_len
lls.append(log_likelihood)
ppl = torch.exp(torch.stack(lls).sum() / end_loc)
return ppl.item()
except Exception as e:
print(f"Perplexity evaluation failed: {e}")
return 1000.0
def evaluate_diversity(self, texts):
try:
all_tokens = []
for text in texts:
tokens = self.tokenizer.tokenize(text)
all_tokens.extend(tokens)
unique_bigrams = set()
for i in range(len(all_tokens) - 1):
unique_bigrams.add((all_tokens[i], all_tokens[i+1]))
return len(unique_bigrams) / len(all_tokens) if all_tokens else 0
except Exception as e:
print(f"Diversity evaluation failed: {e}")
return 0
def evaluate_racial_bias(self, text):
try:
results = self.bias_pipeline([text], candidate_labels=["hate speech", "not hate speech"])
bias_score = results[0]['scores'][results[0]['labels'].index('hate speech')]
return bias_score
except Exception as e:
print(f"Bias evaluation failed: {e}")
return 0
def evaluate_meteor(self, candidates, references):
try:
meteor_scores = []
for ref, cand in zip(references, candidates):
ref_tokens = self.tokenizer.tokenize(ref)
cand_tokens = self.tokenizer.tokenize(cand)
common_tokens = set(ref_tokens) & set(cand_tokens)
precision = len(common_tokens) / len(cand_tokens) if cand_tokens else 0
recall = len(common_tokens) / len(ref_tokens) if ref_tokens else 0
if precision + recall == 0:
f_score = 0
else:
f_score = (10 * precision * recall) / (9 * precision + recall)
meteor_scores.append(f_score)
return sum(meteor_scores) / len(meteor_scores) if meteor_scores else 0
except Exception as e:
print(f"METEOR evaluation failed: {e}")
return 0
def evaluate_chrf(self, candidates, references):
try:
chrf_scores = []
for ref, cand in zip(references, candidates):
ref_chars = list(ref)
cand_chars = list(cand)
ref_ngrams = set()
cand_ngrams = set()
for i in range(len(ref_chars) - 5):
ref_ngrams.add(tuple(ref_chars[i:i+6]))
for i in range(len(cand_chars) - 5):
cand_ngrams.add(tuple(cand_chars[i:i+6]))
common_ngrams = ref_ngrams & cand_ngrams
precision = len(common_ngrams) / len(cand_ngrams) if cand_ngrams else 0
recall = len(common_ngrams) / len(ref_ngrams) if ref_ngrams else 0
chrf_score = 2 * precision * recall / (precision + recall) if precision + recall else 0
chrf_scores.append(chrf_score)
return sum(chrf_scores) / len(chrf_scores) if chrf_scores else 0
except Exception as e:
print(f"CHRF evaluation failed: {e}")
return 0
def evaluate_readability(self, text):
try:
words = re.findall(r'\b\w+\b', text.lower())
sentences = re.split(r'[.!?]+', text)
num_words = len(words)
num_sentences = len([s for s in sentences if s.strip()])
avg_word_length = sum(len(word) for word in words) / num_words if num_words else 0
words_per_sentence = num_words / num_sentences if num_sentences else 0
flesch_ease = 206.835 - (1.015 * words_per_sentence) - (84.6 * avg_word_length)
flesch_grade = (0.39 * words_per_sentence) + (11.8 * avg_word_length) - 15.59
return flesch_ease, flesch_grade
except Exception as e:
print(f"Readability evaluation failed: {e}")
return 0, 0
def evaluate_mauve(self, reference_texts, generated_texts):
try:
out = compute_mauve(
p_text=reference_texts,
q_text=generated_texts,
device_id=0,
max_text_length=1024,
verbose=False
)
return out.mauve
except Exception as e:
print(f"MAUVE evaluation failed: {e}")
return 0.0
def evaluate_all(self, question, response, reference):
try:
candidates = [response]
references = [reference]
bleu, rouge1, rouge2, rougeL = self.evaluate_bleu_rouge(candidates, references)
bert_p, bert_r, bert_f1 = self.evaluate_bert_score(candidates, references)
perplexity = self.evaluate_perplexity(response)
diversity = self.evaluate_diversity(candidates)
racial_bias = self.evaluate_racial_bias(response)
meteor = self.evaluate_meteor(candidates, references)
chrf = self.evaluate_chrf(candidates, references)
flesch_ease, flesch_grade = self.evaluate_readability(response)
mauve_score = self.evaluate_mauve(references, candidates) if len(references) > 1 else 0.0
return {
"BLEU": bleu,
"ROUGE-1": rouge1,
"ROUGE-2": rouge2,
"ROUGE-L": rougeL,
"BERT_Precision": bert_p,
"BERT_Recall": bert_r,
"BERT_F1": bert_f1,
"Perplexity": perplexity,
"Diversity": diversity,
"Racial_Bias": racial_bias,
"MAUVE": mauve_score,
"METEOR": meteor,
"CHRF": chrf,
"Flesch_Reading_Ease": flesch_ease,
"Flesch_Kincaid_Grade": flesch_grade,
}
except Exception as e:
print(f"Complete evaluation failed: {e}")
return {"error": str(e)}
# Initialize evaluator
evaluator = RAGEvaluator()
# ------------------ Pinecone ------------------
def init_pinecone():
try:
pc = Pinecone(api_key=pinecone_api_key)
return pc.Index("rag-granite-index")
except Exception as e:
print(f"Pinecone initialization failed: {e}")
return None
index = init_pinecone()
# ------------------ Embeddings ------------------
try:
embeddings_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
except Exception as e:
print(f"Embeddings initialization failed: {e}")
embeddings_model = None
def get_retrieved_context(query: str, top_k=3):
if not index or not embeddings_model:
return "No context available - system initialization failed"
try:
start = time.time()
query_embedding = embeddings_model.embed_query(query)
if mlflow.active_run():
mlflow.log_metric("embedding_latency", time.time() - start)
results = index.query(
namespace="rag-ns",
vector=query_embedding,
top_k=top_k,
include_metadata=True
)
if mlflow.active_run():
mlflow.log_metric("retrieved_chunks", len(results['matches']))
context_texts = [m['metadata']['text'] for m in results['matches']]
return "\n".join(context_texts) if context_texts else "No relevant context found."
except Exception as e:
print(f"Context retrieval failed: {e}")
return f"Context retrieval error: {str(e)}"
# ------------------ Fallback LLM Models ------------------
class FallbackLLM:
def __init__(self):
self.models_loaded = False
self.pipeline = None
self.load_fallback_models()
def load_fallback_models(self):
"""Load local models as fallback"""
try:
# Use a smaller model for fallback
self.pipeline = pipeline(
"text-generation",
model="microsoft/DialoGPT-small",
tokenizer="microsoft/DialoGPT-small",
max_length=150,
do_sample=True,
temperature=0.7
)
self.models_loaded = True
print("Fallback model loaded successfully")
except Exception as e:
print(f"Fallback model loading failed: {e}")
self.models_loaded = False
def generate_response(self, context, question):
if not self.models_loaded:
return "I'm currently experiencing technical difficulties. Please try again later."
try:
prompt = f"""
Based on the following context, please provide a concise answer to the question.
Context: {context}
Question: {question}
Answer: """
response = self.pipeline(
prompt,
max_new_tokens=100,
num_return_sequences=1,
pad_token_id=50256
)
if response and len(response) > 0:
full_response = response[0]['generated_text']
# Extract only the answer part
if "Answer:" in full_response:
answer = full_response.split("Answer:")[-1].strip()
return answer
return full_response.strip()
else:
return "I couldn't generate a response at the moment. Please try again."
except Exception as e:
print(f"Fallback model generation failed: {e}")
return "I'm having trouble generating a response. Please try again later."
# Initialize fallback LLM
fallback_llm = FallbackLLM()
# ------------------ Custom LLM with Fallback ------------------
class RobustLitServeLLM(LLM):
endpoint_url: str
use_fallback: bool = True
def _call(self, prompt: str, stop: Optional[List[str]] = None) -> str:
# Try the primary endpoint first
primary_success, primary_response = self._try_primary_endpoint(prompt)
if primary_success:
return primary_response
# If primary fails and fallback is enabled, use fallback
if self.use_fallback:
print("Using fallback LLM due to primary endpoint failure")
# Extract context and question from prompt
context, question = self._extract_context_question(prompt)
return fallback_llm.generate_response(context, question)
else:
return "I apologize, but the AI service is currently unavailable. Please try again later."
def _try_primary_endpoint(self, prompt: str):
"""Try to get response from primary endpoint"""
try:
payload = {"prompt": prompt}
start_time = time.time()
response = requests.post(self.endpoint_url, json=payload, timeout=30)
if mlflow.active_run():
mlflow.log_metric("lit_serve_latency", time.time() - start_time)
if response.status_code == 200:
data = response.json()
if mlflow.active_run():
mlflow.log_metric("response_tokens", len(data.get("response", "").split()))
return True, data.get("response", "").strip()
else:
if mlflow.active_run():
mlflow.log_metric("request_errors", 1)
print(f"Primary endpoint failed with status: {response.status_code}")
return False, ""
except Exception as e:
print(f"Primary endpoint error: {e}")
return False, ""
def _extract_context_question(self, prompt: str):
"""Extract context and question from the prompt template"""
try:
if "Context:" in prompt and "Question:" in prompt:
context_part = prompt.split("Context:")[1].split("Question:")[0].strip()
question_part = prompt.split("Question:")[1].split("Answer:")[0].strip()
return context_part, question_part
return "", prompt
except:
return "", prompt
@property
def _identifying_params(self) -> Mapping[str, Any]:
return {"endpoint_url": self.endpoint_url, "use_fallback": self.use_fallback}
@property
def _llm_type(self) -> str:
return "robust_litserve_llm"
# Initialize the robust model
try:
model = RobustLitServeLLM(
endpoint_url="https://8001-01k2h9d9mervcmgfn66ybkpwvq.cloudspaces.litng.ai/predict",
use_fallback=True
)
print("Robust LLM initialized successfully")
except Exception as e:
print(f"Robust LLM initialization failed: {e}")
model = None
# ------------------ Prompt Template ------------------
prompt = PromptTemplate(
input_variables=["context", "question"],
template="""
You are a smart maintenance assistant. Based on the provided context, answer the question concisely in 1-2 lines.
Context:
{context}
Question: {question}
Answer:
"""
)
# Initialize LLM chain
try:
if model:
llm_chain = LLMChain(llm=model, prompt=prompt)
print("LLM chain initialized successfully")
else:
llm_chain = None
print("LLM chain not initialized - no model available")
except Exception as e:
print(f"LLM chain initialization failed: {e}")
llm_chain = None
# ------------------ RAG Pipeline ------------------
def get_rag_response(question):
"""Get the complete RAG response with robust error handling"""
try:
if not question.strip():
return "Please enter a valid question.", ""
# Get context from Pinecone
retrieved_context = get_retrieved_context(question)
# If we have an LLM chain, use it
if llm_chain:
result = llm_chain.invoke({
"context": retrieved_context,
"question": question
})
full_response = result["text"].strip()
# Clean up the response
if "Answer:" in full_response:
full_response = full_response.split("Answer:")[-1].strip()
return full_response, retrieved_context
else:
# Use direct fallback
fallback_response = fallback_llm.generate_response(retrieved_context, question)
return fallback_response, retrieved_context
except Exception as e:
error_msg = f"Error in RAG pipeline: {str(e)}"
print(error_msg)
# Final fallback - simple response based on context
if "context" in locals() and retrieved_context:
return f"Based on available information: I found relevant maintenance data, but encountered an issue processing it. Context available: {len(retrieved_context)} characters.", retrieved_context
else:
return "I apologize, but I'm experiencing technical difficulties. Please try again later or contact support.", "No context retrieved"
def rag_pipeline_stream(question):
"""Streaming version of RAG pipeline"""
try:
full_response, _ = get_rag_response(question)
# Stream word by word for better UX
words = full_response.split()
current_text = ""
for word in words:
current_text += word + " "
yield current_text
time.sleep(0.03) # Faster streaming
except Exception as e:
error_msg = f"Error in streaming: {str(e)}"
print(error_msg)
yield "I apologize, but I encountered an error while generating the response."
# ------------------ Gradio UI ------------------
with gr.Blocks(theme=gr.themes.Soft(), title="Maintenance AI Assistant") as demo:
gr.Markdown("""
# πŸ›  Maintenance AI Assistant
*Your intelligent companion for maintenance queries and troubleshooting*
**Note**: This system uses multiple fallback mechanisms to ensure reliability.
""")
usage_counter = gr.State(value=0)
session_start = gr.State(value=datetime.now().isoformat())
current_response = gr.State(value="")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### πŸ’¬ Chat Interface")
question_input = gr.Textbox(
label="Ask your maintenance question",
placeholder="e.g., How do I troubleshoot a leaking valve? What's the maintenance schedule for pumps?",
lines=3
)
ask_button = gr.Button("Get Answer πŸš€", variant="primary", size="lg")
with gr.Row():
clear_btn = gr.Button("Clear Chat πŸ—‘οΈ")
evaluate_btn = gr.Button("Show Metrics πŸ“ˆ", variant="secondary")
feedback = gr.Radio(
["Helpful", "Not Helpful"],
label="Was this response helpful?",
info="Your feedback helps improve the system"
)
with gr.Column(scale=1):
gr.Markdown("### πŸ€– AI Response")
answer_output = gr.Textbox(
label="Response",
lines=8,
interactive=False,
show_copy_button=True,
autoscroll=True
)
gr.Markdown("### πŸ“Š Evaluation Metrics")
metrics_output = gr.JSON(
label="Quality Metrics",
visible=False,
show_label=True
)
def track_usage(question, count, session_start, feedback_value=None):
"""Track usage and get response"""
if not question.strip():
return "Please enter a question.", count, session_start, ""
count += 1
try:
# Only use MLflow if properly configured
if mlflow_tracking_uri:
with mlflow.start_run(run_name=f"User-Interaction-{count}", nested=True):
mlflow.log_param("question", question)
mlflow.log_param("session_start", session_start)
mlflow.log_param("user_feedback", feedback_value or "No feedback")
if feedback_value:
mlflow.log_metric("helpful_responses", 1 if feedback_value == "Helpful" else 0)
mlflow.log_metric("total_queries", count)
# Get response and context
response, context = get_rag_response(question)
mlflow.log_metric("response_length", len(response))
mlflow.log_metric("response_tokens", len(response.split()))
mlflow.log_metric("context_length", len(context))
return response, count, session_start, response
else:
# Without MLflow
response, context = get_rag_response(question)
return response, count, session_start, response
except Exception as e:
print(f"Tracking error: {e}")
error_response = f"I encountered a system error. Please try again. Error: {str(e)}"
return error_response, count, session_start, error_response
def evaluate_response(question, response):
"""Evaluate the response and return metrics"""
if not question or not response:
return gr.update(value={"info": "No question or response to evaluate"}, visible=True)
# Skip evaluation for error responses
if any(error_word in response.lower() for error_word in ["error", "apologize", "unavailable", "technical"]):
return gr.update(value={"info": "Evaluation skipped for error response"}, visible=True)
try:
context = get_retrieved_context(question)
metrics = evaluator.evaluate_all(question, response, context)
return gr.update(value=metrics, visible=True)
except Exception as e:
print(f"Evaluation error: {e}")
return gr.update(value={"error": f"Evaluation failed: {str(e)}"}, visible=True)
def clear_chat():
"""Clear the chat interface"""
return "", "", gr.update(visible=False)
# Main interaction flow
ask_button.click(
fn=lambda: ("", gr.update(visible=False)), # Clear previous metrics and response
outputs=[answer_output, metrics_output]
).then(
fn=rag_pipeline_stream,
inputs=[question_input],
outputs=[answer_output]
).then(
fn=track_usage,
inputs=[question_input, usage_counter, session_start, feedback],
outputs=[answer_output, usage_counter, session_start, current_response]
)
# Evaluation flow
evaluate_btn.click(
fn=evaluate_response,
inputs=[question_input, current_response],
outputs=[metrics_output]
)
# Clear chat
clear_btn.click(
fn=clear_chat,
outputs=[question_input, answer_output, metrics_output]
)
# Handle feedback
def handle_feedback(feedback_val):
try:
if mlflow_tracking_uri and mlflow.active_run():
mlflow.log_metric("user_feedback_score", 1 if feedback_val == "Helpful" else 0)
except:
pass # Silently fail if feedback logging doesn't work
feedback.change(
fn=handle_feedback,
inputs=[feedback],
outputs=[]
)
if __name__ == "__main__":
print("πŸš€ Starting Maintenance AI Assistant...")
print("βœ… System initialized with fallback mechanisms")
print("🌐 Web interface available at http://0.0.0.0:7860")
demo.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
debug=False
)