|
import hmac
import json
import os

import gradio as gr
import numpy as np
from huggingface_hub import upload_file, hf_hub_download, InferenceClient
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
|
|
|
# Stylesheet handed to gr.Blocks(css=...). It stretches the page containers to
# full height and applies the gradient background, font stack and text color.
PUP_Themed_css = """
html, body, .gradio-container, .gr-app {
    height: 100% !important;
    margin: 0 !important;
    padding: 0 !important;
    background: linear-gradient(to bottom right, #800000, #ff0000, #ffeb3b, #ffa500) !important;
    font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif !important;
    color: #1b4332 !important;
}
"""
|
|
|
# Sentence-embedding model used to compare user queries with stored questions.
embedding_model = SentenceTransformer("paraphrase-mpnet-base-v2")

# HF_TOKEN takes precedence; PUP_AI_Chatbot_Token is the fallback. When neither
# variable is set the token is None.
inference_token = (
    os.environ.get("HF_TOKEN")
    or os.environ.get("PUP_AI_Chatbot_Token")
)

# Hosted text-generation client; chatbot_response calls it when dev mode is on.
inference_client = InferenceClient(
    token=inference_token,
    model="mistralai/Mixtral-8x7B-Instruct-v0.1",
)
|
|
|
# Load the handbook question/answer records. The encoding is spelled out
# because open() otherwise falls back to the platform locale, which can
# mis-decode a UTF-8 JSON file.
with open("dataset.json", "r", encoding="utf-8") as f:
    dataset = json.load(f)

# Parallel lists: index i of each list refers to dataset[i].
questions = [item["question"] for item in dataset]
answers = [item["answer"] for item in dataset]

# Encoded once at import time; chatbot_response compares every query against it.
question_embeddings = embedding_model.encode(questions, convert_to_tensor=True)
|
|
|
# Local copy of the feedback file; its parent directory is created up front so
# later writes cannot fail on a missing folder.
feedback_path = "outputs/feedback.json"
os.makedirs(os.path.dirname(feedback_path), exist_ok=True)

# Module-level state shared by the handlers defined further down.
chat_history, feedback_data, feedback_questions = [], [], []
# chat_history:       initial value handed to gr.State
# feedback_data:      feedback entries (dicts) that get written to feedback_path
# feedback_questions: the "question" field of every feedback entry

# Encoded feedback_questions; stays None until there is something to encode.
feedback_embeddings = None

# Flipped to True by the DevMode password dialog.
dev_mode = dict(enabled=False)
|
|
|
# Seed the feedback cache from the Hugging Face dataset repo. This is
# best-effort: any failure leaves the app running with an empty cache.
try:
    hf_token = os.getenv("PUP_AI_Chatbot_Token")
    downloaded_path = hf_hub_download(
        repo_id="oceddyyy/University_Inquiries_Feedback",
        filename="feedback.json",
        repo_type="dataset",
        token=hf_token,
    )
    with open(downloaded_path, "r", encoding="utf-8") as f:
        feedback_data = json.load(f)

    feedback_questions = [item["question"] for item in feedback_data]
    if feedback_questions:
        feedback_embeddings = embedding_model.encode(feedback_questions, convert_to_tensor=True)
except Exception as e:
    print(f"[Startup] No feedback loaded from HF: {e}")
    # Reset all three together: chatbot_response indexes feedback_data with
    # positions taken from feedback_embeddings, so they must never disagree.
    feedback_data = []
    feedback_questions = []
    feedback_embeddings = None
else:
    # Mirror the download locally. A failed write must not throw away the
    # feedback that was just loaded, so it is handled separately.
    try:
        with open(feedback_path, "w", encoding="utf-8") as f_local:
            json.dump(feedback_data, f_local, indent=4)
    except OSError as e:
        print(f"[Startup] Could not write local feedback copy: {e}")
|
|
|
def upload_feedback_to_hf():
    """Push the local feedback file to the Hugging Face dataset repo.

    Upload errors are caught and printed rather than raised.

    Raises:
        ValueError: if the PUP_AI_Chatbot_Token environment variable is
            unset or empty.
    """
    token = os.environ.get("PUP_AI_Chatbot_Token")
    if token is None or token == "":
        raise ValueError("Hugging Face token not found in environment variables!")

    try:
        upload_kwargs = {
            "path_or_fileobj": feedback_path,
            "path_in_repo": "feedback.json",
            "repo_id": "oceddyyy/University_Inquiries_Feedback",
            "repo_type": "dataset",
            "token": token,
        }
        upload_file(**upload_kwargs)
        print("Feedback uploaded to Hugging Face successfully.")
    except Exception as exc:
        print(f"Error uploading feedback to HF: {exc}")
