# Spaces status (page header captured together with this file): Sleeping
import time

# `spaces` stays ahead of torch/transformers, as in the original import order.
import gradio as gr
import spaces
import torch
from transformers import pipeline
# Custom stylesheet handed to gr.Blocks(css=...). The class names below match
# the `elem_classes` values used on the components in the interface.
simple_css = """
.gradio-container {
max-width: 900px !important;
margin: 0 auto !important;
font-family: 'Arial', sans-serif;
}
.threat-input {
border-radius: 8px !important;
border: 2px solid #e0e0e0 !important;
padding: 15px !important;
font-size: 14px !important;
}
.threat-input:focus {
border-color: #667eea !important;
}
.analyze-btn {
background: #667eea !important;
border: none !important;
border-radius: 8px !important;
padding: 12px 30px !important;
font-size: 16px !important;
font-weight: 600 !important;
color: white !important;
}
.analysis-output {
background: #f8f9fa !important;
border-radius: 8px !important;
border: 1px solid #e0e0e0 !important;
padding: 20px !important;
line-height: 1.6 !important;
}
.status-box {
background: #d4edda !important;
border: 1px solid #c3e6cb !important;
color: #155724 !important;
padding: 10px !important;
border-radius: 6px !important;
margin: 10px 0 !important;
}
"""
# Module-level model state shared by load_model() and analyze_threat().
# `pipe` holds the loaded text-generation pipeline, or None while nothing is
# loaded; `model_status` is the human-readable status line shown in the UI.
pipe = None
model_status = "🔄 Loading model..."
def load_model():
    """Load the first usable text-generation model and return a status line.

    Candidates are tried in order. A candidate is only published to the
    module-level ``pipe`` after a short smoke-test generation succeeds, so
    ``pipe`` never points at a pipeline that failed its test.

    The function is idempotent once a pipeline is loaded: later calls (for
    example one per page load) return the stored status without reloading.
    If every candidate fails, ``pipe`` is reset to ``None`` and the fallback
    status is returned; a later call will try the candidates again.

    Returns:
        The status string, which is also stored in the module-level
        ``model_status``.
    """
    global pipe, model_status

    # Already loaded: do not pay the load cost again.
    if pipe is not None:
        return model_status

    models_to_try = [
        "openai/gpt-oss-20b",
        "microsoft/DialoGPT-large",
        "microsoft/DialoGPT-medium",
        "gpt2-large",
    ]
    use_cuda = torch.cuda.is_available()

    for model_name in models_to_try:
        candidate = None
        try:
            print(f"🔄 Loading {model_name}...")
            candidate = pipeline(
                "text-generation",
                model=model_name,
                torch_dtype="auto",
                device_map="auto" if use_cuda else None,
                # NOTE(review): trust_remote_code=True lets the model
                # repository run its own Python code at load time. Kept to
                # preserve behaviour; confirm it is needed for these models.
                trust_remote_code=True,
            )
            # Smoke test: a tiny deterministic generation must succeed.
            candidate("Test", max_new_tokens=5, do_sample=False)
        except Exception as e:  # any load/generation failure -> next candidate
            print(f"❌ {model_name} failed: {type(e).__name__}: {str(e)[:200]}")
            # Drop our reference before clearing the CUDA cache, otherwise the
            # failed pipeline's memory is still held.
            candidate = None
            if use_cuda:
                torch.cuda.empty_cache()
            continue

        pipe = candidate
        model_status = f"✅ {model_name} ready"
        print(model_status)
        return model_status

    pipe = None
    model_status = "⚠️ Using fallback mode"
    return model_status
