import io
import logging
import re
from collections import Counter
from datetime import datetime

import gradio as gr
import spaces
import torch
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline
import matplotlib.pyplot as plt
from PIL import Image
from torch.nn.functional import sigmoid

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"Using device: {device}")

# Model initialization (multi-label abuse pattern classifier)
model_name = "SamanthaStorm/tether-multilabel-v4"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Sentiment model
sentiment_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-sentiment").to(device)
sentiment_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-sentiment", use_fast=False)

# Emotion pipeline
emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    top_k=6,
    truncation=True,
    device=0 if torch.cuda.is_available() else -1
)

# DARVO model
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device)
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False)
darvo_model.eval()

# Constants and Labels

# Output labels of the multi-label model; scores are zipped against this list
# positionally, so the order matters.
LABELS = [
    "recovery", "control", "gaslighting", "guilt tripping", "dismissiveness",
    "blame shifting", "nonabusive", "projection", "insults",
    "contradictory statements", "obscure language"
]

# Indexed by the argmax of the sentiment model's output.
SENTIMENT_LABELS = ["undermining", "supportive"]

# Per-label score a prediction must exceed to count as detected.
THRESHOLDS = {
    "recovery": 0.4,
    "control": 0.45,
    "gaslighting": 0.25,
    "guilt tripping": 0.20,
    "dismissiveness": 0.25,
    "blame shifting": 0.25,
    "projection": 0.25,
    "insults": 0.05,
    "contradictory statements": 0.25,
    "obscure language": 0.15,
    "nonabusive": 1.0
}

# Per-label weight used when averaging matched scores into the abuse score.
PATTERN_WEIGHTS = {
    "recovery": 0.7,
    "control": 1.4,
    "gaslighting": 1.50,
    "guilt tripping": 1.2,
    "dismissiveness": 0.9,
    "blame shifting": 0.8,
    "projection": 0.5,
    "insults": 1.4,
    "contradictory statements": 1.0,
    "obscure language": 0.9,
    "nonabusive": 0.0
}

# (checkbox label, weight) pairs for the escalation checklist.
ESCALATION_QUESTIONS = [
    ("Partner has access to firearms or weapons", 4),
    ("Partner threatened to kill you", 3),
    ("Partner threatened you with a weapon", 3),
    ("Partner has ever choked you, even if you considered it consensual at the time", 4),
    ("Partner injured or threatened your pet(s)", 3),
    ("Partner has broken your things, punched or kicked walls, or thrown things ", 2),
    ("Partner forced or coerced you into unwanted sexual acts", 3),
    ("Partner threatened to take away your children", 2),
    ("Violence has increased in frequency or severity", 3),
    ("Partner monitors your calls/GPS/social media", 2)
]