|
|
|
def _cached_feedback_response(query_vector):
    """Return the stored feedback response that matches the query, or None."""
    if feedback_embeddings is None or not feedback_data:
        return None

    scores = cosine_similarity(query_vector, feedback_embeddings.cpu().numpy())[0]
    # Scores are positional: score i belongs to feedback_data[i]. Skip the
    # cache if the two have drifted apart instead of indexing out of range.
    if len(scores) != len(feedback_data):
        return None

    best_idx = int(np.argmax(scores))
    entry = feedback_data[best_idx]

    # Every upvote lowers the required similarity by 0.01 and every downvote
    # raises it by 0.01; the result is clamped to [0.4, 1.0].
    base_threshold = 0.8
    adjusted = base_threshold - (0.01 * entry.get("upvotes", 0)) + (0.01 * entry.get("downvotes", 0))
    threshold = min(max(adjusted, 0.4), 1.0)

    if scores[best_idx] >= threshold:
        return entry["response"]
    return None


def _handbook_response(query, query_vector):
    """Build a reply from the closest handbook entry in ``dataset``."""
    scores = cosine_similarity(query_vector, question_embeddings.cpu().numpy())[0]
    best_idx = int(np.argmax(scores))
    if scores[best_idx] < 0.4:
        return "Sorry, but the PUP handbook does not contain such information."

    item = dataset[best_idx]
    answer = item.get("answer", "")

    if dev_mode["enabled"]:
        # Dev mode: let the hosted model phrase the answer from the handbook text.
        prompt = (
            f"A student asked:\n\"{query}\"\n\n"
            f"Relevant handbook info:\n\"{answer}\"\n\n"
            f"Please answer based only on this handbook content."
        )
        try:
            return inference_client.text_generation(prompt, max_new_tokens=200, temperature=0.7)
        except Exception as e:
            print(f"[ERROR] HF inference failed: {e}")
            return f"(Fallback) {answer}"

    if "month" in item and "year" in item:
        return f"As of {item['month']}, {item['year']}, {answer}"
    return f"According to 2019 Proposed PUP Handbook, {answer}"


def chatbot_response(query, chat_history):
    """Answer ``query`` and append the exchange to ``chat_history``.

    A sufficiently similar entry in the feedback cache is returned first;
    otherwise the reply is built from the closest handbook entry.

    Args:
        query: The text the user submitted.
        chat_history: List of (query, response) pairs; mutated in place.

    Returns:
        A 3-tuple for the query textbox, the chatbot and the feedback row:
        an empty string (clears the textbox), the chat history, and a
        ``gr.update`` for the feedback row.
    """
    # Nothing to answer: leave the history and the feedback row untouched.
    if not query or not query.strip():
        return "", chat_history, gr.update()

    # Convert to numpy once; both lookups below reuse the same array.
    query_vector = embedding_model.encode([query], convert_to_tensor=True).cpu().numpy()

    response = _cached_feedback_response(query_vector)
    if response is None:
        response = _handbook_response(query, query_vector).strip()

    chat_history.append((query, response))
    return "", chat_history, gr.update(visible=True)
|
|
|
|
|
def record_feedback(feedback, chat_history):
    """Store a thumbs-up/down vote for the most recent exchange.

    If an existing feedback entry has a similar question (cosine similarity
    of at least 0.8) and the same response, its vote counter is incremented;
    otherwise a new entry is appended. The data is then written to
    ``feedback_path`` and uploaded to the Hugging Face repo.

    Args:
        feedback: "positive" or "negative".
        chat_history: List of (query, response) pairs; only the last one is used.

    Returns:
        A ``gr.update`` that hides the feedback row.
    """
    global feedback_embeddings, feedback_questions

    if not chat_history:
        return gr.update(visible=False)

    last_query, last_response = chat_history[-1]
    vote_field = {"positive": "upvotes", "negative": "downvotes"}

    matched = False
    if feedback_data:
        new_vector = embedding_model.encode([last_query], convert_to_tensor=True).cpu().numpy()

        # Reuse the cached embeddings rather than re-encoding every stored
        # question one by one; re-encode in a single batch only if the cache
        # is missing or no longer lines up with feedback_data.
        if feedback_embeddings is not None and len(feedback_embeddings) == len(feedback_data):
            stored = feedback_embeddings
        else:
            stored = embedding_model.encode(
                [item["question"] for item in feedback_data], convert_to_tensor=True
            )

        similarities = cosine_similarity(stored.cpu().numpy(), new_vector)[:, 0]
        for item, similarity in zip(feedback_data, similarities):
            if similarity >= 0.8 and item["response"] == last_response:
                matched = True
                field = vote_field[feedback]
                item[field] = item.get(field, 0) + 1
                break

    if not matched:
        feedback_data.append(
            {
                "question": last_query,
                "response": last_response,
                "feedback": feedback,
                "upvotes": 1 if feedback == "positive" else 0,
                "downvotes": 1 if feedback == "negative" else 0,
            }
        )

    with open(feedback_path, "w") as f:
        json.dump(feedback_data, f, indent=4)

    feedback_questions = [item["question"] for item in feedback_data]
    # A vote on an existing entry leaves the question list unchanged, so the
    # cached embeddings only need rebuilding when they are absent or stale.
    cache_is_stale = (
        feedback_embeddings is None
        or len(feedback_embeddings) != len(feedback_questions)
    )
    if feedback_questions and (not matched or cache_is_stale):
        feedback_embeddings = embedding_model.encode(feedback_questions, convert_to_tensor=True)

    # The vote is already saved locally; a missing token must not turn the
    # button click into an error.
    try:
        upload_feedback_to_hf()
    except ValueError as e:
        print(f"Feedback saved locally but not uploaded: {e}")

    return gr.update(visible=False)
