Spaces:
Running
Running
import gradio as gr | |
import torch | |
import numpy as np | |
from transformers import pipeline, RobertaForSequenceClassification, RobertaTokenizer | |
from motif_tagging import detect_motifs | |
import re | |
import matplotlib.pyplot as plt | |
import io | |
from PIL import Image | |
from datetime import datetime | |
# --- Timeline Visualization Function --- | |
def generate_abuse_score_chart(dates, scores, labels):
    """Render the per-message abuse scores as a line chart.

    Args:
        dates: One string per message. When every entry parses as
            ``YYYY-MM-DD`` the x-axis uses real dates; otherwise the x-axis
            falls back to the positions 0..n-1.
        scores: Abuse scores (percent) in the same order as ``dates``.
        labels: Text drawn above each point, in the same order as ``dates``.

    Returns:
        A PIL image holding the PNG rendering of the chart.
    """
    try:
        parsed_dates = [datetime.strptime(d, "%Y-%m-%d") for d in dates]
    except (ValueError, TypeError):
        # A single unparseable entry (e.g. "Undated") switches the whole axis
        # to positional indices so the points stay in message order.
        parsed_dates = list(range(len(dates)))

    fig, ax = plt.subplots(figsize=(8, 3))
    try:
        ax.plot(parsed_dates, scores, marker='o', linestyle='-', color='darkred', linewidth=2)
        for x, y, label in zip(parsed_dates, scores, labels):
            ax.text(x, y + 2, f"{label}\n{int(y)}%", ha='center', fontsize=8, color='black')
        ax.set_title("Abuse Intensity Over Time")
        ax.set_xlabel("Date")
        ax.set_ylabel("Abuse Score (%)")
        ax.set_ylim(0, 105)
        ax.grid(True)
        fig.tight_layout()

        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # pyplot keeps every figure it creates registered until it is closed;
        # without this each request would leak one figure.
        plt.close(fig)

    buf.seek(0)
    return Image.open(buf)
# --- Sentiment model ---
# Loaded once at import time; analyze_single_message compares the returned
# label against "POSITIVE".
sst_pipeline = pipeline(
    task="sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
)

# --- Abuse-pattern model ---
# Tokenizer and classifier are loaded from the same checkpoint name.
model_name = "SamanthaStorm/autotrain-jlpi4-mllvp"
tokenizer = RobertaTokenizer.from_pretrained(model_name)
model = RobertaForSequenceClassification.from_pretrained(model_name)
# Pattern names, zipped position-by-position with the abuse model's output
# scores, so the order must not change.
LABELS = [
    "blame shifting",
    "contradictory statements",
    "control",
    "dismissiveness",
    "gaslighting",
    "guilt tripping",
    "insults",
    "obscure language",
    "projection",
    "recovery phase",
    "threat",
]

# Per-label score a message must exceed for the pattern to count as detected.
THRESHOLDS = {
    "blame shifting": 0.3,
    "contradictory statements": 0.36,
    "control": 0.48,
    "dismissiveness": 0.45,
    "gaslighting": 0.30,
    "guilt tripping": 0.20,
    "insults": 0.34,
    "obscure language": 0.25,
    "projection": 0.35,
    "recovery phase": 0.25,
    "threat": 0.25,
}

