# Source path: cyber_llm/src/startup/persistent_cognitive_startup.py
# Repository upload by unit731: "Upload core Cyber-LLM platform components"
# Commit 23804b3 (verified)
"""
Persistent Cognitive System Configuration and Startup
Central configuration and initialization for the complete persistent cognitive architecture
Author: Cyber-LLM Development Team
Date: August 6, 2025
Version: 2.0.0
"""
import asyncio
import json
import logging
import os
import shutil
import sys
from dataclasses import dataclass, field, asdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Any, Optional

import yaml

# Configuration imports
from cognitive.persistent_reasoning_system import PersistentCognitiveSystem
from integration.persistent_multi_agent_integration import (
    PersistentMultiAgentSystem, create_persistent_multi_agent_system
)
from server.persistent_agent_server import PersistentAgentServer, create_server_config
@dataclass
class DatabaseConfiguration:
    """Database configuration settings.

    Field order is part of the interface: the generated ``__init__`` accepts
    these values positionally in the order declared below.
    """

    # Location of the database handed to the cognitive and multi-agent systems.
    cognitive_db_path: str = "data/cognitive_system.db"
    # Location of the database handed to the agent server.
    server_db_path: str = "data/server_agent_system.db"
    # When True the manager runs a periodic backup worker and a final backup on shutdown.
    backup_enabled: bool = True
    # Hours between two periodic backups.
    backup_interval_hours: int = 24
    # Backup directories older than this many days are deleted after each backup.
    backup_retention_days: int = 30
    # Checked by the manager's optimization pass (which is currently a no-op for it).
    auto_vacuum: bool = True
    # Not read anywhere in this module; presumably consumed by the database layer -- confirm.
    wal_mode: bool = True
    # One of OFF, NORMAL, FULL. Not read in this module either -- confirm the consumer.
    sync_mode: str = "NORMAL"
@dataclass
class ServerConfiguration:
    """Server configuration settings.

    Field order is part of the interface: the generated ``__init__`` accepts
    these values positionally in the order declared below.
    """

    # When False the manager skips the agent server entirely.
    enabled: bool = True
    # Host passed to the server factory. The default listens on every interface.
    host: str = "0.0.0.0"
    # Port of the agent server; the multi-agent system is configured with port + 1.
    port: int = 8080
    # When True, validation requires both ssl_cert_path and ssl_key_path.
    ssl_enabled: bool = False
    ssl_cert_path: Optional[str] = None
    ssl_key_path: Optional[str] = None
    max_connections: int = 1000
    # Converted to seconds (x 3600) before being handed to the server factory.
    session_timeout_hours: int = 24
    # Not read anywhere in this module -- confirm the consumer.
    memory_backup_interval_hours: int = 1
    distributed_mode: bool = False
    # Not read anywhere in this module; default_factory gives each instance its own list.
    cluster_nodes: List[str] = field(default_factory=list)
@dataclass
class CognitiveConfiguration:
    """Cognitive system configuration.

    Only ``working_memory_capacity`` is applied by the manager in this module
    (copied onto the cognitive system's memory manager when one exists). The
    remaining flags are not read here; presumably the cognitive components
    consume them -- confirm before relying on them.
    """

    memory_consolidation_enabled: bool = True
    memory_consolidation_interval_hours: int = 6
    memory_decay_enabled: bool = True
    memory_decay_rate: float = 0.1
    # Copied to cognitive_system.memory_manager.working_memory_capacity at initialization.
    working_memory_capacity: int = 20
    reasoning_chain_timeout_minutes: int = 60
    strategic_planning_enabled: bool = True
    meta_cognitive_enabled: bool = True
    background_processing_enabled: bool = True
    inter_agent_coordination: bool = True
@dataclass
class LoggingConfiguration:
    """Logging configuration settings.

    All fields are consumed by the manager when it configures the root logger.
    """

    # Name of a ``logging`` level constant; upper-cased before lookup.
    level: str = "INFO"
    # Format string used unless structured_logging is enabled.
    format: str = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    # When True a rotating file handler is attached and the log directory is created.
    file_enabled: bool = True
    file_path: str = "logs/persistent_cognitive_system.log"
    # Rotation threshold in megabytes (converted to bytes by the manager).
    file_max_size_mb: int = 100
    # Number of rotated files to keep.
    file_backup_count: int = 5
    # When True a stdout stream handler is attached.
    console_enabled: bool = True
    # When True a JSON-shaped format string replaces ``format``.
    structured_logging: bool = False