|
|
|
# Build the Gradio interface: header, chat window, DevMode unlock controls,
# query input, and the feedback row shown after each answer.
with gr.Blocks(css=PUP_Themed_css, title="University Handbook AI Chatbot") as demo:
    # The HTML is kept flush-left so Markdown cannot read it as an indented
    # code block.
    gr.Markdown(
        """
<div style='
background-color: var(--block-background-fill);
border-radius: 16px;
padding: 24px 16px;
margin-bottom: 24px;
box-shadow: 0 6px 16px rgba(0, 0, 0, 0.15);
max-width: 700px;
margin-left: auto;
margin-right: auto;
text-align: center;
color: var(--text-color);'>
<h1 style='font-size: 2.2rem; margin: 0;'>University Inquiries AI Chatbot</h1>
</div>
"""
    )

    state = gr.State(chat_history)
    chatbot_ui = gr.Chatbot(label="Chat", show_label=False)

    # Button glyphs below were restored from mis-encoded text; the lock on
    # the DevMode button is a best guess at the original emoji.
    with gr.Row():
        dev_btn = gr.Button("DevMode 🔒")
        password_box = gr.Textbox(placeholder="Enter Dev password", type="password", visible=False, show_label=False)
        confirm_btn = gr.Button("Confirm", visible=False)

    dev_pass = os.getenv("DEV_MODE_PASSWORD", "letmein")
    if "DEV_MODE_PASSWORD" not in os.environ:
        # The fallback is kept for compatibility, but it is a guessable
        # default, so make its use visible in the logs.
        print("[Startup] DEV_MODE_PASSWORD is not set; dev mode uses the built-in default password.")

    def show_password_input():
        """Reveal the password textbox and the Confirm button."""
        return gr.update(visible=True), gr.update(visible=True)

    def enable_devmode(password_input):
        """Turn dev mode on when the supplied password matches ``dev_pass``.

        Returns updates for the password box, the Confirm button and the
        DevMode button, in that order.
        """
        # NOTE(review): dev_mode is a module-level dict, so unlocking it here
        # appears to apply to every session of this process - confirm intent.
        supplied = (password_input or "").encode("utf-8")
        # Constant-time comparison; bytes are used because compare_digest
        # rejects non-ASCII str arguments.
        if hmac.compare_digest(supplied, dev_pass.encode("utf-8")):
            dev_mode["enabled"] = True
            return gr.update(visible=False), gr.update(visible=False), gr.update(value="DevMode ✅", interactive=False)
        return gr.update(visible=True), gr.update(visible=True), gr.update(value="Wrong password. Try again.")

    dev_btn.click(show_password_input, outputs=[password_box, confirm_btn])
    confirm_btn.click(enable_devmode, inputs=[password_box], outputs=[password_box, confirm_btn, dev_btn])

    with gr.Row():
        query_input = gr.Textbox(placeholder="Type your question here...", show_label=False)
        submit_btn = gr.Button("Submit")

    # Hidden until chatbot_response returns an update that makes it visible.
    with gr.Row(visible=False) as feedback_row:
        gr.Markdown("Was this helpful?")
        thumbs_up = gr.Button("👍")
        thumbs_down = gr.Button("👎")

    def handle_submit(message, chat_state):
        """Forward a submitted message to chatbot_response."""
        return chatbot_response(message, chat_state)

    # The Submit button and pressing Enter in the textbox run the same handler.
    submit_btn.click(handle_submit, [query_input, state], [query_input, chatbot_ui, feedback_row])
    query_input.submit(handle_submit, [query_input, state], [query_input, chatbot_ui, feedback_row])

    thumbs_up.click(lambda history: record_feedback("positive", history), inputs=[state], outputs=[feedback_row])
    thumbs_down.click(lambda history: record_feedback("negative", history), inputs=[state], outputs=[feedback_row])
|
|
|
# Launch the Gradio app only when this file is executed as a script, not
# when it is imported.
if __name__ == "__main__":
    demo.launch()
|
|