|
import streamlit as st |
|
import asyncio |
|
import websockets |
|
import uuid |
|
import argparse |
|
from datetime import datetime |
|
import os |
|
import random |
|
import time |
|
import hashlib |
|
from PIL import Image |
|
import glob |
|
import base64 |
|
import io |
|
import streamlit.components.v1 as components |
|
import edge_tts |
|
import nest_asyncio |
|
import re |
|
from streamlit_paste_button import paste_image_button |
|
import pytz |
|
import shutil |
|
from urllib.parse import urlencode |
|
from PyPDF2 import PdfReader |
|
import json |
|
|
|
|
|
# Let asyncio event loops be re-entered; the async helpers below are driven
# with run_until_complete from synchronous code.
nest_asyncio.apply()

# Emoji strip handed to Streamlit as the page icon.
icons = '🤖🧠🔬📝'
# Room name shown in chat headings and join messages.
START_ROOM = "Sector 🌌"

# Page chrome: title, icon, wide layout, sidebar left to Streamlit's default.
st.set_page_config(
    page_title="🤖🧠MMO Chat Brain📝🔬",
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
)
|
|
|
|
|
# Display name -> edge-tts voice identifier. When a session is given one of
# these names, the paired voice is stored alongside it in session state.
FUN_USERNAMES = dict([
    ("CosmicJester 🌌", "en-US-AriaNeural"),
    ("PixelPanda 🐼", "en-US-JennyNeural"),
    ("QuantumQuack 🦆", "en-GB-SoniaNeural"),
    ("StellarSquirrel 🐿️", "en-AU-NatashaNeural"),
    ("GizmoGuru ⚙️", "en-CA-ClaraNeural"),
    ("NebulaNinja 🌠", "en-US-GuyNeural"),
    ("ByteBuster 💾", "en-GB-RyanNeural"),
    ("GalacticGopher 🌍", "en-AU-WilliamNeural"),
    ("RocketRaccoon 🚀", "en-CA-LiamNeural"),
    ("EchoElf 🧝", "en-US-AnaNeural"),
    ("PhantomFox 🦊", "en-US-BrandonNeural"),
    ("WittyWizard 🧙", "en-GB-ThomasNeural"),
    ("LunarLlama 🌙", "en-AU-FreyaNeural"),
    ("SolarSloth ☀️", "en-CA-LindaNeural"),
    ("AstroAlpaca 🦙", "en-US-ChristopherNeural"),
    ("CyberCoyote 🐺", "en-GB-ElliotNeural"),
    ("MysticMoose 🦌", "en-AU-JamesNeural"),
    ("GlitchGnome 🧚", "en-CA-EthanNeural"),
    ("VortexViper 🐍", "en-US-AmberNeural"),
    ("ChronoChimp 🐒", "en-GB-LibbyNeural"),
])
|
|
|
|
|
# Flat files that hold the shared chat transcript, vote tallies and action log.
CHAT_FILE = "global_chat.md"
QUOTE_VOTES_FILE = "quote_votes.md"
MEDIA_VOTES_FILE = "media_votes.md"
HISTORY_FILE = "chat_history.md"
STATE_FILE = "user_state.txt"

# Directories for generated speech and uploaded media; created at import time.
AUDIO_DIR = "audio_logs"
MEDIA_DIR = "media_files"
for _storage_dir in (AUDIO_DIR, MEDIA_DIR):
    os.makedirs(_storage_dir, exist_ok=True)
|
|
|
|
|
# Digit -> keycap emoji sequence: the ASCII digit followed by U+FE0F
# (variation selector-16) and U+20E3 (combining enclosing keycap).
UNICODE_DIGITS = {digit: f"{digit}\uFE0F\u20E3" for digit in range(10)}
|
|
|
|
|
# The Mathematical Alphanumeric Symbols block leaves holes where a letter was
# already encoded in the older Letterlike Symbols block. A plain "base + offset"
# mapping lands on those reserved code points and renders as a missing glyph,
# so each hole is redirected to the real character.
_RESERVED_MATH_LETTERS = {
    0x1D455: 0x210E,  # italic h
    0x1D49D: 0x212C,  # script B
    0x1D4A0: 0x2130,  # script E
    0x1D4A1: 0x2131,  # script F
    0x1D4A3: 0x210B,  # script H
    0x1D4A4: 0x2110,  # script I
    0x1D4A7: 0x2112,  # script L
    0x1D4A8: 0x2133,  # script M
    0x1D4AD: 0x211B,  # script R
    0x1D4BA: 0x212F,  # script e
    0x1D4BC: 0x210A,  # script g
    0x1D4C4: 0x2134,  # script o
    0x1D506: 0x212D,  # fraktur C
    0x1D50B: 0x210C,  # fraktur H
    0x1D50C: 0x2111,  # fraktur I
    0x1D515: 0x211C,  # fraktur R
    0x1D51D: 0x2128,  # fraktur Z
    0x1D53A: 0x2102,  # double-struck C
    0x1D53F: 0x210D,  # double-struck H
    0x1D545: 0x2115,  # double-struck N
    0x1D547: 0x2119,  # double-struck P
    0x1D548: 0x211A,  # double-struck Q
    0x1D549: 0x211D,  # double-struck R
    0x1D551: 0x2124,  # double-struck Z
}


def _letter_shifter(upper_base, lower_base=None):
    """Build a converter that maps ASCII letters onto a styled Unicode alphabet.

    Args:
        upper_base: Code point that 'A' maps to.
        lower_base: Code point that 'a' maps to, or None to leave lowercase
            letters untouched (the enclosed alphabets are capitals-only).

    Returns:
        A function ``str -> str``. Characters other than ASCII letters pass
        through unchanged.
    """
    def convert(text):
        pieces = []
        for ch in text:
            if 'A' <= ch <= 'Z':
                code = upper_base + ord(ch) - 0x41
            elif lower_base is not None and 'a' <= ch <= 'z':
                code = lower_base + ord(ch) - 0x61
            else:
                pieces.append(ch)
                continue
            pieces.append(chr(_RESERVED_MATH_LETTERS.get(code, code)))
        return "".join(pieces)
    return convert


# (display name, converter) pairs, in menu order.
UNICODE_FONTS = [
    ("Normal", lambda x: x),
    ("Bold", _letter_shifter(0x1D400, 0x1D41A)),
    ("Italic", _letter_shifter(0x1D434, 0x1D44E)),
    ("Bold Italic", _letter_shifter(0x1D468, 0x1D482)),
    ("Script", _letter_shifter(0x1D49C, 0x1D4B6)),
    ("Bold Script", _letter_shifter(0x1D4D0, 0x1D4EA)),
    ("Fraktur", _letter_shifter(0x1D504, 0x1D51E)),
    ("Bold Fraktur", _letter_shifter(0x1D56C, 0x1D586)),
    ("Double Struck", _letter_shifter(0x1D538, 0x1D552)),
    ("Sans Serif", _letter_shifter(0x1D5A0, 0x1D5BA)),
    ("Sans Serif Bold", _letter_shifter(0x1D5D4, 0x1D5EE)),
    ("Sans Serif Italic", _letter_shifter(0x1D608, 0x1D622)),
    ("Sans Serif Bold Italic", _letter_shifter(0x1D63C, 0x1D656)),
    ("Monospace", _letter_shifter(0x1D670, 0x1D68A)),
    ("Circled", _letter_shifter(0x24B6, 0x24D0)),
    ("Squared", _letter_shifter(0x1F130)),
    ("Negative Circled", _letter_shifter(0x1F150)),
    ("Negative Squared", _letter_shifter(0x1F170)),
    ("Regional Indicator", _letter_shifter(0x1F1E6)),
]
|
|
|
|
|
# Per-session defaults. A key is only seeded when it is absent, so values set
# on an earlier run of the script are left alone.
_SESSION_DEFAULTS = {
    'server_running': False,
    'server_task': None,
    'active_connections': {},
    'media_notifications': [],
    'last_chat_update': 0,
    'displayed_chat_lines': [],
    'message_text': "",
    'audio_cache': {},
    'pasted_image_data': None,
    'quote_line': None,
    'refresh_rate': 5,
    'base64_cache': {},
    'image_hashes': set(),
    'gallery_columns': 1,
    'user_id': None,
    'user_hash': None,
}
for _state_key, _initial_value in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
|
|
def format_timestamp_prefix(username):
    """Return ``<username>-<hh-mm-AM/PM-mm-dd-YYYY>-<user_id>`` for the current moment.

    The clock is read in the US/Central zone and the trailing id comes from
    ``st.session_state.user_id``.
    """
    stamp = datetime.now(pytz.timezone('US/Central')).strftime('%I-%M-%p-%m-%d-%Y')
    session_user_id = st.session_state.user_id
    return f"{username}-{stamp}-{session_user_id}"