@dataclass
class SecurityConfiguration:
    """Security configuration settings.

    None of these flags is read by the manager in this module; they are only
    stored (and toggled by the production template). Presumably the server and
    agent components enforce them -- confirm before relying on them.
    """

    authentication_enabled: bool = False
    api_key_required: bool = False
    rate_limiting_enabled: bool = True
    rate_limit_per_minute: int = 100
    encryption_enabled: bool = False
    audit_logging: bool = True
    secure_memory_erasure: bool = True
    safety_agent_required: bool = True
@dataclass
class PerformanceConfiguration:
    """Performance configuration settings.

    Field order is part of the interface: the generated ``__init__`` accepts
    these values positionally in the order declared below.
    """

    # Validated by the manager: must be at least 1.
    max_worker_threads: int = 8
    # Validated by the manager: must be at least 64.
    memory_cache_size_mb: int = 512
    # The next five settings are not read anywhere in this module -- confirm the consumer.
    query_timeout_seconds: int = 30
    batch_processing_enabled: bool = True
    batch_size: int = 100
    connection_pool_size: int = 20
    async_processing: bool = True
    # conservative, balanced, aggressive. Any value other than "conservative"
    # makes the manager start its periodic optimization worker.
    optimization_level: str = "balanced"
@dataclass
class PersistentCognitiveConfiguration:
    """Complete configuration for persistent cognitive system.

    Aggregates one configuration object per subsystem plus a few top-level
    settings, and converts to and from plain dictionaries / YAML files.
    """

    database: DatabaseConfiguration = field(default_factory=DatabaseConfiguration)
    server: ServerConfiguration = field(default_factory=ServerConfiguration)
    cognitive: CognitiveConfiguration = field(default_factory=CognitiveConfiguration)
    logging: LoggingConfiguration = field(default_factory=LoggingConfiguration)
    security: SecurityConfiguration = field(default_factory=SecurityConfiguration)
    performance: PerformanceConfiguration = field(default_factory=PerformanceConfiguration)
    system_name: str = "Persistent Cognitive Multi-Agent System"
    version: str = "2.0.0"
    environment: str = "development"  # development, staging, production
    debug_mode: bool = False

    def to_dict(self) -> Dict[str, Any]:
        """Convert configuration to a (recursively) plain dictionary."""
        return asdict(self)

    def save_to_file(self, filepath: str):
        """Save configuration to a YAML file, creating parent directories as needed.

        Args:
            filepath: Destination path of the YAML document.
        """
        Path(filepath).parent.mkdir(parents=True, exist_ok=True)
        # Explicit encoding so the file reads back the same on every platform.
        with open(filepath, 'w', encoding='utf-8') as f:
            yaml.dump(self.to_dict(), f, default_flow_style=False)

    @classmethod
    def load_from_file(cls, filepath: str) -> 'PersistentCognitiveConfiguration':
        """Load configuration from a YAML file.

        An empty file yields the default configuration.

        Args:
            filepath: Path of the YAML document to read.

        Raises:
            OSError: If the file cannot be opened.
            TypeError: If the document is not a mapping or holds unknown keys.
        """
        with open(filepath, 'r', encoding='utf-8') as f:
            config_dict = yaml.safe_load(f)
        return cls.from_dict(config_dict)

    @classmethod
    def from_dict(cls, config_dict: Dict[str, Any]) -> 'PersistentCognitiveConfiguration':
        """Create configuration from a dictionary.

        Missing sections and missing top-level keys fall back to the dataclass
        defaults. ``None`` (what ``yaml.safe_load`` returns for an empty
        document) and sections present without a value are treated as empty.

        Args:
            config_dict: Mapping with optional ``database``, ``server``,
                ``cognitive``, ``logging``, ``security`` and ``performance``
                sub-mappings plus optional top-level settings.

        Raises:
            TypeError: If ``config_dict`` is not a mapping, or a section
                contains a key its configuration class does not define.
        """
        if config_dict is None:
            config_dict = {}
        if not isinstance(config_dict, dict):
            raise TypeError(
                f"Configuration must be a mapping, got {type(config_dict).__name__}"
            )

        section_types = {
            'database': DatabaseConfiguration,
            'server': ServerConfiguration,
            'cognitive': CognitiveConfiguration,
            'logging': LoggingConfiguration,
            'security': SecurityConfiguration,
            'performance': PerformanceConfiguration,
        }
        # "section:" with no value parses to None, hence the "or {}".
        kwargs: Dict[str, Any] = {
            name: section_cls(**(config_dict.get(name) or {}))
            for name, section_cls in section_types.items()
        }

        # Pass top-level settings only when present so the dataclass defaults
        # stay the single source of truth.
        for key in ('system_name', 'version', 'environment', 'debug_mode'):
            if key in config_dict:
                kwargs[key] = config_dict[key]

        return cls(**kwargs)
