Spaces:
Sleeping
Sleeping
File size: 6,426 Bytes
c899329 8853856 c899329 d091eda 8853856 d091eda c899329 d091eda c899329 d091eda c899329 d091eda 8853856 c899329 d091eda f18a5d7 c899329 d091eda c899329 d091eda d48dc2f d091eda f18a5d7 d091eda d48dc2f d091eda d48dc2f f18a5d7 d091eda f18a5d7 d48dc2f f18a5d7 d091eda f18a5d7 d091eda c899329 d091eda c899329 d091eda c899329 d091eda 0e43e5f c899329 d091eda f18a5d7 d091eda f18a5d7 d091eda 8853856 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 |
import json
import os
import sys
src_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "backend"))
sys.path.append(src_directory)
from datetime import datetime
from supabase import create_client, StorageException
from backend.utils import logger
from dotenv import load_dotenv
# Logger Initialization
# NOTE: this rebinds the name `logger` from the imported `backend.utils.logger`
# module to the object returned by its `get_logger()`; from here on `logger`
# refers to that object, not to the module.
logger = logger.get_logger()
# Load Environment Variables
# `load_dotenv()` runs before the `os.getenv` lookups below so that the values
# it loads are visible to them (presumably from a local .env file, per
# python-dotenv -- confirm for the deployment in use).
load_dotenv()
# Each of these is None when the corresponding variable is not set.
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')
SUPABASE_BUCKET = os.getenv('SUPABASE_BUCKET')
LLM_MODEL_NAME = os.getenv('LLM_MODEL_NAME')
# Folder inside the bucket that `get_bucket_items` lists.
BUCKET_FOLDER = "chat-history"
# Supabase Client Initialization
# Created once at import time and used by the storage functions below.
# NOTE(review): if SUPABASE_URL / SUPABASE_KEY are unset, None is passed here --
# confirm how `create_client` reacts to that.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
# File Path Generator
def _get_file_path(conversation_id: str) -> str:
    """
    Generates the file path for storing chat history JSON files.

    The folder part comes from the module-level ``BUCKET_FOLDER`` constant,
    the same one ``get_bucket_items`` lists, so that writing and listing
    cannot drift apart if the folder is ever renamed.

    Args:
        conversation_id (str): Unique identifier for the conversation.

    Returns:
        str: Path of the form "<BUCKET_FOLDER>/<conversation_id>.json".
    """
    return f"{BUCKET_FOLDER}/{conversation_id}.json"
# JSON Loader with Safe Handling
def _load_json(data: bytes) -> dict:
    """
    Safely loads JSON data from a byte stream.

    Args:
        data (bytes): The byte stream to decode (as UTF-8) and parse as JSON.

    Returns:
        dict: Parsed JSON data, or an empty dictionary when the input cannot
        be decoded or parsed. Failures are logged, never raised.
    """
    try:
        return json.loads(data.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError, AttributeError):
        # UnicodeDecodeError: bytes that are not valid UTF-8.
        # JSONDecodeError:    text that is not valid JSON (including empty input).
        # AttributeError:     input without .decode(), e.g. None.
        logger.error("Failed to decode JSON data.")
        return {}
# JSON Dumper with Indentation
def _dump_json(data: dict) -> str:
    """
    Serializes data to a human-readable JSON string.

    Args:
        data (dict): The data to serialize.

    Returns:
        str: JSON text indented with four spaces per nesting level.
    """
    pretty_text = json.dumps(data, indent=4)
    return pretty_text
