DigitalPal / digipal /core /memory_manager.py
BladeSzaSza's picture
🥚 Initial DigiPal deployment to HuggingFace Spaces🤖 Generated with [Claude Code](https://claude.ai/code)Co-Authored-By: Claude <[email protected]>
4399e64
raw
history blame
31.3 kB
"""
Enhanced memory management system for DigiPal with emotional values and RAG capabilities.
This module provides comprehensive memory management including:
- Memory caching for frequently accessed pet data
- Emotional memory system with happiness/stress values
- Simple RAG implementation for relevant memory retrieval
- Memory cleanup and optimization
"""
import gc
import json
import logging
import re
import threading
import time
import weakref
from collections import defaultdict, OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Set

from ..storage.storage_manager import StorageManager
from .enums import LifeStage, InteractionResult
from .models import DigiPal, Interaction
logger = logging.getLogger(__name__)
@dataclass
class EmotionalMemory:
    """A single pet memory together with its emotional context and metadata.

    ``to_dict`` and ``from_dict`` round-trip the record through plain values
    (ISO-8601 strings for the datetimes, a list for the tag set) so it can be
    serialized, e.g. as JSON.
    """

    id: str
    timestamp: datetime
    content: str
    memory_type: str  # 'interaction', 'action', 'event', 'detail'
    emotional_value: float  # -1.0 (very stressful) to 1.0 (very happy)
    importance: float  # 0.0 to 1.0, affects retention
    tags: Set[str] = field(default_factory=set)
    related_attributes: Dict[str, int] = field(default_factory=dict)
    access_count: int = 0
    last_accessed: datetime = field(default_factory=datetime.now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a dictionary of plain values for serialization.

        Returns:
            A new dict; the container values are copies, so mutating the
            result does not affect this memory.
        """
        return {
            'id': self.id,
            'timestamp': self.timestamp.isoformat(),
            'content': self.content,
            'memory_type': self.memory_type,
            'emotional_value': self.emotional_value,
            'importance': self.importance,
            'tags': list(self.tags),
            'related_attributes': dict(self.related_attributes),
            'access_count': self.access_count,
            'last_accessed': self.last_accessed.isoformat()
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'EmotionalMemory':
        """Create a memory from a dictionary produced by ``to_dict``.

        Args:
            data: Mapping of field names to serialized values. It is NOT
                modified, so the same payload can be deserialized repeatedly.

        Returns:
            The reconstructed memory.

        Raises:
            KeyError: If the required ``timestamp`` entry is missing.
            TypeError: If a required field is missing or an unknown key is
                present (raised by the dataclass constructor).
        """
        # Work on a shallow copy: converting values in place would corrupt
        # the caller's dict and make a second from_dict() call fail.
        values = dict(data)
        values['timestamp'] = datetime.fromisoformat(values['timestamp'])
        if 'last_accessed' in values:
            values['last_accessed'] = datetime.fromisoformat(values['last_accessed'])
        # A missing or null tag list becomes an empty set.
        values['tags'] = set(values.get('tags') or ())
        if 'related_attributes' in values:
            # Copy so the memory does not alias the caller's mapping.
            values['related_attributes'] = dict(values['related_attributes'] or {})
        return cls(**values)
class MemoryCache:
    """LRU cache for frequently accessed pet data with automatic cleanup.

    Entries expire ``ttl_seconds`` after they were last written with ``put``
    (reads do not extend the lifetime). A daemon thread started by the
    constructor periodically removes expired entries; call ``shutdown`` to
    stop it. All public methods take an internal re-entrant lock.
    """

    def __init__(self, max_size: int = 1000, ttl_seconds: int = 3600):
        """
        Initialize memory cache.

        Args:
            max_size: Maximum number of items to cache. A value <= 0 disables
                caching (``put`` becomes a no-op).
            ttl_seconds: Time-to-live for cached items in seconds
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self._cache: OrderedDict = OrderedDict()
        self._timestamps: Dict[str, float] = {}
        self._lock = threading.RLock()
        # The flag is kept alongside the event; the event is what lets the
        # sweeper wake up immediately on shutdown instead of finishing a sleep.
        self._stop_cleanup = False
        self._stop_event = threading.Event()
        # Start cleanup thread
        self._cleanup_thread = threading.Thread(target=self._cleanup_loop, daemon=True)
        self._cleanup_thread.start()

    def get(self, key: str) -> Optional[Any]:
        """Get item from cache.

        Returns:
            The cached value, or None when the key is absent or has expired
            (an expired entry is removed as a side effect).
        """
        with self._lock:
            if key not in self._cache:
                return None
            if time.time() - self._timestamps[key] < self.ttl_seconds:
                # Move to end (most recently used)
                self._cache.move_to_end(key)
                return self._cache[key]
            # Item expired, remove it
            self._discard(key)
            return None

    def put(self, key: str, value: Any) -> None:
        """Put item in cache, evicting the least recently used one if full."""
        with self._lock:
            if self.max_size <= 0:
                # Nothing can be stored; previously this path raised
                # StopIteration while trying to evict from an empty cache.
                return
            now = time.time()
            if key in self._cache:
                # Existing key: refresh value, timestamp and recency.
                self._cache[key] = value
                self._timestamps[key] = now
                self._cache.move_to_end(key)
                return
            # Make room; the front of the OrderedDict is the LRU entry.
            while self._cache and len(self._cache) >= self.max_size:
                oldest_key, _ = self._cache.popitem(last=False)
                self._timestamps.pop(oldest_key, None)
            self._cache[key] = value
            self._timestamps[key] = now

    def remove(self, key: str) -> bool:
        """Remove item from cache. Returns True if the key was present."""
        with self._lock:
            if key in self._cache:
                self._discard(key)
                return True
            return False

    def clear(self) -> None:
        """Clear all cached items."""
        with self._lock:
            self._cache.clear()
            self._timestamps.clear()

    def size(self) -> int:
        """Get current cache size."""
        with self._lock:
            return len(self._cache)

    def _discard(self, key: str) -> None:
        """Drop ``key`` from both internal maps; caller must hold the lock."""
        self._cache.pop(key, None)
        self._timestamps.pop(key, None)

    def _cleanup_loop(self) -> None:
        """Background cleanup loop for expired items."""
        while not (self._stop_cleanup or self._stop_event.is_set()):
            # Cleanup interval is 1/4 of the TTL, but at least a minute.
            delay = max(60, self.ttl_seconds // 4)
            try:
                current_time = time.time()
                with self._lock:
                    expired_keys = [
                        key for key, stamp in self._timestamps.items()
                        if current_time - stamp >= self.ttl_seconds
                    ]
                    for key in expired_keys:
                        self._discard(key)
                if expired_keys:
                    logger.debug(f"Cleaned up {len(expired_keys)} expired cache items")
            except Exception as e:
                logger.error(f"Error in cache cleanup loop: {e}")
                delay = 60
            # Event.wait returns as soon as shutdown() sets the event, so the
            # thread never keeps shutdown waiting for a full sleep interval.
            self._stop_event.wait(delay)

    def shutdown(self) -> None:
        """Shutdown the cache and cleanup thread."""
        self._stop_cleanup = True
        self._stop_event.set()
        thread = self._cleanup_thread
        # Joining the current thread would raise RuntimeError.
        if thread.is_alive() and thread is not threading.current_thread():
            thread.join(timeout=5)
class SimpleRAG:
    """Simple Retrieval-Augmented Generation for memory retrieval.

    Memories are ranked with a hand-weighted score built from keyword overlap,
    tag overlap, recency, importance, emotional strength and access count.
    """

    # Tokenizer shared by queries and memory content, so punctuation or quotes
    # glued to a word ("'feed", "food?") no longer prevent a keyword match.
    _WORD_RE = re.compile(r"\w+")

    # tag -> trigger words/phrases looked for in the query.
    _TAG_KEYWORDS = (
        # Action-based tags
        ('eating', ('eat', 'food', 'hungry', 'feed')),
        ('sleeping', ('sleep', 'rest', 'tired', 'nap')),
        ('training', ('train', 'exercise', 'workout')),
        ('playing', ('play', 'fun', 'game')),
        ('praise', ('good', 'praise', 'well done')),
        ('discipline', ('bad', 'scold', 'no')),
        # Emotional tags
        ('positive', ('happy', 'joy', 'excited')),
        ('negative', ('sad', 'upset', 'angry')),
    )

    # Whole-word patterns (with optional s/ed/ing inflection). Plain substring
    # tests made 'no' fire on "know"/"now" and 'eat' on "great".
    _TAG_PATTERNS = tuple(
        (tag, re.compile(r"\b(?:" + "|".join(re.escape(w) for w in words) + r")(?:s|ed|ing)?\b"))
        for tag, words in _TAG_KEYWORDS
    )

    def __init__(self, max_context_memories: int = 5):
        """
        Initialize simple RAG system.

        Args:
            max_context_memories: Maximum memories to include in context
        """
        self.max_context_memories = max_context_memories

    def retrieve_relevant_memories(self, query: str, memories: List["EmotionalMemory"],
                                   current_context: Dict[str, Any]) -> List["EmotionalMemory"]:
        """
        Retrieve relevant memories for a given query using simple similarity.

        Args:
            query: User input or context query
            memories: Available memories to search
            current_context: Current pet state and context (currently not
                used by the scoring)

        Returns:
            Up to ``max_context_memories`` memories with a positive score,
            sorted by relevance (highest first)
        """
        if not memories:
            return []
        query_lower = (query or "").lower()
        # Query-derived data is identical for every memory: compute it once.
        query_words = self._tokenize(query_lower)
        query_tags = self._extract_tags_from_query(query_lower)
        scored_memories = []
        for memory in memories:
            score = self._score_memory(memory, query_words, query_tags)
            if score > 0:
                scored_memories.append((memory, score))
        # Sort by score (descending) and take top memories
        scored_memories.sort(key=lambda pair: pair[1], reverse=True)
        return [memory for memory, _ in scored_memories[:self.max_context_memories]]

    def _calculate_relevance_score(self, memory: "EmotionalMemory", query_lower: str,
                                   context: Dict[str, Any]) -> float:
        """Calculate relevance score for a memory against a lower-cased query.

        ``context`` is accepted for interface compatibility and is not used.
        """
        return self._score_memory(
            memory,
            self._tokenize(query_lower),
            self._extract_tags_from_query(query_lower),
        )

    def _tokenize(self, text: str) -> Set[str]:
        """Return the set of word tokens in ``text`` (punctuation stripped)."""
        return set(self._WORD_RE.findall(text))

    def _score_memory(self, memory: "EmotionalMemory", query_words: Set[str],
                      query_tags: Set[str]) -> float:
        """Score one memory against pre-computed query words and tags."""
        score = 0.0
        # Keyword overlap: fraction of query words found in the memory text.
        if query_words:
            common_words = query_words.intersection(self._tokenize(memory.content.lower()))
            score += len(common_words) / len(query_words) * 0.4
        # Tag matching
        if query_tags:
            tag_overlap = query_tags.intersection(memory.tags)
            score += len(tag_overlap) / len(query_tags) * 0.3
        # Recency boost, decaying linearly to zero over a week (168 hours).
        # Clamped to [0, 1] so a timestamp in the future cannot score above a
        # memory created right now.
        hours_ago = (datetime.now() - memory.timestamp).total_seconds() / 3600
        recency_score = min(1.0, max(0.0, 1 - (hours_ago / 168)))
        score += recency_score * 0.2
        # Importance boost
        score += memory.importance * 0.1
        # Emotional relevance (memories with strong emotions are more memorable)
        score += abs(memory.emotional_value) * 0.1
        # Access frequency, saturating at 10 accesses
        score += min(memory.access_count / 10, 1.0) * 0.1
        return score

    def _extract_tags_from_query(self, query_lower: str) -> Set[str]:
        """Extract potential tags from (lower-cased) query text.

        A tag is added when one of its trigger words occurs as a whole word,
        optionally followed by ``s``, ``ed`` or ``ing``.
        """
        return {tag for tag, pattern in self._TAG_PATTERNS if pattern.search(query_lower)}
class EnhancedMemoryManager:
    """Enhanced memory manager with emotional values, caching, and RAG capabilities.

    Memories are held in memory per pet (``pet_memories``), kept in
    chronological (insertion) order. Mutations of the per-pet lists are
    guarded by an internal lock because the optional background cleanup
    thread modifies them concurrently with ``add_memory``.
    """

    def __init__(self, storage_manager: "StorageManager", cache_size: int = 1000,
                 max_memories_per_pet: int = 500):
        """
        Initialize enhanced memory manager.

        Args:
            storage_manager: Storage manager for persistence
            cache_size: Size of memory cache
            max_memories_per_pet: Maximum memories to keep per pet
        """
        self.storage_manager = storage_manager
        self.max_memories_per_pet = max_memories_per_pet
        # Memory cache for frequently accessed data
        self.memory_cache = MemoryCache(max_size=cache_size)
        # Pet memories storage (pet_id -> List[EmotionalMemory])
        self.pet_memories: Dict[str, List[EmotionalMemory]] = defaultdict(list)
        # RAG system for memory retrieval
        self.rag_system = SimpleRAG()
        # Memory statistics
        self.memory_stats = defaultdict(lambda: {
            'total_memories': 0,
            'happy_memories': 0,
            'stressful_memories': 0,
            'neutral_memories': 0,
            'last_cleanup': datetime.now()
        })
        # Guards pet_memories / memory_stats against the cleanup thread.
        self._lock = threading.RLock()
        # Last millisecond stamp handed out as part of a memory id.
        self._last_memory_ms = 0
        # Background cleanup
        self._cleanup_thread = None
        self._stop_cleanup = False
        # Lets the cleanup thread be woken immediately when stopping.
        self._stop_event = threading.Event()
        logger.info("Enhanced memory manager initialized")

    def add_memory(self, pet_id: str, content: str, memory_type: str,
                   emotional_value: float = 0.0, importance: float = 0.5,
                   tags: Optional[Set[str]] = None,
                   related_attributes: Optional[Dict[str, int]] = None) -> str:
        """
        Add a new memory for a pet.

        Args:
            pet_id: Pet identifier
            content: Memory content
            memory_type: Type of memory ('interaction', 'action', 'event', 'detail')
            emotional_value: Emotional value, clamped to -1.0 .. 1.0
            importance: Importance level, clamped to 0.0 .. 1.0
            tags: Optional tags for categorization (copied)
            related_attributes: Optional attribute changes related to this
                memory (copied)

        Returns:
            Memory ID
        """
        with self._lock:
            memory_id = self._next_memory_id(pet_id)
            memory = EmotionalMemory(
                id=memory_id,
                timestamp=datetime.now(),
                content=content,
                memory_type=memory_type,
                emotional_value=max(-1.0, min(1.0, emotional_value)),
                importance=max(0.0, min(1.0, importance)),
                # Copies: the stored memory must not change when the caller
                # later mutates the objects it passed in.
                tags=set(tags) if tags else set(),
                related_attributes=dict(related_attributes) if related_attributes else {}
            )
            # Add to pet memories
            self.pet_memories[pet_id].append(memory)
            # Update statistics
            self._update_memory_stats(pet_id, memory)
            # Manage memory size
            self._manage_memory_size(pet_id)
        # Cache the memory
        self.memory_cache.put(f"memory_{memory_id}", memory)
        logger.debug(f"Added memory {memory_id} for pet {pet_id}: {content[:50]}...")
        return memory_id

    def _next_memory_id(self, pet_id: str) -> str:
        """Return a new ``<pet_id>_<milliseconds>`` id; caller holds the lock.

        The millisecond stamp is forced to increase on every call, so two
        memories created within the same millisecond get different ids.
        """
        now_ms = int(time.time() * 1000)
        self._last_memory_ms = max(now_ms, self._last_memory_ms + 1)
        return f"{pet_id}_{self._last_memory_ms}"

    def add_interaction_memory(self, pet: "DigiPal", interaction: "Interaction") -> str:
        """
        Add memory from an interaction with emotional context.

        Args:
            pet: DigiPal instance
            interaction: Interaction to convert to memory

        Returns:
            Memory ID
        """
        # Create memory content
        content = f"User said: '{interaction.user_input}' - I responded: '{interaction.pet_response}'"
        return self.add_memory(
            pet_id=pet.id,
            content=content,
            memory_type='interaction',
            emotional_value=self._calculate_emotional_value(interaction, pet),
            importance=self._calculate_importance(interaction),
            tags=self._extract_interaction_tags(interaction),
            related_attributes=interaction.attribute_changes
        )

    def add_action_memory(self, pet_id: str, action: str, result: str,
                          attribute_changes: Dict[str, int]) -> str:
        """
        Add memory from a care action.

        Args:
            pet_id: Pet identifier
            action: Action performed
            result: Result of the action
            attribute_changes: Attribute changes from action

        Returns:
            Memory ID
        """
        changes = attribute_changes or {}
        # Emotional value: happiness counts fully, energy at half weight.
        emotional_value = 0.0
        if 'happiness' in changes:
            emotional_value += changes['happiness'] / 100.0
        if 'energy' in changes:
            emotional_value += changes['energy'] / 200.0
        # Clamp emotional value
        emotional_value = max(-1.0, min(1.0, emotional_value))
        # Calculate importance based on magnitude of changes
        importance = min(1.0, sum(abs(v) for v in changes.values()) / 100.0)
        # Extract tags
        tags = {action, 'action'}
        if emotional_value > 0.3:
            tags.add('positive')
        elif emotional_value < -0.3:
            tags.add('negative')
        return self.add_memory(
            pet_id=pet_id,
            content=f"Action: {action} - Result: {result}",
            memory_type='action',
            emotional_value=emotional_value,
            importance=importance,
            tags=tags,
            related_attributes=changes
        )

    def add_life_event_memory(self, pet_id: str, event: str, emotional_impact: float = 0.0) -> str:
        """
        Add memory for significant life events (evolution, achievements, etc.).

        Args:
            pet_id: Pet identifier
            event: Event description
            emotional_impact: Emotional impact of the event

        Returns:
            Memory ID
        """
        return self.add_memory(
            pet_id=pet_id,
            content=event,
            memory_type='event',
            emotional_value=emotional_impact,
            importance=0.9,  # Life events are usually important
            tags={'life_event', 'milestone'}
        )

    def get_relevant_memories(self, pet_id: str, query: str,
                              current_context: Optional[Dict[str, Any]] = None) -> List["EmotionalMemory"]:
        """
        Get relevant memories for a query using RAG.

        Retrieved memories get their ``access_count`` incremented and
        ``last_accessed`` refreshed.

        Args:
            pet_id: Pet identifier
            query: Query text
            current_context: Current pet state context

        Returns:
            List of relevant memories
        """
        # Snapshot so a concurrent trim/cleanup cannot change the list
        # while it is being scored.
        memories = list(self.pet_memories.get(pet_id, []))
        if not memories:
            return []
        relevant_memories = self.rag_system.retrieve_relevant_memories(
            query, memories, current_context or {}
        )
        # Update access counts for retrieved memories
        accessed_at = datetime.now()
        for memory in relevant_memories:
            memory.access_count += 1
            memory.last_accessed = accessed_at
        return relevant_memories

    def get_memory_context_for_llm(self, pet_id: str, query: str,
                                   current_context: Optional[Dict[str, Any]] = None) -> str:
        """
        Get formatted memory context for LLM input.

        Args:
            pet_id: Pet identifier
            query: Current query
            current_context: Current pet state

        Returns:
            Formatted memory context string ("" when nothing is relevant)
        """
        relevant_memories = self.get_relevant_memories(pet_id, query, current_context)
        if not relevant_memories:
            return ""
        context_parts = ["Recent relevant memories:"]
        for memory in relevant_memories:
            # Format memory with emotional context
            if memory.emotional_value > 0.3:
                emotional_indicator = " (happy memory)"
            elif memory.emotional_value < -0.3:
                emotional_indicator = " (stressful memory)"
            else:
                emotional_indicator = ""
            time_ago = self._format_time_ago(memory.timestamp)
            context_parts.append(f"- {time_ago}: {memory.content}{emotional_indicator}")
        return "\n".join(context_parts)

    def get_emotional_state_summary(self, pet_id: str) -> Dict[str, Any]:
        """
        Get emotional state summary based on recent memories.

        Uses memories from the last 24 hours, or the 10 most recent ones when
        there are none that new.

        Args:
            pet_id: Pet identifier

        Returns:
            Dict with 'overall_mood', 'recent_trend', 'avg_emotional_value',
            'positive_memories', 'negative_memories' and
            'total_recent_memories'. The same keys are present when the pet
            has no memories.
        """
        memories = self.pet_memories.get(pet_id, [])
        if not memories:
            return {
                'overall_mood': 'neutral',
                'recent_trend': 'stable',
                'avg_emotional_value': 0.0,
                'positive_memories': 0,
                'negative_memories': 0,
                'total_recent_memories': 0
            }
        # The trend below compares older vs newer memories, so order by time
        # explicitly rather than trusting the storage order.
        ordered = sorted(memories, key=lambda m: m.timestamp)
        # Analyze recent memories (last 24 hours)
        recent_cutoff = datetime.now() - timedelta(hours=24)
        recent_memories = [m for m in ordered if m.timestamp > recent_cutoff]
        if not recent_memories:
            recent_memories = ordered[-10:]  # Use last 10 if no recent ones
        # Calculate emotional metrics
        avg_emotional_value = sum(m.emotional_value for m in recent_memories) / len(recent_memories)
        positive_memories = sum(1 for m in recent_memories if m.emotional_value > 0.1)
        negative_memories = sum(1 for m in recent_memories if m.emotional_value < -0.1)
        # Determine overall mood
        if avg_emotional_value > 0.3:
            overall_mood = 'very_happy'
        elif avg_emotional_value > 0.1:
            overall_mood = 'happy'
        elif avg_emotional_value < -0.3:
            overall_mood = 'stressed'
        elif avg_emotional_value < -0.1:
            overall_mood = 'unhappy'
        else:
            overall_mood = 'neutral'
        # Determine trend: older half vs newer half, needs at least 5 samples
        trend = 'stable'
        if len(recent_memories) >= 5:
            middle = len(recent_memories) // 2
            first_half = recent_memories[:middle]
            second_half = recent_memories[middle:]
            first_avg = sum(m.emotional_value for m in first_half) / len(first_half)
            second_avg = sum(m.emotional_value for m in second_half) / len(second_half)
            if second_avg - first_avg > 0.2:
                trend = 'improving'
            elif first_avg - second_avg > 0.2:
                trend = 'declining'
        return {
            'overall_mood': overall_mood,
            'recent_trend': trend,
            'avg_emotional_value': avg_emotional_value,
            'positive_memories': positive_memories,
            'negative_memories': negative_memories,
            'total_recent_memories': len(recent_memories)
        }

    def cleanup_old_memories(self, pet_id: str, max_age_days: int = 30) -> int:
        """
        Clean up old memories while preserving important ones.

        A memory is kept when it is newer than ``max_age_days``, has
        importance above 0.7, or has an emotional value stronger than 0.5.

        Args:
            pet_id: Pet identifier
            max_age_days: Maximum age for memories in days

        Returns:
            Number of memories cleaned up
        """
        with self._lock:
            memories = self.pet_memories.get(pet_id, [])
            if not memories:
                return 0
            cutoff_date = datetime.now() - timedelta(days=max_age_days)
            # Always keep important memories or recent ones
            keep_memories = [
                memory for memory in memories
                if (memory.importance > 0.7 or
                    memory.timestamp > cutoff_date or
                    abs(memory.emotional_value) > 0.5)
            ]
            removed_count = len(memories) - len(keep_memories)
            # Update memories list
            self.pet_memories[pet_id] = keep_memories
            # Update statistics
            self.memory_stats[pet_id]['last_cleanup'] = datetime.now()
        if removed_count > 0:
            logger.info(f"Cleaned up {removed_count} old memories for pet {pet_id}")
        return removed_count

    def get_memory_statistics(self, pet_id: str) -> Dict[str, Any]:
        """Get memory statistics for a pet, recomputed from the stored memories."""
        memories = self.pet_memories.get(pet_id, [])
        stats = self.memory_stats[pet_id].copy()
        # Update current counts
        stats['total_memories'] = len(memories)
        stats['happy_memories'] = sum(1 for m in memories if m.emotional_value > 0.1)
        stats['stressful_memories'] = sum(1 for m in memories if m.emotional_value < -0.1)
        stats['neutral_memories'] = stats['total_memories'] - stats['happy_memories'] - stats['stressful_memories']
        # Memory type breakdown
        type_counts = defaultdict(int)
        for memory in memories:
            type_counts[memory.memory_type] += 1
        stats['memory_types'] = dict(type_counts)
        # Recent activity
        recent_cutoff = datetime.now() - timedelta(hours=24)
        stats['recent_memories'] = sum(1 for m in memories if m.timestamp > recent_cutoff)
        return stats

    def _calculate_emotional_value(self, interaction: "Interaction", pet: "DigiPal") -> float:
        """Calculate emotional value (-1.0 .. 1.0) for an interaction."""
        # Base emotional value from success/failure
        emotional_value = 0.2 if interaction.success else -0.3
        # Adjust based on interaction result
        if interaction.result == InteractionResult.SUCCESS:
            emotional_value += 0.1
        elif interaction.result == InteractionResult.FAILURE:
            emotional_value -= 0.2
        elif interaction.result == InteractionResult.STAGE_INAPPROPRIATE:
            emotional_value -= 0.1
        # Adjust based on command type (a missing command counts as none)
        command = (interaction.interpreted_command or "").lower()
        if command in ['good', 'praise']:
            emotional_value += 0.4
        elif command in ['bad', 'scold']:
            emotional_value -= 0.3
        elif command in ['play', 'fun']:
            emotional_value += 0.2
        elif command in ['eat', 'food'] and pet.energy < 50:
            emotional_value += 0.3  # Food when hungry is very positive
        # Adjust based on attribute changes
        if 'happiness' in interaction.attribute_changes:
            emotional_value += interaction.attribute_changes['happiness'] / 100.0
        return max(-1.0, min(1.0, emotional_value))

    def _calculate_importance(self, interaction: "Interaction") -> float:
        """Calculate importance level (0.0 .. 1.0) for an interaction."""
        importance = 0.5  # Base importance
        # Increase importance for successful interactions
        if interaction.success:
            importance += 0.2
        # Increase importance based on attribute changes (capped at +0.3)
        total_change = sum(abs(v) for v in interaction.attribute_changes.values())
        importance += min(0.3, total_change / 100.0)
        # Special commands are more important
        command = (interaction.interpreted_command or "").lower()
        special_commands = ['evolution', 'death', 'birth', 'milestone']
        if any(cmd in command for cmd in special_commands):
            importance += 0.3
        return max(0.0, min(1.0, importance))

    def _extract_interaction_tags(self, interaction: "Interaction") -> Set[str]:
        """Extract tags from an interaction."""
        tags = {'interaction'}
        command = (interaction.interpreted_command or "").lower()
        # Command-based tags
        command_tags = {
            'eat': 'eating', 'food': 'eating', 'feed': 'eating',
            'sleep': 'sleeping', 'rest': 'sleeping',
            'train': 'training', 'exercise': 'training',
            'play': 'playing', 'fun': 'playing',
            'good': 'praise', 'praise': 'praise',
            'bad': 'discipline', 'scold': 'discipline',
        }
        if command in command_tags:
            tags.add(command_tags[command])
        # Success/failure tags
        tags.add('successful' if interaction.success else 'failed')
        return tags

    def _update_memory_stats(self, pet_id: str, memory: "EmotionalMemory") -> None:
        """Update the running memory counters for a newly added memory."""
        stats = self.memory_stats[pet_id]
        stats['total_memories'] += 1
        if memory.emotional_value > 0.1:
            stats['happy_memories'] += 1
        elif memory.emotional_value < -0.1:
            stats['stressful_memories'] += 1
        else:
            stats['neutral_memories'] += 1

    def _manage_memory_size(self, pet_id: str) -> None:
        """Manage memory size to prevent unlimited growth.

        When a pet has more than ``max_memories_per_pet`` memories, the least
        important (then oldest) ones are dropped. The surviving memories keep
        their original chronological order, which the emotional summary
        depends on.
        """
        memories = self.pet_memories[pet_id]
        limit = max(0, self.max_memories_per_pet)
        removed_count = len(memories) - limit
        if removed_count <= 0:
            return
        # Rank a copy by importance and recency; never reorder the stored list.
        ranked = sorted(memories, key=lambda m: (m.importance, m.timestamp.timestamp()), reverse=True)
        keep_ids = {id(m) for m in ranked[:limit]}
        self.pet_memories[pet_id] = [m for m in memories if id(m) in keep_ids]
        logger.debug(f"Removed {removed_count} old memories for pet {pet_id}")

    def _format_time_ago(self, timestamp: datetime) -> str:
        """Format timestamp as 'time ago' string.

        Timestamps in the future (e.g. clock adjustments) are reported as
        "just now" instead of a bogus number of hours.
        """
        # total_seconds() is signed; timedelta.seconds alone is not, which made
        # a slightly-future timestamp look like "23 hours ago".
        elapsed = max(0, int((datetime.now() - timestamp).total_seconds()))
        days = elapsed // 86400
        if days > 0:
            return f"{days} day{'s' if days != 1 else ''} ago"
        hours = elapsed // 3600
        if hours > 0:
            return f"{hours} hour{'s' if hours != 1 else ''} ago"
        minutes = elapsed // 60
        if minutes > 0:
            return f"{minutes} minute{'s' if minutes != 1 else ''} ago"
        return "just now"

    def start_background_cleanup(self) -> None:
        """Start background cleanup thread (no-op if it is already running)."""
        if self._cleanup_thread and self._cleanup_thread.is_alive():
            return
        self._stop_cleanup = False
        self._stop_event.clear()
        self._cleanup_thread = threading.Thread(target=self._background_cleanup_loop, daemon=True)
        self._cleanup_thread.start()
        logger.info("Started background memory cleanup")

    def stop_background_cleanup(self) -> None:
        """Stop background cleanup thread."""
        self._stop_cleanup = True
        # Wake the thread out of its hour-long wait so the join returns fast.
        self._stop_event.set()
        thread = self._cleanup_thread
        if thread and thread is not threading.current_thread():
            thread.join(timeout=5)

    def _background_cleanup_loop(self) -> None:
        """Background cleanup loop: hourly cleanup of every pet's memories."""
        while not (self._stop_cleanup or self._stop_event.is_set()):
            delay = 3600  # Run once per hour
            try:
                with self._lock:
                    pet_ids = list(self.pet_memories.keys())
                # Clean up old memories for all pets
                for pet_id in pet_ids:
                    self.cleanup_old_memories(pet_id)
                # Force garbage collection
                gc.collect()
            except Exception as e:
                logger.error(f"Error in background memory cleanup: {e}")
                delay = 300  # Retry after 5 minutes on error
            # Interruptible sleep: returns immediately once stop is requested.
            self._stop_event.wait(delay)

    def shutdown(self) -> None:
        """Shutdown the memory manager."""
        logger.info("Shutting down enhanced memory manager")
        # Stop background cleanup
        self.stop_background_cleanup()
        # Shutdown cache
        self.memory_cache.shutdown()
        # Clear memories to free memory
        with self._lock:
            self.pet_memories.clear()
            self.memory_stats.clear()
        logger.info("Enhanced memory manager shutdown complete")