class PersistentCognitiveSystemManager:
    """
    Manager class for the complete persistent cognitive system.

    Handles configuration, initialization, startup, and shutdown of the
    cognitive system, the optional agent server and the multi-agent system,
    and owns the background maintenance tasks (backup, monitoring,
    optimization).
    """

    def __init__(self, config: Optional["PersistentCognitiveConfiguration"] = None):
        """Create the manager and configure logging.

        Args:
            config: Complete system configuration. A default
                ``PersistentCognitiveConfiguration`` is used when omitted.
        """
        self.config = config or PersistentCognitiveConfiguration()
        self.logger = None

        # System components
        self.cognitive_system: Optional[PersistentCognitiveSystem] = None
        self.agent_server: Optional[PersistentAgentServer] = None
        self.multi_agent_system: Optional[PersistentMultiAgentSystem] = None

        # System state
        self.is_initialized = False
        self.is_running = False
        self.startup_time: Optional[datetime] = None
        self.background_tasks: List[asyncio.Task] = []
        # psutil process handle, created lazily and reused between metric reads.
        self._metrics_process = None

        # Initialize logging first
        self._setup_logging()
        self.logger.info(f"Created {self.config.system_name} v{self.config.version}")

    def _setup_logging(self):
        """Configure the root logger from ``config.logging`` and set ``self.logger``.

        Existing root handlers are removed, so this replaces any logging setup
        done before the manager was created.
        """
        log_config = self.config.logging

        # Create logs directory
        if log_config.file_enabled:
            Path(log_config.file_path).parent.mkdir(parents=True, exist_ok=True)

        # Configure root logger
        root_logger = logging.getLogger()
        root_logger.setLevel(getattr(logging, log_config.level.upper()))

        # Clear existing handlers
        root_logger.handlers.clear()

        # Create formatter
        if log_config.structured_logging:
            # NOTE(review): the message is interpolated without JSON escaping, so
            # a message containing quotes or newlines yields an invalid JSON line.
            formatter = logging.Formatter(
                '{"timestamp":"%(asctime)s","name":"%(name)s","level":"%(levelname)s","message":"%(message)s"}'
            )
        else:
            formatter = logging.Formatter(log_config.format)

        # Console handler
        if log_config.console_enabled:
            console_handler = logging.StreamHandler(sys.stdout)
            console_handler.setFormatter(formatter)
            root_logger.addHandler(console_handler)

        # File handler
        if log_config.file_enabled:
            from logging.handlers import RotatingFileHandler

            file_handler = RotatingFileHandler(
                log_config.file_path,
                maxBytes=log_config.file_max_size_mb * 1024 * 1024,
                backupCount=log_config.file_backup_count
            )
            file_handler.setFormatter(formatter)
            root_logger.addHandler(file_handler)

        # Get logger for this class
        self.logger = logging.getLogger("persistent_cognitive_manager")
        self.logger.info("Logging configured")

    def _setup_directories(self):
        """Create the database directories and the fixed working directories."""
        directories = [
            Path(self.config.database.cognitive_db_path).parent,
            Path(self.config.database.server_db_path).parent,
            "data/backups",
            "data/exports",
            "data/imports",
            "logs",
            "temp"
        ]
        for directory in directories:
            Path(directory).mkdir(parents=True, exist_ok=True)
        self.logger.debug("Directories setup complete")

    def _validate_configuration(self):
        """Validate configuration settings.

        Raises:
            ValueError: Listing every problem found, joined by "; ".
        """
        issues = []
        database = self.config.database
        server = self.config.server
        performance = self.config.performance

        # Database validation
        if not database.cognitive_db_path:
            issues.append("Cognitive database path is required")
        if not database.server_db_path:
            issues.append("Server database path is required")

        # Server validation
        if server.enabled:
            if not 1 <= server.port <= 65535:
                issues.append("Server port must be between 1 and 65535")
            if server.ssl_enabled:
                if not server.ssl_cert_path:
                    issues.append("SSL certificate path required when SSL is enabled")
                if not server.ssl_key_path:
                    issues.append("SSL key path required when SSL is enabled")

        # Performance validation
        if performance.max_worker_threads < 1:
            issues.append("Max worker threads must be at least 1")
        if performance.memory_cache_size_mb < 64:
            issues.append("Memory cache size should be at least 64MB")

        if issues:
            raise ValueError(f"Configuration validation failed: {'; '.join(issues)}")
        self.logger.info("Configuration validation passed")

    async def initialize(self):
        """Initialize all system components.

        Validates the configuration, creates directories, then builds the
        cognitive system, the agent server (when enabled) and the multi-agent
        system. Calling it again after success only logs a warning.

        Raises:
            Exception: Whatever validation or a component initializer raised
                (logged first, then re-raised).
        """
        if self.is_initialized:
            self.logger.warning("System already initialized")
            return

        try:
            self.logger.info("Initializing persistent cognitive system...")

            # Validate configuration
            self._validate_configuration()

            # Setup directories
            self._setup_directories()

            # Initialize cognitive system
            await self._initialize_cognitive_system()

            # Initialize agent server if enabled
            if self.config.server.enabled:
                await self._initialize_agent_server()

            # Initialize multi-agent system
            await self._initialize_multi_agent_system()

            self.is_initialized = True
            self.logger.info("System initialization complete")
        except Exception as e:
            self.logger.error(f"System initialization failed: {e}")
            raise

    async def _initialize_cognitive_system(self):
        """Initialize the persistent cognitive system."""
        try:
            self.cognitive_system = PersistentCognitiveSystem(
                self.config.database.cognitive_db_path
            )

            # Configure system parameters
            if hasattr(self.cognitive_system, 'memory_manager'):
                self.cognitive_system.memory_manager.working_memory_capacity = (
                    self.config.cognitive.working_memory_capacity
                )

            await self.cognitive_system.initialize()
            self.logger.info("Cognitive system initialized")
        except Exception as e:
            self.logger.error(f"Failed to initialize cognitive system: {e}")
            raise

    async def _initialize_agent_server(self):
        """Initialize the agent server (it is started later, by ``start``)."""
        try:
            server = self.config.server
            server_config = create_server_config(
                host=server.host,
                port=server.port,
                ssl_cert=server.ssl_cert_path,
                ssl_key=server.ssl_key_path,
                max_connections=server.max_connections,
                session_timeout=server.session_timeout_hours * 3600,  # hours -> seconds
                distributed_mode=server.distributed_mode
            )
            self.agent_server = PersistentAgentServer(
                server_config,
                self.config.database.server_db_path
            )
            self.logger.info("Agent server initialized")
        except Exception as e:
            self.logger.error(f"Failed to initialize agent server: {e}")
            raise

    async def _initialize_multi_agent_system(self):
        """Initialize the multi-agent system."""
        try:
            server_config = None
            if self.config.server.enabled:
                server_config = create_server_config(
                    host=self.config.server.host,
                    port=self.config.server.port + 1,  # Use next port for multi-agent server
                    max_connections=self.config.server.max_connections
                )

            self.multi_agent_system = PersistentMultiAgentSystem(
                cognitive_db_path=self.config.database.cognitive_db_path,
                server_config=server_config
            )
            await self.multi_agent_system.initialize_system()
            self.logger.info("Multi-agent system initialized")
        except Exception as e:
            self.logger.error(f"Failed to initialize multi-agent system: {e}")
            raise

    async def start(self):
        """Start the system, initializing it first when necessary.

        Launches the background workers and the agent server task. If startup
        fails, everything already started is shut down before the error is
        re-raised.
        """
        if not self.is_initialized:
            await self.initialize()

        if self.is_running:
            self.logger.warning("System already running")
            return

        try:
            self.logger.info("Starting persistent cognitive system...")
            self.startup_time = datetime.now()

            # Set the flag before creating tasks: the workers loop on it, and
            # shutdown() must not treat a half-started system as "not running".
            self.is_running = True

            # Start background tasks
            await self._start_background_tasks()

            # Start agent server
            if self.agent_server:
                server_task = asyncio.create_task(self.agent_server.start_server())
                self.background_tasks.append(server_task)

            self.logger.info(f"System started successfully on {self.startup_time}")

            # Print startup information
            await self._print_startup_info()
        except Exception as e:
            self.logger.error(f"System startup failed: {e}")
            await self.shutdown()
            raise

    async def _start_background_tasks(self):
        """Start background maintenance tasks."""
        # Database backup task
        if self.config.database.backup_enabled:
            self.background_tasks.append(asyncio.create_task(self._backup_worker()))

        # System monitoring task
        self.background_tasks.append(asyncio.create_task(self._monitoring_worker()))

        # Performance optimization task
        if self.config.performance.optimization_level != "conservative":
            self.background_tasks.append(asyncio.create_task(self._optimization_worker()))

        self.logger.info(f"Started {len(self.background_tasks)} background tasks")

    async def _backup_worker(self):
        """Background database backup worker; runs until cancelled or stopped."""
        interval = self.config.database.backup_interval_hours * 3600
        while self.is_running:
            try:
                await asyncio.sleep(interval)
                await self._create_system_backup()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Backup worker error: {e}")

    async def _monitoring_worker(self):
        """Background system monitoring worker; logs metrics periodically."""
        while self.is_running:
            try:
                await asyncio.sleep(300)  # Every 5 minutes
                await self._log_system_metrics()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Monitoring worker error: {e}")

    async def _optimization_worker(self):
        """Background performance optimization worker."""
        while self.is_running:
            try:
                await asyncio.sleep(1800)  # Every 30 minutes
                await self._optimize_system_performance()
            except asyncio.CancelledError:
                break
            except Exception as e:
                self.logger.error(f"Optimization worker error: {e}")

    async def _create_system_backup(self):
        """Create a timestamped backup of the databases and the configuration.

        Best effort: failures are logged, never raised. A database file that
        does not exist is skipped with a warning so that the configuration
        backup and the retention cleanup still run.
        """
        try:
            backup_dir = Path("data/backups") / datetime.now().strftime("%Y%m%d_%H%M%S")
            backup_dir.mkdir(parents=True, exist_ok=True)

            # Backup databases (simple file copy for SQLite).
            # NOTE(review): if the databases really run in WAL mode
            # (config.database.wal_mode), copying only the main file may miss
            # changes not yet checkpointed -- consider sqlite3's backup API.
            databases = (
                (self.cognitive_system, self.config.database.cognitive_db_path, "cognitive_system.db"),
                (self.agent_server, self.config.database.server_db_path, "server_system.db"),
            )
            for component, db_path, backup_name in databases:
                if not component:
                    continue
                if not Path(db_path).is_file():
                    self.logger.warning(f"Database file not found, skipping backup: {db_path}")
                    continue
                shutil.copy2(db_path, backup_dir / backup_name)

            # Backup configuration
            config_backup = backup_dir / "system_config.yaml"
            self.config.save_to_file(str(config_backup))

            self.logger.info(f"System backup created: {backup_dir}")

            # Cleanup old backups
            await self._cleanup_old_backups()
        except Exception as e:
            self.logger.error(f"Backup creation failed: {e}")

    async def _cleanup_old_backups(self):
        """Delete backup directories older than ``backup_retention_days``.

        Age is judged by each directory's modification time. Failures are
        logged, never raised.
        """
        try:
            backup_base = Path("data/backups")
            if not backup_base.exists():
                return

            retention_days = self.config.database.backup_retention_days
            cutoff_time = datetime.now() - timedelta(days=retention_days)

            for backup_dir in backup_base.iterdir():
                if not backup_dir.is_dir():
                    continue
                dir_time = datetime.fromtimestamp(backup_dir.stat().st_mtime)
                if dir_time < cutoff_time:
                    shutil.rmtree(backup_dir)
                    self.logger.debug(f"Removed old backup: {backup_dir}")
        except Exception as e:
            self.logger.error(f"Backup cleanup failed: {e}")

    async def _log_system_metrics(self):
        """Log a one-line summary of the current system metrics."""
        try:
            metrics = await self.get_system_metrics()
            self.logger.info(f"System Metrics - "
                             f"Uptime: {metrics.get('uptime_hours', 0):.1f}h, "
                             f"Memory: {metrics.get('memory_usage_mb', 0):.1f}MB, "
                             f"Active Sessions: {metrics.get('active_sessions', 0)}")
        except Exception as e:
            self.logger.error(f"Metrics logging failed: {e}")

    async def _optimize_system_performance(self):
        """Run one optimization pass (currently: memory consolidation only)."""
        try:
            # Database optimization
            if self.cognitive_system:
                # Vacuum databases periodically
                if self.config.database.auto_vacuum:
                    # This would be implemented in the cognitive system
                    pass

            # Memory optimization
            if self.multi_agent_system:
                # Trigger memory consolidation
                await self.multi_agent_system.cognitive_system.memory_manager.consolidate_memories()

            self.logger.debug("System optimization completed")
        except Exception as e:
            self.logger.error(f"System optimization failed: {e}")

    async def _print_startup_info(self):
        """Print system startup information to stdout."""
        separator = '=' * 60
        info_lines = [
            "",
            separator,
            f"{self.config.system_name} v{self.config.version}",
            separator,
            f"Environment: {self.config.environment}",
            f"Started: {self.startup_time}",
            "",
            "Components:",
        ]

        if self.cognitive_system:
            info_lines.append("  ✓ Persistent Cognitive System")
        if self.agent_server:
            # Show the scheme that matches the SSL setting instead of always http.
            scheme = "https" if self.config.server.ssl_enabled else "http"
            info_lines.append(
                f"  ✓ Agent Server ({scheme}://{self.config.server.host}:{self.config.server.port})"
            )
        if self.multi_agent_system:
            info_lines.append("  ✓ Multi-Agent System")

        info_lines.extend([
            "",
            "Database:",
            f"  • Cognitive DB: {self.config.database.cognitive_db_path}",
            f"  • Server DB: {self.config.database.server_db_path}",
            "",
            f"Background Tasks: {len(self.background_tasks)}",
            "",
            "System ready for cognitive operations!",
            separator,
            ""
        ])

        for line in info_lines:
            print(line)

    async def get_system_metrics(self) -> Dict[str, Any]:
        """Get comprehensive system metrics.

        Returns:
            A dictionary with identification fields, run state, uptime (once
            started), process memory/CPU figures (only when ``psutil`` is
            installed), whatever the multi-agent system reports as its status,
            and background task counters.
        """
        metrics = {
            "system_name": self.config.system_name,
            "version": self.config.version,
            "environment": self.config.environment,
            "is_running": self.is_running,
            "startup_time": self.startup_time.isoformat() if self.startup_time else None,
        }

        if self.startup_time:
            uptime = datetime.now() - self.startup_time
            metrics["uptime_hours"] = uptime.total_seconds() / 3600

        # Memory metrics: psutil is optional, skip these figures without it.
        try:
            import psutil
        except ImportError:
            psutil = None
        if psutil is not None:
            # Reuse one Process handle: cpu_percent() measures since the
            # previous call on the same object, so a fresh handle per call
            # would always report 0.0.
            if self._metrics_process is None:
                self._metrics_process = psutil.Process()
            process = self._metrics_process
            metrics["memory_usage_mb"] = process.memory_info().rss / 1024 / 1024
            metrics["cpu_percent"] = process.cpu_percent()

        # Component metrics
        if self.multi_agent_system:
            system_status = await self.multi_agent_system.get_system_status()
            metrics.update(system_status)

        # Background task metrics
        metrics["background_tasks"] = len(self.background_tasks)
        metrics["background_tasks_running"] = sum(
            1 for task in self.background_tasks if not task.done()
        )

        return metrics

    async def run_scenario(self, scenario: Dict[str, Any]) -> Dict[str, Any]:
        """Run a scenario through the multi-agent system.

        Raises:
            RuntimeError: If the system is not running or has no multi-agent
                system.
        """
        if not self.is_running:
            raise RuntimeError("System is not running")
        if not self.multi_agent_system:
            raise RuntimeError("Multi-agent system not available")
        return await self.multi_agent_system.run_cognitive_scenario(scenario)

    async def shutdown(self):
        """Graceful system shutdown.

        Cancels the background tasks, shuts the components down and takes a
        final backup when backups are enabled. Safe to call repeatedly; it
        also cleans up after a startup that failed half-way. Errors are
        logged, never raised.
        """
        # Nothing to do only if no task was ever left behind either.
        if not self.is_running and not self.background_tasks:
            return

        try:
            self.logger.info("Shutting down persistent cognitive system...")

            # Stop accepting new work
            self.is_running = False

            # Cancel background tasks and forget them so that a later start()
            # does not count or re-await stale tasks.
            pending_tasks = list(self.background_tasks)
            self.background_tasks.clear()
            for task in pending_tasks:
                task.cancel()

            # Wait for tasks to complete
            if pending_tasks:
                await asyncio.gather(*pending_tasks, return_exceptions=True)

            # Shutdown components
            if self.multi_agent_system:
                await self.multi_agent_system.shutdown()
            if self.agent_server:
                await self.agent_server.shutdown()

            # Final backup
            if self.config.database.backup_enabled:
                await self._create_system_backup()

            uptime = datetime.now() - self.startup_time if self.startup_time else timedelta(0)
            self.logger.info(f"System shutdown complete. Uptime: {uptime}")
        except Exception as e:
            self.logger.error(f"Error during shutdown: {e}")