|
|
|
|
|
def compute_image_hash(image_data):
    """Return the first 8 hex characters of the MD5 digest of an image.

    Args:
        image_data: Either a ``PIL.Image.Image`` (re-encoded as PNG before
            hashing) or a bytes-like object that is hashed as-is.
    """
    if not isinstance(image_data, Image.Image):
        raw = image_data
    else:
        png_buffer = io.BytesIO()
        image_data.save(png_buffer, format='PNG')
        raw = png_buffer.getvalue()
    digest = hashlib.md5(raw).hexdigest()
    return digest[:8]
|
|
|
|
|
def get_node_name():
    """Read ``--node-name`` and ``--port`` from the command line.

    Returns:
        ``(node_name, port)``. When no name is given, one is generated as
        ``node-<8 hex chars>``; the port defaults to 8501.
    """
    parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
    parser.add_argument('--node-name', type=str, default=None)
    parser.add_argument('--port', type=int, default=8501)
    # parse_known_args: an unrecognised flag on the command line must not make
    # argparse call sys.exit() in the middle of a page render.
    args, _unrecognised = parser.parse_known_args()
    username = st.session_state.get('username', 'System 🌟')
    log_action(username, "🌐🍼 - Node naming - christening the beast!")
    return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port
|
|
|
|
|
def log_action(username, action):
    """Append ``action`` to HISTORY_FILE unless this user logged it within the last 10 s.

    Recently logged actions are remembered per user in
    ``st.session_state.action_log`` as ``{action: time.time()}``; entries older
    than ten seconds are dropped on every call.

    Args:
        username: Name written in front of the action.
        action: Free-text description of what happened.
    """
    if 'action_log' not in st.session_state:
        st.session_state.action_log = {}
    user_log = st.session_state.action_log.setdefault(username, {})
    current_time = time.time()
    # Keep only actions still inside the 10-second suppression window.
    user_log = {k: v for k, v in user_log.items() if current_time - v < 10}
    st.session_state.action_log[username] = user_log
    if action not in user_log:
        central = pytz.timezone('US/Central')
        # Names and actions contain emoji, so the encoding is pinned rather
        # than left to the platform default.
        with open(HISTORY_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')}] {username}: {action}\n")
        user_log[action] = current_time
|
|
|
|
|
def clean_text_for_tts(text):
    """Strip markdown punctuation and collapse whitespace before speech synthesis.

    Removes every ``#``, ``*``, ``!``, ``[`` and ``]``, squeezes runs of
    whitespace to single spaces, and falls back to ``"No text to speak"`` when
    nothing is left.
    """
    without_markup = text.translate(str.maketrans('', '', '#*![]'))
    words = without_markup.split()
    return ' '.join(words) or "No text to speak"
|
|
|
|
|
@st.cache_resource
def get_shared_memory():
    """Return the ``SharedMemory`` object built on the first call.

    The function is wrapped in ``st.cache_resource``, so later calls are handed
    the same instance instead of a new one.
    """
    class SharedMemory:
        """Bounded in-memory record of recent chat entries and media paths."""

        MAX_CHAT_ENTRIES = 100
        MAX_MEDIA_FILES = 50

        def __init__(self):
            self.chat_history = []
            self.media_files = []
            self.cache_time = datetime.now()

        def update_chat(self, entry):
            """Append a chat entry, discarding the oldest once more than 100 are held."""
            self.chat_history.append(entry)
            if len(self.chat_history) > self.MAX_CHAT_ENTRIES:
                self.chat_history.pop(0)

        def update_media(self, media_path):
            """Append a media path, discarding the oldest once more than 50 are held."""
            self.media_files.append(media_path)
            if len(self.media_files) > self.MAX_MEDIA_FILES:
                self.media_files.pop(0)

        def get_condensed_dialogs(self):
            """Return the last ten entries as ``- <first 50 chars of message>`` lines.

            The message is whatever follows the first ``': '`` in the entry.
            Entries without that separator (for example markdown-fenced
            entries) are used whole instead of raising IndexError.
            """
            def message_of(entry):
                _, separator, message = entry.partition(': ')
                return message if separator else entry

            return "\n".join(f"- {message_of(entry)[:50]}" for entry in self.chat_history[-10:])

        def clear(self):
            """Forget all chat entries and media paths and reset ``cache_time``."""
            self.chat_history.clear()
            self.media_files.clear()
            self.cache_time = datetime.now()

    return SharedMemory()
|
|
|
|
|
class AudioProcessor:
    """Render text to speech with edge-tts and index the results in ``metadata.json``.

    Files and the index live in ``AUDIO_DIR``. The index maps an MD5 of
    ``"<text>:<voice>"`` to a small record describing the rendering.
    """

    def __init__(self):
        self.cache_dir = AUDIO_DIR
        os.makedirs(self.cache_dir, exist_ok=True)
        self.metadata = self._load_metadata()

    def _load_metadata(self):
        """Return the parsed index, or ``{}`` when no index file exists yet."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        if not os.path.exists(metadata_file):
            return {}
        # Context manager so the handle is closed deterministically.
        with open(metadata_file, encoding='utf-8') as f:
            return json.load(f)

    def _save_metadata(self):
        """Write the in-memory index back to ``metadata.json``."""
        metadata_file = os.path.join(self.cache_dir, "metadata.json")
        with open(metadata_file, 'w', encoding='utf-8') as f:
            json.dump(self.metadata, f)

    async def create_audio(self, text, voice='en-US-AriaNeural', filename=None):
        """Synthesize ``text`` with ``voice`` and return the path of the mp3.

        Args:
            text: Text to speak. Newlines and ``</s>`` markers are replaced by
                spaces before synthesis.
            voice: edge-tts voice identifier.
            filename: Target path. When omitted, a name is generated from the
                current user's timestamp prefix plus a random 4-digit suffix.

        Returns:
            The path of the audio file, or ``None`` when the cleaned text is
            empty or edge-tts reports that no audio was received.
        """
        cache_key = hashlib.md5(f"{text}:{voice}".encode()).hexdigest()
        cached = self.metadata.get(cache_key)

        if filename is None:
            # A freshly generated name can never already exist, so the cache
            # is consulted through the path recorded at rendering time.
            previous = cached.get('filename') if isinstance(cached, dict) else None
            if previous and os.path.exists(previous):
                return previous
            timestamp = format_timestamp_prefix(st.session_state.username)
            filename = os.path.join(self.cache_dir, f"audio_{timestamp}_{random.randint(1000, 9999)}.mp3")
        elif cached is not None and os.path.exists(filename):
            # Caller-chosen path that was already rendered for this text/voice.
            return filename

        text = text.replace("\n", " ").replace("</s>", " ").strip()
        if not text:
            return None

        try:
            communicate = edge_tts.Communicate(text, voice)
            await communicate.save(filename)
            if not os.path.exists(filename):
                raise edge_tts.exceptions.NoAudioReceived("No audio file created")
        except edge_tts.exceptions.NoAudioReceived as e:
            log_action("System 🌟", f"TTS failed for text '{text}' with voice '{voice}': {str(e)}")
            return None

        self.metadata[cache_key] = {
            'timestamp': datetime.now().isoformat(),
            'text_length': len(text),
            'voice': voice,
            'filename': filename,
        }
        self._save_metadata()

        return filename
|
|
|
|
|
async def _persist_chat_media(media_file, username, safe_name):
    """Store a chat attachment under MEDIA_DIR and return its file name.

    Returns ``None`` when the attachment type is not recognised.
    """
    if isinstance(media_file, str):
        # Already a file name; nothing to write.
        return media_file

    user_suffix = format_timestamp_prefix(username).split('-')[-1]
    if isinstance(media_file, Image.Image):
        # Images that were not loaded from a file report format=None.
        image_format = media_file.format or 'PNG'
        media_filename = f"{safe_name}-{user_suffix}.{image_format.lower()}"
        buffer = io.BytesIO()
        media_file.save(buffer, format=image_format)
        payload = buffer.getvalue()
    else:
        # Decide by extension; the full name never equals a bare 'png'/'pdf'.
        extension = os.path.splitext(getattr(media_file, 'name', '') or '')[1].lstrip('.').lower()
        if extension in ('png', 'jpg', 'mp4', 'mp3', 'wav'):
            media_filename = f"{safe_name}-{user_suffix}.{extension}"
        elif extension == 'pdf':
            file_hash = hashlib.md5(media_file.getbuffer()).hexdigest()[:8]
            media_filename = f"{safe_name}-{user_suffix}-{file_hash}.pdf"
        else:
            return None
        payload = bytes(media_file.getbuffer())

    media_path = os.path.join(MEDIA_DIR, media_filename)

    def _write():
        with open(media_path, 'wb') as fh:
            fh.write(payload)

    await asyncio.to_thread(_write)
    return media_filename


async def save_chat_entry(username, message, is_markdown=False, quote_line=None, media_file=None, skip_audio=False):
    """Record one chat message everywhere the app keeps chat state.

    The entry is appended to CHAT_FILE and to ``<username>_history.md``,
    optionally spoken via ``AudioProcessor``, any attachment is stored under
    MEDIA_DIR, the shared memory is updated, and ``username|message`` is
    broadcast to the ``"chat"`` room.

    Args:
        username: Author of the message.
        message: Message text.
        is_markdown: Wrap the message in a fenced ``markdown`` block.
        quote_line: Line being replied to; when set the entry is indented and
            prefixed with ``> <quote_line>``.
        media_file: ``PIL.Image.Image``, an uploaded-file-like object exposing
            ``.name`` and ``.getbuffer()``, or an existing media file name.
        skip_audio: Do not synthesize speech for this message.

    Returns:
        Path of the generated audio file, or ``None`` if none was produced.
    """
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y-%m-%d %H:%M:%S")
    # Names can arrive over the websocket; keep them from escaping the
    # working directory when they are used in file names.
    safe_name = re.sub(r'[\\/]', '_', username)
    user_history_file = f"{safe_name}_history.md"
    if username == st.session_state.get('username'):
        voice = st.session_state.get('voice', "en-US-AriaNeural")
    else:
        voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    indent = " " if quote_line else ""

    def _append(path, text):
        # Entries contain emoji, so the encoding is pinned.
        with open(path, 'a', encoding='utf-8') as fh:
            fh.write(text)

    if is_markdown:
        entry = f"{indent}[{timestamp}] {username}:\n{indent}```markdown\n{indent}{message}\n{indent}```"
    else:
        entry = f"{indent}[{timestamp}] {username}: {message}"
    if quote_line:
        entry = f"{indent}> {quote_line}\n{entry}"

    _append(CHAT_FILE, f"{entry}\n")

    if not os.path.exists(user_history_file):
        _append(user_history_file, f"# Chat History for {username} (Voice: {voice})\n\n")
    _append(user_history_file, f"{entry}\n")

    audio_filename = None
    if not skip_audio and message.strip():
        cleaned_message = clean_text_for_tts(message)
        audio_processor = AudioProcessor()
        audio_filename = await audio_processor.create_audio(cleaned_message, voice)
        if audio_filename:
            _append(HISTORY_FILE, f"[{timestamp}] {username} ({voice}): Audio generated - {audio_filename}\n")
            _append(user_history_file, f"{indent}[{timestamp}] Audio: {audio_filename}\n")
            _append(CHAT_FILE, f"{indent}[{timestamp}] Audio: {audio_filename}\n")

    if media_file:
        media_file = await _persist_chat_media(media_file, username, safe_name)
        if media_file:
            # NOTE(review): the "Media:" line carries no file reference; kept
            # as found because readers of these files are outside this block.
            _append(CHAT_FILE, f"{indent}[{timestamp}] Media: \n")
            _append(user_history_file, f"{indent}[{timestamp}] Media: \n")

    shared_memory = get_shared_memory()
    shared_memory.update_chat(entry)
    if media_file:
        shared_memory.update_media(os.path.join(MEDIA_DIR, media_file))

    await broadcast_message(f"{username}|{message}", "chat")
    st.session_state.last_chat_update = time.time()
    return audio_filename
|
|
|
|
|
async def save_chat_history_with_image(username, image_path):
    """Append a snapshot of the whole chat to the user's history file.

    Writes a ``Shared Media`` line naming ``image_path`` followed by the
    current CHAT_FILE content inside a fenced ``markdown`` block. The history
    file gets a title line first if it does not exist yet.

    Args:
        username: Owner of ``<username>_history.md``.
        image_path: Path of the shared media; only its base name is recorded.
    """
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y-%m-%d_%H-%M-%S")
    user_history_file = f"{username}_history.md"
    chat_content = await load_chat()
    if username == st.session_state.get('username'):
        voice = st.session_state.get('voice', "en-US-AriaNeural")
    else:
        voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    is_new_file = not os.path.exists(user_history_file)
    # Names and chat text contain emoji, so the encoding is pinned.
    with open(user_history_file, 'a', encoding='utf-8') as f:
        if is_new_file:
            f.write(f"# Chat History for {username} (Voice: {voice})\n\n")
        f.write(f"[{timestamp}] {username} (Voice: {voice}) Shared Media: {os.path.basename(image_path)}\n")
        f.write(f"```markdown\n{chat_content}\n```\n")
|
|
|
|
|
async def load_chat():
    """Return the current text of CHAT_FILE, creating it with a welcome header if missing.

    Deliberately not wrapped in ``st.cache_resource``: caching a coroutine
    function stores the coroutine object itself, so later calls would receive
    an already-awaited coroutine and the transcript would never refresh.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "📜🚀 - Chat loader - history unleashed!")

    def _read_or_create():
        if not os.path.exists(CHAT_FILE):
            with open(CHAT_FILE, 'a', encoding='utf-8') as fh:
                fh.write(f"# {START_ROOM} Chat\n\nWelcome to the cosmic hub - start chatting! 🎤\n")
        with open(CHAT_FILE, 'r', encoding='utf-8') as fh:
            return fh.read()

    return await asyncio.to_thread(_read_or_create)
