# "Spaces: Running" — appears to be a page banner captured along with the source; not part of the program.
import html
import json
import os
import time
import uuid
from datetime import datetime, timedelta

import requests
import streamlit as st
# Page configuration: browser-tab title and icon, with the sidebar expanded on
# load so the chat history is visible by default.
# NOTE(review): the title/icon glyphs look mis-encoded (mojibake) — confirm the intended emoji.
_PAGE_SETTINGS = {
    "page_title": "Chat Flow π·",
    "page_icon": "π¬",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# Global stylesheet injected into the page: white background, an 800px content
# column, hidden Streamlit chrome (menu, footer, header, deploy button), green
# model labels, and the chat-history item styles.
_CUSTOM_CSS = """
<style>
.stApp {
background: white;
}
.main .block-container {
max-width: 800px;
}
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
header {visibility: hidden;}
.stDeployButton {display: none;}
.model-id {
color: #28a745;
font-family: monospace;
}
.model-attribution {
color: #28a745;
font-size: 0.8em;
font-style: italic;
}
/* Chat history styling */
.chat-history-item {
padding: 8px 12px;
margin: 4px 0;
border-radius: 8px;
border: 1px solid #e0e0e0;
background: #f8f9fa;
cursor: pointer;
transition: all 0.2s;
}
.chat-history-item:hover {
background: #e9ecef;
border-color: #28a745;
}
.chat-history-item.active {
background: #28a745;
color: white;
border-color: #28a745;
}
.chat-title {
font-weight: 500;
font-size: 0.9em;
margin-bottom: 2px;
}
.chat-date {
font-size: 0.75em;
opacity: 0.7;
}
.new-chat-btn {
width: 100%;
margin-bottom: 16px;
}
</style>
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# JSON stores used by the helpers below. All three are relative paths, so they
# resolve against the process's current working directory.
# Flat message list read/written by load_chat_history() / save_chat_history().
HISTORY_FILE = "chat_history.json"
# Mapping of user id -> {'last_seen', 'name'} maintained by update_online_users().
USERS_FILE = "online_users.json"
# Mapping of session id -> {title, messages, created_at, updated_at} written by
# save_current_session().
SESSIONS_FILE = "chat_sessions.json"
def load_chat_history():
    """Read the persisted messages from HISTORY_FILE.

    Returns:
        The decoded JSON content of the file, or an empty list when the file
        is missing or cannot be read/parsed. Read failures are reported with
        ``st.error`` rather than raised.
    """
    if not os.path.exists(HISTORY_FILE):
        return []
    try:
        with open(HISTORY_FILE, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
    except Exception as exc:
        st.error(f"Error loading chat history: {exc}")
        return []
    return stored
def save_chat_history(messages):
    """Write *messages* to HISTORY_FILE as indented, non-ASCII-preserving JSON.

    Any failure is reported with ``st.error`` instead of being raised.
    """
    try:
        with open(HISTORY_FILE, mode='w', encoding='utf-8') as out:
            json.dump(messages, out, indent=2, ensure_ascii=False)
    except Exception as exc:
        st.error(f"Error saving chat history: {exc}")
def clear_chat_history():
    """Delete HISTORY_FILE (if present) and empty the in-memory message list.

    Only the flat history file is touched; SESSIONS_FILE is not modified here.
    If removing the file fails, the in-memory list is left as-is and the
    error is reported with ``st.error``.
    """
    try:
        history_present = os.path.exists(HISTORY_FILE)
        if history_present:
            os.remove(HISTORY_FILE)
        st.session_state.messages = []
    except Exception as exc:
        st.error(f"Error clearing chat history: {exc}")
# Chat session management
def load_chat_sessions():
    """Read every stored chat session from SESSIONS_FILE.

    Returns:
        The decoded JSON content of the file, or an empty dict when the file
        is missing or cannot be read/parsed. Read failures are reported with
        ``st.error`` rather than raised.
    """
    if not os.path.exists(SESSIONS_FILE):
        return {}
    try:
        with open(SESSIONS_FILE, 'r', encoding='utf-8') as handle:
            stored = json.load(handle)
    except Exception as exc:
        st.error(f"Error loading chat sessions: {exc}")
        return {}
    return stored
def save_chat_sessions(sessions):
    """Write the *sessions* mapping to SESSIONS_FILE as indented JSON.

    Any failure is reported with ``st.error`` instead of being raised.
    """
    try:
        with open(SESSIONS_FILE, mode='w', encoding='utf-8') as out:
            json.dump(sessions, out, indent=2, ensure_ascii=False)
    except Exception as exc:
        st.error(f"Error saving chat sessions: {exc}")
def get_session_id():
    """Return the chat session id held in ``st.session_state``.

    A fresh UUID4 string is stored under ``session_id`` the first time this
    is called without one present.
    """
    state = st.session_state
    if 'session_id' in state:
        return state.session_id
    state.session_id = str(uuid.uuid4())
    return state.session_id
def _first_user_snippet(messages, limit=30):
    """Return the first user message cut to *limit* chars, or "New Chat" if none."""
    for msg in messages:
        if msg["role"] == "user":
            content = msg["content"]
            return content[:limit] + "..." if len(content) > limit else content
    return "New Chat"


def get_chat_title(messages):
    """Produce a display title for a conversation.

    Args:
        messages: List of ``{"role": ..., "content": ...}`` dicts.

    Returns:
        "New Chat" for an empty list. For a single message, the first user
        message truncated to 30 characters (with "..." appended when cut).
        For longer conversations, the result of ``generate_smart_title``;
        if that raises, the same truncated first-user-message fallback.
    """
    if not messages:
        return "New Chat"
    if len(messages) <= 1:
        return _first_user_snippet(messages)
    try:
        return generate_smart_title(messages)
    except Exception:
        # Title generation is best-effort. Catch Exception rather than using a
        # bare except so KeyboardInterrupt/SystemExit still propagate.
        return _first_user_snippet(messages)
def generate_smart_title(messages):
    """Ask the OpenRouter chat-completions API for a short conversation title.

    Up to the first six user/assistant messages are summarised (each cut to
    200 characters, with any "Response created by:" attribution footer
    removed) and sent in a single non-streaming request.

    Args:
        messages: List of ``{"role": ..., "content": ...}`` dicts.

    Returns:
        The model's title with quotes and a leading "Title:" stripped, capped
        at 40 characters plus "..."; "New Chat" if the model returns an empty
        title. When no API key is set, the request fails, or the response is
        not HTTP 200 / not in the expected shape, the first user message
        truncated to 30 characters (or "New Chat" if there is none).
    """

    def fallback_title():
        # Shared fallback: first user message, truncated to 30 characters.
        for msg in messages:
            if msg["role"] == "user":
                content = msg["content"]
                return content[:30] + "..." if len(content) > 30 else content
        return "New Chat"

    if not OPENROUTER_API_KEY:
        return fallback_title()

    # Build the transcript from at most six user/assistant messages.
    attribution_marker = "\n\n---\n*Response created by:"
    transcript_lines = []
    for msg in messages:
        if len(transcript_lines) >= 6:
            break
        if msg["role"] not in ("user", "assistant"):
            continue
        speaker = "User" if msg["role"] == "user" else "Assistant"
        content = msg["content"]
        if "Response created by:" in content:
            content = content.split(attribution_marker)[0]
        transcript_lines.append(f"{speaker}: {content[:200]}...\n")
    conversation_text = "".join(transcript_lines)

    title_prompt = f"""Based on this conversation, generate a short, descriptive title (2-5 words max):
{conversation_text}
Generate only a brief title that captures the main topic. Examples:
- "Python Code Help"
- "Recipe Ideas"
- "Travel Planning"
- "Math Problem"
- "Writing Assistance"
Title:"""
    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "http://localhost:8501",
        "X-Title": "Streamlit AI Assistant"
    }
    data = {
        "model": "openai/gpt-3.5-turbo",  # Use fast model for title generation
        "messages": [{"role": "user", "content": title_prompt}],
        "max_tokens": 20,  # Short response
        "temperature": 0.3,  # More focused
        "stream": False  # Don't stream for title generation
    }
    try:
        response = requests.post(url, headers=headers, json=data, timeout=10)
        if response.status_code == 200:
            result = response.json()
            title = result["choices"][0]["message"]["content"].strip()
            # Clean up the title
            title = title.replace('"', '').replace("Title:", "").strip()
            # Limit length
            if len(title) > 40:
                title = title[:40] + "..."
            return title if title else "New Chat"
    except (requests.exceptions.RequestException, ValueError, KeyError,
            IndexError, TypeError, AttributeError):
        # Deliberately best-effort: network failures, non-JSON bodies and
        # unexpected response shapes all fall through to the simple title.
        pass
    return fallback_title()
def save_current_session():
    """Persist the active conversation into SESSIONS_FILE under its session id.

    Does nothing when there are no messages, or when the stored copy of this
    session already holds exactly the current messages. The second check
    matters because the sidebar calls this on every script rerun: without it
    each rerun would rewrite the file and, for real conversations, repeat the
    title-generation request.

    The title comes from ``get_chat_title`` once the conversation has at
    least one user and one assistant message; before that it is the first
    user message truncated to 30 characters, or "New Chat".
    ``created_at`` is preserved from the stored entry when one exists.
    """
    messages = st.session_state.messages
    if not messages:
        return

    sessions = load_chat_sessions()
    session_id = get_session_id()
    existing = sessions.get(session_id, {})

    # Nothing changed since the last save: skip the file write and the
    # (possibly networked) title generation.
    if existing.get("messages") == messages:
        return

    has_user = any(msg["role"] == "user" for msg in messages)
    has_assistant = any(msg["role"] == "assistant" for msg in messages)
    if has_user and has_assistant:
        # A real exchange exists, so a generated title is worthwhile.
        title = get_chat_title(messages)
    else:
        # Conversation is just starting: use a cheap local title.
        first_message = next(
            (msg["content"] for msg in messages if msg["role"] == "user"),
            None,
        )
        if first_message is None:
            title = "New Chat"
        elif len(first_message) > 30:
            title = first_message[:30] + "..."
        else:
            title = first_message

    now = datetime.now().isoformat()
    sessions[session_id] = {
        "title": title,
        "messages": messages,
        "created_at": existing.get("created_at", now),
        "updated_at": now,
    }
    save_chat_sessions(sessions)
def load_session(session_id):
    """Make the stored session *session_id* the active conversation.

    Returns:
        True when the session exists (its messages and id are copied into
        ``st.session_state``), False otherwise.
    """
    sessions = load_chat_sessions()
    if session_id not in sessions:
        return False
    record = sessions[session_id]
    st.session_state.messages = record["messages"]
    st.session_state.session_id = session_id
    return True
def delete_session(session_id):
    """Remove the stored session *session_id* from SESSIONS_FILE.

    Returns:
        True when the session existed and the updated mapping was saved,
        False when there was no such session.
    """
    sessions = load_chat_sessions()
    if session_id not in sessions:
        return False
    sessions.pop(session_id)
    save_chat_sessions(sessions)
    return True
def start_new_chat():
    """Archive the active conversation (if any) and begin an empty one.

    The current messages are saved through ``save_current_session`` when the
    list is non-empty; afterwards the message list is reset and a new UUID4
    session id is assigned.
    """
    state = st.session_state
    if state.messages:
        save_current_session()
    # Fresh, empty conversation under a brand-new id.
    state.messages = []
    state.session_id = str(uuid.uuid4())
# User tracking
def get_user_id():
    """Return the short user id held in ``st.session_state``.

    On first use, the first 8 characters of a new UUID4 string are stored
    under ``user_id``.
    """
    state = st.session_state
    if 'user_id' not in state:
        full_id = str(uuid.uuid4())
        state.user_id = full_id[:8]  # Short ID for family use
    return state.user_id
def update_online_users():
    """Record this user as online and prune stale entries from USERS_FILE.

    The file maps user id -> ``{'last_seen': ISO timestamp, 'name': ...}``.
    This user's entry is refreshed, entries not seen within the last five
    minutes are dropped, and the pruned mapping is written back.

    An unreadable or malformed file is treated as empty and individual
    malformed records are skipped, so one bad write cannot leave the file
    permanently stuck in a state where every later call fails.

    Returns:
        The number of users seen within the last five minutes, or 1 if the
        update could not be completed (the caller is at least online).
    """
    try:
        # Load current online users; a corrupt file starts over as empty.
        users = {}
        if os.path.exists(USERS_FILE):
            try:
                with open(USERS_FILE, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
                if isinstance(loaded, dict):
                    users = loaded
            except (OSError, ValueError):
                users = {}

        # Add/update this user
        user_id = get_user_id()
        now = datetime.now()
        users[user_id] = {
            'last_seen': now.isoformat(),
            'name': f'User-{user_id}'  # You can customize this
        }

        # Keep only users seen in the last 5 minutes
        window = timedelta(minutes=5)
        active_users = {}
        for uid, data in users.items():
            try:
                is_active = now - datetime.fromisoformat(data['last_seen']) < window
            except (KeyError, TypeError, ValueError):
                # Malformed record: drop it instead of failing the whole update.
                continue
            if is_active:
                active_users[uid] = data

        # Save updated list
        with open(USERS_FILE, 'w', encoding='utf-8') as f:
            json.dump(active_users, f, indent=2)
        return len(active_users)
    except Exception:
        return 1  # If error, assume at least you're online
def get_online_count():
    """Count the users in USERS_FILE seen within the last five minutes.

    Returns:
        The number of recently seen users; 0 when the file does not exist or
        anything goes wrong while reading or parsing it.
    """
    try:
        if not os.path.exists(USERS_FILE):
            return 0
        with open(USERS_FILE, 'r') as handle:
            records = json.load(handle)
        now = datetime.now()
        window = timedelta(minutes=5)
        # One per record whose last_seen falls inside the window.
        return sum(
            1
            for record in records.values()
            if now - datetime.fromisoformat(record['last_seen']) < window
        )
    except Exception:
        return 0
# Seed session state: messages come from the saved history file the first
# time the key is missing.
if "messages" not in st.session_state:
    st.session_state["messages"] = load_chat_history()
# Make sure a session id exists (get_session_id creates one when absent).
get_session_id()
# API key is taken from the environment; None when the variable is unset.
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY")
def check_api_status():
    """Probe the OpenRouter models endpoint and summarise the result.

    Returns:
        "No API Key" when OPENROUTER_API_KEY is empty or unset, "Connected"
        when the endpoint answers HTTP 200, and "Error" for any other status
        or when the request fails.
    """
    if not OPENROUTER_API_KEY:
        return "No API Key"
    url = "https://openrouter.ai/api/v1/models"
    headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}"}
    try:
        response = requests.get(url, headers=headers, timeout=10)
    except (requests.exceptions.RequestException, ValueError):
        # Network/HTTP-layer failures, plus header-encoding errors
        # (UnicodeError is a ValueError). A bare except would also have
        # swallowed KeyboardInterrupt/SystemExit.
        return "Error"
    return "Connected" if response.status_code == 200 else "Error"
def get_ai_response(messages, model="openai/gpt-3.5-turbo"):
    """Stream a chat completion from the OpenRouter API.

    This is a generator. Each yielded value is the response text accumulated
    so far (not just the newest chunk), so the caller can re-render the whole
    message on every step. Errors are not raised; they are yielded as a
    single human-readable message.

    Args:
        messages: Conversation so far, as ``{"role": ..., "content": ...}``
            dicts. A system prompt is prepended, and the UI attribution
            footer ("Response created by: ...") is removed from assistant
            messages before sending, matching what generate_smart_title does.
        model: OpenRouter model id.

    Yields:
        str: The accumulated response text, or one error message.
    """
    if not OPENROUTER_API_KEY:
        # Must be yielded: a `return value` inside a generator is never seen
        # by a `for` loop, so the message would silently disappear.
        yield "No API key found. Please add OPENROUTER_API_KEY to environment variables."
        return

    url = "https://openrouter.ai/api/v1/chat/completions"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {OPENROUTER_API_KEY}",
        "HTTP-Referer": "http://localhost:8501",  # Optional: Your site URL
        "X-Title": "Streamlit AI Assistant"  # Optional: Your app name
    }

    # System message first, then the conversation with UI-only attribution
    # footers stripped so the model does not see (and imitate) them.
    attribution_marker = "\n\n---\n*Response created by:"
    api_messages = [
        {"role": "system", "content": "You are a helpful AI assistant. Provide clear and helpful responses."}]
    for msg in messages:
        content = msg.get("content")
        if (msg.get("role") == "assistant" and isinstance(content, str)
                and attribution_marker in content):
            msg = {**msg, "content": content.rpartition(attribution_marker)[0]}
        api_messages.append(msg)

    data = {
        "model": model,
        "messages": api_messages,
        "stream": True,
        "max_tokens": 2000,
        "temperature": 0.7,
        "top_p": 1,
        "frequency_penalty": 0,
        "presence_penalty": 0
    }
    try:
        # The context manager closes the streamed connection on every exit
        # path, including the early break on "[DONE]".
        with requests.post(url, headers=headers, json=data,
                           stream=True, timeout=60) as response:
            if response.status_code != 200:
                try:
                    error_data = response.json()
                    error_detail = error_data.get('error', {}).get(
                        'message', f"HTTP {response.status_code}")
                except (ValueError, AttributeError):
                    # Body was not JSON, or not shaped like {"error": {...}}.
                    error_detail = f"HTTP {response.status_code}: {response.reason}"
                yield f"API Error: {error_detail}. Please try a different model or check your API key."
                return

            full_response = ""
            for line in response.iter_lines():
                # The server sends lines starting with "data: ..."; blank
                # keep-alive lines and anything else are ignored.
                if not line.startswith(b"data: "):
                    continue
                data_str = line[len(b"data: "):].decode("utf-8")
                if data_str.strip() == "[DONE]":
                    break
                try:
                    chunk = json.loads(data_str)
                    delta = chunk["choices"][0]["delta"].get("content", "")
                except (ValueError, KeyError, IndexError, TypeError, AttributeError):
                    # Skip chunks that are not JSON or lack the expected shape.
                    continue
                if delta:
                    full_response += delta
                    yield full_response
    except requests.exceptions.Timeout:
        yield "Request timed out. Please try again with a shorter message or different model."
    except requests.exceptions.ConnectionError:
        yield "Connection error. Please check your internet connection and try again."
    except requests.exceptions.RequestException as e:
        yield f"Request error: {str(e)}. Please try again."
    except Exception as e:
        yield f"Unexpected error: {str(e)}. Please try again or contact support."
# Main-pane header: app title followed by a one-line tagline.
# NOTE(review): the title glyph looks mis-encoded (mojibake) — confirm the intended emoji.
st.title("Chat Flow π·")
st.caption("10 powerful Models, one simple chat.")
# Sidebar: chat-session list, API status, presence, model picker and
# history-file controls. Widget order is kept exactly as before.
# NOTE(review): several emoji literals in this block look mis-encoded
# (mojibake) — confirm the intended glyphs.
with st.sidebar:
    # Chat History Section at the top
    st.header("π¬ Chat History")
    # New Chat Button
    if st.button("β New Chat", use_container_width=True, type="primary"):
        start_new_chat()
        st.rerun()
    st.divider()

    # Load and display chat sessions
    sessions = load_chat_sessions()
    current_session_id = get_session_id()
    if sessions:
        st.subheader("Previous Chats")
        # Sort sessions by updated_at (most recent first)
        sorted_sessions = sorted(
            sessions.items(),
            key=lambda x: x[1].get("updated_at", x[1].get("created_at", "")),
            reverse=True
        )
        for session_id, session_data in sorted_sessions:
            # Create container for each chat item
            chat_container = st.container()
            with chat_container:
                # The active chat is shown as plain text (no load/delete buttons)
                if session_id == current_session_id:
                    st.markdown(f"πΉ **{session_data['title']}**")
                else:
                    col_load, col_delete = st.columns([3, 1])
                    with col_load:
                        if st.button(
                            f"π {session_data['title']}",
                            key=f"load_{session_id}",
                            use_container_width=True
                        ):
                            # Save current session before switching
                            if st.session_state.messages:
                                save_current_session()
                            # Load selected session
                            load_session(session_id)
                            st.rerun()
                    with col_delete:
                        if st.button("ποΈ", key=f"delete_{session_id}"):
                            # Only non-active sessions reach this branch, so
                            # no "deleted the current chat" handling is needed.
                            delete_session(session_id)
                            st.rerun()
                # Show session info; skip the caption if the stored
                # timestamp cannot be parsed rather than crashing the page.
                if "updated_at" in session_data:
                    try:
                        update_time = datetime.fromisoformat(session_data["updated_at"])
                    except (TypeError, ValueError):
                        update_time = None
                    if update_time is not None:
                        st.caption(f"Updated: {update_time.strftime('%m/%d %H:%M')}")
                st.markdown("---")
    else:
        st.info("No previous chats yet")

    # Save the current session on every script run that has messages
    if st.session_state.messages:
        save_current_session()
    # Manual refresh of the chat list
    if st.button("π", help="Refresh chat list", use_container_width=False):
        st.rerun()
    st.divider()

    # Settings Section
    st.header("Settings")
    # API Status
    status = check_api_status()
    if status == "Connected":
        st.success("π’ API Connected")
    elif status == "No API Key":
        st.error("No API Key")
    else:
        st.warning("Connection Issue")
    st.divider()

    # Live Users Section
    st.header("π₯ Who's Online")
    # Update that you're online
    online_count = update_online_users()
    # Show live count
    if online_count == 1:
        st.info("π’ Just you online")
    else:
        st.success(f"π’ {online_count} people online")
    # Show your session
    your_id = get_user_id()
    st.caption(f"You: User-{your_id}")
    # Quick refresh button
    if st.button("Refresh", use_container_width=True):
        st.rerun()

    # === DEBUG SECTION ===
    with st.expander("π Debug Info"):
        if os.path.exists(USERS_FILE):
            # A corrupt file or malformed record is reported here instead of
            # taking the whole page down.
            try:
                with open(USERS_FILE, 'r') as f:
                    users = json.load(f)
                st.write(f"Users in file: {len(users)}")
                for uid, data in users.items():
                    last_seen_time = datetime.fromisoformat(data['last_seen'])
                    time_ago = datetime.now() - last_seen_time
                    minutes_ago = int(time_ago.total_seconds() / 60)
                    st.write(f"- {uid}: {minutes_ago} min ago")
            except (OSError, ValueError, KeyError, TypeError, AttributeError) as e:
                st.write(f"Could not read users file: {e}")
        else:
            st.write("No users file yet")
    # === END DEBUG SECTION ===
    st.divider()

    # Selectable models as (display name, OpenRouter model id)
    models = [
        ("GPT-3.5 Turbo", "openai/gpt-3.5-turbo"),
        ("LLaMA 3.1 8B", "meta-llama/llama-3.1-8b-instruct"),
        ("LLaMA 3.1 70B", "meta-llama/llama-3.1-70b-instruct"),
        ("DeepSeek Chat v3", "deepseek/deepseek-chat-v3-0324:free"),
        ("DeepSeek R1", "deepseek/deepseek-r1-0528:free"),
        ("Qwen3 Coder", "qwen/qwen3-coder:free"),
        ("Microsoft MAI DS R1", "microsoft/mai-ds-r1:free"),
        ("Gemma 3 27B", "google/gemma-3-27b-it:free"),
        ("Gemma 3 4B", "google/gemma-3-4b-it:free"),
        ("Auto (Best Available)", "openrouter/auto")
    ]
    model_names = [name for name, _ in models]
    model_ids = [model_id for _, model_id in models]
    # The selectbox works on indices so the name and id lists stay in step.
    selected_index = st.selectbox("Model", range(len(model_names)),
                                  format_func=lambda x: model_names[x],
                                  index=0)
    selected_model = model_ids[selected_index]
    # Show selected model ID in green
    st.markdown(
        f"**Model ID:** <span class='model-id'>{selected_model}</span>", unsafe_allow_html=True)
    st.divider()

    # Chat History Controls
    st.header("Chat History")
    # Show number of messages
    if st.session_state.messages:
        st.info(f"Messages stored: {len(st.session_state.messages)}")
    # Auto-save toggle (read by the chat-input handler further down)
    auto_save = st.checkbox("Auto-save messages", value=True)
    # Manual save/load buttons
    col1, col2 = st.columns(2)
    with col1:
        if st.button("Save History", use_container_width=True):
            save_chat_history(st.session_state.messages)
            st.success("History saved!")
    with col2:
        if st.button("Load History", use_container_width=True):
            st.session_state.messages = load_chat_history()
            st.success("History loaded!")
            st.rerun()
    st.divider()

    # View History
    if st.button("View History File", use_container_width=True):
        if os.path.exists(HISTORY_FILE):
            with open(HISTORY_FILE, 'r', encoding='utf-8') as f:
                history_content = f.read()
            st.text_area("Chat History (JSON)", history_content, height=200)
        else:
            st.warning("No history file found")
    # Download History
    if os.path.exists(HISTORY_FILE):
        with open(HISTORY_FILE, 'rb') as f:
            st.download_button(
                label="Download History",
                data=f.read(),
                file_name=f"chat_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json",
                use_container_width=True
            )
    st.divider()

    # Clear controls
    if st.button("Clear Chat", use_container_width=True, type="secondary"):
        clear_chat_history()
        st.success("Chat cleared!")
        st.rerun()
# Display chat messages. (Nothing is rendered when the history is empty.)
# Assistant messages are stored with an attribution footer appended after
# this marker; it is split off and rendered as a styled line.
_ATTRIBUTION_MARKER = "\n\n---\n*Response created by:"
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        # Split on the LAST marker: the footer is always appended at the end,
        # so any earlier occurrence belongs to the response text itself.
        main_content, marker, attribution = message["content"].rpartition(
            _ATTRIBUTION_MARKER)
        if message["role"] == "assistant" and marker:
            model_name = attribution.replace("***", "").replace("**", "")
            st.markdown(main_content)
            # The name is taken from stored message text and inserted into
            # raw HTML, so it is escaped first.
            st.markdown(
                f"<div class='model-attribution'>Response created by: <strong>{html.escape(model_name)}</strong></div>", unsafe_allow_html=True)
        else:
            st.markdown(message["content"])
# Chat input - MUST be at the main level, not inside sidebar or columns
# NOTE(review): the streaming-cursor glyph below looks mis-encoded (mojibake) — confirm.
if prompt := st.chat_input("Chat Smarter. Chat many Brains"):
    # Sending a message also refreshes this user's online status
    update_online_users()

    # Record the user's turn
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Persist to the flat history file when auto-save is on
    if 'auto_save' not in locals():
        auto_save = True  # Default value if not set in sidebar
    if auto_save:
        save_chat_history(st.session_state.messages)
    # The session store is always updated after each user message
    save_current_session()

    # Echo the user's message
    with st.chat_message("user"):
        st.markdown(prompt)

    # Stream the assistant's reply into a single placeholder
    with st.chat_message("assistant"):
        reply_slot = st.empty()
        reply_text = ""
        try:
            for partial_text in get_ai_response(st.session_state.messages, selected_model):
                reply_text = partial_text
                reply_slot.markdown(reply_text + "β")
            # Final render without the cursor
            reply_slot.markdown(reply_text)
        except Exception as exc:
            reply_text = f"An error occurred: {str(exc)}"
            reply_slot.markdown(reply_text)

    # Store the reply with the attribution footer naming the selected model
    st.session_state.messages.append({
        "role": "assistant",
        "content": f"{reply_text}\n\n---\n*Response created by: **{model_names[selected_index]}***",
    })

    # Persist again now that the assistant's turn is recorded
    if auto_save:
        save_chat_history(st.session_state.messages)
    # The session store is always updated after each AI response
    save_current_session()
# Footer caption naming the model currently selected in the sidebar.
st.caption(f"Currently using: **{model_names[selected_index]}**")