"""Streamlit front end for the AI Pathology Assistant.

The script gates access behind a simple login form, forwards chat prompts to
an OpenAI Assistants thread, renders the conversation, and previews any
GitHub-hosted slide images referenced in the latest assistant reply.
"""

import hmac
import os
import re
import time
from io import BytesIO
from urllib.parse import quote

import requests
import streamlit as st
from openai import OpenAI
from PIL import Image

# ------------------ App Configuration ------------------
# set_page_config is documented as having to be the first Streamlit command,
# so it runs before the login form renders anything.
st.set_page_config(
    page_title="AI Pathology Assistant",
    layout="wide",
    initial_sidebar_state="collapsed",
)

# ------------------ Authentication ------------------
# NOTE(security): these credentials are hard-coded in plain text and shared by
# every account. They are kept here so existing logins continue to work, but
# they should be moved to st.secrets / environment variables and stored as
# salted hashes.
VALID_USERS = {
    "andrew@lortechnologies.com": "Pass.123",
    "asherS@schlagergroup.com.au": "Pass.123",
    "daniel@schlagergroup.com.au": "Pass.123",
    "admin@schlagergroup.com.au": "Pass.123",
}


def _credentials_match(email, password):
    """Return True when *email* is a known user and *password* matches."""
    expected = VALID_USERS.get(email)
    if expected is None:
        return False
    # Constant-time comparison; bytes are used because compare_digest only
    # accepts ASCII-only str arguments.
    return hmac.compare_digest(expected.encode("utf-8"), password.encode("utf-8"))


def login():
    """Render the login form and mark the session authenticated on success.

    On a successful login the script is rerun so the main app is drawn;
    otherwise an error message is shown beneath the form.
    """
    st.title("🔐 Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        if _credentials_match(email, password):
            st.session_state.authenticated = True
            st.rerun()
        else:
            st.error("❌ Incorrect email or password.")


if "authenticated" not in st.session_state:
    st.session_state.authenticated = False

if not st.session_state.authenticated:
    login()
    st.stop()

st.title("🧬 AI Pathology Assistant")
st.caption("AI-powered exploration of pathology, anatomy, and histology documents via OCR + GPT")

# ------------------ Load OpenAI API Key ------------------
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()

client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Assistant Configuration ------------------
ASSISTANT_ID = "asst_jXDSjCG8LI4HEaFEcjFVq8KB"  # Replace with your Assistant ID

# Seconds between run-status polls, and the longest we wait for one run.
POLL_INTERVAL_SECONDS = 1
RUN_TIMEOUT_SECONDS = 120
IMAGE_REQUEST_TIMEOUT_SECONDS = 10

# Run statuses after which polling cannot make progress. "requires_action" is
# included because this app never submits tool outputs, so such a run would
# otherwise be polled forever.
_RUN_STOP_STATUSES = frozenset(
    {"completed", "failed", "cancelled", "expired", "incomplete", "requires_action"}
)

# Non-greedy so that two URLs with no whitespace between them (e.g. a markdown
# link whose text is also the URL) are matched separately.
IMAGE_URL_PATTERN = re.compile(
    r'https://raw\.githubusercontent\.com/AndrewLORTech/witspathologai/main/\S+?\.png'
)

# ------------------ Session State ------------------
if "messages" not in st.session_state:
    st.session_state.messages = []
if "thread_id" not in st.session_state:
    st.session_state.thread_id = None
if "image_urls" not in st.session_state:
    st.session_state.image_urls = []
if "pending_prompt" not in st.session_state:
    st.session_state.pending_prompt = None


# ------------------ Helpers ------------------
def _extract_image_urls(text):
    """Return the slide-image URLs found in *text*, de-duplicated in order."""
    return list(dict.fromkeys(IMAGE_URL_PATTERN.findall(text)))


def _encode_image_url(raw_url):
    """Percent-encode the path of a raw.githubusercontent.com URL.

    "%" is kept as a safe character so URLs that are already percent-encoded
    are not encoded a second time.
    """
    _, separator, path = raw_url.partition("githubusercontent.com/")
    if not separator:
        return raw_url
    return f"https://raw.githubusercontent.com/{quote(path, safe='/%')}"


@st.cache_data(show_spinner=False, max_entries=64)
def _fetch_image_bytes(url):
    """Download *url* and return the response body (cached across reruns)."""
    response = requests.get(url, timeout=IMAGE_REQUEST_TIMEOUT_SECONDS)
    response.raise_for_status()
    return response.content


def _message_text(message):
    """Join the text parts of an assistant message, skipping non-text parts."""
    texts = []
    for part in message.content:
        text = getattr(part, "text", None)
        if text is not None:
            texts.append(text.value)
    return "\n\n".join(texts)


def _wait_for_run(thread_id, run_id):
    """Poll a run until it stops and return the final run object.

    Raises:
        TimeoutError: if the run is still active after RUN_TIMEOUT_SECONDS.
    """
    deadline = time.monotonic() + RUN_TIMEOUT_SECONDS
    while True:
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run.status in _RUN_STOP_STATUSES:
            if run.status == "requires_action":
                # Cancel so the thread is not left locked by an active run.
                client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run_id)
            return run
        if time.monotonic() >= deadline:
            client.beta.threads.runs.cancel(thread_id=thread_id, run_id=run_id)
            raise TimeoutError(
                f"Assistant run did not finish within {RUN_TIMEOUT_SECONDS} seconds."
            )
        time.sleep(POLL_INTERVAL_SECONDS)