|
|
|
|
|
async def get_user_list(chat_content):
    """Return a sorted list of names scraped from the chat text.

    For every non-blank line containing ``': '`` the first space-delimited
    word of the second ``': '``-separated field is collected.
    """
    # NOTE(review): for "[ts] name: message" lines this picks the first word of
    # the message, not the author - confirm that is what callers expect.
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "👥🎉 - User lister - who’s in the gang!")
    found = {
        line.split(': ')[1].split(' ')[0]
        for line in chat_content.split('\n')
        if line.strip() and ': ' in line
    }
    return sorted(found)
|
|
|
|
|
async def has_joined_before(client_id, chat_content):
    """Report whether any chat line mentions ``Client-<client_id>``."""
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🚪🔍 - Join checker - been here before?")
    marker = f"Client-{client_id}"
    for line in chat_content.split('\n'):
        if marker in line:
            return True
    return False
|
|
|
|
|
async def get_message_suggestions(chat_content, prefix):
    """Return up to five earlier messages that start with ``prefix`` (case-insensitive).

    A message is the text after the first ``': '`` of a non-blank chat line;
    results keep the order in which they appear in ``chat_content``.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "💡📝 - Suggestion maker - old quips resurface!")
    suggestions = []
    for line in chat_content.split('\n'):
        if ': ' not in line or not line.strip():
            continue
        candidate = line.split(': ', 1)[1]
        if candidate.lower().startswith(prefix.lower()):
            suggestions.append(candidate)
            if len(suggestions) == 5:
                break
    return suggestions
|
|
|
|
|
async def save_vote(file, item, user_hash, username, comment=""):
    """Record an upvote and announce it in the chat.

    Not wrapped in ``st.cache_resource``: the function exists for its side
    effects, and a cached coroutine function would skip repeated calls and hand
    back an already-awaited coroutine.

    Args:
        file: Vote tally file to append to.
        item: The thing being voted for.
        user_hash: Short hash identifying the voter in the tally.
        username: Display name used for the chat announcement.
        comment: Optional remark appended to the announcement.

    Returns:
        The tally line that was written, without the trailing newline.
    """
    await asyncio.to_thread(log_action, username, "👍📊 - Vote saver - cheers recorded!")
    central = pytz.timezone('US/Central')
    timestamp = datetime.now(central).strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] {user_hash} voted for {item}"

    def _append(path, text):
        # Context manager closes the handle; items may contain emoji.
        with open(path, 'a', encoding='utf-8') as fh:
            fh.write(text)

    await asyncio.to_thread(_append, file, f"{entry}\n")
    await asyncio.to_thread(_append, HISTORY_FILE, f"- {timestamp} - User {user_hash} voted for {item}\n")
    chat_message = f"{username} upvoted: \"{item}\""
    if comment:
        chat_message += f" - {comment}"
    await save_chat_entry(username, chat_message)
    return entry
|
|
|
|
|
async def load_votes(file):
    """Tally votes from ``file``, counting each (voter, item) pair once.

    The file is created with a short header if it does not exist. Not wrapped
    in ``st.cache_resource``, so every call reflects the file's current
    content.

    Returns:
        ``{item: vote_count}``.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🏆📈 - Vote counter - tallying the love!")

    def _read_or_create():
        if not os.path.exists(file):
            with open(file, 'w', encoding='utf-8') as fh:
                fh.write("# Vote Tally\n\nNo votes yet - get clicking! 🖱️\n")
        with open(file, 'r', encoding='utf-8') as fh:
            return fh.read()

    content = await asyncio.to_thread(_read_or_create)
    votes = {}
    counted = set()
    # Header lines are skipped by the 'voted for' filter, not by position: a
    # file first created by save_vote has no header to skip.
    for line in content.splitlines():
        if not line.strip() or 'voted for ' not in line or '] ' not in line:
            continue
        voter = line.split('] ', 1)[1].split(' voted for ', 1)[0]
        item = line.split('voted for ', 1)[1]
        vote_key = f"{voter}-{item}"
        if vote_key not in counted:
            votes[item] = votes.get(item, 0) + 1
            counted.add(vote_key)
    return votes
