|
import streamlit as st |
|
import asyncio |
|
import websockets |
|
import uuid |
|
import argparse |
|
from datetime import datetime |
|
import os |
|
import random |
|
import time |
|
import hashlib |
|
from PIL import Image |
|
import glob |
|
from urllib.parse import quote |
|
import base64 |
|
import io |
|
import streamlit.components.v1 as components |
|
import edge_tts |
|
from audio_recorder_streamlit import audio_recorder |
|
import nest_asyncio |
|
import re |
|
from streamlit_paste_button import paste_image_button |
|
|
|
|
|
# nest_asyncio patches asyncio so an event loop can be re-entered; main()
# below drives its coroutine with loop.run_until_complete().
nest_asyncio.apply()

# Branding strings reused by the page configuration and the chat headings.
icons = 'π€π§ π¬π'
START_ROOM = "Sector π"

# Page-level configuration, issued before any other Streamlit call in this file.
st.set_page_config(
    page_title="π€π§ MMO Chat Brainππ¬",
    page_icon=icons,
    layout="wide",
    initial_sidebar_state="auto",
)
|
|
|
|
|
# Display name -> edge-tts voice id.  The name is what appears in the chat
# log; the voice is what is passed to edge_tts.Communicate when a message
# from that user is turned into speech.
FUN_USERNAMES = {
    "CosmicJester π": "en-US-AriaNeural",
    "PixelPanda πΌ": "en-US-JennyNeural",
    "QuantumQuack π¦": "en-GB-SoniaNeural",
    "StellarSquirrel πΏοΈ": "en-AU-NatashaNeural",
    "GizmoGuru βοΈ": "en-CA-ClaraNeural",
    "NebulaNinja π ": "en-US-GuyNeural",
    "ByteBuster πΎ": "en-GB-RyanNeural",
    "GalacticGopher π": "en-AU-WilliamNeural",
    "RocketRaccoon π": "en-CA-LiamNeural",
    "EchoElf π§": "en-US-AnaNeural",
    "PhantomFox π¦": "en-US-BrandonNeural",
    "WittyWizard π§": "en-GB-ThomasNeural",
    "LunarLlama π": "en-AU-FreyaNeural",
    "SolarSloth βοΈ": "en-CA-LindaNeural",
    "AstroAlpaca π¦": "en-US-ChristopherNeural",
    "CyberCoyote πΊ": "en-GB-ElliotNeural",
    "MysticMoose π¦": "en-AU-JamesNeural",
    "GlitchGnome π§": "en-CA-EthanNeural",
    "VortexViper π": "en-US-AmberNeural",
    "ChronoChimp π": "en-GB-LibbyNeural",
}
|
|
|
|
|
# On-disk layout: one directory per kind of artefact, created at import time.
CHAT_DIR = "chat_logs"
VOTE_DIR = "vote_logs"
STATE_FILE = "user_state.txt"
AUDIO_DIR = "audio_logs"
HISTORY_DIR = "history_logs"
MEDIA_DIR = "media_files"

for _storage_dir in (CHAT_DIR, VOTE_DIR, AUDIO_DIR, HISTORY_DIR, MEDIA_DIR):
    os.makedirs(_storage_dir, exist_ok=True)

# Append-only markdown logs that the rest of the module reads and writes.
CHAT_FILE = os.path.join(CHAT_DIR, "global_chat.md")
QUOTE_VOTES_FILE = os.path.join(VOTE_DIR, "quote_votes.md")
MEDIA_VOTES_FILE = os.path.join(VOTE_DIR, "media_votes.md")
HISTORY_FILE = os.path.join(HISTORY_DIR, "chat_history.md")

# Decorated string for each decimal digit 0-9, used by the refresh countdown.
UNICODE_DIGITS = {digit: f"{digit}\uFE0Fβ£" for digit in range(10)}
|
|
|
|
|
def _make_font(upper_base, lower_base=None, exceptions=None):
    """Build a text -> styled-text function for one Unicode letter style.

    Args:
        upper_base: Code point that 'A' maps to; 'B'..'Z' follow consecutively.
        lower_base: Code point that 'a' maps to, or None to leave lowercase
            letters untouched.
        exceptions: Optional {ascii_letter: code_point} overrides for letters
            whose styled form does not sit at the regular offset.

    Returns:
        A one-argument callable; characters outside the table pass through.
    """
    table = {}
    for offset in range(26):
        table[0x41 + offset] = upper_base + offset
        if lower_base is not None:
            table[0x61 + offset] = lower_base + offset
    for letter, codepoint in (exceptions or {}).items():
        table[ord(letter)] = codepoint
    return lambda x: x.translate(table)


