File size: 2,324 Bytes
5889992 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
from typing import Dict, Any, Optional
from .redis_connection import RedisConnection
from src.llm.models.schemas import ConversationResponse
import json
import time
class RedisMemoryManager:
def __init__(self):
self.redis = RedisConnection().client
def store_conversation(self, session_id: str, chat_id: str, response: ConversationResponse) -> None:
"""
Store complete conversation response with metadata
"""
response_data = response.dict()
timestamp = time.time()
# Store in session-specific hash
self.redis.hset(
f"session:{session_id}:chats",
chat_id,
json.dumps({
'response': response_data,
'timestamp': timestamp
})
)
# Update session metadata
self.redis.hset(
f"session:{session_id}",
mapping={
'last_chat_id': chat_id,
'last_updated': str(timestamp)
}
)
def get_conversation(self, session_id: str, chat_id: str) -> Optional[ConversationResponse]:
"""
Retrieve specific conversation response
"""
data = self.redis.hget(f"session:{session_id}:chats", chat_id)
if data:
return ConversationResponse(**json.loads(data)['response'])
return None
def get_session_conversations(self, session_id: str) -> Dict[str, Any]:
"""
Get all conversations for a session
"""
conversations = self.redis.hgetall(f"session:{session_id}:chats")
return {
chat_id: ConversationResponse(**json.loads(data)['response'])
for chat_id, data in conversations.items()
}
def update_emotional_state(self, session_id: str, emotions: Dict[str, Any]) -> None:
"""
Update emotional state tracking
"""
self.redis.hset(
f"session:{session_id}:state",
'emotions',
json.dumps(emotions)
)
def get_emotional_state(self, session_id: str) -> Dict[str, Any]:
"""
Retrieve current emotional state
"""
data = self.redis.hget(f"session:{session_id}:state", 'emotions')
return json.loads(data) if data else {} |