witspathology / app.py
IAMTFRMZA's picture
Update app.py
8edd282 verified
raw
history blame
13.6 kB
import streamlit as st
import os
import time
import re
import requests
from PIL import Image
from io import BytesIO
from openai import OpenAI
# ------------------ Authentication ------------------
VALID_USERS = {
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
"[email protected]": "Pass.123",
}
def login():
st.title("πŸ” Login Required")
email = st.text_input("Email")
password = st.text_input("Password", type="password")
if st.button("Login"):
if VALID_USERS.get(email) == password:
st.session_state.authenticated = True
st.rerun()
else:
st.error("❌ Incorrect email or password.")
if "authenticated" not in st.session_state:
st.session_state.authenticated = False
if not st.session_state.authenticated:
login()
st.stop()
# ------------------ App Config ------------------
st.set_page_config(page_title="AI Pathology Assistant", layout="wide", initial_sidebar_state="collapsed")
st.title("🧬 AI Pathology Assistant")
# ------------------ Load OpenAI ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
st.error("❌ Missing OPENAI_API_KEY environment variable.")
st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)
# ------------------ Assistant Setup ------------------
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB"
# ------------------ State ------------------
if "messages" not in st.session_state:
st.session_state.messages = []
if "thread_id" not in st.session_state:
st.session_state.thread_id = None
if "image_urls" not in st.session_state:
st.session_state.image_urls = []
if "pending_prompt" not in st.session_state:
st.session_state.pending_prompt = None
# ------------------ Tabs ------------------
tab1, tab2 = st.tabs(["πŸ’¬ Chat Assistant", "πŸ–ΌοΈ Visual Reference Search"])
# ------------------ Tab 1: Chat Assistant ------------------
with tab1:
# Sidebar
with st.sidebar:
st.header("πŸ§ͺ Pathology Tools")
if st.button("🧹 Clear Chat"):
for k in ["messages", "thread_id", "image_urls", "pending_prompt"]:
st.session_state[k] = [] if k.endswith("s") else None
st.rerun()
show_image = st.toggle("πŸ“Έ Show Images", value=True)
keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
if st.button("πŸ”Ž Search") and keyword:
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"
section = st.text_input("Section Lookup", placeholder="e.g. Connective Tissue")
if section:
st.session_state.pending_prompt = f"Summarize or list key points from section: {section}"
action = st.selectbox("Common Pathology Queries", [
"Select an action...",
"List histological features of inflammation",
"Summarize features of carcinoma",
"List muscle types and features",
"Extract diagnostic markers",
"Summarize embryology stages"
])
if action != "Select an action...":
st.session_state.pending_prompt = action
chat_col, image_col = st.columns([2, 1])
with chat_col:
st.markdown("### πŸ’¬ Ask a Pathology-Specific Question")
user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")
if user_input:
st.session_state.messages.append({"role": "user", "content": user_input})
elif st.session_state.pending_prompt:
st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
st.session_state.pending_prompt = None
# Assistant logic
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
try:
if not st.session_state.thread_id:
st.session_state.thread_id = client.beta.threads.create().id
client.beta.threads.messages.create(
thread_id=st.session_state.thread_id,
role="user",
content=st.session_state.messages[-1]["content"]
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ”¬ Analyzing..."):
while True:
status = client.beta.threads.runs.retrieve(thread_id=st.session_state.thread_id, run_id=run.id)
if status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if status.status == "completed":
responses = client.beta.threads.messages.list(thread_id=st.session_state.thread_id).data
for m in reversed(responses):
if m.role == "assistant":
full_reply = m.content[0].text.value.strip()
# βœ‚οΈ Extract and remove "Some Possible Questions"
question_block_match = re.search(
r"Some Possible Questions:\s*(.*?)(\n{2,}|Other References:|$)",
full_reply,
re.DOTALL
)
question_block = None
if question_block_match:
question_block = question_block_match.group(1)
full_reply = full_reply.replace(f"Some Possible Questions:\n{question_block}", "").strip()
reply = full_reply
if not any(reply in msg["content"] or msg["content"] in reply
for msg in st.session_state.messages if msg["role"] == "assistant"):
st.session_state.messages.append({"role": "assistant", "content": reply})
# Extract image URLs
images = re.findall(
r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/[^\s\n"]+\.png',
reply
)
st.session_state.image_urls = images
break
else:
st.error("❌ Assistant failed to complete.")
st.rerun()
except Exception as e:
st.error(f"❌ Error: {e}")
# Display messages
for msg in st.session_state.messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"], unsafe_allow_html=True)
# βœ… Follow-Up Suggestions (from stripped question block)
if 'question_block' in locals() and question_block:
questions = [
line.strip(" -β€’") for line in question_block.splitlines()
if line.strip().startswith(("-", "β€’")) and line.strip().endswith("?")
]
if questions:
st.markdown("#### πŸ’‘ Follow-Up Suggestions")
for q in questions:
if st.button(f"πŸ“Œ {q}"):
st.session_state.pending_prompt = q
st.rerun()
with image_col:
if show_image and st.session_state.image_urls:
st.markdown("### πŸ–ΌοΈ Images")
for url in st.session_state.image_urls:
try:
img = Image.open(BytesIO(requests.get(url, timeout=5).content))
st.image(img, caption=url.split("/")[-1], use_container_width=True)
except Exception:
st.warning(f"⚠️ Failed to load image: {url}")
# ------------------ Tab 2: Visual Reference Search ------------------
with tab2:
ASSISTANT_ID = "asst_9v09zgizdcuuhNdcFQpRo9RO"
if "image_thread_id" not in st.session_state:
st.session_state.image_thread_id = None
if "image_response" not in st.session_state:
st.session_state.image_response = None
if "image_results" not in st.session_state:
st.session_state.image_results = []
if "image_lightbox" not in st.session_state:
st.session_state.image_lightbox = None
image_input = st.chat_input("Ask for histology visual references (e.g. ovary histology, mitosis)")
if image_input:
st.session_state.image_response = None
st.session_state.image_results = []
st.session_state.image_lightbox = None
try:
if st.session_state.image_thread_id is None:
thread = client.beta.threads.create()
st.session_state.image_thread_id = thread.id
client.beta.threads.messages.create(
thread_id=st.session_state.image_thread_id,
role="user",
content=image_input
)
run = client.beta.threads.runs.create(
thread_id=st.session_state.image_thread_id,
assistant_id=ASSISTANT_ID
)
with st.spinner("πŸ”¬ Searching for histology references..."):
while True:
run_status = client.beta.threads.runs.retrieve(
thread_id=st.session_state.image_thread_id,
run_id=run.id
)
if run_status.status in ("completed", "failed", "cancelled"):
break
time.sleep(1)
if run_status.status == "completed":
messages = client.beta.threads.messages.list(thread_id=st.session_state.image_thread_id)
for msg in reversed(messages.data):
if msg.role == "assistant":
response_text = msg.content[0].text.value
st.session_state.image_response = response_text
# βœ… FINAL FIXED PARSER: Handles Image URL on next line
lines = response_text.splitlines()
image_urls = []
expecting_url = False
for line in lines:
line_clean = line.strip().replace("**", "")
if "Image URL:" in line_clean:
parts = line_clean.split("Image URL:")
if len(parts) > 1 and parts[1].strip().startswith("http"):
image_urls.append(parts[1].strip())
else:
expecting_url = True # Flag next line
elif expecting_url:
if line_clean.startswith("http"):
image_urls.append(line_clean)
expecting_url = False
st.session_state.image_results = [{"image": url} for url in image_urls]
if image_urls and not st.session_state.image_lightbox:
st.session_state.image_lightbox = image_urls[0]
break
except Exception as e:
st.error(f"❌ Visual Assistant Error: {e}")
text_col, image_col = st.columns([2, 1])
with text_col:
if st.session_state.image_response:
st.markdown("### 🧠 Assistant Response")
st.markdown(st.session_state.image_response, unsafe_allow_html=True)
# Optional: Debug
# st.code(st.session_state.image_response, language="markdown")
with image_col:
if st.session_state.image_results:
st.markdown("### πŸ–ΌοΈ Image Preview(s)")
for i, item in enumerate(st.session_state.image_results):
image_url = item.get("image")
if image_url:
try:
encoded_url = requests.utils.requote_uri(image_url)
r = requests.get(encoded_url, timeout=10)
r.raise_for_status()
img = Image.open(BytesIO(r.content))
st.image(img, caption=encoded_url.split("/")[-1], use_container_width=True)
if st.button("πŸ” View Full Image", key=f"full_img_{i}"):
st.session_state.image_lightbox = encoded_url
except Exception as e:
st.warning(f"⚠️ Could not load: {image_url}")
st.error(f"{e}")
else:
st.info("ℹ️ No image references found yet.")
if st.session_state.image_lightbox:
st.markdown("### πŸ”¬ Full Image View")
try:
img_url = st.session_state.image_lightbox
r = requests.get(img_url, timeout=10)
r.raise_for_status()
full_img = Image.open(BytesIO(r.content))
st.image(full_img, caption=img_url.split("/")[-1], use_container_width=True)
except Exception as e:
st.warning("⚠️ Could not load full image.")
st.error(str(e))
if st.button("❌ Close Preview"):
st.session_state.image_lightbox = None
st.rerun()