RISK_STAGE_LABELS = {
    1: "🌀 Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
    2: "🔥 Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
    3: "🌧️ Risk Stage: Reconciliation\nThis message reflects a reset attempt—apologies or emotional repair without accountability.",
    4: "🌸 Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}

# Lower-case phrases searched for (after normalization) in each message.
THREAT_MOTIFS = [
    "i'll kill you", "i'm going to hurt you", "you're dead",
    "you won't survive this", "i'll break your face", "i'll bash your head in",
    "i'll snap your neck", "i'll come over there and make you shut up",
    "i'll knock your teeth out", "you're going to bleed",
    "you want me to hit you?", "i won't hold back next time",
    "i swear to god i'll beat you", "next time, i won't miss",
    "i'll make you scream", "i know where you live", "i'm outside",
    "i'll be waiting", "i saw you with him", "you can't hide from me",
    "i'm coming to get you", "i'll find you", "i know your schedule",
    "i watched you leave", "i followed you home", "you'll regret this",
    "you'll be sorry", "you're going to wish you hadn't",
    "you brought this on yourself", "don't push me",
    "you have no idea what i'm capable of", "you better watch yourself",
    "i don't care what happens to you anymore", "i'll make you suffer",
    "you'll pay for this", "i'll never let you go",
    "you're nothing without me", "if you leave me, i'll kill myself",
    "i'll ruin you", "i'll tell everyone what you did",
    "i'll make sure everyone knows", "i'm going to destroy your name",
    "you'll lose everyone", "i'll expose you", "your friends will hate you",
    "i'll post everything", "you'll be cancelled", "you'll lose everything",
    "i'll take the house", "i'll drain your account",
    "you'll never see a dime", "you'll be broke when i'm done",
    "i'll make sure you lose your job", "i'll take your kids",
    "i'll make sure you have nothing", "you can't afford to leave me",
    "don't make me do this", "you know what happens when i'm mad",
    "you're forcing my hand", "if you just behaved, this wouldn't happen",
    "this is your fault", "you're making me hurt you", "i warned you",
    "you should have listened"
]


def get_emotion_profile(text):
    """Run the emotion pipeline on *text* and return its scores as a dict.

    Args:
        text: Message text passed to ``emotion_pipeline``.

    Returns:
        Dict mapping each lower-cased emotion label to its score rounded to
        three decimals. Empty when the pipeline returns no entries.
    """
    emotions = emotion_pipeline(text)
    # The result may be nested one level deep (a list holding one list of
    # label/score dicts); unwrap it. The emptiness check avoids an IndexError
    # on an empty result.
    if isinstance(emotions, list) and emotions and isinstance(emotions[0], list):
        emotions = emotions[0]
    return {e['label'].lower(): round(e['score'], 3) for e in emotions}

def get_emotional_tone_tag(text, sentiment, patterns, abuse_score):
    """Pick an emotional tone tag from emotion scores, patterns and sentiment.

    Rules are checked from top to bottom and the first one that matches wins.

    Args:
        text: Message text; its emotion profile comes from
            ``get_emotion_profile``.
        sentiment: Sentiment label; rules compare it with "undermining" and
            "supportive".
        patterns: Collection of detected pattern labels.
        abuse_score: Abuse score; only used by the first rule (``> 40``).

    Returns:
        The tone tag string of the first matching rule, or "neutral".
    """
    emotions = get_emotion_profile(text)
    sadness = emotions.get("sadness", 0)
    joy = emotions.get("joy", 0)
    neutral = emotions.get("neutral", 0)
    disgust = emotions.get("disgust", 0)
    anger = emotions.get("anger", 0)
    fear = emotions.get("fear", 0)

    undermining = sentiment == "undermining"

    def has_any(*labels):
        """Return True when at least one of *labels* was detected."""
        return any(label in patterns for label in labels)

    # 1. Performative Regret
    if (
        sadness > 0.4
        and has_any("blame shifting", "guilt tripping", "recovery")
        and (undermining or abuse_score > 40)
    ):
        return "performative regret"

    # 2. Coercive Warmth
    if (
        (joy > 0.3 or sadness > 0.4)
        and has_any("control", "gaslighting")
        and undermining
    ):
        return "coercive warmth"

    # 3. Cold Invalidation
    if (
        (neutral + disgust) > 0.5
        and has_any("dismissiveness", "projection", "obscure language")
        and undermining
    ):
        return "cold invalidation"

    # 4. Genuine Vulnerability
    # all() is also True for an empty pattern collection.
    if (
        (sadness + fear) > 0.5
        and sentiment == "supportive"
        and all(p in ["recovery"] for p in patterns)
    ):
        return "genuine vulnerability"

    # 5. Emotional Threat
    if (
        (anger + disgust) > 0.5
        and has_any("control", "insults", "dismissiveness")
        and undermining
    ):
        return "emotional threat"

    # 6. Weaponized Sadness
    if (
        sadness > 0.6
        and has_any("guilt tripping", "projection")
        and undermining
    ):
        return "weaponized sadness"

    # 7. Toxic Resignation
    # NOTE(review): with non-negative scores, any input satisfying this rule
    # already satisfies rule 3, so this branch looks unreachable. Left in
    # place because reordering would change the tags callers receive.
    if (
        neutral > 0.5
        and has_any("dismissiveness", "obscure language")
        and undermining
    ):
        return "toxic resignation"

    # 8. Aggressive Dismissal
    # NOTE(review): with non-negative scores, any input satisfying this rule
    # already satisfies rule 5, so this branch looks unreachable as well.
    if (
        anger > 0.5
        and has_any("insults", "control")
        and undermining
    ):
        return "aggressive dismissal"

    # 9. Deflective Hostility
    if (
        (0.2 < anger < 0.7 or 0.2 < disgust < 0.7)
        and has_any("projection")
        and undermining
    ):
        return "deflective hostility"

    # 10. Contradictory Gaslight
    if (
        (joy + anger + sadness) > 0.5
        and has_any("gaslighting", "contradictory statements")
        and undermining
    ):
        return "contradictory gaslight"

    # 11. Forced Accountability Flip
    if (
        (anger + disgust) > 0.5
        and has_any("blame shifting", "projection")
        and undermining
    ):
        return "forced accountability flip"

    # Emotional Instability Fallback
    if (anger + sadness + disgust) > 0.6 and undermining:
        return "emotional instability"

    return "neutral"


@spaces.GPU
def predict_darvo_score(text):
    """Return the DARVO model's sigmoid output for *text*, rounded to 4 dp.

    Any exception is logged and reported as a score of 0.0.
    """
    try:
        inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = darvo_model(**inputs).logits
        return round(sigmoid(logits.cpu()).item(), 4)
    except Exception as e:
        logger.error(f"Error in DARVO prediction: {e}")
        return 0.0


def detect_weapon_language(text):
    """Return True when *text* contains any weapon keyword (case-insensitive).

    NOTE: this is plain substring matching, so a keyword embedded in a longer
    word also counts (e.g. "kill" inside "skill").
    """
    weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"]
    t = text.lower()
    return any(w in t for w in weapon_keywords)


def get_risk_stage(patterns, sentiment):
    """Map detected patterns and sentiment to a risk stage number (1-4).

    The numbers are the keys of ``RISK_STAGE_LABELS``. Checks run in priority
    order: insults (2), recovery (3), control / guilt tripping (1), then
    supportive sentiment with projection or dismissiveness (4). Falls back
    to 1, also on error.
    """
    try:
        if "insults" in patterns:
            return 2
        elif "recovery" in patterns:
            return 3
        elif "control" in patterns or "guilt tripping" in patterns:
            return 1
        elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]):
            return 4
        return 1
    except Exception as e:
        logger.error(f"Error determining risk stage: {e}")
        return 1


