File size: 6,426 Bytes
c899329
 
8853856
 
 
c899329
d091eda
8853856
d091eda
c899329
d091eda
 
 
 
 
c899329
d091eda
c899329
d091eda
8853856
c899329
d091eda
f18a5d7
c899329
d091eda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c899329
d091eda
d48dc2f
 
 
 
 
 
d091eda
f18a5d7
 
d091eda
 
 
d48dc2f
d091eda
 
 
d48dc2f
 
 
 
 
f18a5d7
d091eda
f18a5d7
d48dc2f
 
f18a5d7
 
d091eda
f18a5d7
d091eda
 
 
c899329
d091eda
 
c899329
d091eda
 
 
 
 
 
 
 
 
 
c899329
d091eda
0e43e5f
c899329
d091eda
 
 
f18a5d7
d091eda
 
 
 
 
f18a5d7
d091eda
8853856
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
import json
import os
import sys
# Resolve <this file's directory>/../../backend to an absolute path and append
# it to sys.path before the project import below runs.
# NOTE(review): this appends the `backend` directory itself, while the import
# below is `backend.utils`, which needs the directory *containing* `backend`
# to be importable -- confirm this works from every entry point.
src_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..", "backend"))
sys.path.append(src_directory)
from datetime import datetime
from supabase import create_client, StorageException
from backend.utils import logger
from dotenv import load_dotenv

# Logger Initialization
# Rebinds the name `logger` from the imported module to the object returned by
# its get_logger(); the module is no longer reachable under this name afterwards.
logger = logger.get_logger()

# Load Environment Variables
# load_dotenv() is called before the os.getenv() lookups below. Each lookup
# yields None when the variable is not set; no validation happens here.
load_dotenv()
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')
SUPABASE_BUCKET = os.getenv('SUPABASE_BUCKET')
LLM_MODEL_NAME = os.getenv('LLM_MODEL_NAME')
# Folder inside the bucket that get_bucket_items() lists. _get_file_path()
# hard-codes the same "chat-history" prefix rather than reading this constant.
BUCKET_FOLDER = "chat-history"

# Supabase Client Initialization
# Module-level client shared by the functions below; created at import time
# from the URL and key read above.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

# File Path Generator
def _get_file_path(conversation_id: str) -> str:
    """
    Generates the file path for storing chat history JSON files.

    Args:
        conversation_id (str): Unique identifier for the conversation.

    Returns:
        str: Path to the chat history JSON file.
    """
    return f"chat-history/{conversation_id}.json"

# JSON Loader with Safe Handling
def _load_json(data: bytes) -> dict:
    """
    Safely loads JSON data from a byte stream.

    Args:
        data (bytes): The byte stream to decode and parse as JSON.

    Returns:
        dict: Parsed JSON data or an empty dictionary on failure.
    """
    try:
        return json.loads(data.decode('utf-8'))
    except (json.JSONDecodeError, AttributeError):
        logger.error("Failed to decode JSON data.")
        return {}

# JSON Dumper with Indentation
def _dump_json(data: dict) -> str:
    """
    Formats data as a JSON string with indentation for better readability.

    Args:
        data (dict): The data to format.

    Returns:
        str: Formatted JSON string.
    """
    return json.dumps(data, indent=4)