# (label, converter) pairs; each converter restyles ASCII letters only.
# Several Mathematical Alphanumeric ranges have reserved holes because the
# letter was already encoded in the Letterlike Symbols block, so a plain
# "base + offset" would emit an unassigned code point.  Those letters are
# redirected to their Letterlike Symbols code points via `exceptions`.
UNICODE_FONTS = [
    ("Normal", lambda x: x),
    ("Bold", _make_font(0x1D400, 0x1D41A)),
    ("Italic", _make_font(0x1D434, 0x1D44E, {'h': 0x210E})),
    ("Bold Italic", _make_font(0x1D468, 0x1D482)),
    ("Script", _make_font(0x1D49C, 0x1D4B6, {
        'B': 0x212C, 'E': 0x2130, 'F': 0x2131, 'H': 0x210B, 'I': 0x2110,
        'L': 0x2112, 'M': 0x2133, 'R': 0x211B,
        'e': 0x212F, 'g': 0x210A, 'o': 0x2134,
    })),
    ("Bold Script", _make_font(0x1D4D0, 0x1D4EA)),
    ("Fraktur", _make_font(0x1D504, 0x1D51E, {
        'C': 0x212D, 'H': 0x210C, 'I': 0x2111, 'R': 0x211C, 'Z': 0x2128,
    })),
    ("Bold Fraktur", _make_font(0x1D56C, 0x1D586)),
    ("Double Struck", _make_font(0x1D538, 0x1D552, {
        'C': 0x2102, 'H': 0x210D, 'N': 0x2115, 'P': 0x2119, 'Q': 0x211A,
        'R': 0x211D, 'Z': 0x2124,
    })),
    ("Sans Serif", _make_font(0x1D5A0, 0x1D5BA)),
    ("Sans Serif Bold", _make_font(0x1D5D4, 0x1D5EE)),
    ("Sans Serif Italic", _make_font(0x1D608, 0x1D622)),
    ("Sans Serif Bold Italic", _make_font(0x1D63C, 0x1D656)),
    ("Monospace", _make_font(0x1D670, 0x1D68A)),
    ("Circled", _make_font(0x24B6, 0x24D0)),
    # The remaining styles only define capital letters.
    ("Squared", _make_font(0x1F130)),
    ("Negative Circled", _make_font(0x1F150)),
    ("Negative Squared", _make_font(0x1F170)),
    ("Regional Indicator", _make_font(0x1F1E6)),
]
|
|
|
|
|
# First-run defaults for every session key the page relies on.  The script is
# re-executed on each rerun, so a key is only seeded when it is still absent.
_SESSION_DEFAULTS = {
    'server_running': False,
    'server_task': None,
    'active_connections': {},
    'media_notifications': [],
    'last_chat_update': 0,
    'displayed_chat_lines': [],
    'old_val': "",
    'last_query': "",
    'message_text': "",
    'audio_cache': {},
    'pasted_image_data': None,
    'quote_line': None,
    'refresh_rate': 5,
    'base64_cache': {},
}

for _state_key, _state_default in _SESSION_DEFAULTS.items():
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
|
|
|
|
|
def format_timestamp_prefix(username):
    """Return a filename prefix made of the current local time and *username*.

    The layout is ``HH-MM-AM/PM-ct-MM-DD-YYYY-by-<username>`` (12-hour clock,
    minute resolution; the ``ct`` label is a fixed literal).
    """
    stamp = datetime.now().strftime('%I-%M-%p-ct-%m-%d-%Y')
    return f"{stamp}-by-{username}"
|
|
|
|
|
def get_node_name():
    """Read the node name and port from the command line.

    Returns:
        A ``(node_name, port)`` tuple.  ``node_name`` falls back to
        ``node-<8 hex chars>`` when ``--node-name`` is not given; ``port``
        defaults to 8501.
    """
    parser = argparse.ArgumentParser(description='Start a chat node with a specific name')
    parser.add_argument('--node-name', type=str, default=None)
    parser.add_argument('--port', type=int, default=8501)
    # parse_known_args() ignores flags this script does not define instead of
    # terminating the process with SystemExit the way parse_args() does.
    args, _unrecognized = parser.parse_known_args()
    username = st.session_state.get('username', 'System π')
    log_action(username, "ππΌ - Node naming - christening the beast!")
    return args.node_name or f"node-{uuid.uuid4().hex[:8]}", args.port
|
|
|
|
|
def log_action(username, action):
    """Append ``action`` to the history file, rate-limited per user.

    The same action text from the same user is written at most once every
    10 seconds; the bookkeeping lives in ``st.session_state.action_log``.

    Args:
        username: Name the entry is attributed to.
        action: Free-text description written to HISTORY_FILE.
    """
    if 'action_log' not in st.session_state:
        st.session_state.action_log = {}
    now = time.time()
    previous = st.session_state.action_log.get(username, {})
    # Keep only what was logged in the last 10 seconds; older entries may be
    # logged again.
    recent = {name: stamp for name, stamp in previous.items() if now - stamp < 10}
    st.session_state.action_log[username] = recent
    if action in recent:
        return
    # Explicit UTF-8: action texts and usernames contain non-ASCII symbols,
    # which a non-UTF-8 locale default encoding cannot write.
    with open(HISTORY_FILE, 'a', encoding='utf-8') as f:
        f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {username}: {action}\n")
    recent[action] = now