# Multipliers used in the weighted abuse score; labels not listed here are
# looked up with a default weight of 1.0.
PATTERN_WEIGHTS = {
    "gaslighting": 1.3,
    "control": 1.2,
    "dismissiveness": 0.8,
    "blame shifting": 0.8,
    "contradictory statements": 0.75,
    "threat": 1.5,  # raised weight for threat
}
# User-facing description for each risk-stage number returned by
# get_risk_stage (1-4).
RISK_STAGE_LABELS = {
    1: (
        "🌀 Risk Stage: Tension-Building\n"
        "This message reflects rising emotional pressure or subtle control attempts."
    ),
    2: (
        "🔥 Risk Stage: Escalation\n"
        "This message includes direct or aggressive patterns, suggesting active harm."
    ),
    3: (
        "🌧️ Risk Stage: Reconciliation\n"
        "This message reflects a reset attempt—apologies or emotional repair without accountability."
    ),
    4: (
        "🌸 Risk Stage: Calm / Honeymoon\n"
        "This message appears supportive but may follow prior harm, minimizing it."
    ),
}
# Safety-checklist items as (checkbox label, weight) pairs. The weights of the
# checked items are summed into the escalation score; the order must match
# the order of the checkbox inputs.
ESCALATION_QUESTIONS = [
    ("Partner has access to firearms or weapons", 4),
    ("Partner threatened to kill you", 3),
    ("Partner threatened you with a weapon", 3),
    (
        "Partner has ever choked you, even if you considered it consensual at the time",
        4,
    ),
    ("Partner injured or threatened your pet(s)", 3),
    (
        "Partner has broken your things, punched or kicked walls, or thrown things ",
        2,
    ),
    ("Partner forced or coerced you into unwanted sexual acts", 3),
    ("Partner threatened to take away your children", 2),
    ("Violence has increased in frequency or severity", 3),
    ("Partner monitors your calls/GPS/social media", 2),
]
# Pattern labels that count toward the DARVO score.
DARVO_PATTERNS = {
    "blame shifting",
    "projection",
    "dismissiveness",
    "guilt tripping",
    "contradictory statements",
}

# Phrases associated with DARVO. The list length is used as a denominator in
# calculate_darvo_score, so repeated entries are kept exactly as they are.
DARVO_MOTIFS = [
    "I never said that.",
    "You’re imagining things.",
    "That never happened.",
    "You’re making a big deal out of nothing.",
    "It was just a joke.",
    "You’re too sensitive.",
    "I don’t know what you’re talking about.",
    "You’re overreacting.",
    "I didn’t mean it that way.",
    "You’re twisting my words.",
    "You’re remembering it wrong.",
    "You’re always looking for something to complain about.",
    "You’re just trying to start a fight.",
    "I was only trying to help.",
    "You’re making things up.",
    "You’re blowing this out of proportion.",
    "You’re being paranoid.",
    "You’re too emotional.",
    "You’re always so dramatic.",
    "You’re just trying to make me look bad.",
    "You’re crazy.",
    "You’re the one with the problem.",
    "You’re always so negative.",
    "You’re just trying to control me.",
    "You’re the abusive one.",
    "You’re trying to ruin my life.",
    "You’re just jealous.",
    "You’re the one who needs help.",
    "You’re always playing the victim.",
    "You’re the one causing all the problems.",
    "You’re just trying to make me feel guilty.",
    "You’re the one who can’t let go of the past.",
    "You’re the one who’s always angry.",
    "You’re the one who’s always complaining.",
    "You’re the one who’s always starting arguments.",
    "You’re the one who’s always making things worse.",
    "You’re the one who’s always making me feel bad.",
    "You’re the one who’s always making me look like the bad guy.",
    "You’re the one who’s always making me feel like a failure.",
    "You’re the one who’s always making me feel like I’m not good enough.",
    "I can’t believe you’re doing this to me.",
    "You’re hurting me.",
    "You’re making me feel like a terrible person.",
    "You’re always blaming me for everything.",
    "You’re the one who’s abusive.",
    "You’re the one who’s controlling.",
    "You’re the one who’s manipulative.",
    "You’re the one who’s toxic.",
    "You’re the one who’s gaslighting me.",
    "You’re the one who’s always putting me down.",
    "You’re the one who’s always making me feel bad.",
    "You’re the one who’s always making me feel like I’m not good enough.",
    "You’re the one who’s always making me feel like I’m the problem.",
    "You’re the one who’s always making me feel like I’m the bad guy.",
    "You’re the one who’s always making me feel like I’m the villain.",
    "You’re the one who’s always making me feel like I’m the one who needs to change.",
    "You’re the one who’s always making me feel like I’m the one who’s wrong.",
    "You’re the one who’s always making me feel like I’m the one who’s crazy.",
    "You’re the one who’s always making me feel like I’m the one who’s abusive.",
    "You’re the one who’s always making me feel like I’m the one who’s toxic.",
]
# Compiled once at import time. Each pattern is an opening phrase followed,
# within 15 characters, by a phrase that reverses it. Apostrophes are written
# as [’'] so both the typographic (U+2019) and the plain ASCII form match.
_CONTRADICTION_PATTERNS = tuple(
    re.compile(pattern, re.IGNORECASE)
    for pattern in (
        r"\b(i love you).{0,15}(i hate you|you ruin everything)",
        r"\b(i[’']m sorry).{0,15}(but you|if you hadn[’']t)",
        r"\b(i[’']m trying).{0,15}(you never|why do you)",
        r"\b(do what you want).{0,15}(you[’']ll regret it|i always give everything)",
        r"\b(i don[’']t care).{0,15}(you never think of me)",
        r"\b(i guess i[’']m just).{0,15}(the bad guy|worthless|never enough)",
    )
)