def store_chat_history(conversation_id: str, new_messages: list) -> dict:
    """
    Stores or updates chat history in Supabase storage. If the file exists,
    appends new messages; otherwise, creates a new file.

    When an existing file is found but lacks the "conversation_id",
    "messages" or "metadata" keys (for example because its content could not
    be parsed and was loaded as an empty dictionary), the missing keys are
    filled in so the re-uploaded file always has the full structure.
    Existing values are never overwritten.

    Args:
        conversation_id (str): Unique identifier for the conversation.
        new_messages (list): List of chat messages to store.

    Returns:
        dict: {"success": True, "message": ...} on success, or
        {"success": False, "error": ...} on failure. Errors are logged and
        reported through the returned dictionary rather than raised.
    """
    try:
        file_path = _get_file_path(conversation_id)
        metadata = {
            "timestamp": datetime.now().isoformat(),
            "language": "en",
            "model": LLM_MODEL_NAME
        }
        bucket = supabase.storage.from_(SUPABASE_BUCKET)

        # Load Existing Data
        # Only the download is guarded here: it is the only call in this step
        # that can raise StorageException.
        try:
            existing_data = bucket.download(file_path)
        except StorageException:
            # NOTE(review): every storage error on download is treated as
            # "file does not exist"; a transient failure would therefore
            # replace existing history with only the new messages -- confirm
            # whether the exception should be inspected before overwriting.
            logger.warning(f"No existing file found. Creating new one for ID: {conversation_id}")
            chat_data = {
                "conversation_id": conversation_id,
                "messages": new_messages,
                "metadata": metadata
            }
        else:
            chat_data = _load_json(existing_data)
            # Fill in whatever the stored file is missing without touching
            # values that are already present.
            chat_data.setdefault("conversation_id", conversation_id)
            chat_data.setdefault("messages", []).extend(new_messages)
            chat_data.setdefault("metadata", metadata)
            logger.info(f"Messages appended to existing file for conversation ID: {conversation_id}")

        updated_json_data = _dump_json(chat_data)
        # "upsert" lets the upload replace an existing object at the same path.
        bucket.upload(
            file_path, updated_json_data.encode('utf-8'),
            file_options={"content-type": "application/json", "upsert": "true"}
        )

        return {"success": True, "message": "Chat history stored successfully."}

    except StorageException as e:
        logger.error(f"Supabase Storage error: {e}")
        return {"success": False, "error": "Failed to store chat history. Storage error occurred."}
    except Exception as e:
        logger.error(f"Unexpected error while storing chat history: {e}")
        return {"success": False, "error": "Unexpected error occurred while storing chat history."}

def retrieve_chat_history(conversation_id: str) -> dict:
    """
    Fetches the stored chat history of a conversation from Supabase storage.

    Args:
        conversation_id (str): Unique identifier for the conversation.

    Returns:
        dict: {"success": True, "data": <parsed file content>} when the file
        was downloaded and is non-empty; {"success": False, "message": ...}
        when the download returned nothing; {"success": False, "error": ...}
        when an exception occurred.
    """
    try:
        storage_path = _get_file_path(conversation_id)
        raw_bytes = supabase.storage.from_(SUPABASE_BUCKET).download(storage_path)

        if raw_bytes:
            return {"success": True, "data": _load_json(raw_bytes)}

        # An empty download is reported as "nothing stored" rather than as an error.
        logger.warning(f"No chat history found for ID: {conversation_id}")
        return {"success": False, "message": "No chat history found."}

    except StorageException as storage_err:
        logger.error(f"Supabase Storage error while retrieving chat history: {storage_err}")
        return {"success": False, "error": "Failed to retrieve chat history. Storage error occurred."}
    except Exception as err:
        logger.error(f"Unexpected error retrieving chat history for ID {conversation_id}: {err}")
        return {"success": False, "error": "Unexpected error occurred while retrieving chat history."}
    
def get_bucket_items():
    """
    Lists the conversation IDs stored in the chat history folder of the bucket.

    The folder `BUCKET_FOLDER` of the bucket `SUPABASE_BUCKET` is listed, the
    last entry of the listing is dropped, and '.json' is removed from each
    remaining entry name.

    Returns:
        list | None: The entry names with '.json' removed, excluding the last
        entry of the listing. None when the listing is empty or when fetching
        it fails; both cases are logged as errors and no exception propagates.

    Example:
        Suppose the listing contains, in this order:
        - "2025-03-18.json"
        - "2025-03-19.json"
        - "2025-03-20.json"

        The function will return:
        ['2025-03-18', '2025-03-19']
    """
    try:
        listing = supabase.storage.from_(SUPABASE_BUCKET).list(BUCKET_FOLDER)
        if not listing:
            logger.error("No items found in the bucket.")
            return None
        # The final entry of the listing is deliberately left out.
        return [entry['name'].replace('.json', '') for entry in listing[:-1]]
    except Exception as err:
        logger.error(f"Error fetching bucket items: {err}")
        return None