def analyze_threat(threat_description, analyst_level):
    """Analyse a threat description and return ``(analysis, status)``.

    Args:
        threat_description: Free-text description of the threat. ``None``,
            empty, or whitespace-only input is rejected with a prompt to
            enter a description.
        analyst_level: Analyst tier label inserted into the prompt and passed
            to the fallback generator (the UI offers "L1", "L2", "L3").

    Returns:
        A 2-tuple ``(analysis_text, status_text)``. For rejected input the
        status text is an empty string.

    The model pipeline is used when one is loaded. If generation raises, or
    yields fewer than 30 characters, the templated fallback report is used
    (prefixed with a truncated error message in the exception case).
    """
    # Guard against None as well as blank text before calling .strip().
    if threat_description is None or not threat_description.strip():
        return "Please enter a threat description first.", ""

    start_time = time.time()

    prompt = (
        f"As a {analyst_level} cybersecurity analyst, analyze this threat:\n"
        f"THREAT: {threat_description}\n"
        f"Provide a {analyst_level} level security analysis including:\n"
        "- Threat assessment\n"
        "- Potential impact\n"
        "- Recommended actions\n"
        "ANALYSIS:"
    )

    if pipe is not None:
        try:
            result = pipe(
                prompt,
                max_new_tokens=300,
                do_sample=True,
                temperature=0.3,
                top_p=0.9,
                repetition_penalty=1.1,
            )
            generated = result[0]["generated_text"]
            # Only strip the prompt when it is really echoed at the start;
            # slicing blindly by length would cut into the answer otherwise.
            if generated.startswith(prompt):
                generated = generated[len(prompt):]
            analysis = generated.strip()
            # Very short output is treated as a failed generation.
            if len(analysis) < 30:
                analysis = get_simple_fallback(threat_description, analyst_level)
        except Exception as e:  # keep the UI responsive on any model error
            fallback = get_simple_fallback(threat_description, analyst_level)
            analysis = f"AI Error: {str(e)[:100]}\n\n{fallback}"
    else:
        analysis = get_simple_fallback(threat_description, analyst_level)

    processing_time = round(time.time() - start_time, 2)
    status = f"✅ Analysis completed in {processing_time}s | {model_status}"
    return analysis, status
def get_simple_fallback(threat_description, analyst_level):
    """Return a templated report for use when no model output is available.

    Args:
        threat_description: Text echoed into the report body.
        analyst_level: "L1" selects the triage template and "L2" the
            investigation template; any other value gets the L3 template.

    Returns:
        The report as a single newline-joined string.
    """
    threat_text = f"{threat_description}"

    if analyst_level == "L1":
        lines = [
            "🚨 L1 TRIAGE ANALYSIS",
            "THREAT SUMMARY:",
            threat_text,
            "IMMEDIATE ACTIONS:",
            "• Assess severity and scope",
            "• Document all available evidence",
            "• Isolate affected systems if needed",
            "• Escalate to L2 if severity is high",
            "PRIORITY: Immediate containment and escalation decision required",
        ]
    elif analyst_level == "L2":
        lines = [
            "🔍 L2 INVESTIGATION ANALYSIS",
            "THREAT DETAILS:",
            threat_text,
            "INVESTIGATION STEPS:",
            "1. Collect and preserve evidence",
            "2. Analyze attack vectors and methods",
            "3. Determine scope of compromise",
            "4. Identify indicators of compromise (IOCs)",
            "5. Assess potential data exposure",
            "CONTAINMENT:",
            "• Implement network segmentation",
            "• Deploy additional monitoring",
            "• Review authentication logs",
            "• Check for lateral movement",
            "NEXT STEPS:",
            "• Continue monitoring for related activity",
            "• Update security controls as needed",
            "• Consider L3 escalation for complex threats",
        ]
    else:  # L3 (also the default for any unrecognised level)
        lines = [
            "🎯 L3 EXPERT ANALYSIS",
            "STRATEGIC THREAT ASSESSMENT:",
            threat_text,
            "ADVANCED ANALYSIS:",
            "• Threat actor attribution assessment",
            "• Campaign analysis and TTPs",
            "• Business impact evaluation",
            "• Risk quantification",
            "STRATEGIC RESPONSE:",
            "• Coordinate incident response team",
            "• Executive briefing preparation",
            "• Regulatory compliance review",
            "• Long-term security posture improvements",
            "RECOMMENDATIONS:",
            "• Implement advanced threat hunting",
            "• Enhance detection capabilities",
            "• Review security architecture",
            "• Consider external forensics support",
        ]

    return "\n".join(lines)
