Spaces:
Running
Running
import streamlit as st | |
st.set_page_config(page_title="AI Pathology Assistant", layout="wide", initial_sidebar_state="collapsed") | |
import os | |
import time | |
import re | |
import requests | |
from PIL import Image | |
from io import BytesIO | |
from openai import OpenAI | |
# ------------------ Authentication ------------------ | |
VALID_USERS = { | |
"[email protected]": "Pass.123", | |
"[email protected]": "Pass.123", | |
"[email protected]": "Pass.123", | |
"[email protected]": "Pass.123", | |
"[email protected]": "Pass.123", | |
} | |
def login(): | |
st.title("π Login Required") | |
email = st.text_input("Email") | |
password = st.text_input("Password", type="password") | |
if st.button("Login"): | |
if VALID_USERS.get(email) == password: | |
st.session_state["authenticated"] = True | |
st.experimental_set_query_params(logged_in="1") | |
st.rerun() | |
else: | |
st.error("β Incorrect email or password.") | |
if not st.session_state.get("authenticated", False): | |
login() | |
st.stop() | |
# ------------------ App Title ------------------ | |
st.title("𧬠AI Pathology Assistant") | |
# ------------------ Load OpenAI ------------------ | |
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
if not OPENAI_API_KEY: | |
st.error("β Missing OPENAI_API_KEY environment variable.") | |
st.stop() | |
client = OpenAI(api_key=OPENAI_API_KEY) | |
# ------------------ Assistant Setup ------------------ | |
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB" | |
# ------------------ Session State Initialization ------------------ | |
for key in ["messages", "thread_id", "image_urls", "pending_prompt", "image_url", "image_updated"]: | |
if key not in st.session_state: | |
st.session_state[key] = [] if key.endswith("s") else None if "url" in key else False | |
# ------------------ Tabs ------------------ | |
tab1, tab2 = st.tabs(["π¬ Chat Assistant", "πΌοΈ Visual Reference Search"]) | |
# ------------------ Tab 1: Chat Assistant ------------------ | |
with tab1: | |
with st.sidebar: | |
st.header("π§ Tools") | |
if st.button("π§Ή Clear Chat"): | |
for key in ["messages", "thread_id", "image_url", "image_updated", "pending_prompt"]: | |
st.session_state[key] = [] if key == "messages" else None if "url" in key else False | |
st.rerun() | |
if st.button("πͺ Logout"): | |
st.session_state.clear() | |
st.rerun() | |
show_image = st.toggle("πΈ Show Images", value=True) | |
keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma") | |
if st.button("π Search") and keyword: | |
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}" | |
section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue") | |
if st.button("π Lookup") and section: | |
st.session_state.pending_prompt = f"Summarize or list key points from section: {section}" | |
action = st.selectbox("π Common Pathology Queries", [ | |
"Select an action...", | |
"List histological features of inflammation", | |
"Summarize features of carcinoma", | |
"List muscle types and features", | |
"Extract diagnostic markers", | |
"Summarize embryology stages" | |
]) | |
if action and action != "Select an action...": | |
st.session_state.pending_prompt = action | |
st.rerun() | |
chat_col, image_col = st.columns([2, 1]) | |
with chat_col: | |
st.markdown("### π¬ Ask a Pathology-Specific Question") | |
user_input = st.chat_input("Example: What are features of squamous cell carcinoma?") | |
if user_input or st.session_state.pending_prompt: | |
st.session_state.messages.append({ | |
"role": "user", | |
"content": user_input or st.session_state.pending_prompt | |
}) | |
st.session_state.pending_prompt = None | |
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user": | |
try: | |
if not st.session_state.thread_id: | |
st.session_state.thread_id = client.beta.threads.create().id | |
client.beta.threads.messages.create( | |
thread_id=st.session_state.thread_id, | |
role="user", | |
content=st.session_state.messages[-1]["content"] | |
) | |
run = client.beta.threads.runs.create( | |
thread_id=st.session_state.thread_id, | |
assistant_id=ASSISTANT_ID | |
) | |
with st.spinner("π¬ Analyzing..."): | |
while True: | |
status = client.beta.threads.runs.retrieve(thread_id=st.session_state.thread_id, run_id=run.id) | |
if status.status in ("completed", "failed", "cancelled"): | |
break | |
time.sleep(1) | |
if status.status == "completed": | |
messages = client.beta.threads.messages.list(thread_id=st.session_state.thread_id) | |
for m in reversed(messages.data): | |
if m.role == "assistant": | |
reply = m.content[0].text.value.strip() | |
if not any(reply in msg["content"] or msg["content"] in reply for msg in st.session_state.messages if msg["role"] == "assistant"): | |
st.session_state.messages.append({"role": "assistant", "content": reply}) | |
image_matches = re.findall(r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n"]+\.png', reply) | |
if image_matches: | |
st.session_state.image_url = image_matches[0] | |
st.session_state.image_updated = True | |
break | |
else: | |
st.error("β Assistant failed to respond.") | |
st.rerun() | |
except Exception as e: | |
st.error(f"β Error: {e}") | |
for msg in st.session_state.messages: | |
with st.chat_message(msg["role"]): | |
st.markdown(msg["content"], unsafe_allow_html=True) | |
with image_col: | |
if show_image and st.session_state.image_url: | |
try: | |
r = requests.get(st.session_state.image_url) | |
r.raise_for_status() | |
img = Image.open(BytesIO(r.content)) | |
st.image(img, caption="π Referenced Image", use_container_width=True) | |
except Exception as e: | |
st.error(f"πΌοΈ Failed to load image: {e}") | |
# ------------------ Tab 2: Visual Reference Search ------------------ | |
with tab2: | |
ASSISTANT_ID = "asst_9v09zgizdcuuhNdcFQpRo9RO" | |
for key in ["image_thread_id", "image_response", "image_results", "image_lightbox"]: | |
if key not in st.session_state: | |
st.session_state[key] = None if key in ["image_thread_id", "image_response", "image_lightbox"] else [] | |
st.markdown("### π Search Histology Visual References") | |
image_input = st.chat_input("e.g. mitosis, seminiferous tubule, cell cycle") | |
if image_input: | |
st.session_state.image_response = None | |
st.session_state.image_results = [] | |
st.session_state.image_lightbox = None | |
try: | |
if not st.session_state.image_thread_id: | |
st.session_state.image_thread_id = client.beta.threads.create().id | |
client.beta.threads.messages.create( | |
thread_id=st.session_state.image_thread_id, | |
role="user", | |
content=image_input | |
) | |
run = client.beta.threads.runs.create( | |
thread_id=st.session_state.image_thread_id, | |
assistant_id=ASSISTANT_ID | |
) | |
with st.spinner("𧬠Retrieving histology references..."): | |
while True: | |
status = client.beta.threads.runs.retrieve( | |
thread_id=st.session_state.image_thread_id, | |
run_id=run.id | |
) | |
if status.status in ("completed", "failed", "cancelled"): | |
break | |
time.sleep(1) | |
if status.status == "completed": | |
messages = client.beta.threads.messages.list(thread_id=st.session_state.image_thread_id) | |
for msg in reversed(messages.data): | |
if msg.role == "assistant": | |
response_text = msg.content[0].text.value.strip() | |
st.session_state.image_response = response_text | |
# β Reliable parser for markdown blocks | |
blocks = response_text.split("### πΌοΈ")[1:] | |
results = [] | |
for b in blocks: | |
lines = b.strip().splitlines() | |
card = { | |
"title": lines[0].strip() if lines else "Untitled", | |
"desc": "", | |
"url": "" | |
} | |
for line in lines: | |
if "Description:" in line: | |
card["desc"] = line.split("Description:")[1].strip() | |
elif "Image URL:" in line: | |
card["url"] = line.split("Image URL:")[1].strip() | |
elif line.startswith("http") and not card["url"]: | |
card["url"] = line.strip() | |
if card["url"]: | |
results.append(card) | |
st.session_state.image_results = results | |
break | |
except Exception as e: | |
st.error(f"β Assistant Error: {e}") | |
# πΌοΈ Display images in vertical stack | |
if st.session_state.image_results: | |
st.markdown("### πΌοΈ Image Results") | |
for idx, item in enumerate(st.session_state.image_results): | |
try: | |
img = Image.open(BytesIO(requests.get(item["url"], timeout=10).content)) | |
st.image(img, use_container_width=True) | |
except: | |
st.image("https://via.placeholder.com/150?text=Image+Error", use_container_width=True) | |
st.markdown(f"**{item.get('title', 'No Title')}**", unsafe_allow_html=True) | |
st.caption(item.get("desc", "")[:200] + "..." if item.get("desc") else "") | |
if st.button("π View", key=f"img_{idx}"): | |
st.session_state.image_lightbox = item["url"] | |
# π¬ Full image lightbox | |
if st.session_state.image_lightbox: | |
st.markdown("### π¬ Full Image View") | |
try: | |
full_img = Image.open(BytesIO(requests.get(st.session_state.image_lightbox).content)) | |
st.image(full_img, use_container_width=True) | |
except Exception as e: | |
st.error(f"β οΈ Could not load full image: {e}") | |
if st.button("β Close Image View"): | |
st.session_state.image_lightbox = None | |
st.rerun() | |