def detect_contradiction(message):
    """Return True when the message pairs a phrase with one that reverses it.

    Matching is case-insensitive, and the two halves must be at most 15
    characters apart on the same line (``.`` does not cross newlines).

    Args:
        message: The message text to scan.

    Returns:
        True if any contradiction pattern matches, otherwise False.
    """
    return any(pattern.search(message) for pattern in _CONTRADICTION_PATTERNS)
def calculate_darvo_score(patterns, sentiment_before, sentiment_after, motifs_found, contradiction_flag=False):
    """Combine four signals into a DARVO score between 0 and 1.

    The signals and their weights are:
      * 0.30 - share of DARVO_PATTERNS present in ``patterns``
      * 0.30 - rise from ``sentiment_before`` to ``sentiment_after`` (never negative)
      * 0.25 - matched motifs divided by ``len(DARVO_MOTIFS)``
      * 0.15 - 1.0 when ``contradiction_flag`` is truthy, else 0.0

    Returns:
        The weighted sum, capped at 1.0 and rounded to three decimals.
    """
    darvo_pattern_count = sum(1 for name in patterns if name in DARVO_PATTERNS)
    pattern_score = darvo_pattern_count / len(DARVO_PATTERNS)

    sentiment_shift_score = max(0.0, sentiment_after - sentiment_before)

    # A found motif counts when any known DARVO phrase occurs inside it,
    # ignoring case.
    known_phrases = [phrase.lower() for phrase in DARVO_MOTIFS]
    motif_hits = 0
    for motif in motifs_found:
        lowered = motif.lower()
        if any(phrase in lowered for phrase in known_phrases):
            motif_hits += 1
    motif_score = motif_hits / len(DARVO_MOTIFS)

    if contradiction_flag:
        contradiction_score = 1.0
    else:
        contradiction_score = 0.0

    combined = (
        0.3 * pattern_score +
        0.3 * sentiment_shift_score +
        0.25 * motif_score +
        0.15 * contradiction_score
    )
    return round(min(combined, 1.0), 3)
# Weapon vocabulary anchored at the start of a word. Plain substring matching
# fired on everyday words that merely contain a keyword ("begun", "harmed",
# "alarmed", "downloaded"). Keywords still match as word prefixes, so "guns",
# "shooting" and "kill yourself" are caught; "stab" additionally needs a word
# end so "stable" is ignored. Common compounds are listed explicitly because
# they no longer match through the bare "gun" keyword.
_WEAPON_PATTERN = re.compile(
    r"\b(?:"
    r"knife|knives|stab(?:s|bed|bing)?\b|cut you|cutting"
    r"|gun|shotgun|handgun|shoot|rifle|firearm|pistol"
    r"|bomb|blow up|grenade|explode"
    r"|weapon|armed|loaded|kill you|take you out"
    r")",
    re.IGNORECASE,
)


def detect_weapon_language(text):
    """Return True when the text contains weapon-related vocabulary.

    Matching is case-insensitive and requires the keyword to begin a word.

    Args:
        text: The message text to scan.

    Returns:
        True if a weapon keyword is found, otherwise False.
    """
    return _WEAPON_PATTERN.search(text) is not None
