Create memory_logic.py

memory_logic.py  (ADDED, +163 -0)
@@ -0,0 +1,163 @@
import os
import json
import logging
from datetime import datetime
import re  # For insight format validation

logger = logging.getLogger(__name__)

DATA_DIR = "app_data"
MEMORIES_FILE = os.path.join(DATA_DIR, "conversation_memories.jsonl")  # JSON Lines format
RULES_FILE = os.path.join(DATA_DIR, "learned_rules.jsonl")  # Rules/Insights, also JSON Lines

# Ensure data directory exists
os.makedirs(DATA_DIR, exist_ok=True)

# --- Rules/Insights Management ---

def load_rules_from_file() -> list[str]:
    """Loads rules (insights) from the JSON Lines file."""
    rules = []
    if not os.path.exists(RULES_FILE):
        return rules
    try:
        with open(RULES_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    try:
                        # Each line is expected to be a JSON object like {"text": "...", "added_at": "..."};
                        # a plain JSON string per line (the old format) is also accepted.
                        data = json.loads(line)
                        if isinstance(data, dict) and isinstance(data.get("text"), str) and data["text"].strip():
                            rules.append(data["text"].strip())
                        elif isinstance(data, str) and data.strip():  # Old format: just text per line
                            rules.append(data.strip())
                    except json.JSONDecodeError:
                        logger.warning(f"Skipping malformed JSON line in rules file: {line.strip()}")
        logger.info(f"Loaded {len(rules)} rules from {RULES_FILE}")
    except Exception as e:
        logger.error(f"Error loading rules from {RULES_FILE}: {e}", exc_info=True)
    return sorted(set(rules))  # Ensure unique and sorted

def save_rule_to_file(rule_text: str) -> bool:
    """Saves a single rule (insight) to the JSON Lines file if it is new and valid."""
    rule_text = rule_text.strip()
    if not rule_text:
        logger.warning("Attempted to save an empty rule.")
        return False

    # Validate format: [TYPE|SCORE] Text
    if not re.match(r"\[(CORE_RULE|RESPONSE_PRINCIPLE|BEHAVIORAL_ADJUSTMENT|GENERAL_LEARNING)\|([\d\.]+?)\](.*)", rule_text, re.I | re.DOTALL):
        logger.warning(f"Rule '{rule_text[:50]}...' has invalid format. Not saving.")
        return False

    current_rules = load_rules_from_file()
    if rule_text in current_rules:
        logger.info(f"Rule '{rule_text[:50]}...' already exists. Not saving duplicate.")
        return False  # Or True if "already exists" should count as success

    try:
        with open(RULES_FILE, 'a', encoding='utf-8') as f:
            # Store as a JSON object to allow future metadata fields
            json.dump({"text": rule_text, "added_at": datetime.utcnow().isoformat()}, f)
            f.write('\n')
        logger.info(f"Saved new rule: {rule_text[:70]}...")
        return True
    except Exception as e:
        logger.error(f"Error saving rule '{rule_text[:50]}...' to {RULES_FILE}: {e}", exc_info=True)
        return False

def delete_rule_from_file(rule_text_to_delete: str) -> bool:
    """Deletes a rule from the file."""
    rule_text_to_delete = rule_text_to_delete.strip()
    if not rule_text_to_delete:
        return False

    current_rules = load_rules_from_file()
    if rule_text_to_delete not in current_rules:
        logger.info(f"Rule '{rule_text_to_delete[:50]}...' not found for deletion.")
        return False

    updated_rules = [rule for rule in current_rules if rule != rule_text_to_delete]
    try:
        with open(RULES_FILE, 'w', encoding='utf-8') as f:  # Overwrite with the updated list
            for rule_text in updated_rules:
                json.dump({"text": rule_text, "added_at": "unknown"}, f)  # original timestamp is lost on rewrite
                f.write('\n')
        logger.info(f"Deleted rule: {rule_text_to_delete[:70]}...")
        return True
    except Exception as e:
        logger.error(f"Error deleting rule '{rule_text_to_delete[:50]}...' from {RULES_FILE}: {e}", exc_info=True)
        return False


# --- Conversation Memories Management ---

def load_memories_from_file() -> list[dict]:
    """Loads conversation memories from the JSON Lines file."""
    memories = []
    if not os.path.exists(MEMORIES_FILE):
        return memories
    try:
        with open(MEMORIES_FILE, 'r', encoding='utf-8') as f:
            for line in f:
                if line.strip():
                    try:
                        mem_obj = json.loads(line)
                        # Basic validation for expected keys
                        if all(k in mem_obj for k in ["user_input", "bot_response", "metrics", "timestamp"]):
                            memories.append(mem_obj)
                        else:
                            logger.warning(f"Skipping memory object with missing keys: {line.strip()}")
                    except json.JSONDecodeError:
                        logger.warning(f"Skipping malformed JSON line in memories file: {line.strip()}")
        logger.info(f"Loaded {len(memories)} memories from {MEMORIES_FILE}")
    except Exception as e:
        logger.error(f"Error loading memories from {MEMORIES_FILE}: {e}", exc_info=True)
    # Sort by timestamp; append-only writes usually keep order anyway
    return sorted(memories, key=lambda x: x.get("timestamp", ""))


def save_memory_to_file(user_input: str, bot_response: str, metrics: dict) -> bool:
    """Saves a conversation memory to the JSON Lines file."""
    if not user_input or not bot_response:  # Metrics can be empty
        logger.warning("Attempted to save memory with empty user input or bot response.")
        return False

    memory_entry = {
        "user_input": user_input,
        "bot_response": bot_response,
        "metrics": metrics,
        "timestamp": datetime.utcnow().isoformat()
    }

    try:
        with open(MEMORIES_FILE, 'a', encoding='utf-8') as f:
            json.dump(memory_entry, f)
            f.write('\n')
        logger.info(f"Saved new memory. User: {user_input[:50]}...")
        return True
    except Exception as e:
        logger.error(f"Error saving memory to {MEMORIES_FILE}: {e}", exc_info=True)
        return False

def clear_all_rules() -> bool:
    """Removes the rules file entirely."""
    try:
        if os.path.exists(RULES_FILE):
            os.remove(RULES_FILE)
        logger.info("All rules cleared.")
        return True
    except Exception as e:
        logger.error(f"Error clearing rules file {RULES_FILE}: {e}")
        return False

def clear_all_memories() -> bool:
    """Removes the memories file entirely."""
    try:
        if os.path.exists(MEMORIES_FILE):
            os.remove(MEMORIES_FILE)
        logger.info("All memories cleared.")
        return True
    except Exception as e:
        logger.error(f"Error clearing memories file {MEMORIES_FILE}: {e}")
        return False
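
For orientation, a minimal usage sketch follows; it is not part of the committed file. It assumes memory_logic.py is importable from the working directory and that app_data/ is writable; the rule string and the metrics key are hypothetical examples, with the rule chosen to pass the [TYPE|SCORE] validation in save_rule_to_file.

import logging
import memory_logic

logging.basicConfig(level=logging.INFO)  # surface the module's info/warning logs

# Persist a learned rule; returns False for empty, malformed, or duplicate rules.
saved = memory_logic.save_rule_to_file(
    "[CORE_RULE|0.9] Prefer concise answers over long explanations."  # hypothetical rule text
)
print("rule saved:", saved)
print("rules on disk:", memory_logic.load_rules_from_file())

# Persist one conversation turn; metrics is a free-form dict and may be empty.
memory_logic.save_memory_to_file(
    user_input="What format does the rules file use?",
    bot_response="Each rule is stored as one JSON object per line (JSON Lines).",
    metrics={"response_success_score": 0.8},  # illustrative metric key
)
print("memories on disk:", len(memory_logic.load_memories_from_file()))

Because both stores are append-only JSON Lines files, repeated calls simply add lines; clear_all_rules() and clear_all_memories() remove the corresponding file entirely.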