# Page-scrape header (not program code): Spaces: Running
import hmac
import json
import os
import re
import time
from io import BytesIO
from urllib.parse import quote

import requests
import streamlit as st
from openai import OpenAI
from PIL import Image

# Must stay the first Streamlit command the script executes.
st.set_page_config(page_title="Schlager Forrestdale DocAIAssist", layout="wide", initial_sidebar_state="collapsed")
# ------------------ Authentication ------------------ | |
# Email -> password table consulted by login().
# NOTE(review): credentials are kept in plaintext in the source file; consider
# loading them from environment variables or a secrets store instead.
# NOTE(review): all four keys below are the same literal (they look like
# obfuscated addresses), so the dict collapses to a single entry at runtime —
# confirm the intended addresses.
VALID_USERS = {
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
    "[email protected]": "Pass.123",
}
def check_credentials(users, email, password):
    """Return True when *password* matches the entry stored for *email*.

    Args:
        users: Mapping of email address to expected password.
        email: Address typed into the login form.
        password: Password typed into the login form.

    Returns:
        True only if *email* is a key of *users* and the stored password is
        equal to *password*; False otherwise (including a missing password).
    """
    expected = users.get(email)
    if expected is None or password is None:
        return False
    # Compare as bytes: compare_digest rejects non-ASCII str, and a
    # constant-time comparison avoids leaking how many characters matched.
    return hmac.compare_digest(expected.encode("utf-8"), password.encode("utf-8"))


def login():
    """Render the login form and mark the session as authenticated on success.

    On a successful login the "authenticated" session flag is set, a
    ``logged_in=1`` query parameter is written, and the script is rerun.
    On failure an error message is shown.
    """
    st.title("π Login Required")
    email = st.text_input("Email")
    password = st.text_input("Password", type="password")
    if st.button("Login"):
        if check_credentials(VALID_USERS, email, password):
            st.session_state["authenticated"] = True
            # st.experimental_set_query_params is deprecated; st.query_params
            # is its supported replacement.
            st.query_params["logged_in"] = "1"
            st.rerun()
        else:
            st.error("β Incorrect email or password.")
# Gate: until the session is flagged as authenticated, show only the login
# form and halt the script.
is_authenticated = st.session_state.get("authenticated", False)
if not is_authenticated:
    login()
    st.stop()

# ------------------ App Configuration ------------------
st.title("π Schlager Forrestdale Document Assistant")
st.caption("Explore City of Armadale construction documents using AI + OCR π§")

# ------------------ Load API Key ------------------
# The OpenAI key is read from the environment; the app cannot run without it.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("β Missing OPENAI_API_KEY environment variable.")
    st.stop()
