# witspathologyv2 / app.py
# IAMTFRMZA's picture
# Update app.py
# 41c50e2 verified
# raw
# history blame
# 7.16 kB
import hmac
import os
import re
import time
from urllib.parse import quote, unquote

import streamlit as st
from openai import OpenAI
# ------------------ Authentication ------------------
# Maps a login email address to the password expected for it. login() checks
# the submitted email/password pair against this mapping.
# NOTE(review): passwords are stored in plaintext in the source file; consider
# loading them from the environment / a secrets store and comparing hashes.
# NOTE(review): every key below reads "[email protected]" in this view, which
# looks like address redaction. Identical keys collapse into a single dict
# entry, so confirm the real file lists distinct addresses.
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login():
    """Render the login form and authenticate against ``VALID_USERS``.

    Draws the page title, the email and password inputs and the "Login"
    button. When the button is pressed with a matching email/password pair,
    ``st.session_state.authenticated`` is set to ``True`` and the script is
    rerun; on a mismatch an error message is shown instead. Nothing is
    returned.
    """
    st.title("🔐 Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if not st.button("Login"):
        return

    # Tolerate stray whitespace around a pasted or auto-filled address.
    expected = VALID_USERS.get(email.strip())

    # hmac.compare_digest takes time independent of where the first mismatch
    # occurs, unlike ==. Both sides are encoded to bytes because
    # compare_digest rejects str values containing non-ASCII characters.
    if expected is not None and hmac.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    ):
        st.session_state.authenticated = True
        st.rerun()
    else:
        st.error("❌ Incorrect email or password.")
# First script run of a session: nobody has logged in yet.
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False

# Until the flag is set, render only the login form and halt the script so
# that nothing below this point executes.
is_logged_in = bool(st.session_state["authenticated"])
if not is_logged_in:
    login()
    st.stop()
