# TwilioBot / app.py
# masadonline's picture
# Create app.py
# a43bc9b verified
# raw
# history blame
# 2.56 kB
import streamlit as st
import os
from twilio.rest import Client
import time
# Page heading shown at the top of the app.
st.title("Twilio Conversation Viewer")

# Read the Twilio account SID, auth token and conversation SID from
# Streamlit's secrets store (looked up in that order).
account_sid, auth_token, conversation_sid = (
    st.secrets[secret_name]
    for secret_name in ("TWILIO_SID", "TWILIO_TOKEN", "TWILIO_CONVERSATION_SID")
)

# Module-level Twilio REST client built from the credentials above.
client = Client(account_sid, auth_token)
def fetch_messages(conversation_sid):
    """Return the messages of the given Twilio conversation as a list.

    Uses the module-level ``client``. If anything raises while listing the
    messages, the error is shown with ``st.error`` and an empty list is
    returned instead of propagating the exception.
    """
    try:
        conversation = client.conversations.v1.conversations(conversation_sid)
        return list(conversation.messages.list())
    except Exception as exc:
        # Surface the failure in the UI rather than aborting the script run.
        st.error(f"Error fetching messages: {exc}")
        return []
def display_messages(messages):
    """Render each message as a styled HTML block in the Streamlit app.

    Messages whose ``author`` equals ``'system'`` are labelled "System" on a
    grey background; every other message is labelled with its author on a
    light-blue background.

    The author and body are HTML-escaped before being interpolated, because
    the markup is rendered with ``unsafe_allow_html=True`` and both values
    come from conversation participants. Without escaping, a message
    containing HTML would be injected into the page verbatim.

    Args:
        messages: Iterable of objects exposing ``author`` and ``body``
            attributes.
    """
    import html  # local import keeps this block self-contained

    for message in messages:
        is_system = message.author == 'system'
        background = "#f0f2f6" if is_system else "#e1f5fe"
        # str() mirrors what f-string interpolation did originally (so a
        # missing value still renders as "None") while giving html.escape
        # the string it requires.
        label = "System" if is_system else html.escape(str(message.author))
        body = html.escape(str(message.body))
        st.markdown(
            f'<div style="background-color:{background};padding:10px;'
            f'border-radius:5px;margin-bottom:5px;">'
            f'<strong>{label}:</strong> {body}</div>',
            unsafe_allow_html=True,
        )
def send_message(conversation_sid, author, body):
    """Post a new message to the Twilio conversation and report the outcome.

    On success the created message's SID is shown with ``st.success``; if
    anything raises, the error is shown with ``st.error``. Nothing is
    returned in either case.
    """
    try:
        message_api = client.conversations.v1.conversations(conversation_sid).messages
        created = message_api.create(author=author, body=body)
        st.success(f"Message sent (SID: {created.sid})")
    except Exception as err:
        # Report the failure in the UI instead of letting it propagate.
        st.error(f"Error sending message: {err}")
# Sidebar: message composer and the auto-refresh setting
with st.sidebar:
    st.header("Conversation Actions")
    new_message = st.text_input("Enter new message:")
    message_author = st.text_input("Your identifier (e.g., user1):", value="user")

    # The button is always rendered; a message is only sent when it was
    # clicked and the text box is non-empty.
    send_clicked = st.button("Send Message")
    if send_clicked and new_message:
        send_message(conversation_sid, message_author, new_message)
        # Pause briefly before rerunning so the refreshed view includes
        # the message that was just sent.
        time.sleep(1)
        st.rerun()

    st.header("Real-time Updates")
    refresh_rate = st.slider(
        "Refresh interval (seconds):", min_value=1, max_value=10, value=3
    )
# Main area to display messages
st.subheader("Conversation History")

# Create ONE placeholder up front and redraw into it on every refresh.
# Calling st.empty() inside the loop (as before) only adds a new, empty
# placeholder each time and clears nothing, so every refresh appended the
# whole history again below the previous copy.
history_placeholder = st.empty()

while True:
    # .container() replaces whatever the placeholder currently holds, so
    # the previous render (messages, the info notice, or a fetch error
    # shown by fetch_messages) is swapped out rather than stacked.
    with history_placeholder.container():
        messages = fetch_messages(conversation_sid)
        if messages:
            display_messages(messages)
        else:
            st.info("No messages in this conversation yet.")
    # Wait for the interval chosen in the sidebar before polling again.
    time.sleep(refresh_rate)