import datetime
import json
import random
import time

# NOTE: `spaces` is deliberately imported before `torch`/`transformers`,
# matching the original import order of this file.
import gradio as gr
import spaces
import torch
from transformers import pipeline

# Custom CSS for better styling
custom_css = """
.gradio-container {
    max-width: 1200px !important;
}
.alert-box {
    background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
    color: white;
    padding: 20px;
    border-radius: 10px;
    margin: 10px 0;
}
.status-success {
    background: #d4edda;
    border: 1px solid #c3e6cb;
    color: #155724;
    padding: 10px;
    border-radius: 5px;
}
.status-warning {
    background: #fff3cd;
    border: 1px solid #ffeaa7;
    color: #856404;
    padding: 10px;
    border-radius: 5px;
}
"""


# Initialize the LLM pipeline with zeroGPU support
# NOTE(review): this function is decorated with @spaces.GPU but is invoked
# once at import time and returns the pipeline object itself. Confirm against
# the ZeroGPU documentation that loading inside a GPU-decorated function (as
# opposed to loading at module level) is supported in the deployment target.
@spaces.GPU
def initialize_llm():
    """Load the text-generation pipeline, preferring CUDA when available.

    Returns:
        A ``(pipe, status)`` tuple. ``pipe`` is the transformers pipeline, or
        ``None`` when loading failed; ``status`` is a human-readable message
        describing the outcome (shown in the UI).
    """
    model_id = "microsoft/DialoGPT-medium"
    try:
        # Check GPU availability
        device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {device}")
        on_gpu = device == "cuda"

        # Half precision and automatic placement on GPU; plain float32 on CPU.
        llm = pipeline(
            "text-generation",
            model=model_id,
            torch_dtype=torch.float16 if on_gpu else torch.float32,
            device_map="auto" if on_gpu else "cpu",
            max_length=512,
            pad_token_id=50256,
        )
    except Exception as e:
        # Top-level boundary: any load failure degrades to the template-based
        # fallback analysis instead of crashing the app. The status string only
        # carries the first 100 characters, so log the full error here.
        print(f"LLM initialization failed: {e!r}")
        return None, f"⚠️ LLM not available: {str(e)[:100]}... Using fallback analysis."
    return llm, f"✅ LLM Model loaded on {device}: {model_id}"
pipe, model_status = initialize_llm()