|
|
|
|
|
def clean_text_for_tts(text):
    """Prepare chat text for speech synthesis.

    Removes the characters ``# * ! [ ]``, collapses every run of whitespace
    to a single space, and truncates to 200 characters.  Returns the
    placeholder ``"No text to speak"`` when nothing is left.
    """
    without_markup = re.sub(r'[#*!\[\]]+', '', text)
    words = without_markup.split()
    if not words:
        return "No text to speak"
    return ' '.join(words)[:200]
|
|
|
|
|
async def save_chat_entry(username, message):
    """Persist a chat message, synthesize its audio and broadcast it.

    Appends ``[timestamp] username: message`` to CHAT_FILE, generates a TTS
    clip with the user's voice (recorded in HISTORY_FILE when a file was
    produced), sends ``username|message`` to the ``"chat"`` room and bumps
    ``st.session_state.last_chat_update``.

    Args:
        username: Author of the message; also selects the TTS voice.
        message: Message text.
    """
    await asyncio.to_thread(log_action, username, "π¬π - Chat saver - words locked tight!")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] {username}: {message}"

    def _append_entry():
        # Context manager closes the handle promptly (the bare
        # open(...).write(...) form left that to the garbage collector);
        # UTF-8 because names and messages are not ASCII-only.
        with open(CHAT_FILE, 'a', encoding='utf-8') as chat_file:
            chat_file.write(f"{entry}\n")

    await asyncio.to_thread(_append_entry)
    voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    cleaned_message = clean_text_for_tts(message)
    audio_file = await async_edge_tts_generate(cleaned_message, voice)
    if audio_file:
        with open(HISTORY_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{timestamp}] {username}: Audio generated - {audio_file}\n")
    await broadcast_message(f"{username}|{message}", "chat")
    st.session_state.last_chat_update = time.time()
|
|
|
|
|
async def load_chat():
    """Return the full text of CHAT_FILE, creating it with a header if absent.

    Returns:
        The chat log as a single string.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "ππ - Chat loader - history unleashed!")

    def _read_or_create():
        # Both file operations run in the worker thread and use context
        # managers so no handle is left open.
        if not os.path.exists(CHAT_FILE):
            with open(CHAT_FILE, 'a', encoding='utf-8') as chat_file:
                chat_file.write(f"# {START_ROOM} Chat\n\nWelcome to the cosmic hub - start chatting! π€\n")
        with open(CHAT_FILE, 'r', encoding='utf-8') as chat_file:
            return chat_file.read()

    return await asyncio.to_thread(_read_or_create)
|
|
|
|
|
async def get_user_list(chat_content):
    """Return the sorted, de-duplicated usernames that appear in a chat log.

    Chat entries are written by save_chat_entry as
    ``[timestamp] username: message``.  The username is therefore the text
    between the closing ``"] "`` of the timestamp and the first ``": "``.
    (Splitting the whole line on ``": "`` and taking element 1 returns the
    first word of the *message*, because the timestamp contains no
    colon-space.)

    Args:
        chat_content: Full chat log text.

    Returns:
        Sorted list of usernames; lines not in the entry format are ignored.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π₯π - User lister - whoβs in the gang!")
    users = set()
    for line in chat_content.split('\n'):
        if not line.startswith('['):
            continue
        _, bracket, remainder = line.partition('] ')
        author, colon, _ = remainder.partition(': ')
        if bracket and colon and author.strip():
            users.add(author.strip())
    return sorted(users)
|
|
|
|
|
async def has_joined_before(client_id, chat_content):
    """Tell whether ``Client-<client_id>`` occurs on any line of the chat log."""
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "πͺπ - Join checker - been here before?")
    marker = f"Client-{client_id}"
    for line in chat_content.split('\n'):
        if marker in line:
            return True
    return False