@spaces.GPU
def compute_abuse_score(matched_scores, sentiment):
    """Combine matched pattern scores into a 0-100 abuse score.

    Args:
        matched_scores: Iterable of ``(label, score, weight)`` tuples.
        sentiment: "supportive" lowers the score, "undermining" raises it;
            any other value leaves it unchanged.

    Returns:
        The score rounded to one decimal and capped at 100.0. Returns 0.0 when
        nothing matched, when the total weight is zero, or on error.
    """
    try:
        if not matched_scores:
            return 0.0

        total_weight = sum(weight for _, _, weight in matched_scores)
        if total_weight == 0:
            return 0.0

        # Weighted mean of the scores, expressed as a percentage.
        weighted_sum = sum(score * weight for _, score, weight in matched_scores)
        base_score = (weighted_sum / total_weight) * 100

        # Pattern combination multipliers
        if len(matched_scores) >= 3:
            base_score *= 1.2

        high_severity_patterns = {'gaslighting', 'control', 'blame shifting'}
        has_high_severity = any(label in high_severity_patterns for label, _, _ in matched_scores)
        if has_high_severity:
            base_score *= 1.15

        has_strong_match = any(score > 0.6 for _, score, _ in matched_scores)
        if has_strong_match:
            base_score *= 1.1

        high_scores = len([score for _, score, _ in matched_scores if score > 0.5])
        if high_scores >= 2:
            base_score *= 1.15

        # Apply sentiment modifiers
        if sentiment == "supportive":
            # Smaller discount when a high-severity pattern is present.
            base_score *= 0.9 if has_high_severity else 0.85
        elif sentiment == "undermining":
            base_score *= 1.15

        # A single strong match guarantees a floor of 65.
        if has_strong_match:
            base_score = max(base_score, 65.0)

        return min(round(base_score, 1), 100.0)
    except Exception as e:
        logger.error(f"Error computing abuse score: {e}")
        return 0.0