|
|
|
|
|
async def generate_user_hash():
    """Return this session's 8-character user hash, creating it on first use.

    The hash is the first 8 hex digits of the MD5 of 128 random bits and is
    kept in ``st.session_state.user_hash``.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🔑🕵️ - Hash generator - secret codes ahoy!")
    # The key is pre-seeded with None at start-up, so test the value rather
    # than the key's presence.
    if not st.session_state.get('user_hash'):
        st.session_state.user_hash = hashlib.md5(str(random.getrandbits(128)).encode()).hexdigest()[:8]
    return st.session_state.user_hash
|
|
|
|
|
async def async_edge_tts_generate(text, voice, rate=0, pitch=0, file_format="mp3"):
    """Synthesize ``text`` to a new mp3 under AUDIO_DIR and return its path.

    Args:
        text: Text to speak.
        voice: edge-tts voice identifier.
        rate: Integer speed change in percent, passed as ``"+N%"``.
        pitch: Integer pitch change in Hz, passed as ``"+NHz"``.
        file_format: Accepted but unused; the file name always ends in ``.mp3``.

    Returns:
        The file path, or ``None`` when no file was produced or edge-tts
        raised ``NoAudioReceived`` (the failure is noted in HISTORY_FILE).
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🎶🌟 - Audio maker - voices come alive!")
    timestamp = format_timestamp_prefix(username)
    filename = os.path.join(AUDIO_DIR, f"audio_{timestamp}_{random.randint(1000, 9999)}.mp3")
    communicate = edge_tts.Communicate(text, voice, rate=f"{rate:+d}%", pitch=f"{pitch:+d}Hz")
    try:
        await communicate.save(filename)
        return filename if os.path.exists(filename) else None
    except edge_tts.exceptions.NoAudioReceived:
        central = pytz.timezone('US/Central')
        failed_at = datetime.now(central).strftime('%Y-%m-%d %H:%M:%S')
        # The name and text may contain emoji, so the encoding is pinned.
        with open(HISTORY_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{failed_at}] {username}: Audio failed - No audio received for '{text}'\n")
        return None
