|
"""Conversation tracking for BF chatbot""" |
|
|
|
import json |
|
import os |
|
from datetime import datetime |
|
from typing import List, Dict, Any |
|
|
|
|
|
# Upper bound on stored history entries; save_conversations keeps only the
# newest MAX_CONVERSATIONS items.
MAX_CONVERSATIONS = 100

# Absolute path of the directory that contains this module.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Primary history file, stored next to this module.
CONVERSATIONS_FILE = os.path.join(BASE_DIR, "conversations.json")

# Second copy of the same history, written under the "public" subdirectory.
PUBLIC_FILE = os.path.join(BASE_DIR, "public", "conversations.json")
|
|
|
def load_conversations():
    """Load the conversation history from ``CONVERSATIONS_FILE``.

    Returns:
        The list stored in the file. An empty list is returned when the file
        is missing or unreadable, when its content is not valid UTF-8 JSON,
        or when the decoded JSON value is not a list.
    """
    try:
        with open(CONVERSATIONS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; ValueError covers
        # json.JSONDecodeError and UnicodeDecodeError (both subclasses).
        return []

    # add_conversation appends to the result, so anything other than a list
    # (e.g. a file containing "{}") is treated as an empty history.
    return data if isinstance(data, list) else []
|
|
|
def save_conversations(conversations):
    """Write the conversation history to disk, keeping only the newest entries.

    The list is trimmed to its last ``MAX_CONVERSATIONS`` items, serialized
    once, and written to ``CONVERSATIONS_FILE``. The same payload is then
    written to ``PUBLIC_FILE`` on a best-effort basis. No exception is
    propagated to the caller: failures are reported with ``print``.

    If trimming, serializing, or writing the primary file fails, the history
    is stored in the module-level ``_memory_conversations`` instead and the
    public copy is not attempted.

    Args:
        conversations: List of JSON-serializable conversation records. The
            caller's list is not modified; trimming rebinds a local slice.

    Returns:
        None.
    """
    # NOTE(review): the original indentation was lost; the in-memory
    # assignment is assumed to belong to the failure path - confirm.
    global _memory_conversations

    try:
        if len(conversations) > MAX_CONVERSATIONS:
            conversations = conversations[-MAX_CONVERSATIONS:]

        # Serialize before opening the file: open(..., 'w') truncates, so a
        # serialization error raised afterwards would wipe the existing
        # history. Doing it first leaves the file untouched on failure.
        payload = json.dumps(conversations, ensure_ascii=False, indent=2)

        with open(CONVERSATIONS_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)
        print(f"✅ Saved {len(conversations)} conversations to {CONVERSATIONS_FILE}")
    except Exception as e:
        print(f"❌ Error saving conversations: {e}")
        _memory_conversations = conversations
        return

    # The public copy is optional: report a failure instead of hiding it,
    # but never let it undo the successful primary save.
    try:
        os.makedirs(os.path.dirname(PUBLIC_FILE), exist_ok=True)
        with open(PUBLIC_FILE, 'w', encoding='utf-8') as f:
            f.write(payload)
        print(f"✅ Also saved to {PUBLIC_FILE}")
    except OSError as e:
        print(f"⚠️ Could not save public copy to {PUBLIC_FILE}: {e}")
|
|
|
|
|
# Module-level copy of the conversation history kept in process memory.
# save_conversations rebinds this name (via ``global``); nothing in the
# visible code reads it back.
# NOTE(review): confirm whether a reader exists elsewhere.
_memory_conversations: List[Dict[str, Any]] = []
|
|
|
def add_conversation(user_message, bot_response):
    """Record one user/bot exchange in the stored history.

    The current history is loaded, a new record stamped with the local time
    (ISO 8601, from ``datetime.now()``) is appended, and the result is handed
    to ``save_conversations``.

    Args:
        user_message: The message stored under the "user" key.
        bot_response: The reply stored under the "bot" key.

    Returns:
        The newly created record, a dict with the keys "timestamp", "user"
        and "bot".
    """
    history = load_conversations()

    stamp = datetime.now().isoformat()
    entry = dict(timestamp=stamp, user=user_message, bot=bot_response)

    history.append(entry)
    save_conversations(history)
    return entry