client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Tabs (Reordered) ------------------
TAB_LABELS = ["π¬ Assistant", "π Contract Queries", "π Visual Queries"]
tab1, tab2, tab3 = st.tabs(TAB_LABELS)
# ------------------ Tab 1: General Chat Assistant ------------------ | |
with tab1:
    # General chat tab: keeps a transcript in session state, mirrors it into
    # an OpenAI thread, and runs the chat assistant on each new user message.
    CHAT_ASSISTANT_ID = "asst_wQ1SwtuT6V51pJurVbO583Bk"
    # Run states after which polling must stop. "expired" and "incomplete"
    # are terminal too; without them the polling loop below never exits.
    CHAT_RUN_FINAL_STATES = ("completed", "failed", "cancelled", "expired", "incomplete")

    if "chatbot_messages" not in st.session_state:
        st.session_state.chatbot_messages = []
    if "chatbot_thread_id" not in st.session_state:
        st.session_state.chatbot_thread_id = None

    with st.sidebar:
        st.header("π¬ Chat Assistant Options")
        if st.button("π§Ή Clear Chat Assistant"):
            st.session_state.chatbot_messages = []
            st.session_state.chatbot_thread_id = None
            st.rerun()
        if st.button("πͺ Logout"):
            st.session_state.clear()
            st.rerun()

    st.markdown("### π€ General Chat Assistant")
    user_prompt = st.chat_input("Ask anything related to the project or documents...")
    if user_prompt:
        st.session_state.chatbot_messages.append({"role": "user", "content": user_prompt})

    # NOTE(review): unsafe_allow_html renders user/assistant text as raw HTML;
    # confirm this is required by the assistant's output format.
    for msg in st.session_state.chatbot_messages:
        with st.chat_message(msg["role"]):
            st.markdown(msg["content"], unsafe_allow_html=True)

    # A trailing user message means a reply is still owed.
    if st.session_state.chatbot_messages and st.session_state.chatbot_messages[-1]["role"] == "user":
        try:
            if st.session_state.chatbot_thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.chatbot_thread_id = thread.id
                # Replay earlier transcript entries into the fresh thread.
                for m in st.session_state.chatbot_messages[:-1]:
                    client.beta.threads.messages.create(
                        thread_id=st.session_state.chatbot_thread_id,
                        role=m["role"],
                        content=m["content"],
                    )
            client.beta.threads.messages.create(
                thread_id=st.session_state.chatbot_thread_id,
                role="user",
                content=st.session_state.chatbot_messages[-1]["content"],
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.chatbot_thread_id,
                assistant_id=CHAT_ASSISTANT_ID,
            )
            with st.spinner("π€ Assistant replying..."):
                while True:
                    status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.chatbot_thread_id,
                        run_id=run.id,
                    )
                    if status.status in CHAT_RUN_FINAL_STATES:
                        break
                    time.sleep(1)

            latest_reply = ""
            if status.status == "completed":
                thread_messages = client.beta.threads.messages.list(
                    thread_id=st.session_state.chatbot_thread_id
                )
                # Only accept messages produced by THIS run. This replaces the
                # old text-equality dedupe, which dropped a legitimate answer
                # whenever it matched an earlier one and left the user message
                # unanswered (and therefore re-sent).
                run_replies = [
                    m for m in thread_messages.data
                    if m.role == "assistant" and m.run_id == run.id
                ]
                if run_replies:
                    # messages.list returns newest first by default.
                    latest_reply = "\n".join(
                        item.text.value.strip()
                        for item in run_replies[0].content
                        if item.type == "text"
                    ).strip()

            if latest_reply:
                st.session_state.chatbot_messages.append({
                    "role": "assistant",
                    "content": latest_reply,
                })
                st.rerun()
            else:
                # No rerun here, so the error stays visible. The user message
                # remains last and is retried on the next interaction.
                st.error("β Assistant failed to respond.")
        except Exception as e:
            st.error(f"β Chat Assistant Error: {e}")
