import gradio as gr
import spaces
import torch
import numpy as np
from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline
import re
import matplotlib.pyplot as plt
import io
from PIL import Image
from datetime import datetime
from torch.nn.functional import sigmoid
from collections import Counter
import logging
import traceback
import json

# Set up logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
logger.info(f"Using device: {device}")

# Set up custom logging
class CustomFormatter(logging.Formatter):
    """Custom formatter with colors and better formatting"""
    grey = "\x1b[38;21m"
    blue = "\x1b[38;5;39m"
    yellow = "\x1b[38;5;226m"
    red = "\x1b[38;5;196m"
    bold_red = "\x1b[31;1m"
    reset = "\x1b[0m"

    def format(self, record):
        # Remove the logger name from the output
        if record.levelno == logging.DEBUG:
            return f"{self.blue}{record.getMessage()}{self.reset}"
        elif record.levelno == logging.INFO:
            return f"{self.grey}{record.getMessage()}{self.reset}"
        elif record.levelno == logging.WARNING:
            return f"{self.yellow}{record.getMessage()}{self.reset}"
        elif record.levelno == logging.ERROR:
            return f"{self.red}{record.getMessage()}{self.reset}"
        elif record.levelno == logging.CRITICAL:
            return f"{self.bold_red}{record.getMessage()}{self.reset}"
        return record.getMessage()

# Setup logger
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

# Remove any existing handlers
logger.handlers = []

# Create console handler with custom formatter
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)

# Model initialization
model_name = "SamanthaStorm/tether-multilabel-v6"
model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device)
tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)

# Sentiment model
sentiment_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-sentiment-v3").to(device)
sentiment_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-sentiment-v3", use_fast=False)
sentiment_model.eval()

# Emotion model
emotion_pipeline = hf_pipeline(
    "text-classification",
    model="j-hartmann/emotion-english-distilroberta-base",
    return_all_scores=True,  # Get all emotion scores
    top_k=None,  # Don't limit to top k predictions
    truncation=True,
    device=0 if torch.cuda.is_available() else -1
)

# DARVO model
darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device)
darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False)
darvo_model.eval()

# Boundary health model
boundary_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/healthy-boundary-predictor").to(device)
boundary_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/healthy-boundary-predictor", use_fast=False)
boundary_model.eval()

@spaces.GPU
def predict_boundary_health(text):
    """Predict boundary health for given text - returns 1 for healthy, 0 for unhealthy"""
    try:
        inputs = boundary_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = boundary_model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
        # Return the actual prediction (0 or 1)
        predicted_class = torch.argmax(predictions, dim=-1).item()
        return predicted_class  # Returns 1 for healthy, 0 for unhealthy
    except Exception as e:
        logger.error(f"Error in boundary prediction: {e}")
        return 0  # Return unhealthy on error
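# Illustrative usage sketch (not part of the pipeline): shows the expected call
# shape and return convention of predict_boundary_health. The sample sentence is
# hypothetical; the actual label depends on the fine-tuned checkpoint above.
def _demo_boundary_prediction():
    sample = "I need some time to myself tonight; we can talk tomorrow."
    label = predict_boundary_health(sample)  # 1 = healthy, 0 = unhealthy (0 also on error)
    print(f"Boundary label for sample: {label}")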
def get_boundary_assessment(text, prediction):
    """Get boundary assessment based on binary prediction"""
    if prediction == 1:  # healthy
        return {
            'assessment': 'healthy',
            'label': 'Healthy Boundary',
            'confidence': 1.0,
            'description': 'This communication shows healthy boundary setting',
            'recommendations': ['Continue this respectful communication approach']
        }
    else:  # unhealthy (prediction == 0)
        return {
            'assessment': 'unhealthy',
            'label': 'Unhealthy Boundary',
            'confidence': 1.0,
            'description': 'Communication shows unhealthy boundary patterns',
            'recommendations': ['Use "I" statements instead of accusations', 'Focus on respectful communication']
        }

# Constants and Labels
LABELS = [
    "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness",
    "blame shifting", "nonabusive", "projection", "insults",
    "contradictory statements", "obscure language",
    "veiled threats", "stalking language", "false concern",
    "false equivalence", "future faking"
]

SENTIMENT_LABELS = ["supportive", "undermining"]

THRESHOLDS = {
    "recovery phase": 0.278,
    "control": 0.287,
    "gaslighting": 0.144,
    "guilt tripping": 0.220,
    "dismissiveness": 0.142,
    "blame shifting": 0.183,
    "projection": 0.253,
    "insults": 0.247,
    "contradictory statements": 0.200,
    "obscure language": 0.455,
    "nonabusive": 0.281,
    # NEW v6 patterns:
    "veiled threats": 0.310,
    "stalking language": 0.339,
    "false concern": 0.334,
    "false equivalence": 0.317,
    "future faking": 0.385
}

PATTERN_WEIGHTS = {
    "recovery phase": 0.7,
    "control": 1.4,
    "gaslighting": 1.3,
    "guilt tripping": 1.2,
    "dismissiveness": 0.9,
    "blame shifting": 1.0,
    "projection": 0.5,
    "insults": 1.4,
    "contradictory statements": 1.0,
    "obscure language": 0.9,
    "nonabusive": 0.0,
    # NEW v6 patterns:
    "veiled threats": 1.6,     # High weight - very dangerous
    "stalking language": 1.8,  # Highest weight - extremely dangerous
    "false concern": 1.1,      # Moderate weight - manipulative
    "false equivalence": 1.3,  # Enhances DARVO detection
    "future faking": 0.8       # Lower weight - manipulation tactic
}

ESCALATION_QUESTIONS = [
    ("Partner has access to firearms or weapons", 4),
    ("Partner threatened to kill you", 3),
    ("Partner threatened you with a weapon", 3),
    ("Partner has ever choked you, even if you considered it consensual at the time", 4),
    ("Partner injured or threatened your pet(s)", 3),
    ("Partner has broken your things, punched or kicked walls, or thrown things", 2),
    ("Partner forced or coerced you into unwanted sexual acts", 3),
    ("Partner threatened to take away your children", 2),
    ("Violence has increased in frequency or severity", 3),
    ("Partner monitors your calls/GPS/social media", 2)
]

RISK_STAGE_LABELS = {
    1: "🌀 Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.",
    2: "🔥 Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.",
    3: "🌧️ Risk Stage: Reconciliation\nThis message reflects a reset attempt - apologies or emotional repair without accountability.",
    4: "🌸 Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it."
}
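# Illustrative sketch (hypothetical scores): how THRESHOLDS gates raw sigmoid
# outputs and PATTERN_WEIGHTS scales the survivors. The real logic lives in
# analyze_single_message / compute_abuse_score below.
def _demo_threshold_gating():
    raw = {"control": 0.41, "gaslighting": 0.12}  # hypothetical model outputs
    passed = {k: v for k, v in raw.items() if v > THRESHOLDS[k]}       # keeps "control" only
    weighted = {k: v * PATTERN_WEIGHTS[k] for k, v in passed.items()}  # 0.41 * 1.4
    print(passed, weighted)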
THREAT_MOTIFS = [
    "i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this",
    "i'll break your face", "i'll bash your head in", "i'll snap your neck",
    "i'll come over there and make you shut up", "i'll knock your teeth out",
    "you're going to bleed", "you want me to hit you?", "i won't hold back next time",
    "i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream",
    "i know where you live", "i'm outside", "i'll be waiting", "i saw you with him",
    "you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule",
    "i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry",
    "you're going to wish you hadn't", "you brought this on yourself", "don't push me",
    "you have no idea what i'm capable of", "you better watch yourself",
    "i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this",
    "i'll never let you go", "you're nothing without me",
    "if you leave me, i'll kill myself", "i'll ruin you",
    "i'll tell everyone what you did", "i'll make sure everyone knows", "i'm going to destroy your name",
    "you'll lose everyone", "i'll expose you", "your friends will hate you", "i'll post everything",
    "you'll be cancelled", "you'll lose everything", "i'll take the house", "i'll drain your account",
    "you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job",
    "i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me",
    "don't make me do this", "you know what happens when i'm mad", "you're forcing my hand",
    "if you just behaved, this wouldn't happen", "this is your fault", "you're making me hurt you",
    "i warned you", "you should have listened"
]

# MOVED TO TOP LEVEL - Fixed tone severity mapping
TONE_SEVERITY = {
    # Highest danger tones
    "obsessive fixation": 4,
    "menacing calm": 4,
    "conditional menace": 4,
    "surveillance intimacy": 4,
    # High danger tones
    "predatory concern": 3,
    "victim cosplay": 3,
    "entitled rage": 3,
    "direct threat": 3,
    # Moderate danger tones
    "manipulative hope": 2,
    "false vulnerability": 2,
    "calculated coldness": 2,
    "predictive punishment": 2,
    # Existing tones (keep current mappings)
    "emotional threat": 3,
    "forced accountability flip": 3,
    "performative regret": 2,
    "coercive warmth": 2,
    "cold invalidation": 2,
    "weaponized sadness": 2,
    "contradictory gaslight": 2,
    # Low risk tones
    "neutral": 0,
    "genuine vulnerability": 0
}

# MOVED TO TOP LEVEL - Helper functions
def log_emotional_tone_usage(tone_tag, patterns):
    """Log tone usage for analytics"""
    logger.debug(f"🎭 Detected tone tag: {tone_tag} with patterns: {patterns}")
    # Track dangerous tone combinations
    dangerous_tones = [
        "obsessive fixation", "menacing calm", "predatory concern",
        "surveillance intimacy", "conditional menace", "victim cosplay"
    ]
    if tone_tag in dangerous_tones:
        logger.warning(f"⚠️ Dangerous emotional tone detected: {tone_tag}")

def calculate_tone_risk_boost(tone_tag):
    """Calculate risk boost based on emotional tone severity"""
    return TONE_SEVERITY.get(tone_tag, 0)

def should_show_safety_planning(abuse_score, escalation_risk, detected_patterns):
    """Check if we should show safety planning"""
    if escalation_risk in ["High", "Critical"]:
        return True
    if abuse_score >= 70:
        return True
    dangerous_patterns = ["stalking language", "veiled threats", "threats"]
    if any(pattern in detected_patterns for pattern in dangerous_patterns):
        return True
    return False
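# Illustrative sketch: the three triggers for offering a safety plan. Expected
# outputs assume the constants and logic defined above.
def _demo_safety_gate():
    print(should_show_safety_planning(40, "Low", ["stalking language"]))  # True - dangerous pattern
    print(should_show_safety_planning(75, "Low", []))                     # True - abuse score >= 70
    print(should_show_safety_planning(20, "Low", ["dismissiveness"]))     # False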
def generate_simple_safety_plan(abuse_score, escalation_risk, detected_patterns):
    """Generate a basic safety plan"""
    plan = "🛡️ **SAFETY PLANNING RECOMMENDED**\n\n"
    if escalation_risk == "Critical" or abuse_score >= 85:
        plan += "🚨 **CRITICAL SAFETY SITUATION**\n\n"
        plan += "**IMMEDIATE ACTIONS:**\n"
        plan += "• Contact domestic violence hotline: **1-800-799-7233** (24/7, free, confidential)\n"
        plan += "• Text START to **88788** for crisis text support\n"
        plan += "• Consider staying with trusted friends/family tonight\n"
        plan += "• Keep phone charged and accessible\n"
        plan += "• Have emergency bag ready (documents, medications, cash)\n"
        plan += "\n**IF IN IMMEDIATE DANGER: Call 911**\n\n"
    elif escalation_risk == "High" or abuse_score >= 70:
        plan += "⚠️ **HIGH RISK SITUATION**\n\n"
        plan += "**SAFETY STEPS:**\n"
        plan += "• Contact domestic violence hotline for safety planning: **1-800-799-7233**\n"
        plan += "• Identify 3 trusted people you can contact for help\n"
        plan += "• Plan escape routes and transportation options\n"
        plan += "• Document concerning behaviors with dates and details\n"
        plan += "• Research legal protection options\n\n"

    # Add pattern-specific advice
    if "stalking language" in detected_patterns:
        plan += "👁️ **STALKING BEHAVIORS DETECTED:**\n"
        plan += "• Vary your routines and routes\n"
        plan += "• Check devices for tracking software\n"
        plan += "• Keep record of all stalking incidents\n"
        plan += "• Alert neighbors to watch for suspicious activity\n\n"
    if "veiled threats" in detected_patterns:
        plan += "⚠️ **THREATENING LANGUAGE IDENTIFIED:**\n"
        plan += "• Take all threats seriously, even indirect ones\n"
        plan += "• Document all threatening communications\n"
        plan += "• Inform trusted people about threat patterns\n"
        plan += "• Avoid being alone in isolated locations\n\n"

    # Always include crisis resources
    plan += "📞 **CRISIS RESOURCES (24/7):**\n"
    plan += "• **National DV Hotline:** 1-800-799-7233\n"
    plan += "• **Crisis Text Line:** Text START to 88788\n"
    plan += "• **Online Chat:** thehotline.org\n"
    plan += "• **Emergency:** Call 911\n\n"
    plan += "💜 **Remember:** You are not alone. This is not your fault. You deserve to be safe."
    return plan
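# Illustrative sketch: the plan text scales with risk level and appends
# pattern-specific guidance when stalking or veiled-threat patterns are present.
def _demo_safety_plan():
    plan = generate_simple_safety_plan(88, "Critical", ["stalking language"])
    print(plan.splitlines()[0])  # header line of the generated plan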
def detect_rare_threats(text):
    rare_threats = ["necktie party", "permanent solution", "final conversation"]
    if any(threat in text.lower() for threat in rare_threats):
        return [("veiled threats", 0.90, 1.6)]
    return []

def detect_enhanced_threats(text, patterns):
    """Enhanced threat detection for v6 patterns"""
    text_lower = text.lower()
    enhanced_threats = []

    # Stalking language indicators
    stalking_phrases = [
        "stop at nothing", "will find you", "know where you", "watching you",
        "following you", "can't hide", "i know your", "saw you with", "you belong to me"
    ]
    # Veiled threat indicators
    veiled_threat_phrases = [
        "some people might", "things happen to people who", "be careful",
        "hope nothing happens", "accidents happen", "necktie party",
        "permanent solution", "wouldn't want"
    ]
    # False concern indicators
    false_concern_phrases = [
        "just worried about", "concerned about your", "someone needs to protect",
        "for your own good"
    ]

    if any(phrase in text_lower for phrase in stalking_phrases):
        enhanced_threats.append("stalking language")
    if any(phrase in text_lower for phrase in veiled_threat_phrases):
        enhanced_threats.append("veiled threats")
    if any(phrase in text_lower for phrase in false_concern_phrases):
        enhanced_threats.append("false concern")
    return enhanced_threats

def calculate_enhanced_risk_level(abuse_score, detected_patterns, escalation_risk, darvo_score):
    """Enhanced risk calculation that properly weights dangerous patterns"""
    # Start with base risk from escalation system
    base_risk = escalation_risk

    # CRITICAL PATTERNS - Auto-elevate to HIGH risk minimum
    critical_patterns = ["stalking language", "veiled threats"]
    has_critical = any(pattern in detected_patterns for pattern in critical_patterns)

    # DANGEROUS COMBINATIONS - Auto-elevate to CRITICAL
    dangerous_combos = [
        ("stalking language", "control"),
        ("veiled threats", "stalking language"),
        ("stalking language", "false concern"),
        ("veiled threats", "control")
    ]
    has_dangerous_combo = any(
        all(pattern in detected_patterns for pattern in combo)
        for combo in dangerous_combos
    )

    # FORCE RISK ELEVATION for dangerous patterns
    if has_dangerous_combo:
        return "Critical"
    elif has_critical and abuse_score >= 30:  # Lower threshold for critical patterns
        return "High"
    elif has_critical:
        return "Moderate"
    elif abuse_score >= 70:
        return "High"
    elif abuse_score >= 50:
        return "Moderate"
    else:
        return base_risk

def get_emotion_profile(text):
    """Get emotion profile from text with all scores"""
    try:
        emotions = emotion_pipeline(text)
        if isinstance(emotions, list) and isinstance(emotions[0], list):
            # Extract all scores from the first prediction
            emotion_scores = emotions[0]
            # Convert to dictionary with lowercase emotion names
            return {e['label'].lower(): round(e['score'], 3) for e in emotion_scores}
        return {}
    except Exception as e:
        logger.error(f"Error in get_emotion_profile: {e}")
        return {
            "sadness": 0.0, "joy": 0.0, "neutral": 0.0,
            "disgust": 0.0, "anger": 0.0, "fear": 0.0
        }
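# Illustrative sketch: get_emotion_profile returns a lowercase emotion -> score
# dict from the j-hartmann emotion model (scores rounded to 3 places). The
# sample text and the printed key list are indicative only.
def _demo_emotion_profile():
    profile = get_emotion_profile("I can't believe you did this to me.")
    print(profile)  # e.g. keys: anger, disgust, fear, joy, neutral, sadness, surprise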
# FIXED FUNCTION - Added missing "d" and cleaned up structure
def get_emotional_tone_tag(text, sentiment, patterns, abuse_score):
    """Get emotional tone tag based on emotions and patterns"""
    emotions = get_emotion_profile(text)
    sadness = emotions.get("sadness", 0)
    joy = emotions.get("joy", 0)
    neutral = emotions.get("neutral", 0)
    disgust = emotions.get("disgust", 0)
    anger = emotions.get("anger", 0)
    fear = emotions.get("fear", 0)
    text_lower = text.lower()

    # 1. Direct Threat Detection
    threat_indicators = [
        "if you", "i'll make", "don't forget", "remember", "regret",
        "i control", "i'll take", "you'll lose", "make sure",
        "never see", "won't let"
    ]
    if (
        any(indicator in text_lower for indicator in threat_indicators)
        and any(p in patterns for p in ["control", "insults"])
        and (anger > 0.2 or disgust > 0.2 or abuse_score > 70)
    ):
        return "direct threat"

    # 2. Obsessive Fixation (for stalking language)
    obsessive_indicators = [
        "stop at nothing", "most desired", "forever", "always will",
        "belong to me", "you're mine", "never let you go", "can't live without"
    ]
    if (
        any(indicator in text_lower for indicator in obsessive_indicators)
        and "stalking language" in patterns
        and (joy > 0.3 or sadness > 0.4 or fear > 0.2)
    ):
        return "obsessive fixation"

    # 3. Menacing Calm (for veiled threats)
    veiled_threat_indicators = [
        "some people might", "accidents happen", "be careful",
        "wouldn't want", "things happen", "unfortunate"
    ]
    if (
        any(indicator in text_lower for indicator in veiled_threat_indicators)
        and "veiled threats" in patterns
        and neutral > 0.4
        and anger < 0.2
    ):
        return "menacing calm"

    # 4. Predatory Concern (for false concern)
    concern_indicators = [
        "worried about", "concerned about", "for your own good",
        "someone needs to", "protect you", "take care of you"
    ]
    if (
        any(indicator in text_lower for indicator in concern_indicators)
        and "false concern" in patterns
        and (joy > 0.2 or neutral > 0.3)
        and sentiment == "undermining"
    ):
        return "predatory concern"

    # 5. Victim Cosplay (for false equivalence/DARVO)
    victim_indicators = [
        "i'm the victim", "you're abusing me", "i'm being hurt",
        "you're attacking me", "i'm innocent", "you're the problem"
    ]
    if (
        any(indicator in text_lower for indicator in victim_indicators)
        and "false equivalence" in patterns
        and sadness > 0.4
        and anger > 0.2
    ):
        return "victim cosplay"

    # 6. Manipulative Hope (for future faking)
    future_indicators = [
        "i'll change", "we'll be", "i promise", "things will be different",
        "next time", "from now on", "i'll never", "we'll have"
    ]
    if (
        any(indicator in text_lower for indicator in future_indicators)
        and "future faking" in patterns
        and (joy > 0.3 or sadness > 0.3)
    ):
        return "manipulative hope"

    # 7. Surveillance Intimacy (for stalking with false intimacy)
    surveillance_indicators = [
        "i know you", "i saw you", "i watched", "i've been",
        "your routine", "where you go", "what you do"
    ]
    if (
        any(indicator in text_lower for indicator in surveillance_indicators)
        and "stalking language" in patterns
        and joy > 0.2
        and neutral > 0.2
    ):
        return "surveillance intimacy"

    # 8. Conditional Menace (for threats with conditions)
    conditional_indicators = [
        "if you", "unless you", "you better", "don't make me",
        "you wouldn't want", "force me to"
    ]
    if (
        any(indicator in text_lower for indicator in conditional_indicators)
        and any(p in patterns for p in ["veiled threats", "control"])
        and anger > 0.3
        and neutral > 0.2
    ):
        return "conditional menace"

    # 9. False Vulnerability (manipulation disguised as weakness)
    vulnerability_indicators = [
        "i can't help", "i need you", "without you i", "you're all i have",
        "i'm lost without", "i don't know what to do"
    ]
    if (
        any(indicator in text_lower for indicator in vulnerability_indicators)
        and any(p in patterns for p in ["guilt tripping", "future faking", "false concern"])
        and sadness > 0.5
        and sentiment == "undermining"
    ):
        return "false vulnerability"
    # 10. Entitled Rage (anger with entitlement)
    entitlement_indicators = [
        "you owe me", "after everything", "how dare you", "you should",
        "i deserve", "you have no right"
    ]
    if (
        any(indicator in text_lower for indicator in entitlement_indicators)
        and anger > 0.5
        and any(p in patterns for p in ["control", "insults", "blame shifting"])
    ):
        return "entitled rage"

    # 11. Calculated Coldness (deliberate emotional detachment)
    cold_indicators = [
        "i don't care", "whatever", "your choice", "suit yourself",
        "fine by me", "your loss"
    ]
    calculated_patterns = ["dismissiveness", "obscure language", "control"]
    if (
        any(indicator in text_lower for indicator in cold_indicators)
        and any(p in patterns for p in calculated_patterns)
        and neutral > 0.6
        and all(e < 0.2 for e in [anger, sadness, joy])
    ):
        return "calculated coldness"

    # 12. Predictive Punishment
    future_consequences = [
        "will end up", "you'll be", "you will be", "going to be",
        "will become", "will find yourself", "will realize", "you'll regret",
        "you'll see", "will learn", "truly will", "end up alone",
        "end up miserable"
    ]
    dismissive_endings = [
        "i'm out", "i'm done", "whatever", "good luck", "your choice",
        "your problem", "regardless", "keep", "keep on"
    ]
    if (
        (any(phrase in text_lower for phrase in future_consequences)
         or any(end in text_lower for end in dismissive_endings))
        and any(p in ["dismissiveness", "control"] for p in patterns)
        and (disgust > 0.2 or neutral > 0.3 or anger > 0.2)
    ):
        return "predictive punishment"

    # 13. Performative Regret
    if (
        sadness > 0.3
        and any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery"])
        and (sentiment == "undermining" or abuse_score > 40)
    ):
        return "performative regret"

    # 14. Coercive Warmth
    if (
        (joy > 0.2 or sadness > 0.3)
        and any(p in patterns for p in ["control", "gaslighting"])
        and sentiment == "undermining"
    ):
        return "coercive warmth"

    # 15. Cold Invalidation
    if (
        (neutral + disgust) > 0.4
        and any(p in patterns for p in ["dismissiveness", "projection", "obscure language"])
        and sentiment == "undermining"
    ):
        return "cold invalidation"

    # 16. Genuine Vulnerability
    if (
        (sadness + fear) > 0.4
        and sentiment == "supportive"
        and all(p in ["recovery"] for p in patterns)
    ):
        return "genuine vulnerability"

    # 17. Emotional Threat
    if (
        (anger + disgust) > 0.4
        and any(p in patterns for p in ["control", "insults", "dismissiveness"])
        and sentiment == "undermining"
    ):
        return "emotional threat"

    # 18. Weaponized Sadness
    if (
        sadness > 0.5
        and any(p in patterns for p in ["guilt tripping", "projection"])
        and sentiment == "undermining"
    ):
        return "weaponized sadness"

    # 19. Contradictory Gaslight
    if (
        (joy + anger + sadness) > 0.4
        and any(p in patterns for p in ["gaslighting", "contradictory statements"])
        and sentiment == "undermining"
    ):
        return "contradictory gaslight"
    # 20. Forced Accountability Flip
    if (
        (anger + disgust) > 0.4
        and any(p in patterns for p in ["blame shifting", "projection"])
        and sentiment == "undermining"
    ):
        return "forced accountability flip"

    # Emotional Instability Fallback
    if (
        (anger + sadness + disgust) > 0.5
        and sentiment == "undermining"
    ):
        return "emotional instability"

    return "neutral"

@spaces.GPU
def predict_darvo_score(text):
    """Predict DARVO score for given text"""
    try:
        inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            logits = darvo_model(**inputs).logits
        return round(sigmoid(logits.cpu()).item(), 4)
    except Exception as e:
        logger.error(f"Error in DARVO prediction: {e}")
        return 0.0

def detect_weapon_language(text):
    """Detect weapon-related language in text"""
    weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"]
    t = text.lower()
    return any(w in t for w in weapon_keywords)

def get_risk_stage(patterns, sentiment):
    """Determine risk stage based on patterns and sentiment"""
    try:
        if "insults" in patterns:
            return 2
        elif "recovery" in patterns:
            return 3
        elif "control" in patterns or "guilt tripping" in patterns:
            return 1
        elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]):
            return 4
        return 1
    except Exception as e:
        logger.error(f"Error determining risk stage: {e}")
        return 1

def detect_threat_pattern(text, patterns):
    """Detect if a message contains threat patterns"""
    # Threat indicators in text
    threat_words = [
        "regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin",
        "expose", "tell everyone", "never see", "take away", "lose",
        "control", "make sure", "won't let", "force", "warn", "never",
        "punish", "teach you", "learn", "show you", "remember",
        "if you", "don't forget", "i control", "i'll make sure",  # Added these specific phrases
        "bank account", "phone", "money", "access"  # Added financial control indicators
    ]

    # Check for conditional threats (if/then structures)
    text_lower = text.lower()
    conditional_threat = (
        "if" in text_lower
        and any(word in text_lower for word in ["regret", "make sure", "control"])
    )
    has_threat_words = any(word in text_lower for word in threat_words)

    # Check for threat patterns
    threat_patterns = {"control", "gaslighting", "blame shifting", "insults"}
    has_threat_patterns = any(p in threat_patterns for p in patterns)

    return has_threat_words or has_threat_patterns or conditional_threat

def detect_compound_threat(text, patterns):
    """Detect compound threats in a single message"""
    try:
        # Rule A: Single Message Multiple Patterns
        high_risk_patterns = {"control", "gaslighting", "blame shifting", "insults"}
        high_risk_count = sum(1 for p in patterns if p in high_risk_patterns)
        has_threat = detect_threat_pattern(text, patterns)

        # Special case for control + threats
        has_control = "control" in patterns
        has_conditional_threat = "if" in text.lower() and any(
            word in text.lower() for word in ["regret", "make sure", "control"]
        )

        # Single message compound threat
        if (has_threat and high_risk_count >= 2) or (has_control and has_conditional_threat):
            return True, "single_message"
        return False, None
    except Exception as e:
        logger.error(f"Error in compound threat detection: {e}")
        return False, None
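# Illustrative sketch: a conditional "if ... regret" construction plus the
# "control" pattern is enough to trigger the single-message compound-threat rule.
def _demo_compound_threat():
    flagged, kind = detect_compound_threat(
        "If you leave, I'll make sure you regret it.", ["control", "insults"]
    )
    print(flagged, kind)  # expected: True single_message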
def analyze_message_batch_threats(messages, results):
    """Analyze multiple messages for compound threats"""
    threat_messages = []
    support_messages = []
    for i, (msg, (result, _)) in enumerate(zip(messages, results)):
        if not msg.strip():  # Skip empty messages
            continue
        patterns = result[1]  # Get detected patterns

        # Check for threat in this message
        if detect_threat_pattern(msg, patterns):
            threat_messages.append(i)
        # Check for supporting patterns
        if any(p in {"control", "gaslighting", "blame shifting"} for p in patterns):
            support_messages.append(i)

    # Rule B: Multi-Message Accumulation
    if len(threat_messages) >= 2:
        return True, "multiple_threats"
    elif len(threat_messages) == 1 and len(support_messages) >= 2:
        return True, "threat_with_support"
    return False, None

@spaces.GPU
def compute_abuse_score(matched_scores, sentiment):
    """Compute abuse score from matched patterns and sentiment"""
    try:
        if not matched_scores:
            logger.debug("No matched scores, returning 0")
            return 0.0

        # Calculate weighted score
        total_weight = sum(weight for _, _, weight in matched_scores)
        if total_weight == 0:
            logger.debug("Total weight is 0, returning 0")
            return 0.0

        # Get highest pattern scores
        pattern_scores = [(label, score) for label, score, _ in matched_scores]
        sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True)
        logger.debug(f"Sorted pattern scores: {sorted_scores}")

        # Base score calculation
        weighted_sum = sum(score * weight for _, score, weight in matched_scores)
        base_score = (weighted_sum / total_weight) * 100
        logger.debug(f"Initial base score: {base_score:.1f}")

        # Cap maximum score based on pattern severity
        max_score = 85.0  # Set maximum possible score
        if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores):
            max_score = 90.0
            logger.debug(f"Increased max score to {max_score} due to high severity patterns")

        # Apply diminishing returns for multiple patterns
        if len(matched_scores) > 1:
            multiplier = 1 + (0.1 * (len(matched_scores) - 1))
            base_score *= multiplier
            logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns")

        # Apply sentiment modifier
        if sentiment == "supportive":
            base_score *= 0.85
            logger.debug("Applied 15% reduction for supportive sentiment")

        final_score = min(round(base_score, 1), max_score)
        logger.debug(f"Final abuse score: {final_score}")
        return final_score
    except Exception as e:
        logger.error(f"Error computing abuse score: {e}")
        return 0.0

def detect_explicit_abuse(text):
    """Improved explicit abuse detection with word boundary checking"""
    explicit_abuse_words = ['fuck', 'bitch', 'shit', 'dick']  # Removed 'ass'

    # Add more specific patterns for actual abusive uses of 'ass'
    abusive_ass_patterns = [
        r'\bass\b(?!\s*glass)',  # 'ass' not followed by 'glass'
        r'\bdumb\s*ass\b',
        r'\bkiss\s*my\s*ass\b',
        r'\bget\s*your\s*ass\b'
    ]

    text_lower = text.lower()

    # Check basic explicit words
    for word in explicit_abuse_words:
        if re.search(r'\b' + word + r'\b', text_lower):
            return True

    # Check specific abusive 'ass' patterns
    for pattern in abusive_ass_patterns:
        if re.search(pattern, text_lower):
            return True
    return False
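# Illustrative sketch: word-boundary matching avoids false positives on innocent
# substrings ('glass' does not match the 'ass' patterns).
def _demo_explicit_abuse():
    print(detect_explicit_abuse("The glass on my phone is cracked."))  # False
    print(detect_explicit_abuse("Get your ass over here."))            # True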
@spaces.GPU
def analyze_single_message(text, thresholds):
    """Analyze a single message for abuse patterns with boundary assessment"""
    logger.debug("\n=== DEBUG START ===")
    logger.debug(f"Input text: {text}")
    try:
        if not text.strip():
            logger.debug("Empty text, returning zeros")
            return 0.0, [], [], {"label": "none"}, 1, 0.0, None, {'assessment': 'neutral', 'confidence': 0.5}

        # BOUNDARY HEALTH CHECK
        logger.debug("\n🛡️ BOUNDARY HEALTH ANALYSIS")
        logger.debug("-" * 40)
        healthy_prob = predict_boundary_health(text)
        boundary_assessment = get_boundary_assessment(text, healthy_prob)
        logger.debug(f"Boundary Health Score: {healthy_prob:.3f}")
        logger.debug(f"Boundary Assessment: {boundary_assessment['label']}")

        # Get sentiment EARLY - BEFORE any early returns
        sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()}
        with torch.no_grad():
            sent_logits = sentiment_model(**sent_inputs).logits[0]
            sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy()

        # Add detailed logging
        logger.debug("\n📊 SENTIMENT ANALYSIS DETAILS")
        logger.debug(f"Raw logits: {sent_logits}")
        logger.debug(f"Probabilities: supportive={sent_probs[0]:.3f}, undermining={sent_probs[1]:.3f}")

        # Make sure we're using the correct index mapping
        sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))]
        logger.debug(f"Selected sentiment: {sentiment}")

        # Boundary assessment override - applied right after sentiment is defined
        if (healthy_prob < 0.7 and sentiment == "supportive" and len(text.split()) > 50
                and any(phrase in text.lower() for phrase in [
                    "i need you to", "i want to understand", "this isn't about",
                    "about accuracy", "willing to do something different"
                ])):
            logger.debug("🔄 Boundary assessment override: Sophisticated healthy boundary detected")
            boundary_assessment = {
                'assessment': 'healthy',
                'label': 'Healthy Boundary (Sophisticated)',
                'confidence': 0.85,
                'description': 'Complex but healthy boundary-setting communication',
                'recommendations': ['Continue this thoughtful, direct approach']
            }

        # EARLY SUPPORTIVE MESSAGE CHECK
        innocent_indicators = [
            'broken', 'not working', 'cracked', 'glass', 'screen', 'phone',
            'device', 'battery', 'charger', 'wifi', 'internet', 'computer',
            'sorry', 'apologize', 'my fault', 'mistake'
        ]
        # Enhanced early return check - now includes boundary health
        if (any(indicator in text.lower() for indicator in innocent_indicators)
                and len(text.split()) < 20
                and not any(threat in text.lower() for threat in ['kill', 'hurt', 'destroy', 'hate'])
                and healthy_prob > 0.7):  # Added boundary health check
            # If sentiment is strongly supportive AND boundary health is good, return early
            if sent_probs[0] > 0.8:  # 80% supportive
                logger.debug("Early return: Message appears to be innocent/supportive with healthy boundaries")
                return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral", boundary_assessment

        # Check for explicit abuse (moved AFTER early return check)
        explicit_abuse = detect_explicit_abuse(text)
        logger.debug(f"Explicit abuse detected: {explicit_abuse}")

        # Abuse model inference
        inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
        inputs = {k: v.to(device) for k, v in inputs.items()}
        with torch.no_grad():
            outputs = model(**inputs)
            raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy()

        # Log raw model outputs
        logger.debug("\nRaw model scores:")
        for label, score in zip(LABELS, raw_scores):
            logger.debug(f"{label}: {score:.3f}")

        # Get predictions and sort them
        predictions = list(zip(LABELS, raw_scores))
        sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True)
        logger.debug("\nTop 3 predictions:")
        for label, score in sorted_predictions[:3]:
            logger.debug(f"{label}: {score:.3f}")

        # Apply thresholds
        threshold_labels = []
        if explicit_abuse:
            threshold_labels.append("insults")
            logger.debug("\nForced inclusion of 'insults' due to explicit abuse")
        for label, score in sorted_predictions:
            base_threshold = thresholds.get(label, 0.25)
            if explicit_abuse:
                base_threshold *= 0.5
            if score > base_threshold:
                if label not in threshold_labels:
                    threshold_labels.append(label)
        logger.debug(f"\nLabels that passed thresholds: {threshold_labels}")
        # Calculate matched scores
        matched_scores = []
        for label in threshold_labels:
            score = raw_scores[LABELS.index(label)]
            weight = PATTERN_WEIGHTS.get(label, 1.0)
            if explicit_abuse and label == "insults":
                weight *= 1.5
            matched_scores.append((label, score, weight))

        # Merge in rule-based enhanced threat patterns
        enhanced_patterns = detect_enhanced_threats(text, threshold_labels)
        for pattern in enhanced_patterns:
            if pattern not in threshold_labels:
                threshold_labels.append(pattern)
                # Add to matched_scores with high confidence
                weight = PATTERN_WEIGHTS.get(pattern, 1.0)
                matched_scores.append((pattern, 0.85, weight))

        # Calculate abuse score
        abuse_score = compute_abuse_score(matched_scores, sentiment)
        if explicit_abuse:
            abuse_score = max(abuse_score, 70.0)

        # Apply boundary health modifier to abuse score
        if healthy_prob > 0.8 and not explicit_abuse:
            # Very healthy boundaries - cap abuse score much lower
            abuse_score = min(abuse_score, 20.0)
            logger.debug(f"Capped abuse score to {abuse_score} due to very healthy boundaries")
        elif healthy_prob > 0.6 and sentiment == "supportive":
            # Moderately healthy boundaries with supportive sentiment
            abuse_score = min(abuse_score, 35.0)
            logger.debug(f"Capped abuse score to {abuse_score} due to healthy boundaries")

        # Apply sentiment-based score capping BEFORE compound threat check
        if sentiment == "supportive" and not explicit_abuse:
            # For supportive messages, cap the abuse score much lower
            abuse_score = min(abuse_score, 30.0)
            logger.debug(f"Capped abuse score to {abuse_score} due to supportive sentiment")

        # Check for compound threats
        compound_threat_flag, threat_type = detect_compound_threat(text, threshold_labels)

        # Apply compound threat override only for non-supportive messages
        if compound_threat_flag and sentiment != "supportive":
            logger.debug(f"⚠️ Compound threat detected in message: {threat_type}")
            abuse_score = max(abuse_score, 85.0)

        # Get DARVO score
        darvo_score = predict_darvo_score(text)

        # Get tone using emotion-based approach
        tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score)

        # Log tone usage
        log_emotional_tone_usage(tone_tag, threshold_labels)

        # Check for the specific combination (final safety check)
        highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None
        if (sentiment == "supportive" and tone_tag == "neutral"
                and highest_pattern == "obscure language" and healthy_prob > 0.6):
            logger.debug("Message classified as likely non-abusive (supportive, neutral, healthy boundaries). Returning low risk.")
            return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral", boundary_assessment

        # Set stage
        stage = 2 if explicit_abuse or abuse_score > 70 else 1
        logger.debug("=== DEBUG END ===\n")

        # Return with boundary assessment as the 8th element
        return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag, boundary_assessment
    except Exception as e:
        logger.error(f"Error in analyze_single_message: {e}")
        logger.error(f"Traceback: {traceback.format_exc()}")
        return 0.0, [], [], {"label": "error"}, 1, 0.0, None, {'assessment': 'error', 'confidence': 0.0}

def generate_abuse_score_chart(dates, scores, patterns):
    """Generate a timeline chart of abuse scores"""
    try:
        plt.figure(figsize=(10, 6))
        plt.clf()  # Create new figure
        fig, ax = plt.subplots(figsize=(10, 6))

        # Plot points and lines
        x = range(len(scores))
        plt.plot(x, scores, 'bo-', linewidth=2, markersize=8)

        # Add labels for each point with highest scoring pattern
        for i, (score, pattern) in enumerate(zip(scores, patterns)):
            # Get the pattern and its score
            plt.annotate(
                f'{pattern}\n{score:.0f}%',
                (i, score),
                textcoords="offset points",
                xytext=(0, 10),
                ha='center',
                bbox=dict(
                    boxstyle='round,pad=0.5',
                    fc='white',
                    ec='gray',
                    alpha=0.8
                )
            )

        # Customize the plot
        plt.ylim(-5, 105)
        plt.grid(True, linestyle='--', alpha=0.7)
        plt.title('Abuse Pattern Timeline', pad=20, fontsize=12)
        plt.ylabel('Abuse Score %')

        # X-axis labels
        plt.xticks(x, dates, rotation=45)

        # Risk level bands with better colors
        plt.axhspan(0, 50, color='#90EE90', alpha=0.2)    # light green - Low Risk
        plt.axhspan(50, 70, color='#FFD700', alpha=0.2)   # gold - Moderate Risk
        plt.axhspan(70, 85, color='#FFA500', alpha=0.2)   # orange - High Risk
        plt.axhspan(85, 100, color='#FF6B6B', alpha=0.2)  # light red - Critical Risk

        # Add risk level labels
        plt.text(-0.2, 25, 'Low Risk', rotation=90, va='center')
        plt.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center')
        plt.text(-0.2, 77.5, 'High Risk', rotation=90, va='center')
        plt.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center')

        # Adjust layout
        plt.tight_layout()

        # Convert plot to image
        buf = io.BytesIO()
        plt.savefig(buf, format='png', bbox_inches='tight')
        buf.seek(0)
        plt.close('all')  # Close all figures to prevent memory leaks
        return Image.open(buf)
    except Exception as e:
        logger.error(f"Error generating abuse score chart: {e}")
        return None
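# Illustrative sketch of the 8-tuple contract returned by analyze_single_message:
# (abuse_score, labels, matched_scores, sentiment, stage, darvo_score, tone, boundary).
# The sample message is hypothetical; scores depend on the loaded checkpoints.
def _demo_single_message_result():
    result = analyze_single_message("You never listen; it's always my fault somehow.", THRESHOLDS.copy())
    abuse_score, labels, _, sentiment, stage, darvo_score, tone, boundary = result
    print(abuse_score, labels, sentiment["label"], stage, darvo_score, tone, boundary["assessment"])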
def analyze_composite(msg1, msg2, msg3, *answers_and_none):
    """Analyze multiple messages and checklist responses"""
    logger.debug("\n🔍 STARTING NEW ANALYSIS")
    logger.debug("=" * 50)

    # Define severity categories at the start
    high = {'control'}
    moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults',
                'contradictory statements', 'guilt tripping'}
    low = {'blame shifting', 'projection', 'recovery'}

    try:
        # Process checklist responses
        logger.debug("\n📋 CHECKLIST PROCESSING")
        logger.debug("=" * 50)
        none_selected_checked = answers_and_none[-1]
        responses_checked = any(answers_and_none[:-1])
        none_selected = not responses_checked and none_selected_checked

        logger.debug("Checklist Status:")
        logger.debug(f"  • None Selected Box: {'✅' if none_selected_checked else '❌'}")
        logger.debug(f"  • Has Responses: {'✅' if responses_checked else '❌'}")
        logger.debug(f"  • Final Status: {'None Selected' if none_selected else 'Has Selections'}")

        if none_selected:
            escalation_score = 0
            escalation_note = "Checklist completed: no danger items reported."
            escalation_completed = True
            logger.debug("\n✅ Checklist: No items selected")
        elif responses_checked:
            escalation_score = sum(
                w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a
            )
            escalation_note = "Checklist completed."
            escalation_completed = True
            logger.debug(f"\n📊 Checklist Score: {escalation_score}")
            # Log checked items
            logger.debug("\n⚠️ Selected Risk Factors:")
            for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]):
                if a:
                    logger.debug(f"  • [{w} points] {q}")
        else:
            escalation_score = None
            escalation_note = "Checklist not completed."
            escalation_completed = False
            logger.debug("\n❌ Checklist: Not completed")

        # Process messages
        logger.debug("\n📝 MESSAGE PROCESSING")
        logger.debug("=" * 50)
        messages = [msg1, msg2, msg3]
        active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()]
        logger.debug(f"Active Messages: {len(active)} of 3")
        if not active:
            logger.debug("❌ Error: No messages provided")
            return "Please enter at least one message.", None, ""

        # Detect threats
        logger.debug("\n🚨 THREAT DETECTION")
        logger.debug("=" * 50)

        def normalize(text):
            import unicodedata
            text = text.lower().strip()
            text = unicodedata.normalize("NFKD", text)
            text = text.replace("’", "'")
            return re.sub(r"[^a-z0-9 ]", "", text)

        def detect_threat_motifs(message, motif_list):
            norm_msg = normalize(message)
            return [motif for motif in motif_list if normalize(motif) in norm_msg]

        # Analyze threats and patterns
        immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active]
        flat_threats = [t for sublist in immediate_threats for t in sublist]
        threat_risk = "Yes" if flat_threats else "No"

        # Analyze each message
        logger.debug("\n🔍 INDIVIDUAL MESSAGE ANALYSIS")
        logger.debug("=" * 50)
        results = []
        for m, d in active:
            logger.debug(f"\n📝 ANALYZING {d}")
            logger.debug("-" * 40)
            result = analyze_single_message(m, THRESHOLDS.copy())

            # Check for non-abusive classification and skip further analysis
            if (result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"}
                    and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral"):
                logger.debug(f"✅ {d} classified as non-abusive, skipping further analysis.")
                continue

            results.append((result, d))

            # Unpack the result tuple so its fields can be logged safely
            abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone, boundary_assessment = result
            logger.debug("\n📊 CORE METRICS")
            logger.debug(f"  • Abuse Score: {abuse_score:.1f}%")
            logger.debug(f"  • DARVO Score: {darvo_score:.3f}")
            logger.debug(f"  • Risk Stage: {stage}")
            logger.debug(f"  • Sentiment: {sentiment['label']}")
            logger.debug(f"  • Tone: {tone}")

            # Log detected patterns with scores
            if patterns:
                logger.debug("\n🎯 DETECTED PATTERNS")
                for label, score, weight in matched_scores:
                    severity = "❗HIGH" if label in high else "⚠️ MODERATE" if label in moderate else "📉 LOW"
                    logger.debug(f"  • {severity} | {label}: {score:.3f} (weight: {weight})")
            else:
                logger.debug("\n✅ No abuse patterns detected")

        # Check if we have any results to process
        if not results:
            logger.debug("No valid results to analyze - all messages were classified as non-abusive")
            return "All messages appear to be non-abusive based on the analysis.", None, ""

        # Extract scores and metadata
        abuse_scores = [r[0][0] for r in results]
        stages = [r[0][4] for r in results]
        darvo_scores = [r[0][5] for r in results]
        tone_tags = [r[0][6] for r in results]
        dates_used = [r[1] for r in results]
SUMMARY") logger.debug("=" * 50) predicted_labels = [label for r in results for label in r[0][1]] if predicted_labels: logger.debug("Detected Patterns Across All Messages:") pattern_counts = Counter(predicted_labels) # Log high severity patterns first high_patterns = [p for p in pattern_counts if p in high] if high_patterns: logger.debug("\nβ HIGH SEVERITY PATTERNS:") for p in high_patterns: logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") # Then moderate moderate_patterns = [p for p in pattern_counts if p in moderate] if moderate_patterns: logger.debug("\nβ οΈ MODERATE SEVERITY PATTERNS:") for p in moderate_patterns: logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") # Then low low_patterns = [p for p in pattern_counts if p in low] if low_patterns: logger.debug("\nπ LOW SEVERITY PATTERNS:") for p in low_patterns: logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") else: logger.debug("β No patterns detected across messages") # Pattern Severity Analysis logger.debug("\nβοΈ SEVERITY ANALYSIS") logger.debug("=" * 50) counts = {'high': 0, 'moderate': 0, 'low': 0} for label in predicted_labels: if label in high: counts['high'] += 1 elif label in moderate: counts['moderate'] += 1 elif label in low: counts['low'] += 1 logger.debug("Pattern Distribution:") if counts['high'] > 0: logger.debug(f" β High Severity: {counts['high']}") if counts['moderate'] > 0: logger.debug(f" β οΈ Moderate Severity: {counts['moderate']}") if counts['low'] > 0: logger.debug(f" π Low Severity: {counts['low']}") total_patterns = sum(counts.values()) if total_patterns > 0: logger.debug(f"\nSeverity Percentages:") logger.debug(f" β’ High: {(counts['high']/total_patterns)*100:.1f}%") logger.debug(f" β’ Moderate: {(counts['moderate']/total_patterns)*100:.1f}%") logger.debug(f" β’ Low: {(counts['low']/total_patterns)*100:.1f}%") # Risk Assessment logger.debug("\nπ― RISK ASSESSMENT") logger.debug("=" * 50) if counts['high'] >= 2 and counts['moderate'] >= 2: pattern_escalation_risk = "Critical" logger.debug("ββ CRITICAL RISK") logger.debug(" β’ Multiple high and moderate patterns detected") logger.debug(f" β’ High patterns: {counts['high']}") logger.debug(f" β’ Moderate patterns: {counts['moderate']}") elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \ (counts['moderate'] >= 3) or \ (counts['high'] >= 1 and counts['moderate'] >= 2): pattern_escalation_risk = "High" logger.debug("β HIGH RISK") logger.debug(" β’ Significant pattern combination detected") logger.debug(f" β’ High patterns: {counts['high']}") logger.debug(f" β’ Moderate patterns: {counts['moderate']}") elif (counts['moderate'] == 2) or \ (counts['high'] == 1 and counts['moderate'] == 1) or \ (counts['moderate'] == 1 and counts['low'] >= 2) or \ (counts['high'] == 1 and sum(counts.values()) == 1): pattern_escalation_risk = "Moderate" logger.debug("β οΈ MODERATE RISK") logger.debug(" β’ Concerning pattern combination detected") logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") else: pattern_escalation_risk = "Low" logger.debug("π LOW RISK") logger.debug(" β’ Limited pattern severity detected") logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") # Checklist Risk Assessment logger.debug("\nπ CHECKLIST RISK ASSESSMENT") logger.debug("=" * 50) checklist_escalation_risk = "Unknown" if escalation_score is None else ( "Critical" if escalation_score >= 20 else "Moderate" if escalation_score >= 10 else "Low" ) if escalation_score is not None: logger.debug(f"Score: 
            logger.debug(f"Risk Level: {checklist_escalation_risk}")
            if escalation_score >= 20:
                logger.debug("❗❗ CRITICAL: Score indicates severe risk")
            elif escalation_score >= 10:
                logger.debug("⚠️ MODERATE: Score indicates concerning risk")
            else:
                logger.debug("📉 LOW: Score indicates limited risk")
        else:
            logger.debug("❓ Risk Level: Unknown (checklist not completed)")

        # Escalation Analysis
        logger.debug("\n📈 ESCALATION ANALYSIS")
        logger.debug("=" * 50)
        escalation_bump = 0
        for result, msg_id in results:
            abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone_tag, boundary_assessment = result
            logger.debug(f"\n📝 Message {msg_id} Risk Factors:")
            factors = []
            if darvo_score > 0.65:
                escalation_bump += 3
                factors.append(f"▲ +3: High DARVO score ({darvo_score:.3f})")
            if tone_tag in ["forced accountability flip", "emotional threat"]:
                escalation_bump += 2
                factors.append(f"▲ +2: Concerning tone ({tone_tag})")
            if abuse_score > 80:
                escalation_bump += 2
                factors.append(f"▲ +2: High abuse score ({abuse_score:.1f}%)")
            if stage == 2:
                escalation_bump += 3
                factors.append("▲ +3: Escalation stage")
            if factors:
                for factor in factors:
                    logger.debug(f"  {factor}")
            else:
                logger.debug("  ✅ No escalation factors")
        logger.debug(f"\n📊 Total Escalation Bump: +{escalation_bump}")

        # Check for compound threats across messages
        compound_threat_flag, threat_type = analyze_message_batch_threats(
            [msg1, msg2, msg3], results
        )
        if compound_threat_flag:
            logger.debug(f"⚠️ Compound threat detected across messages: {threat_type}")
            pattern_escalation_risk = "Critical"  # Override risk level
            logger.debug("Risk level elevated to CRITICAL due to compound threats")

        # Combined Risk Calculation
        logger.debug("\n🎯 FINAL RISK CALCULATION")
        logger.debug("=" * 50)

        def rank(label):
            return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0)

        combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump
        logger.debug("Risk Components:")
        logger.debug(f"  • Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}")
        logger.debug(f"  • Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}")
        logger.debug(f"  • Escalation Bump: +{escalation_bump}")
        logger.debug(f"  = Combined Score: {combined_score}")

        escalation_risk = (
            "Critical" if combined_score >= 6
            else "High" if combined_score >= 4
            else "Moderate" if combined_score >= 2
            else "Low"
        )
        logger.debug(f"\n⚠️ Final Escalation Risk: {escalation_risk}")

        # Generate Output Text
        logger.debug("\n📄 GENERATING OUTPUT")
        logger.debug("=" * 50)
        if escalation_score is None:
            escalation_text = (
                "🚫 **Escalation Potential: Unknown** (Checklist not completed)\n"
                "⚠️ This section was not completed. Escalation potential is estimated using message data only.\n"
            )
            hybrid_score = 0
            logger.debug("Generated output for incomplete checklist")
        elif escalation_score == 0:
            escalation_text = (
                "✅ **Escalation Checklist Completed:** No danger items reported.\n"
                "🧠 **Escalation potential estimated from detected message patterns only.**\n"
                f"• Pattern Risk: {pattern_escalation_risk}\n"
                f"• Checklist Risk: None reported\n"
                f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
            )
            hybrid_score = escalation_bump
            logger.debug("Generated output for no-risk checklist")
        else:
            hybrid_score = escalation_score + escalation_bump
            escalation_text = (
                f"📈 **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n"
                "📊 This score combines your safety checklist answers *and* detected high-risk behavior.\n"
                f"• Pattern Risk: {pattern_escalation_risk}\n"
                f"• Checklist Risk: {checklist_escalation_risk}\n"
                f"• Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)"
            )
            logger.debug(f"Generated output with hybrid score: {hybrid_score}/29")

        # Final Metrics
        logger.debug("\n📊 FINAL METRICS")
        logger.debug("=" * 50)
        composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores)))
        logger.debug(f"Composite Abuse Score: {composite_abuse}%")
        most_common_stage = max(set(stages), key=stages.count)
        logger.debug(f"Most Common Stage: {most_common_stage}")
        avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3)
        logger.debug(f"Average DARVO Score: {avg_darvo}")

        final_risk_level = calculate_enhanced_risk_level(
            composite_abuse,
            predicted_labels,
            escalation_risk,
            avg_darvo
        )
        # Override escalation_risk with the enhanced version
        escalation_risk = final_risk_level

        # Generate Final Report
        logger.debug("\n📄 GENERATING FINAL REPORT")
        logger.debug("=" * 50)
        out = f"Abuse Intensity: {composite_abuse}%\n"

        # Healthy boundary detection
        logger.debug("\n🛡️ CHECKING FOR HEALTHY BOUNDARIES")
        logger.debug("=" * 50)
        # Check if any messages were identified as having healthy boundaries
        healthy_boundaries_detected = []
        for result, msg_id in results:
            abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone_tag, boundary_assessment = result
            # Check if this message has healthy boundaries
            if boundary_assessment.get('assessment') == 'healthy':
                healthy_boundaries_detected.append({
                    'message_id': msg_id,
                    'label': boundary_assessment.get('label', 'Healthy Boundary'),
                    'confidence': boundary_assessment.get('confidence', 1.0),
                    'description': boundary_assessment.get('description', 'Healthy boundary communication detected')
                })
                logger.debug(f"✅ {msg_id}: {boundary_assessment.get('label')}")

        # Add healthy boundary section to output if any detected
        if healthy_boundaries_detected:
            logger.debug(f"Found {len(healthy_boundaries_detected)} healthy boundary messages")
            out += "\n🛡️ **HEALTHY BOUNDARIES DETECTED**\n"
            out += "=" * 50 + "\n"
            if len(healthy_boundaries_detected) == 1:
                boundary = healthy_boundaries_detected[0]
                out += f"✅ **{boundary['message_id']}**: This is a healthily phrased boundary\n"
                out += f"  • **Type**: {boundary['label']}\n"
                out += f"  • **Analysis**: {boundary['description']}\n"
                out += "  • **Recommendation**: Continue this respectful, direct communication approach\n"
            else:
                out += f"✅ **Multiple healthy boundaries detected** ({len(healthy_boundaries_detected)} messages)\n"
                for boundary in healthy_boundaries_detected:
                    out += f"  • **{boundary['message_id']}**: {boundary['label']}\n"
                out += "  • **Overall**: These messages demonstrate healthy boundary-setting skills\n"
            out += "\n💡 **About Healthy Boundaries**: Even when addressing difficult topics, "
            out += "these messages use respectful language, focus on specific behaviors rather than "
            out += "character attacks, and communicate needs clearly without manipulation.\n"
            logger.debug("Added healthy boundary section to output")
        else:
            logger.debug("No healthy boundaries detected in messages")

        # Add detected patterns to output
        if predicted_labels:
            out += "\n🔍 Detected Patterns:\n"
            pattern_counts = Counter(predicted_labels)  # Re-define here for safety
            high_patterns = [p for p in pattern_counts if p in high]
            moderate_patterns = [p for p in pattern_counts if p in moderate]
            low_patterns = [p for p in pattern_counts if p in low]

            if high_patterns:
                patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns)
                out += f"❗ High Severity: {patterns_str}\n"
            if moderate_patterns:
                patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns)
                out += f"⚠️ Moderate Severity: {patterns_str}\n"
            if low_patterns:
                patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns)
                out += f"📉 Low Severity: {patterns_str}\n"
            out += "\n"

        out += "📊 This reflects the strength and severity of detected abuse patterns in the message(s).\n\n"

        # Risk Level Assessment
        risk_level = final_risk_level
        logger.debug(f"Final Risk Level: {risk_level}")

        # Add Risk Description
        risk_descriptions = {
            "Critical": (
                "🚨 **Risk Level: Critical**\n"
                "Multiple severe abuse patterns detected. This situation shows signs of "
                "dangerous escalation and immediate intervention may be needed."
            ),
            "High": (
                "⚠️ **Risk Level: High**\n"
                "Strong abuse patterns detected. This situation shows concerning "
                "signs of manipulation and control."
            ),
            "Moderate": (
                "⚡ **Risk Level: Moderate**\n"
                "Concerning patterns detected. While not severe, these behaviors "
                "indicate unhealthy relationship dynamics."
            ),
            "Low": (
                "📉 **Risk Level: Low**\n"
                "Minor concerning patterns detected. While present, the detected "
                "behaviors are subtle or infrequent."
            )
        }
        out += risk_descriptions[risk_level]
        out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}"
        logger.debug("Added risk description and stage information")

        # Add DARVO Analysis
        if avg_darvo > 0.25:
            level = "moderate" if avg_darvo < 0.65 else "high"
            out += f"\n\n🔄 **DARVO Score: {avg_darvo}** → This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame."
            logger.debug(f"Added DARVO analysis ({level} level)")

        # Add Emotional Tones
        logger.debug("\n🎭 Adding Emotional Tones")
        out += "\n\n🎭 **Emotional Tones Detected:**\n"
        for i, tone in enumerate(tone_tags):
            out += f"• Message {i+1}: *{tone or 'none'}*\n"
            logger.debug(f"Message {i+1} tone: {tone}")

        # Add Threats Section
        logger.debug("\n⚠️ Adding Threat Analysis")
        if flat_threats:
            out += "\n\n🚨 **Immediate Danger Threats Detected:**\n"
            for t in set(flat_threats):
                out += f"• \"{t}\"\n"
            out += "\n⚠️ These phrases may indicate an imminent risk to physical safety."
            logger.debug(f"Added {len(set(flat_threats))} unique threat warnings")
        else:
            out += "\n\n🧩 **Immediate Danger Threats:** None explicitly detected.\n"
            out += "This does *not* rule out risk, but no direct threat phrases were matched."
logger.debug("No threats to add") # Generate Timeline logger.debug("\nπ Generating Timeline") pattern_labels = [] for result, _ in results: matched_scores = result[2] # Get the matched_scores from the result tuple if matched_scores: # Sort matched_scores by score and get the highest scoring pattern highest_pattern = max(matched_scores, key=lambda x: x[1]) pattern_labels.append(highest_pattern[0]) # Add the pattern name else: pattern_labels.append("none") logger.debug("Pattern labels for timeline:") for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)): logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)") timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels) logger.debug("Timeline generated successfully") # Add Escalation Text out += "\n\n" + escalation_text logger.debug("Added escalation text to output") logger.debug("\nβ ANALYSIS COMPLETE") logger.debug("=" * 50) # SAFETY PLANNING CHECK # Check if safety planning should be offered show_safety = should_show_safety_planning( composite_abuse, escalation_risk, predicted_labels ) safety_plan = "" if show_safety: # Generate safety plan safety_plan = generate_simple_safety_plan( composite_abuse, escalation_risk, predicted_labels ) # Add notice to main results out += "\n\n" + "π‘οΈ " + "="*48 out += "\n**SAFETY PLANNING AVAILABLE**" out += "\n" + "="*50 out += "\n\nBased on your analysis results, we've generated a safety plan." out += "\nCheck the 'Safety Plan' output below for personalized guidance." return out, timeline_image, safety_plan except Exception as e: logger.error("\nβ ERROR IN ANALYSIS") logger.error("=" * 50) logger.error(f"Error type: {type(e).__name__}") logger.error(f"Error message: {str(e)}") logger.error(f"Traceback:\n{traceback.format_exc()}") return "An error occurred during analysis.", None, "" def format_results_for_new_ui(analysis_output, timeline_image, safety_plan): """ Convert your existing analysis output into the format needed for the new UI """ try: # Parse your existing text output to extract structured data lines = analysis_output.split('\n') # Extract abuse intensity abuse_intensity = 0 for line in lines: if line.startswith('Abuse Intensity:'): abuse_intensity = int(re.findall(r'\d+', line)[0]) break # Extract DARVO score darvo_score = 0.0 for line in lines: if 'DARVO Score:' in line: # Extract number from line like "π **DARVO Score: 0.456**" darvo_match = re.search(r'DARVO Score: ([\d.]+)', line) if darvo_match: darvo_score = float(darvo_match.group(1)) break # Extract emotional tones emotional_tones = [] in_tones_section = False for line in lines: if 'π **Emotional Tones Detected:**' in line: in_tones_section = True continue elif in_tones_section and line.strip(): if line.startswith('β’ Message'): # Extract tone from line like "β’ Message 1: *menacing calm*" tone_match = re.search(r'\*([^*]+)\*', line) if tone_match: tone = tone_match.group(1) emotional_tones.append(tone if tone != 'none' else 'neutral') else: emotional_tones.append('neutral') elif not line.startswith('β’') and line.strip(): break # Determine risk level based on your existing logic if abuse_intensity >= 85: risk_level = 'critical' elif abuse_intensity >= 70: risk_level = 'high' elif abuse_intensity >= 50: risk_level = 'moderate' else: risk_level = 'low' # FIXED: Extract detected patterns properly patterns = [] in_patterns_section = False # Define valid pattern names to filter against valid_patterns = { "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", "blame 
shifting", "nonabusive", "projection", "insults", "contradictory statements", "obscure language", "veiled threats", "stalking language", "false concern", "false equivalence", "future faking" } for line in lines: if 'π Detected Patterns:' in line: in_patterns_section = True continue elif in_patterns_section and line.strip(): if line.startswith('β'): severity = 'high' elif line.startswith('β οΈ'): severity = 'moderate' elif line.startswith('π'): severity = 'low' else: continue # Extract pattern text after the severity indicator if ':' in line: pattern_text = line.split(':', 1)[1].strip() else: pattern_text = line[2:].strip() # Remove emoji and space # Parse individual patterns from the text # Handle format like "blame shifting (1x), projection (2x)" pattern_parts = pattern_text.split(',') for part in pattern_parts: # Clean up the pattern name pattern_name = part.strip() # Remove count indicators like "(1x)", "(2x)", etc. pattern_name = re.sub(r'\s*\(\d+x?\)', '', pattern_name) # Remove any remaining special characters and clean pattern_name = pattern_name.strip().lower() # Only add if it's a valid pattern name if pattern_name in valid_patterns: patterns.append({ 'name': pattern_name.replace('_', ' ').title(), 'severity': severity, 'description': get_pattern_description(pattern_name) }) elif line.strip() and not line.startswith(('β', 'β οΈ', 'π')) and in_patterns_section: # Exit patterns section when we hit a non-pattern line break # Generate personalized recommendations recommendations = generate_personalized_recommendations(abuse_intensity, patterns, safety_plan) # Add this before the return statement in format_results_for_new_ui() boundary_health = { 'overall_health': 'unknown', 'message_assessments': [], 'recommendations': [] } return { 'riskLevel': risk_level, 'riskScore': abuse_intensity, 'primaryConcerns': patterns[:3], # Top 3 most important 'allPatterns': patterns, 'riskStage': extract_risk_stage(analysis_output), 'emotionalTones': emotional_tones, 'darvoScore': darvo_score, 'boundaryHealth': boundary_health, 'personalizedRecommendations': recommendations, 'hasSafetyPlan': bool(safety_plan), 'safetyPlan': safety_plan, 'rawAnalysis': analysis_output } except Exception as e: logger.error(f"Error formatting results: {e}") return { 'riskLevel': 'low', 'riskScore': 0, 'primaryConcerns': [], 'allPatterns': [], 'riskStage': 'unknown', 'emotionalTones': [], 'darvoScore': 0.0, 'boundaryHealth': {'overall_health': 'unknown', 'message_assessments': [], 'recommendations': []}, 'personalizedRecommendations': ['Consider speaking with a counselor about your relationship concerns'], 'hasSafetyPlan': False, 'safetyPlan': '', 'rawAnalysis': analysis_output } def get_pattern_description(pattern_name): """Get human-readable descriptions for patterns""" descriptions = { 'control': 'Attempts to manage your behavior, decisions, or daily activities', 'gaslighting': 'Making you question your memory, perception, or reality', 'dismissiveness': 'Minimizing or invalidating your feelings and experiences', 'guilt tripping': 'Making you feel guilty to influence your behavior', 'blame shifting': 'Placing responsibility for their actions onto you', 'projection': 'Accusing you of behaviors they themselves exhibit', 'insults': 'Name-calling or personal attacks intended to hurt', 'contradictory statements': 'Saying things that conflict with previous statements', 'obscure language': 'Using vague or confusing language to avoid accountability', 'veiled threats': 'Indirect threats or intimidating language', 'stalking 

def generate_personalized_recommendations(abuse_score, patterns, safety_plan):
    """Generate recommendations based on specific findings"""
    recommendations = []

    # Base recommendations by overall score
    if abuse_score >= 70:
        recommendations.extend([
            'Document these conversations with dates and times',
            'Reach out to a trusted friend or family member about your concerns',
            'Consider contacting the National Domestic Violence Hotline for guidance'
        ])
    elif abuse_score >= 40:
        recommendations.extend([
            'Keep a private journal of concerning interactions',
            'Talk to someone you trust about these communication patterns',
            'Consider counseling to explore healthy relationship dynamics'
        ])
    else:
        recommendations.extend([
            'Continue monitoring communication patterns that concern you',
            'Consider discussing communication styles with your partner when you feel safe to do so'
        ])

    # Pattern-specific recommendations
    pattern_names = [p['name'].lower() for p in patterns]

    if 'control' in pattern_names:
        recommendations.append('Maintain your independence and decision-making autonomy')
    if 'gaslighting' in pattern_names:
        recommendations.append('Trust your memory and perceptions - consider keeping notes')
    if any(p in pattern_names for p in ['stalking language', 'veiled threats']):
        recommendations.append('Vary your routines and inform trusted people of your whereabouts')

    if safety_plan:
        recommendations.append('Review your personalized safety plan regularly')

    return recommendations[:4]  # Limit to four recommendations
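
# Sketch of expected behavior (inputs hypothetical): a score of 75 with a
# detected gaslighting pattern emits the three high-score base items plus the
# pattern-specific note, then the [:4] cap trims the list to four entries:
#   generate_personalized_recommendations(75, [{'name': 'Gaslighting'}], "")
#   # -> ['Document these conversations with dates and times',
#   #     'Reach out to a trusted friend or family member about your concerns',
#   #     'Consider contacting the National Domestic Violence Hotline for guidance',
#   #     'Trust your memory and perceptions - consider keeping notes']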

def extract_risk_stage(analysis_output):
    """Extract the cycle-of-abuse stage from the analysis text"""
    if 'Tension-Building' in analysis_output:
        return 'tension-building'
    elif 'Escalation' in analysis_output:
        return 'escalation'
    elif 'Reconciliation' in analysis_output:
        return 'reconciliation'
    elif 'Honeymoon' in analysis_output:
        return 'honeymoon'
    else:
        return 'unknown'
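
# e.g. extract_risk_stage("...Escalation stage detected...") -> 'escalation'.
# Matching is substring-based, so this wording must stay in sync with the
# stage names used in RISK_STAGE_LABELS.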

def create_boundary_health_display(boundary_assessment):
    """Create HTML display for boundary health assessment"""
    color_map = {
        'healthy': '#10b981',
        'mostly_healthy': '#3b82f6',
        'concerning': '#f59e0b',
        'unhealthy': '#ef4444',
        'neutral': '#6b7280'
    }
    bg_map = {
        'healthy': '#f0fdf4',
        'mostly_healthy': '#eff6ff',
        'concerning': '#fffbeb',
        'unhealthy': '#fef2f2',
        'neutral': '#f9fafb'
    }

    assessment = boundary_assessment.get('assessment', 'neutral')
    color = color_map.get(assessment, '#6b7280')
    bg_color = bg_map.get(assessment, '#f9fafb')

    # NOTE: the original markup of this card was lost in extraction; the tag
    # structure below is a minimal reconstruction using the palettes above.
    html = f"""
    <div style="background: {bg_color}; border-left: 4px solid {color}; padding: 12px; border-radius: 8px;">
        <p style="color: {color}; margin: 0;">
            {boundary_assessment.get('description', 'No assessment available')}
        </p>
        <p style="color: {color}; font-size: 0.85em; margin: 4px 0 0;">
            Confidence: {boundary_assessment.get('confidence', 0):.1%}
        </p>
    </div>
    """
    return html
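
# Example call (assessment-dict shape inferred from the lookups above):
#   create_boundary_health_display({'assessment': 'healthy',
#                                   'description': 'Clear, respectful limits',
#                                   'confidence': 0.91})
# renders a green-tinted card; unrecognized assessments fall back to the
# neutral grey palette.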

# ---------------------------------------------------------------------------
# Gradio UI
# NOTE: The Blocks/Tab/Row scaffold and the HTML tags inside the gr.HTML()
# blocks below were lost in extraction. The structure here is a minimal
# reconstruction (names such as "demo" and "desktop-col-messages" are
# assumed); all user-facing text is original.
# ---------------------------------------------------------------------------
with gr.Blocks(title="Tether") as demo:
    with gr.Tab("Message Analysis"):
        # Desktop layout
        with gr.Row(elem_classes=["desktop-only"]):
            # Message input column
            with gr.Column(elem_classes=["desktop-col-messages"], scale=4, min_width=350):
                gr.HTML("""
                <p>Share messages that concern you, and we'll help you understand what patterns might be present.</p>
                <p><em>Your messages are analyzed locally and are not stored or shared. This tool is for educational purposes and not a substitute for professional counseling.</em></p>
                <p>Enter up to three messages that made you feel uncomfortable, confused, or concerned. For the most accurate analysis, include messages from recent emotionally intense conversations.</p>
""") msg1_desktop = gr.Textbox( label="Message 1 *", placeholder="Enter the message here...", lines=4 ) msg2_desktop = gr.Textbox( label="Message 2 (optional)", placeholder="Enter the message here...", lines=4 ) msg3_desktop = gr.Textbox( label="Message 3 (optional)", placeholder="Enter the message here...", lines=4 ) # Checklist column with gr.Column(elem_classes=["desktop-col-checklist"], scale=3, min_width=300): gr.HTML("Optional but recommended. Check any that apply to your situation:
""") checklist_items_desktop = [] with gr.Column(elem_classes=["compact-checklist"]): for question, weight in ESCALATION_QUESTIONS: checklist_items_desktop.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) none_selected_desktop = gr.Checkbox( label="None of the above apply to my situation", elem_classes=["none-checkbox"] ) analyze_btn_desktop = gr.Button( "Analyze Messages", variant="primary", size="lg" ) # Results column with gr.Column(elem_classes=["desktop-col-results"], scale=5, min_width=400): gr.HTML("Results will appear here after analysis...
""") # Desktop results components results_json_desktop = gr.JSON(visible=False) risk_summary_desktop = gr.HTML(visible=False) concerns_display_desktop = gr.HTML(visible=False) additional_metrics_desktop = gr.HTML(visible=False) recommendations_display_desktop = gr.HTML(visible=False) with gr.Row(visible=False) as action_buttons_desktop: safety_plan_btn_desktop = gr.Button("π‘οΈ Get Safety Plan", variant="secondary") full_analysis_btn_desktop = gr.Button("π Show Full Analysis", variant="secondary") download_btn_desktop = gr.Button("π Download Report", variant="secondary") full_analysis_display_desktop = gr.HTML(visible=False) timeline_chart_desktop = gr.Image(visible=False, label="Pattern Timeline") download_file_desktop = gr.File(label="Download Report", visible=False) # Mobile layout with gr.Column(elem_classes=["mobile-only"]): # Message input - always visible gr.HTML("Enter messages that made you uncomfortable or concerned:
""") msg1_mobile = gr.Textbox( label="Message 1 (required)", placeholder="Enter the concerning message here...", lines=3 ) # Button to show additional messages show_more_msgs_btn = gr.Button( "β Add More Messages (Optional)", elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], variant="secondary" ) # Additional messages (hidden by default) with gr.Column(visible=False) as additional_messages_mobile: msg2_mobile = gr.Textbox( label="Message 2 (optional)", placeholder="Enter another message...", lines=3 ) msg3_mobile = gr.Textbox( label="Message 3 (optional)", placeholder="Enter a third message...", lines=3 ) # Button to show safety checklist show_checklist_btn = gr.Button( "β οΈ Safety Checklist (Optional)", elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], variant="secondary" ) # Safety checklist (hidden by default) with gr.Column(visible=False) as safety_checklist_mobile: gr.HTML("""Check any that apply to improve analysis accuracy:
""") checklist_items_mobile = [] for question, weight in ESCALATION_QUESTIONS: checklist_items_mobile.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) none_selected_mobile = gr.Checkbox( label="None of the above apply", elem_classes=["none-checkbox"] ) # Analysis button analyze_btn_mobile = gr.Button( "π Analyze Messages", variant="primary", size="lg" ) # Mobile results components results_json_mobile = gr.JSON(visible=False) risk_summary_mobile = gr.HTML(visible=False) concerns_display_mobile = gr.HTML(visible=False) additional_metrics_mobile = gr.HTML(visible=False) recommendations_display_mobile = gr.HTML(visible=False) with gr.Row(visible=False) as action_buttons_mobile: safety_plan_btn_mobile = gr.Button("π‘οΈ Safety Plan", variant="secondary") full_analysis_btn_mobile = gr.Button("π Full Analysis", variant="secondary") download_btn_mobile = gr.Button("π Download", variant="secondary") full_analysis_display_mobile = gr.HTML(visible=False) timeline_chart_mobile = gr.Image(visible=False, label="Pattern Timeline") download_file_mobile = gr.File(label="Download Report", visible=False) with gr.Tab("Safety Resources"): gr.HTML("""If you're concerned about your safety, here are immediate resources and steps you can take.
        <ul>
            <li><strong>911</strong> - For immediate danger</li>
            <li><strong>1-800-799-7233</strong> - National DV Hotline (24/7)</li>
            <li><strong>Text START to 88788</strong> - Crisis Text Line</li>
            <li><strong>988</strong> - National Suicide Prevention Lifeline</li>
            <li><strong>thehotline.org</strong> - Online chat support</li>
            <li><strong>Local counseling services</strong> - Professional support</li>
            <li><strong>Trusted friends/family</strong> - Personal support network</li>
            <li><strong>Legal advocacy</strong> - Know your rights</li>
        </ul>
        """)

# NOTE: The scaffolding of the display builder below was lost in extraction;
# the function name and HTML tags are reconstructed (assumed), while the
# surviving text and template expressions are kept verbatim.
def build_results_sections(results):
    """Build the HTML fragments for the risk summary, concerns, metrics,
    and recommendations panels from the structured results dict."""
    # Risk Summary Section
    risk_html = f"""
    <div class="risk-summary">
        <p>Based on the messages you shared</p>
        <p><strong>Risk Score: {results['riskScore']}%</strong></p>
    </div>
    """

    # Primary Concerns Section
    concerns_html = ""
    if results.get('primaryConcerns'):
        for concern in results['primaryConcerns']:
            concerns_html += f"""
            <div class="concern-item">
                <strong>{concern.get('name', 'Unknown')}</strong>
                <p>{concern.get('description', 'No description available')}</p>
            </div>
            """
    else:
        concerns_html = "<p>No specific concerns identified in the messages.</p>"

    # Additional Metrics Section
    metrics_html = (
        "<p>DARVO (Deny, Attack, Reverse Victim & Offender) indicates potential "
        "narrative manipulation where the speaker may be deflecting responsibility.</p>"
    )
    for i, tone in enumerate(results.get('emotionalTones', [])):
        metrics_html += f"<p>• Message {i+1}: {tone}</p>"
    metrics_html += """
    <p><em>Emotional tone analysis helps identify underlying manipulation tactics or concerning emotional patterns.</em></p>
    """

    # Recommendations Section
    recommendations_html = ""
    for rec in results.get('personalizedRecommendations', []):
        recommendations_html += f"<p>• {rec}</p>"

    return risk_html, concerns_html, metrics_html, recommendations_html
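
# These section strings presumably feed the gr.HTML slots defined above
# (risk_summary_*, concerns_display_*, additional_metrics_*,
# recommendations_display_*) via gr.update(value=..., visible=True); the
# exact event wiring is assumed to live with the button handlers.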

# NOTE: The opening of this handler was lost in extraction; the signature,
# control flow, and the severity_badge mapping are reconstructed from the
# surviving fragments.
def show_full_analysis(results_json_str):
    """Render the complete analysis for the 'Show Full Analysis' button."""
    try:
        if not results_json_str:
            return gr.update(
                value="<p>No analysis data available. Please run the analysis first.</p>",
                visible=True
            )

        # Handle both JSON string and dict inputs
        if isinstance(results_json_str, str):
            results = json.loads(results_json_str)
        elif isinstance(results_json_str, dict):
            results = results_json_str
        else:
            return gr.update(
                value="<p>Invalid data format. Please run the analysis again.</p>",
                visible=True
            )

        # Create comprehensive full analysis display
        full_html = f"""
        <div class="full-analysis">
            <p><strong>Risk Level:</strong> {results.get('riskLevel', 'Unknown').title()}</p>
            <p><strong>Risk Score:</strong> {results.get('riskScore', 'N/A')}%</p>
            <p><strong>Risk Stage:</strong> {results.get('riskStage', 'Unknown').replace('-', ' ').title()}</p>
            <p><strong>DARVO Score:</strong> {results.get('darvoScore', 0):.3f}</p>
            <p><strong>Emotional Tones:</strong> {', '.join(results.get('emotionalTones', ['None detected']))}</p>
        """

        if results.get('allPatterns'):
            for pattern in results['allPatterns']:
                severity_badge = {'high': '❗', 'moderate': '⚠️', 'low': '📉'}.get(
                    pattern.get('severity', 'low'), '📉'
                )
                full_html += (
                    f"<p>{severity_badge} <strong>{pattern.get('name', 'Unknown')}</strong><br>"
                    f"{pattern.get('description', 'No description available')}</p>"
                )
        else:
            full_html += "<p>No specific patterns detected.</p>"

        full_html += """
        </div>
        """
        return gr.update(value=full_html, visible=True)

    except Exception as e:
        return gr.update(
            value=f"<p>Unable to parse analysis results: {str(e)}</p>"
                  "<p>Please try running the analysis again.