|
|
|
|
|
def play_and_download_audio(file_path):
    """Show an audio player plus a base64 download link for ``file_path``.

    Does nothing when the path is empty or the file does not exist.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    with open(file_path, "rb") as audio_fh:
        b64 = base64.b64encode(audio_fh.read()).decode()
    dl_link = f'<a href="data:audio/mpeg;base64,{b64}" download="{os.path.basename(file_path)}">🎵 Download {os.path.basename(file_path)}</a>'
    st.markdown(dl_link, unsafe_allow_html=True)
|
|
|
|
|
async def save_pasted_image(image, username):
    """Save a pasted PIL image as PNG under MEDIA_DIR and return the file name.

    The name is ``<username>-<last segment of the timestamp prefix>.png``.
    """
    await asyncio.to_thread(log_action, username, "📸💾 - Image saver - pics preserved!")
    timestamp = format_timestamp_prefix(username)
    media_filename = f"{username}-{timestamp.split('-')[-1]}.png"
    media_path = os.path.join(MEDIA_DIR, media_filename)
    img_byte_arr = io.BytesIO()
    image.save(img_byte_arr, format='PNG')
    png_bytes = img_byte_arr.getvalue()

    def _write():
        # Context manager so the handle is closed as soon as the write ends.
        with open(media_path, 'wb') as fh:
            fh.write(png_bytes)

    await asyncio.to_thread(_write)
    return media_filename
|
|
|
|
|
async def save_media(file, username, ext):
    """Save an uploaded file under MEDIA_DIR and return the file name.

    Args:
        file: Object exposing ``getbuffer()`` with the raw bytes.
        username: Used as the file-name prefix.
        ext: Extension (without dot) for the stored file.
    """
    await asyncio.to_thread(log_action, username, f"📸💾 - Media saver - {ext} preserved!")
    timestamp = format_timestamp_prefix(username)
    media_filename = f"{username}-{timestamp.split('-')[-1]}.{ext}"
    media_path = os.path.join(MEDIA_DIR, media_filename)

    def _write():
        # Context manager so the handle is closed as soon as the write ends.
        with open(media_path, 'wb') as fh:
            fh.write(file.getbuffer())

    await asyncio.to_thread(_write)
    return media_filename
|
|
|
|
|
async def save_pdf_and_generate_audio(pdf_file, username, max_pages=10):
    """Store an uploaded PDF and narrate its first pages.

    Args:
        pdf_file: Object exposing ``getbuffer()`` with the PDF bytes.
        username: Prefix for stored file names; also selects the voice.
        max_pages: Upper bound on the number of pages read and narrated.

    Returns:
        ``(pdf_filename, texts, audio_files)``: the stored PDF's name, the
        extracted text per page, and the names of the audio files that were
        produced.
    """
    await asyncio.to_thread(log_action, username, "📜🎶 - PDF saver and audio generator!")
    user_suffix = format_timestamp_prefix(username).split('-')[-1]
    file_hash = hashlib.md5(pdf_file.getbuffer()).hexdigest()[:8]
    pdf_filename = f"{username}-{user_suffix}-{file_hash}.pdf"
    media_path = os.path.join(MEDIA_DIR, pdf_filename)
    with open(media_path, 'wb') as pdf_out:
        pdf_out.write(pdf_file.getbuffer())

    reader = PdfReader(media_path)
    page_count = min(len(reader.pages), max_pages)
    texts, audio_files = [], []

    audio_processor = AudioProcessor()
    if username == st.session_state.username:
        voice = st.session_state.voice
    else:
        voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")

    for page_index in range(page_count):
        page_text = reader.pages[page_index].extract_text()
        texts.append(page_text)
        audio_filename = f"{username}-{user_suffix}-page{page_index + 1}-{file_hash}-voice-{voice}.mp3"
        audio_path = os.path.join(AUDIO_DIR, audio_filename)
        # Only pages that actually produced audio are reported back.
        if await audio_processor.create_audio(page_text, voice, audio_path):
            audio_files.append(audio_filename)

    return pdf_filename, texts, audio_files
|
|
|
|
|
def get_video_html(video_path, width="100px"):
    """Return a ``<video>`` tag with the file inlined as a base64 data URL.

    Args:
        video_path: File name relative to MEDIA_DIR.
        width: Value for the tag's ``width`` attribute.
    """
    # Context manager so the handle is closed as soon as the bytes are read.
    with open(os.path.join(MEDIA_DIR, video_path), 'rb') as video_fh:
        encoded = base64.b64encode(video_fh.read()).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'''
    <video width="{width}" controls autoplay muted loop>
        <source src="{video_url}" type="video/mp4">
        Your browser does not support the video tag.
    </video>
    '''
|
|
|
|
|
async def get_audio_html(audio_path, width="100px"):
    """Return an ``<audio>`` tag with the file inlined as a base64 data URL.

    Args:
        audio_path: File name relative to AUDIO_DIR.
        width: CSS width applied to the player.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🎶✈️ - Audio renderer - sounds soar!")

    def _read_audio():
        with open(os.path.join(AUDIO_DIR, audio_path), 'rb') as audio_fh:
            return audio_fh.read()

    # Await the read itself: `await to_thread(open, ...).read()` would call
    # .read() on the coroutine object, not on the opened file.
    raw = await asyncio.to_thread(_read_audio)
    audio_url = f"data:audio/mpeg;base64,{base64.b64encode(raw).decode()}"
    return f'''
    <audio controls style="width: {width};">
        <source src="{audio_url}" type="audio/mpeg">
        Your browser does not support the audio element.
    </audio>
    '''
|
|
|
|
|
async def websocket_handler(websocket, path=None):
    """Serve one websocket client for as long as its connection lasts.

    The client is registered in the ``"chat"`` room, a join message is saved
    unless its id already appears in the chat, and every incoming
    ``username|content`` frame is saved as a chat entry. The registration is
    removed when the connection ends.

    Args:
        websocket: The connection object supplied by ``websockets.serve``.
        path: Request path. Optional because newer websockets releases call
            the handler with the connection only; it is not used here.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🌐🔗 - Websocket handler - chat links up!")
    # Bound before `try` so the cleanup in `finally` can always refer to them.
    client_id = str(uuid.uuid4())
    room_id = "chat"
    try:
        st.session_state.active_connections.setdefault(room_id, {})[client_id] = websocket
        chat_content = await load_chat()
        username = st.session_state.get('username', random.choice(list(FUN_USERNAMES.keys())))
        if not await has_joined_before(client_id, chat_content):
            await save_chat_entry(f"Client-{client_id}", f"{username} has joined {START_ROOM}!")
        async for message in websocket:
            parts = message.split('|', 1)
            if len(parts) == 2:
                username, content = parts
                await save_chat_entry(username, content)
    except websockets.ConnectionClosed:
        pass
    finally:
        st.session_state.active_connections.get(room_id, {}).pop(client_id, None)
|
|
|
|
|
async def broadcast_message(message, room_id):
    """Send ``message`` to every websocket registered in ``room_id``.

    Connections that turn out to be closed are dropped from the room.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "📢✈️ - Message broadcaster - words fly far!")
    room = st.session_state.active_connections.get(room_id)
    if room is None:
        return
    disconnected = []
    # Iterate over a snapshot: handlers may add or remove clients while this
    # coroutine is suspended in send().
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        # pop with a default: the handler's own cleanup may have run first.
        room.pop(client_id, None)
|
|
|
|
|
async def run_websocket_server():
    """Start the websocket server on 0.0.0.0:8765 and wait until it closes.

    Returns immediately when ``st.session_state.server_running`` is already
    set; otherwise the flag is set once the server is listening.
    """
    username = st.session_state.get('username', 'System 🌟')
    await asyncio.to_thread(log_action, username, "🖥️🌀 - Server starter - web spins up!")
    if st.session_state.server_running:
        return
    server = await websockets.serve(websocket_handler, '0.0.0.0', 8765)
    st.session_state.server_running = True
    await server.wait_closed()
|
|
|
|
|
def delete_user_files():
    """Delete generated files and reset the in-memory caches.

    Removes every regular file in the working directory that is neither
    protected nor a ``*_history.md`` file, then every file under AUDIO_DIR and
    MEDIA_DIR. Failures are reported with ``st.error`` and do not stop the
    sweep.

    Returns:
        List of the paths that were deleted.
    """
    protected_files = {'app.py', 'requirements.txt', 'README.md', CHAT_FILE, QUOTE_VOTES_FILE, MEDIA_VOTES_FILE, HISTORY_FILE, STATE_FILE, AUDIO_DIR, MEDIA_DIR}
    # Never delete the script that is currently running, whatever its name.
    running_script = globals().get('__file__')
    if running_script:
        protected_files.add(os.path.basename(running_script))
    deleted_files = []

    def _remove(path):
        try:
            os.remove(path)
            deleted_files.append(path)
        except Exception as e:
            st.error(f"Failed to delete {path}: {e}")

    for entry in os.listdir('.'):
        if entry in protected_files or entry.endswith('_history.md'):
            continue
        # os.remove cannot delete directories; skip them rather than report
        # an error for each one.
        if not os.path.isfile(entry):
            continue
        _remove(entry)

    for directory in (AUDIO_DIR, MEDIA_DIR):
        for root, _dirs, files in os.walk(directory):
            for name in files:
                _remove(os.path.join(root, name))

    st.session_state.image_hashes.clear()
    st.session_state.audio_cache.clear()
    st.session_state.base64_cache.clear()
    st.session_state.displayed_chat_lines.clear()
    shared_memory = get_shared_memory()
    shared_memory.clear()
    return deleted_files
