# witspathologyv2 / app.py
# IAMTFRMZA's picture
# Update app.py
# 9ebcebb verified
# raw
# history blame
# 11.7 kB
import streamlit as st
import os
import time
import re
import requests
import json
from PIL import Image
from io import BytesIO
from urllib.parse import quote
from openai import OpenAI
# ------------------ Authentication ------------------
# Email -> password table consulted by login().
# NOTE(review): credentials are stored in plaintext in the source file, and the
# four keys below are identical as written, so the resulting dict holds a single
# entry -- confirm the intended addresses and consider moving these to secrets.
VALID_USERS = dict.fromkeys(
    (
        "[email protected]",
        "[email protected]",
        "[email protected]",
        "[email protected]",
    ),
    "Pass.123",
)
def login():
    """Render the login form and flag the session as authenticated on success.

    Draws a title, an email field, a masked password field and a "Login"
    button. When the button is pressed the submitted pair is checked against
    ``VALID_USERS``: a match sets ``st.session_state.authenticated`` to
    ``True`` and reruns the script; anything else shows an error message.
    Nothing is returned; the outcome is communicated through session state.
    """
    # Function-scope import so the block does not depend on the file header.
    import hmac

    st.title("πŸ” Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")

    if not st.button("Login"):
        return

    expected = VALID_USERS.get(email)
    # compare_digest takes time independent of where the inputs differ, unlike
    # ``==``.  Bytes are used because the str form rejects non-ASCII input.
    # The comparison is performed even for unknown emails so both paths do
    # comparable work.
    reference = (expected or "").encode("utf-8")
    supplied = (password or "").encode("utf-8")
    passwords_match = hmac.compare_digest(reference, supplied)

    if expected is not None and passwords_match:
        st.session_state.authenticated = True
        st.rerun()
    else:
        st.error("❌ Incorrect email or password.")
# Authentication gate: the first run of a session starts unauthenticated, and
# until login() flips the flag nothing below this point is executed.
if "authenticated" not in st.session_state:
    st.session_state["authenticated"] = False

if not st.session_state["authenticated"]:
    login()
    st.stop()