def store_chat_history(conversation_id: str, new_messages: list) -> dict:
    """
    Stores or updates chat history in Supabase storage. If the file exists,
    appends new messages; otherwise, creates a new file.

    When an existing file is empty, unparsable, or lacks the
    ``conversation_id`` / ``metadata`` keys, the missing keys are filled in so
    the stored document always has the same shape as a freshly created one.
    Values already present in the stored file are left untouched.

    Args:
        conversation_id (str): Unique identifier for the conversation.
        new_messages (list): List of chat messages to store.

    Returns:
        dict: ``{"success": True, "message": ...}`` on success, or
        ``{"success": False, "error": ...}`` on failure. Errors are logged
        and reported in the return value, never raised.
    """
    try:
        file_path = _get_file_path(conversation_id)
        metadata = {
            "timestamp": datetime.now().isoformat(),
            "language": "en",
            "model": LLM_MODEL_NAME
        }
        # Load Existing Data
        try:
            existing_data = supabase.storage.from_(SUPABASE_BUCKET).download(file_path)
        except StorageException:
            # NOTE(review): any StorageException on download is treated as
            # "file does not exist"; a transient storage failure would therefore
            # lead to the existing history being replaced -- confirm this is acceptable.
            logger.warning(f"No existing file found. Creating new one for ID: {conversation_id}")
            chat_data = {
                "conversation_id": conversation_id,
                "messages": new_messages,
                "metadata": metadata
            }
        else:
            chat_data = _load_json(existing_data)
            # A stored payload that is valid JSON but not an object cannot be extended.
            if not isinstance(chat_data, dict):
                chat_data = {}
            # Backfill keys that an empty or corrupt file would otherwise be missing.
            chat_data.setdefault("conversation_id", conversation_id)
            chat_data.setdefault("metadata", metadata)
            if 'messages' not in chat_data:
                chat_data['messages'] = []
            chat_data['messages'].extend(new_messages)
            logger.info(f"Messages appended to existing file for conversation ID: {conversation_id}")
        updated_json_data = _dump_json(chat_data)
        # "upsert" lets the same call both create the file and overwrite it.
        supabase.storage.from_(SUPABASE_BUCKET).upload(
            file_path, updated_json_data.encode('utf-8'),
            file_options={"content-type": "application/json", "upsert": "true"}
        )
        return {"success": True, "message": "Chat history stored successfully."}
    except StorageException as e:
        logger.error(f"Supabase Storage error: {e}")
        return {"success": False, "error": "Failed to store chat history. Storage error occurred."}
    except Exception as e:
        logger.error(f"Unexpected error while storing chat history: {e}")
        return {"success": False, "error": "Unexpected error occurred while storing chat history."}
def retrieve_chat_history(conversation_id: str) -> dict:
    """
    Fetches the stored chat history of a conversation from Supabase storage.

    Args:
        conversation_id (str): Unique identifier for the conversation.

    Returns:
        dict: ``{"success": True, "data": <parsed JSON>}`` when a non-empty
        file was downloaded; ``{"success": False, "message": ...}`` when the
        download came back empty; ``{"success": False, "error": ...}`` when
        an exception occurred. Exceptions are logged, not raised.
    """
    try:
        history_path = _get_file_path(conversation_id)
        bucket = supabase.storage.from_(SUPABASE_BUCKET)
        raw_payload = bucket.download(history_path)
        if raw_payload:
            return {"success": True, "data": _load_json(raw_payload)}
        # Empty download: report it as "nothing stored" rather than as an error.
        logger.warning(f"No chat history found for ID: {conversation_id}")
        return {"success": False, "message": "No chat history found."}
    except StorageException as storage_err:
        logger.error(f"Supabase Storage error while retrieving chat history: {storage_err}")
        return {"success": False, "error": "Failed to retrieve chat history. Storage error occurred."}
    except Exception as unexpected_err:
        logger.error(f"Unexpected error retrieving chat history for ID {conversation_id}: {unexpected_err}")
        return {"success": False, "error": "Unexpected error occurred while retrieving chat history."}
def get_bucket_items() -> list:
    """
    Retrieves item names from a specified Supabase storage bucket and returns them as a list,
    with a trailing '.json' extension removed and omitting the last item in the response.

    This function uses the globally defined `SUPABASE_BUCKET` and `BUCKET_FOLDER` variables
    to identify the bucket and folder path.

    Returns:
        list: Item names with a trailing '.json' removed, excluding the last item
        of the listing. An empty list is returned when the listing is empty or
        when fetching fails, so callers can always iterate the result.

    Logs:
        - An error if there are no items found in the bucket.
        - An error if an exception occurs during the fetching process.

    Example:
        Suppose the bucket contains:
        - "2025-03-18.json"
        - "2025-03-19.json"
        - "2025-03-20.json"
        The function will return:
        ['2025-03-18', '2025-03-19']

    This function does not raise; failures are logged and reported as an empty list.
    """
    suffix = '.json'
    try:
        response = supabase.storage.from_(SUPABASE_BUCKET).list(BUCKET_FOLDER)
        if not response:
            logger.error("No items found in the bucket.")
            return []
        # NOTE(review): the last listed item is dropped on purpose (documented
        # behaviour); which entry that is depends on the listing order -- confirm
        # what it is meant to exclude.
        names = [item['name'] for item in response[:-1]]
        # Strip the extension only at the end of the name, so an ID that happens
        # to contain ".json" in the middle is returned intact.
        return [
            name[:-len(suffix)] if name.endswith(suffix) else name
            for name in names
        ]
    except Exception as e:
        logger.error(f"Error fetching bucket items: {e}")
        return []