|
|
|
|
|
async def get_message_suggestions(chat_content, prefix):
    """Return up to five earlier messages that start with *prefix*.

    The match is case-insensitive.  A "message" is whatever follows the first
    ``": "`` on a non-blank line of ``chat_content``.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π‘π - Suggestion maker - old quips resurface!")
    matches = []
    for line in chat_content.split('\n'):
        if ': ' not in line or not line.strip():
            continue
        text = line.split(': ', 1)[1]
        if text.lower().startswith(prefix.lower()):
            matches.append(text)
    return matches[:5]
|
|
|
|
|
async def save_vote(file, item, user_hash, username, comment=""):
    """Record an up-vote and announce it in the chat.

    Appends ``[timestamp] <user_hash> voted for <item>`` to ``file``, mirrors
    the vote into HISTORY_FILE and posts a chat entry on behalf of
    ``username``.

    Args:
        file: Path of the vote log to append to.
        item: The thing being voted for (a chat line or a media path).
        user_hash: Identifier of the voter as stored in the vote log.
        username: Display name used for the chat announcement.
        comment: Optional text appended to the announcement.
    """
    await asyncio.to_thread(log_action, username, "ππ - Vote saver - cheers recorded!")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{timestamp}] {user_hash} voted for {item}"

    def _append_vote():
        # Context managers close both handles; the bare open(...).write(...)
        # form left them open until garbage collection.
        with open(file, 'a', encoding='utf-8') as vote_file:
            vote_file.write(f"{entry}\n")
        with open(HISTORY_FILE, 'a', encoding='utf-8') as history_file:
            history_file.write(f"- {timestamp} - User {user_hash} voted for {item}\n")

    await asyncio.to_thread(_append_vote)
    chat_message = f"{username} upvoted: \"{item}\""
    if comment:
        chat_message += f" - {comment}"
    await save_chat_entry(username, chat_message)
|
|
|
|
|
async def load_votes(file):
    """Tally the votes stored in ``file``.

    Each vote line has the form ``[timestamp] <user_hash> voted for <item>``
    (as written by save_vote).  A given user counts once per item.  The file
    is created with a short header when it does not exist.

    Args:
        file: Path of the vote log.

    Returns:
        Dict mapping item -> number of distinct voters.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "ππ - Vote counter - tallying the love!")

    def _read_or_create():
        if not os.path.exists(file):
            with open(file, 'w', encoding='utf-8') as vote_file:
                vote_file.write("# Vote Tally\n\nNo votes yet - get clicking! π±οΈ\n")
        with open(file, 'r', encoding='utf-8') as vote_file:
            return vote_file.read()

    content = await asyncio.to_thread(_read_or_create)
    votes = {}
    counted = set()
    # Header lines never contain "voted for", so they are filtered by content
    # rather than by skipping a fixed number of lines; a fixed skip discards
    # real votes when the file was started without the header.
    for line in content.strip().split('\n'):
        if 'voted for' not in line:
            continue
        _, _, remainder = line.partition('] ')
        user_hash, separator, item = remainder.partition(' voted for ')
        if not separator:
            continue  # not in the expected "[ts] hash voted for item" shape
        vote_key = f"{user_hash}-{item}"
        if vote_key not in counted:
            votes[item] = votes.get(item, 0) + 1
            counted.add(vote_key)
    return votes
|
|
|
|
|
async def generate_user_hash():
    """Return this session's 8-hex-character voter id, creating it on first use.

    The id is the first 8 hex digits of the MD5 of 128 random bits and is
    stored in ``st.session_state.user_hash``.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "ππ΅οΈ - Hash generator - secret codes ahoy!")
    if 'user_hash' not in st.session_state:
        seed = str(random.getrandbits(128)).encode()
        digest = hashlib.md5(seed).hexdigest()
        st.session_state.user_hash = digest[:8]
    return st.session_state.user_hash
|
|
|
|
|
async def async_edge_tts_generate(text, voice, rate=0, pitch=0, file_format="mp3"):
    """Synthesize ``text`` with edge-tts and save it under AUDIO_DIR.

    Args:
        text: Text to speak.
        voice: edge-tts voice id.
        rate: Speaking-rate change in percent (sent as e.g. ``+0%``).
        pitch: Pitch change in Hz (sent as e.g. ``+0Hz``).
        file_format: Extension of the output file.

    Returns:
        Path of the written audio file, or None when no file was produced
        (the NoAudioReceived failure is recorded in HISTORY_FILE).
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "πΆπ - Audio maker - voices come alive!")
    timestamp = format_timestamp_prefix(username)
    # The timestamp prefix only has minute resolution, so two clips generated
    # for the same user within one minute used to share a filename and
    # overwrite each other.  A short digest of the request keeps them apart.
    request_digest = hashlib.md5(f"{voice}|{rate}|{pitch}|{text}".encode('utf-8')).hexdigest()[:8]
    filename = f"{timestamp}-{request_digest}.{file_format}"
    filepath = os.path.join(AUDIO_DIR, filename)
    communicate = edge_tts.Communicate(text, voice, rate=f"{rate:+d}%", pitch=f"{pitch:+d}Hz")
    try:
        await communicate.save(filepath)
        return filepath if os.path.exists(filepath) else None
    except edge_tts.exceptions.NoAudioReceived:
        with open(HISTORY_FILE, 'a', encoding='utf-8') as f:
            f.write(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] {username}: Audio failed - No audio received for '{text}'\n")
        return None
