witspathologyv2 / app.py
IAMTFRMZA's picture
Update app.py
10e5f47 verified
raw
history blame
6.48 kB
import streamlit as st
import os
import time
import re
import requests
from PIL import Image
from io import BytesIO
from urllib.parse import quote
from openai import OpenAI
# ------------------ Authentication ------------------
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login():
st.title("πŸ” Login Required")
email = st.text_input("Email")
password = st.text_input("Password", type="password")
if st.button("Login"):
if VALID_USERS.get(email) == password:
st.session_state.authenticated = True
st.rerun()
else:
st.error("❌ Incorrect email or password.")
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
if not st.session_state.authenticated:
login()
st.stop()
# ------------------ App Configuration ------------------
st.set_page_config(page_title="AI Pathology Assistant", layout="wide", initial_sidebar_state="collapsed")
st.title("🧬 AI Pathology Assistant")
st.caption("AI-powered exploration of pathology, anatomy, and histology documents via OCR + GPT")
# ------------------ Load OpenAI API Key ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
st.error("❌ Missing OPENAI_API_KEY environment variable.")
st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Assistant Configuration ------------------
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB" # Replace with your Assistant ID
# ------------------ Session State ------------------
if "messages" not in st.session_state:
st.session_state.messages = []
if "thread_id" not in st.session_state:
st.session_state.thread_id = None
if "image_urls" not in st.session_state:
st.session_state.image_urls = []
if "pending_prompt" not in st.session_state:
st.session_state.pending_prompt = None
# ------------------ Sidebar ------------------
with st.sidebar:
st.header("πŸ§ͺ Pathology Tools")
if st.button("🧹 Clear Chat"):
st.session_state.messages = []
st.session_state.thread_id = None
st.session_state.image_urls = []
st.session_state.pending_prompt = None
st.rerun()
show_image = st.toggle("πŸ“Έ Show Slide Images", value=True)
keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
if st.button("πŸ”Ž Search") and keyword:
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"
section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
if section:
st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"
actions = [
"Select an action...",
"List histological features of inflammation",
"Summarize features of carcinoma",
"List muscle types and features",
"Extract diagnostic markers",
"Summarize embryology stages"
]
action = st.selectbox("Common Pathology Queries", actions)
if action != actions[0]:
st.session_state.pending_prompt = action
# ------------------ Main Chat UI ------------------
chat_col, image_col = st.columns([2, 1])
with chat_col:
st.markdown("### πŸ’¬ Ask a Pathology-Specific Question")
user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")
if user_input:
st.session_state.messages.append({"role": "user", "content": user_input})
elif st.session_state.pending_prompt:
st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
st.session_state.pending_prompt = None
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
try:
if st.session_state.thread_id is None:
thread = client.beta.threads.create()
st.session_state.thread_id = thread.id
client.beta.threads.messages.create(
thread_id=st.session_state.thread_id,
role="user",
content=st.session_state.messages[-1]["content"]
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ”¬ Analyzing..."):
while True:
status = client.beta.threads.runs.retrieve(thread_id=st.session_state.thread_id, run_id=run.id)
if status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if status.status == "completed":
messages = client.beta.threads.messages.list(thread_id=st.session_state.thread_id)
for m in reversed(messages.data):
if m.role == "assistant":
reply = m.content[0].text.value
st.session_state.messages.append({"role": "assistant", "content": reply})
# 🧠 Extract ALL image URLs from assistant reply
image_matches = re.findall(
r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^ \n]+\.png',
reply
)
st.session_state.image_urls = image_matches
break
else:
st.error("❌ Assistant failed to respond.")
st.rerun()
except Exception as e:
st.error(f"❌ Error: {e}")
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"], unsafe_allow_html=True)
# ------------------ Right Column: Image Previews ------------------
with image_col:
if show_image and st.session_state.image_urls:
st.markdown("### πŸ–ΌοΈ Slide Previews")
for url in st.session_state.image_urls:
try:
r = requests.get(url)
r.raise_for_status()
img = Image.open(BytesIO(r.content))
st.image(img, caption=f"πŸ“· {url.split('/')[-1]}", use_container_width=True)
except Exception as e:
st.error(f"❌ Failed to load image from {url}: {e}")