Spaces:
Running
Running
import requests | |
import os | |
import json | |
import streamlit as st | |
from datetime import datetime, timedelta | |
import time | |
import uuid | |
# Page configuration | |
st.set_page_config( | |
page_title="Chat Flow π·", | |
page_icon="π¬", | |
initial_sidebar_state="expanded" | |
) | |
# Enhanced CSS with chat history styling and BLACK NEW CHAT BUTTON | |
st.markdown(""" | |
<style> | |
.stApp { | |
background: white; | |
} | |
.main .block-container { | |
max-width: 800px; | |
} | |
#MainMenu {visibility: hidden;} | |
footer {visibility: hidden;} | |
header {visibility: hidden;} | |
.stDeployButton {display: none;} | |
.model-id { | |
color: #28a745; | |
font-family: monospace; | |
} | |
.model-attribution { | |
color: #28a745; | |
font-size: 0.8em; | |
font-style: italic; | |
} | |
/* NEW CHAT BUTTON - Black background, white text */ | |
.stButton > button[kind="primary"] { | |
background-color: #000000 !important; | |
border-color: #000000 !important; | |
color: #ffffff !important; | |
} | |
.stButton > button[kind="primary"]:hover { | |
background-color: #333333 !important; | |
border-color: #333333 !important; | |
color: #ffffff !important; | |
} | |
.stButton > button[kind="primary"]:active { | |
background-color: #1a1a1a !important; | |
border-color: #1a1a1a !important; | |
color: #ffffff !important; | |
} | |
.stButton > button[kind="primary"]:focus { | |
background-color: #000000 !important; | |
border-color: #000000 !important; | |
color: #ffffff !important; | |
box-shadow: 0 0 0 0.2rem rgba(0, 0, 0, 0.25) !important; | |
} | |
/* Chat history styling */ | |
.chat-history-item { | |
padding: 8px 12px; | |
margin: 4px 0; | |
border-radius: 8px; | |
border: 1px solid #e0e0e0; | |
background: #f8f9fa; | |
cursor: pointer; | |
transition: all 0.2s; | |
} | |
.chat-history-item:hover { | |
background: #e9ecef; | |
border-color: #28a745; | |
} | |
.chat-history-item.active { | |
background: #28a745; | |
color: white; | |
border-color: #28a745; | |
} | |
.chat-title { | |
font-weight: 500; | |
font-size: 0.9em; | |
margin-bottom: 2px; | |
} | |
.chat-date { | |
font-size: 0.75em; | |
opacity: 0.7; | |
} | |
.new-chat-btn { | |
width: 100%; | |
margin-bottom: 16px; | |
} | |
</style> | |
""", unsafe_allow_html=True) | |
# File to store chat history | |
HISTORY_FILE = "chat_history.json" | |
USERS_FILE = "online_users.json" | |
SESSIONS_FILE = "chat_sessions.json" | |
# ================= USER FUNCTIONS ================= | |
def get_user_id(): | |
"""Get unique ID for this user session""" | |
if 'user_id' not in st.session_state: | |
st.session_state.user_id = str(uuid.uuid4())[:8] | |
return st.session_state.user_id | |
def update_online_users(): | |
"""Update user status""" | |
try: | |
# Load current users | |
users = {} | |
if os.path.exists(USERS_FILE): | |
with open(USERS_FILE, 'r') as f: | |
users = json.load(f) | |
user_id = get_user_id() | |
# Update user info | |
users[user_id] = { | |
'last_seen': datetime.now().isoformat(), | |
'name': f'User-{user_id}', | |
'session_start': users.get(user_id, {}).get('session_start', datetime.now().isoformat()) | |
} | |
# Clean up old users (not seen in 5 minutes) | |
current_time = datetime.now() | |
active_users = {} | |
for uid, data in users.items(): | |
try: | |
last_seen = datetime.fromisoformat(data['last_seen']) | |
if current_time - last_seen < timedelta(minutes=5): | |
active_users[uid] = data | |
except: | |
continue | |
# Save updated users | |
with open(USERS_FILE, 'w') as f: | |
json.dump(active_users, f, indent=2) | |
return len(active_users) | |
except Exception as e: | |
st.error(f"User tracking error: {e}") | |
return 1 | |
def show_online_users(): | |
"""Display online users count""" | |
st.header("π₯ Who's Online") | |
try: | |
if not os.path.exists(USERS_FILE): | |
st.info("No user data yet") | |
return 0 | |
with open(USERS_FILE, 'r') as f: | |
users = json.load(f) | |
if not users: | |
st.info("No active users") | |
return 0 | |
online_count = len(users) | |
# Show count | |
if online_count == 1: | |
st.success("π’ Just you online") | |
else: | |
st.success(f"π’ {online_count} people online") | |
st.divider() | |
# Show each user | |
current_user_id = get_user_id() | |
for user_id, data in users.items(): | |
is_current_user = (user_id == current_user_id) | |
# User header | |
if is_current_user: | |
st.markdown("**π€ You**") | |
else: | |
st.markdown(f"**π€ {data.get('name', user_id)}**") | |
# Show session info | |
try: | |
session_start = datetime.fromisoformat(data['session_start']) | |
duration = datetime.now() - session_start | |
minutes = int(duration.total_seconds() / 60) | |
st.caption(f"π Online for {minutes} minutes") | |
except: | |
st.caption("π Session time unknown") | |
st.divider() | |
return online_count | |
except Exception as e: | |
st.error(f"Error showing users: {e}") | |
return 0 | |
# ================= CHAT FUNCTIONS ================= | |
def load_chat_history(): | |
"""Load chat history from file""" | |
try: | |
if os.path.exists(HISTORY_FILE): | |
with open(HISTORY_FILE, 'r', encoding='utf-8') as f: | |
return json.load(f) | |
except Exception as e: | |
st.error(f"Error loading chat history: {e}") | |
return [] | |
def save_chat_history(messages): | |
"""Save chat history to file""" | |
try: | |
with open(HISTORY_FILE, 'w', encoding='utf-8') as f: | |
json.dump(messages, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
st.error(f"Error saving chat history: {e}") | |
def clear_chat_history(): | |
"""Clear chat history file""" | |
try: | |
if os.path.exists(HISTORY_FILE): | |
os.remove(HISTORY_FILE) | |
st.session_state.messages = [] | |
except Exception as e: | |
st.error(f"Error clearing chat history: {e}") | |
def load_chat_sessions(): | |
"""Load all chat sessions""" | |
try: | |
if os.path.exists(SESSIONS_FILE): | |
with open(SESSIONS_FILE, 'r', encoding='utf-8') as f: | |
return json.load(f) | |
except Exception as e: | |
st.error(f"Error loading chat sessions: {e}") | |
return {} | |
def save_chat_sessions(sessions): | |
"""Save chat sessions to file""" | |
try: | |
with open(SESSIONS_FILE, 'w', encoding='utf-8') as f: | |
json.dump(sessions, f, ensure_ascii=False, indent=2) | |
except Exception as e: | |
st.error(f"Error saving chat sessions: {e}") | |
def get_session_id(): | |
"""Get or create session ID""" | |
if 'session_id' not in st.session_state: | |
st.session_state.session_id = str(uuid.uuid4()) | |
return st.session_state.session_id | |
def get_chat_title(messages): | |
"""Generate a title for the chat based on conversation content using AI""" | |
if not messages: | |
return "New Chat" | |
if len(messages) <= 1: | |
for msg in messages: | |
if msg["role"] == "user": | |
content = msg["content"] | |
if len(content) > 30: | |
return content[:30] + "..." | |
return content | |
return "New Chat" | |
try: | |
return generate_smart_title(messages) | |
except: | |
for msg in messages: | |
if msg["role"] == "user": | |
content = msg["content"] | |
if len(content) > 30: | |
return content[:30] + "..." | |
return content | |
return "New Chat" | |
def generate_smart_title(messages): | |
"""Use AI to generate a smart title for the conversation""" | |
if not OPENROUTER_API_KEY: | |
for msg in messages: | |
if msg["role"] == "user": | |
content = msg["content"] | |
if len(content) > 30: | |
return content[:30] + "..." | |
return content | |
return "New Chat" | |
conversation_text = "" | |
message_count = 0 | |
for msg in messages: | |
if message_count >= 6: | |
break | |
if msg["role"] in ["user", "assistant"]: | |
role = "User" if msg["role"] == "user" else "Assistant" | |
content = msg["content"] | |
if "Response created by:" in content: | |
content = content.split("\n\n---\n*Response created by:")[0] | |
conversation_text += f"{role}: {content[:200]}...\n" | |
message_count += 1 | |
title_prompt = f"""Based on this conversation, generate a short, descriptive title (2-5 words max): | |
{conversation_text} | |
Generate only a brief title that captures the main topic. Examples: | |
- "Python Code Help" | |
- "Recipe Ideas" | |
- "Travel Planning" | |
- "Math Problem" | |
- "Writing Assistance" | |
Title:""" | |
url = "https://openrouter.ai/api/v1/chat/completions" | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {OPENROUTER_API_KEY}", | |
"HTTP-Referer": "http://localhost:8501", | |
"X-Title": "Streamlit AI Assistant" | |
} | |
data = { | |
"model": "openai/gpt-3.5-turbo", | |
"messages": [{"role": "user", "content": title_prompt}], | |
"max_tokens": 20, | |
"temperature": 0.3, | |
"stream": False | |
} | |
try: | |
response = requests.post(url, headers=headers, json=data, timeout=10) | |
if response.status_code == 200: | |
result = response.json() | |
title = result["choices"][0]["message"]["content"].strip() | |
title = title.replace('"', '').replace("Title:", "").strip() | |
if len(title) > 40: | |
title = title[:40] + "..." | |
return title if title else "New Chat" | |
except Exception as e: | |
pass | |
for msg in messages: | |
if msg["role"] == "user": | |
content = msg["content"] | |
if len(content) > 30: | |
return content[:30] + "..." | |
return content | |
return "New Chat" | |
def save_current_session(): | |
"""Save current chat session with smart AI-generated title""" | |
if not st.session_state.messages: | |
return | |
sessions = load_chat_sessions() | |
session_id = get_session_id() | |
user_messages = [msg for msg in st.session_state.messages if msg["role"] == "user"] | |
assistant_messages = [msg for msg in st.session_state.messages if msg["role"] == "assistant"] | |
if len(user_messages) >= 1 and len(assistant_messages) >= 1: | |
title = get_chat_title(st.session_state.messages) | |
else: | |
title = "New Chat" | |
if user_messages: | |
first_message = user_messages[0]["content"] | |
if len(first_message) > 30: | |
title = first_message[:30] + "..." | |
else: | |
title = first_message | |
sessions[session_id] = { | |
"title": title, | |
"messages": st.session_state.messages, | |
"created_at": sessions.get(session_id, {}).get("created_at", datetime.now().isoformat()), | |
"updated_at": datetime.now().isoformat() | |
} | |
save_chat_sessions(sessions) | |
def load_session(session_id): | |
"""Load a specific chat session""" | |
sessions = load_chat_sessions() | |
if session_id in sessions: | |
st.session_state.messages = sessions[session_id]["messages"] | |
st.session_state.session_id = session_id | |
return True | |
return False | |
def delete_session(session_id): | |
"""Delete a chat session""" | |
sessions = load_chat_sessions() | |
if session_id in sessions: | |
del sessions[session_id] | |
save_chat_sessions(sessions) | |
return True | |
return False | |
def start_new_chat(): | |
"""Start a new chat session""" | |
if st.session_state.messages: | |
save_current_session() | |
st.session_state.messages = [] | |
st.session_state.session_id = str(uuid.uuid4()) | |
# Initialize session state | |
if "messages" not in st.session_state: | |
st.session_state.messages = load_chat_history() | |
if "session_id" not in st.session_state: | |
st.session_state.session_id = str(uuid.uuid4()) | |
# Get API key | |
OPENROUTER_API_KEY = os.environ.get("OPENROUTER_API_KEY") | |
def check_api_status(): | |
if not OPENROUTER_API_KEY: | |
return "No API Key" | |
try: | |
url = "https://openrouter.ai/api/v1/models" | |
headers = {"Authorization": f"Bearer {OPENROUTER_API_KEY}"} | |
response = requests.get(url, headers=headers, timeout=10) | |
return "Connected" if response.status_code == 200 else "Error" | |
except: | |
return "Error" | |
def get_ai_response(messages, model="openai/gpt-3.5-turbo"): | |
if not OPENROUTER_API_KEY: | |
return "No API key found. Please add OPENROUTER_API_KEY to environment variables." | |
url = "https://openrouter.ai/api/v1/chat/completions" | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"Bearer {OPENROUTER_API_KEY}", | |
"HTTP-Referer": "http://localhost:8501", | |
"X-Title": "Streamlit AI Assistant" | |
} | |
api_messages = [ | |
{"role": "system", "content": "You are a helpful AI assistant. Provide clear and helpful responses."}] | |
api_messages.extend(messages) | |
data = { | |
"model": model, | |
"messages": api_messages, | |
"stream": True, | |
"max_tokens": 2000, | |
"temperature": 0.7, | |
"top_p": 1, | |
"frequency_penalty": 0, | |
"presence_penalty": 0 | |
} | |
try: | |
response = requests.post(url, headers=headers, json=data, stream=True, timeout=60) | |
if response.status_code != 200: | |
error_detail = "" | |
try: | |
error_data = response.json() | |
error_detail = error_data.get('error', {}).get('message', f"HTTP {response.status_code}") | |
except: | |
error_detail = f"HTTP {response.status_code}: {response.reason}" | |
yield f"API Error: {error_detail}. Please try a different model or check your API key." | |
return | |
full_response = "" | |
for line in response.iter_lines(): | |
if line: | |
if line.startswith(b"data: "): | |
data_str = line[len(b"data: "):].decode("utf-8") | |
if data_str.strip() == "[DONE]": | |
break | |
try: | |
data = json.loads(data_str) | |
delta = data["choices"][0]["delta"].get("content", "") | |
if delta: | |
full_response += delta | |
yield full_response | |
except (json.JSONDecodeError, KeyError, IndexError): | |
continue | |
except requests.exceptions.Timeout: | |
yield "Request timed out. Please try again with a shorter message or different model." | |
except requests.exceptions.ConnectionError: | |
yield "Connection error. Please check your internet connection and try again." | |
except requests.exceptions.RequestException as e: | |
yield f"Request error: {str(e)}. Please try again." | |
except Exception as e: | |
yield f"Unexpected error: {str(e)}. Please try again or contact support." | |
# ================= MAIN APP ================= | |
# Header | |
st.title("Chat Flow π·") | |
st.caption("10 powerful Models, one simple chat.") | |
# Sidebar | |
with st.sidebar: | |
# New Chat Button (BLACK) | |
if st.button("β New Chat", use_container_width=True, type="primary"): | |
start_new_chat() | |
st.rerun() | |
st.divider() | |
# ONLINE USERS SECTION | |
online_count = show_online_users() | |
# Update user tracking | |
update_online_users() | |
# Quick refresh for users | |
if st.button("π Refresh Users", use_container_width=True): | |
st.rerun() | |
st.divider() | |
# Chat Sessions | |
sessions = load_chat_sessions() | |
current_session_id = get_session_id() | |
if sessions: | |
st.subheader("Previous Chats") | |
sorted_sessions = sorted(sessions.items(), key=lambda x: x[1].get("updated_at", x[1].get("created_at", "")), reverse=True) | |
for session_id, session_data in sorted_sessions: | |
if session_id == current_session_id: | |
st.markdown(f"πΉ **{session_data['title']}**") | |
else: | |
col_load, col_delete = st.columns([3, 1]) | |
with col_load: | |
if st.button(f"π {session_data['title']}", key=f"load_{session_id}", use_container_width=True): | |
if st.session_state.messages: | |
save_current_session() | |
load_session(session_id) | |
st.rerun() | |
with col_delete: | |
if st.button("β", key=f"delete_{session_id}"): | |
delete_session(session_id) | |
if session_id == current_session_id: | |
start_new_chat() | |
st.rerun() | |
if "updated_at" in session_data: | |
update_time = datetime.fromisoformat(session_data["updated_at"]) | |
st.caption(f"Updated: {update_time.strftime('%m/%d %H:%M')}") | |
st.markdown("---") | |
else: | |
st.info("No previous chats yet") | |
if st.session_state.messages: | |
save_current_session() | |
st.divider() | |
# Settings Section | |
st.header("Settings") | |
status = check_api_status() | |
if status == "Connected": | |
st.success("π’ API Connected") | |
elif status == "No API Key": | |
st.error("No API Key") | |
else: | |
st.warning("Connection Issue") | |
st.divider() | |
# Model Selection | |
models = [ | |
("GPT-3.5 Turbo", "openai/gpt-3.5-turbo"), | |
("LLaMA 3.1 8B", "meta-llama/llama-3.1-8b-instruct"), | |
("LLaMA 3.1 70B", "meta-llama/llama-3.1-70b-instruct"), | |
("DeepSeek Chat v3", "deepseek/deepseek-chat-v3-0324:free"), | |
("DeepSeek R1", "deepseek/deepseek-r1-0528:free"), | |
("Qwen3 Coder", "qwen/qwen3-coder:free"), | |
("Microsoft MAI DS R1", "microsoft/mai-ds-r1:free"), | |
("Gemma 3 27B", "google/gemma-3-27b-it:free"), | |
("Gemma 3 4B", "google/gemma-3-4b-it:free"), | |
("Auto (Best Available)", "openrouter/auto") | |
] | |
model_names = [name for name, _ in models] | |
model_ids = [model_id for _, model_id in models] | |
selected_index = st.selectbox("Model", range(len(model_names)), | |
format_func=lambda x: model_names[x], | |
index=0) | |
selected_model = model_ids[selected_index] | |
# Show selected model ID in green | |
st.markdown(f"**Model ID:** <span class='model-id'>{selected_model}</span>", unsafe_allow_html=True) | |
st.divider() | |
# Chat History Controls | |
st.header("Chat History") | |
# Show number of messages | |
if st.session_state.messages: | |
st.info(f"Messages stored: {len(st.session_state.messages)}") | |
# Auto-save toggle | |
auto_save = st.checkbox("Auto-save messages", value=True) | |
# Manual save/load buttons | |
col1, col2 = st.columns(2) | |
with col1: | |
if st.button("Save History", use_container_width=True): | |
save_chat_history(st.session_state.messages) | |
st.success("History saved!") | |
with col2: | |
if st.button("Load History", use_container_width=True): | |
st.session_state.messages = load_chat_history() | |
st.success("History loaded!") | |
st.rerun() | |
st.divider() | |
# View History | |
if st.button("View History File", use_container_width=True): | |
if os.path.exists(HISTORY_FILE): | |
with open(HISTORY_FILE, 'r', encoding='utf-8') as f: | |
history_content = f.read() | |
st.text_area("Chat History (JSON)", history_content, height=200) | |
else: | |
st.warning("No history file found") | |
# Download History | |
if os.path.exists(HISTORY_FILE): | |
with open(HISTORY_FILE, 'rb') as f: | |
st.download_button( | |
label="Download History", | |
data=f.read(), | |
file_name=f"chat_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json", | |
mime="application/json", | |
use_container_width=True | |
) | |
st.divider() | |
# Clear controls | |
if st.button("Clear Chat", use_container_width=True, type="secondary"): | |
clear_chat_history() | |
st.success("Chat cleared!") | |
st.rerun() | |
# ================= MAIN CHAT AREA ================= | |
# Display chat messages | |
for message in st.session_state.messages: | |
with st.chat_message(message["role"]): | |
# Check if this is an assistant message with attribution | |
if message["role"] == "assistant" and "Response created by:" in message["content"]: | |
# Split content and attribution | |
parts = message["content"].split("\n\n---\n*Response created by:") | |
main_content = parts[0] | |
if len(parts) > 1: | |
model_name = parts[1].replace("***", "").replace("**", "") | |
st.markdown(main_content) | |
st.markdown( | |
f"<div class='model-attribution'>Response created by: <strong>{model_name}</strong></div>", | |
unsafe_allow_html=True) | |
else: | |
st.markdown(message["content"]) | |
else: | |
st.markdown(message["content"]) | |
# Chat input - MAIN CHAT FUNCTIONALITY | |
if prompt := st.chat_input("Chat Smarter. Chat many Brains"): | |
# Update user tracking when user sends message | |
update_online_users() | |
# Add user message | |
user_message = {"role": "user", "content": prompt} | |
st.session_state.messages.append(user_message) | |
# Auto-save if enabled | |
if 'auto_save' not in locals(): | |
auto_save = True | |
if auto_save: | |
save_chat_history(st.session_state.messages) | |
# Always auto-save the current session | |
save_current_session() | |
# Display user message | |
with st.chat_message("user"): | |
st.markdown(prompt) | |
# Get AI response | |
with st.chat_message("assistant"): | |
placeholder = st.empty() | |
full_response = "" | |
try: | |
for response in get_ai_response(st.session_state.messages, selected_model): | |
full_response = response | |
placeholder.markdown(full_response + "β") | |
# Remove cursor and show final response | |
placeholder.markdown(full_response) | |
except Exception as e: | |
error_msg = f"An error occurred: {str(e)}" | |
placeholder.markdown(error_msg) | |
full_response = error_msg | |
# Add AI response to messages with attribution | |
full_response_with_attribution = full_response + f"\n\n---\n*Response created by: **{model_names[selected_index]}***" | |
assistant_message = {"role": "assistant", "content": full_response_with_attribution} | |
st.session_state.messages.append(assistant_message) | |
# Auto-save if enabled | |
if auto_save: | |
save_chat_history(st.session_state.messages) | |
# Always auto-save the current session | |
save_current_session() | |
# Show currently using model | |
st.caption(f"Currently using: **{model_names[selected_index]}**") |