witspathology / app.py
IAMTFRMZA's picture
Update app.py
bff8693 verified
raw
history blame
7.23 kB
# ------------------ Import ------------------
import streamlit as st
import os
import time
import re
import requests
from PIL import Image
from io import BytesIO
from urllib.parse import quote
from openai import OpenAI
# ------------------ Authentication ------------------
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login():
st.title("πŸ” Login Required")
email = st.text_input("Email")
password = st.text_input("Password", type="password")
if st.button("Login"):
if VALID_USERS.get(email) == password:
st.session_state.authenticated = True
st.rerun()
else:
st.error("❌ Incorrect email or password.")
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
if not st.session_state.authenticated:
login()
st.stop()
# ------------------ Configuration ------------------
st.set_page_config(page_title="AI Pathology Assistant", layout="wide", initial_sidebar_state="collapsed")
st.title("🧬 AI Pathology Assistant")
st.caption("AI-powered exploration of pathology, anatomy, and histology documents via OCR + GPT")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
st.error("❌ Missing OPENAI_API_KEY environment variable.")
st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB"
# ------------------ State Setup ------------------
if "messages" not in st.session_state:
st.session_state.messages = []
if "thread_id" not in st.session_state:
st.session_state.thread_id = None
if "image_urls" not in st.session_state:
st.session_state.image_urls = []
if "pending_prompt" not in st.session_state:
st.session_state.pending_prompt = None
# ------------------ Sidebar ------------------
with st.sidebar:
st.header("πŸ§ͺ Pathology Tools")
if st.button("🧹 Clear Chat"):
st.session_state.messages = []
st.session_state.thread_id = None
st.session_state.image_urls = []
st.session_state.pending_prompt = None
st.rerun()
show_image = st.toggle("πŸ“Έ Show Images", value=True)
keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
if st.button("πŸ”Ž Search") and keyword:
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"
section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
if section:
st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"
actions = [
"Select an action...",
"List histological features of inflammation",
"Summarize features of carcinoma",
"List muscle types and features",
"Extract diagnostic markers",
"Summarize embryology stages"
]
action = st.selectbox("Common Pathology Queries", actions)
if action != actions[0]:
st.session_state.pending_prompt = action
# ------------------ Chat UI ------------------
chat_col, image_col = st.columns([2, 1])
with chat_col:
st.markdown("### πŸ’¬ Ask a Pathology-Specific Question")
user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")
if user_input:
st.session_state.messages.append({"role": "user", "content": user_input})
elif st.session_state.pending_prompt:
st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
st.session_state.pending_prompt = None
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
try:
if st.session_state.thread_id is None:
thread = client.beta.threads.create()
st.session_state.thread_id = thread.id
client.beta.threads.messages.create(
thread_id=st.session_state.thread_id,
role="user",
content=st.session_state.messages[-1]["content"]
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ”¬ Analyzing..."):
while True:
status = client.beta.threads.runs.retrieve(thread_id=st.session_state.thread_id, run_id=run.id)
if status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if status.status == "completed":
messages = client.beta.threads.messages.list(thread_id=st.session_state.thread_id)
for m in reversed(messages.data):
if m.role == "assistant":
reply = m.content[0].text.value
st.session_state.messages.append({"role": "assistant", "content": reply})
matches = re.findall(
r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s]+\.png',
reply
)
st.session_state.image_urls = matches
break
else:
st.error("❌ Assistant failed to respond.")
st.rerun()
except Exception as e:
st.error(f"❌ Error: {e}")
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"], unsafe_allow_html=True)
# ------------------ Scrollable Image Carousel ------------------
with image_col:
if show_image and st.session_state.image_urls:
st.markdown("### πŸ–ΌοΈ Image(s)")
st.markdown("""
<style>
.carousel-wrapper {
display: flex;
overflow-x: auto;
scroll-snap-type: x mandatory;
gap: 1rem;
padding: 0.5rem 0;
}
.carousel-wrapper::-webkit-scrollbar {
height: 10px;
}
.carousel-wrapper::-webkit-scrollbar-thumb {
background: #888;
border-radius: 5px;
}
.carousel-wrapper img {
scroll-snap-align: start;
height: 500px;
border-radius: 8px;
}
</style>
""", unsafe_allow_html=True)
html = '<div class="carousel-wrapper">'
for raw_url in st.session_state.image_urls:
try:
if "githubusercontent.com" not in raw_url:
continue
_, raw_path = raw_url.split("githubusercontent.com/", 1)
segments = raw_path.strip().split("/")
encoded_segments = [quote(seg) for seg in segments]
encoded_url = "https://raw.githubusercontent.com/" + "/".join(encoded_segments)
html += f'<img src="{encoded_url}" alt="slide preview">'
except Exception as e:
st.error(f"❌ Failed to load image: {e}")
html += "</div>"
st.markdown(html, unsafe_allow_html=True)