Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import asyncio | |
| import websockets | |
| import uuid | |
| import argparse | |
| from datetime import datetime | |
| import os | |
| import random | |
| import time | |
| import hashlib | |
| from PIL import Image | |
| import glob | |
| from urllib.parse import quote | |
| import base64 | |
| import io | |
| # Initial App Configuration (static) | |
| icons = 'π€π§ π¬π' | |
| START_ROOM = "Sector π" | |
| # Set page config once at the top | |
| st.set_page_config( | |
| page_title="π€π§ MMO Chat Brainππ¬", # Initial static title | |
| page_icon=icons, | |
| layout="wide", | |
| initial_sidebar_state="auto" | |
| ) | |
| # Fun usernames with emojis | |
| FUN_USERNAMES = [ | |
| "CosmicJester π", "PixelPanda πΌ", "QuantumQuack π¦", "StellarSquirrel πΏοΈ", | |
| "GizmoGuru βοΈ", "NebulaNinja π ", "ByteBuster πΎ", "GalacticGopher π", | |
| "RocketRaccoon π", "EchoElf π§", "PhantomFox π¦", "WittyWizard π§", | |
| "LunarLlama π", "SolarSloth βοΈ", "AstroAlpaca π¦", "CyberCoyote πΊ", | |
| "MysticMoose π¦", "GlitchGnome π§", "VortexViper π", "ChronoChimp π" | |
| ] | |
| # Directories and files | |
| CHAT_DIR = "chat_logs" | |
| VOTE_DIR = "vote_logs" | |
| STATE_FILE = "user_state.txt" | |
| os.makedirs(CHAT_DIR, exist_ok=True) | |
| os.makedirs(VOTE_DIR, exist_ok=True) | |
| CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md") | |
| QUOTE_VOTES_FILE = os.path.join(VOTE_DIR, "quote_votes.md") | |
| MEDIA_VOTES_FILE = os.path.join(VOTE_DIR, "media_votes.md") | |
| HISTORY_FILE = os.path.join(VOTE_DIR, "vote_history.md") | |
| # Unicode digits and fonts | |
| UNICODE_DIGITS = {i: f"{i}\uFE0Fβ£" for i in range(10)} | |
| UNICODE_FONTS = [ | |
| ("Normal", lambda x: x), | |
| ("Bold", lambda x: "".join(chr(ord(c) + 0x1D400 - 0x41) if 'A' <= c <= 'Z' else chr(ord(c) + 0x1D41A - 0x61) if 'a' <= c <= 'z' else c for c in x)), | |
| # ... (other font styles remain the same) | |
| ] | |
| server_running = False | |
| server_task = None | |
| def get_node_name(): | |
| parser = argparse.ArgumentParser(description='Start a chat node with a specific name') | |
| parser.add_argument('--node-name', type=str, default=None) | |
| parser.add_argument('--port', type=int, default=8501) | |
| args = parser.parse_args() | |
| return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port | |
| def save_chat_entry(username, message): | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| entry = f"[{timestamp}] {username}: {message}" | |
| with open(CHAT_FILE, 'a') as f: | |
| f.write(f"{entry}\n") | |
| def load_chat(): | |
| if not os.path.exists(CHAT_FILE): | |
| with open(CHAT_FILE, 'w') as f: | |
| f.write(f"# {START_ROOM} Chat\n\nWelcome to the cosmic hub - start chatting! π€\n") | |
| with open(CHAT_FILE, 'r') as f: | |
| content = f.read() | |
| lines = content.strip().split('\n') | |
| return "\n".join(f"{i+1}. {line}" for i, line in enumerate(lines) if line.strip()) | |
| def get_user_list(chat_content): | |
| users = set() | |
| for line in chat_content.split('\n'): | |
| if line.strip() and ': ' in line: | |
| user = line.split(': ')[1].split(' ')[0] | |
| users.add(user) | |
| return sorted(list(users)) | |
| def has_joined_before(client_id, chat_content): | |
| return any(f"Client-{client_id} has joined" in line for line in chat_content.split('\n')) | |
| def get_message_suggestions(chat_content, prefix): | |
| lines = chat_content.split('\n') | |
| messages = [line.split(': ', 1)[1] for line in lines if ': ' in line and line.strip()] | |
| return [msg for msg in messages if msg.lower().startswith(prefix.lower())][:5] | |
| def load_quotes(source="famous"): | |
| famous_quotes = [ | |
| "The true sign of intelligence is not knowledge but imagination. β Albert Einstein", | |
| # ... (other quotes remain the same) | |
| ] | |
| custom_quotes = [ | |
| "Every age unfolds a new lesson. Life's chapters evolve, each teaching us anew.", | |
| # ... (other custom quotes remain the same) | |
| ] | |
| return famous_quotes if source == "famous" else custom_quotes | |
| def save_vote(file, item, user_hash): | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| entry = f"[{timestamp}] {user_hash} voted for {item}" | |
| with open(file, 'a') as f: | |
| f.write(f"{entry}\n") | |
| with open(HISTORY_FILE, "a") as f: | |
| f.write(f"- {timestamp} - User {user_hash} voted for {item}\n") | |
| def load_votes(file): | |
| if not os.path.exists(file): | |
| with open(file, 'w') as f: | |
| f.write("# Vote Tally\n\nNo votes yet - get clicking! π±οΈ\n") | |
| with open(file, 'r') as f: | |
| lines = f.read().strip().split('\n')[2:] # Skip header | |
| votes = {} | |
| user_votes = set() | |
| for line in lines: | |
| if line.strip() and 'voted for' in line: | |
| user_hash = line.split('] ')[1].split(' voted for ')[0] | |
| item = line.split('voted for ')[1] | |
| vote_key = f"{user_hash}-{item}" | |
| if vote_key not in user_votes: | |
| votes[item] = votes.get(item, 0) + 1 | |
| user_votes.add(vote_key) | |
| return votes | |
| def generate_user_hash(): | |
| if 'user_hash' not in st.session_state: | |
| st.session_state.user_hash = hashlib.md5(str(random.getrandbits(128)).encode()).hexdigest()[:8] | |
| return st.session_state.user_hash | |
| def save_pasted_image(image_data): | |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| filename = f"paste_{timestamp}.png" | |
| filepath = os.path.join('./', filename) | |
| if ',' in image_data: | |
| image_data = image_data.split(',')[1] | |
| img_bytes = base64.b64decode(image_data) | |
| img = Image.open(io.BytesIO(img_bytes)) | |
| img.save(filepath, "PNG") | |
| return filename | |
| def get_video_html(video_path, width="100%"): | |
| video_url = f"data:video/mp4;base64,{base64.b64encode(open(video_path, 'rb').read()).decode()}" | |
| return f'<video width="{width}" controls autoplay muted loop><source src="{video_url}" type="video/mp4">Your browser does not support the video tag.</video>' | |
| def get_audio_html(audio_path, width="100%"): | |
| audio_url = f"data:audio/mpeg;base64,{base64.b64encode(open(audio_path, 'rb').read()).decode()}" | |
| return f'<audio controls style="width: {width};"><source src="{audio_url}" type="audio/mpeg">Your browser does not support the audio element.</audio>' | |
| active_connections = {} | |
| async def websocket_handler(websocket, path): | |
| try: | |
| client_id = str(uuid.uuid4()) | |
| room_id = "chat" | |
| active_connections.setdefault(room_id, {})[client_id] = websocket | |
| chat_content = load_chat() | |
| username = st.session_state.get('username', random.choice(FUN_USERNAMES)) | |
| if not has_joined_before(client_id, chat_content): | |
| save_chat_entry(f"Client-{client_id}", f"{username} has joined {START_ROOM}!") | |
| async for message in websocket: | |
| parts = message.split('|', 1) | |
| if len(parts) == 2: | |
| username, content = parts | |
| save_chat_entry(username, content) | |
| await broadcast_message(f"{username}|{content}", room_id) | |
| except websockets.ConnectionClosed: | |
| pass | |
| finally: | |
| if room_id in active_connections and client_id in active_connections[room_id]: | |
| del active_connections[room_id][client_id] | |
| async def broadcast_message(message, room_id): | |
| if room_id in active_connections: | |
| disconnected = [] | |
| for client_id, ws in active_connections[room_id].items(): | |
| try: | |
| await ws.send(message) | |
| except websockets.ConnectionClosed: | |
| disconnected.append(client_id) | |
| for client_id in disconnected: | |
| del active_connections[room_id][client_id] | |
| async def run_websocket_server(): | |
| global server_running, server_task | |
| if not server_running: | |
| server = await websockets.serve(websocket_handler, '0.0.0.0', 8765) | |
| server_running = True | |
| await server.wait_closed() | |
| def create_streamlit_interface(): | |
| # Dynamic title based on username | |
| if 'username' in st.session_state: | |
| dynamic_title = f"π€π§ MMO {st.session_state.username}ππ¬" | |
| else: | |
| dynamic_title = "π€π§ MMO Chat Brainππ¬" | |
| st.markdown(""" | |
| <style> | |
| .chat-box {font-family: monospace; background: #1e1e1e; color: #d4d4d4; padding: 10px; border-radius: 5px; height: 300px; overflow-y: auto;} | |
| .timer {font-size: 24px; color: #ffcc00; text-align: center; animation: pulse 1s infinite;} | |
| @keyframes pulse {0% {transform: scale(1);} 50% {transform: scale(1.1);} 100% {transform: scale(1);}} | |
| </style> | |
| <script> | |
| document.addEventListener('paste', function(e) { | |
| const items = (e.clipboardData || window.clipboardData).items; | |
| for (let i = 0; i < items.length; i++) { | |
| if (items[i].type.indexOf('image') !== -1) { | |
| const blob = items[i].getAsFile(); | |
| const reader = new FileReader(); | |
| reader.onload = function(event) { | |
| const imageData = event.target.result; | |
| sessionStorage.setItem('pastedImage', imageData); | |
| document.getElementById('message_input').value = 'PastedImage:' + blob.name; | |
| document.getElementById('send_button').click(); | |
| }; | |
| reader.readAsDataURL(blob); | |
| } | |
| } | |
| }); | |
| </script> | |
| """, unsafe_allow_html=True) | |
| st.title(dynamic_title) # Use dynamic title here | |
| st.markdown(f"Welcome to {START_ROOM} - chat, vote, upload, and enjoy! π") | |
| # Initialize client ID and username | |
| if 'client_id' not in st.session_state: | |
| st.session_state.client_id = str(uuid.uuid4()) | |
| if 'username' not in st.session_state: | |
| available_names = [name for name in FUN_USERNAMES if not any(f"{name} has joined" in line for line in load_chat().split('\n'))] | |
| st.session_state.username = random.choice(available_names) if available_names else random.choice(FUN_USERNAMES) | |
| # Session state initialization | |
| if 'refresh_rate' not in st.session_state: | |
| st.session_state.refresh_rate = 5 | |
| if 'timer_start' not in st.session_state: | |
| st.session_state.timer_start = time.time() | |
| if 'quote_index' not in st.session_state: | |
| quotes = load_quotes("famous") | |
| st.session_state.quote_index = random.randint(0, max(0, len(quotes) - 1)) if quotes else 0 | |
| if 'quote_source' not in st.session_state: | |
| st.session_state.quote_source = "famous" | |
| # Chat section | |
| st.subheader(f"{START_ROOM} Chat π¬") | |
| chat_content = load_chat() | |
| for i, line in enumerate(chat_content.split('\n')): | |
| if line.strip() and ': ' in line: | |
| col1, col2 = st.columns([5, 1]) | |
| with col1: | |
| st.markdown(line) | |
| with col2: | |
| if st.button(f"π", key=f"chat_vote_{i}"): | |
| save_vote(QUOTE_VOTES_FILE, line, generate_user_hash()) | |
| st.rerun() | |
| # Username change dropdown | |
| new_username = st.selectbox("Change Name", [""] + FUN_USERNAMES, index=0) | |
| if new_username and new_username != st.session_state.username: | |
| save_chat_entry("System π", f"{st.session_state.username} changed name to {new_username}") | |
| st.session_state.username = new_username | |
| st.rerun() | |
| # Message input | |
| message = st.text_input(f"Message as {st.session_state.username}", key="message_input") | |
| if st.button("Send π", key="send_button") and message.strip(): | |
| if message.startswith("PastedImage:"): | |
| image_data = sessionStorage.get('pastedImage') | |
| if image_data: | |
| filename = save_pasted_image(image_data) | |
| if filename: | |
| save_chat_entry(st.session_state.username, f"Pasted image: {filename}") | |
| else: | |
| save_chat_entry(st.session_state.username, message) | |
| st.rerun() | |
| # Media section with upload and delete | |
| st.subheader("Media Gallery π¨πΆπ₯") | |
| uploaded_file = st.file_uploader("Upload Media", type=['png', 'jpg', 'mp3', 'mp4']) | |
| if uploaded_file: | |
| file_path = os.path.join('./', uploaded_file.name) | |
| with open(file_path, 'wb') as f: | |
| f.write(uploaded_file.getbuffer()) | |
| st.success(f"Uploaded {uploaded_file.name}") | |
| media_files = glob.glob("./*.png") + glob.glob("./*.jpg") + glob.glob("./*.mp3") + glob.glob("./*.mp4") | |
| if media_files: | |
| cols = st.columns(3) | |
| media_votes = load_votes(MEDIA_VOTES_FILE) | |
| for idx, media_file in enumerate(media_files): | |
| with cols[idx % 3]: | |
| if media_file.endswith(('.png', '.jpg')): | |
| st.image(media_file, use_container_width=True) | |
| elif media_file.endswith('.mp3'): | |
| st.markdown(get_audio_html(media_file), unsafe_allow_html=True) | |
| elif media_file.endswith('.mp4'): | |
| st.markdown(get_video_html(media_file), unsafe_allow_html=True) | |
| vote_count = media_votes.get(media_file, 0) | |
| col1, col2 = st.columns(2) | |
| with col1: | |
| if st.button(f"π {vote_count}", key=f"media_vote_{idx}"): | |
| save_vote(MEDIA_VOTES_FILE, media_file, generate_user_hash()) | |
| st.rerun() | |
| with col2: | |
| if st.button("ποΈ", key=f"media_delete_{idx}"): | |
| os.remove(media_file) | |
| st.rerun() | |
| # Refresh timer | |
| st.subheader("Refresh β³") | |
| refresh_rate = st.slider("Refresh Rate", 1, 300, st.session_state.refresh_rate) | |
| st.session_state.refresh_rate = refresh_rate | |
| timer_placeholder = st.empty() | |
| for i in range(st.session_state.refresh_rate, -1, -1): | |
| font_name, font_func = random.choice(UNICODE_FONTS) | |
| countdown_str = "".join(UNICODE_DIGITS[int(d)] for d in str(i)) if i < 10 else font_func(str(i)) | |
| timer_placeholder.markdown(f"<p class='timer'>β³ {font_func('Refresh in:')} {countdown_str}</p>", unsafe_allow_html=True) | |
| time.sleep(1) | |
| st.rerun() | |
| # Sidebar vote stats | |
| st.sidebar.subheader("Vote Counts") | |
| chat_votes = load_votes(QUOTE_VOTES_FILE) | |
| media_votes = load_votes(MEDIA_VOTES_FILE) | |
| for item, count in {**chat_votes, **media_votes}.items(): | |
| st.sidebar.write(f"{item}: {count} votes") | |
| async def main(): | |
| global NODE_NAME, server_task | |
| NODE_NAME, port = get_node_name() | |
| if server_task is None: | |
| server_task = asyncio.create_task(run_websocket_server()) | |
| create_streamlit_interface() | |
| if __name__ == "__main__": | |
| asyncio.run(main()) |