# ------------------ Tab 2: Contract Queries ------------------ | |
with tab2:
    # Contract queries tab: sidebar shortcuts queue a prompt, the contract
    # assistant answers it, and the page image cited in the answer is shown
    # alongside the chat.
    ASSISTANT_ID = "asst_Qcl6k4bYxK2UiOfBfqsC6LZv"
    SECTION_PLACEHOLDER = "Select a section..."
    ACTION_PLACEHOLDER = "Select an action..."
    # "expired" and "incomplete" are terminal run states as well; without
    # them the polling loop below never exits.
    CONTRACT_RUN_FINAL_STATES = ("completed", "failed", "cancelled", "expired", "incomplete")

    if "messages" not in st.session_state:
        st.session_state.messages = []
    if "thread_id" not in st.session_state:
        st.session_state.thread_id = None
    if "image_url" not in st.session_state:
        st.session_state.image_url = None
    if "image_updated" not in st.session_state:
        st.session_state.image_updated = False
    if "pending_prompt" not in st.session_state:
        st.session_state.pending_prompt = None

    def _queue_section_prompt():
        """Queue a summary prompt for the picked section, then reset the picker."""
        chosen = st.session_state.contract_section_choice
        if chosen != SECTION_PLACEHOLDER:
            st.session_state.pending_prompt = f"Summarize or list key points from section: {chosen}"
            st.session_state.contract_section_choice = SECTION_PLACEHOLDER

    def _queue_action_prompt():
        """Queue the picked common query as a prompt, then reset the picker."""
        chosen = st.session_state.contract_action_choice
        if chosen != ACTION_PLACEHOLDER:
            st.session_state.pending_prompt = chosen
            st.session_state.contract_action_choice = ACTION_PLACEHOLDER

    with st.sidebar:
        st.header("βΉοΈ Contract Tools")
        if st.button("π§Ή Clear Chat"):
            st.session_state.messages = []
            st.session_state.thread_id = None
            st.session_state.image_url = None
            st.session_state.image_updated = False
            st.session_state.pending_prompt = None
            st.rerun()
        show_image = st.toggle("π Show Page Image", value=True)
        keyword = st.text_input("Search by Keyword", placeholder="e.g. defects, WHS, delay")
        if st.button("π Search Keyword") and keyword:
            st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}"
        # A selectbox keeps its value across reruns, so the previous
        # "if selected: set prompt; st.rerun()" re-fired on every run and
        # looped forever. The on_change callbacks fire once per selection and
        # reset the picker to its placeholder.
        st.selectbox(
            "π Jump to Section",
            [
                SECTION_PLACEHOLDER,
                "1. Formal Instrument of Contract",
                "2. Offer and Acceptance",
                "3. Key Personnel",
                "4. Contract Pricing",
                "5. Specifications",
                "6. WHS Policies",
                "7. Penalties and Delays",
                "8. Dispute Resolution",
                "9. Principal Obligations",
            ],
            key="contract_section_choice",
            on_change=_queue_section_prompt,
        )
        st.selectbox(
            "βοΈ Common Queries",
            [
                ACTION_PLACEHOLDER,
                "List all contractual obligations",
                "Summarize payment terms",
                "List WHS responsibilities",
                "Find delay-related penalties",
                "Extract dispute resolution steps",
            ],
            key="contract_action_choice",
            on_change=_queue_action_prompt,
        )

    chat_col, image_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### π§ Ask a Document-Specific Question")
        user_input = st.chat_input("Example: What is the defects liability period?")
        if user_input:
            st.session_state.messages.append({"role": "user", "content": user_input})
        elif st.session_state.pending_prompt:
            st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt})
            st.session_state.pending_prompt = None

        # A trailing user message means a reply is still owed.
        if st.session_state.messages and st.session_state.messages[-1]["role"] == "user":
            try:
                if st.session_state.thread_id is None:
                    thread = client.beta.threads.create()
                    st.session_state.thread_id = thread.id
                client.beta.threads.messages.create(
                    thread_id=st.session_state.thread_id,
                    role="user",
                    content=st.session_state.messages[-1]["content"],
                )
                run = client.beta.threads.runs.create(
                    thread_id=st.session_state.thread_id,
                    assistant_id=ASSISTANT_ID,
                )
                with st.spinner("π€ Thinking..."):
                    while True:
                        status = client.beta.threads.runs.retrieve(
                            thread_id=st.session_state.thread_id,
                            run_id=run.id,
                        )
                        if status.status in CONTRACT_RUN_FINAL_STATES:
                            break
                        time.sleep(1)

                reply = ""
                if status.status == "completed":
                    thread_messages = client.beta.threads.messages.list(
                        thread_id=st.session_state.thread_id
                    )
                    # messages.list is newest-first; the old reversed() walk
                    # started from the oldest reply. Filtering on run_id also
                    # replaces the substring dedupe, which could drop a valid
                    # answer.
                    run_replies = [
                        m for m in thread_messages.data
                        if m.role == "assistant" and m.run_id == run.id
                    ]
                    if run_replies:
                        reply = "\n".join(
                            part.text.value.strip()
                            for part in run_replies[0].content
                            if part.type == "text"
                        ).strip()

                if reply:
                    st.session_state.messages.append({"role": "assistant", "content": reply})
                    # The reply cites its source as "<doc>.txt ... Page <n>";
                    # map that citation to the matching page image.
                    match = re.search(r'π\s*(.*?)\.txt\s*π’.*?Page\s*(\d+)', reply)
                    if match:
                        doc, page = match.group(1).strip(), int(match.group(2))
                        folder = quote(doc)
                        img_url = f"https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main/{folder}/{folder}_page_{page:04d}.png"
                        st.session_state.image_url = img_url
                        st.session_state.image_updated = True
                    # Rerun only on success: rerunning after a failure hid the
                    # error and immediately re-submitted the same prompt.
                    st.rerun()
                else:
                    st.error("β Assistant failed to respond.")
            except Exception as e:
                st.error(f"β Assistant Error: {e}")

        # Transcript is shown newest-first.
        # NOTE(review): unsafe_allow_html renders message text as raw HTML;
        # confirm this is required by the assistant's output format.
        for msg in reversed(st.session_state.messages):
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"], unsafe_allow_html=True)

        if st.session_state.messages and st.session_state.messages[-1]["role"] == "assistant":
            latest_reply = st.session_state.messages[-1]["content"]
            # Appending "###" lets the pattern match when the questions block
            # is the last section of the reply.
            match = re.search(r"###\s*\*\*Some Possible Questions:\*\*(.*?)###", latest_reply + "###", re.DOTALL)
            questions = []
            if match:
                block = match.group(1)
                questions = [
                    line.strip(" -β’")
                    for line in block.splitlines()
                    if line.strip().startswith(("-", "β’"))
                ]
            if questions:
                st.markdown("#### π‘ Follow-Up Suggestions")
                for idx, q in enumerate(questions):
                    # Explicit keys keep identical suggestions from colliding.
                    if st.button(f"π {q}", key=f"followup_{idx}"):
                        st.session_state.pending_prompt = q
                        st.rerun()

    with image_col:
        if show_image and st.session_state.image_url:
            try:
                # Timeout so a stalled download cannot hang the script run.
                r = requests.get(st.session_state.image_url, timeout=10)
                r.raise_for_status()
                img = Image.open(BytesIO(r.content))
                st.image(img, caption="π OCR Page Image", use_container_width=True)
            except Exception as e:
                st.error(f"πΌοΈ Failed to load image: {e}")