# Explicit-abuse vocabulary. 'fuck', 'bitch' and 'shit' match anywhere in a
# word (so "fucking" or "bullshit" count). 'ass' and 'dick' are word-bounded
# so that ordinary words such as "pass", "class" or "assume" do not trigger.
_EXPLICIT_ABUSE_RE = re.compile(
    r"fuck|bitch|shit"
    r"|\b(?:dumb|jack|smart)?ass(?:es|hole|holes)?\b"
    r"|\bdick(?:s|head|heads)?\b"
)


@spaces.GPU
def analyze_single_message(text, thresholds):
    """Analyze a single message for abuse patterns.

    Args:
        text: The message text.
        thresholds: Dict of label -> minimum score; labels missing from it
            use 0.25.

    Returns:
        A 7-tuple ``(abuse_score, threshold_labels, matched_scores,
        {"label": sentiment}, stage, darvo_score, tone_tag)``. Blank text
        yields zeros with sentiment label "none"; an exception yields zeros
        with sentiment label "error".
    """
    logger.debug("\n=== DEBUG START ===")
    logger.debug(f"Input text: {text}")

    try:
        if not text.strip():
            logger.debug("Empty text, returning zeros")
            return 0.0, [], [], {"label": "none"}, 1, 0.0, None

        # Check for explicit abuse
        explicit_abuse = _EXPLICIT_ABUSE_RE.search(text.lower()) is not None
        logger.debug(f"Explicit abuse detected: {explicit_abuse}")

        # Abuse model inference
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
        raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy()

        # Log raw model outputs
        logger.debug("\nRaw model scores:")
        for label, score in zip(LABELS, raw_scores):
            logger.debug(f"{label}: {score:.3f}")

        # Get predictions and sort them
        predictions = list(zip(LABELS, raw_scores))
        sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)
        logger.debug("\nTop 3 predictions:")
        for label, score in sorted_predictions[:3]:
            logger.debug(f"{label}: {score:.3f}")

        # Apply thresholds
        threshold_labels = []
        if explicit_abuse:
            threshold_labels.append("insults")
            logger.debug("\nForced inclusion of 'insults' due to explicit abuse")

        for label, score in sorted_predictions:
            base_threshold = thresholds.get(label, 0.25)
            if explicit_abuse:
                # Explicit language halves every threshold.
                base_threshold *= 0.5
            if score > base_threshold and label not in threshold_labels:
                threshold_labels.append(label)

        logger.debug("\nLabels that passed thresholds: %s", threshold_labels)

        # Calculate matched scores
        matched_scores = []
        for label in threshold_labels:
            score = raw_scores[LABELS.index(label)]
            weight = PATTERN_WEIGHTS.get(label, 1.0)
            if explicit_abuse and label == "insults":
                weight *= 1.5
            matched_scores.append((label, score, weight))

        # Get sentiment
        sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()}
        with torch.no_grad():
            sent_logits = sentiment_model(**sent_inputs).logits[0]
        sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy()
        sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))]

        # Calculate abuse score
        abuse_score = compute_abuse_score(matched_scores, sentiment)
        if explicit_abuse:
            abuse_score = max(abuse_score, 70.0)

        # Get DARVO score
        darvo_score = predict_darvo_score(text)

        # Get tone using emotion-based approach
        tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score)

        # Set stage
        stage = 2 if explicit_abuse or abuse_score > 70 else 1

        logger.debug("=== DEBUG END ===\n")

        return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag

    except Exception as e:
        logger.error(f"Error in analyze_single_message: {e}")
        return 0.0, [], [], {"label": "error"}, 1, 0.0, None


