Spaces:
Running
on
Zero
Running
on
Zero
import gradio as gr | |
import spaces | |
import torch | |
import numpy as np | |
import re | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
from datetime import datetime | |
from transformers import pipeline, AutoModelForSequenceClassification, AutoTokenizer | |
from motif_tagging import detect_motifs | |
from functools import lru_cache | |
from torch.nn.functional import sigmoid | |
# ----- Models ----- | |
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") | |
# Emotion model (CPU for stability) | |
emotion_pipeline = pipeline( | |
"text-classification", | |
model="j-hartmann/emotion-english-distilroberta-base", | |
top_k=6, | |
truncation=True, | |
device=-1 # Force CPU usage | |
) | |
# Abuse Model | |
model_name = "SamanthaStorm/tether-multilabel-v4" # Or your HF Hub path | |
model = AutoModelForSequenceClassification.from_pretrained(model_name) | |
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False) | |
model.to(device) | |
# DARVO Model | |
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1") | |
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False) | |
darvo_model.eval() | |
darvo_model.to(device) | |
def get_emotion_profile(text): | |
emotions = emotion_pipeline(text) | |
if isinstance(emotions, list) and isinstance(emotions[0], list): | |
emotions = emotions[0] | |
return {e['label'].lower(): round(e['score'], 3) for e in emotions} | |
# Emotion model (no retraining needed) | |
emotion_pipeline = pipeline( | |
"text-classification", | |
model="j-hartmann/emotion-english-distilroberta-base", | |
top_k=6, | |
truncation=True | |
) | |
# --- Timeline Visualization Function --- | |
def generate_abuse_score_chart(dates, scores, labels): | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
from datetime import datetime | |
import re | |
# Determine if all entries are valid dates | |
if all(re.match(r"\d{4}-\d{2}-\d{2}", d) for d in dates): | |
parsed_x = [datetime.strptime(d, "%Y-%m-%d") for d in dates] | |
x_labels = [d.strftime("%Y-%m-%d") for d in parsed_x] | |
else: | |
parsed_x = list(range(1, len(dates) + 1)) | |
x_labels = [f"Message {i+1}" for i in range(len(dates))] | |
fig, ax = plt.subplots(figsize=(8, 3)) | |
ax.plot(parsed_x, scores, marker='o', linestyle='-', color='darkred', linewidth=2) | |
for x, y in zip(parsed_x, scores): | |
ax.text(x, y + 2, f"{int(y)}%", ha='center', fontsize=8, color='black') | |
ax.set_xticks(parsed_x) | |
ax.set_xticklabels(x_labels) | |
ax.set_xlabel("") # No axis label | |
ax.set_ylabel("Abuse Score (%)") | |
ax.set_ylim(0, 105) | |
ax.grid(True) | |
plt.tight_layout() | |
buf = io.BytesIO() | |
plt.savefig(buf, format='png') | |
buf.seek(0) | |
return Image.open(buf) | |
# --- Abuse Model --- | |
from transformers import AutoModelForSequenceClassification, AutoTokenizer | |
model_name = "SamanthaStorm/tether-multilabel-v4" | |
model = AutoModelForSequenceClassification.from_pretrained(model_name) | |
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False) | |
LABELS = [ | |
"recovery", "control", "gaslighting", "guilt tripping", "dismissiveness", "blame shifting", | |
"nonabusive","projection", "insults", "contradictory statements", "obscure language" | |
] | |
THRESHOLDS = { | |
"recovery": 0.27, | |
"control": 0.47, | |
"gaslighting": 0.48, | |
"guilt tripping": .56, | |
"dismissiveness": 0.25, | |
"blame shifting": 0.55, | |
"projection": 0.59, | |
"insults": 0.33, | |
"contradictory statements": 0.27, | |
"obscure language": 0.65, | |
"nonabusive": 1.0 | |
} | |
PATTERN_WEIGHTS = { | |
"recovery": 0.5, | |
"control": 1.4, | |
"gaslighting": 1.0, | |
"guilt tripping": 0.9, | |
"dismissiveness": 0.9, | |
"blame shifting": 0.8, | |
"projection": 0.5, | |
"insults": 1.2, | |
"contradictory statements": 1.0, | |
"obscure language": 0.9, | |
"nonabusive": 0.0 | |
} | |
ESCALATION_RISKS = { | |
"blame shifting": "low", | |
"contradictory statements": "moderate", | |
"control": "high", | |
"dismissiveness": "moderate", | |
"gaslighting": "moderate", | |
"guilt tripping": "moderate", | |
"insults": "moderate", | |
"obscure language": "low", | |
"projection": "low", | |
"recovery phase": "low" | |
} | |
RISK_STAGE_LABELS = { | |
1: "π Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.", | |
2: "π₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.", | |
3: "π§οΈ Risk Stage: Reconciliation\nThis message reflects a reset attemptβapologies or emotional repair without accountability.", | |
4: "πΈ Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it." | |
} | |
ESCALATION_QUESTIONS = [ | |
("Partner has access to firearms or weapons", 4), | |
("Partner threatened to kill you", 3), | |
("Partner threatened you with a weapon", 3), | |
("Partner has ever choked you, even if you considered it consensual at the time", 4), | |
("Partner injured or threatened your pet(s)", 3), | |
("Partner has broken your things, punched or kicked walls, or thrown things ", 2), | |
("Partner forced or coerced you into unwanted sexual acts", 3), | |
("Partner threatened to take away your children", 2), | |
("Violence has increased in frequency or severity", 3), | |
("Partner monitors your calls/GPS/social media", 2) | |
] | |
def get_emotional_tone_tag(emotions, sentiment, patterns, abuse_score): | |
sadness = emotions.get("sadness", 0) | |
joy = emotions.get("joy", 0) | |
neutral = emotions.get("neutral", 0) | |
disgust = emotions.get("disgust", 0) | |
anger = emotions.get("anger", 0) | |
fear = emotions.get("fear", 0) | |
disgust = emotions.get("disgust", 0) | |
# 1. Performative Regret | |
if ( | |
sadness > 0.4 and | |
any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery phase"]) and | |
(sentiment == "undermining" or abuse_score > 40) | |
): | |
return "performative regret" | |
# 2. Coercive Warmth | |
if ( | |
(joy > 0.3 or sadness > 0.4) and | |
any(p in patterns for p in ["control", "gaslighting"]) and | |
sentiment == "undermining" | |
): | |
return "coercive warmth" | |
# 3. Cold Invalidation | |
if ( | |
(neutral + disgust) > 0.5 and | |
any(p in patterns for p in ["dismissiveness", "projection", "obscure language"]) and | |
sentiment == "undermining" | |
): | |
return "cold invalidation" | |
# 4. Genuine Vulnerability | |
if ( | |
(sadness + fear) > 0.5 and | |
sentiment == "supportive" and | |
all(p in ["recovery phase"] for p in patterns) | |
): | |
return "genuine vulnerability" | |
# 5. Emotional Threat | |
if ( | |
(anger + disgust) > 0.5 and | |
any(p in patterns for p in ["control", "insults", "dismissiveness"]) and | |
sentiment == "undermining" | |
): | |
return "emotional threat" | |
# 6. Weaponized Sadness | |
if ( | |
sadness > 0.6 and | |
any(p in patterns for p in ["guilt tripping", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "weaponized sadness" | |
# 7. Toxic Resignation | |
if ( | |
neutral > 0.5 and | |
any(p in patterns for p in ["dismissiveness", "obscure language"]) and | |
sentiment == "undermining" | |
): | |
return "toxic resignation" | |
# 8. Aggressive Dismissal | |
if ( | |
anger > 0.5 and | |
any(p in patterns for p in ["aggression", "insults", "control"]) and | |
sentiment == "undermining" | |
): | |
return "aggressive dismissal" | |
# 9. Deflective Hostility | |
if ( | |
(0.2 < anger < 0.7 or 0.2 < disgust < 0.7) and | |
any(p in patterns for p in ["deflection", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "deflective hostility" | |
# 10. Mocking Detachment | |
if ( | |
(neutral + joy) > 0.5 and | |
any(p in patterns for p in ["mockery", "insults", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "mocking detachment" | |
# 11. Contradictory Gaslight | |
if ( | |
(joy + anger + sadness) > 0.5 and | |
any(p in patterns for p in ["gaslighting", "contradictory statements"]) and | |
sentiment == "undermining" | |
): | |
return "contradictory gaslight" | |
# 12. Calculated Neutrality | |
if ( | |
neutral > 0.6 and | |
any(p in patterns for p in ["obscure language", "deflection", "dismissiveness"]) and | |
sentiment == "undermining" | |
): | |
return "calculated neutrality" | |
# 13. Forced Accountability Flip | |
if ( | |
(anger + disgust) > 0.5 and | |
any(p in patterns for p in ["blame shifting", "manipulation", "projection"]) and | |
sentiment == "undermining" | |
): | |
return "forced accountability flip" | |
# 14. Conditional Affection | |
if ( | |
joy > 0.4 and | |
any(p in patterns for p in ["apology baiting", "control", "recovery phase"]) and | |
sentiment == "undermining" | |
): | |
return "conditional affection" | |
if ( | |
(anger + disgust) > 0.5 and | |
any(p in patterns for p in ["blame shifting", "projection", "deflection"]) and | |
sentiment == "undermining" | |
): | |
return "forced accountability flip" | |
# Emotional Instability Fallback | |
if ( | |
(anger + sadness + disgust) > 0.6 and | |
sentiment == "undermining" | |
): | |
return "emotional instability" | |
return None | |
# π New DARVO score model (regression-based) | |
from torch.nn.functional import sigmoid | |
import torch | |
# Load your trained DARVO regressor from Hugging Face Hub | |
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1") | |
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False) | |
darvo_model.eval() | |
def predict_darvo_score(text): | |
inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
with torch.no_grad(): | |
logits = darvo_model(**inputs).logits | |
score = sigmoid(logits).item() | |
return round(score, 4) # Rounded for display/output | |
def detect_weapon_language(text): | |
weapon_keywords = [ | |
"knife", "knives", "stab", "cut you", "cutting", | |
"gun", "shoot", "rifle", "firearm", "pistol", | |
"bomb", "blow up", "grenade", "explode", | |
"weapon", "armed", "loaded", "kill you", "take you out" | |
] | |
text_lower = text.lower() | |
return any(word in text_lower for word in weapon_keywords) | |
def get_risk_stage(patterns, sentiment): | |
if "insults" in patterns: | |
return 2 | |
elif "recovery phase" in patterns: | |
return 3 | |
elif "control" in patterns or "guilt tripping" in patterns: | |
return 1 | |
elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]): | |
return 4 | |
return 1 | |
def generate_risk_snippet(abuse_score, top_label, escalation_score, stage): | |
import re | |
# Extract aggression score if aggression is detected | |
if isinstance(top_label, str) and "aggression" in top_label.lower(): | |
try: | |
match = re.search(r"\(?(\d+)\%?\)?", top_label) | |
aggression_score = int(match.group(1)) / 100 if match else 0 | |
except: | |
aggression_score = 0 | |
else: | |
aggression_score = 0 | |
# Revised risk logic | |
if abuse_score >= 85 or escalation_score >= 16: | |
risk_level = "high" | |
elif abuse_score >= 60 or escalation_score >= 8 or aggression_score >= 0.25: | |
risk_level = "moderate" | |
elif stage == 2 and abuse_score >= 40: | |
risk_level = "moderate" | |
else: | |
risk_level = "low" | |
if isinstance(top_label, str) and " β " in top_label: | |
pattern_label, pattern_score = top_label.split(" β ") | |
else: | |
pattern_label = str(top_label) if top_label is not None else "Unknown" | |
pattern_score = "" | |
WHY_FLAGGED = { | |
"control": "This message may reflect efforts to restrict someoneβs autonomy, even if it's framed as concern or care.", | |
"gaslighting": "This message could be manipulating someone into questioning their perception or feelings.", | |
"dismissiveness": "This message may include belittling, invalidating, or ignoring the other personβs experience.", | |
"insults": "Direct insults often appear in escalating abusive dynamics and can erode emotional safety.", | |
"blame shifting": "This message may redirect responsibility to avoid accountability, especially during conflict.", | |
"guilt tripping": "This message may induce guilt in order to control or manipulate behavior.", | |
"recovery phase": "This message may be part of a tension-reset cycle, appearing kind but avoiding change.", | |
"projection": "This message may involve attributing the abuserβs own behaviors to the victim.", | |
"contradictory statements": "This message may contain internal contradictions used to confuse, destabilize, or deflect responsibility.", | |
"obscure language": "This message may use overly formal, vague, or complex language to obscure meaning or avoid accountability.", | |
"default": "This message contains language patterns that may affect safety, clarity, or emotional autonomy." | |
} | |
explanation = WHY_FLAGGED.get(pattern_label.lower(), WHY_FLAGGED["default"]) | |
base = f"\n\nπ Risk Level: {risk_level.capitalize()}\n" | |
base += f"This message shows strong indicators of **{pattern_label}**. " | |
if risk_level == "high": | |
base += "The language may reflect patterns of emotional control, even when expressed in soft or caring terms.\n" | |
elif risk_level == "moderate": | |
base += "There are signs of emotional pressure or verbal aggression that may escalate if repeated.\n" | |
else: | |
base += "The message does not strongly indicate abuse, but it's important to monitor for patterns.\n" | |
base += f"\nπ‘ *Why this might be flagged:*\n{explanation}\n" | |
base += f"\nDetected Pattern: **{pattern_label} ({pattern_score})**\n" | |
base += "π§ You can review the pattern in context. This tool highlights possible dynamicsβnot judgments." | |
return base | |
# --- Step X: Detect Immediate Danger Threats --- | |
THREAT_MOTIFS = [ | |
"i'll kill you", "iβm going to hurt you", "youβre dead", "you won't survive this", | |
"iβll break your face", "i'll bash your head in", "iβll snap your neck", | |
"iβll come over there and make you shut up", "i'll knock your teeth out", | |
"youβre going to bleed", "you want me to hit you?", "i wonβt hold back next time", | |
"i swear to god iβll beat you", "next time, i wonβt miss", "iβll make you scream", | |
"i know where you live", "i'm outside", "iβll be waiting", "i saw you with him", | |
"you canβt hide from me", "iβm coming to get you", "i'll find you", "i know your schedule", | |
"i watched you leave", "i followed you home", "you'll regret this", "youβll be sorry", | |
"youβre going to wish you hadnβt", "you brought this on yourself", "donβt push me", | |
"you have no idea what iβm capable of", "you better watch yourself", | |
"i donβt care what happens to you anymore", "iβll make you suffer", "youβll pay for this", | |
"iβll never let you go", "youβre nothing without me", "if you leave me, iβll kill myself", | |
"i'll ruin you", "i'll tell everyone what you did", "iβll make sure everyone knows", | |
"iβm going to destroy your name", "youβll lose everyone", "iβll expose you", | |
"your friends will hate you", "iβll post everything", "youβll be cancelled", | |
"youβll lose everything", "iβll take the house", "iβll drain your account", | |
"youβll never see a dime", "youβll be broke when iβm done", "iβll make sure you lose your job", | |
"iβll take your kids", "iβll make sure you have nothing", "you canβt afford to leave me", | |
"don't make me do this", "you know what happens when iβm mad", "youβre forcing my hand", | |
"if you just behaved, this wouldnβt happen", "this is your fault", | |
"youβre making me hurt you", "i warned you", "you should have listened" | |
] | |
def compute_abuse_score(matched_scores, sentiment): | |
""" | |
Compute abuse score with more conservative adjustments. | |
""" | |
if not matched_scores: | |
return 0.0 | |
sorted_scores = sorted(matched_scores, key=lambda x: x[1], reverse=True) | |
highest_score = sorted_scores[0][1] | |
num_patterns = len(matched_scores) | |
# Scale down base score more aggressively if multiple patterns are present | |
if num_patterns > 1: | |
highest_score *= (1 - (num_patterns - 1) * 0.2) # Reduce by 20% for each additional pattern | |
base_score = highest_score * 100 | |
critical_patterns = { | |
'gaslighting': 1.4, # Reduced | |
'guilt tripping': 1.3, # Reduced | |
'blame shifting': 1.2, # Reduced | |
'control': 1.3, # Reduced | |
'insults': 1.1, # Reduced | |
'manipulation': 1.2, | |
'love bombing': 1.2, | |
'emotional blackmail': 1.4, | |
'dismissiveness': 1.1, | |
'contradictory statements': 1.1 | |
} | |
for label, score, _ in matched_scores: | |
if label in critical_patterns and score > 0.5: | |
base_score *= critical_patterns[label] | |
# Further reduce combination multipliers | |
if len(matched_scores) >= 2: | |
base_score *= 1.1 # Reduced | |
if len(matched_scores) >= 3: | |
base_score *= 1.05 # Reduced | |
# Reduce high confidence boost | |
if any(score > 0.8 for _, score, _ in matched_scores): | |
base_score *= 1.05 # Reduced | |
# Sentiment modifier (more nuanced) | |
if emotion_profile.get("neutral", 0) > 0.85 and any( | |
scores[LABELS.index(l)] > thresholds[l] * 0.8 # Scale down thresholds for neutral sentiment | |
for l in ["control", "blame shifting", "insults", "guilt tripping"] # Consider more labels | |
): | |
sentiment = "undermining" # Only override if multiple patterns are present with moderate confidence | |
elif sentiment_score > 0.35: # Increased threshold | |
sentiment = "undermining" | |
else: | |
sentiment = "supportive" | |
# Reduce minimum score and threshold for activation | |
if any(score > 0.9 for _, score, _ in matched_scores): # Higher threshold | |
base_score = max(base_score, 75.0) # Reduced | |
elif any(score > 0.7 for _, score, _ in matched_scores): # Moderate threshold | |
base_score = max(base_score, 60.0) # Reduced | |
return min(round(base_score, 1), 100.0) | |
# Cache results for performance | |
def analyze_single_message(text, thresholds): | |
print("β‘ ENTERED analyze_single_message") | |
stage = 1 | |
motif_hits, matched_phrases = detect_motifs(text) | |
# Get emotion profile | |
emotion_profile = get_emotion_profile(text) | |
sentiment_score = emotion_profile.get("anger", 0) + emotion_profile.get("disgust", 0) | |
# Get model scores | |
inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device) | |
with torch.no_grad(): | |
outputs = model(**inputs) | |
scores = torch.sigmoid(outputs.logits.squeeze(0)).numpy() | |
# Sentiment override | |
if emotion_profile.get("neutral", 0) > 0.85 and any( | |
scores[LABELS.index(l)] > thresholds[l] * 0.8 # Scale down thresholds for neutral sentiment | |
for l in ["control", "blame shifting", "insults", "guilt tripping"] # Consider more labels | |
): | |
sentiment = "undermining" # Only override if multiple patterns are present with moderate confidence | |
elif sentiment_score > 0.35: # Increased threshold | |
sentiment = "undermining" | |
else: | |
sentiment = "supportive" | |
weapon_flag = detect_weapon_language(text) | |
adjusted_thresholds = { | |
k: v + 0.05 if sentiment == "supportive" else v | |
for k, v in thresholds.items() | |
} | |
darvo_score = predict_darvo_score(text) | |
threshold_labels = [ | |
label for label, score in zip(LABELS, scores) | |
if score > adjusted_thresholds[label] | |
] | |
matched_scores = [(label, score, PATTERN_WEIGHTS.get(label, 1.0)) for label, score in zip(LABELS, scores) if score > adjusted_thresholds[label]] | |
if not threshold_labels: | |
return 0.0, [], [], {"label": sentiment}, 1, 0.0, None | |
top_patterns = sorted( | |
[(label, score) for label, score in zip(LABELS, scores)], | |
key=lambda x: x[1], | |
reverse=True | |
)[:2] | |
# Abuse score | |
abuse_score = compute_abuse_score(matched_scores, sentiment) # Calculate before adjustments | |
if weapon_flag: | |
abuse_score = min(abuse_score + 25, 100) # Apply weapon adjustment directly to abuse_score | |
if stage < 2: | |
stage = 2 | |
abuse_score = min(abuse_score, 100 if "control" in threshold_labels else 95) # Apply cap after weapon adjustment | |
tone_tag = get_emotional_tone_tag(emotion_profile, sentiment, threshold_labels, abuse_score) | |
threshold_labels = [label for label, score in zip(LABELS, scores) if score > adjusted_thresholds[label]] | |
matched_scores = [(label, score, PATTERN_WEIGHTS.get(label, 1.0)) for label, score in zip(LABELS, scores) if score > adjusted_thresholds[label]] | |
if not threshold_labels: | |
return 0.0, [], [], {"label": sentiment}, 1, 0.0, None | |
# Remove recovery tag if tone is fake | |
if "recovery" in threshold_labels and tone_tag == "forced accountability flip": | |
threshold_labels.remove("recovery") | |
top_patterns = [p for p in top_patterns if p[0] != "recovery"] | |
print("β οΈ Removing 'recovery' due to undermining sentiment (not genuine repair)") | |
# Override profanity/anger for short texts | |
profane_words = {"fuck", "fucking", "bitch", "shit", "cunt", "ho", "asshole", "dick", "whore", "slut"} | |
tokens = set(text.lower().split()) | |
has_profane = any(word in tokens for word in profane_words) | |
short_text = len(tokens) <= 10 | |
anger_score = emotion_profile.get("anger", 0) | |
if has_profane and anger_score > 0.75 and short_text: | |
print("β οΈ Profanity + Anger Override Triggered") | |
insult_score = next((s for l, s in top_patterns if l == "insults"), 0) | |
if ("insults", insult_score) not in top_patterns: | |
top_patterns = [("insults", insult_score)] + top_patterns | |
if "insults" not in threshold_labels: | |
threshold_labels.append("insults") | |
# Debug | |
print(f"Emotional Tone Tag: {tone_tag}") | |
print("Emotion Profile:") | |
for emotion, score in emotion_profile.items(): | |
print(f" {emotion.capitalize():10}: {score}") | |
print("\n--- Debug Info ---") | |
print(f"Text: {text}") | |
print(f"Sentiment (via emotion): {sentiment} (score: {round(sentiment_score, 3)})") | |
print("Abuse Pattern Scores:") | |
for label, score in zip(LABELS, scores): | |
passed = "β " if score > adjusted_thresholds[label] else "β" | |
print(f" {label:25} β {score:.3f} {passed}") | |
print(f"Matched for score: {[(l, round(s, 3)) for l, s, _ in matched_scores]}") | |
print(f"Abuse Score Raw: {round(abuse_score_raw, 1)}") | |
print("------------------\n") | |
return abuse_score, threshold_labels, top_patterns, {"label": sentiment}, stage, darvo_score, tone_tag | |
def analyze_composite(msg1, msg2, msg3, *answers_and_none): | |
from collections import Counter | |
none_selected_checked = answers_and_none[-1] | |
responses_checked = any(answers_and_none[:-1]) | |
none_selected = not responses_checked and none_selected_checked | |
if none_selected: | |
escalation_score = 0 | |
escalation_note = "Checklist completed: no danger items reported." | |
escalation_completed = True | |
elif responses_checked: | |
escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a) | |
escalation_note = "Checklist completed." | |
escalation_completed = True | |
else: | |
escalation_score = None | |
escalation_note = "Checklist not completed." | |
escalation_completed = False | |
messages = [msg1, msg2, msg3] | |
active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()] | |
if not active: | |
return "Please enter at least one message.", None | |
# Flag any threat phrases present in the messages | |
import re | |
def normalize(text): | |
import unicodedata | |
text = text.lower().strip() | |
text = unicodedata.normalize("NFKD", text) # handles curly quotes | |
text = text.replace("β", "'") # smart to straight | |
return re.sub(r"[^a-z0-9 ]", "", text) | |
def detect_threat_motifs(message, motif_list): | |
norm_msg = normalize(message) | |
return [ | |
motif for motif in motif_list | |
if normalize(motif) in norm_msg | |
] | |
# Collect matches per message | |
immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active] | |
flat_threats = [t for sublist in immediate_threats for t in sublist] | |
threat_risk = "Yes" if flat_threats else "No" | |
results = [(analyze_single_message(m.lower(), THRESHOLDS.copy()), d) for m, d in active] | |
abuse_scores = [r[0][0] for r in results] | |
stages = [r[0][4] for r in results] | |
darvo_scores = [r[0][5] for r in results] | |
tone_tags = [r[0][6] for r in results] | |
dates_used = [r[1] for r in results] | |
predicted_labels = [label for r in results for label, _ in r[0][2]] | |
high = {'control'} | |
moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults', 'contradictory statements', 'guilt tripping'} | |
low = {'blame shifting', 'projection', 'recovery phase'} | |
counts = {'high': 0, 'moderate': 0, 'low': 0} | |
for label in predicted_labels: | |
if label in high: | |
counts['high'] += 1 | |
elif label in moderate: | |
counts['moderate'] += 1 | |
elif label in low: | |
counts['low'] += 1 | |
# Pattern escalation logic | |
pattern_escalation_risk = "Low" | |
if counts['high'] >= 2 and counts['moderate'] >= 2: | |
pattern_escalation_risk = "Critical" | |
elif (counts['high'] >= 2 and counts['moderate'] >= 1) or (counts['moderate'] >= 3) or (counts['high'] >= 1 and counts['moderate'] >= 2): | |
pattern_escalation_risk = "High" | |
elif (counts['moderate'] == 2) or (counts['high'] == 1 and counts['moderate'] == 1) or (counts['moderate'] == 1 and counts['low'] >= 2) or (counts['high'] == 1 and sum(counts.values()) == 1): | |
pattern_escalation_risk = "Moderate" | |
checklist_escalation_risk = "Unknown" if escalation_score is None else ( | |
"Critical" if escalation_score >= 20 else | |
"Moderate" if escalation_score >= 10 else | |
"Low" | |
) | |
escalation_bump = 0 | |
for result, _ in results: | |
abuse_score, _, _, sentiment, stage, darvo_score, tone_tag = result | |
if darvo_score > 0.65: | |
escalation_bump += 3 | |
if tone_tag in ["forced accountability flip", "emotional threat"]: | |
escalation_bump += 2 | |
if abuse_score > 80: | |
escalation_bump += 2 | |
if stage == 2: | |
escalation_bump += 3 | |
def rank(label): | |
return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0) | |
combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump | |
escalation_risk = ( | |
"Critical" if combined_score >= 6 else | |
"High" if combined_score >= 4 else | |
"Moderate" if combined_score >= 2 else | |
"Low" | |
) | |
none_selected_checked = answers_and_none[-1] | |
responses_checked = any(answers_and_none[:-1]) | |
none_selected = not responses_checked and none_selected_checked | |
# Determine escalation_score | |
if none_selected: | |
escalation_score = 0 | |
escalation_completed = True | |
elif responses_checked: | |
escalation_score = sum( | |
w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a | |
) | |
escalation_completed = True | |
else: | |
escalation_score = None | |
escalation_completed = False | |
# Build escalation_text and hybrid_score | |
if escalation_score is None: | |
escalation_text = ( | |
"π« **Escalation Potential: Unknown** (Checklist not completed)\n" | |
"β οΈ This section was not completed. Escalation potential is estimated using message data only.\n" | |
) | |
hybrid_score = 0 | |
elif escalation_score == 0: | |
escalation_text = ( | |
"β **Escalation Checklist Completed:** No danger items reported.\n" | |
"π§ **Escalation potential estimated from detected message patterns only.**\n" | |
f"β’ Pattern Risk: {pattern_escalation_risk}\n" | |
f"β’ Checklist Risk: None reported\n" | |
f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
) | |
hybrid_score = escalation_bump | |
else: | |
hybrid_score = escalation_score + escalation_bump | |
escalation_text = ( | |
f"π **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n" | |
"π This score combines your safety checklist answers *and* detected high-risk behavior.\n" | |
f"β’ Pattern Risk: {pattern_escalation_risk}\n" | |
f"β’ Checklist Risk: {checklist_escalation_risk}\n" | |
f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
) | |
# Composite Abuse Score (weighted average based on message length) | |
composite_abuse_scores = [] | |
message_lengths = [len(m.split()) for m, _ in active] | |
total_length = sum(message_lengths) | |
for result, length in zip(results, message_lengths): | |
abuse_score = result[0][0] | |
weight = length / total_length if total_length > 0 else 1 / len(results) if len(results) > 0 else 1 | |
composite_abuse_scores.append(abuse_score * weight) | |
composite_abuse = int(round(sum(composite_abuse_scores))) | |
most_common_stage = max(set(stages), key=stages.count) | |
stage_text = RISK_STAGE_LABELS[most_common_stage] | |
# Derive top label list for each message | |
# safe derive top_labels | |
top_labels = [] | |
for result, _ in results: | |
threshold_labels = result[1] | |
top_patterns = result[2] | |
if threshold_labels: | |
top_labels.append(threshold_labels[0]) | |
elif top_patterns: | |
top_labels.append(top_patterns[0][0]) | |
else: | |
top_labels.append("none") # or whatever default you prefer | |
avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3) | |
darvo_blurb = "" | |
if avg_darvo > 0.25: | |
level = "moderate" if avg_darvo < 0.65 else "high" | |
darvo_blurb = f"\n\nπ **DARVO Score: {avg_darvo}** β This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame." | |
out = f"Abuse Intensity: {composite_abuse}%\n" | |
out += "π This reflects the strength and severity of detected abuse patterns in the message(s).\n\n" | |
out += generate_risk_snippet(composite_abuse, top_labels[0], hybrid_score, most_common_stage) | |
out += f"\n\n{stage_text}" | |
out += darvo_blurb | |
out += "\n\nπ **Emotional Tones Detected:**\n" | |
for i, tone in enumerate(tone_tags): | |
out += f"β’ Message {i+1}: *{tone or 'none'}*\n" | |
# --- Add Immediate Danger Threats section | |
if flat_threats: | |
out += "\n\nπ¨ **Immediate Danger Threats Detected:**\n" | |
for t in set(flat_threats): | |
out += f"β’ \"{t}\"\n" | |
out += "\nβ οΈ These phrases may indicate an imminent risk to physical safety." | |
else: | |
out += "\n\nπ§© **Immediate Danger Threats:** None explicitly detected.\n" | |
out += "This does *not* rule out risk, but no direct threat phrases were matched." | |
pattern_labels = [ | |
pats[0][0] if (pats := r[0][2]) else "none" | |
for r in results | |
] | |
timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, top_labels) | |
out += "\n\n" + escalation_text | |
return out, timeline_image | |
textbox_inputs = [gr.Textbox(label=f"Message {i+1}") for i in range(3)] | |
quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS] | |
none_box = gr.Checkbox(label="None of the above") | |
# βββ FINAL βFORCE LAUNCHβ (no guards) ββββββββββββββββββββββββ | |
demo = gr.Interface( | |
fn=analyze_composite, | |
inputs=textbox_inputs + quiz_boxes + [none_box], | |
outputs=[ | |
gr.Textbox(label="Results"), | |
gr.Image(label="Abuse Score Timeline", type="pil") | |
], | |
title="Abuse Pattern Detector + Escalation Quiz", | |
description=( | |
"Enter up to three messages that concern you. " | |
"For the most accurate results, include messages from a recent emotionally intense period." | |
), | |
flagging_mode="manual" | |
) | |
# This single call will start the server and block, | |
# keeping the container alive on Spaces. | |
demo.launch() | |