# Build the Gradio interface. Components and event listeners are declared
# inside the Blocks context so they are attached to `demo`.
with gr.Blocks(title="Simple SOC Analyzer", theme=gr.themes.Soft(), css=simple_css) as demo:
    # Header
    gr.Markdown(
        "\n"
        "# 🛡️ SOC Threat Analyzer\n"
        "**Simple • Fast • Effective**\n"
        "Enter any security threat and get instant AI analysis.\n"
    )

    # Model status line; filled by load_model() through demo.load below.
    status_display = gr.Textbox(
        value="🔄 Loading model...",
        label="System Status",
        interactive=False,
        elem_classes=["status-box"],
    )

    # Main interface: inputs on the left, results on the right.
    with gr.Row():
        with gr.Column(scale=1):
            # Threat input
            threat_input = gr.Textbox(
                label="🚨 Describe the Security Threat",
                placeholder="Example: Suspicious PowerShell execution detected on user workstation with encoded commands...",
                lines=5,
                elem_classes=["threat-input"],
            )

            # Analysis level
            analyst_level = gr.Radio(
                choices=["L1", "L2", "L3"],
                value="L2",
                label="Analysis Level",
                info="L1: Quick Triage • L2: Detailed Investigation • L3: Strategic Analysis",
            )

            # Analyze button
            analyze_btn = gr.Button(
                "🔍 Analyze Threat",
                variant="primary",
                size="lg",
                elem_classes=["analyze-btn"],
            )

            # Quick examples
            gr.Markdown(
                "\n"
                "### 📝 Quick Examples:\n"
                "- Suspicious email with malicious attachment\n"
                "- Unusual network traffic to external IP\n"
                "- User account showing signs of compromise\n"
                "- Ransomware indicators detected on server\n"
                "- Failed login attempts from multiple locations\n"
            )

        with gr.Column(scale=2):
            # Analysis output
            analysis_output = gr.Textbox(
                label="🤖 Security Analysis",
                lines=20,
                interactive=False,
                elem_classes=["analysis-output"],
                placeholder="Analysis will appear here...",
            )

            # Processing status
            process_status = gr.Textbox(
                label="Processing Status",
                interactive=False,
                lines=1,
            )

            # Quick action buttons
            with gr.Row():
                # TODO: "Save Analysis" and "Email Report" have no click
                # handler attached here, so they currently do nothing.
                gr.Button("💾 Save Analysis", variant="secondary", size="sm")
                gr.Button("📧 Email Report", variant="secondary", size="sm")
                clear_btn = gr.Button("🔄 Clear All", variant="secondary", size="sm")

    # Footer
    gr.Markdown(
        "\n"
        "---\n"
        "**💡 Tips:** Be specific about what you observed, include timestamps, IP addresses, user accounts, or file names when available.\n"
    )

    # Event handlers
    analyze_btn.click(
        fn=analyze_threat,
        inputs=[threat_input, analyst_level],
        outputs=[analysis_output, process_status],
    )

    # "Clear All" resets the input and both result boxes.
    clear_btn.click(
        fn=lambda: ("", "", ""),
        inputs=None,
        outputs=[threat_input, analysis_output, process_status],
    )

    # Run load_model when the page loads in the browser and show its status.
    demo.load(
        fn=load_model,
        outputs=[status_display],
    )
# Start the app only when the file is executed as a script.
if __name__ == "__main__":
    # share=True asks Gradio for a public share link in addition to the
    # local server.
    demo.launch(share=True)