# Configuration templates
def create_development_config() -> PersistentCognitiveConfiguration:
    """Create development configuration.

    Starts from the defaults and turns on debug mode, DEBUG-level logging and
    hourly backups; the server port is set to 8080.
    """
    config = PersistentCognitiveConfiguration()
    config.environment = "development"
    config.debug_mode = True
    config.logging.level = "DEBUG"
    config.database.backup_interval_hours = 1
    config.server.port = 8080
    return config
def create_production_config() -> PersistentCognitiveConfiguration:
    """Create production configuration.

    Starts from the defaults and enables structured INFO logging, daily
    backups kept for 90 days, SSL on port 443, authentication, rate limiting
    and the "aggressive" optimization level.

    Note: SSL is enabled here but ``ssl_cert_path`` / ``ssl_key_path`` stay
    unset, so the manager's configuration validation rejects this template
    until the caller fills in both paths.
    """
    config = PersistentCognitiveConfiguration()
    config.environment = "production"
    config.debug_mode = False
    config.logging.level = "INFO"
    config.logging.structured_logging = True
    config.database.backup_interval_hours = 24
    config.database.backup_retention_days = 90
    config.server.port = 443
    config.server.ssl_enabled = True
    config.security.authentication_enabled = True
    config.security.rate_limiting_enabled = True
    config.performance.optimization_level = "aggressive"
    return config