def _ask_assistant(prompt):
    """Send *prompt* to the assistant thread and return the reply text.

    A thread is created on first use and its id is stored in session state.

    Returns:
        The reply text, or None when the run did not complete or produced no
        assistant message.
    """
    if st.session_state.thread_id is None:
        st.session_state.thread_id = client.beta.threads.create().id
    thread_id = st.session_state.thread_id

    client.beta.threads.messages.create(thread_id=thread_id, role="user", content=prompt)
    run = client.beta.threads.runs.create(thread_id=thread_id, assistant_id=ASSISTANT_ID)

    with st.spinner("🔬 Analyzing..."):
        run = _wait_for_run(thread_id, run.id)

    if run.status != "completed":
        return None

    # Newest first, so the first assistant message seen is the reply to this
    # prompt rather than the oldest reply in the thread.
    thread_messages = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in thread_messages.data:
        if message.role == "assistant" and getattr(message, "run_id", run.id) == run.id:
            return _message_text(message)
    return None


# ------------------ Sidebar ------------------
actions = [
    "Select an action...",
    "List histological features of inflammation",
    "Summarize features of carcinoma",
    "List muscle types and features",
    "Extract diagnostic markers",
    "Summarize embryology stages",
]


def _queue_section_lookup():
    """Queue a section-summary prompt when the section field changes."""
    section = st.session_state.section_lookup
    if section:
        st.session_state.pending_prompt = (
            f"Summarize or list key points from section: {section}"
        )


def _queue_common_query():
    """Queue the chosen canned query and reset the selectbox to its placeholder."""
    choice = st.session_state.common_query
    if choice != actions[0]:
        st.session_state.pending_prompt = choice
        # Resetting lets the same query be picked again later.
        st.session_state.common_query = actions[0]


with st.sidebar:
    st.header("🧪 Pathology Tools")

    if st.button("🧹 Clear Chat"):
        st.session_state.messages = []
        st.session_state.thread_id = None
        st.session_state.image_urls = []
        st.session_state.pending_prompt = None
        st.rerun()

    show_image = st.toggle("📸 Show Slide Images", value=True)

    keyword = st.text_input("Keyword Search", placeholder="e.g. mitosis, carcinoma")
    if st.button("🔎 Search") and keyword:
        st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

    # on_change callbacks fire only when the value changes, so a value left in
    # the widget does not re-queue the same prompt on every rerun.
    st.text_input(
        "Section Lookup",
        placeholder="e.g. Connective Tissue",
        key="section_lookup",
        on_change=_queue_section_lookup,
    )
    st.selectbox(
        "Common Pathology Queries",
        actions,
        key="common_query",
        on_change=_queue_common_query,
    )

# ------------------ Main Chat UI ------------------
chat_col, image_col = st.columns([2, 1])

with chat_col:
    st.markdown("### 💬 Ask a Pathology-Specific Question")
    user_input = st.chat_input("Example: What are features of squamous cell carcinoma?")

    # Typed input goes first, then any prompt queued from the sidebar.
    new_prompts = []
    if user_input:
        new_prompts.append(user_input)
    if st.session_state.pending_prompt:
        new_prompts.append(st.session_state.pending_prompt)
        st.session_state.pending_prompt = None

    # Each prompt is sent exactly once, when it arrives; a failure therefore
    # cannot cause the same prompt to be re-sent on a later rerun.
    for prompt in new_prompts:
        st.session_state.messages.append({"role": "user", "content": prompt})
        try:
            reply = _ask_assistant(prompt)
        except Exception as e:  # UI boundary: report the error instead of crashing
            st.error(f"❌ Error: {e}")
            continue
        if reply is None:
            st.error("❌ Assistant failed to respond.")
            continue
        st.session_state.messages.append({"role": "assistant", "content": reply})
        # 🔍 Extract all GitHub image URLs from the latest reply
        st.session_state.image_urls = _extract_image_urls(reply)

    for msg in st.session_state.messages:
        with st.chat_message(msg["role"]):
            # NOTE(review): unsafe_allow_html renders user- and model-supplied
            # text as raw HTML; confirm this is required before keeping it.
            st.markdown(msg["content"], unsafe_allow_html=True)

# ------------------ Image Preview with Safe Encoding ------------------
with image_col:
    if show_image and st.session_state.image_urls:
        st.markdown("### 🖼️ Slide Previews")
        for raw_url in st.session_state.image_urls:
            try:
                encoded_url = _encode_image_url(raw_url)
                img = Image.open(BytesIO(_fetch_image_bytes(encoded_url)))
                st.image(
                    img,
                    caption=f"📷 {encoded_url.split('/')[-1]}",
                    use_container_width=True,
                )
            except Exception as e:  # best effort: one bad image must not stop the rest
                st.error(f"❌ Failed to load image from {raw_url}: {e}")