# ------------------ App Configuration ------------------
# Page-level settings first (title, wide layout, sidebar collapsed on load),
# then the visible heading and its caption.
st.set_page_config(
    page_title="AI Pathology Assistant",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("🧬 AI Pathology Assistant")
st.caption(
    "AI-powered exploration of pathology, anatomy, and histology documents via OCR + GPT"
)
# ------------------ Load OpenAI API Key ------------------
# The key is read from the environment. Surrounding whitespace is stripped
# because a secret pasted with a trailing newline would otherwise be sent
# verbatim in the request's authorization header.
OPENAI_API_KEY = (os.environ.get("OPENAI_API_KEY") or "").strip()
if not OPENAI_API_KEY:
    # Without a key no request can succeed, so report it and halt the script.
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()

# Shared OpenAI client used by the chat section below.
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Assistant Configuration ------------------
# ID of the OpenAI Assistant that answers chat prompts. Set the ASSISTANT_ID
# environment variable to target a different assistant without editing this
# file; when it is unset or empty the built-in default below is used.
ASSISTANT_ID = os.environ.get("ASSISTANT_ID") or "asst_jXDSjCG8LI4HEaFEcjFVq8KB"
# ------------------ Session State ------------------
# Seed every per-session key the first time it is needed. Keys that already
# exist are left untouched so their values survive script reruns. Each entry
# pairs a key with a factory so the list defaults are fresh objects.
for state_key, make_default in (
    ("messages", list),
    ("thread_id", lambda: None),
    ("image_urls", list),
    ("pending_prompt", lambda: None),
):
    if state_key not in st.session_state:
        st.session_state[state_key] = make_default()
# ------------------ Sidebar ------------------
def _queue_prompt_on_change(tracker_key, value, prompt):
    """Queue ``prompt`` once per distinct widget value.

    Streamlit widgets keep their value across script reruns, so queueing a
    prompt whenever the value is non-empty would resubmit the same prompt on
    every rerun, including the rerun issued after each assistant reply. The
    last value handled is remembered in ``st.session_state[tracker_key]`` and
    ``prompt`` is stored in ``st.session_state.pending_prompt`` only when
    ``value`` differs from it and is non-empty.

    Args:
        tracker_key: Session-state key used to remember the last seen value.
        value: Current widget value; a falsy value means "nothing selected".
        prompt: Prompt text to queue when ``value`` is new.
    """
    if st.session_state.get(tracker_key) == value:
        return
    # Remember empty values too, so clearing the widget and then entering the
    # same value again counts as a new request.
    st.session_state[tracker_key] = value
    if value:
        st.session_state.pending_prompt = prompt


with st.sidebar:
    st.header("🧪 Pathology Tools")

    # Reset the conversation, the remote thread reference and the previews.
    if st.button("🧹 Clear Chat"):
        st.session_state.messages = []
        st.session_state.thread_id = None
        st.session_state.image_urls = []
        st.session_state.pending_prompt = None
        st.rerun()

    # Read by the image viewer section to decide whether to draw previews.
    show_image = st.toggle("📸 Show Slide Images", value=True)

    # Button-gated, so this fires only on the run in which Search is clicked.
    keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
    if st.button("🔎 Search") and keyword:
        st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

    section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
    _queue_prompt_on_change(
        "_last_section_lookup",
        section,
        f"Summarize or list key points from section: {section}",
    )

    actions = [
        "Select an action...",
        "List histological features of inflammation",
        "Summarize features of carcinoma",
        "List muscle types and features",
        "Extract diagnostic markers",
        "Summarize embryology stages",
    ]
    action = st.selectbox("Common Pathology Queries", actions)
    # The first entry is a placeholder and is treated as "nothing selected".
    _queue_prompt_on_change(
        "_last_quick_action",
        action if action != actions[0] else "",
        action,
    )
# ------------------ Main Chat UI ------------------
# Run statuses during which the assistant is still working. Any other status
# (completed, failed, cancelled, expired, incomplete, requires_action, ...)
# ends the wait so the polling loop cannot spin forever.
_PENDING_RUN_STATUSES = frozenset({"queued", "in_progress", "cancelling"})
# Statuses after which the run is still open on the thread and must be
# cancelled before another message can be added.
_STUCK_RUN_STATUSES = frozenset({"timed_out", "requires_action"})
# Upper bound, in seconds, on how long one prompt may block the UI.
_RUN_TIMEOUT_SECONDS = 300
# Slide images the assistant references as raw GitHub URLs ending in .png.
_IMAGE_URL_PATTERN = re.compile(
    r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n]+\.png'
)


def _wait_for_run(thread_id, run_id):
    """Poll a run once per second until it stops being pending.

    Returns the run's last reported status string, or ``"timed_out"`` when
    the run is still pending after ``_RUN_TIMEOUT_SECONDS``.
    """
    deadline = time.monotonic() + _RUN_TIMEOUT_SECONDS
    while True:
        run_status = client.beta.threads.runs.retrieve(
            thread_id=thread_id, run_id=run_id
        ).status
        if run_status not in _PENDING_RUN_STATUSES:
            return run_status
        if time.monotonic() >= deadline:
            return "timed_out"
        time.sleep(1)


def _reply_text_for_run(thread_id, run_id):
    """Return the text the assistant wrote during ``run_id``, or ``None``.

    Only assistant messages created by this run are used, so an answer from
    an earlier turn of the same thread is never shown for a new question.
    Non-text content parts are skipped.
    """
    listing = client.beta.threads.messages.list(thread_id=thread_id)
    parts = []
    # The listing is newest-first by default; reverse it so that several
    # messages from one run are joined in the order they were written.
    for message in reversed(listing.data):
        if message.role != "assistant" or message.run_id != run_id:
            continue
        for item in message.content:
            if item.type == "text":
                parts.append(item.text.value)
    return "\n\n".join(parts) if parts else None


