witspathology / app.py
IAMTFRMZA's picture
Update app.py
6bfb206 verified
raw
history blame
10.1 kB
import hmac
import json
import os
import re
import time
from io import BytesIO
from urllib.parse import quote

import requests
import streamlit as st
from openai import OpenAI
from PIL import Image
# ------------------ Authentication ------------------
# Email -> password table consulted by login().
# NOTE(review): passwords are stored in plaintext in the source file; consider
# moving them to a secret store / environment and keeping only hashes here.
# NOTE(review): the four keys below are textually identical, so this literal
# collapses to a single entry at runtime -- presumably the real addresses were
# redacted; confirm against the deployed file.
VALID_USERS = {
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
}
def login():
    """Render the login form and mark the session authenticated on success.

    Reads the email/password widgets and, when the "Login" button is pressed,
    checks them against ``VALID_USERS``. On a match the session flag
    ``authenticated`` is set and the script is rerun; otherwise an error is
    shown. Returns nothing.
    """
    st.title("πŸ” Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if not st.button("Login"):
        return

    expected = VALID_USERS.get(email)
    # Constant-time comparison so response timing does not leak how much of the
    # password matched. Operands are encoded because compare_digest() rejects
    # non-ASCII str arguments.
    credentials_ok = expected is not None and hmac.compare_digest(
        expected.encode("utf-8"), password.encode("utf-8")
    )
    if credentials_ok:
        st.session_state.authenticated = True
        st.rerun()
    else:
        st.error("❌ Incorrect email or password.")


# Gate: nothing below this point runs until the session is authenticated.
if "authenticated" not in st.session_state:
    st.session_state.authenticated = False
if not st.session_state.authenticated:
    login()
    st.stop()
# ------------------ App Config ------------------
st.set_page_config(
    page_title="AI Pathology Assistant",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("🧬 AI Pathology Assistant")

# ------------------ Load OpenAI ------------------
# The key comes from the environment; without it the script halts here.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Assistant Setup ------------------
ASSISTANT_ID = "asst_9v09zgizdcuuhNdcFQpRo9RO"

# ------------------ State ------------------
# Seed each session key once; the tuple is rebuilt on every script run, so the
# list defaults are fresh objects and never shared between sessions.
for _state_key, _initial_value in (
    ("messages", []),
    ("thread_id", None),
    ("image_urls", []),
    ("pending_prompt", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value

# ------------------ Tabs ------------------
tab1, tab2 = st.tabs(["πŸ’¬ Chat Assistant", "πŸ–ΌοΈ Visual Reference Search"])
# ------------------ Tab 1: Chat Assistant ------------------
# Run states in which the assistant is still working. Any other state
# (completed, failed, cancelled, expired, incomplete, requires_action) ends
# polling, so an expired or tool-blocked run can no longer hang the page.
CHAT_RUN_ACTIVE_STATES = ("queued", "in_progress", "cancelling")
# Upper bound on how long one question may be polled before giving up.
CHAT_RUN_TIMEOUT_SECONDS = 180
# Image links the assistant is expected to embed in its replies.
CHAT_IMAGE_URL_PATTERN = (
    r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n"]+\.png'
)

with tab1:
    with st.sidebar:
        st.header("πŸ§ͺ Pathology Tools")
        if st.button("🧹 Clear Chat"):
            st.session_state.messages = []
            st.session_state.thread_id = None
            st.session_state.image_urls = []
            st.session_state.pending_prompt = None
            st.rerun()
        show_image = st.toggle("πŸ“Έ Show Images", value=True)

        keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
        if st.button("πŸ”Ž Search") and keyword:
            st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

        section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
        # Widgets keep their value across reruns, so queue a prompt only when
        # the value actually changes; otherwise every rerun would resubmit it
        # and the rerun after each answer would loop forever.
        if section != st.session_state.get("last_section", ""):
            st.session_state.last_section = section
            if section:
                st.session_state.pending_prompt = (
                    f"Summarize or list key points from section: {section}"
                )

        actions = [
            "Select an action...",
            "List histological features of inflammation",
            "Summarize features of carcinoma",
            "List muscle types and features",
            "Extract diagnostic markers",
            "Summarize embryology stages",
        ]
        action = st.selectbox("Common Pathology Queries", actions)
        # Same change-detection as the section lookup above.
        if action != st.session_state.get("last_action", actions[0]):
            st.session_state.last_action = action
            if action != actions[0]:
                st.session_state.pending_prompt = action

    chat_col, image_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### πŸ’¬ Ask a Pathology-Specific Question")
        user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")

        # Typed input wins; otherwise consume a prompt queued by the sidebar.
        prompt = user_input
        if not prompt and st.session_state.pending_prompt:
            prompt = st.session_state.pending_prompt
            st.session_state.pending_prompt = None

        # Only a prompt submitted in THIS script run triggers an assistant
        # call. A failed request is therefore reported once instead of being
        # re-sent on every later rerun.
        if prompt:
            st.session_state.messages.append({"role": "user", "content": prompt})
            reply = None
            try:
                if st.session_state.thread_id is None:
                    thread = client.beta.threads.create()
                    st.session_state.thread_id = thread.id
                client.beta.threads.messages.create(
                    thread_id=st.session_state.thread_id,
                    role="user",
                    content=prompt,
                )
                run = client.beta.threads.runs.create(
                    thread_id=st.session_state.thread_id,
                    assistant_id=ASSISTANT_ID,
                )
                deadline = time.monotonic() + CHAT_RUN_TIMEOUT_SECONDS
                with st.spinner("πŸ”¬ Analyzing..."):
                    while True:
                        status = client.beta.threads.runs.retrieve(
                            thread_id=st.session_state.thread_id,
                            run_id=run.id,
                        )
                        if status.status not in CHAT_RUN_ACTIVE_STATES:
                            break
                        if time.monotonic() >= deadline:
                            break
                        time.sleep(1)
                if status.status == "completed":
                    # Ask for newest-first explicitly and take the first
                    # assistant message, i.e. the answer to this prompt rather
                    # than the oldest reply in the thread.
                    thread_messages = client.beta.threads.messages.list(
                        thread_id=st.session_state.thread_id,
                        order="desc",
                    )
                    for m in thread_messages.data:
                        if m.role == "assistant":
                            # A message may mix text and non-text parts.
                            reply = "\n".join(
                                part.text.value for part in m.content if part.type == "text"
                            )
                            break
            except Exception as e:
                st.error(f"❌ Error: {e}")
            else:
                if reply:
                    st.session_state.messages.append({"role": "assistant", "content": reply})
                    # De-duplicate while keeping first-seen order.
                    st.session_state.image_urls = list(
                        dict.fromkeys(re.findall(CHAT_IMAGE_URL_PATTERN, reply))
                    )
                    # Rerun (outside the try) so the page redraws with the answer.
                    st.rerun()
                else:
                    # No rerun here: it would erase this message immediately.
                    st.error("❌ Assistant failed to respond.")

        # NOTE(review): unsafe_allow_html renders user and assistant text as raw
        # HTML; confirm this is required before keeping it.
        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)

    with image_col:
        if show_image and st.session_state.image_urls:
            st.markdown("### πŸ–ΌοΈ Image(s)")
            for raw_url in st.session_state.image_urls:
                try:
                    r = requests.get(raw_url, timeout=5)
                    r.raise_for_status()
                    img = Image.open(BytesIO(r.content))
                    st.image(img, caption=f"πŸ“· {raw_url.split('/')[-1]}", use_container_width=True)
                except Exception:
                    # Best effort: one broken link must not hide the other
                    # images, but say so instead of dropping it silently.
                    st.caption(f"Image unavailable: {raw_url}")
# ------------------ Tab 2: Visual Reference Search ------------------
# Run states in which the assistant is still working; any other state ends
# polling (this includes expired / incomplete / requires_action, which would
# otherwise never leave the loop).
IMAGE_RUN_ACTIVE_STATES = ("queued", "in_progress", "cancelling")
# Upper bound on how long one search may be polled before giving up.
IMAGE_RUN_TIMEOUT_SECONDS = 180

with tab2:
    st.title("πŸ” Visual Reference Search")
    user_query = st.text_input("Enter keyword to search images (e.g. ovary, thyroid, mitosis)")

    if "image_thread_id" not in st.session_state:
        st.session_state.image_thread_id = None
    if "image_response" not in st.session_state:
        st.session_state.image_response = None

    if st.button("Ask Assistant") and user_query:
        try:
            if st.session_state.image_thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.image_thread_id = thread.id
            client.beta.threads.messages.create(
                thread_id=st.session_state.image_thread_id,
                role="user",
                content=user_query,
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.image_thread_id,
                # Same assistant as the chat tab; use the shared constant.
                assistant_id=ASSISTANT_ID,
            )
            deadline = time.monotonic() + IMAGE_RUN_TIMEOUT_SECONDS
            with st.spinner("πŸ”¬ Searching for visual references..."):
                while True:
                    run_status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.image_thread_id,
                        run_id=run.id,
                    )
                    if run_status.status not in IMAGE_RUN_ACTIVE_STATES:
                        break
                    if time.monotonic() >= deadline:
                        break
                    time.sleep(1)

            answer = None
            if run_status.status == "completed":
                # Newest-first, so the first assistant message is the answer to
                # this query and not an older one from the same thread.
                thread_messages = client.beta.threads.messages.list(
                    thread_id=st.session_state.image_thread_id,
                    order="desc",
                )
                for m in thread_messages.data:
                    if m.role == "assistant":
                        # A message may mix text and non-text parts.
                        answer = "\n".join(
                            part.text.value for part in m.content if part.type == "text"
                        )
                        break
            if answer:
                st.session_state.image_response = answer
            else:
                st.error("❌ Assistant failed to return an image match.")
        except Exception as e:
            st.error(f"Error: {e}")

    # Render assistant response + build GitHub image URLs from filenames
    if st.session_state.image_response:
        st.markdown("### 🧠 Assistant Response")
        # NOTE(review): unsafe_allow_html renders the assistant text as raw
        # HTML; confirm this is required before keeping it.
        st.markdown(st.session_state.image_response, unsafe_allow_html=True)

        # Extract image filenames from assistant response
        filename_matches = re.findall(
            r'Image Filename:\s*(.*?\.png)', st.session_state.image_response
        )
        for fname in filename_matches:
            # The repository folder is the filename prefix before "_page_".
            match = re.match(r'^(.+?)_page_', fname)
            if not match:
                st.warning(f"⚠️ Could not determine folder for: {fname}")
                continue
            folder = match.group(1).strip()

            # Percent-encode both path segments (names may contain spaces).
            encoded_folder = quote(folder)
            encoded_fname = quote(fname)
            url = f"https://raw.githubusercontent.com/AndrewLORTech/witspathologaihisto/main/{encoded_folder}/{encoded_fname}"
            st.markdown(f"πŸ”— [View image directly]({url})")
            try:
                r = requests.get(url, timeout=5)
                r.raise_for_status()
                img = Image.open(BytesIO(r.content))
                st.image(img, caption=fname, use_container_width=True)
            except Exception as e:
                st.warning(f"⚠️ Failed to load: {url}")
                st.error(f"πŸ›‘ Error: {str(e)}")