def generate_abuse_score_chart(dates, scores, patterns):
    """Render a timeline chart of abuse scores as a PIL image.

    Args:
        dates: X-axis tick labels, one per score.
        scores: Abuse scores to plot.
        patterns: One label per score, shown in the point annotation.

    Returns:
        A ``PIL.Image`` holding the PNG rendering, or None on error.
    """
    fig = None
    try:
        # One figure only, driven through the Axes API rather than pyplot's
        # implicit "current figure".
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot points and lines
        x = list(range(len(scores)))
        ax.plot(x, scores, 'bo-', linewidth=2, markersize=8)

        # Add labels for each point
        for i, (score, pattern) in enumerate(zip(scores, patterns)):
            ax.annotate(
                f'{pattern}\n{score:.0f}%',
                (i, score),
                textcoords="offset points",
                xytext=(0, 10),
                ha='center',
                bbox=dict(
                    boxstyle='round,pad=0.5',
                    fc='white',
                    ec='gray',
                    alpha=0.8
                )
            )

        # Customize the plot
        ax.set_ylim(-5, 105)
        ax.grid(True, linestyle='--', alpha=0.7)
        ax.set_title('Abuse Pattern Timeline', pad=20, fontsize=12)
        ax.set_ylabel('Abuse Score %')

        # X-axis labels
        ax.set_xticks(x)
        ax.set_xticklabels(dates, rotation=45)

        # Risk level bands
        ax.axhspan(0, 50, color='#90EE90', alpha=0.2)    # light green
        ax.axhspan(50, 70, color='#FFD700', alpha=0.2)   # gold
        ax.axhspan(70, 85, color='#FFA500', alpha=0.2)   # orange
        ax.axhspan(85, 100, color='#FF6B6B', alpha=0.2)  # light red

        # Add risk level labels
        ax.text(-0.2, 25, 'Low Risk', rotation=90, va='center')
        ax.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center')
        ax.text(-0.2, 77.5, 'High Risk', rotation=90, va='center')
        ax.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center')

        # Adjust layout
        fig.tight_layout()

        # Convert plot to image
        buf = io.BytesIO()
        fig.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        return Image.open(buf)
    except Exception as e:
        logger.error(f"Error generating abuse score chart: {e}")
        return None
    finally:
        # Close this figure on every path, including errors, and leave any
        # other open figure alone.
        if fig is not None:
            plt.close(fig)


def _normalize_for_matching(text):
    """Lower-case *text* and reduce it to ``a-z``, ``0-9`` and spaces."""
    import unicodedata

    text = text.lower().strip()
    text = unicodedata.normalize("NFKD", text)
    # Presumably meant to fold typographic apostrophes to ASCII; every
    # apostrophe is stripped by the regex below either way.
    text = text.replace("\u2019", "'")
    return re.sub(r"[^a-z0-9 ]", "", text)


def _detect_threat_motifs(message, motif_list):
    """Return the motifs whose normalized form occurs in the normalized message."""
    norm_msg = _normalize_for_matching(message)
    return [motif for motif in motif_list if _normalize_for_matching(motif) in norm_msg]