def get_risk_stage(patterns, sentiment):
    """Map detected patterns and sentiment to a risk-stage number (1-4).

    Rules are checked in priority order and the first match wins:
      2 - "threat" or "insults" detected
      3 - "recovery phase" detected
      1 - "control" or "guilt tripping" detected
      4 - sentiment is "supportive" and "projection" or "dismissiveness" detected
      1 - nothing above applies
    """
    def has_any(*names):
        return any(name in patterns for name in names)

    if has_any("threat", "insults"):
        return 2
    if "recovery phase" in patterns:
        return 3
    if has_any("control", "guilt tripping"):
        return 1
    if sentiment == "supportive" and has_any("projection", "dismissiveness"):
        return 4
    return 1
# Explanation shown for each pattern label; "default" covers any label that
# has no entry of its own.
_WHY_FLAGGED = {
    "control": "This message may reflect efforts to restrict someone’s autonomy, even if it's framed as concern or care.",
    "gaslighting": "This message could be manipulating someone into questioning their perception or feelings.",
    "dismissiveness": "This message may include belittling, invalidating, or ignoring the other person’s experience.",
    "insults": "Direct insults often appear in escalating abusive dynamics and can erode emotional safety.",
    "threat": "This message includes threatening language, which is a strong predictor of harm.",
    "blame shifting": "This message may redirect responsibility to avoid accountability, especially during conflict.",
    "guilt tripping": "This message may induce guilt in order to control or manipulate behavior.",
    "recovery phase": "This message may be part of a tension-reset cycle, appearing kind but avoiding change.",
    "projection": "This message may involve attributing the abuser’s own behaviors to the victim.",
    "default": "This message contains language patterns that may affect safety, clarity, or emotional autonomy.",
}

# Sentence appended after the headline for each risk level.
_RISK_LEVEL_NOTES = {
    "high": "The language may reflect patterns of emotional control, even when expressed in soft or caring terms.\n",
    "moderate": "There are signs of emotional pressure or indirect control that may escalate if repeated.\n",
    "low": "The message does not strongly indicate abuse, but it's important to monitor for patterns.\n",
}


def generate_risk_snippet(abuse_score, top_label, escalation_score, stage):
    """Build the risk-level paragraph shown in the results box.

    Args:
        abuse_score: Abuse intensity in percent.
        top_label: Either ``"<pattern> – <score>"`` or just ``"<pattern>"``.
        escalation_score: Numeric checklist score.
        stage: Risk-stage number; stage 2 lowers the bar for "moderate".

    Returns:
        A multi-line string with the risk level, an explanation of the
        pattern, and the detected pattern with its score.
    """
    if abuse_score >= 85 or escalation_score >= 16:
        risk_level = "high"
    elif abuse_score >= 60 or escalation_score >= 8 or (stage == 2 and abuse_score >= 40):
        risk_level = "moderate"
    else:
        risk_level = "low"

    # The label and its score are separated by an en dash surrounded by spaces.
    label_parts = top_label.split(" – ")
    pattern_label = label_parts[0]
    pattern_score = label_parts[1] if len(label_parts) > 1 else ""

    explanation = _WHY_FLAGGED.get(pattern_label.lower(), _WHY_FLAGGED["default"])

    sections = [
        f"\n\n🛑 Risk Level: {risk_level.capitalize()}\n",
        f"This message shows strong indicators of **{pattern_label}**. ",
        _RISK_LEVEL_NOTES[risk_level],
        f"\n💡 *Why this might be flagged:*\n{explanation}\n",
        f"\nDetected Pattern: **{pattern_label} ({pattern_score})**\n",
        "🧠 You can review the pattern in context. This tool highlights possible dynamics—not judgments.",
    ]
    return "".join(sections)
