abdull4h's picture
Create app.py
3b6960a verified
raw
history blame
22.7 kB
import gradio as gr
import spaces
import json
import datetime
import random
from transformers import pipeline
import torch
import time
# Custom CSS for better styling
custom_css = """
.gradio-container {
max-width: 1200px !important;
}
.alert-box {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 20px;
border-radius: 10px;
margin: 10px 0;
}
.status-success {
background: #d4edda;
border: 1px solid #c3e6cb;
color: #155724;
padding: 10px;
border-radius: 5px;
}
.status-warning {
background: #fff3cd;
border: 1px solid #ffeaa7;
color: #856404;
padding: 10px;
border-radius: 5px;
}
"""
# Initialize the LLM pipeline with zeroGPU support
@spaces.GPU
def initialize_llm():
try:
# Check GPU availability
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using device: {device}")
# Try to use a larger model with GPU acceleration
model_id = "microsoft/DialoGPT-medium"
pipe = pipeline(
"text-generation",
model=model_id,
torch_dtype=torch.float16 if device == "cuda" else torch.float32,
device_map="auto" if device == "cuda" else "cpu",
max_length=512,
pad_token_id=50256
)
return pipe, f"✅ LLM Model loaded on {device}: {model_id}"
except Exception as e:
return None, f"⚠️ LLM not available: {str(e)[:100]}... Using fallback analysis."
pipe, model_status = initialize_llm()
# Enhanced attack scenarios with more realistic data
ATTACK_SCENARIOS = {
"🔄 Lateral Movement": {
"description": "Advanced Persistent Threat (APT) - Attacker moving laterally through network after initial compromise",
"severity": "Critical",
"alerts": [
{
"id": "ALR-001",
"timestamp": "2025-01-15 14:30:45",
"source_ip": "192.168.1.100",
"destination_ip": "192.168.1.25",
"user": "corp\\john.doe",
"alert_type": "Suspicious Process Execution",
"severity": "High",
"description": "Unusual PowerShell execution with encoded commands detected",
"raw_log": "Process: powershell.exe -WindowStyle Hidden -enc ZXhlYyBjYWxjLmV4ZQ== Parent: winword.exe",
"threat_intel": "Base64 encoded PowerShell commonly used by APT29 (Cozy Bear) for initial access",
"mitre_tactic": "T1059.001 - PowerShell",
"confidence": 85
},
{
"id": "ALR-002",
"timestamp": "2025-01-15 14:35:12",
"source_ip": "192.168.1.100",
"destination_ip": "192.168.1.50",
"user": "corp\\john.doe",
"alert_type": "Credential Dumping Attempt",
"severity": "Critical",
"description": "LSASS memory access detected - possible credential harvesting",
"raw_log": "Process: rundll32.exe comsvcs.dll MiniDump [PID] lsass.dmp full",
"threat_intel": "LSASS dumping technique associated with credential theft operations",
"mitre_tactic": "T1003.001 - LSASS Memory",
"confidence": 92
},
{
"id": "ALR-003",
"timestamp": "2025-01-15 14:42:18",
"source_ip": "192.168.1.100",
"destination_ip": "10.0.0.15",
"user": "SYSTEM",
"alert_type": "Abnormal Network Connection",
"severity": "Medium",
"description": "Connection to unusual internal subnet using stolen credentials",
"raw_log": "TCP connection established to 10.0.0.15:445 from 192.168.1.100:51234",
"threat_intel": "SMB connections to sensitive subnets often indicate lateral movement",
"mitre_tactic": "T1021.002 - SMB/Windows Admin Shares",
"confidence": 78
}
]
},
"📧 Phishing Campaign": {
"description": "Email-based social engineering attack leading to credential theft and data exfiltration",
"severity": "High",
"alerts": [
{
"id": "ALR-004",
"timestamp": "2025-01-15 09:15:30",
"source_ip": "203.0.113.50",
"destination_ip": "192.168.1.75",
"user": "corp\\sarah.wilson",
"alert_type": "Malicious Email Detected",
"severity": "High",
"description": "Suspicious email with credential harvesting link detected",
"raw_log": "From: [email protected] Subject: URGENT: Account Suspended - Verify Now",
"threat_intel": "Domain registered 48 hours ago, hosted on bulletproof hosting provider",
"mitre_tactic": "T1566.002 - Spearphishing Link",
"confidence": 88
},
{
"id": "ALR-005",
"timestamp": "2025-01-15 09:45:22",
"source_ip": "192.168.1.75",
"destination_ip": "203.0.113.50",
"user": "corp\\sarah.wilson",
"alert_type": "Credential Submission",
"severity": "Critical",
"description": "User credentials submitted to suspicious external site",
"raw_log": "HTTPS POST to https://203.0.113.50/login.php - Credentials: username=sarah.wilson&password=[REDACTED]",
"threat_intel": "IP address hosting multiple phishing kits targeting financial institutions",
"mitre_tactic": "T1056.003 - Web Portal Capture",
"confidence": 95
}
]
},
"🔒 Ransomware Attack": {
"description": "File encryption attack with ransom demand - likely REvil/Sodinokibi variant",
"severity": "Critical",
"alerts": [
{
"id": "ALR-006",
"timestamp": "2025-01-15 16:20:10",
"source_ip": "192.168.1.85",
"destination_ip": "192.168.1.85",
"user": "corp\\admin.backup",
"alert_type": "Mass File Encryption",
"severity": "Critical",
"description": "Rapid file modifications detected across multiple directories",
"raw_log": "Files encrypted: 1,247 in C:\\Users\\Documents\\ Extensions changed to: .locked2025",
"threat_intel": "Encryption pattern and extension match REvil ransomware family signatures",
"mitre_tactic": "T1486 - Data Encrypted for Impact",
"confidence": 97
},
{
"id": "ALR-007",
"timestamp": "2025-01-15 16:25:33",
"source_ip": "192.168.1.85",
"destination_ip": "45.33.22.11",
"user": "SYSTEM",
"alert_type": "Command and Control Communication",
"severity": "High",
"description": "Encrypted communication to known ransomware C2 infrastructure",
"raw_log": "TLS 1.3 connection established to 45.33.22.11:8443 - Data exchanged: 2.3KB",
"threat_intel": "IP address previously associated with REvil ransomware C2 operations",
"mitre_tactic": "T1071.001 - Web Protocols",
"confidence": 91
}
]
}
}
@spaces.GPU
def generate_advanced_llm_analysis(alert_data, analyst_level):
"""Generate comprehensive LLM-based analysis with enhanced prompting and GPU acceleration"""
# Enhanced context with more structured prompting
system_context = f"""You are an expert cybersecurity analyst assistant specializing in SOC operations.
Analyze the following security alert for a Level {analyst_level} analyst.
ALERT CONTEXT:
ID: {alert_data['id']}
Type: {alert_data['alert_type']}
Severity: {alert_data['severity']}
Timestamp: {alert_data['timestamp']}
Network: {alert_data['source_ip']}{alert_data['destination_ip']}
User: {alert_data['user']}
Description: {alert_data['description']}
Technical Details: {alert_data['raw_log']}
Threat Intelligence: {alert_data['threat_intel']}
MITRE ATT&CK: {alert_data['mitre_tactic']}
Confidence: {alert_data['confidence']}%
Provide analysis appropriate for {analyst_level} level:"""
if pipe:
try:
# Use GPU acceleration for faster inference
device = next(pipe.model.parameters()).device
print(f"LLM running on device: {device}")
prompt = f"{system_context}\n\nAnalysis:"
response = pipe(
prompt,
max_new_tokens=300,
do_sample=True,
temperature=0.7,
top_p=0.9,
pad_token_id=pipe.tokenizer.eos_token_id
)
generated_text = response[0]['generated_text']
analysis = generated_text[len(prompt):].strip()
return analysis if analysis else get_fallback_analysis(alert_data, analyst_level)
except Exception as e:
print(f"LLM Error: {e}")
return f"LLM Processing Error: {str(e)}\n\n{get_fallback_analysis(alert_data, analyst_level)}"
return get_fallback_analysis(alert_data, analyst_level)
def get_fallback_analysis(alert_data, analyst_level):
"""Enhanced fallback analysis with detailed recommendations"""
base_analysis = {
"L1": {
"icon": "🚨",
"title": "L1 TRIAGE ANALYSIS",
"focus": "Initial Assessment & Escalation",
"template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 THREAT SUMMARY: {alert_type} - {severity} severity
⏰ OCCURRED: {timestamp}
🌐 AFFECTED SYSTEM: {source_ip} (User: {user})
🔍 CONFIDENCE LEVEL: {confidence}%
🚀 IMMEDIATE ACTIONS:
• Isolate affected system: {source_ip}
• Verify user account status: {user}
• Check for similar alerts in timeframe
• Document incident ID: {id}
⬆️ ESCALATION CRITERIA:
• Severity: {severity} - Meets L2 escalation threshold
• MITRE Tactic: {mitre_tactic}
• Recommend immediate L2 review
📋 INITIAL NOTES:
{threat_intel}
"""
},
"L2": {
"icon": "🔍",
"title": "L2 INVESTIGATION ANALYSIS",
"focus": "Detailed Investigation & Correlation",
"template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎯 ATTACK VECTOR: {description}
⚙️ TECHNICAL DETAILS: {raw_log}
🧠 THREAT CONTEXT: {threat_intel}
🎪 MITRE ATT&CK: {mitre_tactic}
🔬 INVESTIGATION STEPS:
1. Examine parent process tree for {source_ip}
2. Correlate network connections in ±30min window
3. Review authentication logs for user: {user}
4. Check for indicators across environment
5. Analyze file system changes (if applicable)
🎯 CORRELATION POINTS:
• Source IP timeline analysis
• User behavior baseline comparison
• Similar TTPs in recent incidents
• Network segmentation verification
📊 RISK ASSESSMENT:
• Technical Impact: {severity}
• Business Risk: Review asset criticality
• Containment Priority: High (based on {confidence}% confidence)
⬆️ L3 ESCALATION IF:
• Attack campaign indicators found
• Critical asset involvement confirmed
• Advanced persistent threat suspected
"""
},
"L3": {
"icon": "🎯",
"title": "L3 EXPERT ANALYSIS",
"focus": "Attribution & Strategic Response",
"template": """
{icon} {title}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🎭 ADVERSARY PROFILE: Advanced threat actor
🎪 CAMPAIGN ANALYSIS: {threat_intel}
💼 BUSINESS IMPACT: {severity} - Requires C-level awareness
🛡️ DEFENSIVE POSTURE: Enhanced monitoring required
🕵️ THREAT HUNTING PRIORITIES:
1. Memory forensics on {source_ip}
2. Network traffic deep packet inspection
3. Endpoint artifact preservation
4. Active Directory security log analysis
5. Cloud infrastructure review (if applicable)
🎯 ATTRIBUTION INDICATORS:
• TTPs match: {mitre_tactic}
• Technical sophistication: High
• Targeting pattern: [Analyze organizational profile]
• Infrastructure overlap: Review IOC databases
🛠️ MITIGATION STRATEGY:
• Immediate: Block C2 communications
• Short-term: Deploy hunting queries
• Medium-term: Security architecture review
• Long-term: Staff training and awareness
📈 EXECUTIVE BRIEFING POINTS:
• Sophisticated attack requiring coordinated response
• Potential for lateral movement and data exfiltration
• Recommend incident response team activation
• Consider external forensics support
🔮 PREDICTIVE ANALYSIS:
• High probability of follow-up attacks
• Recommend 48-72 hour enhanced monitoring
• Consider threat landscape implications
"""
}
}
if analyst_level in base_analysis:
template = base_analysis[analyst_level]["template"]
return template.format(
icon=base_analysis[analyst_level]["icon"],
title=base_analysis[analyst_level]["title"],
**alert_data
)
return "Analysis not available for specified level."
def analyze_alert_comprehensive(scenario_name, alert_index, analyst_level):
"""Enhanced main analysis function with timing and status updates"""
start_time = time.time()
# Validate inputs
if scenario_name not in ATTACK_SCENARIOS:
return "❌ Invalid scenario selected.", "", "Error: Invalid scenario"
scenario = ATTACK_SCENARIOS[scenario_name]
alerts = scenario["alerts"]
if alert_index >= len(alerts):
return "❌ Invalid alert index.", "", "Error: Invalid alert index"
selected_alert = alerts[alert_index]
# Generate comprehensive analysis
analysis = generate_advanced_llm_analysis(selected_alert, analyst_level)
# Enhanced alert details formatting
alert_details = f"""
🎫 ALERT ID: {selected_alert['id']} | 🕐 {selected_alert['timestamp']}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
🌐 NETWORK FLOW:
Source: {selected_alert['source_ip']} → Destination: {selected_alert['destination_ip']}
👤 USER CONTEXT:
Account: {selected_alert['user']}
⚠️ ALERT CLASSIFICATION:
Type: {selected_alert['alert_type']}
Severity: {selected_alert['severity']}
Confidence: {selected_alert['confidence']}%
📝 DESCRIPTION:
{selected_alert['description']}
🔍 TECHNICAL EVIDENCE:
{selected_alert['raw_log']}
🧠 THREAT INTELLIGENCE:
{selected_alert['threat_intel']}
🎪 MITRE ATT&CK MAPPING:
{selected_alert['mitre_tactic']}
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
"""
processing_time = round(time.time() - start_time, 2)
status_message = f"✅ {analyst_level} analysis completed in {processing_time}s | Model: {model_status}"
return alert_details, analysis, status_message
def get_enhanced_scenario_info(scenario_name):
"""Enhanced scenario information with threat overview"""
if scenario_name in ATTACK_SCENARIOS:
scenario = ATTACK_SCENARIOS[scenario_name]
info = f"""
## 🎭 **Attack Scenario: {scenario_name}**
**📋 Description:** {scenario['description']}
**⚠️ Severity Level:** {scenario['severity']}
**📊 Total Alerts:** {len(scenario['alerts'])} security events detected
### 🔍 **Alert Timeline:**
"""
for i, alert in enumerate(scenario['alerts']):
info += f"""
**[{i+1}] {alert['timestamp']}** - {alert['alert_type']}
└─ Severity: {alert['severity']} | Confidence: {alert['confidence']}%
"""
info += f"""
### 🎯 **Analysis Capabilities:**
- **L1 Triage:** Initial assessment and escalation decisions
- **L2 Investigation:** Detailed technical analysis and correlation
- **L3 Expert:** Attribution, impact assessment, and strategic response
"""
return info
return "⚠️ No scenario selected. Please choose an attack scenario to begin analysis."
# Create enhanced Gradio interface
with gr.Blocks(title="SOC LLM Assistant - Advanced PoC", theme=gr.themes.Soft(), css=custom_css) as demo:
# Header
gr.Markdown("""
# 🛡️ SOC LLM Assistant - Advanced Proof of Concept
**Intelligent Security Alert Analysis for Multi-Level SOC Operations**
*Demonstrating LLM-powered assistance for L1, L2, and L3 security analysts*
""")
# Model status display
gr.Markdown(f"🤖 **System Status:** {model_status}")
with gr.Row():
# Left Panel - Controls
with gr.Column(scale=1, min_width=300):
gr.Markdown("## 🎮 Attack Simulation Control")
scenario_dropdown = gr.Dropdown(
choices=list(ATTACK_SCENARIOS.keys()),
label="🎭 Select Attack Scenario",
value="🔄 Lateral Movement",
interactive=True
)
scenario_info = gr.Markdown()
gr.Markdown("---")
gr.Markdown("## ⚙️ Analysis Configuration")
alert_slider = gr.Slider(
minimum=0,
maximum=2,
step=1,
value=0,
label="📋 Alert Selection",
info="Choose which alert from the scenario to analyze"
)
analyst_level = gr.Radio(
choices=["L1", "L2", "L3"],
label="👤 Analyst Level",
value="L2",
info="L1: Triage | L2: Investigation | L3: Expert Analysis"
)
analyze_btn = gr.Button(
"🔍 Analyze Alert",
variant="primary",
size="lg"
)
gr.Markdown("---")
gr.Markdown("## 📊 Quick Stats")
gr.Markdown("""
**🎯 Demo Features:**
- 3 realistic attack scenarios
- Multi-level analysis (L1/L2/L3)
- MITRE ATT&CK mapping
- Threat intelligence integration
- Real-time LLM processing
""")
# Right Panel - Results
with gr.Column(scale=2):
gr.Markdown("## 📋 Security Alert Details")
alert_output = gr.Textbox(
label="🎫 Raw Alert Information",
lines=15,
interactive=False,
placeholder="Alert details will appear here after analysis..."
)
gr.Markdown("## 🤖 AI-Powered Analysis")
analysis_output = gr.Textbox(
label="🧠 Intelligent Analysis & Recommendations",
lines=20,
interactive=False,
placeholder="LLM analysis will appear here after processing..."
)
status_output = gr.Textbox(
label="📊 Processing Status",
interactive=False,
lines=1
)
# Footer information
gr.Markdown("""
---
## 📖 **Usage Instructions:**
1. **📊 Select Scenario:** Choose from realistic cybersecurity attack scenarios
2. **🎯 Pick Alert:** Use the slider to select which alert in the sequence to analyze
3. **👤 Choose Level:** Select analyst expertise level (L1/L2/L3) for tailored analysis
4. **🔍 Analyze:** Click the analyze button to get AI-powered insights and recommendations
## 🎯 **Key Capabilities Demonstrated:**
- **🎭 Realistic Scenarios:** Based on actual cybersecurity incidents and attack patterns
- **🧠 Contextual Analysis:** LLM considers all available metadata, threat intelligence, and historical patterns
- **👥 Role-Based Insights:** Tailored recommendations for different SOC analyst skill levels
- **⚡ Real-Time Processing:** Immediate analysis with actionable next steps
- **🎪 Industry Standards:** MITRE ATT&CK framework integration for standardized threat classification
## 🔬 **Research Value:**
This PoC demonstrates the feasibility of LLM integration in operational security environments, supporting research in automated threat analysis, human-AI collaboration, and intelligent SOC operations.
---
**👨‍🎓 Developed by:** Abdullah Alanazi | **🏛️ Institution:** KAUST | **👨‍🏫 Supervisor:** Prof. Ali Shoker
""")
# Event handlers with enhanced functionality
scenario_dropdown.change(
fn=get_enhanced_scenario_info,
inputs=[scenario_dropdown],
outputs=[scenario_info]
)
# Update slider maximum based on scenario
def update_slider_max(scenario_name):
if scenario_name in ATTACK_SCENARIOS:
max_alerts = len(ATTACK_SCENARIOS[scenario_name]["alerts"]) - 1
return gr.Slider(maximum=max_alerts, value=0)
return gr.Slider(maximum=2, value=0)
scenario_dropdown.change(
fn=update_slider_max,
inputs=[scenario_dropdown],
outputs=[alert_slider]
)
analyze_btn.click(
fn=analyze_alert_comprehensive,
inputs=[scenario_dropdown, alert_slider, analyst_level],
outputs=[alert_output, analysis_output, status_output]
)
# Initialize with default scenario
demo.load(
fn=get_enhanced_scenario_info,
inputs=[scenario_dropdown],
outputs=[scenario_info]
)
# Launch configuration
if __name__ == "__main__":
demo.launch(
share=True,
server_name="0.0.0.0",
server_port=7860,
show_error=True
)