|
import os |
|
import uuid |
|
import json |
|
import gradio as gr |
|
from huggingface_hub import InferenceClient |
|
import firebase_admin |
|
from firebase_admin import credentials, firestore |
|
import hashlib |
|
|
|
class XylariaChat:
    """Chat application tying a Hugging Face inference client to a Gradio UI.

    Conversation history and a small key/value "persistent memory" are kept
    on the instance and, once a user is authenticated and Firebase has been
    initialised, saved to the ``user_chats`` Firestore collection.

    NOTE: all state (current user, history, memory) lives on this one
    instance and the Gradio callbacks built in ``create_interface`` operate
    on it, so everything using the same instance shares that state.
    """

    def __init__(self):
        """Read the Hugging Face token, create the client and reset state.

        Raises:
            ValueError: If the ``HF_TOKEN`` environment variable is unset
                or empty.
        """
        self.hf_token = os.getenv("HF_TOKEN")
        if not self.hf_token:
            raise ValueError("HuggingFace token not found in environment variables")

        self.client = InferenceClient(
            model="Qwen/QwQ-32B-Preview",
            api_key=self.hf_token
        )

        # Firebase handles stay unset until init_firebase() succeeds.
        self.firebase_cred = None
        self.firebase_db = None
        self.current_user_id = None

        # In-memory state for the currently active user.
        self.conversation_history = []
        self.persistent_memory = {}

        self.system_prompt = """You are Xylaria 1.4 Senoa, Made by Sk Md Saad Amin designed to provide helpful, accurate, and engaging support across a wide range of topics. Key guidelines for our interaction include:
Core Principles:
- Provide accurate and comprehensive assistance
- Maintain a friendly and approachable communication style
- Prioritize the user's needs and context
Communication Style:
- Be conversational and warm
- Use clear, concise language
- Occasionally use light, appropriate emoji to enhance communication
- Adapt communication style to the user's preferences
- Respond in english
Important Notes:
- I am an AI assistant created by an independent developer
- I do not represent OpenAI or any other AI institution
- For image-related queries, I can describe images or provide analysis, or generate or link to images directly
Capabilities:
- Assist with research, writing, analysis, problem-solving, and creative tasks
- Answer questions across various domains
- Provide explanations and insights
- Offer supportive and constructive guidance """

    def generate_user_key(self):
        """Return a fresh user key: the SHA-256 hex digest of a random UUID4."""
        unique_key = str(uuid.uuid4())
        return hashlib.sha256(unique_key.encode()).hexdigest()

    def init_firebase(self):
        """Initialise Firebase from the ``FIREBASE_SERVICE_ACCOUNT`` env var.

        The variable must hold the service-account JSON document. On success
        ``self.firebase_db`` is set to a Firestore client.

        Returns:
            bool: True on success; False on any failure (the error is
            printed, not raised).
        """
        try:
            firebase_cred_json = os.getenv("FIREBASE_SERVICE_ACCOUNT")
            if not firebase_cred_json:
                raise ValueError("Firebase service account not found in environment variables")

            cred_dict = json.loads(firebase_cred_json)

            # Certificate() accepts the parsed dict directly, so the private
            # key is never written to a file in the working directory.
            cred = credentials.Certificate(cred_dict)

            # initialize_app() raises ValueError when the default app already
            # exists (e.g. a second call); reuse the existing app instead.
            try:
                firebase_admin.get_app()
            except ValueError:
                firebase_admin.initialize_app(cred)

            self.firebase_db = firestore.client()
            return True
        except Exception as e:
            print(f"Firebase initialization error: {e}")
            return False

    def save_user_chat_history(self):
        """Write history and persistent memory to the current user's document.

        Returns:
            bool: True if the document was written; False when Firebase is
            not initialised, no user is active, or the write failed.
        """
        if not self.firebase_db or not self.current_user_id:
            return False

        try:
            user_doc_ref = self.firebase_db.collection('user_chats').document(self.current_user_id)
            chat_data = {
                'conversation_history': self.conversation_history,
                'persistent_memory': self.persistent_memory,
                'timestamp': firestore.SERVER_TIMESTAMP
            }
            # merge=True leaves any other fields on the document untouched.
            user_doc_ref.set(chat_data, merge=True)
            return True
        except Exception as e:
            print(f"Error saving chat history: {e}")
            return False

    def load_user_chat_history(self, user_id):
        """Load history and persistent memory stored for ``user_id``.

        On success the loaded data replaces the in-memory state and
        ``user_id`` becomes the current user.

        Returns:
            bool: True if a document existed and was loaded; False when
            Firebase is not initialised, no document exists, or the read
            failed.
        """
        if not self.firebase_db:
            return False

        try:
            user_doc = self.firebase_db.collection('user_chats').document(user_id).get()
            if not user_doc.exists:
                return False

            user_data = user_doc.to_dict()
            self.conversation_history = user_data.get('conversation_history', [])
            self.persistent_memory = user_data.get('persistent_memory', {})
            self.current_user_id = user_id
            return True
        except Exception as e:
            print(f"Error loading chat history: {e}")
            return False

    def authenticate_user(self, user_key):
        """Look up ``user_key`` in the ``users`` collection and activate that user.

        Returns:
            tuple[bool, str]: ``(success, message)``.
        """
        if not self.firebase_db:
            return False, "Firebase not initialized"

        # An empty key can never identify a user; skip the query.
        if not user_key or not user_key.strip():
            return False, "Invalid user key"

        try:
            users_ref = self.firebase_db.collection('users')
            query = users_ref.where('user_key', '==', user_key).limit(1)
            match = next(iter(query.stream()), None)
            if match is None:
                return False, "Invalid user key"

            self.current_user_id = match.id
            # Clear the previous user's state first: if this user has no
            # stored chat document the load below changes nothing, and stale
            # data would otherwise be shown and later saved under this user.
            self.conversation_history = []
            self.persistent_memory = {}
            self.load_user_chat_history(self.current_user_id)
            return True, "Authentication successful"
        except Exception as e:
            print(f"Authentication error: {e}")
            return False, "Authentication error"

    def register_user(self, user_key):
        """Create a ``users`` document for ``user_key`` and activate the new user.

        Returns:
            tuple[bool, str]: ``(success, message)``.
        """
        if not self.firebase_db:
            return False, "Firebase not initialized"

        # Refuse blank keys: a registered empty key would let an empty
        # "Enter User Key" box authenticate.
        if not user_key or not user_key.strip():
            return False, "User key cannot be empty"

        try:
            users_ref = self.firebase_db.collection('users')
            query = users_ref.where('user_key', '==', user_key).limit(1)
            if list(query.stream()):
                return False, "User key already exists"

            new_user_ref = users_ref.document()
            new_user_ref.set({
                'user_key': user_key,
                'created_at': firestore.SERVER_TIMESTAMP
            })

            self.current_user_id = new_user_ref.id
            # A new user starts empty; do not inherit the previous user's state.
            self.conversation_history = []
            self.persistent_memory = {}
            return True, "User registered successfully"
        except Exception as e:
            print(f"Registration error: {e}")
            return False, "Registration error"

    def store_information(self, key, value):
        """Store ``value`` under ``key`` in persistent memory and save if a user is active."""
        self.persistent_memory[key] = value

        if self.current_user_id:
            self.save_user_chat_history()

    def retrieve_information(self, key):
        """Return the persistent-memory value for ``key``, or None if absent."""
        return self.persistent_memory.get(key)

    def reset_conversation(self):
        """Clear conversation history and persistent memory, then save if a user is active.

        Returns None; ``create_interface`` binds this to the "Clear Memory"
        button with the chatbot as output.
        """
        self.conversation_history = []
        self.persistent_memory = {}

        if self.current_user_id:
            self.save_user_chat_history()

    def get_response(self, user_input):
        """Request a streamed chat completion for ``user_input``.

        The message list is: system prompt, an optional system message
        listing persistent memory, the stored conversation history, and
        finally the new user message.

        Returns:
            The stream object returned by the client, or a ``str`` error
            message if creating the request raised.
        """
        messages = [
            {"role": "system", "content": self.system_prompt},
            *self.conversation_history,
            {"role": "user", "content": user_input}
        ]

        if self.persistent_memory:
            memory_context = "Remembered Information:\n" + "\n".join(
                f"{k}: {v}" for k, v in self.persistent_memory.items()
            )
            # Placed right after the main system prompt.
            messages.insert(1, {"role": "system", "content": memory_context})

        try:
            return self.client.chat.completions.create(
                messages=messages,
                temperature=0.5,
                max_tokens=10240,
                top_p=0.7,
                stream=True
            )
        except Exception as e:
            return f"Error generating response: {str(e)}"

    def _stream_chat(self, message, chat_history):
        """Generator callback that streams the assistant reply into the chatbot.

        Yields ``("", history)`` tuples: the empty string clears the input
        textbox and ``history`` is the list of ``[user, assistant]`` pairs.
        Because this is a generator, every outcome (including the
        unauthenticated and error cases) must be yielded; a value passed to
        ``return`` would never reach the caller iterating the generator.
        """
        history = list(chat_history or [])  # tolerate a None chatbot value

        if not self.current_user_id:
            yield "", history + [[message, "Please authenticate first."]]
            return

        response_stream = self.get_response(message)

        # get_response signals failure by returning a plain string.
        if isinstance(response_stream, str):
            yield "", history + [[message, response_stream]]
            return

        full_response = ""
        updated_history = history + [[message, ""]]

        for chunk in response_stream:
            # Skip chunks that carry no choices instead of raising IndexError.
            if not chunk.choices:
                continue
            chunk_content = chunk.choices[0].delta.content
            if chunk_content:
                full_response += chunk_content
                updated_history[-1][1] = full_response
                yield "", updated_history

        self.conversation_history.append(
            {"role": "user", "content": message}
        )
        self.conversation_history.append(
            {"role": "assistant", "content": full_response}
        )

        # Keep only the 10 most recent messages.
        if len(self.conversation_history) > 10:
            self.conversation_history = self.conversation_history[-10:]

        self.save_user_chat_history()

    def create_interface(self):
        """Initialise Firebase and build the Gradio Blocks UI.

        Returns:
            The ``gr.Blocks`` instance (not yet launched).
        """
        # init_firebase() prints its own error; the handlers below report
        # "Firebase not initialized" when it failed.
        self.init_firebase()

        def generate_key():
            """Generate a new user key."""
            return self.generate_user_key()

        def authenticate(user_key):
            """Authenticate the user and return the status message."""
            _, message = self.authenticate_user(user_key)
            return message

        def register(user_key):
            """Register a new user and return the status message."""
            _, message = self.register_user(user_key)
            return message

        def streaming_response(message, chat_history):
            """Stream the assistant reply for ``message`` into the chatbot."""
            yield from self._stream_chat(message, chat_history)

        custom_css = """
        @import url('https://fonts.googleapis.com/css2?family=Inter:wght@300;400;500;600;700&display=swap');

        body, .gradio-container {
            font-family: 'Inter', sans-serif !important;
        }

        .chatbot-container .message {
            font-family: 'Inter', sans-serif !important;
        }

        .gradio-container input,
        .gradio-container textarea,
        .gradio-container button {
            font-family: 'Inter', sans-serif !important;
        }
        """

        with gr.Blocks(theme='soft', css=custom_css) as demo:
            # Key generation, authentication and registration controls.
            with gr.Column():
                with gr.Row():
                    generate_key_btn = gr.Button("Generate User Key")
                    user_key_output = gr.Textbox(label="Your User Key", interactive=False)
                    generate_key_btn.click(
                        fn=generate_key,
                        inputs=None,
                        outputs=[user_key_output]
                    )

                with gr.Row():
                    auth_key_input = gr.Textbox(label="Enter User Key")
                    auth_btn = gr.Button("Authenticate")
                    register_btn = gr.Button("Register")

                auth_result = gr.Textbox(label="Authentication Result", interactive=False)

                auth_btn.click(
                    fn=authenticate,
                    inputs=[auth_key_input],
                    outputs=[auth_result]
                )

                register_btn.click(
                    fn=register,
                    inputs=[auth_key_input],
                    outputs=[auth_result]
                )

            # Chat area.
            with gr.Column():
                chatbot = gr.Chatbot(
                    label="Xylaria 1.4 Senoa",
                    height=500,
                    show_copy_button=True
                )

                with gr.Row():
                    txt = gr.Textbox(
                        show_label=False,
                        placeholder="Type your message...",
                        container=False,
                        scale=4
                    )
                    btn = gr.Button("Send", scale=1)

                clear = gr.Button("Clear Conversation")
                clear_memory = gr.Button("Clear Memory")

                btn.click(
                    fn=streaming_response,
                    inputs=[txt, chatbot],
                    outputs=[txt, chatbot]
                )
                txt.submit(
                    fn=streaming_response,
                    inputs=[txt, chatbot],
                    outputs=[txt, chatbot]
                )

                # Clears only the displayed chat; stored history is untouched.
                clear.click(
                    fn=lambda: None,
                    inputs=None,
                    outputs=[chatbot],
                    queue=False
                )

                # Clears stored history and memory; the None return value
                # is sent to the chatbot output.
                clear_memory.click(
                    fn=self.reset_conversation,
                    inputs=None,
                    outputs=[chatbot],
                    queue=False
                )

        return demo
|
|
|
|
|
def main(share=True, debug=True):
    """Create the chat application, build its interface and launch it.

    Args:
        share: Forwarded to the interface's ``launch`` call. Defaults to
            True, the value that was previously hard-coded.
        debug: Forwarded to the interface's ``launch`` call. Defaults to
            True, the value that was previously hard-coded.
    """
    chat = XylariaChat()
    interface = chat.create_interface()
    interface.launch(
        share=share,
        debug=debug
    )
|
|
|
# Launch the app only when this file is run as a script, not on import.
if __name__ == "__main__":
    main()