def analyze_single_message(text, thresholds):
    """Score one message for abuse patterns, sentiment, risk stage and DARVO.

    Args:
        text: The message to analyze.
        thresholds: Mapping of pattern label to the score it must exceed to
            count as detected. Every threshold is raised by 0.05 when the
            sentiment model rates the message as supportive.

    Returns:
        A tuple ``(abuse_score, threshold_labels, top_patterns, result,
        stage, darvo_score)`` where ``abuse_score`` is a percentage,
        ``threshold_labels`` lists the detected patterns, ``top_patterns``
        holds the two highest ``(label, score)`` pairs regardless of
        threshold, ``result`` is the raw sentiment-pipeline output,
        ``stage`` is the risk-stage number and ``darvo_score`` is the value
        from calculate_darvo_score.
    """
    _motif_hits, matched_phrases = detect_motifs(text)

    # Truncate long inputs the same way the abuse-model tokenizer call below
    # does; without it a message longer than the sentiment model's maximum
    # sequence length can make the pipeline fail.
    result = sst_pipeline(text, truncation=True)[0]
    sentiment = "supportive" if result['label'] == "POSITIVE" else "undermining"
    sentiment_score = result['score'] if sentiment == "undermining" else 0.0

    weapon_flag = detect_weapon_language(text)
    contradiction_flag = detect_contradiction(text)

    adjusted_thresholds = {
        k: v + 0.05 if sentiment == "supportive" else v
        for k, v in thresholds.items()
    }

    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)
    # One independent sigmoid score per label.
    scores = torch.sigmoid(outputs.logits.squeeze(0)).numpy()

    threshold_labels = [
        label for label, score in zip(LABELS, scores)
        if score > adjusted_thresholds[label]
    ]

    motifs = [phrase for _, phrase in matched_phrases]
    darvo_score = calculate_darvo_score(
        threshold_labels,
        sentiment_before=0.0,
        sentiment_after=sentiment_score,
        motifs_found=motifs,
        contradiction_flag=contradiction_flag
    )

    top_patterns = sorted(
        zip(LABELS, scores),
        key=lambda x: x[1],
        reverse=True
    )[:2]

    # Weighted average across all patterns (not just the top 2).
    weighted_total = 0.0
    weight_sum = 0.0
    for label, score in zip(LABELS, scores):
        weight = PATTERN_WEIGHTS.get(label, 1.0)
        weighted_total += score * weight
        weight_sum += weight
    abuse_score_raw = (weighted_total / weight_sum) * 100

    stage = get_risk_stage(threshold_labels, sentiment)

    if weapon_flag:
        # Weapon language boosts intensity and forces at least stage 2.
        abuse_score_raw = min(abuse_score_raw + 25, 100)
        if stage < 2:
            stage = 2
        print("⚠️ Weapon-related language detected.")

    # Only threat, control or insults may reach 100; everything else is
    # capped at 95.
    if "threat" in threshold_labels or "control" in threshold_labels or "insults" in threshold_labels:
        abuse_score = min(abuse_score_raw, 100)
    else:
        abuse_score = min(abuse_score_raw, 95)

    # NOTE(review): these prints write the raw message text to stdout.
    print("\n--- Debug Info ---")
    print(f"Text: {text}")
    print(f"Sentiment: {sentiment} (raw: {result['label']}, score: {result['score']:.3f})")
    print("Abuse Pattern Scores:")
    for label, score in zip(LABELS, scores):
        passed = "✅" if score > adjusted_thresholds[label] else "❌"
        print(f" {label:25} → {score:.3f} {passed}")
    print(f"Motifs: {motifs}")
    print(f"Contradiction: {contradiction_flag}")
    print("------------------\n")

    return abuse_score, threshold_labels, top_patterns, result, stage, darvo_score
