|
import gradio as gr |
|
import json |
|
from sentence_transformers import SentenceTransformer |
|
from transformers import pipeline |
|
from sklearn.metrics.pairwise import cosine_similarity |
|
import numpy as np |
|
import os |
|
from huggingface_hub import upload_file, hf_hub_download |
|
|
|
# Page-wide stylesheet handed to gr.Blocks(css=...): full-height layout, a
# maroon/red/yellow/orange diagonal gradient background, a system sans-serif
# font stack, and dark-green text. Every rule is marked !important.
PUP_Themed_css = """
html, body, .gradio-container, .gr-app {
    height: 100% !important;
    margin: 0 !important;
    padding: 0 !important;
    background: linear-gradient(to bottom right, #800000, #ff0000, #ffeb3b, #ffa500) !important;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important;
    color: #1b4332 !important;
}
"""
|
|
|
# Sentence encoder used for every similarity lookup, plus a small
# text2text model used to rephrase handbook answers.
embedding_model = SentenceTransformer('paraphrase-mpnet-base-v2')
llm = pipeline("text2text-generation", model="google/flan-t5-small")

# Pin the encoding so loading the dataset does not depend on the platform's
# locale default (JSON files are conventionally UTF-8).
with open("dataset.json", "r", encoding="utf-8") as f:
    dataset = json.load(f)

# Parallel sequences: questions[i] is answered by answers[i], and row i of
# question_embeddings is the encoding of questions[i].
questions = [item["question"] for item in dataset]
answers = [item["answer"] for item in dataset]
question_embeddings = embedding_model.encode(questions, convert_to_tensor=True)
|
|
|
# Local mirror of the feedback log; make sure its folder exists up front.
feedback_path = "outputs/feedback.json"
os.makedirs(os.path.dirname(feedback_path), exist_ok=True)

# Empty conversation used as the initial value of the UI chat state.
chat_history = []

# Feedback store: the raw records, the question/response columns extracted
# from them, and the encoded questions (None until there is something to
# encode).
feedback_data, feedback_questions, feedback_answers = [], [], []
feedback_embeddings = None
|
|
|
# Best-effort startup sync: pull the shared feedback log from the Hugging Face
# dataset repo, index it for similarity search, and mirror it to feedback_path.
try:
    hf_token = os.getenv("PUP_AI_Chatbot_Token")
    downloaded_path = hf_hub_download(
        repo_id="oceddyyy/University_Inquiries_Feedback",
        filename="feedback.json",
        repo_type="dataset",
        token=hf_token,
    )
    with open(downloaded_path, "r", encoding="utf-8") as f:
        feedback_data = json.load(f)

    feedback_questions = [item["question"] for item in feedback_data]
    feedback_answers = [item["response"] for item in feedback_data]
    if feedback_questions:
        feedback_embeddings = embedding_model.encode(
            feedback_questions, convert_to_tensor=True
        )

    with open(feedback_path, "w", encoding="utf-8") as f_local:
        json.dump(feedback_data, f_local, indent=4)

except Exception as e:
    print(f"[Startup] No feedback loaded from HF: {e}")
    # Reset ALL feedback state together. A failure after the embeddings were
    # built (e.g. while writing the local mirror) would otherwise leave
    # feedback_embeddings populated while feedback_data is empty, and any
    # later lookup that indexes feedback_data by embedding row would raise
    # IndexError.
    feedback_data = []
    feedback_questions = []
    feedback_answers = []
    feedback_embeddings = None
|
|
|
def upload_feedback_to_hf():
    """Push the local feedback file to the Hugging Face dataset repo.

    The access token is read from the ``PUP_AI_Chatbot_Token`` environment
    variable. A failed upload is reported on stdout and otherwise ignored.

    Raises:
        ValueError: If the token environment variable is unset or empty.
    """
    token = os.getenv("PUP_AI_Chatbot_Token")
    if not token:
        raise ValueError("Hugging Face token not found in environment variables!")

    try:
        upload_args = {
            "path_or_fileobj": feedback_path,
            "path_in_repo": "feedback.json",
            "repo_id": "oceddyyy/University_Inquiries_Feedback",
            "repo_type": "dataset",
            "token": token,
        }
        upload_file(**upload_args)
        print("Feedback uploaded to Hugging Face successfully.")
    except Exception as err:
        print(f"Error uploading feedback to HF: {err}")