|
|
|
|
|
def play_and_download_audio(file_path):
    """Render an audio player plus a base64 download link for ``file_path``.

    Does nothing when the path is empty or the file does not exist.  The
    base64 form of each file is memoized in ``st.session_state.base64_cache``.
    """
    if not file_path or not os.path.exists(file_path):
        return
    st.audio(file_path)
    encoded_by_path = st.session_state.base64_cache
    if file_path not in encoded_by_path:
        with open(file_path, "rb") as audio_file:
            encoded_by_path[file_path] = base64.b64encode(audio_file.read()).decode()
    b64 = encoded_by_path[file_path]
    dl_link = f'<a href="data:audio/mpeg;base64,{b64}" download="{os.path.basename(file_path)}">π΅ Download {os.path.basename(file_path)}</a>'
    st.markdown(dl_link, unsafe_allow_html=True)
|
|
|
|
|
async def save_pasted_image(image, username):
    """Save a pasted image as a PNG under MEDIA_DIR and return its path.

    ``image`` must provide ``save(path, format)``; the call is made in a
    worker thread.  The filename is the timestamp prefix for ``username``
    plus ``.png``.
    """
    await asyncio.to_thread(log_action, username, "πΈπΎ - Image saver - pics preserved!")
    target = os.path.join(MEDIA_DIR, f"{format_timestamp_prefix(username)}.png")
    await asyncio.to_thread(image.save, target, "PNG")
    return target
|
|
|
|
|
async def get_video_html(video_path, width="100%"):
    """Return a ``<video>`` element that embeds the file as a base64 data URL.

    Args:
        video_path: Path of the video file; its bytes are inlined as video/mp4.
        width: Value for the element's ``width`` attribute.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π₯π¬ - Video renderer - movies roll!")
    with open(video_path, 'rb') as video_file:
        raw = await asyncio.to_thread(video_file.read)
    encoded = base64.b64encode(raw).decode()
    video_url = f"data:video/mp4;base64,{encoded}"
    return f'<video width="{width}" controls autoplay><source src="{video_url}" type="video/mp4">Your browser does not support the video tag.</video>'
|
|
|
|
|
async def get_audio_html(audio_path, width="100%"):
    """Return an ``<audio>`` element that embeds the file as a base64 data URL.

    Args:
        audio_path: Path of the audio file; its bytes are inlined as audio/mpeg.
        width: CSS width applied to the element.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "πΆβοΈ - Audio renderer - sounds soar!")

    def _read_bytes():
        with open(audio_path, 'rb') as audio_file:
            return audio_file.read()

    # `await asyncio.to_thread(open, ...).read()` parses as
    # `await (asyncio.to_thread(...).read())`, i.e. it looks up `.read` on the
    # coroutine object and raises AttributeError.  Read inside the worker
    # thread instead, which also closes the handle.
    audio_bytes = await asyncio.to_thread(_read_bytes)
    audio_url = f"data:audio/mpeg;base64,{base64.b64encode(audio_bytes).decode()}"
    return f'<audio controls style="width: {width};"><source src="{audio_url}" type="audio/mpeg">Your browser does not support the audio element.</audio>'
|
|
|
|
|
async def websocket_handler(websocket, path=None):
    """Serve one websocket client of the ``"chat"`` room.

    Registers the connection in ``st.session_state.active_connections``,
    announces the join, then stores every ``username|content`` text frame via
    save_chat_entry until the client disconnects.  The connection is always
    unregistered on exit.

    Args:
        websocket: The connection object supplied by ``websockets.serve``.
        path: Request path.  Optional so the handler works both with
            websockets versions that pass ``(websocket, path)`` and with
            versions that pass only the connection.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "ππ - Websocket handler - chat links up!")
    # Bound before the try block so the cleanup in `finally` can never hit an
    # unbound name.
    client_id = str(uuid.uuid4())
    room_id = "chat"
    try:
        st.session_state.active_connections.setdefault(room_id, {})[client_id] = websocket
        chat_content = await load_chat()
        username = st.session_state.get('username', random.choice(list(FUN_USERNAMES.keys())))
        if not await has_joined_before(client_id, chat_content):
            await save_chat_entry(f"Client-{client_id}", f"{username} has joined {START_ROOM}!")
        async for message in websocket:
            if not isinstance(message, str):
                continue  # binary frames carry no "username|content" payload
            parts = message.split('|', 1)
            if len(parts) == 2:
                username, content = parts
                await save_chat_entry(username, content)
    except websockets.ConnectionClosed:
        # A closed connection is the normal way for a client to leave.
        pass
    finally:
        st.session_state.active_connections.get(room_id, {}).pop(client_id, None)
|
|
|
|
|
async def broadcast_message(message, room_id):
    """Send ``message`` to every websocket registered for ``room_id``.

    Connections that turn out to be closed are removed from
    ``st.session_state.active_connections``.  Unknown rooms are ignored.

    Args:
        message: Text to send.
        room_id: Key of the room in ``active_connections``.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π’βοΈ - Message broadcaster - words fly far!")
    room = st.session_state.active_connections.get(room_id)
    if not room:
        return
    disconnected = []
    # Iterate over a snapshot: each `await ws.send(...)` yields to the event
    # loop, where a handler may register or drop a client and would otherwise
    # trigger "dictionary changed size during iteration".
    for client_id, ws in list(room.items()):
        try:
            await ws.send(message)
        except websockets.ConnectionClosed:
            disconnected.append(client_id)
    for client_id in disconnected:
        # pop() tolerates a client the handler already removed.
        room.pop(client_id, None)