def analyze_composite(msg1, date1, msg2, date2, msg3, date3, *answers_and_none):
    """Analyze up to three messages plus the safety checklist.

    Args:
        msg1, msg2, msg3: Message texts; blank or missing ones are skipped.
        date1, date2, date3: Optional date strings paired with the messages.
        *answers_and_none: One boolean per ESCALATION_QUESTIONS entry,
            followed by the "None of the above" checkbox value.

    Returns:
        A ``(report_text, timeline_image)`` pair matching the interface's two
        outputs. When no message was entered the image is ``None``.
    """
    messages = [msg1, msg2, msg3]
    dates = [date1, date2, date3]
    active = [(m, d) for m, d in zip(messages, dates) if m and m.strip()]
    if not active:
        # Two values are required because the interface declares two outputs.
        return "Please enter at least one message.", None

    answers = answers_and_none[:-1]
    none_selected_checked = answers_and_none[-1]
    none_selected = not any(answers) and none_selected_checked

    # NOTE(review): ticking only "None of the above" is reported as
    # "Checklist not completed", while leaving every box empty scores 0/Low.
    # This looks inverted; confirm the intended behavior before changing it.
    if none_selected:
        escalation_score = None
    else:
        escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers) if a)
        risk_level = (
            "High" if escalation_score >= 16 else
            "Moderate" if escalation_score >= 8 else
            "Low"
        )

    analyses = [analyze_single_message(m, THRESHOLDS.copy()) for m, _ in active]
    dates_used = [d or "Undated" for _, d in active]

    abuse_scores = [a[0] for a in analyses]
    top_labels = [a[2][0][0] for a in analyses]  # top label for each message
    top_scores = [a[2][0][1] for a in analyses]
    stages = [a[4] for a in analyses]
    darvo_scores = [a[5] for a in analyses]

    composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
    # The headline pattern is the first active message's top label.
    top_label = f"{top_labels[0]} – {int(round(top_scores[0] * 100))}%"
    most_common_stage = max(set(stages), key=stages.count)
    stage_text = RISK_STAGE_LABELS[most_common_stage]

    avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
    darvo_blurb = ""
    if avg_darvo > 0.25:
        level = "moderate" if avg_darvo < 0.65 else "high"
        darvo_blurb = f"\n\n🎭 **DARVO Score: {avg_darvo}** → This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."

    out = f"Abuse Intensity: {composite_abuse}%\n"
    out += "📊 This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"
    if escalation_score is None:
        out += "Escalation Potential: Unknown (Checklist not completed)\n"
        out += "🔍 *This section was not completed. Escalation potential is unknown.*\n\n"
    else:
        out += f"Escalation Potential: {risk_level} ({escalation_score}/{sum(w for _, w in ESCALATION_QUESTIONS)})\n"
        out += "🚨 This indicates how many serious risk factors are present based on your answers to the safety checklist.\n"
    out += generate_risk_snippet(
        composite_abuse,
        top_label,
        escalation_score if escalation_score is not None else 0,
        most_common_stage,
    )
    out += f"\n\n{stage_text}"
    out += darvo_blurb

    timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, top_labels)
    return out, timeline_image
# --- Gradio UI wiring ---
# Three (message, optional date) textbox pairs, numbered from 1.
message_date_pairs = []
for _slot in range(1, 4):
    _message_box = gr.Textbox(label=f"Message {_slot}")
    _date_box = gr.Textbox(label=f"Date {_slot} (optional)", placeholder="YYYY-MM-DD")
    message_date_pairs.append((_message_box, _date_box))

# Flattened to message 1, date 1, message 2, ... to match the positional
# parameters of analyze_composite.
textbox_inputs = []
for _pair in message_date_pairs:
    textbox_inputs.extend(_pair)

# One checkbox per checklist question, in ESCALATION_QUESTIONS order, plus the
# trailing "None of the above" box that analyze_composite reads last.
quiz_boxes = [gr.Checkbox(label=question) for question, _weight in ESCALATION_QUESTIONS]
none_box = gr.Checkbox(label="None of the above")

iface = gr.Interface(
    fn=analyze_composite,
    inputs=[*textbox_inputs, *quiz_boxes, none_box],
    outputs=[
        gr.Textbox(label="Results"),
        gr.Image(label="Risk Stage Timeline", type="pil"),
    ],
    title="Abuse Pattern Detector + Escalation Quiz",
    allow_flagging="manual",
)

if __name__ == "__main__":
    iface.launch()