|
|
|
|
|
def check_query_params():
    """Apply the ``q`` query parameter to the session.

    A value that is one of FUN_USERNAMES becomes the session's username (with
    its paired voice). Any other non-empty value is stored as the session's
    ``user_id``.

    Returns:
        The username when ``q`` named a known user, otherwise ``None``.
    """
    query_params = st.query_params if hasattr(st, 'query_params') else st.experimental_get_query_params()
    raw_value = query_params.get("q")
    # st.query_params yields a plain string, the experimental API a list of
    # strings; indexing [0] on the string form would keep only one character.
    if isinstance(raw_value, (list, tuple)):
        q_value = raw_value[0] if raw_value else None
    else:
        q_value = raw_value
    if q_value and q_value in FUN_USERNAMES:
        st.session_state.username = q_value
        st.session_state.voice = FUN_USERNAMES[q_value]
        return q_value
    elif q_value:
        st.session_state.user_id = q_value
    return None
|
|
|
|
|
def _mermaid_speaker_and_text(line):
    """Split a chat line into (first word after the first ': ', rest of the text).

    Lines without ``': '`` are split on their first space instead.
    """
    content = line.split(': ', 1)[1] if ': ' in line else line
    speaker, _, text = content.partition(' ')
    return speaker, text


def generate_mermaid_graph(chat_lines):
    """Build Mermaid ``graph TD`` source describing the chat.

    Every non-blank line that does not start with a space becomes a node
    labelled ``<speaker>: <first 20 chars of text>``. A node gets an edge to a
    🎵 node when the next line mentions ``Audio:``, to a 🖼 node when the line
    after that mentions ``Media:``, and from the previous line's node when the
    line itself contains ``"> "``.

    Args:
        chat_lines: Chat transcript, one line per element.

    Returns:
        The Mermaid source as a string.
    """
    nodes = {}
    edges = []
    total = len(chat_lines)
    for i, line in enumerate(chat_lines):
        if not line.strip() or line.startswith(' '):
            continue
        user, message = _mermaid_speaker_and_text(line)
        node_id = f"{user}_{i}"
        snippet = f"{message[:20]}..." if len(message) > 20 else message
        nodes[node_id] = f"{user}: {snippet}"
        if i + 1 < total and "Audio:" in chat_lines[i + 1]:
            audio_node = f"audio_{i}"
            nodes[audio_node] = "🎵"
            edges.append(f"{node_id} --> {audio_node}")
        if i + 2 < total and "Media:" in chat_lines[i + 2]:
            media_node = f"media_{i}"
            nodes[media_node] = "🖼"
            edges.append(f"{node_id} --> {media_node}")
        if i > 0 and "> " in line:
            # Derive the parent id exactly as node ids are derived, so the
            # edge points at the parent's node and a parent line without
            # ': ' cannot raise IndexError.
            parent_user, _ = _mermaid_speaker_and_text(chat_lines[i - 1])
            edges.append(f"{parent_user}_{i - 1} --> {node_id}")

    mermaid_code = "graph TD\n"
    for node_id, label in nodes.items():
        # A raw double quote would end the label early; use Mermaid's entity.
        safe_label = label.replace('"', '#quot;')
        mermaid_code += f" {node_id}[\"{safe_label}\"]\n"
    mermaid_code += "\n".join(f" {edge}" for edge in edges)
    return mermaid_code
|
|
|
|
|
def main():
    """Entry point: read CLI options, obtain a usable event loop, run the UI.

    The current event loop is reused when one exists and is still open;
    otherwise a new loop is created and installed. The loop is closed after
    ``async_interface()`` finishes.
    """
    # The returned node name and port are not used further in this function.
    node_name, port = get_node_name()

    loop = None
    try:
        candidate = asyncio.get_event_loop()
        if not candidate.is_closed():
            loop = candidate
    except RuntimeError:
        pass
    if loop is None:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(async_interface())
    loop.close()
