"""
Simplified Neural-Symbolic AI for Hugging Face Space

Based on src/learning/neurosymbolic_ai.py
"""

import json
import logging
from datetime import datetime
from typing import Any, Dict, List, Optional

import numpy as np


class SimplifiedNeuroSymbolicAI: |
|
"""Simplified neural-symbolic AI for cybersecurity analysis in HF Space""" |
|
|
|
def __init__(self): |
|
self.logger = logging.getLogger(__name__) |
|
|
|
|
|
self.security_rules = { |
|
"malware_indicators": [ |
|
"suspicious_process_execution", |
|
"network_communication_anomaly", |
|
"file_modification_pattern", |
|
"registry_manipulation" |
|
], |
|
"network_threats": [ |
|
"port_scanning", |
|
"brute_force_attack", |
|
"ddos_pattern", |
|
"lateral_movement" |
|
], |
|
"data_exfiltration": [ |
|
"large_data_transfer", |
|
"encrypted_communication", |
|
"unusual_access_pattern", |
|
"external_connection" |
|
] |
|
} |
|
|
|
|
|
self.threat_severity = { |
|
"critical": {"score": 0.9, "action": "immediate_response"}, |
|
"high": {"score": 0.7, "action": "urgent_investigation"}, |
|
"medium": {"score": 0.5, "action": "monitor_closely"}, |
|
"low": {"score": 0.3, "action": "routine_check"} |
|
} |
|
|
|
def analyze_threat_neural_symbolic(self, threat_data: str, |
|
context: Optional[Dict] = None) -> Dict[str, Any]: |
|
"""Perform neural-symbolic threat analysis""" |
|
|
|
analysis_id = f"ns_analysis_{datetime.now().strftime('%Y%m%d_%H%M%S')}" |
|
|
|
|
|
neural_features = self._extract_neural_features(threat_data) |
|
|
|
|
|
symbolic_analysis = self._symbolic_reasoning(threat_data, neural_features) |
|
|
|
|
|
integrated_result = self._integrate_analysis(neural_features, symbolic_analysis) |
|
|
|
return { |
|
"analysis_id": analysis_id, |
|
"timestamp": datetime.now().isoformat(), |
|
"threat_data": threat_data, |
|
"neural_analysis": { |
|
"feature_extraction": neural_features, |
|
"confidence": neural_features.get("confidence", 0.8) |
|
}, |
|
"symbolic_analysis": symbolic_analysis, |
|
"integrated_result": integrated_result, |
|
"recommendations": self._generate_recommendations(integrated_result) |
|
} |
|
|
|
def _extract_neural_features(self, threat_data: str) -> Dict[str, Any]: |
|
"""Extract neural features from threat data""" |
|
|
|
|
|
features = { |
|
"anomaly_score": min(0.9, len(threat_data) / 100.0 + 0.3), |
|
"semantic_features": [], |
|
"behavioral_patterns": [], |
|
"confidence": 0.8 |
|
} |
|
|
|
|
|
threat_lower = threat_data.lower() |
|
|
|
if any(term in threat_lower for term in ["malware", "virus", "trojan", "backdoor"]): |
|
features["semantic_features"].append("malware_related") |
|
features["anomaly_score"] += 0.2 |
|
|
|
if any(term in threat_lower for term in ["network", "scan", "port", "connection"]): |
|
features["semantic_features"].append("network_activity") |
|
features["anomaly_score"] += 0.1 |
|
|
|
if any(term in threat_lower for term in ["data", "exfiltration", "transfer", "leak"]): |
|
features["semantic_features"].append("data_movement") |
|
features["anomaly_score"] += 0.3 |
|
|
|
|
|
if "suspicious" in threat_lower: |
|
features["behavioral_patterns"].append("suspicious_behavior") |
|
if "anomal" in threat_lower: |
|
features["behavioral_patterns"].append("anomalous_activity") |
|
if "attack" in threat_lower: |
|
features["behavioral_patterns"].append("attack_pattern") |
|
|
|
features["anomaly_score"] = min(0.95, features["anomaly_score"]) |
|
|
|
return features |
|
|
|
def _symbolic_reasoning(self, threat_data: str, neural_features: Dict) -> Dict[str, Any]: |
|
"""Apply symbolic reasoning rules""" |
|
|
|
conclusions = [] |
|
applied_rules = [] |
|
confidence_scores = [] |
|
|
|
threat_lower = threat_data.lower() |
|
|
|
|
|
if any(indicator in neural_features.get("semantic_features", []) for indicator in ["malware_related"]): |
|
conclusions.append({ |
|
"rule": "malware_detection_rule", |
|
"conclusion": "Potential malware activity detected", |
|
"confidence": 0.85, |
|
"evidence": neural_features["semantic_features"] |
|
}) |
|
applied_rules.append("malware_detection_rule") |
|
confidence_scores.append(0.85) |
|
|
|
|
|
if "network_activity" in neural_features.get("semantic_features", []): |
|
network_confidence = 0.7 |
|
if any(term in threat_lower for term in ["scan", "brute", "ddos"]): |
|
network_confidence = 0.9 |
|
|
|
conclusions.append({ |
|
"rule": "network_threat_rule", |
|
"conclusion": "Network-based threat activity identified", |
|
"confidence": network_confidence, |
|
"evidence": ["network_activity_patterns"] |
|
}) |
|
applied_rules.append("network_threat_rule") |
|
confidence_scores.append(network_confidence) |
|
|
|
|
|
if "data_movement" in neural_features.get("semantic_features", []): |
|
conclusions.append({ |
|
"rule": "data_exfiltration_rule", |
|
"conclusion": "Potential data exfiltration attempt detected", |
|
"confidence": 0.8, |
|
"evidence": ["unusual_data_transfer_patterns"] |
|
}) |
|
applied_rules.append("data_exfiltration_rule") |
|
confidence_scores.append(0.8) |
|
|
|
|
|
if neural_features["anomaly_score"] > 0.7: |
|
conclusions.append({ |
|
"rule": "behavioral_anomaly_rule", |
|
"conclusion": "High behavioral anomaly detected", |
|
"confidence": neural_features["anomaly_score"], |
|
"evidence": neural_features["behavioral_patterns"] |
|
}) |
|
applied_rules.append("behavioral_anomaly_rule") |
|
confidence_scores.append(neural_features["anomaly_score"]) |
|
|
|
return { |
|
"conclusions": conclusions, |
|
"applied_rules": applied_rules, |
|
"overall_confidence": np.mean(confidence_scores) if confidence_scores else 0.5, |
|
"reasoning_steps": len(conclusions) |
|
} |
|
|
|
def _integrate_analysis(self, neural_features: Dict, symbolic_analysis: Dict) -> Dict[str, Any]: |
|
"""Integrate neural and symbolic analysis results""" |
|
|
|
|
|
neural_score = neural_features["anomaly_score"] |
|
symbolic_score = symbolic_analysis["overall_confidence"] |
|
|
|
integrated_score = (neural_score + symbolic_score) / 2 |
|
|
|
|
|
if integrated_score >= 0.8: |
|
threat_level = "CRITICAL" |
|
severity = "critical" |
|
elif integrated_score >= 0.6: |
|
threat_level = "HIGH" |
|
severity = "high" |
|
elif integrated_score >= 0.4: |
|
threat_level = "MEDIUM" |
|
severity = "medium" |
|
else: |
|
threat_level = "LOW" |
|
severity = "low" |
|
|
|
return { |
|
"threat_level": threat_level, |
|
"severity": severity, |
|
"integrated_score": round(integrated_score, 3), |
|
"neural_contribution": round(neural_score, 3), |
|
"symbolic_contribution": round(symbolic_score, 3), |
|
"confidence": min(0.95, integrated_score), |
|
"explanation": self._generate_explanation(neural_features, symbolic_analysis, threat_level) |
|
} |
|
|
|
def _generate_explanation(self, neural_features: Dict, symbolic_analysis: Dict, threat_level: str) -> str: |
|
"""Generate human-readable explanation""" |
|
|
|
explanation_parts = [ |
|
f"π Analysis indicates {threat_level} threat level based on:", |
|
"", |
|
"π§ Neural Analysis:", |
|
f" β’ Anomaly Score: {neural_features['anomaly_score']:.2f}", |
|
f" β’ Detected Features: {', '.join(neural_features.get('semantic_features', ['none']))}", |
|
f" β’ Behavioral Patterns: {', '.join(neural_features.get('behavioral_patterns', ['none']))}", |
|
"", |
|
"π Symbolic Reasoning:", |
|
f" β’ Rules Applied: {len(symbolic_analysis['applied_rules'])}", |
|
f" β’ Conclusions: {len(symbolic_analysis['conclusions'])}", |
|
f" β’ Confidence: {symbolic_analysis['overall_confidence']:.2f}", |
|
] |
|
|
|
if symbolic_analysis["conclusions"]: |
|
explanation_parts.append(" β’ Key Findings:") |
|
for conclusion in symbolic_analysis["conclusions"][:3]: |
|
explanation_parts.append(f" - {conclusion['conclusion']} (confidence: {conclusion['confidence']:.2f})") |
|
|
|
return "\n".join(explanation_parts) |
|
|
|
def _generate_recommendations(self, integrated_result: Dict) -> List[str]: |
|
"""Generate actionable security recommendations""" |
|
|
|
severity = integrated_result["severity"] |
|
threat_level = integrated_result["threat_level"] |
|
|
|
recommendations = [] |
|
|
|
|
|
severity_info = self.threat_severity.get(severity, self.threat_severity["medium"]) |
|
|
|
if severity == "critical": |
|
recommendations.extend([ |
|
"π¨ IMMEDIATE ACTION REQUIRED", |
|
"β’ Initiate incident response procedures", |
|
"β’ Isolate affected systems immediately", |
|
"β’ Contact security team and management", |
|
"β’ Begin forensic data collection" |
|
]) |
|
elif severity == "high": |
|
recommendations.extend([ |
|
"β οΈ URGENT INVESTIGATION NEEDED", |
|
"β’ Deploy additional monitoring on affected systems", |
|
"β’ Implement network segmentation if possible", |
|
"β’ Escalate to security analysts", |
|
"β’ Review related security logs" |
|
]) |
|
elif severity == "medium": |
|
recommendations.extend([ |
|
"π CLOSE MONITORING RECOMMENDED", |
|
"β’ Increase logging and monitoring", |
|
"β’ Schedule security review within 24 hours", |
|
"β’ Implement additional access controls", |
|
"β’ Update threat intelligence feeds" |
|
]) |
|
else: |
|
recommendations.extend([ |
|
"β
ROUTINE SECURITY MEASURES", |
|
"β’ Continue normal monitoring", |
|
"β’ Document findings for future reference", |
|
"β’ Regular security updates recommended" |
|
]) |
|
|
|
|
|
recommendations.append("\nπ‘οΈ SPECIFIC SECURITY MEASURES:") |
|
recommendations.extend([ |
|
"β’ Update antivirus and security signatures", |
|
"β’ Review network access controls", |
|
"β’ Validate backup and recovery procedures", |
|
"β’ Consider threat hunting activities" |
|
]) |
|
|
|
return recommendations |
|
|
|
|
|
# Module-level instance, created once at import time.
neuro_symbolic_ai = SimplifiedNeuroSymbolicAI()