def analyze_composite(msg1, msg2, msg3, *answers_and_none):
    """Analyze up to three messages together with the escalation checklist.

    Args:
        msg1, msg2, msg3: Message texts; blank ones are skipped.
        *answers_and_none: One boolean per entry of ``ESCALATION_QUESTIONS``
            followed by the "None of the above" boolean.

    Returns:
        ``(report_text, timeline_image)``. The image is None when no message
        was entered or when an error occurred.
    """
    try:
        # Process checklist responses
        checklist_answers = answers_and_none[:-1]
        none_selected_checked = bool(answers_and_none[-1]) if answers_and_none else False
        responses_checked = any(checklist_answers)
        none_selected = not responses_checked and none_selected_checked

        if none_selected:
            escalation_score = 0
        elif responses_checked:
            escalation_score = sum(
                w for (_, w), a in zip(ESCALATION_QUESTIONS, checklist_answers) if a
            )
        else:
            # Checklist not completed.
            escalation_score = None

        # Process messages
        messages = [msg1, msg2, msg3]
        active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m and m.strip()]
        if not active:
            return "Please enter at least one message.", None

        # Detect threats
        immediate_threats = [_detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active]
        flat_threats = [t for sublist in immediate_threats for t in sublist]

        # Analyze each message
        results = [(analyze_single_message(m, THRESHOLDS.copy()), d) for m, d in active]

        # Extract scores and metadata
        abuse_scores = [r[0][0] for r in results]
        stages = [r[0][4] for r in results]
        darvo_scores = [r[0][5] for r in results]
        tone_tags = [r[0][6] for r in results]
        dates_used = [r[1] for r in results]

        # Analyze patterns
        predicted_labels = [label for r in results for label in r[0][1]]
        high = {'control'}
        moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults',
                    'contradictory statements', 'guilt tripping'}
        low = {'blame shifting', 'projection', 'recovery'}

        counts = {'high': 0, 'moderate': 0, 'low': 0}
        for label in predicted_labels:
            if label in high:
                counts['high'] += 1
            elif label in moderate:
                counts['moderate'] += 1
            elif label in low:
                counts['low'] += 1

        # Pattern escalation logic
        if counts['high'] >= 2 and counts['moderate'] >= 2:
            pattern_escalation_risk = "Critical"
        elif (
            (counts['high'] >= 2 and counts['moderate'] >= 1)
            or (counts['moderate'] >= 3)
            or (counts['high'] >= 1 and counts['moderate'] >= 2)
        ):
            pattern_escalation_risk = "High"
        elif (
            (counts['moderate'] == 2)
            or (counts['high'] == 1 and counts['moderate'] == 1)
            or (counts['moderate'] == 1 and counts['low'] >= 2)
            or (counts['high'] == 1 and sum(counts.values()) == 1)
        ):
            pattern_escalation_risk = "Moderate"
        else:
            pattern_escalation_risk = "Low"

        # Calculate escalation risk
        checklist_escalation_risk = "Unknown" if escalation_score is None else (
            "Critical" if escalation_score >= 20 else
            "Moderate" if escalation_score >= 10 else
            "Low"
        )

        # Calculate escalation bump
        escalation_bump = 0
        for result, _ in results:
            abuse_score, _, _, _, stage, darvo_score, tone_tag = result
            if darvo_score > 0.65:
                escalation_bump += 3
            if tone_tag in ["forced accountability flip", "emotional threat"]:
                escalation_bump += 2
            if abuse_score > 80:
                escalation_bump += 2
            if stage == 2:
                escalation_bump += 3

        # Calculate combined risk
        def rank(label):
            return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0)

        combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump
        escalation_risk = (
            "Critical" if combined_score >= 6 else
            "High" if combined_score >= 4 else
            "Moderate" if combined_score >= 2 else
            "Low"
        )

        # Build escalation text
        if escalation_score is None:
            escalation_text = (
                "🚫 **Escalation Potential: Unknown** (Checklist not completed)\n"
                "⚠️ This section was not completed. Escalation potential is estimated using message data only.\n"
            )
            hybrid_score = 0
        elif escalation_score == 0:
            escalation_text = (
                "✅ **Escalation Checklist Completed:** No danger items reported.\n"
                "🧭 **Escalation potential estimated from detected message patterns only.**\n"
                f"• Pattern Risk: {pattern_escalation_risk}\n"
                f"• Checklist Risk: None reported\n"
                f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
            )
            hybrid_score = escalation_bump
        else:
            hybrid_score = escalation_score + escalation_bump
            escalation_text = (
                f"📈 **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n"
                "📋 This score combines your safety checklist answers *and* detected high-risk behavior.\n"
                f"• Pattern Risk: {pattern_escalation_risk}\n"
                f"• Checklist Risk: {checklist_escalation_risk}\n"
                f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
            )

        # Composite Abuse Score
        composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))

        # Get most common stage
        most_common_stage = max(set(stages), key=stages.count)
        stage_text = RISK_STAGE_LABELS[most_common_stage]

        # Build output text
        out = f"Abuse Intensity: {composite_abuse}%\n"
        out += "📊 This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"

        # Add risk assessment
        risk_level = (
            "Critical" if composite_abuse >= 85 or hybrid_score >= 20 else
            "High" if composite_abuse >= 70 or hybrid_score >= 15 else
            "Moderate" if composite_abuse >= 50 or hybrid_score >= 10 else
            "Low"
        )

        risk_descriptions = {
            "Critical": (
                "🚨 **Risk Level: Critical**\n"
                "Multiple severe abuse patterns detected. This situation shows signs of "
                "dangerous escalation and immediate intervention may be needed."
            ),
            "High": (
                "⚠️ **Risk Level: High**\n"
                "Strong abuse patterns detected. This situation shows concerning "
                "signs of manipulation and control."
            ),
            "Moderate": (
                "⚡ **Risk Level: Moderate**\n"
                "Concerning patterns detected. While not severe, these behaviors "
                "indicate unhealthy relationship dynamics."
            ),
            "Low": (
                "📝 **Risk Level: Low**\n"
                "Minor concerning patterns detected. While present, the detected "
                "behaviors are subtle or infrequent."
            )
        }

        out += risk_descriptions[risk_level]
        out += f"\n\n{stage_text}"

        # Add DARVO analysis
        avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
        if avg_darvo > 0.25:
            level = "moderate" if avg_darvo < 0.65 else "high"
            out += f"\n\n🎭 **DARVO Score: {avg_darvo}** → This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."

        # Add emotional tones
        # Use each message's own label: a skipped blank message must not
        # shift the numbering of the ones after it.
        out += "\n\n🎭 **Emotional Tones Detected:**\n"
        for message_label, tone in zip(dates_used, tone_tags):
            out += f"• {message_label}: *{tone or 'none'}*\n"

        # Add threats section
        if flat_threats:
            out += "\n\n🚨 **Immediate Danger Threats Detected:**\n"
            # dict.fromkeys de-duplicates while keeping first-seen order.
            for t in dict.fromkeys(flat_threats):
                out += f"• \"{t}\"\n"
            out += "\n⚠️ These phrases may indicate an imminent risk to physical safety."
        else:
            out += "\n\n🧩 **Immediate Danger Threats:** None explicitly detected.\n"
            out += "This does *not* rule out risk, but no direct threat phrases were matched."

        # Generate timeline
        # Each point is labelled with the first matched pattern of its message.
        pattern_labels = [
            pats[0][0] if (pats := r[0][2]) else "none"
            for r in results
        ]
        timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels)

        # Add escalation text
        out += "\n\n" + escalation_text

        return out, timeline_image

    except Exception as e:
        logger.error(f"Error in analyze_composite: {e}")
        return "An error occurred during analysis.", None


