awacke1's picture
Create app.py
87fbb6b verified
raw
history blame
24 kB
import streamlit as st
import asyncio
import websockets
import uuid
import argparse
from datetime import datetime
import os
import random
import time
import hashlib
from PIL import Image
import glob
import re
from urllib.parse import quote
# App Configuration
Site_Name = 'πŸ€–πŸ§ Chat & Quote NodeπŸ“πŸ”¬'
title = "πŸ€–πŸ§ Chat & Quote NodeπŸ“πŸ”¬"
icons = 'πŸ€–πŸ§ πŸ”¬πŸ“'
START_ROOM = "Sector 🌌"
st.set_page_config(
page_title=title,
page_icon=icons,
layout="wide",
initial_sidebar_state="auto"
)
# Fun usernames with emojis - the VIP list of quirky characters! πŸŽ‰πŸ˜œ
FUN_USERNAMES = [
"CosmicJester 🌌", "PixelPanda 🐼", "QuantumQuack πŸ¦†", "StellarSquirrel 🐿️",
"GizmoGuru βš™οΈ", "NebulaNinja 🌠", "ByteBuster πŸ’Ύ", "GalacticGopher 🌍",
"RocketRaccoon πŸš€", "EchoElf 🧝", "PhantomFox 🦊", "WittyWizard πŸ§™",
"LunarLlama πŸŒ™", "SolarSloth β˜€οΈ", "AstroAlpaca πŸ¦™", "CyberCoyote 🐺",
"MysticMoose 🦌", "GlitchGnome 🧚", "VortexViper 🐍", "ChronoChimp πŸ’"
]
# Directories for chat, votes, and media - the secret vaults where treasures are stashed! πŸ—„οΈπŸ”’
CHAT_DIR = "chat_logs"
VOTE_DIR = "vote_logs"
MEDIA_DIR = "media"
STATE_FILE = "user_state.txt"
os.makedirs(CHAT_DIR, exist_ok=True)
os.makedirs(VOTE_DIR, exist_ok=True)
os.makedirs(MEDIA_DIR, exist_ok=True)
# Persistent files - the grand tomes of chatter and votes! πŸ“–βœ¨
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md")
QUOTE_VOTES_FILE = os.path.join(VOTE_DIR, "quote_votes.md")
IMAGE_VOTES_FILE = os.path.join(VOTE_DIR, "image_votes.md")
HISTORY_FILE = os.path.join(VOTE_DIR, "vote_history.md")
# Unicode digit conversion table - the magic map for emoji numbers! πŸ”’βœ¨
UNICODE_DIGITS = {
0: "0️⃣", 1: "1️⃣", 2: "2️⃣", 3: "3️⃣", 4: "4️⃣",
5: "5️⃣", 6: "6️⃣", 7: "7️⃣", 8: "8️⃣", 9: "9️⃣"
}
# Unicode font examples - the stylish wardrobe of text flair! πŸ‘—πŸ“
UNICODE_FONTS = [
("Normal", lambda x: x),
("Bold", lambda x: "".join(chr(ord(c) + 0x1D400 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D41A - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Italic", lambda x: "".join(chr(ord(c) + 0x1D434 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D44E - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Script", lambda x: "".join(chr(ord(c) + 0x1D49C - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D4B6 - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Fraktur", lambda x: "".join(chr(ord(c) + 0x1D504 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D51E - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Double Struck", lambda x: "".join(chr(ord(c) + 0x1D538 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D552 - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Sans Serif", lambda x: "".join(chr(ord(c) + 0x1D5A0 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D5BA - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Sans Bold", lambda x: "".join(chr(ord(c) + 0x1D5D4 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D5EE - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Mono", lambda x: "".join(chr(ord(c) + 0x1D670 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D68A - 0x61) if 'a' <= c <= 'z' else c for c in x)),
("Circle", lambda x: "".join(chr(ord(c) + 0x24B6 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x24D0 - 0x61) if 'a' <= c <= 'z' else c for c in x))
]
# Global WebSocket server flag
server_running = False
server_task = None
# Node name - the app’s codename generator, sneaky and slick! πŸ•΅οΈβ€β™‚οΈπŸ’Ύ
def get_node_name():
"""🎲 Spins the wheel of fate to name our node - a random alias or user pick! 🏷️"""
parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
parser.add_argument('--node-name', type=str, default=None, help='Name for this chat node')
parser.add_argument('--port', type=int, default=8501, help='Port to run the Streamlit interface on')
args = parser.parse_args()
return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port
# Chat saver - the scribe etching epic messages into the eternal scroll! πŸ–‹οΈπŸ“œ
def save_chat_entry(username, message):
"""πŸ–ŒοΈ Carves a chat line into the grand Markdown tome - history in the making! πŸ›οΈ"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] {username}: {message}"
try:
with open(CHAT_FILE, 'a') as f:
f.write(f"{entry}\n")
return True
except Exception as e:
print(f"Oops! Failed to save chat: {e}")
return False
# Chat loader - the archaeologist unearthing the chat saga! β›οΈπŸ“š
def load_chat():
"""πŸ” Digs up the chat treasure from the filesystem - tales of old and new! πŸ’°"""
if not os.path.exists(CHAT_FILE):
with open(CHAT_FILE, 'w') as f:
f.write(f"# {START_ROOM} Chat\n\nWelcome to the cosmic hub - start chatting! 🎀\n")
try:
with open(CHAT_FILE, 'r') as f:
content = f.read()
lines = content.strip().split('\n')
numbered_content = "\n".join(f"{i+1}. {line}" for i, line in enumerate(lines) if line.strip())
return numbered_content
except Exception as e:
print(f"Chat load hiccup: {e}")
return f"# Error loading {START_ROOM} chat\nSomething went wonky! 😡"
# User list grabber - the social butterfly spotting all the chatters! πŸ¦‹πŸ‘₯
def get_user_list(chat_content):
"""πŸ‘€ Peeks at the chat to spot all the cool cats talking - who’s in the club? 🎸"""
users = set()
for line in chat_content.split('\n'):
if line.strip() and ': ' in line:
user = line.split(': ')[1].split(' ')[0]
users.add(user)
return sorted(list(users))
# Quote loader - the sage pulling wisdom from the ages! πŸ“œπŸ§™
def load_quotes(source="famous"):
"""πŸ“š Grabs a stack of wise words from famous folks or custom quips! πŸ—£οΈ"""
famous_quotes = [
"The true sign of intelligence is not knowledge but imagination. – Albert Einstein",
"I have not failed. I've just found 10,000 ways that won't work. – Thomas Edison",
"Innovation distinguishes between a leader and a follower. – Steve Jobs",
"Research is what I'm doing when I don't know what I'm doing. – Wernher von Braun",
"The only way to discover the limits of the possible is to go beyond them into the impossible. – Arthur C. Clarke",
"Success is a science; if you have the conditions, you get the result. – Oscar Wilde",
"An expert is a person who has made all the mistakes that can be made in a very narrow field. – Niels Bohr",
"The important thing is to not stop questioning. Curiosity has its own reason for existing. – Albert Einstein",
"The best way to predict the future is to invent it. – Alan Kay",
"If I have seen further it is by standing on the shoulders of Giants. – Isaac Newton",
"Logic will get you from A to B. Imagination will take you everywhere. – Albert Einstein",
"Imagination is more important than knowledge. Knowledge is limited. Imagination encircles the world. – Albert Einstein",
"Science is a way of thinking much more than it is a body of knowledge. – Carl Sagan",
"We cannot solve our problems with the same thinking we used when we created them. – Albert Einstein",
"The true method of knowledge is experiment. – William Blake",
"The scientist is not a person who gives the right answers, he's one who asks the right questions. – Claude Levi-Strauss",
"It's kind of fun to do the impossible. – Walt Disney",
"Any sufficiently advanced technology is indistinguishable from magic. – Arthur C. Clarke",
"Creativity is intelligence having fun. – Albert Einstein",
"To invent, you need a good imagination and a pile of junk. – Thomas Edison"
]
custom_quotes = [
"Every age unfolds a new lesson. Life's chapters evolve, each teaching us anew.",
"From infancy to twilight, our journey is painted in growth. Every stage shines with its own wisdom.",
"Love is the universal language, transcending boundaries and touching souls.",
"Through love, we find connection, unity, and the essence of existence."
]
quotes = famous_quotes if source == "famous" else custom_quotes
return quotes if quotes else ["No quotes available - check back later! πŸ“­"]
# Vote saver - the tally keeper counting thumbs up! πŸ‘πŸ“Š
def save_vote(file, item, user_hash):
"""✍️ Tallies a vote in the grand ledger - your opinion matters! πŸ—³οΈ"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
entry = f"[{timestamp}] {user_hash} voted for {item}"
try:
with open(file, 'a') as f:
f.write(f"{entry}\n")
with open(HISTORY_FILE, 'a') as f:
f.write(f"- {timestamp} - User {user_hash} voted for {item}\n")
return True
except Exception as e:
print(f"Vote save flop: {e}")
return False
# Vote loader - the scorekeeper tallying the crowd’s cheers! πŸŽ‰πŸ…
def load_votes(file):
"""πŸ“ˆ Counts the votes from the ledger - who’s winning the popularity contest? πŸ†"""
if not os.path.exists(file):
with open(file, 'w') as f:
f.write("# Vote Tally\n\nNo votes yet - get clicking! πŸ–±οΈ\n")
try:
with open(file, 'r') as f:
lines = f.read().strip().split('\n')
votes = {}
for line in lines[2:]: # Skip header
if line.strip() and 'voted for' in line:
item = line.split('voted for ')[1]
votes[item] = votes.get(item, 0) + 1
return votes
except Exception as e:
print(f"Vote load oopsie: {e}")
return {}
# User hash generator - the secret agent giving you a cool code! πŸ•΅οΈβ€β™‚οΈπŸ”‘
def generate_user_hash():
"""πŸ•΅οΈ Crafts a snazzy 8-digit ID badge - you’re in the club now! 🎟️"""
if 'user_hash' not in st.session_state:
session_id = str(random.getrandbits(128))
hash_object = hashlib.md5(session_id.encode())
st.session_state['user_hash'] = hash_object.hexdigest()[:8]
return st.session_state['user_hash']
# Username persistence - save and load across refreshes! πŸ’ΎπŸ”„
def save_username(username):
"""πŸ’Ύ Stashes your cosmic alias before the refresh void claims it! 🌌"""
try:
with open(STATE_FILE, 'w') as f:
f.write(username)
except Exception as e:
print(f"Failed to save username: {e}")
def load_username():
"""πŸ”„ Resurrects your cosmic alias from the refresh abyss! 🌠"""
if os.path.exists(STATE_FILE):
try:
with open(STATE_FILE, 'r') as f:
return f.read().strip()
except Exception as e:
print(f"Failed to load username: {e}")
return None
# Media HTML generators - IVA’s multimedia magic! πŸŽ₯πŸŽΆπŸ–ΌοΈ
def get_video_html(video_path, width="100%"):
"""🎬 Rolls out the red carpet for videos - autoplay and loop like a star! 🌟"""
video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}"
return f'''
<video width="{width}" controls autoplay muted loop>
<source src="{video_url}" type="video/mp4">
Your browser does not support the video tag.
</video>
'''
def get_audio_html(audio_path, width="100%"):
"""🎢 Drops a beat with audio - your personal DJ in action! 🎧"""
audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}"
return f'''
<audio controls style="width: {width};">
<source src="{audio_url}" type="audio/mpeg">
Your browser does not support the audio element.
</audio>
'''
active_connections = {}
# WebSocket handler - the bouncer at the Sector rave, keeping it hopping! πŸŽ‰πŸšͺ
async def websocket_handler(websocket, path):
"""🎧 Guards the cosmic gate, welcoming all to Sector and booting crashers! 🚨"""
try:
client_id = str(uuid.uuid4())
room_id = "chat" # Single room "Sector 🌌"
active_connections.setdefault(room_id, {})[client_id] = websocket
print(f"Client {client_id} joined the Sector party!")
# Auto-join message for every connection
username = st.session_state.get('username', random.choice(FUN_USERNAMES))
save_chat_entry("System 🌟", f"{username} has joined {START_ROOM}!")
async for message in websocket:
try:
parts = message.split('|', 1)
if len(parts) == 2:
username, content = parts
save_chat_entry(username, content)
await broadcast_message(f"{username}|{content}", room_id)
except Exception as e:
print(f"Message mishap: {e}")
await websocket.send(f"ERROR|Oops, bad message format! 😬")
except websockets.ConnectionClosed:
print(f"Client {client_id} bailed from Sector!")
save_chat_entry("System 🌟", f"{username} has left {START_ROOM}!")
finally:
if room_id in active_connections and client_id in active_connections[room_id]:
del active_connections[room_id][client_id]
if not active_connections[room_id]:
del active_connections[room_id]
# Broadcaster - the megaphone blasting Sector vibes to all! πŸ“£πŸŽΆ
async def broadcast_message(message, room_id):
"""πŸ“’ Shouts the latest Sector beat to every cosmic dancer - hear it loud! 🎡"""
if room_id in active_connections:
disconnected = []
for client_id, ws in active_connections[room_id].items():
try:
await ws.send(message)
except websockets.ConnectionClosed:
disconnected.append(client_id)
for client_id in disconnected:
del active_connections[room_id][client_id]
# WebSocket server runner - the DJ spinning up the Sector tunes once! 🎧πŸ”₯
async def run_websocket_server():
"""🌐 Cranks up the WebSocket jukebox once, keeping Sector rocking! 🎸"""
global server_running, server_task
if not server_running:
server = await websockets.serve(websocket_handler, '0.0.0.0', 8765)
print(f"WebSocket server jamming on ws://0.0.0.0:8765")
server_running = True
await server.wait_closed()
# Chat interface maker - the stage builder for our Sector extravaganza! 🎭🏟️
def create_streamlit_interface(initial_username):
"""πŸ–ŒοΈ Sets up the Sector stage with pulsing timers, voting, and IVA media flair! 🌟"""
# Custom CSS for a sleek chat box and timer
st.markdown("""
<style>
.chat-box {
font-family: monospace;
background: #1e1e1e;
color: #d4d4d4;
padding: 10px;
border-radius: 5px;
height: 300px;
overflow-y: auto;
}
.timer {
font-size: 24px;
color: #ffcc00;
text-align: center;
animation: pulse 1s infinite;
}
@keyframes pulse {
0% { transform: scale(1); }
50% { transform: scale(1.1); }
100% { transform: scale(1); }
}
</style>
""", unsafe_allow_html=True)
# Title and intro
st.title(f"{Site_Name}")
st.markdown(f"Welcome to {START_ROOM} - chat, vote, and enjoy IVA’s media magic! πŸŽ‰")
# Load or set username
saved_username = load_username()
if saved_username and saved_username in FUN_USERNAMES:
initial_username = saved_username
if 'username' not in st.session_state:
st.session_state.username = initial_username
save_chat_entry("System 🌟", f"{initial_username} has joined {START_ROOM}!")
save_username(st.session_state.username) # Persist before any refresh
# Session state for refresh rate and quote index
if 'refresh_rate' not in st.session_state:
st.session_state.refresh_rate = 5
if 'timer_start' not in st.session_state:
st.session_state.timer_start = time.time() # Autostart timer
if 'quote_index' not in st.session_state:
quotes = load_quotes("famous")
st.session_state.quote_index = random.randint(0, max(0, len(quotes) - 1)) if quotes else 0
if 'quote_source' not in st.session_state:
st.session_state.quote_source = "famous"
# Chat section
st.subheader(f"{START_ROOM} Chat πŸ’¬")
chat_content = load_chat()
chat_lines = chat_content.split('\n')
for i, line in enumerate(chat_lines):
if line.strip() and ': ' in line:
col1, col2 = st.columns([5, 1])
with col1:
st.markdown(line)
with col2:
if st.button(f"πŸ‘", key=f"chat_vote_{i}"):
user_hash = generate_user_hash()
save_vote(QUOTE_VOTES_FILE, line, user_hash)
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
st.rerun()
user_list = get_user_list(chat_content)
new_username = st.selectbox("Switch User", user_list + [st.session_state.username], index=len(user_list))
if new_username != st.session_state.username:
save_chat_entry("System 🌟", f"{st.session_state.username} switched to {new_username} in {START_ROOM}!")
st.session_state.username = new_username
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
message = st.text_input("Message", placeholder="Type your epic line here! ✍️")
if st.button("Send πŸš€") and message.strip():
save_chat_entry(st.session_state.username, message)
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
st.rerun()
# Quote section
st.subheader("Quote of the Moment πŸ“")
quotes = load_quotes(st.session_state.quote_source)
if quotes:
st.session_state.quote_index = st.session_state.quote_index % len(quotes)
quote = quotes[st.session_state.quote_index]
col1, col2 = st.columns([5, 1])
with col1:
st.markdown(quote)
with col2:
if st.button("πŸ‘ Upvote", key="quote_vote"):
user_hash = generate_user_hash()
save_vote(QUOTE_VOTES_FILE, quote, user_hash)
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
st.rerun()
if time.time() - st.session_state.timer_start > 10: # 10s quote refresh
st.session_state.quote_index = (st.session_state.quote_index + 1) % len(quotes)
st.session_state.quote_source = "custom" if st.session_state.quote_source == "famous" else "famous"
quotes = load_quotes(st.session_state.quote_source)
st.session_state.quote_index = st.session_state.quote_index % len(quotes) if quotes else 0
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
st.rerun()
else:
st.markdown("No quotes available - check back later! πŸ“­")
# IVA Media Gallery
st.subheader("IVA Media Gallery 🎨🎢πŸŽ₯")
media_files = (
glob.glob(f"{MEDIA_DIR}/*.png") + glob.glob(f"{MEDIA_DIR}/*.jpg") + glob.glob(f"{MEDIA_DIR}/*.jpeg") +
glob.glob(f"{MEDIA_DIR}/*.mp3") + glob.glob(f"{MEDIA_DIR}/*.wav") +
glob.glob(f"{MEDIA_DIR}/*.mp4")
)
if media_files:
media_cols = st.slider("Gallery Columns", min_value=1, max_value=5, value=3)
cols = st.columns(media_cols)
for idx, media_file in enumerate(media_files):
with cols[idx % media_cols]:
if media_file.endswith(('.png', '.jpg', '.jpeg')):
st.image(media_file, use_container_width=True)
elif media_file.endswith(('.mp3', '.wav')):
st.markdown(get_audio_html(media_file, width="100%"), unsafe_allow_html=True)
elif media_file.endswith('.mp4'):
st.markdown(get_video_html(media_file, width="100%"), unsafe_allow_html=True)
if st.button(f"πŸ‘ Upvote {os.path.basename(media_file)}", key=f"media_vote_{idx}"):
user_hash = generate_user_hash()
save_vote(IMAGE_VOTES_FILE, media_file, user_hash)
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
st.rerun()
else:
st.error("No media files (images, audio, video) found in 'media' directory!")
# Refresh rate controls with pulsing timer
st.subheader("Set Refresh Rate ⏳")
refresh_rate = st.slider("Refresh Rate (seconds)", min_value=1, max_value=300, value=st.session_state.refresh_rate, step=1)
if refresh_rate != st.session_state.refresh_rate:
st.session_state.refresh_rate = refresh_rate
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
col1, col2, col3 = st.columns(3)
with col1:
if st.button("πŸ‡ Small (1s)"):
st.session_state.refresh_rate = 1
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
with col2:
if st.button("🐒 Medium (5s)"):
st.session_state.refresh_rate = 5
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
with col3:
if st.button("🐘 Large (5m)"):
st.session_state.refresh_rate = 300
st.session_state.timer_start = time.time()
save_username(st.session_state.username)
# Pulsing countdown timer with emoji digits and random Unicode font
timer_placeholder = st.empty()
start_time = st.session_state.timer_start
for i in range(st.session_state.refresh_rate, -1, -1):
font_name, font_func = random.choice(UNICODE_FONTS)
countdown_str = "".join(UNICODE_DIGITS[int(d)] for d in str(i)) if i < 10 else font_func(str(i))
timer_emoji = "⏳" if i % 2 == 0 else "πŸ’“" # Pulse effect
timer_placeholder.markdown(f"<p class='timer'>{timer_emoji} {font_func('Next refresh in:')} {countdown_str} {font_func('seconds')}</p>", unsafe_allow_html=True)
time.sleep(1) # Pulse every second
st.session_state.timer_start = time.time()
st.session_state.last_refresh = time.time()
save_username(st.session_state.username)
st.rerun()
# Sidebar vote stats
st.sidebar.subheader("Vote Totals")
chat_votes = load_votes(QUOTE_VOTES_FILE)
image_votes = load_votes(IMAGE_VOTES_FILE)
for item, count in chat_votes.items():
st.sidebar.write(f"{item}: {count} votes")
for image, count in image_votes.items():
st.sidebar.write(f"{image}: {count} votes")
# Main event - the ringmaster kicking off the Sector circus! πŸŽͺ🀑
async def main():
"""🎀 Drops the mic and starts the Sector party - it’s showtime, folks! πŸŽ‰"""
global NODE_NAME, server_task
NODE_NAME, port = get_node_name()
# Start WebSocket server only once as a background task
if server_task is None:
server_task = asyncio.create_task(run_websocket_server())
query_params = st.query_params if hasattr(st, 'query_params') else {}
initial_username = query_params.get("username", random.choice(FUN_USERNAMES)) if query_params else random.choice(FUN_USERNAMES)
print(f"Welcoming {initial_username} to the Sector bash!")
create_streamlit_interface(initial_username)
if __name__ == "__main__":
asyncio.run(main())