diff --git "a/app (37).py" "b/app (37).py" new file mode 100644--- /dev/null +++ "b/app (37).py" @@ -0,0 +1,2629 @@ +import gradio as gr +import spaces +import torch +import numpy as np +from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline +import re +import matplotlib.pyplot as plt +import io +from PIL import Image +from datetime import datetime +from torch.nn.functional import sigmoid +from collections import Counter +import logging +import traceback +import json + + +# Set up logging +logging.basicConfig(level=logging.DEBUG) +logger = logging.getLogger(__name__) + +# Device configuration +device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') +logger.info(f"Using device: {device}") + +# Set up custom logging +class CustomFormatter(logging.Formatter): + """Custom formatter with colors and better formatting""" + grey = "\x1b[38;21m" + blue = "\x1b[38;5;39m" + yellow = "\x1b[38;5;226m" + red = "\x1b[38;5;196m" + bold_red = "\x1b[31;1m" + reset = "\x1b[0m" + + def format(self, record): + # Remove the logger name from the output + if record.levelno == logging.DEBUG: + return f"{self.blue}{record.getMessage()}{self.reset}" + elif record.levelno == logging.INFO: + return f"{self.grey}{record.getMessage()}{self.reset}" + elif record.levelno == logging.WARNING: + return f"{self.yellow}{record.getMessage()}{self.reset}" + elif record.levelno == logging.ERROR: + return f"{self.red}{record.getMessage()}{self.reset}" + elif record.levelno == logging.CRITICAL: + return f"{self.bold_red}{record.getMessage()}{self.reset}" + return record.getMessage() + +# Setup logger +logger = logging.getLogger(__name__) +logger.setLevel(logging.DEBUG) + +# Remove any existing handlers +logger.handlers = [] + +# Create console handler with custom formatter +ch = logging.StreamHandler() +ch.setLevel(logging.DEBUG) +ch.setFormatter(CustomFormatter()) +logger.addHandler(ch) + +# Model initialization +model_name = "SamanthaStorm/tether-multilabel-v6" +model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device) +tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False) + +# sentiment model +sentiment_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-sentiment-v3").to(device) +sentiment_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-sentiment-v3", use_fast=False) +sentiment_model.eval() + +emotion_pipeline = hf_pipeline( + "text-classification", + model="j-hartmann/emotion-english-distilroberta-base", + return_all_scores=True, # Get all emotion scores + top_k=None, # Don't limit to top k predictions + truncation=True, + device=0 if torch.cuda.is_available() else -1 +) + +# DARVO model +darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device) +darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False) +darvo_model.eval() + +# Constants and Labels +LABELS = [ + "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", + "blame shifting", "nonabusive", "projection", "insults", + "contradictory statements", "obscure language", + "veiled threats", "stalking language", "false concern", + "false equivalence", "future faking" +] + +SENTIMENT_LABELS = ["supportive", "undermining"] + +THRESHOLDS = { + "recovery phase": 0.278, + "control": 0.287, + "gaslighting": 0.144, + "guilt tripping": 0.220, + "dismissiveness": 0.142, + "blame shifting": 0.183, + "projection": 0.253, + "insults": 0.247, + "contradictory statements": 0.200, + "obscure language": 0.455, + "nonabusive": 0.281, + # NEW v6 patterns: + "veiled threats": 0.310, + "stalking language": 0.339, + "false concern": 0.334, + "false equivalence": 0.317, + "future faking": 0.385 +} + +PATTERN_WEIGHTS = { + "recovery phase": 0.7, + "control": 1.4, + "gaslighting": 1.3, + "guilt tripping": 1.2, + "dismissiveness": 0.9, + "blame shifting": 1.0, + "projection": 0.5, + "insults": 1.4, + "contradictory statements": 1.0, + "obscure language": 0.9, + "nonabusive": 0.0, + # NEW v6 patterns: + "veiled threats": 1.6, # High weight - very dangerous + "stalking language": 1.8, # Highest weight - extremely dangerous + "false concern": 1.1, # Moderate weight - manipulative + "false equivalence": 1.3, # Enhances DARVO detection + "future faking": 0.8 # Lower weight - manipulation tactic +} + +ESCALATION_QUESTIONS = [ + ("Partner has access to firearms or weapons", 4), + ("Partner threatened to kill you", 3), + ("Partner threatened you with a weapon", 3), + ("Partner has ever choked you, even if you considered it consensual at the time", 4), + ("Partner injured or threatened your pet(s)", 3), + ("Partner has broken your things, punched or kicked walls, or thrown things ", 2), + ("Partner forced or coerced you into unwanted sexual acts", 3), + ("Partner threatened to take away your children", 2), + ("Violence has increased in frequency or severity", 3), + ("Partner monitors your calls/GPS/social media", 2) +] + +RISK_STAGE_LABELS = { + 1: "π Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.", + 2: "π₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.", + 3: "π§οΈ Risk Stage: Reconciliation\nThis message reflects a reset attemptβapologies or emotional repair without accountability.", + 4: "πΈ Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it." +} + +THREAT_MOTIFS = [ + "i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this", + "i'll break your face", "i'll bash your head in", "i'll snap your neck", + "i'll come over there and make you shut up", "i'll knock your teeth out", + "you're going to bleed", "you want me to hit you?", "i won't hold back next time", + "i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream", + "i know where you live", "i'm outside", "i'll be waiting", "i saw you with him", + "you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule", + "i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry", + "you're going to wish you hadn't", "you brought this on yourself", "don't push me", + "you have no idea what i'm capable of", "you better watch yourself", + "i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this", + "i'll never let you go", "you're nothing without me", "if you leave me, i'll kill myself", + "i'll ruin you", "i'll tell everyone what you did", "i'll make sure everyone knows", + "i'm going to destroy your name", "you'll lose everyone", "i'll expose you", + "your friends will hate you", "i'll post everything", "you'll be cancelled", + "you'll lose everything", "i'll take the house", "i'll drain your account", + "you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job", + "i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me", + "don't make me do this", "you know what happens when i'm mad", "you're forcing my hand", + "if you just behaved, this wouldn't happen", "this is your fault", + "you're making me hurt you", "i warned you", "you should have listened" +] + +# MOVED TO TOP LEVEL - Fixed tone severity mapping +TONE_SEVERITY = { + # Highest danger tones + "obsessive fixation": 4, + "menacing calm": 4, + "conditional menace": 4, + "surveillance intimacy": 4, + + # High danger tones + "predatory concern": 3, + "victim cosplay": 3, + "entitled rage": 3, + "direct threat": 3, + + # Moderate danger tones + "manipulative hope": 2, + "false vulnerability": 2, + "calculated coldness": 2, + "predictive punishment": 2, + + # Existing tones (keep current mappings) + "emotional threat": 3, + "forced accountability flip": 3, + "performative regret": 2, + "coercive warmth": 2, + "cold invalidation": 2, + "weaponized sadness": 2, + "contradictory gaslight": 2, + + # Low risk tones + "neutral": 0, + "genuine vulnerability": 0 +} + +# MOVED TO TOP LEVEL - Helper functions +def log_emotional_tone_usage(tone_tag, patterns): + """Log tone usage for analytics""" + logger.debug(f"π Detected tone tag: {tone_tag} with patterns: {patterns}") + + # Track dangerous tone combinations + dangerous_tones = [ + "obsessive fixation", "menacing calm", "predatory concern", + "surveillance intimacy", "conditional menace", "victim cosplay" + ] + + if tone_tag in dangerous_tones: + logger.warning(f"β οΈ Dangerous emotional tone detected: {tone_tag}") + +def calculate_tone_risk_boost(tone_tag): + """Calculate risk boost based on emotional tone severity""" + return TONE_SEVERITY.get(tone_tag, 0) + +def should_show_safety_planning(abuse_score, escalation_risk, detected_patterns): + """Check if we should show safety planning""" + if escalation_risk in ["High", "Critical"]: + return True + if abuse_score >= 70: + return True + dangerous_patterns = ["stalking language", "veiled threats", "threats"] + if any(pattern in detected_patterns for pattern in dangerous_patterns): + return True + return False + +def generate_simple_safety_plan(abuse_score, escalation_risk, detected_patterns): + """Generate a basic safety plan""" + + plan = "π‘οΈ **SAFETY PLANNING RECOMMENDED**\n\n" + + if escalation_risk == "Critical" or abuse_score >= 85: + plan += "π¨ **CRITICAL SAFETY SITUATION**\n\n" + plan += "**IMMEDIATE ACTIONS:**\n" + plan += "β’ Contact domestic violence hotline: **1-800-799-7233** (24/7, free, confidential)\n" + plan += "β’ Text START to **88788** for crisis text support\n" + plan += "β’ Consider staying with trusted friends/family tonight\n" + plan += "β’ Keep phone charged and accessible\n" + plan += "β’ Have emergency bag ready (documents, medications, cash)\n" + plan += "\n**IF IN IMMEDIATE DANGER: Call 911**\n\n" + + elif escalation_risk == "High" or abuse_score >= 70: + plan += "β οΈ **HIGH RISK SITUATION**\n\n" + plan += "**SAFETY STEPS:**\n" + plan += "β’ Contact domestic violence hotline for safety planning: **1-800-799-7233**\n" + plan += "β’ Identify 3 trusted people you can contact for help\n" + plan += "β’ Plan escape routes and transportation options\n" + plan += "β’ Document concerning behaviors with dates and details\n" + plan += "β’ Research legal protection options\n\n" + + # Add pattern-specific advice + if "stalking language" in detected_patterns: + plan += "π **STALKING BEHAVIORS DETECTED:**\n" + plan += "β’ Vary your routines and routes\n" + plan += "β’ Check devices for tracking software\n" + plan += "β’ Keep record of all stalking incidents\n" + plan += "β’ Alert neighbors to watch for suspicious activity\n\n" + + if "veiled threats" in detected_patterns: + plan += "β οΈ **THREATENING LANGUAGE IDENTIFIED:**\n" + plan += "β’ Take all threats seriously, even indirect ones\n" + plan += "β’ Document all threatening communications\n" + plan += "β’ Inform trusted people about threat patterns\n" + plan += "β’ Avoid being alone in isolated locations\n\n" + + # Always include crisis resources + plan += "π **CRISIS RESOURCES (24/7):**\n" + plan += "β’ **National DV Hotline:** 1-800-799-7233\n" + plan += "β’ **Crisis Text Line:** Text START to 88788\n" + plan += "β’ **Online Chat:** thehotline.org\n" + plan += "β’ **Emergency:** Call 911\n\n" + + plan += "π **Remember:** You are not alone. This is not your fault. You deserve to be safe." + + return plan + +def detect_rare_threats(text): + rare_threats = ["necktie party", "permanent solution", "final conversation"] + if any(threat in text.lower() for threat in rare_threats): + return [("veiled threats", 0.90, 1.6)] + return [] + +def detect_enhanced_threats(text, patterns): + """Enhanced threat detection for v6 patterns""" + text_lower = text.lower() + enhanced_threats = [] + + # Stalking language indicators + stalking_phrases = [ + "stop at nothing", "will find you", "know where you", + "watching you", "following you", "can't hide", + "i know your", "saw you with", "you belong to me" + ] + + # Veiled threat indicators + veiled_threat_phrases = [ + "some people might", "things happen to people who", + "be careful", "hope nothing happens", "accidents happen", + "necktie party", "permanent solution", "wouldn't want" + ] + + # False concern indicators + false_concern_phrases = [ + "just worried about", "concerned about your", + "someone needs to protect", "for your own good" + ] + + if any(phrase in text_lower for phrase in stalking_phrases): + enhanced_threats.append("stalking language") + + if any(phrase in text_lower for phrase in veiled_threat_phrases): + enhanced_threats.append("veiled threats") + + if any(phrase in text_lower for phrase in false_concern_phrases): + enhanced_threats.append("false concern") + + return enhanced_threats + +def calculate_enhanced_risk_level(abuse_score, detected_patterns, escalation_risk, darvo_score): + """Enhanced risk calculation that properly weights dangerous patterns""" + + # Start with base risk from escalation system + base_risk = escalation_risk + + # CRITICAL PATTERNS - Auto-elevate to HIGH risk minimum + critical_patterns = ["stalking language", "veiled threats"] + has_critical = any(pattern in detected_patterns for pattern in critical_patterns) + + # DANGEROUS COMBINATIONS - Auto-elevate to CRITICAL + dangerous_combos = [ + ("stalking language", "control"), + ("veiled threats", "stalking language"), + ("stalking language", "false concern"), + ("veiled threats", "control") + ] + + has_dangerous_combo = any( + all(pattern in detected_patterns for pattern in combo) + for combo in dangerous_combos + ) + + # FORCE RISK ELEVATION for dangerous patterns + if has_dangerous_combo: + return "Critical" + elif has_critical and abuse_score >= 30: # Lower threshold for critical patterns + return "High" + elif has_critical: + return "Moderate" + elif abuse_score >= 70: + return "High" + elif abuse_score >= 50: + return "Moderate" + else: + return base_risk + +def get_emotion_profile(text): + """Get emotion profile from text with all scores""" + try: + emotions = emotion_pipeline(text) + if isinstance(emotions, list) and isinstance(emotions[0], list): + # Extract all scores from the first prediction + emotion_scores = emotions[0] + # Convert to dictionary with lowercase emotion names + return {e['label'].lower(): round(e['score'], 3) for e in emotion_scores} + return {} + except Exception as e: + logger.error(f"Error in get_emotion_profile: {e}") + return { + "sadness": 0.0, + "joy": 0.0, + "neutral": 0.0, + "disgust": 0.0, + "anger": 0.0, + "fear": 0.0 + } + +# FIXED FUNCTION - Added missing "d" and cleaned up structure +def get_emotional_tone_tag(text, sentiment, patterns, abuse_score): + """Get emotional tone tag based on emotions and patterns""" + emotions = get_emotion_profile(text) + + sadness = emotions.get("sadness", 0) + joy = emotions.get("joy", 0) + neutral = emotions.get("neutral", 0) + disgust = emotions.get("disgust", 0) + anger = emotions.get("anger", 0) + fear = emotions.get("fear", 0) + + text_lower = text.lower() + + # 1. Direct Threat Detection + threat_indicators = [ + "if you", "i'll make", "don't forget", "remember", "regret", + "i control", "i'll take", "you'll lose", "make sure", + "never see", "won't let" + ] + if ( + any(indicator in text_lower for indicator in threat_indicators) and + any(p in patterns for p in ["control", "insults"]) and + (anger > 0.2 or disgust > 0.2 or abuse_score > 70) + ): + return "direct threat" + + # 2. Obsessive Fixation (for stalking language) + obsessive_indicators = [ + "stop at nothing", "most desired", "forever", "always will", + "belong to me", "you're mine", "never let you go", "can't live without" + ] + if ( + any(indicator in text_lower for indicator in obsessive_indicators) and + "stalking language" in patterns and + (joy > 0.3 or sadness > 0.4 or fear > 0.2) + ): + return "obsessive fixation" + + # 3. Menacing Calm (for veiled threats) + veiled_threat_indicators = [ + "some people might", "accidents happen", "be careful", + "wouldn't want", "things happen", "unfortunate" + ] + if ( + any(indicator in text_lower for indicator in veiled_threat_indicators) and + "veiled threats" in patterns and + neutral > 0.4 and anger < 0.2 + ): + return "menacing calm" + + # 4. Predatory Concern (for false concern) + concern_indicators = [ + "worried about", "concerned about", "for your own good", + "someone needs to", "protect you", "take care of you" + ] + if ( + any(indicator in text_lower for indicator in concern_indicators) and + "false concern" in patterns and + (joy > 0.2 or neutral > 0.3) and sentiment == "undermining" + ): + return "predatory concern" + + # 5. Victim Cosplay (for false equivalence/DARVO) + victim_indicators = [ + "i'm the victim", "you're abusing me", "i'm being hurt", + "you're attacking me", "i'm innocent", "you're the problem" + ] + if ( + any(indicator in text_lower for indicator in victim_indicators) and + "false equivalence" in patterns and + sadness > 0.4 and anger > 0.2 + ): + return "victim cosplay" + + # 6. Manipulative Hope (for future faking) + future_indicators = [ + "i'll change", "we'll be", "i promise", "things will be different", + "next time", "from now on", "i'll never", "we'll have" + ] + if ( + any(indicator in text_lower for indicator in future_indicators) and + "future faking" in patterns and + (joy > 0.3 or sadness > 0.3) + ): + return "manipulative hope" + + # 7. Surveillance Intimacy (for stalking with false intimacy) + surveillance_indicators = [ + "i know you", "i saw you", "i watched", "i've been", + "your routine", "where you go", "what you do" + ] + if ( + any(indicator in text_lower for indicator in surveillance_indicators) and + "stalking language" in patterns and + joy > 0.2 and neutral > 0.2 + ): + return "surveillance intimacy" + + # 8. Conditional Menace (for threats with conditions) + conditional_indicators = [ + "if you", "unless you", "you better", "don't make me", + "you wouldn't want", "force me to" + ] + if ( + any(indicator in text_lower for indicator in conditional_indicators) and + any(p in patterns for p in ["veiled threats", "control"]) and + anger > 0.3 and neutral > 0.2 + ): + return "conditional menace" + + # 9. False Vulnerability (manipulation disguised as weakness) + vulnerability_indicators = [ + "i can't help", "i need you", "without you i", "you're all i have", + "i'm lost without", "i don't know what to do" + ] + if ( + any(indicator in text_lower for indicator in vulnerability_indicators) and + any(p in patterns for p in ["guilt tripping", "future faking", "false concern"]) and + sadness > 0.5 and sentiment == "undermining" + ): + return "false vulnerability" + + # 10. Entitled Rage (anger with entitlement) + entitlement_indicators = [ + "you owe me", "after everything", "how dare you", "you should", + "i deserve", "you have no right" + ] + if ( + any(indicator in text_lower for indicator in entitlement_indicators) and + anger > 0.5 and + any(p in patterns for p in ["control", "insults", "blame shifting"]) + ): + return "entitled rage" + + # 11. Calculated Coldness (deliberate emotional detachment) + cold_indicators = [ + "i don't care", "whatever", "your choice", "suit yourself", + "fine by me", "your loss" + ] + calculated_patterns = ["dismissiveness", "obscure language", "control"] + if ( + any(indicator in text_lower for indicator in cold_indicators) and + any(p in patterns for p in calculated_patterns) and + neutral > 0.6 and all(e < 0.2 for e in [anger, sadness, joy]) + ): + return "calculated coldness" + + # 12. Predictive Punishment + future_consequences = [ + "will end up", "you'll be", "you will be", "going to be", + "will become", "will find yourself", "will realize", + "you'll regret", "you'll see", "will learn", "truly will", + "end up alone", "end up miserable" + ] + dismissive_endings = [ + "i'm out", "i'm done", "whatever", "good luck", + "your choice", "your problem", "regardless", + "keep", "keep on" + ] + + if ( + (any(phrase in text_lower for phrase in future_consequences) or + any(end in text_lower for end in dismissive_endings)) and + any(p in ["dismissiveness", "control"] for p in patterns) and + (disgust > 0.2 or neutral > 0.3 or anger > 0.2) + ): + return "predictive punishment" + + # 13. Performative Regret + if ( + sadness > 0.3 and + any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery"]) and + (sentiment == "undermining" or abuse_score > 40) + ): + return "performative regret" + + # 14. Coercive Warmth + if ( + (joy > 0.2 or sadness > 0.3) and + any(p in patterns for p in ["control", "gaslighting"]) and + sentiment == "undermining" + ): + return "coercive warmth" + + # 15. Cold Invalidation + if ( + (neutral + disgust) > 0.4 and + any(p in patterns for p in ["dismissiveness", "projection", "obscure language"]) and + sentiment == "undermining" + ): + return "cold invalidation" + + # 16. Genuine Vulnerability + if ( + (sadness + fear) > 0.4 and + sentiment == "supportive" and + all(p in ["recovery"] for p in patterns) + ): + return "genuine vulnerability" + + # 17. Emotional Threat + if ( + (anger + disgust) > 0.4 and + any(p in patterns for p in ["control", "insults", "dismissiveness"]) and + sentiment == "undermining" + ): + return "emotional threat" + + # 18. Weaponized Sadness + if ( + sadness > 0.5 and + any(p in patterns for p in ["guilt tripping", "projection"]) and + sentiment == "undermining" + ): + return "weaponized sadness" + + # 19. Contradictory Gaslight + if ( + (joy + anger + sadness) > 0.4 and + any(p in patterns for p in ["gaslighting", "contradictory statements"]) and + sentiment == "undermining" + ): + return "contradictory gaslight" + + # 20. Forced Accountability Flip + if ( + (anger + disgust) > 0.4 and + any(p in patterns for p in ["blame shifting", "projection"]) and + sentiment == "undermining" + ): + return "forced accountability flip" + + # Emotional Instability Fallback + if ( + (anger + sadness + disgust) > 0.5 and + sentiment == "undermining" + ): + return "emotional instability" + + return "neutral" + +@spaces.GPU +def predict_darvo_score(text): + """Predict DARVO score for given text""" + try: + inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True) + inputs = {k: v.to(device) for k, v in inputs.items()} + with torch.no_grad(): + logits = darvo_model(**inputs).logits + return round(sigmoid(logits.cpu()).item(), 4) + except Exception as e: + logger.error(f"Error in DARVO prediction: {e}") + return 0.0 + +def detect_weapon_language(text): + """Detect weapon-related language in text""" + weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"] + t = text.lower() + return any(w in t for w in weapon_keywords) + +def get_risk_stage(patterns, sentiment): + """Determine risk stage based on patterns and sentiment""" + try: + if "insults" in patterns: + return 2 + elif "recovery" in patterns: + return 3 + elif "control" in patterns or "guilt tripping" in patterns: + return 1 + elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]): + return 4 + return 1 + except Exception as e: + logger.error(f"Error determining risk stage: {e}") + return 1 + +def detect_threat_pattern(text, patterns): + """Detect if a message contains threat patterns""" + # Threat indicators in text + threat_words = [ + "regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin", + "expose", "tell everyone", "never see", "take away", "lose", + "control", "make sure", "won't let", "force", "warn", "never", + "punish", "teach you", "learn", "show you", "remember", + "if you", "don't forget", "i control", "i'll make sure", # Added these specific phrases + "bank account", "phone", "money", "access" # Added financial control indicators + ] + + # Check for conditional threats (if/then structures) + text_lower = text.lower() + conditional_threat = ( + "if" in text_lower and + any(word in text_lower for word in ["regret", "make sure", "control"]) + ) + + has_threat_words = any(word in text_lower for word in threat_words) + + # Check for threat patterns + threat_patterns = {"control", "gaslighting", "blame shifting", "insults"} + has_threat_patterns = any(p in threat_patterns for p in patterns) + + return has_threat_words or has_threat_patterns or conditional_threat + +def detect_compound_threat(text, patterns): + """Detect compound threats in a single message""" + try: + # Rule A: Single Message Multiple Patterns + high_risk_patterns = {"control", "gaslighting", "blame shifting", "insults"} + high_risk_count = sum(1 for p in patterns if p in high_risk_patterns) + + has_threat = detect_threat_pattern(text, patterns) + + # Special case for control + threats + has_control = "control" in patterns + has_conditional_threat = "if" in text.lower() and any(word in text.lower() + for word in ["regret", "make sure", "control"]) + + # Single message compound threat + if (has_threat and high_risk_count >= 2) or (has_control and has_conditional_threat): + return True, "single_message" + + return False, None + except Exception as e: + logger.error(f"Error in compound threat detection: {e}") + return False, None + +def analyze_message_batch_threats(messages, results): + """Analyze multiple messages for compound threats""" + threat_messages = [] + support_messages = [] + + for i, (msg, (result, _)) in enumerate(zip(messages, results)): + if not msg.strip(): # Skip empty messages + continue + + patterns = result[1] # Get detected patterns + + # Check for threat in this message + if detect_threat_pattern(msg, patterns): + threat_messages.append(i) + + # Check for supporting patterns + if any(p in {"control", "gaslighting", "blame shifting"} for p in patterns): + support_messages.append(i) + + # Rule B: Multi-Message Accumulation + if len(threat_messages) >= 2: + return True, "multiple_threats" + elif len(threat_messages) == 1 and len(support_messages) >= 2: + return True, "threat_with_support" + + return False, None + +@spaces.GPU +def compute_abuse_score(matched_scores, sentiment): + """Compute abuse score from matched patterns and sentiment""" + try: + if not matched_scores: + logger.debug("No matched scores, returning 0") + return 0.0 + + # Calculate weighted score + total_weight = sum(weight for _, _, weight in matched_scores) + if total_weight == 0: + logger.debug("Total weight is 0, returning 0") + return 0.0 + + # Get highest pattern scores + pattern_scores = [(label, score) for label, score, _ in matched_scores] + sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True) + logger.debug(f"Sorted pattern scores: {sorted_scores}") + + # Base score calculation + weighted_sum = sum(score * weight for _, score, weight in matched_scores) + base_score = (weighted_sum / total_weight) * 100 + logger.debug(f"Initial base score: {base_score:.1f}") + + # Cap maximum score based on pattern severity + max_score = 85.0 # Set maximum possible score + if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores): + max_score = 90.0 + logger.debug(f"Increased max score to {max_score} due to high severity patterns") + + # Apply diminishing returns for multiple patterns + if len(matched_scores) > 1: + multiplier = 1 + (0.1 * (len(matched_scores) - 1)) + base_score *= multiplier + logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns") + + # Apply sentiment modifier + if sentiment == "supportive": + base_score *= 0.85 + logger.debug("Applied 15% reduction for supportive sentiment") + + final_score = min(round(base_score, 1), max_score) + logger.debug(f"Final abuse score: {final_score}") + return final_score + + except Exception as e: + logger.error(f"Error computing abuse score: {e}") + return 0.0 + +@spaces.GPU +def analyze_single_message(text, thresholds): + """Analyze a single message for abuse patterns""" + logger.debug("\n=== DEBUG START ===") + logger.debug(f"Input text: {text}") + + try: + if not text.strip(): + logger.debug("Empty text, returning zeros") + return 0.0, [], [], {"label": "none"}, 1, 0.0, None + + # Check for explicit abuse + explicit_abuse_words = ['fuck', 'bitch', 'shit', 'ass', 'dick'] + explicit_abuse = any(word in text.lower() for word in explicit_abuse_words) + logger.debug(f"Explicit abuse detected: {explicit_abuse}") + + # Abuse model inference + inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True) + inputs = {k: v.to(device) for k, v in inputs.items()} + + with torch.no_grad(): + outputs = model(**inputs) + raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy() + + # Log raw model outputs + logger.debug("\nRaw model scores:") + for label, score in zip(LABELS, raw_scores): + logger.debug(f"{label}: {score:.3f}") + + # Get predictions and sort them + predictions = list(zip(LABELS, raw_scores)) + sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True) + logger.debug("\nTop 3 predictions:") + for label, score in sorted_predictions[:3]: + logger.debug(f"{label}: {score:.3f}") + + # Apply thresholds + threshold_labels = [] + if explicit_abuse: + threshold_labels.append("insults") + logger.debug("\nForced inclusion of 'insults' due to explicit abuse") + + for label, score in sorted_predictions: + base_threshold = thresholds.get(label, 0.25) + if explicit_abuse: + base_threshold *= 0.5 + if score > base_threshold: + if label not in threshold_labels: + threshold_labels.append(label) + + logger.debug("\nLabels that passed thresholds:", threshold_labels) + + # Calculate matched scores + matched_scores = [] + for label in threshold_labels: + score = raw_scores[LABELS.index(label)] + weight = PATTERN_WEIGHTS.get(label, 1.0) + if explicit_abuse and label == "insults": + weight *= 1.5 + matched_scores.append((label, score, weight)) + + # Get sentiment + sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True) + sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()} + with torch.no_grad(): + sent_logits = sentiment_model(**sent_inputs).logits[0] + sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy() + + # Add detailed logging + logger.debug("\nπ SENTIMENT ANALYSIS DETAILS") + logger.debug(f"Raw logits: {sent_logits}") + logger.debug(f"Probabilities: supportive={sent_probs[0]:.3f}, undermining={sent_probs[1]:.3f}") + + # Make sure we're using the correct index mapping + sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))] + logger.debug(f"Selected sentiment: {sentiment}") + + enhanced_patterns = detect_enhanced_threats(text, threshold_labels) + for pattern in enhanced_patterns: + if pattern not in threshold_labels: + threshold_labels.append(pattern) + # Add to matched_scores with high confidence + weight = PATTERN_WEIGHTS.get(pattern, 1.0) + matched_scores.append((pattern, 0.85, weight)) + + # Calculate abuse score + abuse_score = compute_abuse_score(matched_scores, sentiment) + if explicit_abuse: + abuse_score = max(abuse_score, 70.0) + + # Check for compound threats + compound_threat_flag, threat_type = detect_compound_threat( + text, threshold_labels + ) + + if compound_threat_flag: + logger.debug(f"β οΈ Compound threat detected in message: {threat_type}") + abuse_score = max(abuse_score, 85.0) # Force high score for compound threats + + # Get DARVO score + darvo_score = predict_darvo_score(text) + + # Get tone using emotion-based approach + tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score) + + # Log tone usage + log_emotional_tone_usage(tone_tag, threshold_labels) + + # Check for the specific combination + highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None # Get highest pattern + if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language": + logger.debug("Message classified as likely non-abusive (supportive, neutral, and obscure language). Returning low risk.") + return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral" # Return non-abusive values + + # Set stage + stage = 2 if explicit_abuse or abuse_score > 70 else 1 + + logger.debug("=== DEBUG END ===\n") + + return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag + + except Exception as e: + logger.error(f"Error in analyze_single_message: {e}") + return 0.0, [], [], {"label": "error"}, 1, 0.0, None + +def generate_abuse_score_chart(dates, scores, patterns): + """Generate a timeline chart of abuse scores""" + try: + plt.figure(figsize=(10, 6)) + plt.clf() + + # Create new figure + fig, ax = plt.subplots(figsize=(10, 6)) + + # Plot points and lines + x = range(len(scores)) + plt.plot(x, scores, 'bo-', linewidth=2, markersize=8) + + # Add labels for each point with highest scoring pattern + for i, (score, pattern) in enumerate(zip(scores, patterns)): + # Get the pattern and its score + plt.annotate( + f'{pattern}\n{score:.0f}%', + (i, score), + textcoords="offset points", + xytext=(0, 10), + ha='center', + bbox=dict( + boxstyle='round,pad=0.5', + fc='white', + ec='gray', + alpha=0.8 + ) + ) + + # Customize the plot + plt.ylim(-5, 105) + plt.grid(True, linestyle='--', alpha=0.7) + plt.title('Abuse Pattern Timeline', pad=20, fontsize=12) + plt.ylabel('Abuse Score %') + + # X-axis labels + plt.xticks(x, dates, rotation=45) + + # Risk level bands with better colors + plt.axhspan(0, 50, color='#90EE90', alpha=0.2) # light green - Low Risk + plt.axhspan(50, 70, color='#FFD700', alpha=0.2) # gold - Moderate Risk + plt.axhspan(70, 85, color='#FFA500', alpha=0.2) # orange - High Risk + plt.axhspan(85, 100, color='#FF6B6B', alpha=0.2) # light red - Critical Risk + + # Add risk level labels + plt.text(-0.2, 25, 'Low Risk', rotation=90, va='center') + plt.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center') + plt.text(-0.2, 77.5, 'High Risk', rotation=90, va='center') + plt.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center') + + # Adjust layout + plt.tight_layout() + + # Convert plot to image + buf = io.BytesIO() + plt.savefig(buf, format='png', bbox_inches='tight') + buf.seek(0) + plt.close('all') # Close all figures to prevent memory leaks + + return Image.open(buf) + except Exception as e: + logger.error(f"Error generating abuse score chart: {e}") + return None + +def analyze_composite(msg1, msg2, msg3, *answers_and_none): + """Analyze multiple messages and checklist responses""" + logger.debug("\nπ STARTING NEW ANALYSIS") + logger.debug("=" * 50) + + # Define severity categories at the start + high = {'control'} + moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults', + 'contradictory statements', 'guilt tripping'} + low = {'blame shifting', 'projection', 'recovery'} + + try: + # Process checklist responses + logger.debug("\nπ CHECKLIST PROCESSING") + logger.debug("=" * 50) + none_selected_checked = answers_and_none[-1] + responses_checked = any(answers_and_none[:-1]) + none_selected = not responses_checked and none_selected_checked + + logger.debug("Checklist Status:") + logger.debug(f" β’ None Selected Box: {'β' if none_selected_checked else 'β'}") + logger.debug(f" β’ Has Responses: {'β' if responses_checked else 'β'}") + logger.debug(f" β’ Final Status: {'None Selected' if none_selected else 'Has Selections'}") + + if none_selected: + escalation_score = 0 + escalation_note = "Checklist completed: no danger items reported." + escalation_completed = True + logger.debug("\nβ Checklist: No items selected") + elif responses_checked: + escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a) + escalation_note = "Checklist completed." + escalation_completed = True + logger.debug(f"\nπ Checklist Score: {escalation_score}") + + # Log checked items + logger.debug("\nβ οΈ Selected Risk Factors:") + for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]): + if a: + logger.debug(f" β’ [{w} points] {q}") + else: + escalation_score = None + escalation_note = "Checklist not completed." + escalation_completed = False + logger.debug("\nβ Checklist: Not completed") + + # Process messages + logger.debug("\nπ MESSAGE PROCESSING") + logger.debug("=" * 50) + messages = [msg1, msg2, msg3] + active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()] + logger.debug(f"Active Messages: {len(active)} of 3") + + if not active: + logger.debug("β Error: No messages provided") + return "Please enter at least one message.", None + + # Detect threats + logger.debug("\nπ¨ THREAT DETECTION") + logger.debug("=" * 50) + + def normalize(text): + import unicodedata + text = text.lower().strip() + text = unicodedata.normalize("NFKD", text) + text = text.replace("'", "'") + return re.sub(r"[^a-z0-9 ]", "", text) + + def detect_threat_motifs(message, motif_list): + norm_msg = normalize(message) + return [motif for motif in motif_list if normalize(motif) in norm_msg] + + # Analyze threats and patterns + immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active] + flat_threats = [t for sublist in immediate_threats for t in sublist] + threat_risk = "Yes" if flat_threats else "No" + + # Analyze each message + logger.debug("\nπ INDIVIDUAL MESSAGE ANALYSIS") + logger.debug("=" * 50) + results = [] + for m, d in active: + logger.debug(f"\nπ ANALYZING {d}") + logger.debug("-" * 40) # Separator for each message + result = analyze_single_message(m, THRESHOLDS.copy()) + + # Check for non-abusive classification and skip further analysis + if result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"} and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral": + logger.debug(f"β {d} classified as non-abusive, skipping further analysis.") + continue # Skip to the next message + + results.append((result, d)) + + # Unpack results for cleaner logging + abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone = result + + # Log core metrics + logger.debug("\nπ CORE METRICS") + logger.debug(f" β’ Abuse Score: {abuse_score:.1f}%") + logger.debug(f" β’ DARVO Score: {darvo_score:.3f}") + logger.debug(f" β’ Risk Stage: {stage}") + logger.debug(f" β’ Sentiment: {sentiment['label']}") + logger.debug(f" β’ Tone: {tone}") + + # Log detected patterns with scores + if patterns: + logger.debug("\nπ― DETECTED PATTERNS") + for label, score, weight in matched_scores: + severity = "βHIGH" if label in high else "β οΈ MODERATE" if label in moderate else "π LOW" + logger.debug(f" β’ {severity} | {label}: {score:.3f} (weight: {weight})") + else: + logger.debug("\nβ No abuse patterns detected") + + # Extract scores and metadata + abuse_scores = [r[0][0] for r in results] + stages = [r[0][4] for r in results] + darvo_scores = [r[0][5] for r in results] + tone_tags = [r[0][6] for r in results] + dates_used = [r[1] for r in results] + + # Pattern Analysis Summary + logger.debug("\nπ PATTERN ANALYSIS SUMMARY") + logger.debug("=" * 50) + predicted_labels = [label for r in results for label in r[0][1]] + + if predicted_labels: + logger.debug("Detected Patterns Across All Messages:") + pattern_counts = Counter(predicted_labels) + + # Log high severity patterns first + high_patterns = [p for p in pattern_counts if p in high] + if high_patterns: + logger.debug("\nβ HIGH SEVERITY PATTERNS:") + for p in high_patterns: + logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") + + # Then moderate + moderate_patterns = [p for p in pattern_counts if p in moderate] + if moderate_patterns: + logger.debug("\nβ οΈ MODERATE SEVERITY PATTERNS:") + for p in moderate_patterns: + logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") + + # Then low + low_patterns = [p for p in pattern_counts if p in low] + if low_patterns: + logger.debug("\nπ LOW SEVERITY PATTERNS:") + for p in low_patterns: + logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") + else: + logger.debug("β No patterns detected across messages") + + # Pattern Severity Analysis + logger.debug("\nβοΈ SEVERITY ANALYSIS") + logger.debug("=" * 50) + counts = {'high': 0, 'moderate': 0, 'low': 0} + for label in predicted_labels: + if label in high: + counts['high'] += 1 + elif label in moderate: + counts['moderate'] += 1 + elif label in low: + counts['low'] += 1 + + logger.debug("Pattern Distribution:") + if counts['high'] > 0: + logger.debug(f" β High Severity: {counts['high']}") + if counts['moderate'] > 0: + logger.debug(f" β οΈ Moderate Severity: {counts['moderate']}") + if counts['low'] > 0: + logger.debug(f" π Low Severity: {counts['low']}") + + total_patterns = sum(counts.values()) + if total_patterns > 0: + logger.debug(f"\nSeverity Percentages:") + logger.debug(f" β’ High: {(counts['high']/total_patterns)*100:.1f}%") + logger.debug(f" β’ Moderate: {(counts['moderate']/total_patterns)*100:.1f}%") + logger.debug(f" β’ Low: {(counts['low']/total_patterns)*100:.1f}%") + + # Risk Assessment + logger.debug("\nπ― RISK ASSESSMENT") + logger.debug("=" * 50) + if counts['high'] >= 2 and counts['moderate'] >= 2: + pattern_escalation_risk = "Critical" + logger.debug("ββ CRITICAL RISK") + logger.debug(" β’ Multiple high and moderate patterns detected") + logger.debug(f" β’ High patterns: {counts['high']}") + logger.debug(f" β’ Moderate patterns: {counts['moderate']}") + elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \ + (counts['moderate'] >= 3) or \ + (counts['high'] >= 1 and counts['moderate'] >= 2): + pattern_escalation_risk = "High" + logger.debug("β HIGH RISK") + logger.debug(" β’ Significant pattern combination detected") + logger.debug(f" β’ High patterns: {counts['high']}") + logger.debug(f" β’ Moderate patterns: {counts['moderate']}") + elif (counts['moderate'] == 2) or \ + (counts['high'] == 1 and counts['moderate'] == 1) or \ + (counts['moderate'] == 1 and counts['low'] >= 2) or \ + (counts['high'] == 1 and sum(counts.values()) == 1): + pattern_escalation_risk = "Moderate" + logger.debug("β οΈ MODERATE RISK") + logger.debug(" β’ Concerning pattern combination detected") + logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") + else: + pattern_escalation_risk = "Low" + logger.debug("π LOW RISK") + logger.debug(" β’ Limited pattern severity detected") + logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") + + # Checklist Risk Assessment + logger.debug("\nπ CHECKLIST RISK ASSESSMENT") + logger.debug("=" * 50) + checklist_escalation_risk = "Unknown" if escalation_score is None else ( + "Critical" if escalation_score >= 20 else + "Moderate" if escalation_score >= 10 else + "Low" + ) + if escalation_score is not None: + logger.debug(f"Score: {escalation_score}/29") + logger.debug(f"Risk Level: {checklist_escalation_risk}") + if escalation_score >= 20: + logger.debug("ββ CRITICAL: Score indicates severe risk") + elif escalation_score >= 10: + logger.debug("β οΈ MODERATE: Score indicates concerning risk") + else: + logger.debug("π LOW: Score indicates limited risk") + else: + logger.debug("β Risk Level: Unknown (checklist not completed)") + + # Escalation Analysis + logger.debug("\nπ ESCALATION ANALYSIS") + logger.debug("=" * 50) + escalation_bump = 0 + for result, msg_id in results: + abuse_score, _, _, sentiment, stage, darvo_score, tone_tag = result + logger.debug(f"\nπ Message {msg_id} Risk Factors:") + + factors = [] + if darvo_score > 0.65: + escalation_bump += 3 + factors.append(f"β² +3: High DARVO score ({darvo_score:.3f})") + if tone_tag in ["forced accountability flip", "emotional threat"]: + escalation_bump += 2 + factors.append(f"β² +2: Concerning tone ({tone_tag})") + if abuse_score > 80: + escalation_bump += 2 + factors.append(f"β² +2: High abuse score ({abuse_score:.1f}%)") + if stage == 2: + escalation_bump += 3 + factors.append("β² +3: Escalation stage") + + if factors: + for factor in factors: + logger.debug(f" {factor}") + else: + logger.debug(" β No escalation factors") + + logger.debug(f"\nπ Total Escalation Bump: +{escalation_bump}") + + # Check for compound threats across messages + compound_threat_flag, threat_type = analyze_message_batch_threats( + [msg1, msg2, msg3], results + ) + + if compound_threat_flag: + logger.debug(f"β οΈ Compound threat detected across messages: {threat_type}") + pattern_escalation_risk = "Critical" # Override risk level + logger.debug("Risk level elevated to CRITICAL due to compound threats") + + # Combined Risk Calculation + logger.debug("\nπ― FINAL RISK CALCULATION") + logger.debug("=" * 50) + def rank(label): + return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0) + + combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump + logger.debug("Risk Components:") + logger.debug(f" β’ Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}") + logger.debug(f" β’ Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}") + logger.debug(f" β’ Escalation Bump: +{escalation_bump}") + logger.debug(f" = Combined Score: {combined_score}") + + escalation_risk = ( + "Critical" if combined_score >= 6 else + "High" if combined_score >= 4 else + "Moderate" if combined_score >= 2 else + "Low" + ) + logger.debug(f"\nβ οΈ Final Escalation Risk: {escalation_risk}") + + # Generate Output Text + logger.debug("\nπ GENERATING OUTPUT") + logger.debug("=" * 50) + if escalation_score is None: + escalation_text = ( + "π« **Escalation Potential: Unknown** (Checklist not completed)\n" + "β οΈ This section was not completed. Escalation potential is estimated using message data only.\n" + ) + hybrid_score = 0 + logger.debug("Generated output for incomplete checklist") + elif escalation_score == 0: + escalation_text = ( + "β **Escalation Checklist Completed:** No danger items reported.\n" + "π§ **Escalation potential estimated from detected message patterns only.**\n" + f"β’ Pattern Risk: {pattern_escalation_risk}\n" + f"β’ Checklist Risk: None reported\n" + f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" + ) + hybrid_score = escalation_bump + logger.debug("Generated output for no-risk checklist") + else: + hybrid_score = escalation_score + escalation_bump + escalation_text = ( + f"π **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n" + "π This score combines your safety checklist answers *and* detected high-risk behavior.\n" + f"β’ Pattern Risk: {pattern_escalation_risk}\n" + f"β’ Checklist Risk: {checklist_escalation_risk}\n" + f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" + ) + logger.debug(f"Generated output with hybrid score: {hybrid_score}/29") + + # Final Metrics + logger.debug("\nπ FINAL METRICS") + logger.debug("=" * 50) + composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores))) + logger.debug(f"Composite Abuse Score: {composite_abuse}%") + + most_common_stage = max(set(stages), key=stages.count) + logger.debug(f"Most Common Stage: {most_common_stage}") + + avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3) + logger.debug(f"Average DARVO Score: {avg_darvo}") + + final_risk_level = calculate_enhanced_risk_level( + composite_abuse, + predicted_labels, + escalation_risk, + avg_darvo + ) + + # Override escalation_risk with the enhanced version + escalation_risk = final_risk_level + + # Generate Final Report + logger.debug("\nπ GENERATING FINAL REPORT") + logger.debug("=" * 50) + out = f"Abuse Intensity: {composite_abuse}%\n" + + # Add detected patterns to output + if predicted_labels: + out += "π Detected Patterns:\n" + if high_patterns: + patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns) + out += f"β High Severity: {patterns_str}\n" + if moderate_patterns: + patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns) + out += f"β οΈ Moderate Severity: {patterns_str}\n" + if low_patterns: + patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns) + out += f"π Low Severity: {patterns_str}\n" + out += "\n" + + out += "π This reflects the strength and severity of detected abuse patterns in the message(s).\n\n" + + # Risk Level Assessment + risk_level = final_risk_level + logger.debug(f"Final Risk Level: {risk_level}") + + # Add Risk Description + risk_descriptions = { + "Critical": ( + "π¨ **Risk Level: Critical**\n" + "Multiple severe abuse patterns detected. This situation shows signs of " + "dangerous escalation and immediate intervention may be needed." + ), + "High": ( + "β οΈ **Risk Level: High**\n" + "Strong abuse patterns detected. This situation shows concerning " + "signs of manipulation and control." + ), + "Moderate": ( + "β‘ **Risk Level: Moderate**\n" + "Concerning patterns detected. While not severe, these behaviors " + "indicate unhealthy relationship dynamics." + ), + "Low": ( + "π **Risk Level: Low**\n" + "Minor concerning patterns detected. While present, the detected " + "behaviors are subtle or infrequent." + ) + } + + out += risk_descriptions[risk_level] + out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}" + logger.debug("Added risk description and stage information") + + # Add DARVO Analysis + if avg_darvo > 0.25: + level = "moderate" if avg_darvo < 0.65 else "high" + out += f"\n\nπ **DARVO Score: {avg_darvo}** β This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame." + logger.debug(f"Added DARVO analysis ({level} level)") + + # Add Emotional Tones + logger.debug("\nπ Adding Emotional Tones") + out += "\n\nπ **Emotional Tones Detected:**\n" + for i, tone in enumerate(tone_tags): + out += f"β’ Message {i+1}: *{tone or 'none'}*\n" + logger.debug(f"Message {i+1} tone: {tone}") + + # Add Threats Section + logger.debug("\nβ οΈ Adding Threat Analysis") + if flat_threats: + out += "\n\nπ¨ **Immediate Danger Threats Detected:**\n" + for t in set(flat_threats): + out += f"β’ \"{t}\"\n" + out += "\nβ οΈ These phrases may indicate an imminent risk to physical safety." + logger.debug(f"Added {len(set(flat_threats))} unique threat warnings") + else: + out += "\n\nπ§© **Immediate Danger Threats:** None explicitly detected.\n" + out += "This does *not* rule out risk, but no direct threat phrases were matched." + logger.debug("No threats to add") + + # Generate Timeline + logger.debug("\nπ Generating Timeline") + pattern_labels = [] + for result, _ in results: + matched_scores = result[2] # Get the matched_scores from the result tuple + if matched_scores: + # Sort matched_scores by score and get the highest scoring pattern + highest_pattern = max(matched_scores, key=lambda x: x[1]) + pattern_labels.append(highest_pattern[0]) # Add the pattern name + else: + pattern_labels.append("none") + + logger.debug("Pattern labels for timeline:") + for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)): + logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)") + + timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels) + logger.debug("Timeline generated successfully") + + # Add Escalation Text + out += "\n\n" + escalation_text + logger.debug("Added escalation text to output") + + logger.debug("\nβ ANALYSIS COMPLETE") + logger.debug("=" * 50) + + # SAFETY PLANNING CHECK + # Check if safety planning should be offered + show_safety = should_show_safety_planning( + composite_abuse, + escalation_risk, + predicted_labels + ) + + safety_plan = "" + if show_safety: + # Generate safety plan + safety_plan = generate_simple_safety_plan( + composite_abuse, + escalation_risk, + predicted_labels + ) + + # Add notice to main results + out += "\n\n" + "π‘οΈ " + "="*48 + out += "\n**SAFETY PLANNING AVAILABLE**" + out += "\n" + "="*50 + out += "\n\nBased on your analysis results, we've generated a safety plan." + out += "\nCheck the 'Safety Plan' output below for personalized guidance." + + return out, timeline_image, safety_plan + + except Exception as e: + logger.error("\nβ ERROR IN ANALYSIS") + logger.error("=" * 50) + logger.error(f"Error type: {type(e).__name__}") + logger.error(f"Error message: {str(e)}") + logger.error(f"Traceback:\n{traceback.format_exc()}") + return "An error occurred during analysis.", None, "" + +def format_results_for_new_ui(analysis_output, timeline_image, safety_plan): + """ + Convert your existing analysis output into the format needed for the new UI + """ + try: + # Parse your existing text output to extract structured data + lines = analysis_output.split('\n') + + # Extract abuse intensity + abuse_intensity = 0 + for line in lines: + if line.startswith('Abuse Intensity:'): + abuse_intensity = int(re.findall(r'\d+', line)[0]) + break + + # Extract DARVO score + darvo_score = 0.0 + for line in lines: + if 'DARVO Score:' in line: + # Extract number from line like "π **DARVO Score: 0.456**" + darvo_match = re.search(r'DARVO Score: ([\d.]+)', line) + if darvo_match: + darvo_score = float(darvo_match.group(1)) + break + + # Extract emotional tones + emotional_tones = [] + in_tones_section = False + for line in lines: + if 'π **Emotional Tones Detected:**' in line: + in_tones_section = True + continue + elif in_tones_section and line.strip(): + if line.startswith('β’ Message'): + # Extract tone from line like "β’ Message 1: *menacing calm*" + tone_match = re.search(r'\*([^*]+)\*', line) + if tone_match: + tone = tone_match.group(1) + emotional_tones.append(tone if tone != 'none' else 'neutral') + else: + emotional_tones.append('neutral') + elif not line.startswith('β’') and line.strip(): + break + + # Determine risk level based on your existing logic + if abuse_intensity >= 85: + risk_level = 'critical' + elif abuse_intensity >= 70: + risk_level = 'high' + elif abuse_intensity >= 50: + risk_level = 'moderate' + else: + risk_level = 'low' + + # FIXED: Extract detected patterns properly + patterns = [] + in_patterns_section = False + + # Define valid pattern names to filter against + valid_patterns = { + "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", + "blame shifting", "nonabusive", "projection", "insults", + "contradictory statements", "obscure language", + "veiled threats", "stalking language", "false concern", + "false equivalence", "future faking" + } + + for line in lines: + if 'π Detected Patterns:' in line: + in_patterns_section = True + continue + elif in_patterns_section and line.strip(): + if line.startswith('β'): + severity = 'high' + elif line.startswith('β οΈ'): + severity = 'moderate' + elif line.startswith('π'): + severity = 'low' + else: + continue + + # Extract pattern text after the severity indicator + if ':' in line: + pattern_text = line.split(':', 1)[1].strip() + else: + pattern_text = line[2:].strip() # Remove emoji and space + + # Parse individual patterns from the text + # Handle format like "blame shifting (1x), projection (2x)" + pattern_parts = pattern_text.split(',') + + for part in pattern_parts: + # Clean up the pattern name + pattern_name = part.strip() + + # Remove count indicators like "(1x)", "(2x)", etc. + pattern_name = re.sub(r'\s*\(\d+x?\)', '', pattern_name) + + # Remove any remaining special characters and clean + pattern_name = pattern_name.strip().lower() + + # Only add if it's a valid pattern name + if pattern_name in valid_patterns: + patterns.append({ + 'name': pattern_name.replace('_', ' ').title(), + 'severity': severity, + 'description': get_pattern_description(pattern_name) + }) + elif line.strip() and not line.startswith(('β', 'β οΈ', 'π')) and in_patterns_section: + # Exit patterns section when we hit a non-pattern line + break + + # Generate personalized recommendations + recommendations = generate_personalized_recommendations(abuse_intensity, patterns, safety_plan) + + return { + 'riskLevel': risk_level, + 'riskScore': abuse_intensity, + 'primaryConcerns': patterns[:3], # Top 3 most important + 'allPatterns': patterns, + 'riskStage': extract_risk_stage(analysis_output), + 'emotionalTones': emotional_tones, + 'darvoScore': darvo_score, + 'personalizedRecommendations': recommendations, + 'hasSafetyPlan': bool(safety_plan), + 'safetyPlan': safety_plan, + 'rawAnalysis': analysis_output + } + + except Exception as e: + logger.error(f"Error formatting results: {e}") + return { + 'riskLevel': 'low', + 'riskScore': 0, + 'primaryConcerns': [], + 'allPatterns': [], + 'riskStage': 'unknown', + 'emotionalTones': [], + 'darvoScore': 0.0, + 'personalizedRecommendations': ['Consider speaking with a counselor about your relationship concerns'], + 'hasSafetyPlan': False, + 'safetyPlan': '', + 'rawAnalysis': analysis_output + } + +def get_pattern_description(pattern_name): + """Get human-readable descriptions for patterns""" + descriptions = { + 'control': 'Attempts to manage your behavior, decisions, or daily activities', + 'gaslighting': 'Making you question your memory, perception, or reality', + 'dismissiveness': 'Minimizing or invalidating your feelings and experiences', + 'guilt tripping': 'Making you feel guilty to influence your behavior', + 'blame shifting': 'Placing responsibility for their actions onto you', + 'projection': 'Accusing you of behaviors they themselves exhibit', + 'insults': 'Name-calling or personal attacks intended to hurt', + 'contradictory statements': 'Saying things that conflict with previous statements', + 'obscure language': 'Using vague or confusing language to avoid accountability', + 'veiled threats': 'Indirect threats or intimidating language', + 'stalking language': 'Monitoring, tracking, or obsessive behaviors', + 'false concern': 'Expressing fake worry to manipulate or control', + 'false equivalence': 'Comparing incomparable situations to justify behavior', + 'future faking': 'Making promises about future behavior that are unlikely to be kept' + } + return descriptions.get(pattern_name.lower(), 'Concerning communication pattern detected') + +def generate_personalized_recommendations(abuse_score, patterns, safety_plan): + """Generate recommendations based on specific findings""" + recommendations = [] + + # Base recommendations + if abuse_score >= 70: + recommendations.extend([ + 'Document these conversations with dates and times', + 'Reach out to a trusted friend or family member about your concerns', + 'Consider contacting the National Domestic Violence Hotline for guidance' + ]) + elif abuse_score >= 40: + recommendations.extend([ + 'Keep a private journal of concerning interactions', + 'Talk to someone you trust about these communication patterns', + 'Consider counseling to explore healthy relationship dynamics' + ]) + else: + recommendations.extend([ + 'Continue monitoring communication patterns that concern you', + 'Consider discussing communication styles with your partner when you feel safe to do so' + ]) + + # Pattern-specific recommendations + pattern_names = [p['name'].lower() for p in patterns] + + if 'control' in pattern_names: + recommendations.append('Maintain your independence and decision-making autonomy') + + if 'gaslighting' in pattern_names: + recommendations.append('Trust your memory and perceptions - consider keeping notes') + + if any(p in pattern_names for p in ['stalking language', 'veiled threats']): + recommendations.append('Vary your routines and inform trusted people of your whereabouts') + + if safety_plan: + recommendations.append('Review your personalized safety plan regularly') + + return recommendations[:4] # Limit to 4 recommendations + +def extract_risk_stage(analysis_output): + """Extract risk stage from analysis output""" + if 'Tension-Building' in analysis_output: + return 'tension-building' + elif 'Escalation' in analysis_output: + return 'escalation' + elif 'Reconciliation' in analysis_output: + return 'reconciliation' + elif 'Honeymoon' in analysis_output: + return 'honeymoon' + else: + return 'unknown' + +def analyze_composite_with_ui_format(msg1, msg2, msg3, *answers_and_none): + """ + Your existing analysis function, but returns formatted data for the new UI + """ + # Run your existing analysis + analysis_output, timeline_image, safety_plan = analyze_composite(msg1, msg2, msg3, *answers_and_none) + + # Format for new UI + structured_results = format_results_for_new_ui(analysis_output, timeline_image, safety_plan) + + # Return as JSON string for the new UI to parse + return json.dumps(structured_results), timeline_image, safety_plan + + + + + +def create_mobile_friendly_interface(): + """Create a responsive interface that works well on both mobile and desktop with full functionality""" + + css = """ + /* Base responsive layout */ + .gradio-container { + max-width: 100% !important; + padding: 12px !important; + } + + /* Desktop: side-by-side columns */ + @media (min-width: 1024px) { + .desktop-row { + display: flex !important; + gap: 20px !important; + } + + .desktop-col-messages { + flex: 2 !important; + min-width: 400px !important; + } + + .desktop-col-checklist { + flex: 1 !important; + min-width: 300px !important; + } + + .desktop-col-results { + flex: 2 !important; + min-width: 400px !important; + } + + .mobile-only { + display: none !important; + } + + .mobile-expandable-btn { + display: none !important; + } + } + + /* Mobile/Tablet: stack everything */ + @media (max-width: 1023px) { + .gradio-row { + flex-direction: column !important; + } + + .gradio-column { + width: 100% !important; + margin-bottom: 20px !important; + } + + .desktop-only { + display: none !important; + } + + /* Mobile expandable sections */ + .mobile-expandable-content { + display: none; + } + + .mobile-expandable-content.show { + display: block; + } + } + + /* Button styling */ + .gradio-button { + margin-bottom: 8px !important; + } + + @media (max-width: 1023px) { + .gradio-button { + width: 100% !important; + padding: 16px !important; + font-size: 16px !important; + } + + .mobile-expand-btn { + background: #f9fafb !important; + border: 1px solid #e5e7eb !important; + color: #374151 !important; + padding: 12px 16px !important; + margin: 8px 0 !important; + border-radius: 8px !important; + font-weight: 500 !important; + } + + .mobile-expand-btn:hover { + background: #f3f4f6 !important; + } + } + + /* Results styling */ + .risk-low { border-left: 4px solid #10b981; background: #f0fdf4; } + .risk-moderate { border-left: 4px solid #f59e0b; background: #fffbeb; } + .risk-high { border-left: 4px solid #f97316; background: #fff7ed; } + .risk-critical { border-left: 4px solid #ef4444; background: #fef2f2; } + + /* Clean group styling */ + .gradio-group { + border: none !important; + background: none !important; + padding: 0 !important; + margin: 0 !important; + box-shadow: none !important; + } + + /* Force readable text colors */ + .gradio-html * { + color: #1f2937 !important; + } + + .gradio-html p, .gradio-html div, .gradio-html span, .gradio-html li, .gradio-html ul, .gradio-html h1, .gradio-html h2, .gradio-html h3, .gradio-html h4 { + color: #1f2937 !important; + } + + /* Form spacing */ + .gradio-textbox { + margin-bottom: 12px !important; + } + + .gradio-checkbox { + margin-bottom: 6px !important; + font-size: 14px !important; + } + + /* Compact checklist */ + .compact-checklist .gradio-checkbox { + margin-bottom: 4px !important; + } + + /* Specific overrides for safety plan and analysis displays */ + .gradio-html pre { + color: #1f2937 !important; + background: #f9fafb !important; + padding: 12px !important; + border-radius: 8px !important; + } + """ + + with gr.Blocks(css=css, title="Relationship Pattern Analyzer") as demo: + gr.HTML(""" +
+ Share messages that concern you, and we'll help you understand what patterns might be present. +
++ Your messages are analyzed locally and are not stored or shared. + This tool is for educational purposes and not a substitute for professional counseling. +
++ Enter up to three messages that made you feel uncomfortable, confused, or concerned. + For the most accurate analysis, include messages from recent emotionally intense conversations. +
+ """) + + msg1_desktop = gr.Textbox( + label="Message 1 *", + placeholder="Enter the message here...", + lines=4 + ) + msg2_desktop = gr.Textbox( + label="Message 2 (optional)", + placeholder="Enter the message here...", + lines=4 + ) + msg3_desktop = gr.Textbox( + label="Message 3 (optional)", + placeholder="Enter the message here...", + lines=4 + ) + + # Checklist column + with gr.Column(elem_classes=["desktop-col-checklist"], scale=3, min_width=300): + gr.HTML("+ Optional but recommended. Check any that apply to your situation: +
+ """) + + checklist_items_desktop = [] + with gr.Column(elem_classes=["compact-checklist"]): + for question, weight in ESCALATION_QUESTIONS: + checklist_items_desktop.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) + + none_selected_desktop = gr.Checkbox( + label="None of the above apply to my situation", + elem_classes=["none-checkbox"] + ) + + analyze_btn_desktop = gr.Button( + "Analyze Messages", + variant="primary", + size="lg" + ) + + # Results column + with gr.Column(elem_classes=["desktop-col-results"], scale=5, min_width=400): + gr.HTML("+ Results will appear here after analysis... +
+ """) + + # Desktop results components + results_json_desktop = gr.JSON(visible=False) + risk_summary_desktop = gr.HTML(visible=False) + concerns_display_desktop = gr.HTML(visible=False) + additional_metrics_desktop = gr.HTML(visible=False) + recommendations_display_desktop = gr.HTML(visible=False) + + with gr.Row(visible=False) as action_buttons_desktop: + safety_plan_btn_desktop = gr.Button("π‘οΈ Get Safety Plan", variant="secondary") + full_analysis_btn_desktop = gr.Button("π Show Full Analysis", variant="secondary") + download_btn_desktop = gr.Button("π Download Report", variant="secondary") + + full_analysis_display_desktop = gr.HTML(visible=False) + timeline_chart_desktop = gr.Image(visible=False, label="Pattern Timeline") + download_file_desktop = gr.File(label="Download Report", visible=False) + + # Mobile layout + with gr.Column(elem_classes=["mobile-only"]): + # Message input - always visible + gr.HTML("+ Enter messages that made you uncomfortable or concerned: +
+ """) + + msg1_mobile = gr.Textbox( + label="Message 1 (required)", + placeholder="Enter the concerning message here...", + lines=3 + ) + + # Button to show additional messages + show_more_msgs_btn = gr.Button( + "β Add More Messages (Optional)", + elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], + variant="secondary" + ) + + # Additional messages (hidden by default) + with gr.Column(visible=False) as additional_messages_mobile: + msg2_mobile = gr.Textbox( + label="Message 2 (optional)", + placeholder="Enter another message...", + lines=3 + ) + msg3_mobile = gr.Textbox( + label="Message 3 (optional)", + placeholder="Enter a third message...", + lines=3 + ) + + # Button to show safety checklist + show_checklist_btn = gr.Button( + "β οΈ Safety Checklist (Optional)", + elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], + variant="secondary" + ) + + # Safety checklist (hidden by default) + with gr.Column(visible=False) as safety_checklist_mobile: + gr.HTML(""" ++ Check any that apply to improve analysis accuracy: +
+ """) + + checklist_items_mobile = [] + for question, weight in ESCALATION_QUESTIONS: + checklist_items_mobile.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) + + none_selected_mobile = gr.Checkbox( + label="None of the above apply", + elem_classes=["none-checkbox"] + ) + + # Analysis button + analyze_btn_mobile = gr.Button( + "π Analyze Messages", + variant="primary", + size="lg" + ) + + # Mobile results components + results_json_mobile = gr.JSON(visible=False) + risk_summary_mobile = gr.HTML(visible=False) + concerns_display_mobile = gr.HTML(visible=False) + additional_metrics_mobile = gr.HTML(visible=False) + recommendations_display_mobile = gr.HTML(visible=False) + + with gr.Row(visible=False) as action_buttons_mobile: + safety_plan_btn_mobile = gr.Button("π‘οΈ Safety Plan", variant="secondary") + full_analysis_btn_mobile = gr.Button("π Full Analysis", variant="secondary") + download_btn_mobile = gr.Button("π Download", variant="secondary") + + full_analysis_display_mobile = gr.HTML(visible=False) + timeline_chart_mobile = gr.Image(visible=False, label="Pattern Timeline") + download_file_mobile = gr.File(label="Download Report", visible=False) + + with gr.Tab("Safety Resources"): + gr.HTML(""" ++ If you're concerned about your safety, here are immediate resources and steps you can take. +
+911 - For immediate danger
+1-800-799-7233 - National DV Hotline (24/7)
+Text START to 88788 - Crisis Text Line
+988 - National Suicide Prevention Lifeline
+thehotline.org - Online chat support
+Local counseling services - Professional support
+Trusted friends/family - Personal support network
+Legal advocacy - Know your rights
+Based on the messages you shared
++ Risk Score: {results['riskScore']}% +
+{concern.get('description', 'No description available')}
+No specific concerns identified in the messages.
" + + # Additional Metrics Section + metrics_html = "+ DARVO (Deny, Attack, Reverse Victim & Offender) indicates potential narrative manipulation where the speaker may be deflecting responsibility. +
+β’ Message {i+1}: {tone}
+ """ + metrics_html += """ ++ Emotional tone analysis helps identify underlying manipulation tactics or concerning emotional patterns. +
+β’ {rec}
+No analysis data available. Please run the analysis first.
", visible=True) + + # Handle both JSON string and dict inputs + if isinstance(results_json_str, str): + results = json.loads(results_json_str) + elif isinstance(results_json_str, dict): + results = results_json_str + else: + return gr.update(value="Invalid data format. Please run the analysis again.
", visible=True) + + # Create comprehensive full analysis display + full_html = f""" +Risk Level: {results.get('riskLevel', 'Unknown').title()}
+Risk Score: {results.get('riskScore', 'N/A')}%
+Risk Stage: {results.get('riskStage', 'Unknown').replace('-', ' ').title()}
+DARVO Score: {results.get('darvoScore', 0):.3f}
+Emotional Tones: {', '.join(results.get('emotionalTones', ['None detected']))}
+{severity_badge} {pattern.get('name', 'Unknown')}
+{pattern.get('description', 'No description available')}
+No specific patterns detected.
" + + full_html += """ +Unable to parse analysis results: {str(e)}
+Please try running the analysis again.
+