# CLI interface
async def main():
    """Main entry point for the system.

    Builds the configuration (from ``--config``, else the production or
    development template), applies only the command-line overrides that were
    actually given, then starts the manager and idles until it stops. The
    manager is always shut down on the way out.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Persistent Cognitive Multi-Agent System")
    parser.add_argument("--config", help="Configuration file path")
    # Defaults are None so that "not given" can be told apart from an explicit
    # value; otherwise the defaults would always overwrite the loaded config.
    parser.add_argument("--environment", choices=["development", "staging", "production"],
                        default=None, help="Environment type (default: development)")
    parser.add_argument("--port", type=int, default=None,
                        help="Server port (overrides the configuration)")
    parser.add_argument("--host", default=None,
                        help="Server host (overrides the configuration)")
    parser.add_argument("--debug", action="store_true", help="Enable debug mode")
    args = parser.parse_args()

    # Load configuration
    if args.config:
        config = PersistentCognitiveConfiguration.load_from_file(args.config)
    elif args.environment == "production":
        config = create_production_config()
    else:
        config = create_development_config()

    # Override with command line arguments
    if args.port is not None:
        config.server.port = args.port
    if args.host is not None:
        config.server.host = args.host
    if args.debug:
        config.debug_mode = True
        config.logging.level = "DEBUG"
    if args.environment is not None:
        config.environment = args.environment

    # Create and start system
    system_manager = PersistentCognitiveSystemManager(config)
    try:
        await system_manager.start()

        # Keep system running
        while system_manager.is_running:
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("\nShutdown requested by user")
    except Exception as e:
        print(f"System error: {e}")
    finally:
        await system_manager.shutdown()
if __name__ == "__main__":
    # Script entry: run the CLI coroutine to completion.
    asyncio.run(main())