# ------------------ Technical Tab ------------------ | |
with tab3:
    # Visual queries tab: the drawings assistant answers with a fenced JSON
    # block describing drawings, which is rendered as a filterable, paged
    # thumbnail grid.
    ASSISTANT_ID = "asst_DjvuWBc7tCvMbAhY7n1em4BZ"
    # "expired" and "incomplete" are terminal run states as well; without
    # them the polling loop below never exits.
    TECH_RUN_FINAL_STATES = ("completed", "failed", "cancelled", "expired", "incomplete")

    if "tech_messages" not in st.session_state:
        st.session_state.tech_messages = []
    if "tech_thread_id" not in st.session_state:
        st.session_state.tech_thread_id = None
    if "tech_results" not in st.session_state:
        st.session_state.tech_results = []
    if "tech_lightbox" not in st.session_state:
        st.session_state.tech_lightbox = None

    def _discipline_of(record):
        """Return the record's discipline as a string ('' when absent or null)."""
        return str(record.get("discipline") or "")

    tech_input = st.chat_input("Ask about plans, drawings or components")
    if tech_input:
        st.session_state.tech_messages.append({"role": "user", "content": tech_input})

    # A trailing user message means a reply is still owed.
    if st.session_state.tech_messages and st.session_state.tech_messages[-1]["role"] == "user":
        try:
            if st.session_state.tech_thread_id is None:
                thread = client.beta.threads.create()
                st.session_state.tech_thread_id = thread.id
            client.beta.threads.messages.create(
                thread_id=st.session_state.tech_thread_id,
                role="user",
                content=st.session_state.tech_messages[-1]["content"],
            )
            run = client.beta.threads.runs.create(
                thread_id=st.session_state.tech_thread_id,
                assistant_id=ASSISTANT_ID,
            )
            with st.spinner("π Searching technical drawings..."):
                while True:
                    run_status = client.beta.threads.runs.retrieve(
                        thread_id=st.session_state.tech_thread_id,
                        run_id=run.id,
                    )
                    if run_status.status in TECH_RUN_FINAL_STATES:
                        break
                    time.sleep(1)

            raw = ""
            if run_status.status == "completed":
                thread_messages = client.beta.threads.messages.list(
                    thread_id=st.session_state.tech_thread_id
                )
                # messages.list is newest-first; the old reversed() walk
                # always picked the oldest assistant reply, so later queries
                # re-showed the first answer.
                run_replies = [
                    m for m in thread_messages.data
                    if m.role == "assistant" and m.run_id == run.id
                ]
                if run_replies:
                    raw = "\n".join(
                        part.text.value.strip()
                        for part in run_replies[0].content
                        if part.type == "text"
                    ).strip()

            if raw:
                # Record the reply even when it carries no usable JSON;
                # otherwise the user message stays last and the query is
                # re-sent on every rerun (e.g. each filter or page change).
                st.session_state.tech_messages.append({"role": "assistant", "content": raw})
                fenced = re.search(r"```json(.*?)```", raw, re.DOTALL)
                if fenced:
                    try:
                        parsed = json.loads(fenced.group(1).strip())
                    except json.JSONDecodeError as e:
                        st.error(f"β Failed to parse assistant response: {e}")
                    else:
                        # Rendering expects a list of dicts; coerce a single
                        # object and drop anything that is not a dict.
                        if isinstance(parsed, dict):
                            parsed = [parsed]
                        if not isinstance(parsed, list):
                            parsed = []
                        st.session_state.tech_results = [r for r in parsed if isinstance(r, dict)]
            else:
                st.error("β Technical Assistant Error: run ended without a reply")
        except Exception as e:
            st.error(f"β Technical Assistant Error: {e}")

    with st.expander("π§ Options (Filter + Pagination)", expanded=False):
        disciplines = sorted({_discipline_of(d) for d in st.session_state.tech_results})
        selected_disciplines = st.multiselect("π Filter by discipline", disciplines, default=disciplines)
        page_size = 8
        # Use the same normalisation as the option list, so records without a
        # discipline are not silently filtered out.
        results = [r for r in st.session_state.tech_results if _discipline_of(r) in selected_disciplines]
        # Ceiling division; at least one page so the input stays valid.
        total_pages = max(1, -(-len(results) // page_size))
        page = st.number_input("Page", min_value=1, max_value=total_pages, step=1, value=1)

    if st.session_state.tech_results:
        st.subheader("π Results")
        paged = results[(page - 1) * page_size : page * page_size]
        cols = st.columns(4)
        for i, item in enumerate(paged):
            with cols[i % 4]:
                st.markdown(f"**π {item.get('drawing_number', 'Unknown')} ({_discipline_of(item)})**")
                st.caption(item.get("summary", ""))
                image_urls = item.get("images") or []
                if isinstance(image_urls, str):
                    image_urls = [image_urls]
                if not image_urls:
                    st.warning("β οΈ No image available.")
                else:
                    for j, url in enumerate(image_urls):
                        try:
                            st.image(url, caption=f"Page {j+1}", use_container_width=True)
                        except Exception as e:
                            st.error(f"β Could not load image: {e}")
                    if st.button("πΌοΈ View All Pages", key=f"thumb_{i}"):
                        st.session_state.tech_lightbox = image_urls
        if isinstance(st.session_state.tech_lightbox, list):
            st.subheader("π Enlarged Drawing Preview")
            for url in st.session_state.tech_lightbox:
                st.image(url, use_container_width=True)
            if st.button("β Close Preview"):
                st.session_state.tech_lightbox = None
                st.rerun()
    else:
        st.info("π No matching drawings found. Try a different prompt.")

    # π‘ Static Prompt Suggestions for All Disciplines
    static_prompts = [
        "Find architectural site plans",
        "List structural foundation drawings",
        "Show civil earthworks plans",
        "Locate mechanical ventilation layouts",
        "Display hydraulic waste and drainage drawings",
        "Show electrical lighting and power layouts",
        "Find all tree protection zone drawings",
        "Where are the electrical switchboards?",
        "What is the HVAC equipment layout?",
        "Show roof framing details",
    ]
    with st.expander("π‘ Try a Prompt", expanded=False):
        cols = st.columns(3)
        for i, prompt in enumerate(static_prompts):
            with cols[i % 3]:
                if st.button(f"π€ {prompt}", key=f"prompt_static_{i}"):
                    # Queue the canned prompt exactly like typed input.
                    st.session_state.tech_messages.append({"role": "user", "content": prompt})
                    st.rerun()