import math
import os
import random
import tempfile
import time

import gradio as gr
import torch
from google import genai
from google.cloud import aiplatform
from google.genai import types
import google.generativeai as genai_ext
from transformers import pipeline
# --- Env & GCP setup ---
creds_json = os.getenv("GCP_CREDS_JSON")
if not creds_json:
    raise Exception("⚠️ Missing GCP_CREDS_JSON secret!")

# Save to temp file (dev convenience) - secure this in production
with tempfile.NamedTemporaryFile(mode='w+', delete=False) as tmpfile:
    tmpfile.write(creds_json)
    creds_path = tmpfile.name
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = creds_path

# Initialize GCP / Vertex AI (replace project/location as needed)
aiplatform.init(project="emotionmodel-466815", location="us-central1")
# --- LLM / Gemini setup ---
apikey = os.environ.get("GEMINI_API_KEY")
if not apikey:
    raise Exception("⚠️ Missing GEMINI_API_KEY secret!")

# Configure the Gemini API used for drafting replies
genai_ext.configure(api_key=apikey)
llm_model = genai_ext.GenerativeModel('gemini-1.5-pro')
# --- Classifier pipelines ---
emotion_classifier = pipeline("text-classification", model="j-hartmann/emotion-english-distilroberta-base")  # emotion detection (D)
language_detector = pipeline("text-classification", model="papluca/xlm-roberta-base-language-detection")  # language detection (C)
bias_classifier = pipeline("text-classification", model="unitary/toxic-bert")  # toxicity, used for M and B
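# Each pipeline call returns a list of dicts of the form [{'label': ..., 'score': ...}];
# respond() reads the top entry's label and score below.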
# --- Empathy formula ---
def calculate_empathy_score(D, R, M, C, B, O, I, alpha=0.35, beta=0.22, gamma=0.26,
                            epsilon=0.17, delta=0.4, zeta=0.0, iota=0.12):
    """Updated E' score. The oversight factor O is currently not used (zeta stays 0.0 for safety)."""
    inner_sum = epsilon * C + alpha * (D ** 2) + gamma * M + beta * math.log(R + 1) + iota * I
    sig = 1 / (1 + math.exp(-inner_sum))
    # B (toxicity) is applied as a multiplicative penalty term
    E = sig * (1 - delta * B)
    return E
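# The formula written out, with a worked sanity check (illustrative values, not live classifier output):
#   E = sigmoid(epsilon*C + alpha*D^2 + gamma*M + beta*ln(R + 1) + iota*I) * (1 - delta*B)
# e.g. D=0.8, R=4, M=0.9, C=0.9, B=0.1, I=0.7:
#   inner_sum ≈ 0.153 + 0.224 + 0.234 + 0.354 + 0.084 ≈ 1.05
#   E ≈ sigmoid(1.05) * (1 - 0.4*0.1) ≈ 0.74 * 0.96 ≈ 0.71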
# --- Vertex client (if still needed elsewhere) ---
client = genai.Client(
    vertexai=True,
    project="217758598930",
    location="us-central1",
)
model = "projects/217758598930/locations/us-central1/endpoints/1940344453420023808"
generate_content_config = types.GenerateContentConfig(
    temperature=0.9,
    top_p=0.95,
    seed=0,
    max_output_tokens=150,
    safety_settings=[
        types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="BLOCK_NONE"),
        types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_NONE"),
        types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="BLOCK_NONE"),
        types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="BLOCK_NONE"),
    ],
    thinking_config=types.ThinkingConfig(thinking_budget=-1),
)
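# NOTE: the client, model, and generate_content_config above target the fine-tuned
# Vertex endpoint; the current respond() path only calls the Gemini llm_model.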
# --- Helper functions ---
HINDI_KEYWORDS = set(["bhai", "yaar", "bata", "kya", "kaise", "nahi", "achha", "chal", "thanks", "dhanyavaad", "yaarr"])

def detect_hinglish(text, lang_label):
    """Return True if the text is likely Hinglish (code-mixed) or plain Hindi.

    Uses the language_detector label plus token heuristics for romanized Hindi."""
    text_tokens = set(word.strip(".,!?\"'()") for word in text.split())
    # the model detects Hindi directly
    if lang_label == 'hi':
        return True
    # quick romanized-Hindi keyword check
    if any(tok in HINDI_KEYWORDS for tok in text_tokens):
        return True
    # the text contains Devanagari characters
    if any('\u0900' <= ch <= '\u097F' for ch in text):
        return True
    return False
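# Illustrative behaviour: detect_hinglish("bhai kya haal hai", "en") -> True (keyword hit),
# detect_hinglish("how are you doing", "en") -> False.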
# --- Chatbot class with full history & fixes applied ---
class HumanLikeChatbot:
    def __init__(self):
        # raw history to display in the UI: list of tuples (user_msg, bot_reply)
        self.history = []
        # structured history with emotions and moods for LLM prompting:
        # list of tuples (speaker, message, detected_emotion, bot_mood_at_time)
        self.history_with_emotions = []
        self.bot_mood = "neutral"
        self.irritation_level = 0.0
        self.toxicity_history = []  # rolling window of recent toxicity scores
        self.repair_cooldown = 0    # turns left where the bot prioritizes repair
    def add_to_history(self, speaker, message, detected_emotion=None, mood_at_time=None, bot_reply=None):
        """Add entries to both the UI history and the structured history.

        speaker: 'User' or 'Bot'
        message: text
        detected_emotion: emotion label detected for user messages
        mood_at_time: bot mood when the message was produced
        bot_reply: if speaker == 'User', an optional bot reply to store in the UI history right away
        """
        if speaker == 'User':
            # append a placeholder for the bot reply in UI history; it will be updated when the bot responds
            self.history.append((message, bot_reply if bot_reply is not None else ""))
            self.history_with_emotions.append(('User', message, detected_emotion, mood_at_time))
        else:
            # speaker is Bot: attach the reply to the latest UI entry
            if self.history:
                last_user, _ = self.history[-1]
                self.history[-1] = (last_user, message)
            else:
                # no user entry (unlikely), just append
                self.history.append(("", message))
            self.history_with_emotions.append(('Bot', message, detected_emotion, mood_at_time))
    def format_history_for_prompt(self, limit=8):
        """Return a formatted string of the recent structured history suitable for the LLM prompt."""
        recent = self.history_with_emotions[-limit:]
        lines = []
        for speaker, msg, emo, mood in recent:
            if speaker == 'User':
                lines.append(f"User ({emo if emo else 'N/A'}): {msg}")
            else:
                lines.append(f"Bot ({mood if mood else 'N/A'}): {msg}")
        return "\n".join(lines)
    def _update_irritation_decay(self):
        # general slow decay each turn
        if self.irritation_level > 0:
            decay = 0.05
            # faster decay if the bot is actively angry, to allow recovery
            if self.bot_mood in ["angry", "irritated"]:
                decay = 0.15
            self.irritation_level = max(0.0, self.irritation_level - decay)
            if self.irritation_level <= 0.15:
                self.bot_mood = "neutral"
    def update_toxicity_history(self, tox_score):
        self.toxicity_history.append(float(tox_score))
        if len(self.toxicity_history) > 5:
            self.toxicity_history.pop(0)

    def average_toxicity(self):
        if not self.toxicity_history:
            return 0.0
        return sum(self.toxicity_history) / len(self.toxicity_history)

    def should_prioritize_repair(self):
        return self.repair_cooldown > 0 or self.average_toxicity() > 0.6
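    # Example: a toxicity_history of [0.3, 0.8, 0.9] averages about 0.67, so
    # should_prioritize_repair() returns True even with no active repair_cooldown.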
    def respond(self, message):
        try:
            clean_message = message.lower().strip()
            if len(clean_message) < 3 or not any(c.isalpha() for c in clean_message):
                return "Bhai, yeh kya likha? Clear bol na, main samajh lunga! (E Score: 0.00)"

            # --- Emotion detection (D) ---
            emotion_result = emotion_classifier(clean_message)[0]
            D = float(emotion_result.get('score', 0.0))
            user_emotion = emotion_result.get('label', 'neutral')

            # Record the user message in structured history (bot_mood_at_time is set before the bot reply)
            self.add_to_history('User', clean_message, detected_emotion=user_emotion, mood_at_time=self.bot_mood)
            # --- Update bot mood & intensity (I) with inertia ---
            if user_emotion in ['anger', 'disgust'] or any(word in clean_message for word in ['stupid', 'idiot', 'dumb']):
                self.irritation_level = min(1.0, self.irritation_level + 0.25)
                # mildly irritated at first, fully angry once irritation builds up
                self.bot_mood = "angry" if self.irritation_level > 0.5 else "irritated"
                I = min(1.0, 0.8 + self.irritation_level)
            elif user_emotion in ['sadness', 'disappointment']:
                self.bot_mood = "emotional"
                I = 0.7
                # sadness reduces irritation slowly
                self.irritation_level = max(0.0, self.irritation_level - 0.05)
            elif user_emotion in ['joy', 'happiness']:
                self.bot_mood = "happy"
                I = 0.9
                self.irritation_level = max(0.0, self.irritation_level - 0.35)
            else:
                # neutral or unknown
                self.bot_mood = "neutral"
                I = 0.5
                self.irritation_level = max(0.0, self.irritation_level - 0.05)
            # --- Build formatted emotional history for the prompt ---
            formatted_history = self.format_history_for_prompt(limit=8)
            prompt = (
                f"Conversation so far:\n{formatted_history}\n"
                f"Now, the user just said: \"{clean_message}\" (Current Emotion: {user_emotion})\n"
                f"Bot Current Mood: {self.bot_mood}\n"
                "Reply as an empathetic, human-like chatbot, keeping emotional consistency with the past conversation."
            )

            # --- Draft generation from the LLM (Gemini) ---
            try:
                llm_response = llm_model.generate_content(prompt)
                draft = llm_response.text.strip()
            except Exception:
                draft = ""

            # Fallbacks (English, warm)
            fallback_responses = {
                'sadness': ["Bro, I’m really sorry to hear that. Come on, tell me, I’ll just listen. ❤️", "I can feel the sad vibes. I’m here for you, bro."],
                'disappointment': ["Man, that really sucks. Tell me what exactly happened?", "I get it, expectations were high. Tell me more."],
                'joy': ["Wow! That’s a celebration moment. 🥳", "Bro, this calls for a party! Give me the details."],
                'anger': ["Bro, cool down a bit, tell me what’s wrong. 😌", "Looks like something serious happened. I’m here to listen."],
                'neutral': ["Alright, got it. So what’s going on in life?", "Cool, so how’s your day going?"]
            }
            if not draft or len(draft) < 8:
                draft = random.choice(fallback_responses.get(user_emotion, fallback_responses['neutral']))
            # --- Compute metric inputs (rolling toxicity & improved cultural fit) ---
            R = len(self.history)  # relational depth

            # Toxicity of the user message from bias_classifier (kept as a rolling average)
            tox = float(bias_classifier(clean_message)[0].get('score', 0.0))
            self.update_toxicity_history(tox)
            avg_toxicity = self.average_toxicity()

            # Moral judgment (M) based on average toxicity
            M = max(0.4, 0.95 - avg_toxicity)
            B = avg_toxicity

            # Cultural fit (C): detect Hinglish/code-mix and basic tone match
            lang_label = language_detector(clean_message)[0].get('label', 'en')
            is_hinglish = detect_hinglish(clean_message, lang_label)
            if is_hinglish:
                C = 0.9
            elif lang_label == 'en':
                C = 0.8
            else:
                C = 0.6

            # Reduce cultural fit slightly if the bot is hostile (makes the score more realistic)
            if self.bot_mood in ["angry", "irritated"]:
                C = max(0.0, C - 0.2)

            # Oversight/harm keyphrase penalty (computed for completeness; not used by the score while zeta=0.0)
            O = 0.2 if any(word in clean_message for word in ['kill', 'hate', 'suicide', 'bomb']) else 0.0
            # --- Calculate empathy score ---
            score = calculate_empathy_score(D, R, M, C, B, O, I)

            # --- Self-repair / calming behavior ---
            if score < 0.50 and self.repair_cooldown == 0:
                # Replace the draft with a calming repair message and enter cooldown to avoid a loop
                draft = "Bro, I think we got off track. I care about what you’re feeling. Tell me what's really going on."
                self.repair_cooldown = 2  # the next 2 turns prioritize repair
            # If in repair cooldown, slightly prioritize a calm tone (best-effort)
            if self.repair_cooldown > 0:
                self.repair_cooldown -= 1
                # if the draft doesn't already speak in the first person, occasionally swap in a direct listening line
                draft_words = [w.strip(".,!?\"'’") for w in draft.lower().split()]
                if not any(w == "i" or w.startswith(("i'", "i’")) for w in draft_words) and random.random() < 0.6:
                    draft = "Bro, I’m here. If you want to talk, I’m listening."
            # --- Update irritation decay after the response ---
            self._update_irritation_decay()

            # --- Add the bot reply to history structures ---
            self.add_to_history('Bot', draft, detected_emotion=None, mood_at_time=self.bot_mood, bot_reply=draft)

            # Slight thinking pause
            time.sleep(random.uniform(0.6, 1.2))

            # Return the message annotated with mood and empathy score
            full_resp = draft + f" (User Emotion: {user_emotion}, My Mood: {self.bot_mood})"
            return full_resp + f" (E Score: {score:.2f})"
        except Exception as e:
            # In production, log the exception rather than returning it
            return f"Error: {str(e)}"
# --- Gradio UI ---
def chat(message, history):
    if history is None:
        history = []
    response = bot.respond(message)
    history.append((message, response))
    return "", history

bot = HumanLikeChatbot()

with gr.Blocks(title="HumanLike Chatbot") as demo:
    gr.Markdown("<h1 style='text-align: center;'>HumanLike Chatbot with Emotions and E Score (v2)</h1>")
    chatbot = gr.Chatbot(height=400)
    msg = gr.Textbox(label="You:", placeholder="Type your message here...")
    clear = gr.Button("Clear")

    msg.submit(chat, [msg, chatbot], [msg, chatbot])
    clear.click(lambda: None, None, chatbot, queue=False)

if __name__ == '__main__':
    demo.launch(share=True)