Spaces:
Running
Running
import streamlit as st | |
import os | |
from twilio.rest import Client | |
import time | |
# Set up Streamlit app | |
st.title("Twilio Conversation Viewer") | |
# Get Twilio credentials from Streamlit secrets | |
account_sid = st.secrets["TWILIO_SID"] | |
auth_token = st.secrets["TWILIO_TOKEN"] | |
conversation_sid = st.secrets["TWILIO_CONVERSATION_SID"] | |
client = Client(account_sid, auth_token) | |
def fetch_messages(conversation_sid): | |
"""Fetches messages from the Twilio conversation.""" | |
try: | |
messages = client.conversations.v1.conversations(conversation_sid).messages.list() | |
return list(messages) | |
except Exception as e: | |
st.error(f"Error fetching messages: {e}") | |
return [] | |
def display_messages(messages): | |
"""Displays messages in the Streamlit app.""" | |
for message in messages: | |
if message.author == 'system': | |
st.markdown(f'<div style="background-color:#f0f2f6;padding:10px;border-radius:5px;margin-bottom:5px;"><strong>System:</strong> {message.body}</div>', unsafe_allow_html=True) | |
else: | |
st.markdown(f'<div style="background-color:#e1f5fe;padding:10px;border-radius:5px;margin-bottom:5px;"><strong>{message.author}:</strong> {message.body}</div>', unsafe_allow_html=True) | |
def send_message(conversation_sid, author, body): | |
"""Sends a new message to the Twilio conversation.""" | |
try: | |
message = client.conversations.v1.conversations(conversation_sid).messages.create(author=author, body=body) | |
st.success(f"Message sent (SID: {message.sid})") | |
except Exception as e: | |
st.error(f"Error sending message: {e}") | |
# Sidebar for configuration and actions | |
with st.sidebar: | |
st.header("Conversation Actions") | |
new_message = st.text_input("Enter new message:") | |
message_author = st.text_input("Your identifier (e.g., user1):", "user") | |
if st.button("Send Message"): | |
if new_message: | |
send_message(conversation_sid, message_author, new_message) | |
# Give a short delay to see the updated messages | |
time.sleep(1) | |
st.rerun() # Refresh the app to show the new message | |
st.header("Real-time Updates") | |
refresh_rate = st.slider("Refresh interval (seconds):", 1, 10, 3) | |
# Main area to display messages | |
st.subheader("Conversation History") | |
while True: | |
messages = fetch_messages(conversation_sid) | |
if messages: | |
display_messages(messages) | |
else: | |
st.info("No messages in this conversation yet.") | |
time.sleep(refresh_rate) | |
st.empty() # Clear the previous messages for the next update |