# Gradio Interface Setup
def create_interface():
    """Build the Gradio interface: three message boxes plus the checklist.

    Returns:
        The configured ``gr.Interface``.

    Raises:
        Exception: any construction error is logged and re-raised.
    """
    try:
        textbox_inputs = [gr.Textbox(label=f"Message {i+1}") for i in range(3)]
        quiz_boxes = [gr.Checkbox(label=q) for q, _ in ESCALATION_QUESTIONS]
        none_box = gr.Checkbox(label="None of the above")

        demo = gr.Interface(
            fn=analyze_composite,
            # Order matters: analyze_composite expects the three messages,
            # then the checklist answers, then "None of the above" last.
            inputs=textbox_inputs + quiz_boxes + [none_box],
            outputs=[
                gr.Textbox(label="Results"),
                gr.Image(label="Abuse Score Timeline", type="pil")
            ],
            title="Abuse Pattern Detector + Escalation Quiz",
            description=(
                "Enter up to three messages that concern you. "
                "For the most accurate results, include messages from a recent emotionally intense period."
            ),
            flagging_mode="manual"
        )
        return demo
    except Exception as e:
        logger.error(f"Error creating interface: {e}")
        raise


# Main execution
if __name__ == "__main__":
    try:
        demo = create_interface()
        demo.launch(
            server_name="0.0.0.0",
            server_port=7860,
            share=False
        )
    except Exception as e:
        logger.error(f"Failed to launch app: {e}")
        raise