|
|
|
def chatbot_response(query, chat_history):
    """Answer *query* and append the exchange to *chat_history*.

    Lookup order:

    1. Stored feedback answers. The closest stored question is reused when
       its cosine similarity reaches a per-record threshold: 0.8, lowered by
       0.01 per upvote, raised by 0.01 per downvote, clamped to [0.4, 1.0].
    2. The handbook dataset. If the closest question scores below 0.4 a
       "not in the handbook" message is returned. Otherwise the matched
       answer is quoted and, unless the model's paraphrase is nearly
       identical to it (similarity >= 0.95), followed by that paraphrase.

    Args:
        query: The user's message.
        chat_history: List of ``(query, response)`` pairs; mutated in place.

    Returns:
        A 3-tuple ``("", chat_history, update)``: the empty string (used by
        the UI wiring to clear the input box), the history, and a Gradio
        update that reveals the feedback row. For a blank *query* the history
        is returned untouched together with a no-op update.
    """
    # A blank message has nothing to match against; without this guard it
    # would still be encoded and could be "answered" with an arbitrary entry.
    if not query or not query.strip():
        return "", chat_history, gr.update()

    # Convert once; both similarity lookups below need the NumPy form.
    query_vector = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy()

    if feedback_embeddings is not None:
        feedback_scores = cosine_similarity(query_vector, feedback_embeddings.cpu().numpy())[0]
        best_idx = int(np.argmax(feedback_scores))
        best_score = feedback_scores[best_idx]
        matched_feedback = feedback_data[best_idx]

        # Well-rated answers are reused more readily; poorly rated ones need
        # a closer match before they are served again.
        base_threshold = 0.8
        upvotes = matched_feedback.get("upvotes", 0)
        downvotes = matched_feedback.get("downvotes", 0)
        adjusted_threshold = base_threshold - (0.01 * upvotes) + (0.01 * downvotes)
        dynamic_threshold = min(max(adjusted_threshold, 0.4), 1.0)

        if best_score >= dynamic_threshold:
            response = matched_feedback["response"]
            chat_history.append((query, response))
            return "", chat_history, gr.update(visible=True)

    similarity_scores = cosine_similarity(query_vector, question_embeddings.cpu().numpy())[0]
    best_idx = int(np.argmax(similarity_scores))
    best_score = similarity_scores[best_idx]
    matched_a = answers[best_idx]

    if best_score < 0.4:
        response = "Sorry, but the PUP handbook does not contain such information."
        chat_history.append((query, response))
        return "", chat_history, gr.update(visible=True)

    prompt = (
        f"\"{matched_a}\"\n\n"
        f"Please explain this to a student in a short, natural, and easy-to-understand way. "
        f"Use simple words, and do not add new information."
    )

    llm_response = llm(prompt, max_length=200, do_sample=True, temperature=0.7, top_p=0.9)[0]["generated_text"].strip()
    if not llm_response:
        llm_response = "I'm sorry, I couldn't simplify that at the moment."

    # If the paraphrase is essentially the handbook text, don't repeat it.
    a_embedding = embedding_model.encode([matched_a], convert_to_tensor=True)
    llm_embedding = embedding_model.encode([llm_response], convert_to_tensor=True)
    explanation_similarity = cosine_similarity(a_embedding.cpu().numpy(), llm_embedding.cpu().numpy())[0][0]

    if explanation_similarity >= 0.95:
        final_response = f"According to the university handbook, {matched_a}"
    else:
        final_response = f"According to the university handbook, {matched_a} In simpler terms, {llm_response}"

    chat_history.append((query, final_response))
    return "", chat_history, gr.update(visible=True)