|
|
|
|
|
async def async_interface():
    """
    Render the chat page for the current Streamlit session.

    In order, this coroutine:
      1. makes sure the session has a user id / hash and, when no ``q`` value
         was supplied and no username is set yet, picks a username and voice;
      2. loads the per-user history file if it exists;
      3. starts the websocket server task once per session;
      4. renders the chat history grouped by minute, the condensed dialogs and
         the Mermaid relationship graph;
      5. handles quote replies, name changes, plain messages, pasted content
         and file uploads;
      6. renders the danger zone and the refresh countdown, then calls
         ``st.rerun()``.

    The gallery, full-log and cache-clear sections follow the countdown in the
    body; see the NOTE(review) next to the countdown about their reachability.
    """
    if not st.session_state.user_id:
        st.session_state.user_id = str(uuid.uuid4())
        st.session_state.user_hash = await generate_user_hash()

    q_value = check_query_params()
    if not q_value and 'username' not in st.session_state:
        chat_content = await load_chat()
        joined_lines = chat_content.split('\n')
        # Prefer a fun name nobody has announced in the chat yet.
        available_names = [
            name for name in FUN_USERNAMES
            if not any(f"{name} has joined" in line for line in joined_lines)
        ]
        st.session_state.username = random.choice(available_names) if available_names else random.choice(list(FUN_USERNAMES.keys()))
        st.session_state.voice = FUN_USERNAMES[st.session_state.username]
        st.markdown(f"**🎙️ Voice Selected**: {st.session_state.voice} 🗣️ for {st.session_state.username}")

    user_history_file = f"{st.session_state.username}_history.md"
    if os.path.exists(user_history_file):
        with open(user_history_file, 'r') as f:
            st.session_state.displayed_chat_lines = f.read().split('\n')

    # NOTE(review): this is a path ("/q=..."), not a "?q=" query string, and the
    # username is not URL-encoded — confirm against check_query_params().
    user_url = f"/q={st.session_state.username}"
    with st.container():
        st.markdown(f"<small>Your unique URL path: [{user_url}]({user_url})</small>", unsafe_allow_html=True)

    with st.container():
        st.markdown(f"#### 🤖🧠MMO {st.session_state.username}📝🔬")
        st.markdown(f"<small>Welcome to {START_ROOM} - chat, vote, upload, paste images, and enjoy quoting! 🎉 User ID: {st.session_state.user_id}</small>", unsafe_allow_html=True)

    if not st.session_state.server_task:
        st.session_state.server_task = asyncio.create_task(run_websocket_server())

    # ---- Chat history -------------------------------------------------------
    with st.container():
        st.markdown(f"##### {START_ROOM} Chat History 💬")
        shared_memory = get_shared_memory()
        chat_content = await load_chat()
        chat_lines = [line for line in chat_content.split('\n') if line.strip() and not line.startswith('#')]
        if chat_lines:
            # Group newest-first by the first 16 characters of the timestamp.
            # The original index travels with each line so its follow-up
            # Audio:/Media: lines are found by position; looking the text up
            # with list.index() returned the first duplicate and was quadratic.
            chat_by_minute = {}
            for idx, line in reversed(list(enumerate(chat_lines))):
                timestamp = line.split('] ')[0][1:] if '] ' in line else "Unknown"
                chat_by_minute.setdefault(timestamp[:16], []).append((idx, line))

            markdown_output = ""
            for minute, indexed_lines in chat_by_minute.items():
                minute_output = f"###### {minute[-5:]}\n"
                for idx, line in indexed_lines:
                    if ': ' not in line or line.startswith(' '):
                        continue
                    user_message = line.split(': ', 1)[1]
                    user, _, msg = user_message.partition(' ')
                    audio_html = ""
                    media_content = ""
                    # Attachments are looked for in the two lines that follow.
                    for nl in chat_lines[idx + 1:idx + 3]:
                        if "Audio:" in nl:
                            audio_file = nl.split("Audio: ")[-1].strip()
                            audio_html = play_and_download_audio(audio_file)
                        elif "Media:" in nl:
                            media_file = nl.split("Media: ")[-1].strip('![]()')
                            media_path = os.path.join(MEDIA_DIR, media_file)
                            if os.path.exists(media_path):
                                if media_file.endswith(('.png', '.jpg')):
                                    # NOTE(review): file:// URLs are normally blocked
                                    # on pages served over http — verify this renders.
                                    media_content = f"<img src='file://{media_path}' width='100'>"
                                elif media_file.endswith('.mp4'):
                                    media_content = get_video_html(media_file)
                                elif media_file.endswith('.mp3'):
                                    media_content = await get_audio_html(media_file)
                                elif media_file.endswith('.pdf'):
                                    media_content = f"📜 {os.path.basename(media_file)}"
                    minute_output += f"- 💬 **{user}**: {msg[:50]}... {audio_html} {media_content}\n"
                markdown_output += minute_output
            st.markdown(markdown_output, unsafe_allow_html=True)
            shared_memory.update_chat(markdown_output)

            # NOTE(review): the two sections below are placed inside
            # `if chat_lines:`; the original nesting could not be recovered
            # from the source — confirm.
            st.markdown("###### Condensed Dialogs 🗣️")
            condensed_dialogs = shared_memory.get_condensed_dialogs()
            st.markdown(condensed_dialogs)

            st.markdown("###### Chat Relationship Tree 🌳")
            mermaid_code = generate_mermaid_graph(chat_lines)
            mermaid_html = f"""
            <script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
            <div class="mermaid" style="height: 200px; overflow: auto;">{mermaid_code}</div>
            <script>mermaid.initialize({{startOnLoad:true}});</script>
            """
            components.html(mermaid_html, height=250)

    # ---- Quote reply, rename, message input, paste --------------------------
    with st.container():
        if st.session_state.quote_line:
            st.markdown(f"###### Quoting: {st.session_state.quote_line}")
            quote_response = st.text_area("Add your response", key="quote_response", value=st.session_state.message_text)
            paste_result_quote = paste_image_button("📋 Paste Image or Text with Quote", key="paste_button_quote")
            if paste_result_quote.image_data is not None:
                if isinstance(paste_result_quote.image_data, str):
                    st.session_state.message_text = paste_result_quote.image_data
                    # Distinct key: reusing "quote_response" for a second widget
                    # in the same run is a duplicate-key error in Streamlit.
                    # Mirrors "message_input_paste" used for plain messages.
                    st.text_area("Add your response", key="quote_response_paste", value=st.session_state.message_text)
                else:
                    st.image(paste_result_quote.image_data, caption="Received Image for Quote")
                    filename = await save_pasted_image(paste_result_quote.image_data, st.session_state.username)
                    if filename:
                        st.session_state.pasted_image_data = filename
                        await save_chat_entry(st.session_state.username, f"Pasted image: {filename}", quote_line=st.session_state.quote_line, media_file=paste_result_quote.image_data)
            if st.button("Send Quote 🚀", key="send_quote"):
                markdown_response = f"### Quote Response\n- **Original**: {st.session_state.quote_line}\n- **{st.session_state.username} Replies**: {quote_response}"
                if st.session_state.pasted_image_data:
                    # NOTE(review): nothing follows "Image:" here; an image
                    # reference may have been intended — confirm.
                    markdown_response += f"\n- **Image**: "
                    await save_chat_entry(st.session_state.username, f"Pasted image: {st.session_state.pasted_image_data}", quote_line=st.session_state.quote_line, media_file=st.session_state.pasted_image_data)
                    st.session_state.pasted_image_data = None
                await save_chat_entry(st.session_state.username, markdown_response, is_markdown=True, quote_line=st.session_state.quote_line)
                st.session_state.quote_line = None
                st.session_state.message_text = ''
                st.rerun()

        name_options = list(FUN_USERNAMES.keys())
        current_selection = st.session_state.username if st.session_state.username in FUN_USERNAMES else ""
        # Option 0 is the empty placeholder, hence the +1 offset.
        selected_index = name_options.index(current_selection) + 1 if current_selection else 0
        new_username = st.selectbox("Change Name and Voice", [""] + name_options, index=selected_index, format_func=lambda x: f"{x} ({FUN_USERNAMES.get(x, 'No Voice')})" if x else "Select a name")
        if new_username and new_username != st.session_state.username:
            await save_chat_entry("System 🌟", f"{st.session_state.username} changed name to {new_username}")
            st.session_state.username = new_username
            st.session_state.voice = FUN_USERNAMES[new_username]
            st.markdown(f"**🎙️ Voice Changed**: {st.session_state.voice} 🗣️ for {st.session_state.username}")
            st.rerun()

        col_input, col_send = st.columns([5, 1])
        with col_input:
            message = st.text_input(f"Message as {st.session_state.username} (Voice: {st.session_state.voice})", key="message_input", value=st.session_state.message_text)
        with col_send:
            if st.button("Send 🚀", key="send_button"):
                if message.strip() or st.session_state.pasted_image_data:
                    # An image-only message gets a placeholder text and no audio.
                    await save_chat_entry(st.session_state.username, message if message.strip() else "Image shared", is_markdown=True, media_file=st.session_state.pasted_image_data or None, skip_audio=not message.strip())
                    if st.session_state.pasted_image_data:
                        st.session_state.pasted_image_data = None
                    st.session_state.message_text = ''
                    st.rerun()

        paste_result_msg = paste_image_button("📋 Paste Image or Text with Message", key="paste_button_msg")
        if paste_result_msg.image_data is not None:
            if isinstance(paste_result_msg.image_data, str):
                st.session_state.message_text = paste_result_msg.image_data
                st.text_input(f"Message as {st.session_state.username} (Voice: {st.session_state.voice})", key="message_input_paste", value=st.session_state.message_text)
            else:
                st.image(paste_result_msg.image_data, caption="Received Image for Quote")
                filename = await save_pasted_image(paste_result_msg.image_data, st.session_state.username)
                if filename:
                    await save_chat_entry(st.session_state.username, "Image shared", is_markdown=True, media_file=paste_result_msg.image_data, skip_audio=True)
                    st.session_state.pasted_image_data = None
                    st.rerun()

    # ---- Uploads, danger zone, refresh countdown ----------------------------
    with st.container():
        st.markdown("###### Upload Media 🎨🎶📜🎥")
        uploaded_file = st.file_uploader("Upload Media", type=['png', 'jpg', 'mp4', 'mp3', 'wav', 'pdf', 'txt', 'md', 'py'])
        if uploaded_file:
            timestamp = format_timestamp_prefix(st.session_state.username)
            username = st.session_state.username
            ext = uploaded_file.name.split('.')[-1].lower()
            if ext in ['png', 'jpg']:
                filename = await save_pasted_image(uploaded_file, username)
            elif ext in ['mp4', 'mp3', 'wav']:
                filename = await save_media(uploaded_file, username, ext)
            elif ext == 'pdf':
                pdf_filename, _, _ = await save_pdf_and_generate_audio(uploaded_file, username)
                filename = pdf_filename
            else:
                filename = f"{username}-{timestamp.split('-')[-1]}.{ext}"
                media_path = os.path.join(MEDIA_DIR, filename)

                def _write_upload():
                    # Context manager so the handle is closed deterministically.
                    with open(media_path, 'wb') as out:
                        out.write(uploaded_file.getbuffer())

                await asyncio.to_thread(_write_upload)

            if filename:
                await save_chat_entry(username, f"Uploaded {ext.upper()}: {os.path.basename(filename)}", media_file=uploaded_file, skip_audio=True)
                shared_memory = get_shared_memory()
                shared_memory.update_media(os.path.join(MEDIA_DIR, filename))
                st.success(f"Uploaded {filename}")
            else:
                # A save helper returned nothing; report it instead of crashing
                # in os.path.basename(None).
                st.error(f"Could not save {uploaded_file.name}")

        st.markdown("###### 🛑 Danger Zone")
        if st.button("Try Not To Delete It All On Your First Day", key="delete_all", help="Deletes all user-added files!", type="primary", use_container_width=True):
            deleted_files = delete_user_files()
            if deleted_files:
                st.markdown("### 🗑️ Deleted Files:\n" + "\n".join([f"- `{file}`" for file in deleted_files]))
                shared_memory = get_shared_memory()
                shared_memory.clear()
            else:
                st.markdown("### 🗑️ Nothing to Delete!")
            st.rerun()

        st.markdown("###### Refresh ⏳")
        refresh_rate = st.slider("Refresh Rate", 1, 300, st.session_state.refresh_rate)
        st.session_state.refresh_rate = refresh_rate
        timer_placeholder = st.empty()
        for i in range(st.session_state.refresh_rate, -1, -1):
            _font_name, font_func = random.choice(UNICODE_FONTS)
            countdown_str = "".join(UNICODE_DIGITS[int(d)] for d in str(i)) if i < 10 else font_func(str(i))
            timer_placeholder.markdown(f"<small>⏳ {font_func('Refresh in:')} {countdown_str}</small>", unsafe_allow_html=True)
            await asyncio.sleep(1)
        # NOTE(review): st.rerun() stops this script run, so every statement
        # below is unreachable as written. Moving the countdown to the end of
        # the function would enable the galleries, but that code has never been
        # exercised; statement order is kept unchanged here.
        st.rerun()

    # ---- Galleries ----------------------------------------------------------
    with st.container():
        gallery_patterns = [
            (MEDIA_DIR, "*.md"), (MEDIA_DIR, "*.pdf"), (MEDIA_DIR, "*.txt"),
            (MEDIA_DIR, "*.py"), (MEDIA_DIR, "*.png"), (MEDIA_DIR, "*.jpg"),
            (MEDIA_DIR, "*.mp3"), (MEDIA_DIR, "*.mp4"), (AUDIO_DIR, "*.mp3"),
        ]
        all_files = []
        for directory, pattern in gallery_patterns:
            all_files += glob.glob(os.path.join(directory, pattern))
        shared_memory = get_shared_memory()
        own_files = [f for f in all_files if st.session_state.user_id in os.path.basename(f) or st.session_state.username in os.path.basename(f)]
        excluded = [CHAT_FILE, QUOTE_VOTES_FILE, MEDIA_VOTES_FILE, HISTORY_FILE, STATE_FILE, os.path.join(AUDIO_DIR, "*"), os.path.join(MEDIA_DIR, "*")]
        shared_files = [f for f in all_files if f not in own_files and f not in excluded]

        st.markdown("###### Your Files 📂")
        st.markdown("###### Image Gallery 🖼")
        own_image_files = [f for f in own_files if f.endswith(('.png', '.jpg'))]
        image_cols = st.slider("Image Gallery Columns 🖼 (Own)", min_value=1, max_value=15, value=5)
        cols = st.columns(image_cols)
        for idx, image_file in enumerate(own_image_files):
            with cols[idx % image_cols]:
                st.image(image_file, use_container_width=True)
            shared_memory.update_media(image_file)

        st.markdown("###### Video Gallery 🎥")
        own_video_files = [f for f in own_files if f.endswith('.mp4')]
        video_cols = st.slider("Video Gallery Columns 🎬 (Own)", min_value=1, max_value=5, value=3)
        cols = st.columns(video_cols)
        for idx, video_file in enumerate(own_video_files):
            with cols[idx % video_cols]:
                st.markdown(get_video_html(video_file), unsafe_allow_html=True)
            shared_memory.update_media(video_file)

        st.markdown("###### Audio Gallery 🎧")
        own_audio_files = [f for f in own_files if f.endswith(('.mp3', '.wav')) or f.startswith(os.path.join(AUDIO_DIR, "audio_"))]
        audio_cols = st.slider("Audio Gallery Columns 🎶 (Own)", min_value=1, max_value=15, value=5)
        cols = st.columns(audio_cols)
        for idx, audio_file in enumerate(own_audio_files):
            with cols[idx % audio_cols]:
                st.markdown(await get_audio_html(audio_file), unsafe_allow_html=True)
            shared_memory.update_media(audio_file)

        st.markdown("###### Shared Files 📤")
        st.markdown("###### Image Gallery 🖼")
        shared_image_files = [f for f in shared_files if f.endswith(('.png', '.jpg'))]
        image_cols = st.slider("Image Gallery Columns 🖼 (Shared)", min_value=1, max_value=15, value=5)
        cols = st.columns(image_cols)
        for idx, image_file in enumerate(shared_image_files):
            with cols[idx % image_cols]:
                st.image(image_file, use_container_width=True)
            shared_memory.update_media(image_file)

        st.markdown("###### Video Gallery 🎥")
        shared_video_files = [f for f in shared_files if f.endswith('.mp4')]
        video_cols = st.slider("Video Gallery Columns 🎬 (Shared)", min_value=1, max_value=5, value=3)
        cols = st.columns(video_cols)
        for idx, video_file in enumerate(shared_video_files):
            with cols[idx % video_cols]:
                st.markdown(get_video_html(video_file), unsafe_allow_html=True)
            shared_memory.update_media(video_file)

        st.markdown("###### Audio Gallery 🎧")
        shared_audio_files = [f for f in shared_files if f.endswith(('.mp3', '.wav')) or f.startswith(os.path.join(AUDIO_DIR, "audio_"))]
        audio_cols = st.slider("Audio Gallery Columns 🎶 (Shared)", min_value=1, max_value=15, value=5)
        cols = st.columns(audio_cols)
        for idx, audio_file in enumerate(shared_audio_files):
            with cols[idx % audio_cols]:
                st.markdown(await get_audio_html(audio_file), unsafe_allow_html=True)
            shared_memory.update_media(audio_file)

    # ---- Full chat log ------------------------------------------------------
    with st.container():
        st.markdown("###### Full Chat Log 📜")
        with open(CHAT_FILE, 'r') as f:
            history_content = f.read()
        st.markdown(history_content)
        st.download_button("Download Chat Log as .md", history_content, file_name=f"chat_{st.session_state.user_id}.md", mime="text/markdown")

    if st.button("Clear Shared Memory Cache", key="clear_cache"):
        shared_memory = get_shared_memory()
        shared_memory.clear()
        st.success("Shared memory cache cleared, a fresh start with a bard’s heart, in sync’s art, in Streamlit’s art! 🌟🎶")
|
|
|
if __name__ == "__main__":
    # Launch our saga, a shared memory’s cheer, in Streamlit’s cache, our epic
    # year, in sync’s spear, in Streamlit’s year! 🚀✨
    # (Kept as a comment: a string literal inside an `if` body is not a
    # docstring, only a discarded expression statement.)
    main()