Spaces:
Running
on
Zero
Running
on
Zero
| import gradio as gr | |
| import spaces | |
| import torch | |
| import numpy as np | |
| from transformers import AutoModelForSequenceClassification, AutoTokenizer, pipeline as hf_pipeline | |
| import re | |
| import matplotlib.pyplot as plt | |
| import io | |
| from PIL import Image | |
| from datetime import datetime | |
| from torch.nn.functional import sigmoid | |
| from collections import Counter | |
| import logging | |
| import traceback | |
| import json | |
| # Set up logging | |
| logging.basicConfig(level=logging.DEBUG) | |
| logger = logging.getLogger(__name__) | |
| # Device configuration | |
| device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') | |
| logger.info(f"Using device: {device}") | |
| # Set up custom logging | |
| class CustomFormatter(logging.Formatter): | |
| """Custom formatter with colors and better formatting""" | |
| grey = "\x1b[38;21m" | |
| blue = "\x1b[38;5;39m" | |
| yellow = "\x1b[38;5;226m" | |
| red = "\x1b[38;5;196m" | |
| bold_red = "\x1b[31;1m" | |
| reset = "\x1b[0m" | |
| def format(self, record): | |
| # Remove the logger name from the output | |
| if record.levelno == logging.DEBUG: | |
| return f"{self.blue}{record.getMessage()}{self.reset}" | |
| elif record.levelno == logging.INFO: | |
| return f"{self.grey}{record.getMessage()}{self.reset}" | |
| elif record.levelno == logging.WARNING: | |
| return f"{self.yellow}{record.getMessage()}{self.reset}" | |
| elif record.levelno == logging.ERROR: | |
| return f"{self.red}{record.getMessage()}{self.reset}" | |
| elif record.levelno == logging.CRITICAL: | |
| return f"{self.bold_red}{record.getMessage()}{self.reset}" | |
| return record.getMessage() | |
| # Setup logger | |
| logger = logging.getLogger(__name__) | |
| logger.setLevel(logging.DEBUG) | |
| # Remove any existing handlers | |
| logger.handlers = [] | |
| # Create console handler with custom formatter | |
| ch = logging.StreamHandler() | |
| ch.setLevel(logging.DEBUG) | |
| ch.setFormatter(CustomFormatter()) | |
| logger.addHandler(ch) | |
| # Model initialization | |
| model_name = "SamanthaStorm/tether-multilabel-v6" | |
| model = AutoModelForSequenceClassification.from_pretrained(model_name).to(device) | |
| tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False) | |
| # sentiment model | |
| sentiment_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-sentiment-v3").to(device) | |
| sentiment_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-sentiment-v3", use_fast=False) | |
| sentiment_model.eval() | |
| emotion_pipeline = hf_pipeline( | |
| "text-classification", | |
| model="j-hartmann/emotion-english-distilroberta-base", | |
| return_all_scores=True, # Get all emotion scores | |
| top_k=None, # Don't limit to top k predictions | |
| truncation=True, | |
| device=0 if torch.cuda.is_available() else -1 | |
| ) | |
| # DARVO model | |
| darvo_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1").to(device) | |
| darvo_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/tether-darvo-regressor-v1", use_fast=False) | |
| darvo_model.eval() | |
| boundary_model = AutoModelForSequenceClassification.from_pretrained("SamanthaStorm/healthy-boundary-predictor").to(device) | |
| boundary_tokenizer = AutoTokenizer.from_pretrained("SamanthaStorm/healthy-boundary-predictor", use_fast=False) | |
| boundary_model.eval() | |
| def predict_boundary_health(text): | |
| """Predict boundary health for given text - returns healthy probability""" | |
| try: | |
| inputs = boundary_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| outputs = boundary_model(**inputs) | |
| predictions = torch.nn.functional.softmax(outputs.logits, dim=-1) | |
| # Return healthy probability (index 1 = healthy, index 0 = unhealthy) | |
| healthy_prob = predictions[0][1].item() | |
| return round(healthy_prob, 4) | |
| except Exception as e: | |
| logger.error(f"Error in boundary prediction: {e}") | |
| return 0.5 # Return neutral score on error | |
| def get_boundary_assessment(text, healthy_prob): | |
| """Get boundary assessment and recommendations""" | |
| if healthy_prob >= 0.8: | |
| return { | |
| 'assessment': 'healthy', | |
| 'label': 'Healthy Boundary', | |
| 'confidence': healthy_prob, | |
| 'description': 'This communication shows healthy boundary setting', | |
| 'recommendations': ['Continue this respectful communication approach'] | |
| } | |
| elif healthy_prob >= 0.6: | |
| return { | |
| 'assessment': 'mostly_healthy', | |
| 'label': 'Mostly Healthy', | |
| 'confidence': healthy_prob, | |
| 'description': 'Generally healthy communication with room for improvement', | |
| 'recommendations': ['Consider expressing needs more clearly', 'Focus on "I" statements'] | |
| } | |
| elif healthy_prob >= 0.4: | |
| return { | |
| 'assessment': 'concerning', | |
| 'label': 'Concerning Pattern', | |
| 'confidence': 1 - healthy_prob, | |
| 'description': 'Communication shows some concerning boundary patterns', | |
| 'recommendations': ['Avoid ultimatums and threats', 'Express needs respectfully', 'Consider the other person\'s perspective'] | |
| } | |
| else: | |
| return { | |
| 'assessment': 'unhealthy', | |
| 'label': 'Unhealthy Boundary', | |
| 'confidence': 1 - healthy_prob, | |
| 'description': 'Communication shows unhealthy boundary patterns', | |
| 'recommendations': ['Use "I" statements instead of "you" accusations', 'Avoid manipulation and control language', 'Consider counseling for communication skills'] | |
| } | |
| # Constants and Labels | |
| LABELS = [ | |
| "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", | |
| "blame shifting", "nonabusive", "projection", "insults", | |
| "contradictory statements", "obscure language", | |
| "veiled threats", "stalking language", "false concern", | |
| "false equivalence", "future faking" | |
| ] | |
| SENTIMENT_LABELS = ["supportive", "undermining"] | |
| THRESHOLDS = { | |
| "recovery phase": 0.278, | |
| "control": 0.287, | |
| "gaslighting": 0.144, | |
| "guilt tripping": 0.220, | |
| "dismissiveness": 0.142, | |
| "blame shifting": 0.183, | |
| "projection": 0.253, | |
| "insults": 0.247, | |
| "contradictory statements": 0.200, | |
| "obscure language": 0.455, | |
| "nonabusive": 0.281, | |
| # NEW v6 patterns: | |
| "veiled threats": 0.310, | |
| "stalking language": 0.339, | |
| "false concern": 0.334, | |
| "false equivalence": 0.317, | |
| "future faking": 0.385 | |
| } | |
| PATTERN_WEIGHTS = { | |
| "recovery phase": 0.7, | |
| "control": 1.4, | |
| "gaslighting": 1.3, | |
| "guilt tripping": 1.2, | |
| "dismissiveness": 0.9, | |
| "blame shifting": 1.0, | |
| "projection": 0.5, | |
| "insults": 1.4, | |
| "contradictory statements": 1.0, | |
| "obscure language": 0.9, | |
| "nonabusive": 0.0, | |
| # NEW v6 patterns: | |
| "veiled threats": 1.6, # High weight - very dangerous | |
| "stalking language": 1.8, # Highest weight - extremely dangerous | |
| "false concern": 1.1, # Moderate weight - manipulative | |
| "false equivalence": 1.3, # Enhances DARVO detection | |
| "future faking": 0.8 # Lower weight - manipulation tactic | |
| } | |
| ESCALATION_QUESTIONS = [ | |
| ("Partner has access to firearms or weapons", 4), | |
| ("Partner threatened to kill you", 3), | |
| ("Partner threatened you with a weapon", 3), | |
| ("Partner has ever choked you, even if you considered it consensual at the time", 4), | |
| ("Partner injured or threatened your pet(s)", 3), | |
| ("Partner has broken your things, punched or kicked walls, or thrown things ", 2), | |
| ("Partner forced or coerced you into unwanted sexual acts", 3), | |
| ("Partner threatened to take away your children", 2), | |
| ("Violence has increased in frequency or severity", 3), | |
| ("Partner monitors your calls/GPS/social media", 2) | |
| ] | |
| RISK_STAGE_LABELS = { | |
| 1: "π Risk Stage: Tension-Building\nThis message reflects rising emotional pressure or subtle control attempts.", | |
| 2: "π₯ Risk Stage: Escalation\nThis message includes direct or aggressive patterns, suggesting active harm.", | |
| 3: "π§οΈ Risk Stage: Reconciliation\nThis message reflects a reset attemptβapologies or emotional repair without accountability.", | |
| 4: "πΈ Risk Stage: Calm / Honeymoon\nThis message appears supportive but may follow prior harm, minimizing it." | |
| } | |
| THREAT_MOTIFS = [ | |
| "i'll kill you", "i'm going to hurt you", "you're dead", "you won't survive this", | |
| "i'll break your face", "i'll bash your head in", "i'll snap your neck", | |
| "i'll come over there and make you shut up", "i'll knock your teeth out", | |
| "you're going to bleed", "you want me to hit you?", "i won't hold back next time", | |
| "i swear to god i'll beat you", "next time, i won't miss", "i'll make you scream", | |
| "i know where you live", "i'm outside", "i'll be waiting", "i saw you with him", | |
| "you can't hide from me", "i'm coming to get you", "i'll find you", "i know your schedule", | |
| "i watched you leave", "i followed you home", "you'll regret this", "you'll be sorry", | |
| "you're going to wish you hadn't", "you brought this on yourself", "don't push me", | |
| "you have no idea what i'm capable of", "you better watch yourself", | |
| "i don't care what happens to you anymore", "i'll make you suffer", "you'll pay for this", | |
| "i'll never let you go", "you're nothing without me", "if you leave me, i'll kill myself", | |
| "i'll ruin you", "i'll tell everyone what you did", "i'll make sure everyone knows", | |
| "i'm going to destroy your name", "you'll lose everyone", "i'll expose you", | |
| "your friends will hate you", "i'll post everything", "you'll be cancelled", | |
| "you'll lose everything", "i'll take the house", "i'll drain your account", | |
| "you'll never see a dime", "you'll be broke when i'm done", "i'll make sure you lose your job", | |
| "i'll take your kids", "i'll make sure you have nothing", "you can't afford to leave me", | |
| "don't make me do this", "you know what happens when i'm mad", "you're forcing my hand", | |
| "if you just behaved, this wouldn't happen", "this is your fault", | |
| "you're making me hurt you", "i warned you", "you should have listened" | |
| ] | |
| # MOVED TO TOP LEVEL - Fixed tone severity mapping | |
| TONE_SEVERITY = { | |
| # Highest danger tones | |
| "obsessive fixation": 4, | |
| "menacing calm": 4, | |
| "conditional menace": 4, | |
| "surveillance intimacy": 4, | |
| # High danger tones | |
| "predatory concern": 3, | |
| "victim cosplay": 3, | |
| "entitled rage": 3, | |
| "direct threat": 3, | |
| # Moderate danger tones | |
| "manipulative hope": 2, | |
| "false vulnerability": 2, | |
| "calculated coldness": 2, | |
| "predictive punishment": 2, | |
| # Existing tones (keep current mappings) | |
| "emotional threat": 3, | |
| "forced accountability flip": 3, | |
| "performative regret": 2, | |
| "coercive warmth": 2, | |
| "cold invalidation": 2, | |
| "weaponized sadness": 2, | |
| "contradictory gaslight": 2, | |
| # Low risk tones | |
| "neutral": 0, | |
| "genuine vulnerability": 0 | |
| } | |
| # MOVED TO TOP LEVEL - Helper functions | |
| def log_emotional_tone_usage(tone_tag, patterns): | |
| """Log tone usage for analytics""" | |
| logger.debug(f"π Detected tone tag: {tone_tag} with patterns: {patterns}") | |
| # Track dangerous tone combinations | |
| dangerous_tones = [ | |
| "obsessive fixation", "menacing calm", "predatory concern", | |
| "surveillance intimacy", "conditional menace", "victim cosplay" | |
| ] | |
| if tone_tag in dangerous_tones: | |
| logger.warning(f"β οΈ Dangerous emotional tone detected: {tone_tag}") | |
| def calculate_tone_risk_boost(tone_tag): | |
| """Calculate risk boost based on emotional tone severity""" | |
| return TONE_SEVERITY.get(tone_tag, 0) | |
| def should_show_safety_planning(abuse_score, escalation_risk, detected_patterns): | |
| """Check if we should show safety planning""" | |
| if escalation_risk in ["High", "Critical"]: | |
| return True | |
| if abuse_score >= 70: | |
| return True | |
| dangerous_patterns = ["stalking language", "veiled threats", "threats"] | |
| if any(pattern in detected_patterns for pattern in dangerous_patterns): | |
| return True | |
| return False | |
| def generate_simple_safety_plan(abuse_score, escalation_risk, detected_patterns): | |
| """Generate a basic safety plan""" | |
| plan = "π‘οΈ **SAFETY PLANNING RECOMMENDED**\n\n" | |
| if escalation_risk == "Critical" or abuse_score >= 85: | |
| plan += "π¨ **CRITICAL SAFETY SITUATION**\n\n" | |
| plan += "**IMMEDIATE ACTIONS:**\n" | |
| plan += "β’ Contact domestic violence hotline: **1-800-799-7233** (24/7, free, confidential)\n" | |
| plan += "β’ Text START to **88788** for crisis text support\n" | |
| plan += "β’ Consider staying with trusted friends/family tonight\n" | |
| plan += "β’ Keep phone charged and accessible\n" | |
| plan += "β’ Have emergency bag ready (documents, medications, cash)\n" | |
| plan += "\n**IF IN IMMEDIATE DANGER: Call 911**\n\n" | |
| elif escalation_risk == "High" or abuse_score >= 70: | |
| plan += "β οΈ **HIGH RISK SITUATION**\n\n" | |
| plan += "**SAFETY STEPS:**\n" | |
| plan += "β’ Contact domestic violence hotline for safety planning: **1-800-799-7233**\n" | |
| plan += "β’ Identify 3 trusted people you can contact for help\n" | |
| plan += "β’ Plan escape routes and transportation options\n" | |
| plan += "β’ Document concerning behaviors with dates and details\n" | |
| plan += "β’ Research legal protection options\n\n" | |
| # Add pattern-specific advice | |
| if "stalking language" in detected_patterns: | |
| plan += "π **STALKING BEHAVIORS DETECTED:**\n" | |
| plan += "β’ Vary your routines and routes\n" | |
| plan += "β’ Check devices for tracking software\n" | |
| plan += "β’ Keep record of all stalking incidents\n" | |
| plan += "β’ Alert neighbors to watch for suspicious activity\n\n" | |
| if "veiled threats" in detected_patterns: | |
| plan += "β οΈ **THREATENING LANGUAGE IDENTIFIED:**\n" | |
| plan += "β’ Take all threats seriously, even indirect ones\n" | |
| plan += "β’ Document all threatening communications\n" | |
| plan += "β’ Inform trusted people about threat patterns\n" | |
| plan += "β’ Avoid being alone in isolated locations\n\n" | |
| # Always include crisis resources | |
| plan += "π **CRISIS RESOURCES (24/7):**\n" | |
| plan += "β’ **National DV Hotline:** 1-800-799-7233\n" | |
| plan += "β’ **Crisis Text Line:** Text START to 88788\n" | |
| plan += "β’ **Online Chat:** thehotline.org\n" | |
| plan += "β’ **Emergency:** Call 911\n\n" | |
| plan += "π **Remember:** You are not alone. This is not your fault. You deserve to be safe." | |
| return plan | |
| def detect_rare_threats(text): | |
| rare_threats = ["necktie party", "permanent solution", "final conversation"] | |
| if any(threat in text.lower() for threat in rare_threats): | |
| return [("veiled threats", 0.90, 1.6)] | |
| return [] | |
| def detect_enhanced_threats(text, patterns): | |
| """Enhanced threat detection for v6 patterns""" | |
| text_lower = text.lower() | |
| enhanced_threats = [] | |
| # Stalking language indicators | |
| stalking_phrases = [ | |
| "stop at nothing", "will find you", "know where you", | |
| "watching you", "following you", "can't hide", | |
| "i know your", "saw you with", "you belong to me" | |
| ] | |
| # Veiled threat indicators | |
| veiled_threat_phrases = [ | |
| "some people might", "things happen to people who", | |
| "be careful", "hope nothing happens", "accidents happen", | |
| "necktie party", "permanent solution", "wouldn't want" | |
| ] | |
| # False concern indicators | |
| false_concern_phrases = [ | |
| "just worried about", "concerned about your", | |
| "someone needs to protect", "for your own good" | |
| ] | |
| if any(phrase in text_lower for phrase in stalking_phrases): | |
| enhanced_threats.append("stalking language") | |
| if any(phrase in text_lower for phrase in veiled_threat_phrases): | |
| enhanced_threats.append("veiled threats") | |
| if any(phrase in text_lower for phrase in false_concern_phrases): | |
| enhanced_threats.append("false concern") | |
| return enhanced_threats | |
| def calculate_enhanced_risk_level(abuse_score, detected_patterns, escalation_risk, darvo_score): | |
| """Enhanced risk calculation that properly weights dangerous patterns""" | |
| # Start with base risk from escalation system | |
| base_risk = escalation_risk | |
| # CRITICAL PATTERNS - Auto-elevate to HIGH risk minimum | |
| critical_patterns = ["stalking language", "veiled threats"] | |
| has_critical = any(pattern in detected_patterns for pattern in critical_patterns) | |
| # DANGEROUS COMBINATIONS - Auto-elevate to CRITICAL | |
| dangerous_combos = [ | |
| ("stalking language", "control"), | |
| ("veiled threats", "stalking language"), | |
| ("stalking language", "false concern"), | |
| ("veiled threats", "control") | |
| ] | |
| has_dangerous_combo = any( | |
| all(pattern in detected_patterns for pattern in combo) | |
| for combo in dangerous_combos | |
| ) | |
| # FORCE RISK ELEVATION for dangerous patterns | |
| if has_dangerous_combo: | |
| return "Critical" | |
| elif has_critical and abuse_score >= 30: # Lower threshold for critical patterns | |
| return "High" | |
| elif has_critical: | |
| return "Moderate" | |
| elif abuse_score >= 70: | |
| return "High" | |
| elif abuse_score >= 50: | |
| return "Moderate" | |
| else: | |
| return base_risk | |
| def get_emotion_profile(text): | |
| """Get emotion profile from text with all scores""" | |
| try: | |
| emotions = emotion_pipeline(text) | |
| if isinstance(emotions, list) and isinstance(emotions[0], list): | |
| # Extract all scores from the first prediction | |
| emotion_scores = emotions[0] | |
| # Convert to dictionary with lowercase emotion names | |
| return {e['label'].lower(): round(e['score'], 3) for e in emotion_scores} | |
| return {} | |
| except Exception as e: | |
| logger.error(f"Error in get_emotion_profile: {e}") | |
| return { | |
| "sadness": 0.0, | |
| "joy": 0.0, | |
| "neutral": 0.0, | |
| "disgust": 0.0, | |
| "anger": 0.0, | |
| "fear": 0.0 | |
| } | |
| # FIXED FUNCTION - Added missing "d" and cleaned up structure | |
| def get_emotional_tone_tag(text, sentiment, patterns, abuse_score): | |
| """Get emotional tone tag based on emotions and patterns""" | |
| emotions = get_emotion_profile(text) | |
| sadness = emotions.get("sadness", 0) | |
| joy = emotions.get("joy", 0) | |
| neutral = emotions.get("neutral", 0) | |
| disgust = emotions.get("disgust", 0) | |
| anger = emotions.get("anger", 0) | |
| fear = emotions.get("fear", 0) | |
| text_lower = text.lower() | |
| # 1. Direct Threat Detection | |
| threat_indicators = [ | |
| "if you", "i'll make", "don't forget", "remember", "regret", | |
| "i control", "i'll take", "you'll lose", "make sure", | |
| "never see", "won't let" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in threat_indicators) and | |
| any(p in patterns for p in ["control", "insults"]) and | |
| (anger > 0.2 or disgust > 0.2 or abuse_score > 70) | |
| ): | |
| return "direct threat" | |
| # 2. Obsessive Fixation (for stalking language) | |
| obsessive_indicators = [ | |
| "stop at nothing", "most desired", "forever", "always will", | |
| "belong to me", "you're mine", "never let you go", "can't live without" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in obsessive_indicators) and | |
| "stalking language" in patterns and | |
| (joy > 0.3 or sadness > 0.4 or fear > 0.2) | |
| ): | |
| return "obsessive fixation" | |
| # 3. Menacing Calm (for veiled threats) | |
| veiled_threat_indicators = [ | |
| "some people might", "accidents happen", "be careful", | |
| "wouldn't want", "things happen", "unfortunate" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in veiled_threat_indicators) and | |
| "veiled threats" in patterns and | |
| neutral > 0.4 and anger < 0.2 | |
| ): | |
| return "menacing calm" | |
| # 4. Predatory Concern (for false concern) | |
| concern_indicators = [ | |
| "worried about", "concerned about", "for your own good", | |
| "someone needs to", "protect you", "take care of you" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in concern_indicators) and | |
| "false concern" in patterns and | |
| (joy > 0.2 or neutral > 0.3) and sentiment == "undermining" | |
| ): | |
| return "predatory concern" | |
| # 5. Victim Cosplay (for false equivalence/DARVO) | |
| victim_indicators = [ | |
| "i'm the victim", "you're abusing me", "i'm being hurt", | |
| "you're attacking me", "i'm innocent", "you're the problem" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in victim_indicators) and | |
| "false equivalence" in patterns and | |
| sadness > 0.4 and anger > 0.2 | |
| ): | |
| return "victim cosplay" | |
| # 6. Manipulative Hope (for future faking) | |
| future_indicators = [ | |
| "i'll change", "we'll be", "i promise", "things will be different", | |
| "next time", "from now on", "i'll never", "we'll have" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in future_indicators) and | |
| "future faking" in patterns and | |
| (joy > 0.3 or sadness > 0.3) | |
| ): | |
| return "manipulative hope" | |
| # 7. Surveillance Intimacy (for stalking with false intimacy) | |
| surveillance_indicators = [ | |
| "i know you", "i saw you", "i watched", "i've been", | |
| "your routine", "where you go", "what you do" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in surveillance_indicators) and | |
| "stalking language" in patterns and | |
| joy > 0.2 and neutral > 0.2 | |
| ): | |
| return "surveillance intimacy" | |
| # 8. Conditional Menace (for threats with conditions) | |
| conditional_indicators = [ | |
| "if you", "unless you", "you better", "don't make me", | |
| "you wouldn't want", "force me to" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in conditional_indicators) and | |
| any(p in patterns for p in ["veiled threats", "control"]) and | |
| anger > 0.3 and neutral > 0.2 | |
| ): | |
| return "conditional menace" | |
| # 9. False Vulnerability (manipulation disguised as weakness) | |
| vulnerability_indicators = [ | |
| "i can't help", "i need you", "without you i", "you're all i have", | |
| "i'm lost without", "i don't know what to do" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in vulnerability_indicators) and | |
| any(p in patterns for p in ["guilt tripping", "future faking", "false concern"]) and | |
| sadness > 0.5 and sentiment == "undermining" | |
| ): | |
| return "false vulnerability" | |
| # 10. Entitled Rage (anger with entitlement) | |
| entitlement_indicators = [ | |
| "you owe me", "after everything", "how dare you", "you should", | |
| "i deserve", "you have no right" | |
| ] | |
| if ( | |
| any(indicator in text_lower for indicator in entitlement_indicators) and | |
| anger > 0.5 and | |
| any(p in patterns for p in ["control", "insults", "blame shifting"]) | |
| ): | |
| return "entitled rage" | |
| # 11. Calculated Coldness (deliberate emotional detachment) | |
| cold_indicators = [ | |
| "i don't care", "whatever", "your choice", "suit yourself", | |
| "fine by me", "your loss" | |
| ] | |
| calculated_patterns = ["dismissiveness", "obscure language", "control"] | |
| if ( | |
| any(indicator in text_lower for indicator in cold_indicators) and | |
| any(p in patterns for p in calculated_patterns) and | |
| neutral > 0.6 and all(e < 0.2 for e in [anger, sadness, joy]) | |
| ): | |
| return "calculated coldness" | |
| # 12. Predictive Punishment | |
| future_consequences = [ | |
| "will end up", "you'll be", "you will be", "going to be", | |
| "will become", "will find yourself", "will realize", | |
| "you'll regret", "you'll see", "will learn", "truly will", | |
| "end up alone", "end up miserable" | |
| ] | |
| dismissive_endings = [ | |
| "i'm out", "i'm done", "whatever", "good luck", | |
| "your choice", "your problem", "regardless", | |
| "keep", "keep on" | |
| ] | |
| if ( | |
| (any(phrase in text_lower for phrase in future_consequences) or | |
| any(end in text_lower for end in dismissive_endings)) and | |
| any(p in ["dismissiveness", "control"] for p in patterns) and | |
| (disgust > 0.2 or neutral > 0.3 or anger > 0.2) | |
| ): | |
| return "predictive punishment" | |
| # 13. Performative Regret | |
| if ( | |
| sadness > 0.3 and | |
| any(p in patterns for p in ["blame shifting", "guilt tripping", "recovery"]) and | |
| (sentiment == "undermining" or abuse_score > 40) | |
| ): | |
| return "performative regret" | |
| # 14. Coercive Warmth | |
| if ( | |
| (joy > 0.2 or sadness > 0.3) and | |
| any(p in patterns for p in ["control", "gaslighting"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "coercive warmth" | |
| # 15. Cold Invalidation | |
| if ( | |
| (neutral + disgust) > 0.4 and | |
| any(p in patterns for p in ["dismissiveness", "projection", "obscure language"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "cold invalidation" | |
| # 16. Genuine Vulnerability | |
| if ( | |
| (sadness + fear) > 0.4 and | |
| sentiment == "supportive" and | |
| all(p in ["recovery"] for p in patterns) | |
| ): | |
| return "genuine vulnerability" | |
| # 17. Emotional Threat | |
| if ( | |
| (anger + disgust) > 0.4 and | |
| any(p in patterns for p in ["control", "insults", "dismissiveness"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "emotional threat" | |
| # 18. Weaponized Sadness | |
| if ( | |
| sadness > 0.5 and | |
| any(p in patterns for p in ["guilt tripping", "projection"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "weaponized sadness" | |
| # 19. Contradictory Gaslight | |
| if ( | |
| (joy + anger + sadness) > 0.4 and | |
| any(p in patterns for p in ["gaslighting", "contradictory statements"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "contradictory gaslight" | |
| # 20. Forced Accountability Flip | |
| if ( | |
| (anger + disgust) > 0.4 and | |
| any(p in patterns for p in ["blame shifting", "projection"]) and | |
| sentiment == "undermining" | |
| ): | |
| return "forced accountability flip" | |
| # Emotional Instability Fallback | |
| if ( | |
| (anger + sadness + disgust) > 0.5 and | |
| sentiment == "undermining" | |
| ): | |
| return "emotional instability" | |
| return "neutral" | |
| def predict_darvo_score(text): | |
| """Predict DARVO score for given text""" | |
| try: | |
| inputs = darvo_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| logits = darvo_model(**inputs).logits | |
| return round(sigmoid(logits.cpu()).item(), 4) | |
| except Exception as e: | |
| logger.error(f"Error in DARVO prediction: {e}") | |
| return 0.0 | |
| def detect_weapon_language(text): | |
| """Detect weapon-related language in text""" | |
| weapon_keywords = ["knife", "gun", "bomb", "weapon", "kill", "stab"] | |
| t = text.lower() | |
| return any(w in t for w in weapon_keywords) | |
| def get_risk_stage(patterns, sentiment): | |
| """Determine risk stage based on patterns and sentiment""" | |
| try: | |
| if "insults" in patterns: | |
| return 2 | |
| elif "recovery" in patterns: | |
| return 3 | |
| elif "control" in patterns or "guilt tripping" in patterns: | |
| return 1 | |
| elif sentiment == "supportive" and any(p in patterns for p in ["projection", "dismissiveness"]): | |
| return 4 | |
| return 1 | |
| except Exception as e: | |
| logger.error(f"Error determining risk stage: {e}") | |
| return 1 | |
| def detect_threat_pattern(text, patterns): | |
| """Detect if a message contains threat patterns""" | |
| # Threat indicators in text | |
| threat_words = [ | |
| "regret", "sorry", "pay", "hurt", "suffer", "destroy", "ruin", | |
| "expose", "tell everyone", "never see", "take away", "lose", | |
| "control", "make sure", "won't let", "force", "warn", "never", | |
| "punish", "teach you", "learn", "show you", "remember", | |
| "if you", "don't forget", "i control", "i'll make sure", # Added these specific phrases | |
| "bank account", "phone", "money", "access" # Added financial control indicators | |
| ] | |
| # Check for conditional threats (if/then structures) | |
| text_lower = text.lower() | |
| conditional_threat = ( | |
| "if" in text_lower and | |
| any(word in text_lower for word in ["regret", "make sure", "control"]) | |
| ) | |
| has_threat_words = any(word in text_lower for word in threat_words) | |
| # Check for threat patterns | |
| threat_patterns = {"control", "gaslighting", "blame shifting", "insults"} | |
| has_threat_patterns = any(p in threat_patterns for p in patterns) | |
| return has_threat_words or has_threat_patterns or conditional_threat | |
| def detect_compound_threat(text, patterns): | |
| """Detect compound threats in a single message""" | |
| try: | |
| # Rule A: Single Message Multiple Patterns | |
| high_risk_patterns = {"control", "gaslighting", "blame shifting", "insults"} | |
| high_risk_count = sum(1 for p in patterns if p in high_risk_patterns) | |
| has_threat = detect_threat_pattern(text, patterns) | |
| # Special case for control + threats | |
| has_control = "control" in patterns | |
| has_conditional_threat = "if" in text.lower() and any(word in text.lower() | |
| for word in ["regret", "make sure", "control"]) | |
| # Single message compound threat | |
| if (has_threat and high_risk_count >= 2) or (has_control and has_conditional_threat): | |
| return True, "single_message" | |
| return False, None | |
| except Exception as e: | |
| logger.error(f"Error in compound threat detection: {e}") | |
| return False, None | |
| def analyze_message_batch_threats(messages, results): | |
| """Analyze multiple messages for compound threats""" | |
| threat_messages = [] | |
| support_messages = [] | |
| for i, (msg, (result, _)) in enumerate(zip(messages, results)): | |
| if not msg.strip(): # Skip empty messages | |
| continue | |
| patterns = result[1] # Get detected patterns | |
| # Check for threat in this message | |
| if detect_threat_pattern(msg, patterns): | |
| threat_messages.append(i) | |
| # Check for supporting patterns | |
| if any(p in {"control", "gaslighting", "blame shifting"} for p in patterns): | |
| support_messages.append(i) | |
| # Rule B: Multi-Message Accumulation | |
| if len(threat_messages) >= 2: | |
| return True, "multiple_threats" | |
| elif len(threat_messages) == 1 and len(support_messages) >= 2: | |
| return True, "threat_with_support" | |
| return False, None | |
| def compute_abuse_score(matched_scores, sentiment): | |
| """Compute abuse score from matched patterns and sentiment""" | |
| try: | |
| if not matched_scores: | |
| logger.debug("No matched scores, returning 0") | |
| return 0.0 | |
| # Calculate weighted score | |
| total_weight = sum(weight for _, _, weight in matched_scores) | |
| if total_weight == 0: | |
| logger.debug("Total weight is 0, returning 0") | |
| return 0.0 | |
| # Get highest pattern scores | |
| pattern_scores = [(label, score) for label, score, _ in matched_scores] | |
| sorted_scores = sorted(pattern_scores, key=lambda x: x[1], reverse=True) | |
| logger.debug(f"Sorted pattern scores: {sorted_scores}") | |
| # Base score calculation | |
| weighted_sum = sum(score * weight for _, score, weight in matched_scores) | |
| base_score = (weighted_sum / total_weight) * 100 | |
| logger.debug(f"Initial base score: {base_score:.1f}") | |
| # Cap maximum score based on pattern severity | |
| max_score = 85.0 # Set maximum possible score | |
| if any(label in {'control', 'gaslighting'} for label, _, _ in matched_scores): | |
| max_score = 90.0 | |
| logger.debug(f"Increased max score to {max_score} due to high severity patterns") | |
| # Apply diminishing returns for multiple patterns | |
| if len(matched_scores) > 1: | |
| multiplier = 1 + (0.1 * (len(matched_scores) - 1)) | |
| base_score *= multiplier | |
| logger.debug(f"Applied multiplier {multiplier:.2f} for {len(matched_scores)} patterns") | |
| # Apply sentiment modifier | |
| if sentiment == "supportive": | |
| base_score *= 0.85 | |
| logger.debug("Applied 15% reduction for supportive sentiment") | |
| final_score = min(round(base_score, 1), max_score) | |
| logger.debug(f"Final abuse score: {final_score}") | |
| return final_score | |
| except Exception as e: | |
| logger.error(f"Error computing abuse score: {e}") | |
| return 0.0 | |
| def detect_explicit_abuse(text): | |
| """Improved explicit abuse detection with word boundary checking""" | |
| import re | |
| explicit_abuse_words = ['fuck', 'bitch', 'shit', 'dick'] # Removed 'ass' | |
| # Add more specific patterns for actual abusive uses of 'ass' | |
| abusive_ass_patterns = [ | |
| r'\bass\b(?!\s*glass)', # 'ass' not followed by 'glass' | |
| r'\bdumb\s*ass\b', | |
| r'\bkiss\s*my\s*ass\b', | |
| r'\bget\s*your\s*ass\b' | |
| ] | |
| text_lower = text.lower() | |
| # Check basic explicit words | |
| for word in explicit_abuse_words: | |
| if re.search(r'\b' + word + r'\b', text_lower): | |
| return True | |
| # Check specific abusive 'ass' patterns | |
| for pattern in abusive_ass_patterns: | |
| if re.search(pattern, text_lower): | |
| return True | |
| return False | |
| def analyze_single_message(text, thresholds): | |
| """Analyze a single message for abuse patterns with boundary assessment""" | |
| logger.debug("\n=== DEBUG START ===") | |
| logger.debug(f"Input text: {text}") | |
| try: | |
| if not text.strip(): | |
| logger.debug("Empty text, returning zeros") | |
| return 0.0, [], [], {"label": "none"}, 1, 0.0, None, {'assessment': 'neutral', 'confidence': 0.5} | |
| # BOUNDARY HEALTH CHECK - Add this new section | |
| logger.debug("\nπ‘οΈ BOUNDARY HEALTH ANALYSIS") | |
| logger.debug("-" * 40) | |
| healthy_prob = predict_boundary_health(text) | |
| boundary_assessment = get_boundary_assessment(text, healthy_prob) | |
| logger.debug(f"Boundary Health Score: {healthy_prob:.3f}") | |
| logger.debug(f"Boundary Assessment: {boundary_assessment['label']}") | |
| # EARLY SUPPORTIVE MESSAGE CHECK | |
| innocent_indicators = [ | |
| 'broken', 'not working', 'cracked', 'glass', 'screen', 'phone', | |
| 'device', 'battery', 'charger', 'wifi', 'internet', 'computer', | |
| 'sorry', 'apologize', 'my fault', 'mistake' | |
| ] | |
| # Enhanced early return check - now includes boundary health | |
| if (any(indicator in text.lower() for indicator in innocent_indicators) and | |
| len(text.split()) < 20 and | |
| not any(threat in text.lower() for threat in ['kill', 'hurt', 'destroy', 'hate']) and | |
| healthy_prob > 0.7): # Added boundary health check | |
| # Run quick sentiment check | |
| sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
| sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()} | |
| with torch.no_grad(): | |
| sent_logits = sentiment_model(**sent_inputs).logits[0] | |
| sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy() | |
| # If sentiment is strongly supportive AND boundary health is good, return early | |
| if sent_probs[0] > 0.8: # 80% supportive | |
| logger.debug("Early return: Message appears to be innocent/supportive with healthy boundaries") | |
| return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral", boundary_assessment | |
| # Check for explicit abuse (moved AFTER early return check) | |
| explicit_abuse = detect_explicit_abuse(text) | |
| logger.debug(f"Explicit abuse detected: {explicit_abuse}") | |
| # Abuse model inference | |
| inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
| inputs = {k: v.to(device) for k, v in inputs.items()} | |
| with torch.no_grad(): | |
| outputs = model(**inputs) | |
| raw_scores = torch.sigmoid(outputs.logits.squeeze(0)).cpu().numpy() | |
| # Log raw model outputs | |
| logger.debug("\nRaw model scores:") | |
| for label, score in zip(LABELS, raw_scores): | |
| logger.debug(f"{label}: {score:.3f}") | |
| # Get predictions and sort them | |
| predictions = list(zip(LABELS, raw_scores)) | |
| sorted_predictions = sorted(predictions, key=lambda x: x[1], reverse=True) | |
| logger.debug("\nTop 3 predictions:") | |
| for label, score in sorted_predictions[:3]: | |
| logger.debug(f"{label}: {score:.3f}") | |
| # Apply thresholds | |
| threshold_labels = [] | |
| if explicit_abuse: | |
| threshold_labels.append("insults") | |
| logger.debug("\nForced inclusion of 'insults' due to explicit abuse") | |
| for label, score in sorted_predictions: | |
| base_threshold = thresholds.get(label, 0.25) | |
| if explicit_abuse: | |
| base_threshold *= 0.5 | |
| if score > base_threshold: | |
| if label not in threshold_labels: | |
| threshold_labels.append(label) | |
| logger.debug(f"\nLabels that passed thresholds: {threshold_labels}") | |
| # Calculate matched scores | |
| matched_scores = [] | |
| for label in threshold_labels: | |
| score = raw_scores[LABELS.index(label)] | |
| weight = PATTERN_WEIGHTS.get(label, 1.0) | |
| if explicit_abuse and label == "insults": | |
| weight *= 1.5 | |
| matched_scores.append((label, score, weight)) | |
| # Get sentiment | |
| sent_inputs = sentiment_tokenizer(text, return_tensors="pt", truncation=True, padding=True) | |
| sent_inputs = {k: v.to(device) for k, v in sent_inputs.items()} | |
| with torch.no_grad(): | |
| sent_logits = sentiment_model(**sent_inputs).logits[0] | |
| sent_probs = torch.softmax(sent_logits, dim=-1).cpu().numpy() | |
| # Add detailed logging | |
| logger.debug("\nπ SENTIMENT ANALYSIS DETAILS") | |
| logger.debug(f"Raw logits: {sent_logits}") | |
| logger.debug(f"Probabilities: supportive={sent_probs[0]:.3f}, undermining={sent_probs[1]:.3f}") | |
| # Make sure we're using the correct index mapping | |
| sentiment = SENTIMENT_LABELS[int(np.argmax(sent_probs))] | |
| logger.debug(f"Selected sentiment: {sentiment}") | |
| enhanced_patterns = detect_enhanced_threats(text, threshold_labels) | |
| for pattern in enhanced_patterns: | |
| if pattern not in threshold_labels: | |
| threshold_labels.append(pattern) | |
| # Add to matched_scores with high confidence | |
| weight = PATTERN_WEIGHTS.get(pattern, 1.0) | |
| matched_scores.append((pattern, 0.85, weight)) | |
| # Calculate abuse score | |
| abuse_score = compute_abuse_score(matched_scores, sentiment) | |
| if explicit_abuse: | |
| abuse_score = max(abuse_score, 70.0) | |
| # Apply boundary health modifier to abuse score | |
| if healthy_prob > 0.8 and not explicit_abuse: | |
| # Very healthy boundaries - cap abuse score much lower | |
| abuse_score = min(abuse_score, 20.0) | |
| logger.debug(f"Capped abuse score to {abuse_score} due to very healthy boundaries") | |
| elif healthy_prob > 0.6 and sentiment == "supportive": | |
| # Moderately healthy boundaries with supportive sentiment | |
| abuse_score = min(abuse_score, 35.0) | |
| logger.debug(f"Capped abuse score to {abuse_score} due to healthy boundaries") | |
| # Apply sentiment-based score capping BEFORE compound threat check | |
| if sentiment == "supportive" and not explicit_abuse: | |
| # For supportive messages, cap the abuse score much lower | |
| abuse_score = min(abuse_score, 30.0) | |
| logger.debug(f"Capped abuse score to {abuse_score} due to supportive sentiment") | |
| # Check for compound threats | |
| compound_threat_flag, threat_type = detect_compound_threat(text, threshold_labels) | |
| # Apply compound threat override only for non-supportive messages | |
| if compound_threat_flag and sentiment != "supportive": | |
| logger.debug(f"β οΈ Compound threat detected in message: {threat_type}") | |
| abuse_score = max(abuse_score, 85.0) | |
| # Get DARVO score | |
| darvo_score = predict_darvo_score(text) | |
| # Get tone using emotion-based approach | |
| tone_tag = get_emotional_tone_tag(text, sentiment, threshold_labels, abuse_score) | |
| # Log tone usage | |
| log_emotional_tone_usage(tone_tag, threshold_labels) | |
| # Check for the specific combination (final safety check) | |
| highest_pattern = max(matched_scores, key=lambda x: x[1])[0] if matched_scores else None | |
| if sentiment == "supportive" and tone_tag == "neutral" and highest_pattern == "obscure language" and healthy_prob > 0.6: | |
| logger.debug("Message classified as likely non-abusive (supportive, neutral, healthy boundaries). Returning low risk.") | |
| return 0.0, [], [], {"label": "supportive"}, 1, 0.0, "neutral", boundary_assessment | |
| # Set stage | |
| stage = 2 if explicit_abuse or abuse_score > 70 else 1 | |
| logger.debug("=== DEBUG END ===\n") | |
| # Return with boundary assessment as the 8th element | |
| return abuse_score, threshold_labels, matched_scores, {"label": sentiment}, stage, darvo_score, tone_tag, boundary_assessment | |
| except Exception as e: | |
| logger.error(f"Error in analyze_single_message: {e}") | |
| return 0.0, [], [], {"label": "error"}, 1, 0.0, None, {'assessment': 'error', 'confidence': 0.0} | |
| def generate_abuse_score_chart(dates, scores, patterns): | |
| """Generate a timeline chart of abuse scores""" | |
| try: | |
| plt.figure(figsize=(10, 6)) | |
| plt.clf() | |
| # Create new figure | |
| fig, ax = plt.subplots(figsize=(10, 6)) | |
| # Plot points and lines | |
| x = range(len(scores)) | |
| plt.plot(x, scores, 'bo-', linewidth=2, markersize=8) | |
| # Add labels for each point with highest scoring pattern | |
| for i, (score, pattern) in enumerate(zip(scores, patterns)): | |
| # Get the pattern and its score | |
| plt.annotate( | |
| f'{pattern}\n{score:.0f}%', | |
| (i, score), | |
| textcoords="offset points", | |
| xytext=(0, 10), | |
| ha='center', | |
| bbox=dict( | |
| boxstyle='round,pad=0.5', | |
| fc='white', | |
| ec='gray', | |
| alpha=0.8 | |
| ) | |
| ) | |
| # Customize the plot | |
| plt.ylim(-5, 105) | |
| plt.grid(True, linestyle='--', alpha=0.7) | |
| plt.title('Abuse Pattern Timeline', pad=20, fontsize=12) | |
| plt.ylabel('Abuse Score %') | |
| # X-axis labels | |
| plt.xticks(x, dates, rotation=45) | |
| # Risk level bands with better colors | |
| plt.axhspan(0, 50, color='#90EE90', alpha=0.2) # light green - Low Risk | |
| plt.axhspan(50, 70, color='#FFD700', alpha=0.2) # gold - Moderate Risk | |
| plt.axhspan(70, 85, color='#FFA500', alpha=0.2) # orange - High Risk | |
| plt.axhspan(85, 100, color='#FF6B6B', alpha=0.2) # light red - Critical Risk | |
| # Add risk level labels | |
| plt.text(-0.2, 25, 'Low Risk', rotation=90, va='center') | |
| plt.text(-0.2, 60, 'Moderate Risk', rotation=90, va='center') | |
| plt.text(-0.2, 77.5, 'High Risk', rotation=90, va='center') | |
| plt.text(-0.2, 92.5, 'Critical Risk', rotation=90, va='center') | |
| # Adjust layout | |
| plt.tight_layout() | |
| # Convert plot to image | |
| buf = io.BytesIO() | |
| plt.savefig(buf, format='png', bbox_inches='tight') | |
| buf.seek(0) | |
| plt.close('all') # Close all figures to prevent memory leaks | |
| return Image.open(buf) | |
| except Exception as e: | |
| logger.error(f"Error generating abuse score chart: {e}") | |
| return None | |
| def analyze_composite(msg1, msg2, msg3, *answers_and_none): | |
| """Analyze multiple messages and checklist responses""" | |
| logger.debug("\nπ STARTING NEW ANALYSIS") | |
| logger.debug("=" * 50) | |
| # Define severity categories at the start | |
| high = {'control'} | |
| moderate = {'gaslighting', 'dismissiveness', 'obscure language', 'insults', | |
| 'contradictory statements', 'guilt tripping'} | |
| low = {'blame shifting', 'projection', 'recovery'} | |
| try: | |
| # Process checklist responses | |
| logger.debug("\nπ CHECKLIST PROCESSING") | |
| logger.debug("=" * 50) | |
| none_selected_checked = answers_and_none[-1] | |
| responses_checked = any(answers_and_none[:-1]) | |
| none_selected = not responses_checked and none_selected_checked | |
| logger.debug("Checklist Status:") | |
| logger.debug(f" β’ None Selected Box: {'β' if none_selected_checked else 'β'}") | |
| logger.debug(f" β’ Has Responses: {'β' if responses_checked else 'β'}") | |
| logger.debug(f" β’ Final Status: {'None Selected' if none_selected else 'Has Selections'}") | |
| if none_selected: | |
| escalation_score = 0 | |
| escalation_note = "Checklist completed: no danger items reported." | |
| escalation_completed = True | |
| logger.debug("\nβ Checklist: No items selected") | |
| elif responses_checked: | |
| escalation_score = sum(w for (_, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]) if a) | |
| escalation_note = "Checklist completed." | |
| escalation_completed = True | |
| logger.debug(f"\nπ Checklist Score: {escalation_score}") | |
| # Log checked items | |
| logger.debug("\nβ οΈ Selected Risk Factors:") | |
| for (q, w), a in zip(ESCALATION_QUESTIONS, answers_and_none[:-1]): | |
| if a: | |
| logger.debug(f" β’ [{w} points] {q}") | |
| else: | |
| escalation_score = None | |
| escalation_note = "Checklist not completed." | |
| escalation_completed = False | |
| logger.debug("\nβ Checklist: Not completed") | |
| # Process messages | |
| logger.debug("\nπ MESSAGE PROCESSING") | |
| logger.debug("=" * 50) | |
| messages = [msg1, msg2, msg3] | |
| active = [(m, f"Message {i+1}") for i, m in enumerate(messages) if m.strip()] | |
| logger.debug(f"Active Messages: {len(active)} of 3") | |
| if not active: | |
| logger.debug("β Error: No messages provided") | |
| return "Please enter at least one message.", None | |
| # Detect threats | |
| logger.debug("\nπ¨ THREAT DETECTION") | |
| logger.debug("=" * 50) | |
| def normalize(text): | |
| import unicodedata | |
| text = text.lower().strip() | |
| text = unicodedata.normalize("NFKD", text) | |
| text = text.replace("'", "'") | |
| return re.sub(r"[^a-z0-9 ]", "", text) | |
| def detect_threat_motifs(message, motif_list): | |
| norm_msg = normalize(message) | |
| return [motif for motif in motif_list if normalize(motif) in norm_msg] | |
| # Analyze threats and patterns | |
| immediate_threats = [detect_threat_motifs(m, THREAT_MOTIFS) for m, _ in active] | |
| flat_threats = [t for sublist in immediate_threats for t in sublist] | |
| threat_risk = "Yes" if flat_threats else "No" | |
| # Analyze each message | |
| logger.debug("\nπ INDIVIDUAL MESSAGE ANALYSIS") | |
| logger.debug("=" * 50) | |
| results = [] | |
| for m, d in active: | |
| logger.debug(f"\nπ ANALYZING {d}") | |
| logger.debug("-" * 40) | |
| result = analyze_single_message(m, THRESHOLDS.copy()) | |
| # Check for non-abusive classification and skip further analysis | |
| if result[0] == 0.0 and result[1] == [] and result[3] == {"label": "supportive"} and result[4] == 1 and result[5] == 0.0 and result[6] == "neutral": | |
| logger.debug(f"β {d} classified as non-abusive, skipping further analysis.") | |
| continue # This MUST be inside the for loop | |
| results.append((result, d)) | |
| # UNPACK RESULT FIRST - INSIDE the for loop | |
| abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone, boundary_assessment = result | |
| # NOW you can log these variables safely | |
| logger.debug("\nπ CORE METRICS") | |
| logger.debug(f" β’ Abuse Score: {abuse_score:.1f}%") | |
| logger.debug(f" β’ DARVO Score: {darvo_score:.3f}") | |
| logger.debug(f" β’ Risk Stage: {stage}") | |
| logger.debug(f" β’ Sentiment: {sentiment['label']}") | |
| logger.debug(f" β’ Tone: {tone}") | |
| # Log detected patterns with scores | |
| if patterns: | |
| logger.debug("\nπ― DETECTED PATTERNS") | |
| for label, score, weight in matched_scores: | |
| severity = "βHIGH" if label in high else "β οΈ MODERATE" if label in moderate else "π LOW" | |
| logger.debug(f" β’ {severity} | {label}: {score:.3f} (weight: {weight})") | |
| else: | |
| logger.debug("\nβ No abuse patterns detected") | |
| # AFTER the for loop - Extract scores and metadata | |
| abuse_scores = [r[0][0] for r in results] | |
| # ... rest of your code | |
| # NOW you can log these variables safely | |
| logger.debug("\nπ CORE METRICS") | |
| logger.debug(f" β’ Abuse Score: {abuse_score:.1f}%") | |
| logger.debug(f" β’ DARVO Score: {darvo_score:.3f}") | |
| logger.debug(f" β’ Risk Stage: {stage}") | |
| logger.debug(f" β’ Sentiment: {sentiment['label']}") | |
| logger.debug(f" β’ Tone: {tone}") # Now this works! | |
| # Log detected patterns with scores | |
| if patterns: | |
| logger.debug("\nπ― DETECTED PATTERNS") | |
| for label, score, weight in matched_scores: | |
| severity = "βHIGH" if label in high else "β οΈ MODERATE" if label in moderate else "π LOW" | |
| logger.debug(f" β’ {severity} | {label}: {score:.3f} (weight: {weight})") | |
| else: | |
| logger.debug("\nβ No abuse patterns detected") | |
| # Extract scores and metadata | |
| abuse_scores = [r[0][0] for r in results] | |
| stages = [r[0][4] for r in results] | |
| darvo_scores = [r[0][5] for r in results] | |
| tone_tags = [r[0][6] for r in results] | |
| dates_used = [r[1] for r in results] | |
| # Pattern Analysis Summary | |
| logger.debug("\nπ PATTERN ANALYSIS SUMMARY") | |
| logger.debug("=" * 50) | |
| predicted_labels = [label for r in results for label in r[0][1]] | |
| if predicted_labels: | |
| logger.debug("Detected Patterns Across All Messages:") | |
| pattern_counts = Counter(predicted_labels) | |
| # Log high severity patterns first | |
| high_patterns = [p for p in pattern_counts if p in high] | |
| if high_patterns: | |
| logger.debug("\nβ HIGH SEVERITY PATTERNS:") | |
| for p in high_patterns: | |
| logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") | |
| # Then moderate | |
| moderate_patterns = [p for p in pattern_counts if p in moderate] | |
| if moderate_patterns: | |
| logger.debug("\nβ οΈ MODERATE SEVERITY PATTERNS:") | |
| for p in moderate_patterns: | |
| logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") | |
| # Then low | |
| low_patterns = [p for p in pattern_counts if p in low] | |
| if low_patterns: | |
| logger.debug("\nπ LOW SEVERITY PATTERNS:") | |
| for p in low_patterns: | |
| logger.debug(f" β’ {p} (Γ{pattern_counts[p]})") | |
| else: | |
| logger.debug("β No patterns detected across messages") | |
| # Pattern Severity Analysis | |
| logger.debug("\nβοΈ SEVERITY ANALYSIS") | |
| logger.debug("=" * 50) | |
| counts = {'high': 0, 'moderate': 0, 'low': 0} | |
| for label in predicted_labels: | |
| if label in high: | |
| counts['high'] += 1 | |
| elif label in moderate: | |
| counts['moderate'] += 1 | |
| elif label in low: | |
| counts['low'] += 1 | |
| logger.debug("Pattern Distribution:") | |
| if counts['high'] > 0: | |
| logger.debug(f" β High Severity: {counts['high']}") | |
| if counts['moderate'] > 0: | |
| logger.debug(f" β οΈ Moderate Severity: {counts['moderate']}") | |
| if counts['low'] > 0: | |
| logger.debug(f" π Low Severity: {counts['low']}") | |
| total_patterns = sum(counts.values()) | |
| if total_patterns > 0: | |
| logger.debug(f"\nSeverity Percentages:") | |
| logger.debug(f" β’ High: {(counts['high']/total_patterns)*100:.1f}%") | |
| logger.debug(f" β’ Moderate: {(counts['moderate']/total_patterns)*100:.1f}%") | |
| logger.debug(f" β’ Low: {(counts['low']/total_patterns)*100:.1f}%") | |
| # Risk Assessment | |
| logger.debug("\nπ― RISK ASSESSMENT") | |
| logger.debug("=" * 50) | |
| if counts['high'] >= 2 and counts['moderate'] >= 2: | |
| pattern_escalation_risk = "Critical" | |
| logger.debug("ββ CRITICAL RISK") | |
| logger.debug(" β’ Multiple high and moderate patterns detected") | |
| logger.debug(f" β’ High patterns: {counts['high']}") | |
| logger.debug(f" β’ Moderate patterns: {counts['moderate']}") | |
| elif (counts['high'] >= 2 and counts['moderate'] >= 1) or \ | |
| (counts['moderate'] >= 3) or \ | |
| (counts['high'] >= 1 and counts['moderate'] >= 2): | |
| pattern_escalation_risk = "High" | |
| logger.debug("β HIGH RISK") | |
| logger.debug(" β’ Significant pattern combination detected") | |
| logger.debug(f" β’ High patterns: {counts['high']}") | |
| logger.debug(f" β’ Moderate patterns: {counts['moderate']}") | |
| elif (counts['moderate'] == 2) or \ | |
| (counts['high'] == 1 and counts['moderate'] == 1) or \ | |
| (counts['moderate'] == 1 and counts['low'] >= 2) or \ | |
| (counts['high'] == 1 and sum(counts.values()) == 1): | |
| pattern_escalation_risk = "Moderate" | |
| logger.debug("β οΈ MODERATE RISK") | |
| logger.debug(" β’ Concerning pattern combination detected") | |
| logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") | |
| else: | |
| pattern_escalation_risk = "Low" | |
| logger.debug("π LOW RISK") | |
| logger.debug(" β’ Limited pattern severity detected") | |
| logger.debug(f" β’ Pattern distribution: H:{counts['high']}, M:{counts['moderate']}, L:{counts['low']}") | |
| # Checklist Risk Assessment | |
| logger.debug("\nπ CHECKLIST RISK ASSESSMENT") | |
| logger.debug("=" * 50) | |
| checklist_escalation_risk = "Unknown" if escalation_score is None else ( | |
| "Critical" if escalation_score >= 20 else | |
| "Moderate" if escalation_score >= 10 else | |
| "Low" | |
| ) | |
| if escalation_score is not None: | |
| logger.debug(f"Score: {escalation_score}/29") | |
| logger.debug(f"Risk Level: {checklist_escalation_risk}") | |
| if escalation_score >= 20: | |
| logger.debug("ββ CRITICAL: Score indicates severe risk") | |
| elif escalation_score >= 10: | |
| logger.debug("β οΈ MODERATE: Score indicates concerning risk") | |
| else: | |
| logger.debug("π LOW: Score indicates limited risk") | |
| else: | |
| logger.debug("β Risk Level: Unknown (checklist not completed)") | |
| # Escalation Analysis | |
| logger.debug("\nπ ESCALATION ANALYSIS") | |
| logger.debug("=" * 50) | |
| escalation_bump = 0 | |
| for result, msg_id in results: | |
| abuse_score, patterns, matched_scores, sentiment, stage, darvo_score, tone_tag, boundary_assessment = result | |
| logger.debug(f"\nπ Message {msg_id} Risk Factors:") | |
| factors = [] | |
| if darvo_score > 0.65: | |
| escalation_bump += 3 | |
| factors.append(f"β² +3: High DARVO score ({darvo_score:.3f})") | |
| if tone_tag in ["forced accountability flip", "emotional threat"]: | |
| escalation_bump += 2 | |
| factors.append(f"β² +2: Concerning tone ({tone_tag})") | |
| if abuse_score > 80: | |
| escalation_bump += 2 | |
| factors.append(f"β² +2: High abuse score ({abuse_score:.1f}%)") | |
| if stage == 2: | |
| escalation_bump += 3 | |
| factors.append("β² +3: Escalation stage") | |
| if factors: | |
| for factor in factors: | |
| logger.debug(f" {factor}") | |
| else: | |
| logger.debug(" β No escalation factors") | |
| logger.debug(f"\nπ Total Escalation Bump: +{escalation_bump}") | |
| # Check for compound threats across messages | |
| compound_threat_flag, threat_type = analyze_message_batch_threats( | |
| [msg1, msg2, msg3], results | |
| ) | |
| if compound_threat_flag: | |
| logger.debug(f"β οΈ Compound threat detected across messages: {threat_type}") | |
| pattern_escalation_risk = "Critical" # Override risk level | |
| logger.debug("Risk level elevated to CRITICAL due to compound threats") | |
| # Combined Risk Calculation | |
| logger.debug("\nπ― FINAL RISK CALCULATION") | |
| logger.debug("=" * 50) | |
| def rank(label): | |
| return {"Low": 0, "Moderate": 1, "High": 2, "Critical": 3, "Unknown": 0}.get(label, 0) | |
| combined_score = rank(pattern_escalation_risk) + rank(checklist_escalation_risk) + escalation_bump | |
| logger.debug("Risk Components:") | |
| logger.debug(f" β’ Pattern Risk ({pattern_escalation_risk}): +{rank(pattern_escalation_risk)}") | |
| logger.debug(f" β’ Checklist Risk ({checklist_escalation_risk}): +{rank(checklist_escalation_risk)}") | |
| logger.debug(f" β’ Escalation Bump: +{escalation_bump}") | |
| logger.debug(f" = Combined Score: {combined_score}") | |
| escalation_risk = ( | |
| "Critical" if combined_score >= 6 else | |
| "High" if combined_score >= 4 else | |
| "Moderate" if combined_score >= 2 else | |
| "Low" | |
| ) | |
| logger.debug(f"\nβ οΈ Final Escalation Risk: {escalation_risk}") | |
| # Generate Output Text | |
| logger.debug("\nπ GENERATING OUTPUT") | |
| logger.debug("=" * 50) | |
| if escalation_score is None: | |
| escalation_text = ( | |
| "π« **Escalation Potential: Unknown** (Checklist not completed)\n" | |
| "β οΈ This section was not completed. Escalation potential is estimated using message data only.\n" | |
| ) | |
| hybrid_score = 0 | |
| logger.debug("Generated output for incomplete checklist") | |
| elif escalation_score == 0: | |
| escalation_text = ( | |
| "β **Escalation Checklist Completed:** No danger items reported.\n" | |
| "π§ **Escalation potential estimated from detected message patterns only.**\n" | |
| f"β’ Pattern Risk: {pattern_escalation_risk}\n" | |
| f"β’ Checklist Risk: None reported\n" | |
| f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
| ) | |
| hybrid_score = escalation_bump | |
| logger.debug("Generated output for no-risk checklist") | |
| else: | |
| hybrid_score = escalation_score + escalation_bump | |
| escalation_text = ( | |
| f"π **Escalation Potential: {escalation_risk} ({hybrid_score}/29)**\n" | |
| "π This score combines your safety checklist answers *and* detected high-risk behavior.\n" | |
| f"β’ Pattern Risk: {pattern_escalation_risk}\n" | |
| f"β’ Checklist Risk: {checklist_escalation_risk}\n" | |
| f"β’ Escalation Bump: +{escalation_bump} (from DARVO, tone, intensity, etc.)" | |
| ) | |
| logger.debug(f"Generated output with hybrid score: {hybrid_score}/29") | |
| # Final Metrics | |
| logger.debug("\nπ FINAL METRICS") | |
| logger.debug("=" * 50) | |
| composite_abuse = int(round(sum(abuse_scores) / len(abuse_scores))) | |
| logger.debug(f"Composite Abuse Score: {composite_abuse}%") | |
| most_common_stage = max(set(stages), key=stages.count) | |
| logger.debug(f"Most Common Stage: {most_common_stage}") | |
| avg_darvo = round(sum(darvo_scores) / len(darvo_scores), 3) | |
| logger.debug(f"Average DARVO Score: {avg_darvo}") | |
| final_risk_level = calculate_enhanced_risk_level( | |
| composite_abuse, | |
| predicted_labels, | |
| escalation_risk, | |
| avg_darvo | |
| ) | |
| # Override escalation_risk with the enhanced version | |
| escalation_risk = final_risk_level | |
| # Generate Final Report | |
| logger.debug("\nπ GENERATING FINAL REPORT") | |
| logger.debug("=" * 50) | |
| out = f"Abuse Intensity: {composite_abuse}%\n" | |
| # Add detected patterns to output | |
| if predicted_labels: | |
| out += "π Detected Patterns:\n" | |
| if high_patterns: | |
| patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in high_patterns) | |
| out += f"β High Severity: {patterns_str}\n" | |
| if moderate_patterns: | |
| patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in moderate_patterns) | |
| out += f"β οΈ Moderate Severity: {patterns_str}\n" | |
| if low_patterns: | |
| patterns_str = ", ".join(f"{p} ({pattern_counts[p]}x)" for p in low_patterns) | |
| out += f"π Low Severity: {patterns_str}\n" | |
| out += "\n" | |
| out += "π This reflects the strength and severity of detected abuse patterns in the message(s).\n\n" | |
| # Risk Level Assessment | |
| risk_level = final_risk_level | |
| logger.debug(f"Final Risk Level: {risk_level}") | |
| # Add Risk Description | |
| risk_descriptions = { | |
| "Critical": ( | |
| "π¨ **Risk Level: Critical**\n" | |
| "Multiple severe abuse patterns detected. This situation shows signs of " | |
| "dangerous escalation and immediate intervention may be needed." | |
| ), | |
| "High": ( | |
| "β οΈ **Risk Level: High**\n" | |
| "Strong abuse patterns detected. This situation shows concerning " | |
| "signs of manipulation and control." | |
| ), | |
| "Moderate": ( | |
| "β‘ **Risk Level: Moderate**\n" | |
| "Concerning patterns detected. While not severe, these behaviors " | |
| "indicate unhealthy relationship dynamics." | |
| ), | |
| "Low": ( | |
| "π **Risk Level: Low**\n" | |
| "Minor concerning patterns detected. While present, the detected " | |
| "behaviors are subtle or infrequent." | |
| ) | |
| } | |
| out += risk_descriptions[risk_level] | |
| out += f"\n\n{RISK_STAGE_LABELS[most_common_stage]}" | |
| logger.debug("Added risk description and stage information") | |
| # Add DARVO Analysis | |
| if avg_darvo > 0.25: | |
| level = "moderate" if avg_darvo < 0.65 else "high" | |
| out += f"\n\nπ **DARVO Score: {avg_darvo}** β This indicates a **{level} likelihood** of narrative reversal (DARVO), where the speaker may be denying, attacking, or reversing blame." | |
| logger.debug(f"Added DARVO analysis ({level} level)") | |
| # Add Emotional Tones | |
| logger.debug("\nπ Adding Emotional Tones") | |
| out += "\n\nπ **Emotional Tones Detected:**\n" | |
| for i, tone in enumerate(tone_tags): | |
| out += f"β’ Message {i+1}: *{tone or 'none'}*\n" | |
| logger.debug(f"Message {i+1} tone: {tone}") | |
| # Add Threats Section | |
| logger.debug("\nβ οΈ Adding Threat Analysis") | |
| if flat_threats: | |
| out += "\n\nπ¨ **Immediate Danger Threats Detected:**\n" | |
| for t in set(flat_threats): | |
| out += f"β’ \"{t}\"\n" | |
| out += "\nβ οΈ These phrases may indicate an imminent risk to physical safety." | |
| logger.debug(f"Added {len(set(flat_threats))} unique threat warnings") | |
| else: | |
| out += "\n\nπ§© **Immediate Danger Threats:** None explicitly detected.\n" | |
| out += "This does *not* rule out risk, but no direct threat phrases were matched." | |
| logger.debug("No threats to add") | |
| # Generate Timeline | |
| logger.debug("\nπ Generating Timeline") | |
| pattern_labels = [] | |
| for result, _ in results: | |
| matched_scores = result[2] # Get the matched_scores from the result tuple | |
| if matched_scores: | |
| # Sort matched_scores by score and get the highest scoring pattern | |
| highest_pattern = max(matched_scores, key=lambda x: x[1]) | |
| pattern_labels.append(highest_pattern[0]) # Add the pattern name | |
| else: | |
| pattern_labels.append("none") | |
| logger.debug("Pattern labels for timeline:") | |
| for i, (pattern, score) in enumerate(zip(pattern_labels, abuse_scores)): | |
| logger.debug(f"Message {i+1}: {pattern} ({score:.1f}%)") | |
| timeline_image = generate_abuse_score_chart(dates_used, abuse_scores, pattern_labels) | |
| logger.debug("Timeline generated successfully") | |
| # Add Escalation Text | |
| out += "\n\n" + escalation_text | |
| logger.debug("Added escalation text to output") | |
| logger.debug("\nβ ANALYSIS COMPLETE") | |
| logger.debug("=" * 50) | |
| # SAFETY PLANNING CHECK | |
| # Check if safety planning should be offered | |
| show_safety = should_show_safety_planning( | |
| composite_abuse, | |
| escalation_risk, | |
| predicted_labels | |
| ) | |
| safety_plan = "" | |
| if show_safety: | |
| # Generate safety plan | |
| safety_plan = generate_simple_safety_plan( | |
| composite_abuse, | |
| escalation_risk, | |
| predicted_labels | |
| ) | |
| # Add notice to main results | |
| out += "\n\n" + "π‘οΈ " + "="*48 | |
| out += "\n**SAFETY PLANNING AVAILABLE**" | |
| out += "\n" + "="*50 | |
| out += "\n\nBased on your analysis results, we've generated a safety plan." | |
| out += "\nCheck the 'Safety Plan' output below for personalized guidance." | |
| return out, timeline_image, safety_plan | |
| except Exception as e: | |
| logger.error("\nβ ERROR IN ANALYSIS") | |
| logger.error("=" * 50) | |
| logger.error(f"Error type: {type(e).__name__}") | |
| logger.error(f"Error message: {str(e)}") | |
| logger.error(f"Traceback:\n{traceback.format_exc()}") | |
| return "An error occurred during analysis.", None, "" | |
| def format_results_for_new_ui(analysis_output, timeline_image, safety_plan): | |
| """ | |
| Convert your existing analysis output into the format needed for the new UI | |
| """ | |
| try: | |
| # Parse your existing text output to extract structured data | |
| lines = analysis_output.split('\n') | |
| # Extract abuse intensity | |
| abuse_intensity = 0 | |
| for line in lines: | |
| if line.startswith('Abuse Intensity:'): | |
| abuse_intensity = int(re.findall(r'\d+', line)[0]) | |
| break | |
| # Extract DARVO score | |
| darvo_score = 0.0 | |
| for line in lines: | |
| if 'DARVO Score:' in line: | |
| # Extract number from line like "π **DARVO Score: 0.456**" | |
| darvo_match = re.search(r'DARVO Score: ([\d.]+)', line) | |
| if darvo_match: | |
| darvo_score = float(darvo_match.group(1)) | |
| break | |
| # Extract emotional tones | |
| emotional_tones = [] | |
| in_tones_section = False | |
| for line in lines: | |
| if 'π **Emotional Tones Detected:**' in line: | |
| in_tones_section = True | |
| continue | |
| elif in_tones_section and line.strip(): | |
| if line.startswith('β’ Message'): | |
| # Extract tone from line like "β’ Message 1: *menacing calm*" | |
| tone_match = re.search(r'\*([^*]+)\*', line) | |
| if tone_match: | |
| tone = tone_match.group(1) | |
| emotional_tones.append(tone if tone != 'none' else 'neutral') | |
| else: | |
| emotional_tones.append('neutral') | |
| elif not line.startswith('β’') and line.strip(): | |
| break | |
| # Determine risk level based on your existing logic | |
| if abuse_intensity >= 85: | |
| risk_level = 'critical' | |
| elif abuse_intensity >= 70: | |
| risk_level = 'high' | |
| elif abuse_intensity >= 50: | |
| risk_level = 'moderate' | |
| else: | |
| risk_level = 'low' | |
| # FIXED: Extract detected patterns properly | |
| patterns = [] | |
| in_patterns_section = False | |
| # Define valid pattern names to filter against | |
| valid_patterns = { | |
| "recovery phase", "control", "gaslighting", "guilt tripping", "dismissiveness", | |
| "blame shifting", "nonabusive", "projection", "insults", | |
| "contradictory statements", "obscure language", | |
| "veiled threats", "stalking language", "false concern", | |
| "false equivalence", "future faking" | |
| } | |
| for line in lines: | |
| if 'π Detected Patterns:' in line: | |
| in_patterns_section = True | |
| continue | |
| elif in_patterns_section and line.strip(): | |
| if line.startswith('β'): | |
| severity = 'high' | |
| elif line.startswith('β οΈ'): | |
| severity = 'moderate' | |
| elif line.startswith('π'): | |
| severity = 'low' | |
| else: | |
| continue | |
| # Extract pattern text after the severity indicator | |
| if ':' in line: | |
| pattern_text = line.split(':', 1)[1].strip() | |
| else: | |
| pattern_text = line[2:].strip() # Remove emoji and space | |
| # Parse individual patterns from the text | |
| # Handle format like "blame shifting (1x), projection (2x)" | |
| pattern_parts = pattern_text.split(',') | |
| for part in pattern_parts: | |
| # Clean up the pattern name | |
| pattern_name = part.strip() | |
| # Remove count indicators like "(1x)", "(2x)", etc. | |
| pattern_name = re.sub(r'\s*\(\d+x?\)', '', pattern_name) | |
| # Remove any remaining special characters and clean | |
| pattern_name = pattern_name.strip().lower() | |
| # Only add if it's a valid pattern name | |
| if pattern_name in valid_patterns: | |
| patterns.append({ | |
| 'name': pattern_name.replace('_', ' ').title(), | |
| 'severity': severity, | |
| 'description': get_pattern_description(pattern_name) | |
| }) | |
| elif line.strip() and not line.startswith(('β', 'β οΈ', 'π')) and in_patterns_section: | |
| # Exit patterns section when we hit a non-pattern line | |
| break | |
| # Generate personalized recommendations | |
| recommendations = generate_personalized_recommendations(abuse_intensity, patterns, safety_plan) | |
| # Add this before the return statement in format_results_for_new_ui() | |
| boundary_health = { | |
| 'overall_health': 'unknown', | |
| 'message_assessments': [], | |
| 'recommendations': [] | |
| } | |
| return { | |
| 'riskLevel': risk_level, | |
| 'riskScore': abuse_intensity, | |
| 'primaryConcerns': patterns[:3], # Top 3 most important | |
| 'allPatterns': patterns, | |
| 'riskStage': extract_risk_stage(analysis_output), | |
| 'emotionalTones': emotional_tones, | |
| 'darvoScore': darvo_score, | |
| 'boundaryHealth': boundary_health, | |
| 'personalizedRecommendations': recommendations, | |
| 'hasSafetyPlan': bool(safety_plan), | |
| 'safetyPlan': safety_plan, | |
| 'rawAnalysis': analysis_output | |
| } | |
| except Exception as e: | |
| logger.error(f"Error formatting results: {e}") | |
| return { | |
| 'riskLevel': 'low', | |
| 'riskScore': 0, | |
| 'primaryConcerns': [], | |
| 'allPatterns': [], | |
| 'riskStage': 'unknown', | |
| 'emotionalTones': [], | |
| 'darvoScore': 0.0, | |
| 'boundaryHealth': {'overall_health': 'unknown', 'message_assessments': [], 'recommendations': []}, | |
| 'personalizedRecommendations': ['Consider speaking with a counselor about your relationship concerns'], | |
| 'hasSafetyPlan': False, | |
| 'safetyPlan': '', | |
| 'rawAnalysis': analysis_output | |
| } | |
| def get_pattern_description(pattern_name): | |
| """Get human-readable descriptions for patterns""" | |
| descriptions = { | |
| 'control': 'Attempts to manage your behavior, decisions, or daily activities', | |
| 'gaslighting': 'Making you question your memory, perception, or reality', | |
| 'dismissiveness': 'Minimizing or invalidating your feelings and experiences', | |
| 'guilt tripping': 'Making you feel guilty to influence your behavior', | |
| 'blame shifting': 'Placing responsibility for their actions onto you', | |
| 'projection': 'Accusing you of behaviors they themselves exhibit', | |
| 'insults': 'Name-calling or personal attacks intended to hurt', | |
| 'contradictory statements': 'Saying things that conflict with previous statements', | |
| 'obscure language': 'Using vague or confusing language to avoid accountability', | |
| 'veiled threats': 'Indirect threats or intimidating language', | |
| 'stalking language': 'Monitoring, tracking, or obsessive behaviors', | |
| 'false concern': 'Expressing fake worry to manipulate or control', | |
| 'false equivalence': 'Comparing incomparable situations to justify behavior', | |
| 'future faking': 'Making promises about future behavior that are unlikely to be kept' | |
| } | |
| return descriptions.get(pattern_name.lower(), 'Concerning communication pattern detected') | |
| def generate_personalized_recommendations(abuse_score, patterns, safety_plan): | |
| """Generate recommendations based on specific findings""" | |
| recommendations = [] | |
| # Base recommendations | |
| if abuse_score >= 70: | |
| recommendations.extend([ | |
| 'Document these conversations with dates and times', | |
| 'Reach out to a trusted friend or family member about your concerns', | |
| 'Consider contacting the National Domestic Violence Hotline for guidance' | |
| ]) | |
| elif abuse_score >= 40: | |
| recommendations.extend([ | |
| 'Keep a private journal of concerning interactions', | |
| 'Talk to someone you trust about these communication patterns', | |
| 'Consider counseling to explore healthy relationship dynamics' | |
| ]) | |
| else: | |
| recommendations.extend([ | |
| 'Continue monitoring communication patterns that concern you', | |
| 'Consider discussing communication styles with your partner when you feel safe to do so' | |
| ]) | |
| # Pattern-specific recommendations | |
| pattern_names = [p['name'].lower() for p in patterns] | |
| if 'control' in pattern_names: | |
| recommendations.append('Maintain your independence and decision-making autonomy') | |
| if 'gaslighting' in pattern_names: | |
| recommendations.append('Trust your memory and perceptions - consider keeping notes') | |
| if any(p in pattern_names for p in ['stalking language', 'veiled threats']): | |
| recommendations.append('Vary your routines and inform trusted people of your whereabouts') | |
| if safety_plan: | |
| recommendations.append('Review your personalized safety plan regularly') | |
| return recommendations[:4] # Limit to 4 recommendations | |
| def extract_risk_stage(analysis_output): | |
| """Extract risk stage from analysis output""" | |
| if 'Tension-Building' in analysis_output: | |
| return 'tension-building' | |
| elif 'Escalation' in analysis_output: | |
| return 'escalation' | |
| elif 'Reconciliation' in analysis_output: | |
| return 'reconciliation' | |
| elif 'Honeymoon' in analysis_output: | |
| return 'honeymoon' | |
| else: | |
| return 'unknown' | |
| def create_boundary_health_display(boundary_assessment): | |
| """Create HTML display for boundary health assessment""" | |
| color_map = { | |
| 'healthy': '#10b981', | |
| 'mostly_healthy': '#3b82f6', | |
| 'concerning': '#f59e0b', | |
| 'unhealthy': '#ef4444', | |
| 'neutral': '#6b7280' | |
| } | |
| bg_map = { | |
| 'healthy': '#f0fdf4', | |
| 'mostly_healthy': '#eff6ff', | |
| 'concerning': '#fffbeb', | |
| 'unhealthy': '#fef2f2', | |
| 'neutral': '#f9fafb' | |
| } | |
| assessment = boundary_assessment.get('assessment', 'neutral') | |
| color = color_map.get(assessment, '#6b7280') | |
| bg_color = bg_map.get(assessment, '#f9fafb') | |
| html = f""" | |
| <div style="background: {bg_color}; border-left: 4px solid {color}; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="margin: 0 0 8px 0; color: #1f2937;">π‘οΈ Boundary Health: {boundary_assessment.get('label', 'Unknown')}</h4> | |
| <p style="margin: 0 0 8px 0; color: #6b7280;">{boundary_assessment.get('description', 'No assessment available')}</p> | |
| <p style="margin: 0; color: #6b7280; font-size: 14px;"><strong>Confidence:</strong> {boundary_assessment.get('confidence', 0):.1%}</p> | |
| <div style="margin-top: 12px;"> | |
| <h5 style="margin: 0 0 4px 0; color: #1f2937;">Recommendations:</h5> | |
| <ul style="margin: 0; padding-left: 20px; color: #6b7280;"> | |
| """ | |
| for rec in boundary_assessment.get('recommendations', []): | |
| html += f"<li style='margin: 2px 0; color: #6b7280;'>{rec}</li>" | |
| html += """ | |
| </ul> | |
| </div> | |
| </div> | |
| """ | |
| return html | |
| def analyze_composite_with_ui_format(msg1, msg2, msg3, *answers_and_none): | |
| """ | |
| Your existing analysis function, but returns formatted data for the new UI | |
| """ | |
| # Run your existing analysis | |
| analysis_output, timeline_image, safety_plan = analyze_composite(msg1, msg2, msg3, *answers_and_none) | |
| # Format for new UI | |
| structured_results = format_results_for_new_ui(analysis_output, timeline_image, safety_plan) | |
| # Return as JSON string for the new UI to parse | |
| return json.dumps(structured_results), timeline_image, safety_plan | |
| def create_mobile_friendly_interface(): | |
| """Create a responsive interface that works well on both mobile and desktop with full functionality""" | |
| css = """ | |
| /* Base responsive layout */ | |
| .gradio-container { | |
| max-width: 100% !important; | |
| padding: 12px !important; | |
| } | |
| /* Desktop: side-by-side columns */ | |
| @media (min-width: 1024px) { | |
| .desktop-row { | |
| display: flex !important; | |
| gap: 20px !important; | |
| } | |
| .desktop-col-messages { | |
| flex: 2 !important; | |
| min-width: 400px !important; | |
| } | |
| .desktop-col-checklist { | |
| flex: 1 !important; | |
| min-width: 300px !important; | |
| } | |
| .desktop-col-results { | |
| flex: 2 !important; | |
| min-width: 400px !important; | |
| } | |
| .mobile-only { | |
| display: none !important; | |
| } | |
| .mobile-expandable-btn { | |
| display: none !important; | |
| } | |
| } | |
| /* Mobile/Tablet: stack everything */ | |
| @media (max-width: 1023px) { | |
| .gradio-row { | |
| flex-direction: column !important; | |
| } | |
| .gradio-column { | |
| width: 100% !important; | |
| margin-bottom: 20px !important; | |
| } | |
| .desktop-only { | |
| display: none !important; | |
| } | |
| /* Mobile expandable sections */ | |
| .mobile-expandable-content { | |
| display: none; | |
| } | |
| .mobile-expandable-content.show { | |
| display: block; | |
| } | |
| } | |
| /* Button styling */ | |
| .gradio-button { | |
| margin-bottom: 8px !important; | |
| } | |
| @media (max-width: 1023px) { | |
| .gradio-button { | |
| width: 100% !important; | |
| padding: 16px !important; | |
| font-size: 16px !important; | |
| } | |
| .mobile-expand-btn { | |
| background: #f9fafb !important; | |
| border: 1px solid #e5e7eb !important; | |
| color: #374151 !important; | |
| padding: 12px 16px !important; | |
| margin: 8px 0 !important; | |
| border-radius: 8px !important; | |
| font-weight: 500 !important; | |
| } | |
| .mobile-expand-btn:hover { | |
| background: #f3f4f6 !important; | |
| } | |
| } | |
| /* Results styling */ | |
| .risk-low { border-left: 4px solid #10b981; background: #f0fdf4; } | |
| .risk-moderate { border-left: 4px solid #f59e0b; background: #fffbeb; } | |
| .risk-high { border-left: 4px solid #f97316; background: #fff7ed; } | |
| .risk-critical { border-left: 4px solid #ef4444; background: #fef2f2; } | |
| /* Clean group styling */ | |
| .gradio-group { | |
| border: none !important; | |
| background: none !important; | |
| padding: 0 !important; | |
| margin: 0 !important; | |
| box-shadow: none !important; | |
| } | |
| /* Force readable text colors */ | |
| .gradio-html * { | |
| color: #1f2937 !important; | |
| } | |
| .gradio-html p, .gradio-html div, .gradio-html span, .gradio-html li, .gradio-html ul, .gradio-html h1, .gradio-html h2, .gradio-html h3, .gradio-html h4 { | |
| color: #1f2937 !important; | |
| } | |
| /* Form spacing */ | |
| .gradio-textbox { | |
| margin-bottom: 12px !important; | |
| } | |
| .gradio-checkbox { | |
| margin-bottom: 6px !important; | |
| font-size: 14px !important; | |
| } | |
| /* Compact checklist */ | |
| .compact-checklist .gradio-checkbox { | |
| margin-bottom: 4px !important; | |
| } | |
| /* Specific overrides for safety plan and analysis displays */ | |
| .gradio-html pre { | |
| color: #1f2937 !important; | |
| background: #f9fafb !important; | |
| padding: 12px !important; | |
| border-radius: 8px !important; | |
| } | |
| """ | |
| with gr.Blocks(css=css, title="Relationship Pattern Analyzer") as demo: | |
| gr.HTML(""" | |
| <div style="text-align: center; padding: 30px 20px;"> | |
| <h1 style="font-size: 2.5rem; font-weight: bold; color: #1f2937; margin-bottom: 16px;"> | |
| Relationship Pattern Analyzer | |
| </h1> | |
| <p style="font-size: 1.25rem; color: #6b7280; max-width: 600px; margin: 0 auto;"> | |
| Share messages that concern you, and we'll help you understand what patterns might be present. | |
| </p> | |
| </div> | |
| """) | |
| with gr.Tab("Analyze Messages"): | |
| # Privacy notice | |
| gr.HTML(""" | |
| <div style="background: #1e40af; border-radius: 12px; padding: 24px; margin-bottom: 24px; width: 100%; box-shadow: 0 4px 12px rgba(30, 64, 175, 0.3);"> | |
| <div style="display: flex; align-items: center; margin-bottom: 12px;"> | |
| <span style="font-size: 1.5rem; margin-right: 12px;">π‘οΈ</span> | |
| <h3 style="color: white; margin: 0; font-size: 1.25rem; font-weight: 600;">Your Privacy Matters</h3> | |
| </div> | |
| <p style="color: #e0e7ff; margin: 0; font-size: 1rem; line-height: 1.5;"> | |
| Your messages are analyzed locally and are not stored or shared. | |
| This tool is for educational purposes and not a substitute for professional counseling. | |
| </p> | |
| </div> | |
| """) | |
| # Desktop layout | |
| with gr.Row(elem_classes=["desktop-row", "desktop-only"], equal_height=True): | |
| # Messages column | |
| with gr.Column(elem_classes=["desktop-col-messages"], scale=4, min_width=400): | |
| gr.HTML("<h3 style='margin-bottom: 16px;'>Share Your Messages</h3>") | |
| gr.HTML(""" | |
| <p style="color: #6b7280; margin-bottom: 20px;"> | |
| Enter up to three messages that made you feel uncomfortable, confused, or concerned. | |
| For the most accurate analysis, include messages from recent emotionally intense conversations. | |
| </p> | |
| """) | |
| msg1_desktop = gr.Textbox( | |
| label="Message 1 *", | |
| placeholder="Enter the message here...", | |
| lines=4 | |
| ) | |
| msg2_desktop = gr.Textbox( | |
| label="Message 2 (optional)", | |
| placeholder="Enter the message here...", | |
| lines=4 | |
| ) | |
| msg3_desktop = gr.Textbox( | |
| label="Message 3 (optional)", | |
| placeholder="Enter the message here...", | |
| lines=4 | |
| ) | |
| # Checklist column | |
| with gr.Column(elem_classes=["desktop-col-checklist"], scale=3, min_width=300): | |
| gr.HTML("<h3 style='margin-bottom: 16px;'>Safety Checklist</h3>") | |
| gr.HTML(""" | |
| <p style="color: #6b7280; margin-bottom: 20px; font-size: 14px;"> | |
| Optional but recommended. Check any that apply to your situation: | |
| </p> | |
| """) | |
| checklist_items_desktop = [] | |
| with gr.Column(elem_classes=["compact-checklist"]): | |
| for question, weight in ESCALATION_QUESTIONS: | |
| checklist_items_desktop.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) | |
| none_selected_desktop = gr.Checkbox( | |
| label="None of the above apply to my situation", | |
| elem_classes=["none-checkbox"] | |
| ) | |
| analyze_btn_desktop = gr.Button( | |
| "Analyze Messages", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| # Results column | |
| with gr.Column(elem_classes=["desktop-col-results"], scale=5, min_width=400): | |
| gr.HTML("<h3 style='margin-bottom: 16px;'>Analysis Results</h3>") | |
| gr.HTML(""" | |
| <p style="color: #6b7280; margin-bottom: 20px; font-style: italic;"> | |
| Results will appear here after analysis... | |
| </p> | |
| """) | |
| # Desktop results components | |
| results_json_desktop = gr.JSON(visible=False) | |
| risk_summary_desktop = gr.HTML(visible=False) | |
| concerns_display_desktop = gr.HTML(visible=False) | |
| additional_metrics_desktop = gr.HTML(visible=False) | |
| recommendations_display_desktop = gr.HTML(visible=False) | |
| with gr.Row(visible=False) as action_buttons_desktop: | |
| safety_plan_btn_desktop = gr.Button("π‘οΈ Get Safety Plan", variant="secondary") | |
| full_analysis_btn_desktop = gr.Button("π Show Full Analysis", variant="secondary") | |
| download_btn_desktop = gr.Button("π Download Report", variant="secondary") | |
| full_analysis_display_desktop = gr.HTML(visible=False) | |
| timeline_chart_desktop = gr.Image(visible=False, label="Pattern Timeline") | |
| download_file_desktop = gr.File(label="Download Report", visible=False) | |
| # Mobile layout | |
| with gr.Column(elem_classes=["mobile-only"]): | |
| # Message input - always visible | |
| gr.HTML("<h3>π Share Your Messages</h3>") | |
| gr.HTML(""" | |
| <p style="color: #6b7280; margin-bottom: 16px; font-size: 14px;"> | |
| Enter messages that made you uncomfortable or concerned: | |
| </p> | |
| """) | |
| msg1_mobile = gr.Textbox( | |
| label="Message 1 (required)", | |
| placeholder="Enter the concerning message here...", | |
| lines=3 | |
| ) | |
| # Button to show additional messages | |
| show_more_msgs_btn = gr.Button( | |
| "β Add More Messages (Optional)", | |
| elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], | |
| variant="secondary" | |
| ) | |
| # Additional messages (hidden by default) | |
| with gr.Column(visible=False) as additional_messages_mobile: | |
| msg2_mobile = gr.Textbox( | |
| label="Message 2 (optional)", | |
| placeholder="Enter another message...", | |
| lines=3 | |
| ) | |
| msg3_mobile = gr.Textbox( | |
| label="Message 3 (optional)", | |
| placeholder="Enter a third message...", | |
| lines=3 | |
| ) | |
| # Button to show safety checklist | |
| show_checklist_btn = gr.Button( | |
| "β οΈ Safety Checklist (Optional)", | |
| elem_classes=["mobile-expand-btn", "mobile-expandable-btn"], | |
| variant="secondary" | |
| ) | |
| # Safety checklist (hidden by default) | |
| with gr.Column(visible=False) as safety_checklist_mobile: | |
| gr.HTML(""" | |
| <p style="color: #6b7280; margin-bottom: 16px; font-size: 14px;"> | |
| Check any that apply to improve analysis accuracy: | |
| </p> | |
| """) | |
| checklist_items_mobile = [] | |
| for question, weight in ESCALATION_QUESTIONS: | |
| checklist_items_mobile.append(gr.Checkbox(label=question, elem_classes=["compact-checkbox"])) | |
| none_selected_mobile = gr.Checkbox( | |
| label="None of the above apply", | |
| elem_classes=["none-checkbox"] | |
| ) | |
| # Analysis button | |
| analyze_btn_mobile = gr.Button( | |
| "π Analyze Messages", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| # Mobile results components | |
| results_json_mobile = gr.JSON(visible=False) | |
| risk_summary_mobile = gr.HTML(visible=False) | |
| concerns_display_mobile = gr.HTML(visible=False) | |
| additional_metrics_mobile = gr.HTML(visible=False) | |
| recommendations_display_mobile = gr.HTML(visible=False) | |
| with gr.Row(visible=False) as action_buttons_mobile: | |
| safety_plan_btn_mobile = gr.Button("π‘οΈ Safety Plan", variant="secondary") | |
| full_analysis_btn_mobile = gr.Button("π Full Analysis", variant="secondary") | |
| download_btn_mobile = gr.Button("π Download", variant="secondary") | |
| full_analysis_display_mobile = gr.HTML(visible=False) | |
| timeline_chart_mobile = gr.Image(visible=False, label="Pattern Timeline") | |
| download_file_mobile = gr.File(label="Download Report", visible=False) | |
| with gr.Tab("Safety Resources"): | |
| gr.HTML(""" | |
| <div style="background: #dcfce7; border-radius: 12px; padding: 24px; margin-bottom: 20px;"> | |
| <h2 style="color: #166534; margin-bottom: 16px;">π‘οΈ Safety Planning</h2> | |
| <p style="color: #166534;"> | |
| If you're concerned about your safety, here are immediate resources and steps you can take. | |
| </p> | |
| </div> | |
| """) | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.HTML(""" | |
| <div class="risk-card" style="background: #fef2f2; border-left: 4px solid #ef4444;"> | |
| <h3 style="color: #991b1b;">π¨ Emergency Resources</h3> | |
| <div style="margin: 16px 0;"> | |
| <p><strong>911</strong> - For immediate danger</p> | |
| <p><strong>1-800-799-7233</strong> - National DV Hotline (24/7)</p> | |
| <p><strong>Text START to 88788</strong> - Crisis Text Line</p> | |
| <p><strong>988</strong> - National Suicide Prevention Lifeline</p> | |
| </div> | |
| </div> | |
| """) | |
| with gr.Column(): | |
| gr.HTML(""" | |
| <div class="risk-card" style="background: #f0fdf4; border-left: 4px solid #10b981;"> | |
| <h3 style="color: #065f46;">π Support Resources</h3> | |
| <div style="margin: 16px 0;"> | |
| <p><strong>thehotline.org</strong> - Online chat support</p> | |
| <p><strong>Local counseling services</strong> - Professional support</p> | |
| <p><strong>Trusted friends/family</strong> - Personal support network</p> | |
| <p><strong>Legal advocacy</strong> - Know your rights</p> | |
| </div> | |
| </div> | |
| """) | |
| safety_plan_display = gr.HTML() | |
| # Mobile expandable button handlers | |
| def toggle_additional_messages(current_visibility): | |
| return gr.update(visible=not current_visibility) | |
| def toggle_safety_checklist(current_visibility): | |
| return gr.update(visible=not current_visibility) | |
| show_more_msgs_btn.click( | |
| toggle_additional_messages, | |
| inputs=[additional_messages_mobile], | |
| outputs=[additional_messages_mobile] | |
| ) | |
| show_checklist_btn.click( | |
| toggle_safety_checklist, | |
| inputs=[safety_checklist_mobile], | |
| outputs=[safety_checklist_mobile] | |
| ) | |
| # Full analysis processing function | |
| def process_analysis(*inputs): | |
| """Process the analysis and format for display - FULL FUNCTIONALITY""" | |
| msgs = inputs[:3] | |
| checklist_responses = inputs[3:] | |
| # Run analysis | |
| analysis_result, timeline_img, safety_plan = analyze_composite_with_ui_format(*inputs) | |
| # Parse results | |
| try: | |
| results = json.loads(analysis_result) | |
| except: | |
| results = {'riskLevel': 'low', 'riskScore': 0, 'primaryConcerns': [], 'emotionalTones': [], 'darvoScore': 0, 'personalizedRecommendations': []} | |
| # Format risk summary | |
| risk_config = { | |
| 'low': {'color': '#10b981', 'bg': '#f0fdf4', 'icon': 'π’', 'label': 'Low Risk'}, | |
| 'moderate': {'color': '#f59e0b', 'bg': '#fffbeb', 'icon': 'π‘', 'label': 'Moderate Concern'}, | |
| 'high': {'color': '#f97316', 'bg': '#fff7ed', 'icon': 'π ', 'label': 'High Risk'}, | |
| 'critical': {'color': '#ef4444', 'bg': '#fef2f2', 'icon': 'π΄', 'label': 'Critical Risk'} | |
| } | |
| config = risk_config.get(results['riskLevel'], risk_config['low']) | |
| # Create pattern summary for display with explicit styling | |
| pattern_summary = "" | |
| if results.get('primaryConcerns'): | |
| # Filter out the "escalation potential" concern when displaying in summary | |
| actual_concerns = [concern for concern in results['primaryConcerns'] | |
| if 'escalation potential' not in concern['name'].lower()] | |
| if actual_concerns: | |
| pattern_names = [concern['name'] for concern in actual_concerns] | |
| if len(pattern_names) == 1: | |
| pattern_summary = f"<span style='color: #1f2937 !important;'><strong style='color: #1f2937 !important;'>{pattern_names[0]}</strong> pattern detected</span>" | |
| elif len(pattern_names) == 2: | |
| pattern_summary = f"<span style='color: #1f2937 !important;'><strong style='color: #1f2937 !important;'>{pattern_names[0]}</strong> and <strong style='color: #1f2937 !important;'>{pattern_names[1]}</strong> patterns detected</span>" | |
| else: | |
| pattern_summary = f"<span style='color: #1f2937 !important;'><strong style='color: #1f2937 !important;'>{', '.join(pattern_names[:-1])}</strong> and <strong style='color: #1f2937 !important;'>{pattern_names[-1]}</strong> patterns detected</span>" | |
| else: | |
| # Only escalation potential was found (incomplete checklist) | |
| pattern_summary = "<span style='color: #1f2937 !important;'><strong style='color: #1f2937 !important;'>Concerning communication patterns</strong> detected</span>" | |
| else: | |
| pattern_summary = "<span style='color: #1f2937 !important;'><strong style='color: #1f2937 !important;'>Concerning communication patterns</strong> detected</span>" | |
| risk_html = f""" | |
| <div style="background: {config['bg']}; border-left: 4px solid {config['color']}; border-radius: 12px; padding: 24px; margin-bottom: 20px; box-shadow: 0 2px 8px rgba(0,0,0,0.1);"> | |
| <div style="display: flex; align-items: center; margin-bottom: 16px;"> | |
| <span style="font-size: 2rem; margin-right: 12px;">{config['icon']}</span> | |
| <div> | |
| <h2 style="font-size: 1.5rem; font-weight: bold; color: #1f2937; margin: 0;">{config['label']}</h2> | |
| <p style="color: #374151; margin: 0; font-weight: 500;">Based on the messages you shared</p> | |
| </div> | |
| </div> | |
| <div style="background: rgba(0,0,0,0.05); border-radius: 8px; padding: 16px;"> | |
| <div style="color: #1f2937 !important; margin: 0 0 8px 0; font-size: 1rem;"> | |
| <span style="color: #1f2937 !important;">{pattern_summary}</span> | |
| </div> | |
| <p style="color: #374151 !important; margin: 0; font-weight: 600;"> | |
| Risk Score: {results['riskScore']}% | |
| </p> | |
| </div> | |
| </div> | |
| """ | |
| # Format concerns | |
| concerns_html = "<h3 style='margin-top: 24px;'>Key Concerns Found</h3>" | |
| if results.get('primaryConcerns'): | |
| for concern in results['primaryConcerns']: | |
| severity_colors = { | |
| 'high': '#fee2e2', | |
| 'moderate': '#fef3c7', | |
| 'low': '#dbeafe' | |
| } | |
| bg_color = severity_colors.get(concern.get('severity', 'low'), '#f3f4f6') | |
| concerns_html += f""" | |
| <div style="background: {bg_color}; border-radius: 8px; padding: 16px; margin: 8px 0;"> | |
| <h4 style="margin: 0 0 8px 0; color: #1f2937;">{concern.get('name', 'Unknown Concern')}</h4> | |
| <p style="margin: 0; color: #6b7280;">{concern.get('description', 'No description available')}</p> | |
| </div> | |
| """ | |
| else: | |
| concerns_html += "<p style='color: #6b7280; font-style: italic;'>No specific concerns identified in the messages.</p>" | |
| # Additional Metrics Section | |
| metrics_html = "<h3 style='margin-top: 24px;'>Additional Analysis</h3>" | |
| # DARVO Score | |
| darvo_score = results.get('darvoScore', 0) | |
| if darvo_score > 0.25: | |
| darvo_level = "High" if darvo_score >= 0.65 else "Moderate" | |
| darvo_color = "#fee2e2" if darvo_score >= 0.65 else "#fef3c7" | |
| metrics_html += f""" | |
| <div style="background: {darvo_color}; border-radius: 8px; padding: 16px; margin: 8px 0;"> | |
| <h4 style="margin: 0 0 8px 0; color: #1f2937;">π DARVO Score: {darvo_score:.3f} ({darvo_level})</h4> | |
| <p style="margin: 0; color: #6b7280;"> | |
| DARVO (Deny, Attack, Reverse Victim & Offender) indicates potential narrative manipulation where the speaker may be deflecting responsibility. | |
| </p> | |
| </div> | |
| """ | |
| # Emotional Tones | |
| emotional_tones = results.get('emotionalTones', []) | |
| if emotional_tones and any(tone != 'neutral' for tone in emotional_tones): | |
| metrics_html += f""" | |
| <div style="background: #f8fafc; border-radius: 8px; padding: 16px; margin: 8px 0;"> | |
| <h4 style="margin: 0 0 8px 0; color: #1f2937;">π Emotional Tones Detected</h4> | |
| <div style="margin: 8px 0;"> | |
| """ | |
| for i, tone in enumerate(emotional_tones): | |
| if tone and tone != 'neutral': | |
| metrics_html += f""" | |
| <p style="margin: 4px 0; color: #6b7280;">β’ Message {i+1}: <em>{tone}</em></p> | |
| """ | |
| metrics_html += """ | |
| </div> | |
| <p style="margin: 8px 0 0 0; color: #6b7280; font-size: 14px;"> | |
| Emotional tone analysis helps identify underlying manipulation tactics or concerning emotional patterns. | |
| </p> | |
| </div> | |
| """ | |
| # Format recommendations | |
| rec_html = "<h3 style='margin-top: 24px;'>Personalized Recommendations</h3>" | |
| recommendations = results.get('personalizedRecommendations', []) | |
| for rec in recommendations: | |
| rec_html += f""" | |
| <div style="background: #f8fafc; border-left: 3px solid #3b82f6; border-radius: 8px; padding: 12px; margin: 8px 0;"> | |
| <p style="margin: 0; color: #374151;">β’ {rec}</p> | |
| </div> | |
| """ | |
| return ( | |
| gr.update(value=analysis_result, visible=False), # results_json | |
| gr.update(value=risk_html, visible=True), # risk_summary | |
| gr.update(value=concerns_html, visible=True), # concerns_display | |
| gr.update(value=metrics_html, visible=True), # additional_metrics | |
| gr.update(value=rec_html, visible=True), # recommendations_display | |
| gr.update(visible=True), # action_buttons | |
| gr.update(visible=False), # full_analysis_display | |
| gr.update(value=timeline_img, visible=True), # timeline_chart | |
| gr.update(visible=False), # download_file | |
| gr.update(value=safety_plan) # safety_plan_display | |
| ) | |
| def show_full_analysis(results_json_str): | |
| """Show the full technical analysis""" | |
| try: | |
| if not results_json_str: | |
| return gr.update(value="<p>No analysis data available. Please run the analysis first.</p>", visible=True) | |
| # Handle both JSON string and dict inputs | |
| if isinstance(results_json_str, str): | |
| results = json.loads(results_json_str) | |
| elif isinstance(results_json_str, dict): | |
| results = results_json_str | |
| else: | |
| return gr.update(value="<p>Invalid data format. Please run the analysis again.</p>", visible=True) | |
| # Create comprehensive full analysis display | |
| full_html = f""" | |
| <div style="background: white; border-radius: 12px; padding: 24px; border: 1px solid #e5e7eb; margin-top: 20px;"> | |
| <h3 style="color: #1f2937 !important;">π Complete Technical Analysis</h3> | |
| <div style="background: #f9fafb; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="color: #1f2937 !important;">π Risk Assessment Summary</h4> | |
| <p style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">Risk Level:</strong> {results.get('riskLevel', 'Unknown').title()}</p> | |
| <p style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">Risk Score:</strong> {results.get('riskScore', 'N/A')}%</p> | |
| <p style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">Risk Stage:</strong> {results.get('riskStage', 'Unknown').replace('-', ' ').title()}</p> | |
| </div> | |
| <div style="background: #f9fafb; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="color: #1f2937 !important;">π Behavioral Analysis</h4> | |
| <p style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">DARVO Score:</strong> {results.get('darvoScore', 0):.3f}</p> | |
| <p style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">Emotional Tones:</strong> {', '.join(results.get('emotionalTones', ['None detected']))}</p> | |
| </div> | |
| <div style="background: #f9fafb; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="color: #1f2937 !important;">π Detected Patterns</h4> | |
| """ | |
| if results.get('allPatterns'): | |
| for pattern in results['allPatterns']: | |
| severity_badge = { | |
| 'high': 'π΄', | |
| 'moderate': 'π‘', | |
| 'low': 'π’' | |
| }.get(pattern.get('severity', 'low'), 'βͺ') | |
| full_html += f""" | |
| <div style="margin: 8px 0; padding: 8px; background: white; border-radius: 4px;"> | |
| <p style="margin: 0; color: #1f2937 !important;"><strong style="color: #1f2937 !important;">{severity_badge} {pattern.get('name', 'Unknown')}</strong></p> | |
| <p style="margin: 4px 0 0 0; font-size: 14px; color: #6b7280 !important;">{pattern.get('description', 'No description available')}</p> | |
| </div> | |
| """ | |
| else: | |
| full_html += "<p style='color: #1f2937 !important;'>No specific patterns detected.</p>" | |
| full_html += """ | |
| </div> | |
| <div style="background: #f9fafb; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="color: #1f2937 !important;">π Complete Analysis Output</h4> | |
| <div style="max-height: 400px; overflow-y: auto; background: white; padding: 12px; border-radius: 4px; font-family: monospace; font-size: 14px; white-space: pre-wrap; color: #1f2937 !important;">""" | |
| full_html += results.get('rawAnalysis', 'No detailed analysis available') | |
| full_html += """ | |
| </div> | |
| </div> | |
| </div> | |
| """ | |
| return gr.update(value=full_html, visible=True) | |
| except Exception as e: | |
| error_html = f""" | |
| <div style="background: #fee2e2; border-radius: 8px; padding: 16px; margin-top: 20px;"> | |
| <h4>β Error Loading Analysis</h4> | |
| <p>Unable to parse analysis results: {str(e)}</p> | |
| <p>Please try running the analysis again.</p> | |
| </div> | |
| """ | |
| return gr.update(value=error_html, visible=True) | |
| def generate_report(results_json_str, timeline_img): | |
| """Generate a downloadable report with all analysis information""" | |
| import tempfile | |
| import os | |
| from datetime import datetime | |
| try: | |
| if not results_json_str: | |
| return None | |
| # Handle both JSON string and dict inputs | |
| if isinstance(results_json_str, str): | |
| results = json.loads(results_json_str) | |
| elif isinstance(results_json_str, dict): | |
| results = results_json_str | |
| else: | |
| return None | |
| current_date = datetime.now().strftime("%Y-%m-%d") | |
| current_time = datetime.now().strftime("%I:%M %p") | |
| # Create comprehensive report | |
| report = f"""RELATIONSHIP PATTERN ANALYSIS REPORT | |
| Generated: {current_date} at {current_time} | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| EXECUTIVE SUMMARY | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| Risk Level: {results.get('riskLevel', 'Unknown').upper()} | |
| Risk Score: {results.get('riskScore', 'N/A')}% | |
| Risk Stage: {results.get('riskStage', 'Unknown').replace('-', ' ').title()} | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DETECTED PATTERNS | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ""" | |
| # Add detected patterns | |
| if results.get('allPatterns'): | |
| for pattern in results['allPatterns']: | |
| severity_symbol = { | |
| 'high': 'π΄ HIGH', | |
| 'moderate': 'π‘ MODERATE', | |
| 'low': 'π’ LOW' | |
| }.get(pattern.get('severity', 'low'), 'βͺ UNKNOWN') | |
| report += f""" | |
| {severity_symbol} SEVERITY: {pattern.get('name', 'Unknown Pattern')} | |
| Description: {pattern.get('description', 'No description available')}""" | |
| else: | |
| report += "\n\nNo specific patterns detected in the analysis." | |
| # Add behavioral analysis | |
| report += f""" | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| BEHAVIORAL ANALYSIS | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| DARVO Score: {results.get('darvoScore', 0):.3f}""" | |
| darvo_score = results.get('darvoScore', 0) | |
| if darvo_score > 0.65: | |
| report += "\nDARVO Level: HIGH - Strong indication of narrative manipulation" | |
| elif darvo_score > 0.25: | |
| report += "\nDARVO Level: MODERATE - Some indication of narrative manipulation" | |
| else: | |
| report += "\nDARVO Level: LOW - Limited indication of narrative manipulation" | |
| report += """\n | |
| DARVO Definition: Deny, Attack, Reverse Victim & Offender - a manipulation | |
| tactic where the perpetrator denies wrongdoing, attacks the victim, and | |
| positions themselves as the victim. | |
| Emotional Tone Analysis:""" | |
| # Add emotional tones | |
| emotional_tones = results.get('emotionalTones', []) | |
| if emotional_tones: | |
| for i, tone in enumerate(emotional_tones): | |
| if tone and tone != 'neutral': | |
| report += f"\nMessage {i+1}: {tone}" | |
| if not any(tone != 'neutral' for tone in emotional_tones): | |
| report += "\nNo concerning emotional tones detected." | |
| else: | |
| report += "\nNo emotional tone data available." | |
| # Add recommendations | |
| report += f""" | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| PERSONALIZED RECOMMENDATIONS | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ""" | |
| recommendations = results.get('personalizedRecommendations', []) | |
| for i, rec in enumerate(recommendations, 1): | |
| report += f"\n{i}. {rec}" | |
| # Add safety planning | |
| safety_plan = results.get('safetyPlan', '') | |
| if safety_plan: | |
| report += f""" | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| SAFETY PLANNING | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| {safety_plan}""" | |
| # Add emergency resources | |
| report += """ | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| EMERGENCY RESOURCES | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| π¨ IMMEDIATE EMERGENCY: Call 911 | |
| 24/7 CRISIS SUPPORT: | |
| β’ National Domestic Violence Hotline: 1-800-799-7233 | |
| β’ Crisis Text Line: Text START to 88788 | |
| β’ National Suicide Prevention Lifeline: 988 | |
| β’ Online Chat Support: thehotline.org | |
| ADDITIONAL SUPPORT: | |
| β’ Local counseling services | |
| β’ Legal advocacy organizations | |
| β’ Trusted friends and family | |
| β’ Employee assistance programs (if available) | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| IMPORTANT DISCLAIMERS | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| β’ This analysis is for educational purposes only | |
| β’ It is not a substitute for professional counseling or legal advice | |
| β’ Trust your instincts about your safety | |
| β’ Consider sharing this report with a trusted counselor or advocate | |
| β’ Your messages were analyzed locally and not stored or shared | |
| Report Generated by: Relationship Pattern Analyzer | |
| Analysis Date: {current_date} | |
| Report Version: 1.0 | |
| βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ""" | |
| # Create temporary file | |
| temp_file = tempfile.NamedTemporaryFile( | |
| mode='w', | |
| suffix='.txt', | |
| prefix=f'relationship_analysis_report_{current_date.replace("-", "_")}_', | |
| delete=False, | |
| encoding='utf-8' | |
| ) | |
| temp_file.write(report) | |
| temp_file.close() | |
| return temp_file.name | |
| except Exception as e: | |
| # Create error report | |
| error_report = f"""RELATIONSHIP PATTERN ANALYSIS REPORT - ERROR | |
| Generated: {datetime.now().strftime("%Y-%m-%d at %I:%M %p")} | |
| An error occurred while generating the full report: {str(e)} | |
| Please try running the analysis again or contact support if the issue persists.""" | |
| temp_file = tempfile.NamedTemporaryFile( | |
| mode='w', | |
| suffix='.txt', | |
| prefix='error_report_', | |
| delete=False, | |
| encoding='utf-8' | |
| ) | |
| temp_file.write(error_report) | |
| temp_file.close() | |
| return temp_file.name | |
| def show_safety_plan_content(safety_plan_content): | |
| """Display the personalized safety plan""" | |
| if safety_plan_content: | |
| safety_plan_html = f""" | |
| <div style="background: white; border-radius: 12px; padding: 24px; border: 1px solid #e5e7eb; margin-top: 20px;"> | |
| <h3 style="color: #1f2937 !important;">π‘οΈ Your Personalized Safety Plan</h3> | |
| <div style="background: #f0fdf4; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <div style="white-space: pre-wrap; font-family: inherit; font-size: 14px; line-height: 1.5; color: #1f2937 !important;">{safety_plan_content}</div> | |
| </div> | |
| </div> | |
| """ | |
| return gr.update(value=safety_plan_html, visible=True) | |
| else: | |
| # Fallback to general safety information | |
| general_safety = """ | |
| <div style="background: white; border-radius: 12px; padding: 24px; border: 1px solid #e5e7eb; margin-top: 20px;"> | |
| <h3 style="color: #1f2937 !important;">π‘οΈ Safety Planning</h3> | |
| <div style="background: #f0fdf4; border-radius: 8px; padding: 16px; margin: 16px 0;"> | |
| <h4 style="color: #1f2937 !important;">Immediate Safety Steps:</h4> | |
| <ul style="color: #1f2937 !important;"> | |
| <li style="color: #1f2937 !important;">Trust your instincts - if something feels wrong, it probably is</li> | |
| <li style="color: #1f2937 !important;">Document concerning incidents with dates and details</li> | |
| <li style="color: #1f2937 !important;">Identify safe people you can reach out to</li> | |
| <li style="color: #1f2937 !important;">Keep important documents and emergency contacts accessible</li> | |
| <li style="color: #1f2937 !important;">Consider speaking with a counselor or trusted friend</li> | |
| </ul> | |
| <h4 style="color: #1f2937 !important;">Emergency Resources:</h4> | |
| <ul style="color: #1f2937 !important;"> | |
| <li style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">911</strong> - For immediate danger</li> | |
| <li style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">1-800-799-7233</strong> - National DV Hotline (24/7)</li> | |
| <li style="color: #1f2937 !important;"><strong style="color: #1f2937 !important;">Text START to 88788</strong> - Crisis Text Line</li> | |
| </ul> | |
| </div> | |
| </div> | |
| """ | |
| return gr.update(value=general_safety, visible=True) | |
| # Connect desktop event handlers | |
| analyze_btn_desktop.click( | |
| process_analysis, | |
| inputs=[msg1_desktop, msg2_desktop, msg3_desktop] + checklist_items_desktop + [none_selected_desktop], | |
| outputs=[ | |
| results_json_desktop, risk_summary_desktop, concerns_display_desktop, | |
| additional_metrics_desktop, recommendations_display_desktop, action_buttons_desktop, | |
| full_analysis_display_desktop, timeline_chart_desktop, download_file_desktop, safety_plan_display | |
| ] | |
| ) | |
| full_analysis_btn_desktop.click( | |
| show_full_analysis, | |
| inputs=[results_json_desktop], | |
| outputs=[full_analysis_display_desktop] | |
| ) | |
| download_btn_desktop.click( | |
| generate_report, | |
| inputs=[results_json_desktop, timeline_chart_desktop], | |
| outputs=[download_file_desktop] | |
| ).then( | |
| lambda: gr.update(visible=True), | |
| outputs=[download_file_desktop] | |
| ) | |
| safety_plan_btn_desktop.click( | |
| show_safety_plan_content, | |
| inputs=[safety_plan_display], | |
| outputs=[full_analysis_display_desktop] | |
| ) | |
| # Connect mobile event handlers | |
| analyze_btn_mobile.click( | |
| process_analysis, | |
| inputs=[msg1_mobile, msg2_mobile, msg3_mobile] + checklist_items_mobile + [none_selected_mobile], | |
| outputs=[ | |
| results_json_mobile, risk_summary_mobile, concerns_display_mobile, | |
| additional_metrics_mobile, recommendations_display_mobile, action_buttons_mobile, | |
| full_analysis_display_mobile, timeline_chart_mobile, download_file_mobile, safety_plan_display | |
| ] | |
| ) | |
| full_analysis_btn_mobile.click( | |
| show_full_analysis, | |
| inputs=[results_json_mobile], | |
| outputs=[full_analysis_display_mobile] | |
| ) | |
| download_btn_mobile.click( | |
| generate_report, | |
| inputs=[results_json_mobile, timeline_chart_mobile], | |
| outputs=[download_file_mobile] | |
| ).then( | |
| lambda: gr.update(visible=True), | |
| outputs=[download_file_mobile] | |
| ) | |
| safety_plan_btn_mobile.click( | |
| show_safety_plan_content, | |
| inputs=[safety_plan_display], | |
| outputs=[full_analysis_display_mobile] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| try: | |
| print("π± Creating interface...") | |
| demo = create_mobile_friendly_interface() | |
| print("β Interface created successfully") | |
| print("π Launching demo...") | |
| demo.launch( | |
| server_name="0.0.0.0", | |
| server_port=7860, | |
| share=False | |
| ) | |
| print("π App launched!") | |
| except Exception as e: | |
| print(f"β Error: {e}") | |
| import traceback | |
| traceback.print_exc() | |
| raise | |