|
|
|
def record_feedback(feedback, chat_history):
    """Record a vote for the most recent exchange in *chat_history*.

    If a stored record has the same response text and a question whose
    cosine similarity to the last query is at least 0.8, its vote counter is
    incremented; otherwise a new record is appended. The feedback list is
    then written to ``feedback_path``, the module-level question embeddings
    are rebuilt, and the file is uploaded via ``upload_feedback_to_hf()``.

    Args:
        feedback: ``"positive"`` or ``"negative"``.
        chat_history: List of ``(query, response)`` pairs; only the last
            entry is read. When empty, nothing is recorded.

    Returns:
        A Gradio update that hides the feedback row.

    Raises:
        KeyError: If an existing record matches and *feedback* is neither
            ``"positive"`` nor ``"negative"``.
        Exception: Whatever ``upload_feedback_to_hf()`` raises propagates.
    """
    global feedback_embeddings

    if not chat_history:
        return gr.update(visible=False)

    last_query, last_response = chat_history[-1]
    votes = {"positive": "upvotes", "negative": "downvotes"}

    # The last query is loop-invariant: encode it at most once, and only if
    # some stored record actually shares the response text.
    query_vector = None
    matched = False

    for item in feedback_data:
        # Cheap string comparison first, so the model is only invoked for
        # records that could match at all.
        if item["response"] != last_response:
            continue

        if query_vector is None:
            query_vector = embedding_model.encode([last_query], convert_to_tensor=True).cpu().numpy()

        existing_vector = embedding_model.encode([item["question"]], convert_to_tensor=True).cpu().numpy()
        similarity = cosine_similarity(existing_vector, query_vector)[0][0]
        if similarity >= 0.8:
            matched = True
            item[votes[feedback]] = item.get(votes[feedback], 0) + 1
            break

    if not matched:
        entry = {
            "question": last_query,
            "response": last_response,
            "feedback": feedback,
            "upvotes": 1 if feedback == "positive" else 0,
            "downvotes": 1 if feedback == "negative" else 0,
        }
        feedback_data.append(entry)

    with open(feedback_path, "w", encoding="utf-8") as f:
        json.dump(feedback_data, f, indent=4)

    # Keep the embedding matrix row-aligned with feedback_data.
    stored_questions = [item["question"] for item in feedback_data]
    if stored_questions:
        feedback_embeddings = embedding_model.encode(stored_questions, convert_to_tensor=True)

    upload_feedback_to_hf()

    return gr.update(visible=False)
|
|
|
# Assemble the chat UI: a title card, the chat window, an input row, and a
# feedback row that stays hidden until a response has been produced.
with gr.Blocks(css=PUP_Themed_css, title="University Handbook AI Chatbot") as demo:
    # White, centered title card; the inline style is built from its
    # individual declarations joined by single spaces.
    gr.Markdown(
        "<div style='{}'>{}</div>".format(
            " ".join(
                [
                    "background-color: #ffffff;",
                    "border-radius: 16px;",
                    "padding: 24px 16px;",
                    "margin-bottom: 24px;",
                    "box-shadow: 0 6px 16px rgba(0, 0, 0, 0.15);",
                    "max-width: 700px;",
                    "margin-left: auto;",
                    "margin-right: auto;",
                    "text-align: center;",
                ]
            ),
            "<h1 style='font-size: 2.2rem; margin: 0;'>University Inquiries AI Chatbot</h1>",
        )
    )

    state = gr.State(chat_history)
    chatbot_ui = gr.Chatbot(label="Chat", show_label=False)

    with gr.Row():
        query_input = gr.Textbox(placeholder="Type your question here...", show_label=False)
        submit_btn = gr.Button("Submit")

    with gr.Row(visible=False) as feedback_row:
        gr.Markdown("Was this helpful?")
        thumbs_up = gr.Button("👍")
        thumbs_down = gr.Button("👎")

    def handle_submit(message, chat_state):
        """Forward a submitted message and the chat state to chatbot_response."""
        return chatbot_response(message, chat_state)

    def _vote_handler(label):
        """Build a click callback that records *label* for the given history."""
        def _handler(history):
            return record_feedback(label, history)
        return _handler

    # The Submit button and pressing Enter in the textbox behave the same.
    submit_btn.click(
        handle_submit,
        inputs=[query_input, state],
        outputs=[query_input, chatbot_ui, feedback_row],
    )
    query_input.submit(
        handle_submit,
        inputs=[query_input, state],
        outputs=[query_input, chatbot_ui, feedback_row],
    )

    thumbs_up.click(_vote_handler("positive"), inputs=[state], outputs=[feedback_row])
    thumbs_down.click(_vote_handler("negative"), inputs=[state], outputs=[feedback_row])

if __name__ == "__main__":
    demo.launch()
|
|