witspathology / app.py
IAMTFRMZA's picture
Update app.py
2a87f5c verified
raw
history blame
10 kB
import streamlit as st
import os
import time
import re
import requests
import json
from PIL import Image
from io import BytesIO
from openai import OpenAI
# ------------------ Authentication ------------------
# Maps a login email to the password expected for it; login() looks the
# submitted email up here and compares the stored value with the typed password.
# NOTE(review): all four keys below are the same string, so this literal
# collapses to a single entry at runtime. The addresses look like redaction
# placeholders - confirm the intended accounts.
# NOTE(review): passwords are kept in plaintext in the source file; consider
# moving them to a secret store or environment configuration.
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login() -> None:
    """Render the login form and authenticate the current session.

    Looks the submitted email up in ``VALID_USERS`` and compares the stored
    password with the typed one. On a match the session is flagged as
    authenticated and the script is rerun; otherwise an error is shown.
    Nothing happens until the "Login" button is pressed.
    """
    import hmac  # local import: only needed for the credential comparison

    st.title("🔐 Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if not st.button("Login"):
        return

    expected = VALID_USERS.get(email)
    # Compare in constant time, and against a dummy value for unknown emails,
    # so the response time does not hint at which part of the pair was wrong.
    # Bytes are used because compare_digest rejects non-ASCII str arguments.
    reference = (expected if expected is not None else "").encode("utf-8")
    password_matches = hmac.compare_digest(password.encode("utf-8"), reference)

    if expected is not None and password_matches:
        st.session_state.authenticated = True
        st.rerun()
    else:
        st.error("❌ Incorrect email or password.")
# A brand-new session starts out unauthenticated.
st.session_state.setdefault("authenticated", False)

# Until the login succeeds, show only the login form and halt the script here.
if not st.session_state["authenticated"]:
    login()
    st.stop()
# ------------------ App Config ------------------
# Page-level settings first, then the heading shown above the tabs.
st.set_page_config(
    page_title="AI Pathology Assistant",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("🧬 AI Pathology Assistant")
# ------------------ Load OpenAI ------------------
# The API key is read from the environment; when it is absent the app reports
# the problem and stops instead of building a client.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if OPENAI_API_KEY:
    client = OpenAI(api_key=OPENAI_API_KEY)
else:
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()
# ------------------ Assistant Setup ------------------
# Identifier passed as assistant_id when a run is created.
ASSISTANT_ID = "asst_9v09zgizdcuuhNdcFQpRo9RO"

# ------------------ State ------------------
# Seed each session key on first use; values already present are left alone.
# The literals are rebuilt on every script run, so the lists are never shared.
for _state_key, _state_default in (
    ("messages", []),
    ("thread_id", None),
    ("image_urls", []),
    ("pending_prompt", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _state_default
# ------------------ Assistant helpers ------------------
# Run states in which a run has finished and will not change any more.
RUN_TERMINAL_STATUSES = frozenset(
    {"completed", "failed", "cancelled", "expired", "incomplete"}
)
RUN_TIMEOUT_SECONDS = 120  # stop waiting for a run that has not finished by then
RUN_POLL_SECONDS = 1  # pause between two status checks

# Image links embedded in chat replies.
CHAT_IMAGE_URL_RE = re.compile(
    r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n"]+\.png'
)
# "Image Filename: <name>.png" entries in visual-search replies.
IMAGE_FILENAME_RE = re.compile(r'Image Filename:\s*(.*?\.png)')
# Folder prefix of a reference image filename: everything before "_page_".
IMAGE_FOLDER_RE = re.compile(r'^(.+?)_page_')
REFERENCE_IMAGE_BASE_URL = (
    "https://raw.githubusercontent.com/AndrewLORTech/witspathologaihisto/main"
)


def _message_text(message):
    """Return the text parts of an assistant message joined together ('' if none)."""
    return "\n\n".join(
        part.text.value
        for part in message.content
        if getattr(part, "type", None) == "text"
    )


def _ask_assistant(thread_key, prompt, spinner_text):
    """Send ``prompt`` to the assistant and return the reply text, or None.

    Args:
        thread_key: Session-state key holding the thread id to use. A thread
            is created and stored under this key when the value is None.
        prompt: User message to append to the thread.
        spinner_text: Label shown while waiting for the run to finish.

    Returns:
        The text of the assistant message produced by this run, or None when
        the run did not complete (failed, expired, timed out, asked for tool
        output) or produced no text.

    Raises:
        Whatever the OpenAI client raises for the create/retrieve/list calls.
    """
    thread_id = st.session_state.get(thread_key)
    if thread_id is None:
        thread_id = client.beta.threads.create().id
        # Stored immediately so the thread is reused even if a later call fails.
        st.session_state[thread_key] = thread_id

    client.beta.threads.messages.create(
        thread_id=thread_id, role="user", content=prompt
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id, assistant_id=ASSISTANT_ID
    )

    deadline = time.monotonic() + RUN_TIMEOUT_SECONDS
    with st.spinner(spinner_text):
        while True:
            run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
            if run.status in RUN_TERMINAL_STATUSES:
                break
            # This app never submits tool outputs, so a run waiting for them
            # can only expire; treat it like a timeout.
            if run.status == "requires_action" or time.monotonic() >= deadline:
                try:
                    client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run.id)
                except Exception:
                    # The run could not be cancelled: abandon the thread so the
                    # next prompt is not sent to one with a run still active.
                    st.session_state[thread_key] = None
                return None
            time.sleep(RUN_POLL_SECONDS)

    if run.status != "completed":
        return None

    # Newest first: the answer to this prompt comes before older replies.
    listing = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in listing.data:
        if message.role == "assistant" and message.run_id == run.id:
            text = _message_text(message)
            if text:
                return text
    return None


def _show_remote_image(url, caption):
    """Download ``url`` and render it; show a warning when it cannot be displayed."""
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        st.image(Image.open(BytesIO(response.content)), caption=caption, use_container_width=True)
    except Exception:
        # Deliberately broad: download, decoding and rendering problems should
        # all degrade to a warning instead of breaking the page.
        st.warning(f"⚠️ Failed to load: {url}")


def _reference_image_url(fname):
    """Build the raw GitHub URL for a reference image, or None without a folder prefix."""
    folder_match = IMAGE_FOLDER_RE.match(fname)
    if folder_match is None:
        return None
    # Encode folder (including parentheses) but NOT parentheses in filenames
    encoded_folder = (
        folder_match.group(1).replace(" ", "%20").replace("(", "%28").replace(")", "%29")
    )
    encoded_fname = fname.replace(" ", "%20")
    return f"{REFERENCE_IMAGE_BASE_URL}/{encoded_folder}/{encoded_fname}"


# ------------------ Tabs ------------------
tab1, tab2 = st.tabs(["💬 Chat Assistant", "🖼️ Visual Reference Search"])

# ------------------ Tab 1: Chat Assistant ------------------
with tab1:
    with st.sidebar:
        st.header("🧪 Pathology Tools")
        if st.button("🧹 Clear Chat"):
            st.session_state.messages = []
            st.session_state.thread_id = None
            st.session_state.image_urls = []
            st.session_state.pending_prompt = None
            st.rerun()

        show_image = st.toggle("📸 Show Images", value=True)

        keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
        if st.button("🔎 Search") and keyword:
            st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

        # Text inputs and select boxes keep their value across reruns, so a
        # prompt is queued only when the value actually changed; otherwise the
        # same query would be resubmitted on every rerun.
        section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
        if section != st.session_state.get("last_section_lookup"):
            st.session_state["last_section_lookup"] = section
            if section:
                st.session_state.pending_prompt = (
                    f"Summarize or list key points from section: {section}"
                )

        actions = [
            "Select an action...",
            "List histological features of inflammation",
            "Summarize features of carcinoma",
            "List muscle types and features",
            "Extract diagnostic markers",
            "Summarize embryology stages",
        ]
        action = st.selectbox("Common Pathology Queries", actions)
        if action != st.session_state.get("last_quick_action"):
            st.session_state["last_quick_action"] = action
            if action != actions[0]:
                st.session_state.pending_prompt = action

    chat_col, image_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### 💬 Ask a Pathology-Specific Question")
        user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")

        # A typed question takes precedence; a queued sidebar prompt is used
        # (and consumed) only when nothing was typed in this run.
        prompt = user_input
        if not prompt and st.session_state.pending_prompt:
            prompt = st.session_state.pending_prompt
            st.session_state.pending_prompt = None

        # The assistant is called only in the run that received the prompt,
        # so a failed request is reported once instead of being resent on
        # every subsequent rerun.
        if prompt:
            st.session_state.messages.append({"role": "user", "content": prompt})
            try:
                reply = _ask_assistant("thread_id", prompt, "🔬 Analyzing...")
            except Exception as e:
                st.error(f"❌ Error: {e}")
            else:
                if reply is None:
                    st.error("❌ Assistant failed to respond.")
                else:
                    st.session_state.messages.append({"role": "assistant", "content": reply})
                    # dict.fromkeys drops repeated links while keeping their order.
                    st.session_state.image_urls = list(
                        dict.fromkeys(CHAT_IMAGE_URL_RE.findall(reply))
                    )

        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                # Raw HTML is rendered for assistant replies only; text typed
                # by the user is shown as plain markdown.
                st.markdown(msg["content"], unsafe_allow_html=msg["role"] == "assistant")

    with image_col:
        if show_image and st.session_state.image_urls:
            st.markdown("### 🖼️ Image(s)")
            for raw_url in st.session_state.image_urls:
                _show_remote_image(raw_url, caption=f"📷 {raw_url.split('/')[-1]}")

# ------------------ Tab 2: Visual Search ------------------
with tab2:
    st.title("🔍 Visual Reference Search")
    user_query = st.text_input("Enter keyword to search images (e.g. ovary, thyroid, mitosis)")

    if "image_thread_id" not in st.session_state:
        st.session_state.image_thread_id = None
    if "image_response" not in st.session_state:
        st.session_state.image_response = None

    if st.button("Ask Assistant") and user_query:
        try:
            answer = _ask_assistant(
                "image_thread_id", user_query, "🔬 Searching for visual references..."
            )
        except Exception as e:
            st.error(f"Error: {e}")
        else:
            if answer is None:
                st.error("❌ Assistant failed to return an image match.")
            else:
                st.session_state.image_response = answer

    # Render assistant response + build GitHub image URLs from filenames
    if st.session_state.image_response:
        st.markdown("### 🧠 Assistant Response")
        st.markdown(st.session_state.image_response, unsafe_allow_html=True)

        # Extract image filenames from assistant response
        for fname in IMAGE_FILENAME_RE.findall(st.session_state.image_response):
            url = _reference_image_url(fname)
            if url is None:
                st.warning(f"⚠️ Could not determine folder for: {fname}")
                continue
            _show_remote_image(url, caption=fname)