# ------------------ App Configuration ------------------
# Page chrome: wide layout, sidebar collapsed on first load.
st.set_page_config(
    page_title="Schlager Forrestdale DocAIAssist",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("πŸ“„ Schlager Forrestdale Document Assistant")
st.caption("Explore City of Armadale construction documents using AI + OCR 🧐")

# ------------------ Load API Key ------------------
# The key comes from the environment only; without it the script halts here.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
api_key_missing = not OPENAI_API_KEY
if api_key_missing:
    st.error("❌ Missing OPENAI_API_KEY environment variable.")
    st.stop()

# Single OpenAI client shared by both tabs below.
client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Tabs ------------------
tab_labels = ["πŸ“‘ Contract", "πŸ“ Technical"]
tab1, tab2 = st.tabs(tab_labels)
# ------------------ Contract Tab ------------------
def _await_contract_run(thread_id, run_id, timeout=120.0, poll_interval=1.0):
    """Poll a run until it stops progressing; return its final status string.

    Returns the run status once it is one of the states in which polling can
    no longer make progress, or ``"timed_out"`` if *timeout* seconds pass
    first, so the caller is never blocked forever.
    """
    settled = {
        "completed",
        "failed",
        "cancelled",
        "expired",
        "incomplete",
        "requires_action",
    }
    deadline = time.monotonic() + timeout
    while True:
        run_state = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run_state.status in settled:
            return run_state.status
        if time.monotonic() >= deadline:
            return "timed_out"
        time.sleep(poll_interval)


def _contract_reply_text(thread_id, run_id):
    """Return the first text part of the assistant message created by *run_id*.

    Messages are requested newest-first and matched on ``run_id`` so that, on
    a thread reused across turns, the answer to the current question is picked
    rather than an earlier one.  Returns ``None`` when no such message exists.
    """
    listing = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in listing.data:
        if message.role != "assistant" or message.run_id != run_id:
            continue
        for part in message.content:
            if getattr(part, "type", None) == "text":
                return part.text.value
    return None


with tab1:
    ASSISTANT_ID = "asst_KsQRedoJUnEeStzfox1o06lO"

    # Session defaults for the contract chat.
    for state_key, default in (
        ("messages", []),
        ("thread_id", None),
        ("image_url", None),
        ("image_updated", False),
        ("pending_prompt", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = default

    # Sidebar tools
    with st.sidebar:
        st.header("ℹ️ Contract Tools")
        if st.button("🧹 Clear Chat"):
            st.session_state.messages = []
            st.session_state.thread_id = None
            st.session_state.image_url = None
            st.session_state.image_updated = False
            st.session_state.pending_prompt = None
            st.rerun()

        show_image = st.toggle("πŸ“‘ Show Page Image", value=True)

        keyword = st.text_input("Search by Keyword", placeholder="e.g. defects, WHS, delay")
        if st.button("πŸ”Ž Search Keyword") and keyword:
            st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"

        section_options = [
            "Select a section...",
            "1. Formal Instrument of Contract",
            "2. Offer and Acceptance",
            "3. Key Personnel",
            "4. Contract Pricing",
            "5. Specifications",
            "6. WHS Policies",
            "7. Penalties and Delays",
            "8. Dispute Resolution",
            "9. Principal Obligations",
        ]
        section = st.selectbox("πŸ“„ Jump to Section", section_options)
        # A selectbox keeps its value across reruns, so only queue a prompt
        # when the selection actually changed; otherwise every rerun (including
        # the one triggered after a reply) would resubmit the same prompt.
        if section != st.session_state.get("contract_last_section", section_options[0]):
            st.session_state.contract_last_section = section
            if section != section_options[0]:
                st.session_state.pending_prompt = (
                    f"Summarize or list key points from section: {section}"
                )

        actions = [
            "Select an action...",
            "List all contractual obligations",
            "Summarize payment terms",
            "List WHS responsibilities",
            "Find delay-related penalties",
            "Extract dispute resolution steps",
        ]
        action = st.selectbox("βš™οΈ Common Queries", actions)
        if action != st.session_state.get("contract_last_action", actions[0]):
            st.session_state.contract_last_action = action
            if action != actions[0]:
                st.session_state.pending_prompt = action

    # Chat + Image layout
    chat_col, image_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### 🧠 Ask a Document-Specific Question")
        user_input = st.chat_input("Example: What is the defects liability period?")

        # Typed input wins over a queued sidebar prompt; a queued prompt is
        # consumed exactly once.
        new_prompt = None
        if user_input:
            new_prompt = user_input
        elif st.session_state.pending_prompt:
            new_prompt = st.session_state.pending_prompt
            st.session_state.pending_prompt = None

        got_reply = False
        # Only a prompt submitted in this run is sent.  Keying on "last message
        # is from the user" would resend an unanswered message on every rerun
        # after a failure.
        if new_prompt:
            st.session_state.messages.append({"role": "user", "content": new_prompt})
            try:
                if st.session_state.thread_id is None:
                    thread = client.beta.threads.create()
                    st.session_state.thread_id = thread.id

                client.beta.threads.messages.create(
                    thread_id=st.session_state.thread_id,
                    role="user",
                    content=new_prompt,
                )
                run = client.beta.threads.runs.create(
                    thread_id=st.session_state.thread_id,
                    assistant_id=ASSISTANT_ID,
                )

                with st.spinner("πŸ€– Thinking..."):
                    outcome = _await_contract_run(st.session_state.thread_id, run.id)

                reply = None
                if outcome == "completed":
                    reply = _contract_reply_text(st.session_state.thread_id, run.id)

                if reply is None:
                    st.error("❌ Assistant failed.")
                else:
                    st.session_state.messages.append({"role": "assistant", "content": reply})
                    # A "Document Reference: <doc>, Page <n>" marker in the
                    # reply selects the page image shown in the right column.
                    match = re.search(r'Document Reference:\s*(.*?),\s*Page\s*(\d+)', reply)
                    if match:
                        doc, page = match.group(1).strip(), int(match.group(2))
                        folder = quote(doc)
                        img_url = f"https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main/{folder}/{folder}_page_{page:04d}.png"
                        st.session_state.image_url = img_url
                        st.session_state.image_updated = True
                    got_reply = True
            except Exception as e:
                st.error(f"❌ Error: {e}")

        # Rerun only after a successful reply so error messages stay visible.
        if got_reply:
            st.rerun()

        # NOTE(review): unsafe_allow_html renders user and model text as raw
        # HTML -- confirm this is intended.
        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)

    with image_col:
        if show_image and st.session_state.image_url:
            try:
                # Timeout so an unresponsive host cannot hang the script run.
                r = requests.get(st.session_state.image_url, timeout=10)
                r.raise_for_status()
                img = Image.open(BytesIO(r.content))
                st.image(img, caption="πŸ“„ OCR Page Image", use_container_width=True)
            except Exception as e:
                st.error(f"πŸ–ΌοΈ Image failed: {e}")
# ------------------ Technical Tab ------------------
def _await_tech_run(thread_id, run_id, timeout=120.0, poll_interval=1.0):
    """Poll a run until it stops progressing; return its final status string.

    Returns the run status once polling can no longer make progress, or
    ``"timed_out"`` if *timeout* seconds pass first.
    """
    settled = {
        "completed",
        "failed",
        "cancelled",
        "expired",
        "incomplete",
        "requires_action",
    }
    deadline = time.monotonic() + timeout
    while True:
        run_state = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run_state.status in settled:
            return run_state.status
        if time.monotonic() >= deadline:
            return "timed_out"
        time.sleep(poll_interval)


def _parse_drawing_payload(raw):
    """Decode the assistant's reply into a list of drawing dicts.

    Accepts bare JSON, JSON wrapped in a Markdown code fence (with or without
    a ``json`` tag), or JSON preceded by the word ``json``.

    Raises:
        ValueError: if the text is not valid JSON or is not a JSON array.
    """
    text = raw.strip()
    fenced = re.search(r"```(?:json)?\s*(.*?)```", text, re.DOTALL | re.IGNORECASE)
    if fenced:
        text = fenced.group(1).strip()
    elif text[:4].lower() == "json":
        # Drop a literal "json" prefix only; str.strip("json ") would remove
        # any of those characters from both ends.
        text = text[4:].strip()

    payload = json.loads(text)
    if not isinstance(payload, list):
        raise ValueError("expected a JSON array of drawings")
    # Non-dict entries cannot be rendered as drawing cards; skip them.
    return [entry for entry in payload if isinstance(entry, dict)]


with tab2:
    ASSISTANT_ID = "asst_DjvuWBc7tCvMbAhY7n1em4BZ"

    # Session defaults for the technical search.
    for state_key, default in (
        ("tech_messages", []),
        ("tech_results", []),
        ("tech_lightbox", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = default

    tech_input = st.chat_input("Ask about plans, drawings or components")
    if tech_input:
        # Each query starts from a clean slate: previous results and any open
        # preview belong to the previous question.
        st.session_state.tech_messages.append({"role": "user", "content": tech_input})
        st.session_state.tech_results = []
        st.session_state.tech_lightbox = None
        try:
            # A new thread per query, so earlier questions do not influence it.
            thread = client.beta.threads.create()
            tech_thread_id = thread.id

            client.beta.threads.messages.create(
                thread_id=tech_thread_id,
                role="user",
                content=tech_input,
            )
            run = client.beta.threads.runs.create(
                thread_id=tech_thread_id,
                assistant_id=ASSISTANT_ID,
            )

            with st.spinner("πŸ” Searching technical drawings..."):
                outcome = _await_tech_run(tech_thread_id, run.id)

            if outcome != "completed":
                # Surfaced through the handler below instead of failing silently.
                raise RuntimeError(f"run ended with status '{outcome}'")

            messages = client.beta.threads.messages.list(thread_id=tech_thread_id)
            for msg in reversed(messages.data):
                if msg.role != "assistant":
                    continue
                content = msg.content[0].text.value
                st.session_state.tech_messages.append({"role": "assistant", "content": content})
                try:
                    st.session_state.tech_results = _parse_drawing_payload(content)
                except (ValueError, TypeError) as e:
                    st.warning(f"⚠️ Failed to parse drawing data: {e}")
                break
        except Exception as e:
            st.error(f"❌ Technical Assistant Error: {e}")

    # Expander for filtering and pagination
    with st.expander("πŸ”§ Options (Filter + Pagination)", expanded=False):
        # Missing/None disciplines are normalised to "" so sorting cannot mix
        # None with str and the filter below can match them.
        disciplines = sorted(
            {(d.get("discipline") or "") for d in st.session_state.tech_results}
        )
        selected = st.selectbox("🌍 Filter by discipline", ["All"] + disciplines)
        page_size = 8
        page = st.number_input("Page", min_value=1, step=1, value=1)

    # Display results
    if st.session_state.tech_results:
        st.subheader("πŸ“‚ Results")
        results = [
            r
            for r in st.session_state.tech_results
            if selected == "All" or (r.get("discipline") or "") == selected
        ]
        paged = results[(page - 1) * page_size : page * page_size]

        cols = st.columns(4)
        for i, item in enumerate(paged):
            # .get with fallbacks: one incomplete record must not abort the grid.
            drawing_number = item.get("drawing_number", "N/A")
            discipline = item.get("discipline") or ""
            with cols[i % 4]:
                st.markdown(f"**πŸ“ {drawing_number} ({discipline})**")
                st.caption(item.get("summary", ""))
                # Only the first image of each drawing is shown as a thumbnail.
                for url in item.get("images", [])[:1]:
                    if st.button("πŸ–ΌοΈ View Drawing Details", key=f"thumb_{i}"):
                        st.session_state.tech_lightbox = url
                    st.image(url, caption=f"{drawing_number} - Page 1", use_container_width=True)

        if st.session_state.tech_lightbox:
            st.image(
                st.session_state.tech_lightbox,
                caption="πŸ” Enlarged Drawing Preview",
                use_container_width=True,
            )
            if st.button("❌ Close Preview"):
                st.session_state.tech_lightbox = None
                st.rerun()
    else:
        # Only show chat messages if no results
        # NOTE(review): unsafe_allow_html renders user and model text as raw
        # HTML -- confirm this is intended.
        for msg in st.session_state.tech_messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)