# Enhanced attack scenarios with more realistic data
ATTACK_SCENARIOS = {
    "🔄 Lateral Movement": {
        "description": "Advanced Persistent Threat (APT) - Attacker moving laterally through network after initial compromise",
        "severity": "Critical",
        "alerts": [
            {
                "id": "ALR-001",
                "timestamp": "2025-01-15 14:30:45",
                "source_ip": "192.168.1.100",
                "destination_ip": "192.168.1.25",
                "user": "corp\\john.doe",
                "alert_type": "Suspicious Process Execution",
                "severity": "High",
                "description": "Unusual PowerShell execution with encoded commands detected",
                "raw_log": "Process: powershell.exe -WindowStyle Hidden -enc ZXhlYyBjYWxjLmV4ZQ== Parent: winword.exe",
                "threat_intel": "Base64 encoded PowerShell commonly used by APT29 (Cozy Bear) for initial access",
                "mitre_tactic": "T1059.001 - PowerShell",
                "confidence": 85,
            },
            {
                "id": "ALR-002",
                "timestamp": "2025-01-15 14:35:12",
                "source_ip": "192.168.1.100",
                "destination_ip": "192.168.1.50",
                "user": "corp\\john.doe",
                "alert_type": "Credential Dumping Attempt",
                "severity": "Critical",
                "description": "LSASS memory access detected - possible credential harvesting",
                "raw_log": "Process: rundll32.exe comsvcs.dll MiniDump [PID] lsass.dmp full",
                "threat_intel": "LSASS dumping technique associated with credential theft operations",
                "mitre_tactic": "T1003.001 - LSASS Memory",
                "confidence": 92,
            },
            {
                "id": "ALR-003",
                "timestamp": "2025-01-15 14:42:18",
                "source_ip": "192.168.1.100",
                "destination_ip": "10.0.0.15",
                "user": "SYSTEM",
                "alert_type": "Abnormal Network Connection",
                "severity": "Medium",
                "description": "Connection to unusual internal subnet using stolen credentials",
                "raw_log": "TCP connection established to 10.0.0.15:445 from 192.168.1.100:51234",
                "threat_intel": "SMB connections to sensitive subnets often indicate lateral movement",
                "mitre_tactic": "T1021.002 - SMB/Windows Admin Shares",
                "confidence": 78,
            },
        ],
    },
    "📧 Phishing Campaign": {
        "description": "Email-based social engineering attack leading to credential theft and data exfiltration",
        "severity": "High",
        "alerts": [
            {
                "id": "ALR-004",
                "timestamp": "2025-01-15 09:15:30",
                "source_ip": "203.0.113.50",
                "destination_ip": "192.168.1.75",
                "user": "corp\\sarah.wilson",
                "alert_type": "Malicious Email Detected",
                "severity": "High",
                "description": "Suspicious email with credential harvesting link detected",
                "raw_log": "From: security@bank-urgent-verification.com Subject: URGENT: Account Suspended - Verify Now",
                "threat_intel": "Domain registered 48 hours ago, hosted on bulletproof hosting provider",
                "mitre_tactic": "T1566.002 - Spearphishing Link",
                "confidence": 88,
            },
            {
                "id": "ALR-005",
                "timestamp": "2025-01-15 09:45:22",
                "source_ip": "192.168.1.75",
                "destination_ip": "203.0.113.50",
                "user": "corp\\sarah.wilson",
                "alert_type": "Credential Submission",
                "severity": "Critical",
                "description": "User credentials submitted to suspicious external site",
                "raw_log": "HTTPS POST to https://203.0.113.50/login.php - Credentials: username=sarah.wilson&password=[REDACTED]",
                "threat_intel": "IP address hosting multiple phishing kits targeting financial institutions",
                "mitre_tactic": "T1056.003 - Web Portal Capture",
                "confidence": 95,
            },
        ],
    },
    "🔒 Ransomware Attack": {
        "description": "File encryption attack with ransom demand - likely REvil/Sodinokibi variant",
        "severity": "Critical",
        "alerts": [
            {
                "id": "ALR-006",
                "timestamp": "2025-01-15 16:20:10",
                "source_ip": "192.168.1.85",
                "destination_ip": "192.168.1.85",
                "user": "corp\\admin.backup",
                "alert_type": "Mass File Encryption",
                "severity": "Critical",
                "description": "Rapid file modifications detected across multiple directories",
                "raw_log": "Files encrypted: 1,247 in C:\\Users\\Documents\\ Extensions changed to: .locked2025",
                "threat_intel": "Encryption pattern and extension match REvil ransomware family signatures",
                "mitre_tactic": "T1486 - Data Encrypted for Impact",
                "confidence": 97,
            },
            {
                "id": "ALR-007",
                "timestamp": "2025-01-15 16:25:33",
                "source_ip": "192.168.1.85",
                "destination_ip": "45.33.22.11",
                "user": "SYSTEM",
                "alert_type": "Command and Control Communication",
                "severity": "High",
                "description": "Encrypted communication to known ransomware C2 infrastructure",
                "raw_log": "TLS 1.3 connection established to 45.33.22.11:8443 - Data exchanged: 2.3KB",
                "threat_intel": "IP address previously associated with REvil ransomware C2 operations",
                "mitre_tactic": "T1071.001 - Web Protocols",
                "confidence": 91,
            },
        ],
    },
}