|
|
|
|
|
async def run_websocket_server():
    """Start the websocket server on 0.0.0.0:8765 unless the session already did.

    Sets ``st.session_state.server_running`` once the server is listening and
    then waits until the server is closed.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π₯οΈπ - Server starter - web spins up!")
    if st.session_state.server_running:
        return
    server = await websockets.serve(websocket_handler, '0.0.0.0', 8765)
    st.session_state.server_running = True
    await server.wait_closed()
|
|
|
|
|
async def process_voice_input(audio_bytes):
    """Post a placeholder chat entry for a recorded clip.

    No transcription is performed: whenever ``audio_bytes`` is non-empty the
    fixed text "Voice input simulation" is saved for the current user.
    """
    username = st.session_state.get('username', 'System π')
    await asyncio.to_thread(log_action, username, "π€π - Voice processor - speech to text!")
    if not audio_bytes:
        return
    text = "Voice input simulation"
    await save_chat_entry(username, text)
|
|
|
|
|
async def perform_ai_lookup(query, vocal_summary=True, extended_refs=False, titles_summary=True, full_audio=False, useArxiv=True, useArxivAudio=False):
    """Post a placeholder lookup result to the chat and optionally play it.

    Only ``query``, ``useArxiv`` and ``useArxivAudio`` are read; the other
    parameters are accepted but unused here.  With ``useArxivAudio`` set, the
    result text is additionally synthesized and shown in an audio player.
    """
    username = st.session_state.get('username', 'System π')
    result = f"AI Lookup Result for '{query}' (Arxiv: {useArxiv}, Audio: {useArxivAudio})"
    await save_chat_entry(username, result)
    if not useArxivAudio:
        return
    voice = FUN_USERNAMES.get(username, "en-US-AriaNeural")
    audio_file = await async_edge_tts_generate(result, voice)
    if audio_file:
        st.audio(audio_file)
|
|
|
|
|
def main():
    """Render the whole chat page for one Streamlit script run.

    Creates a fresh asyncio event loop and drives ``async_interface`` on it.
    The page consists of: username assignment, voice input, the chat log with
    per-line vote / quote / audio controls, the quote composer, the message
    composer, media upload, the media gallery, the history sidebar and an
    auto-refresh countdown that ends in ``st.rerun()``.
    """
    NODE_NAME, port = get_node_name()

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    async def async_interface():
        """Build every page section in order; awaited once per script run."""
        if 'username' not in st.session_state:
            chat_content = await load_chat()
            join_lines = chat_content.split('\n')
            available_names = [name for name in FUN_USERNAMES if not any(f"{name} has joined" in line for line in join_lines)]
            st.session_state.username = random.choice(available_names) if available_names else random.choice(list(FUN_USERNAMES.keys()))

        st.title(f"π€π§ MMO {st.session_state.username}ππ¬")
        st.markdown(f"Welcome to {START_ROOM} - chat, vote, upload, paste images, and enjoy quoting! π")

        # NOTE(review): the task is bound to the loop created in this run; a
        # later run creates a new loop, so the stored task only advances while
        # its own loop is being driven.  Confirm whether that is intended.
        if not st.session_state.server_task:
            st.session_state.server_task = loop.create_task(run_websocket_server())

        audio_bytes = audio_recorder()
        if audio_bytes:
            await process_voice_input(audio_bytes)
            st.rerun()

        st.subheader(f"{START_ROOM} Chat π¬")
        chat_content = await load_chat()
        chat_lines = chat_content.split('\n')
        chat_votes = await load_votes(QUOTE_VOTES_FILE)

        current_time = time.time()
        if current_time - st.session_state.last_chat_update > 1 or not st.session_state.displayed_chat_lines:
            # Set lookup instead of scanning the displayed list per line.
            already_shown = set(st.session_state.displayed_chat_lines)
            new_lines = [line for line in chat_lines if line.strip() and ': ' in line and line not in already_shown and not line.startswith('#')]
            st.session_state.displayed_chat_lines.extend(new_lines)
            st.session_state.last_chat_update = current_time

        for i, line in enumerate(st.session_state.displayed_chat_lines):
            col1, col2, col3, col4 = st.columns([3, 1, 1, 2])
            with col1:
                st.markdown(line)
                if "Pasted image:" in line or "Uploaded media:" in line:
                    file_path = line.split(': ')[-1].strip()
                    if os.path.exists(file_path):
                        if file_path not in st.session_state.base64_cache:
                            with open(file_path, "rb") as f:
                                b64 = base64.b64encode(f.read()).decode()
                            st.session_state.base64_cache[file_path] = b64
                        b64 = st.session_state.base64_cache[file_path]
                        is_image = file_path.endswith(('.png', '.jpg'))
                        is_video = file_path.endswith('.mp4')
                        is_audio = file_path.endswith('.mp3')
                        # .jpg files get their own MIME type instead of being
                        # labelled image/png in the download link.
                        if file_path.endswith('.png'):
                            mime_type = "image/png"
                        elif file_path.endswith('.jpg'):
                            mime_type = "image/jpeg"
                        elif is_video:
                            mime_type = "video/mp4"
                        else:
                            mime_type = "audio/mpeg"
                        if is_image:
                            st.image(file_path, use_container_width=True)
                        elif is_video:
                            st.markdown(await get_video_html(file_path), unsafe_allow_html=True)
                        elif is_audio:
                            st.markdown(await get_audio_html(file_path), unsafe_allow_html=True)
                        if is_image or is_video or is_audio:
                            dl_link = f'<a href="data:{mime_type};base64,{b64}" download="{os.path.basename(file_path)}">π₯ Download {os.path.basename(file_path)}</a>'
                            st.markdown(dl_link, unsafe_allow_html=True)
            with col2:
                # Key under which votes for this line are stored and counted.
                vote_item = line.split('. ')[1] if '. ' in line else line
                vote_count = chat_votes.get(vote_item, 0)
                if st.button(f"π {vote_count}", key=f"chat_vote_{i}"):
                    comment = st.session_state.message_text
                    await save_vote(QUOTE_VOTES_FILE, vote_item, await generate_user_hash(), st.session_state.username, comment)
                    st.session_state.message_text = ''
                    st.rerun()
            with col3:
                if st.button("π’ Quote", key=f"quote_{i}"):
                    st.session_state.quote_line = line
                    st.rerun()
            with col4:
                # Entries look like "[timestamp] username: message", so the
                # author sits between "] " and the first ": ".  Taking
                # split(': ')[1] would yield the first word of the message and
                # the author's voice would never be found.
                author = line.split('] ', 1)[-1].split(': ', 1)[0]
                voice = FUN_USERNAMES.get(author, 'en-US-AriaNeural')
                cache_key = f"{line}_{voice}"
                if cache_key not in st.session_state.audio_cache:
                    cleaned_text = clean_text_for_tts(line.split(': ', 1)[1])
                    audio_file = await async_edge_tts_generate(cleaned_text, voice)
                    st.session_state.audio_cache[cache_key] = audio_file
                audio_file = st.session_state.audio_cache.get(cache_key)
                if audio_file:
                    play_and_download_audio(audio_file)

        if st.session_state.quote_line:
            st.markdown(f"### Quoting: {st.session_state.quote_line}")
            quote_response = st.text_area("Add your response", key="quote_response")
            paste_result_quote = paste_image_button("π Paste Image with Quote", key="paste_button_quote")
            if paste_result_quote.image_data is not None:
                st.image(paste_result_quote.image_data, caption="Received Image for Quote")
                filename = await save_pasted_image(paste_result_quote.image_data, st.session_state.username)
                if filename:
                    st.session_state.pasted_image_data = filename
            if st.button("Send Quote π", key="send_quote"):
                markdown_response = f"### Quote Response\n- **Original**: {st.session_state.quote_line}\n- **{st.session_state.username} Replies**: {quote_response}"
                if st.session_state.pasted_image_data:
                    markdown_response += f"\n- **Image**: "
                    await save_chat_entry(st.session_state.username, f"Pasted image: {st.session_state.pasted_image_data}")
                    st.session_state.pasted_image_data = None
                await save_chat_entry(st.session_state.username, markdown_response)
                st.session_state.quote_line = None
                st.session_state.message_text = ''
                st.rerun()

        new_username = st.selectbox("Change Name", [""] + list(FUN_USERNAMES.keys()), index=0)
        if new_username and new_username != st.session_state.username:
            await save_chat_entry("System π", f"{st.session_state.username} changed name to {new_username}")
            st.session_state.username = new_username
            st.rerun()

        message = st.text_input(f"Message as {st.session_state.username}", key="message_input", value=st.session_state.message_text, on_change=lambda: st.session_state.update(message_text=st.session_state.message_input))
        paste_result_msg = paste_image_button("π Paste Image with Message", key="paste_button_msg")
        if paste_result_msg.image_data is not None:
            st.image(paste_result_msg.image_data, caption="Received Image for Message")
            filename = await save_pasted_image(paste_result_msg.image_data, st.session_state.username)
            if filename:
                st.session_state.pasted_image_data = filename
        if st.button("Send π", key="send_button") and (message.strip() or st.session_state.pasted_image_data):
            if message.strip():
                await save_chat_entry(st.session_state.username, message)
            if st.session_state.pasted_image_data:
                await save_chat_entry(st.session_state.username, f"Pasted image: {st.session_state.pasted_image_data}")
                st.session_state.pasted_image_data = None
            st.session_state.message_text = ''
            st.rerun()

        tab_main = st.radio("Action:", ["π€ Voice", "πΈ Media", "π ArXiv", "π Editor"], horizontal=True)
        useArxiv = st.checkbox("Search Arxiv for Research Paper Answers", value=True)
        useArxivAudio = st.checkbox("Generate Audio File for Research Paper Answers", value=False)

        st.subheader("Upload Media π¨πΆπ₯")
        uploaded_file = st.file_uploader("Upload Media", type=['png', 'jpg', 'mp4', 'mp3'])
        if not uploaded_file:
            # Uploader cleared: forget the last upload so the same file can be
            # submitted again later.
            st.session_state.pop('last_upload_digest', None)
        else:
            # The uploader keeps returning the same file on every rerun, and
            # this page reruns itself on a timer.  Remember what was already
            # stored so one upload is saved and announced exactly once.
            upload_digest = hashlib.md5(uploaded_file.getbuffer()).hexdigest()
            if st.session_state.get('last_upload_digest') != upload_digest:
                timestamp = format_timestamp_prefix(st.session_state.username)
                username = st.session_state.username
                ext = uploaded_file.name.split('.')[-1]
                filename = f"{timestamp}.{ext}"
                file_path = os.path.join(MEDIA_DIR, filename)

                def _store_upload():
                    # Context manager so the handle is closed right away.
                    with open(file_path, 'wb') as media_out:
                        media_out.write(uploaded_file.getbuffer())

                await asyncio.to_thread(_store_upload)
                st.session_state.last_upload_digest = upload_digest
                st.success(f"Uploaded {filename}")
                await save_chat_entry(username, f"Uploaded media: {file_path}")
                if file_path.endswith('.mp4'):
                    st.session_state.media_notifications.append(file_path)

        st.subheader("Media Gallery π¨πΆπ₯")
        media_files = glob.glob(f"{MEDIA_DIR}/*.png") + glob.glob(f"{MEDIA_DIR}/*.jpg") + glob.glob(f"{MEDIA_DIR}/*.mp4") + glob.glob(f"{MEDIA_DIR}/*.mp3")
        if media_files:
            media_votes = await load_votes(MEDIA_VOTES_FILE)
            st.write("### All Media Uploads")
            seen_files = set()
            for media_file in sorted(media_files, key=os.path.getmtime, reverse=True):
                if media_file not in seen_files:
                    seen_files.add(media_file)
                    filename = os.path.basename(media_file)
                    vote_count = media_votes.get(media_file, 0)

                    col1, col2 = st.columns([3, 1])
                    with col1:
                        st.markdown(f"**{filename}**")
                        if media_file.endswith(('.png', '.jpg')):
                            st.image(media_file, use_container_width=True)
                        elif media_file.endswith('.mp4'):
                            st.markdown(await get_video_html(media_file), unsafe_allow_html=True)
                        elif media_file.endswith('.mp3'):
                            st.markdown(await get_audio_html(media_file), unsafe_allow_html=True)
                    with col2:
                        if st.button(f"π {vote_count}", key=f"media_vote_{media_file}"):
                            await save_vote(MEDIA_VOTES_FILE, media_file, await generate_user_hash(), st.session_state.username)
                            st.rerun()

        # The sidebar must be rendered before the countdown: st.rerun() at the
        # end of the countdown aborts the run, so anything placed after it is
        # never executed.
        st.sidebar.subheader("Chat History π")
        if os.path.exists(HISTORY_FILE):
            with open(HISTORY_FILE, 'r', encoding='utf-8', errors='replace') as f:
                history_content = f.read()
            st.sidebar.markdown(history_content)

        st.subheader("Refresh β³")
        refresh_rate = st.slider("Refresh Rate", 1, 300, st.session_state.refresh_rate)
        st.session_state.refresh_rate = refresh_rate
        timer_placeholder = st.empty()
        for i in range(st.session_state.refresh_rate, -1, -1):
            font_name, font_func = random.choice(UNICODE_FONTS)
            countdown_str = "".join(UNICODE_DIGITS[int(d)] for d in str(i)) if i < 10 else font_func(str(i))
            timer_placeholder.markdown(f"<p class='timer'>β³ {font_func('Refresh in:')} {countdown_str}</p>", unsafe_allow_html=True)
            # asyncio.sleep yields to the event loop, so tasks scheduled on it
            # (the websocket server) can run while the countdown ticks;
            # time.sleep would block the loop for the whole countdown.
            await asyncio.sleep(1)
        st.rerun()

    loop.run_until_complete(async_interface())
|
|
|
# Script entry point: build the page only when this file is executed directly.
if __name__ == "__main__":
    main()