def _store_reply(reply):
    """Append ``reply`` to the chat history and record its slide image URLs.

    Text after the first ``---`` marker is stored as a separate
    ``references`` entry; without a marker the whole reply is one entry.
    """
    main_answer, marker, references = reply.partition("---")
    if marker:
        st.session_state.messages.append({"role": "assistant", "content": main_answer.strip()})
        st.session_state.messages.append({"role": "references", "content": references.strip()})
    else:
        st.session_state.messages.append({"role": "assistant", "content": reply})
    st.session_state.image_urls = _IMAGE_URL_PATTERN.findall(reply)


chat_col, image_col = st.columns([2, 1])
with chat_col:
    st.markdown("### 💬 Ask a Pathology-Specific Question")
    user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")

    # Typed input wins; otherwise consume a prompt queued by the sidebar.
    if user_input:
        st.session_state.messages.append({"role": "user", "content": user_input})
    elif st.session_state.pending_prompt:
        st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
        st.session_state.pending_prompt = None

    history = st.session_state.messages
    # A trailing user message means a question is still waiting for an answer.
    if history and history[-1]["role"] == "user":
        failure = None
        try:
            if st.session_state.thread_id is None:
                st.session_state.thread_id = client.beta.threads.create().id
            thread_id = st.session_state.thread_id

            client.beta.threads.messages.create(
                thread_id=thread_id,
                role="user",
                content=history[-1]["content"],
            )
            run = client.beta.threads.runs.create(
                thread_id=thread_id,
                assistant_id=ASSISTANT_ID,
            )
            with st.spinner("🔬 Analyzing..."):
                final_status = _wait_for_run(thread_id, run.id)

            if final_status in _STUCK_RUN_STATUSES:
                # Close the open run so the thread accepts the next question.
                client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run.id)

            reply = None
            if final_status == "completed":
                reply = _reply_text_for_run(thread_id, run.id)

            if reply is None:
                failure = "❌ Assistant failed to respond."
            else:
                _store_reply(reply)
        except Exception as e:
            failure = f"❌ Error: {e}"

        if failure is None:
            # Kept outside the try block so the broad handler above cannot
            # interfere with the rerun.
            st.rerun()
        else:
            # Record the failure as the assistant's turn: it is displayed with
            # the rest of the history below, and the history no longer ends
            # with a user message, so the prompt is not resubmitted on the
            # next rerun.
            history.append({"role": "assistant", "content": failure})

    for msg in st.session_state.messages:
        role = msg["role"]
        bubble = st.chat_message("📄") if role == "references" else st.chat_message(role)
        with bubble:
            # NOTE(review): assistant and reference text is rendered with raw
            # HTML enabled, as before; confirm the assistant's output is
            # trusted. Text typed by the user is never rendered as HTML.
            st.markdown(msg["content"], unsafe_allow_html=(role != "user"))
# ------------------ Image Viewer (with encoded URLs) ------------------
_RAW_GITHUB_BASE = "https://raw.githubusercontent.com/"


def _encode_raw_github_url(raw_url):
    """Return ``raw_url`` with every path segment percent-encoded.

    Each segment is unquoted before being quoted, so a URL that already
    contains escapes such as ``%20`` is left as it is rather than being
    double-encoded into ``%2520``. Returns ``None`` when ``raw_url`` does not
    start with the raw GitHub base URL.
    """
    if not raw_url.startswith(_RAW_GITHUB_BASE):
        return None
    path = raw_url[len(_RAW_GITHUB_BASE):]
    encoded_path = "/".join(
        quote(unquote(segment), safe="") for segment in path.split("/")
    )
    return _RAW_GITHUB_BASE + encoded_path


with image_col:
    if show_image and st.session_state.image_urls:
        st.markdown("### 🖼️ Slide Previews")
        for slide_no, raw_url in enumerate(st.session_state.image_urls, start=1):
            encoded_url = _encode_raw_github_url(raw_url)
            if encoded_url is None:
                # Only raw GitHub URLs are previewed.
                continue
            try:
                st.image(encoded_url, caption=f"Slide {slide_no}", use_container_width=True)
            except Exception as e:
                st.error(f"❌ Failed to load image {slide_no}: {e}")