# Per-level report templates used when no LLM output is available. Built once
# at import time; placeholders are filled from the alert dict plus the
# level's own "icon" and "title".
_FALLBACK_TEMPLATES = {
    "L1": {
        "icon": "🚨",
        "title": "L1 TRIAGE ANALYSIS",
        "focus": "Initial Assessment & Escalation",
        "template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

🎯 THREAT SUMMARY: {alert_type} - {severity} severity
⏰ OCCURRED: {timestamp}
🌐 AFFECTED SYSTEM: {source_ip} (User: {user})
🔍 CONFIDENCE LEVEL: {confidence}%

🚀 IMMEDIATE ACTIONS:
• Isolate affected system: {source_ip}
• Verify user account status: {user}
• Check for similar alerts in timeframe
• Document incident ID: {id}

⬆️ ESCALATION CRITERIA:
• Severity: {severity} - Meets L2 escalation threshold
• MITRE Tactic: {mitre_tactic}
• Recommend immediate L2 review

📋 INITIAL NOTES:
{threat_intel}
""",
    },
    "L2": {
        "icon": "🔍",
        "title": "L2 INVESTIGATION ANALYSIS",
        "focus": "Detailed Investigation & Correlation",
        "template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

🎯 ATTACK VECTOR: {description}
⚙️ TECHNICAL DETAILS: {raw_log}
🧠 THREAT CONTEXT: {threat_intel}
🎪 MITRE ATT&CK: {mitre_tactic}

🔬 INVESTIGATION STEPS:
1. Examine parent process tree for {source_ip}
2. Correlate network connections in ±30min window
3. Review authentication logs for user: {user}
4. Check for indicators across environment
5. Analyze file system changes (if applicable)

🎯 CORRELATION POINTS:
• Source IP timeline analysis
• User behavior baseline comparison
• Similar TTPs in recent incidents
• Network segmentation verification

📊 RISK ASSESSMENT:
• Technical Impact: {severity}
• Business Risk: Review asset criticality
• Containment Priority: High (based on {confidence}% confidence)

⬆️ L3 ESCALATION IF:
• Attack campaign indicators found
• Critical asset involvement confirmed
• Advanced persistent threat suspected
""",
    },
    "L3": {
        "icon": "🎯",
        "title": "L3 EXPERT ANALYSIS",
        "focus": "Attribution & Strategic Response",
        "template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━

🎭 ADVERSARY PROFILE: Advanced threat actor
🎪 CAMPAIGN ANALYSIS: {threat_intel}
💼 BUSINESS IMPACT: {severity} - Requires C-level awareness
🛡️ DEFENSIVE POSTURE: Enhanced monitoring required

🕵️ THREAT HUNTING PRIORITIES:
1. Memory forensics on {source_ip}
2. Network traffic deep packet inspection
3. Endpoint artifact preservation
4. Active Directory security log analysis
5. Cloud infrastructure review (if applicable)

🎯 ATTRIBUTION INDICATORS:
• TTPs match: {mitre_tactic}
• Technical sophistication: High
• Targeting pattern: [Analyze organizational profile]
• Infrastructure overlap: Review IOC databases

🛠️ MITIGATION STRATEGY:
• Immediate: Block C2 communications
• Short-term: Deploy hunting queries
• Medium-term: Security architecture review
• Long-term: Staff training and awareness

📈 EXECUTIVE BRIEFING POINTS:
• Sophisticated attack requiring coordinated response
• Potential for lateral movement and data exfiltration
• Recommend incident response team activation
• Consider external forensics support

🔮 PREDICTIVE ANALYSIS:
• High probability of follow-up attacks
• Recommend 48-72 hour enhanced monitoring
• Consider threat landscape implications
""",
    },
}


def _build_analysis_prompt(alert_data, analyst_level):
    """Render the alert fields into the text prompt handed to the LLM."""
    # Enhanced context with more structured prompting
    system_context = f"""You are an expert cybersecurity analyst assistant specializing in SOC operations.
Analyze the following security alert for a Level {analyst_level} analyst.

ALERT CONTEXT:
ID: {alert_data['id']}
Type: {alert_data['alert_type']}
Severity: {alert_data['severity']}
Timestamp: {alert_data['timestamp']}
Network: {alert_data['source_ip']} → {alert_data['destination_ip']}
User: {alert_data['user']}
Description: {alert_data['description']}
Technical Details: {alert_data['raw_log']}
Threat Intelligence: {alert_data['threat_intel']}
MITRE ATT&CK: {alert_data['mitre_tactic']}
Confidence: {alert_data['confidence']}%

Provide analysis appropriate for {analyst_level} level:"""
    return f"{system_context}\n\nAnalysis:"


@spaces.GPU
def generate_advanced_llm_analysis(alert_data, analyst_level):
    """Generate comprehensive LLM-based analysis with enhanced prompting and GPU acceleration.

    Args:
        alert_data: Alert dict with the keys used by the prompt ("id",
            "alert_type", "severity", "timestamp", "source_ip",
            "destination_ip", "user", "description", "raw_log",
            "threat_intel", "mitre_tactic", "confidence").
        analyst_level: Analyst tier label, e.g. "L1", "L2" or "L3".

    Returns:
        The generated analysis text. Falls back to the template-based report
        when no model is loaded or the model produced no text; when generation
        raises, the error message is returned followed by the fallback report.
    """
    # No model loaded: skip prompt construction entirely.
    if pipe is None:
        return get_fallback_analysis(alert_data, analyst_level)

    prompt = _build_analysis_prompt(alert_data, analyst_level)
    try:
        # Report where inference actually runs.
        device = next(pipe.model.parameters()).device
        print(f"LLM running on device: {device}")

        response = pipe(
            prompt,
            max_new_tokens=300,
            do_sample=True,
            temperature=0.7,
            top_p=0.9,
            pad_token_id=pipe.tokenizer.eos_token_id,
        )
        generated_text = response[0]['generated_text']
        # The pipeline output is sliced by prompt length to drop the echoed
        # prompt and keep only the continuation.
        analysis = generated_text[len(prompt):].strip()
    except Exception as e:
        # Inference boundary: surface the error to the analyst but still
        # deliver a usable report.
        print(f"LLM Error: {e}")
        return f"LLM Processing Error: {str(e)}\n\n{get_fallback_analysis(alert_data, analyst_level)}"

    return analysis or get_fallback_analysis(alert_data, analyst_level)


def get_fallback_analysis(alert_data, analyst_level):
    """Enhanced fallback analysis with detailed recommendations.

    Args:
        alert_data: Alert dict; its items are passed as keyword arguments to
            ``str.format`` on the selected template.
        analyst_level: "L1", "L2" or "L3".

    Returns:
        The formatted report for the level, or a fixed "not available"
        message for any other level.

    Raises:
        KeyError: If ``alert_data`` lacks a field the template references.
    """
    profile = _FALLBACK_TEMPLATES.get(analyst_level)
    if profile is None:
        return "Analysis not available for specified level."
    return profile["template"].format(
        icon=profile["icon"],
        title=profile["title"],
        **alert_data,
    )
def analyze_alert_comprehensive(scenario_name, alert_index, analyst_level):
    """Enhanced main analysis function with timing and status updates.

    Args:
        scenario_name: Key into ``ATTACK_SCENARIOS``.
        alert_index: Zero-based position of the alert within the scenario.
            Accepts any value ``int()`` can convert (the UI slider may deliver
            a float).
        analyst_level: Analyst tier label passed through to the analysis.

    Returns:
        A ``(alert_details, analysis, status_message)`` tuple of strings. For
        an unknown scenario or an out-of-range/non-numeric index, the first
        and third elements carry the error and the analysis is empty.
    """
    start_time = time.time()

    # Validate inputs
    if scenario_name not in ATTACK_SCENARIOS:
        return "❌ Invalid scenario selected.", "", "Error: Invalid scenario"

    alerts = ATTACK_SCENARIOS[scenario_name]["alerts"]

    # Lists cannot be indexed by a float, and a negative index would silently
    # wrap around, so normalise to int and check both bounds.
    try:
        index = int(alert_index)
    except (TypeError, ValueError):
        return "❌ Invalid alert index.", "", "Error: Invalid alert index"
    if not 0 <= index < len(alerts):
        return "❌ Invalid alert index.", "", "Error: Invalid alert index"

    selected_alert = alerts[index]

    # Generate comprehensive analysis
    analysis = generate_advanced_llm_analysis(selected_alert, analyst_level)

    # Enhanced alert details formatting
    rule = "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
    detail_lines = [
        "",
        f"🎫 ALERT ID: {selected_alert['id']} | 🕐 {selected_alert['timestamp']}",
        rule,
        "",
        "🌐 NETWORK FLOW:",
        f"Source: {selected_alert['source_ip']} → Destination: {selected_alert['destination_ip']}",
        "",
        "👤 USER CONTEXT:",
        f"Account: {selected_alert['user']}",
        "",
        "⚠️ ALERT CLASSIFICATION:",
        f"Type: {selected_alert['alert_type']}",
        f"Severity: {selected_alert['severity']}",
        f"Confidence: {selected_alert['confidence']}%",
        "",
        "📝 DESCRIPTION:",
        f"{selected_alert['description']}",
        "",
        "🔍 TECHNICAL EVIDENCE:",
        f"{selected_alert['raw_log']}",
        "",
        "🧠 THREAT INTELLIGENCE:",
        f"{selected_alert['threat_intel']}",
        "",
        "🎪 MITRE ATT&CK MAPPING:",
        f"{selected_alert['mitre_tactic']}",
        rule,
        "",
    ]
    alert_details = "\n".join(detail_lines)

    processing_time = round(time.time() - start_time, 2)
    status_message = f"✅ {analyst_level} analysis completed in {processing_time}s | Model: {model_status}"

    return alert_details, analysis, status_message


def get_enhanced_scenario_info(scenario_name):
    """Enhanced scenario information with threat overview.

    Returns a Markdown summary (description, severity, alert count, alert
    timeline, analysis capabilities) for a known scenario, or a prompt to
    choose one when ``scenario_name`` is not in ``ATTACK_SCENARIOS``.
    """
    if scenario_name not in ATTACK_SCENARIOS:
        return "⚠️ No scenario selected. Please choose an attack scenario to begin analysis."

    scenario = ATTACK_SCENARIOS[scenario_name]
    alerts = scenario['alerts']

    sections = [
        "",
        f"## 🎭 **Attack Scenario: {scenario_name}**",
        "",
        f"**📋 Description:** {scenario['description']}",
        "",
        f"**⚠️ Severity Level:** {scenario['severity']}",
        "",
        f"**📊 Total Alerts:** {len(alerts)} security events detected",
        "",
        "### 🔍 **Alert Timeline:**",
    ]
    # One timeline entry per alert, numbered from 1.
    for position, alert in enumerate(alerts, start=1):
        sections.extend([
            "",
            f"**[{position}] {alert['timestamp']}** - {alert['alert_type']}",
            f"└─ Severity: {alert['severity']} | Confidence: {alert['confidence']}%",
        ])
    sections.extend([
        "",
        "### 🎯 **Analysis Capabilities:**",
        "- **L1 Triage:** Initial assessment and escalation decisions",
        "- **L2 Investigation:** Detailed technical analysis and correlation",
        "- **L3 Expert:** Attribution, impact assessment, and strategic response",
        "",
    ])
    return "\n".join(sections)


# Create enhanced Gradio interface
with gr.Blocks(title="SOC LLM Assistant - Advanced PoC", theme=gr.themes.Soft(), css=custom_css) as demo:

    # Header
    gr.Markdown("""
    # 🛡️ SOC LLM Assistant - Advanced Proof of Concept

    **Intelligent Security Alert Analysis for Multi-Level SOC Operations**

    *Demonstrating LLM-powered assistance for L1, L2, and L3 security analysts*
    """)

    # Model status display
    gr.Markdown(f"🤖 **System Status:** {model_status}")

    # Scenario preselected in the dropdown; also sizes the alert slider.
    default_scenario = "🔄 Lateral Movement"

    with gr.Row():
        # Left Panel - Controls
        with gr.Column(scale=1, min_width=300):
            gr.Markdown("## 🎮 Attack Simulation Control")

            scenario_dropdown = gr.Dropdown(
                choices=list(ATTACK_SCENARIOS.keys()),
                label="🎭 Select Attack Scenario",
                value=default_scenario,
                interactive=True,
            )

            scenario_info = gr.Markdown()

            gr.Markdown("---")
            gr.Markdown("## ⚙️ Analysis Configuration")

            alert_slider = gr.Slider(
                minimum=0,
                # Last valid index of the default scenario's alert list.
                maximum=len(ATTACK_SCENARIOS[default_scenario]["alerts"]) - 1,
                step=1,
                value=0,
                label="📋 Alert Selection",
                info="Choose which alert from the scenario to analyze",
            )

            analyst_level = gr.Radio(
                choices=["L1", "L2", "L3"],
                label="👤 Analyst Level",
                value="L2",
                info="L1: Triage | L2: Investigation | L3: Expert Analysis",
            )

            analyze_btn = gr.Button(
                "🔍 Analyze Alert",
                variant="primary",
                size="lg",
            )

            gr.Markdown("---")
            gr.Markdown("## 📊 Quick Stats")
            gr.Markdown("""
            **🎯 Demo Features:**
            - 3 realistic attack scenarios
            - Multi-level analysis (L1/L2/L3)
            - MITRE ATT&CK mapping
            - Threat intelligence integration
            - Real-time LLM processing
            """)

        # Right Panel - Results
        with gr.Column(scale=2):
            gr.Markdown("## 📋 Security Alert Details")
            alert_output = gr.Textbox(
                label="🎫 Raw Alert Information",
                lines=15,
                interactive=False,
                placeholder="Alert details will appear here after analysis...",
            )

            gr.Markdown("## 🤖 AI-Powered Analysis")
            analysis_output = gr.Textbox(
                label="🧠 Intelligent Analysis & Recommendations",
                lines=20,
                interactive=False,
                placeholder="LLM analysis will appear here after processing...",
            )

            status_output = gr.Textbox(
                label="📊 Processing Status",
                interactive=False,
                lines=1,
            )

    # Footer information
    gr.Markdown("""
    ---
    ## 📖 **Usage Instructions:**

    1. **📊 Select Scenario:** Choose from realistic cybersecurity attack scenarios
    2. **🎯 Pick Alert:** Use the slider to select which alert in the sequence to analyze
    3. **👤 Choose Level:** Select analyst expertise level (L1/L2/L3) for tailored analysis
    4. **🔍 Analyze:** Click the analyze button to get AI-powered insights and recommendations

    ## 🎯 **Key Capabilities Demonstrated:**

    - **🎭 Realistic Scenarios:** Based on actual cybersecurity incidents and attack patterns
    - **🧠 Contextual Analysis:** LLM considers all available metadata, threat intelligence, and historical patterns
    - **👥 Role-Based Insights:** Tailored recommendations for different SOC analyst skill levels
    - **⚡ Real-Time Processing:** Immediate analysis with actionable next steps
    - **🎪 Industry Standards:** MITRE ATT&CK framework integration for standardized threat classification

    ## 🔬 **Research Value:**

    This PoC demonstrates the feasibility of LLM integration in operational security environments, supporting research in automated threat analysis, human-AI collaboration, and intelligent SOC operations.

    ---
    **👨‍🎓 Developed by:** Abdullah Alanazi | **🏛️ Institution:** KAUST | **👨‍🏫 Supervisor:** Prof. Ali Shoker
    """)

    # Event handlers with enhanced functionality
    scenario_dropdown.change(
        fn=get_enhanced_scenario_info,
        inputs=[scenario_dropdown],
        outputs=[scenario_info],
    )

    # Update slider maximum based on scenario
    def update_slider_max(scenario_name):
        """Resize the alert slider to the chosen scenario and reset it to 0."""
        if scenario_name in ATTACK_SCENARIOS:
            max_alerts = len(ATTACK_SCENARIOS[scenario_name]["alerts"]) - 1
            return gr.Slider(maximum=max_alerts, value=0)
        return gr.Slider(maximum=2, value=0)

    scenario_dropdown.change(
        fn=update_slider_max,
        inputs=[scenario_dropdown],
        outputs=[alert_slider],
    )

    analyze_btn.click(
        fn=analyze_alert_comprehensive,
        inputs=[scenario_dropdown, alert_slider, analyst_level],
        outputs=[alert_output, analysis_output, status_output],
    )

    # Initialize with default scenario
    demo.load(
        fn=get_enhanced_scenario_info,
        inputs=[scenario_dropdown],
        outputs=[scenario_info],
    )

# Launch configuration
if __name__ == "__main__":
    demo.launch(
        share=True,
        server_name="0.0.0.0",
        server_port=7860,
        show_error=True,
    )