# Source: Hugging Face Space (status: Running)
import json
import os
import re
import time
from io import BytesIO
from urllib.parse import quote

import requests
import streamlit as st
from openai import OpenAI
from PIL import Image
# ------------------ App Configuration ------------------
# Page setup is the first Streamlit call in this script.
# NOTE(review): the symbol prefixes in the UI strings look like mis-encoded
# emoji; they are kept verbatim here - confirm against the deployed app.
st.set_page_config(
    page_title="Schlaeger Forrestdale DocAIA",
    layout="wide",
    initial_sidebar_state="collapsed",
)
st.title("π Schlaeger Forrestdale Document Assistant")
st.caption("Explore City of Armadale construction documents using AI + OCR π§ ")

# ------------------ Load API Key ------------------
# The key is read from the process environment; without it the script halts
# right after reporting the problem.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
if not OPENAI_API_KEY:
    st.error("β Missing OPENAI_API_KEY in Hugging Face Space secrets.")
    st.stop()

client = OpenAI(api_key=OPENAI_API_KEY)

# ------------------ Tabs ------------------
# tabs[0] hosts the contract assistant, tabs[1] the technical assistant.
tabs = st.tabs(["π Contract", "π Technical"])
# ================== CONTRACT TAB ==================
# NOTE(review): the symbol prefixes in the UI strings look like mis-encoded
# emoji; they are kept verbatim here - confirm against the deployed app.

# Run states in which the assistant is still working. Any other state
# (completed, failed, cancelled, expired, requires_action, incomplete) ends
# the polling loop, so an expired or stalled run cannot spin forever.
_CONTRACT_PENDING_STATES = ("queued", "in_progress", "cancelling")

# Matches the "Document Reference: <name>, Page <n>" citation in an answer.
_CONTRACT_DOC_REF = re.compile(r'Document Reference:\s*(.*?),\s*Page\s*(\d+)')

_CONTRACT_IMAGE_BASE = (
    "https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main"
)


def _contract_page_image_url(answer):
    """Return the page-image URL for the document cited in *answer*.

    Returns None when the answer carries no "Document Reference" citation.
    """
    match = _CONTRACT_DOC_REF.search(answer)
    if match is None:
        return None
    folder = quote(match.group(1).strip())
    page = int(match.group(2))
    return f"{_CONTRACT_IMAGE_BASE}/{folder}/{folder}_page_{page:04d}.png"


def _ask_contract_assistant(question, assistant_id):
    """Post *question* to the contract thread and wait for the reply.

    Creates the thread on first use and stores its id in
    ``st.session_state.contract_thread_id``.

    Returns the text of the newest assistant message, or None when the run
    ends in any state other than "completed" or produced no assistant
    message. Errors raised by the OpenAI client propagate to the caller.
    """
    if st.session_state.contract_thread_id is None:
        st.session_state.contract_thread_id = client.beta.threads.create().id
    thread_id = st.session_state.contract_thread_id

    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=question,
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )
    while run.status in _CONTRACT_PENDING_STATES:
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
    if run.status != "completed":
        return None

    # Newest first: the first assistant message is the reply to this
    # question. (Walking the list oldest-first would keep returning the very
    # first answer of the thread.)
    replies = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in replies.data:
        if message.role == "assistant":
            # A message may mix text with non-text parts; keep only text.
            return "".join(
                part.text.value for part in message.content if part.type == "text"
            )
    return None


with tabs[0]:
    ASSISTANT_ID = "asst_KsQRedoJUnEeStzfox1o06lO"

    # Per-session state used by this tab.
    for state_key, initial in (
        ("contract_messages", []),
        ("contract_thread_id", None),
        ("image_url", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = initial

    st.sidebar.header("π Contract Tools")
    if st.sidebar.button("π§Ή Clear Chat", key="clear_contract"):
        st.session_state.contract_messages = []
        st.session_state.contract_thread_id = None
        st.session_state.image_url = None
        st.rerun()

    show_image = st.sidebar.toggle("π Show Page Image", value=True, key="show_image_toggle")

    keyword = st.sidebar.text_input("Search by Keyword", key="kw")
    if st.sidebar.button("π Search", key="kw_btn") and keyword:
        st.session_state.contract_messages.append(
            {"role": "user", "content": f"Find clauses or references related to: {keyword}"}
        )

    section = st.sidebar.selectbox("π Jump to Section", [
        "Select a section...",
        "1. Formal Instrument of Contract",
        "2. Offer and Acceptance",
        "3. Key Personnel",
        "4. Contract Pricing",
        "5. Specifications",
        "6. WHS Policies",
        "7. Penalties and Delays",
        "8. Dispute Resolution",
        "9. Principal Obligations",
    ])
    # A selectbox keeps its value across reruns, so only a *change* of
    # selection may enqueue a question. Otherwise every rerun (including the
    # one issued after each answer) would ask the same question again.
    if st.session_state.get("contract_section_seen") != section:
        st.session_state.contract_section_seen = section
        if section != "Select a section...":
            st.session_state.contract_messages.append(
                {"role": "user", "content": f"Summarize or list key points from section: {section}"}
            )

    action = st.sidebar.selectbox("βοΈ Common Queries", [
        "Select an action...",
        "List all contractual obligations",
        "Summarize payment terms",
        "List WHS responsibilities",
        "Find delay-related penalties",
        "Extract dispute resolution steps",
    ])
    # Same change-detection as for the section selector above.
    if st.session_state.get("contract_action_seen") != action:
        st.session_state.contract_action_seen = action
        if action != "Select an action...":
            st.session_state.contract_messages.append({"role": "user", "content": action})

    chat_col, img_col = st.columns([2, 1])

    with chat_col:
        st.markdown("### π§ Ask Contract Document Question")
        user_input = st.chat_input("Ask something about the contract")
        if user_input:
            st.session_state.contract_messages.append({"role": "user", "content": user_input})

        history = st.session_state.contract_messages
        # A trailing user message means a question is still unanswered.
        if history and history[-1]["role"] == "user":
            answer = None
            failure = None
            try:
                with st.spinner("π€ Analyzing contract..."):
                    answer = _ask_contract_assistant(history[-1]["content"], ASSISTANT_ID)
            except Exception as e:
                # UI boundary: report any client/network error in the chat
                # instead of crashing the page.
                failure = f"β Error: {e}"
            else:
                if answer is None:
                    failure = "β Assistant failed."

            if failure is None:
                history.append({"role": "assistant", "content": answer})
                page_url = _contract_page_image_url(answer)
                if page_url is not None:
                    st.session_state.image_url = page_url
                st.rerun()
            else:
                # Recording the failure as the assistant turn keeps it visible
                # and stops the same question from being retried on every rerun.
                history.append({"role": "assistant", "content": failure})

        for msg in st.session_state.contract_messages:
            with st.chat_message(msg["role"]):
                # NOTE(review): unsafe_allow_html renders raw HTML from both
                # the user and the assistant; confirm this is intended.
                st.markdown(msg["content"], unsafe_allow_html=True)

    with img_col:
        if show_image and st.session_state.image_url:
            with st.spinner("Loading image..."):
                try:
                    # Bounded wait so a stalled download cannot hang the page;
                    # HTTP errors are raised instead of being fed to PIL.
                    res = requests.get(st.session_state.image_url, timeout=10)
                    res.raise_for_status()
                    img = Image.open(BytesIO(res.content))
                except (requests.RequestException, OSError):
                    # PIL's "cannot identify image" error is an OSError.
                    st.error("β οΈ Failed to load image.")
                else:
                    st.image(img, caption="π OCR Page Image", use_container_width=True)
# ================== TECHNICAL TAB ==================
# NOTE(review): the symbol prefixes in the UI strings look like mis-encoded
# emoji; they are kept verbatim here - confirm against the deployed app.

# Run states in which the assistant is still working. Any other state
# (completed, failed, cancelled, expired, requires_action, incomplete) ends
# the polling loop, so an expired or stalled run cannot spin forever.
_TECH_PENDING_STATES = ("queued", "in_progress", "cancelling")

# Leading/trailing Markdown code fence (optionally tagged "json").
_TECH_CODE_FENCE = re.compile(r"^```(?:json)?\s*|\s*```$", re.IGNORECASE)


def _parse_drawing_results(answer):
    """Parse the assistant's JSON answer into a list of result dicts.

    A surrounding Markdown code fence is removed first. Returns an empty
    list when the text is not valid JSON or is not a JSON array; array
    entries that are not objects are dropped.
    """
    text = _TECH_CODE_FENCE.sub("", answer.strip())
    try:
        data = json.loads(text)
    except ValueError:  # json.JSONDecodeError is a ValueError
        return []
    if not isinstance(data, list):
        return []
    return [entry for entry in data if isinstance(entry, dict)]


def _ask_tech_assistant(question, assistant_id):
    """Post *question* to the technical thread and wait for the reply.

    Creates the thread on first use and stores its id in
    ``st.session_state.tech_thread_id``.

    Returns the text of the newest assistant message, or None when the run
    ends in any state other than "completed" or produced no assistant
    message. Errors raised by the OpenAI client propagate to the caller.
    """
    if st.session_state.tech_thread_id is None:
        st.session_state.tech_thread_id = client.beta.threads.create().id
    thread_id = st.session_state.tech_thread_id

    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=question,
    )
    run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant_id,
    )
    while run.status in _TECH_PENDING_STATES:
        time.sleep(1)
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id)
    if run.status != "completed":
        return None

    # Newest first: the first assistant message is the reply to this
    # question. (Walking the list oldest-first would keep returning the very
    # first answer of the thread.)
    replies = client.beta.threads.messages.list(thread_id=thread_id, order="desc")
    for message in replies.data:
        if message.role == "assistant":
            # A message may mix text with non-text parts; keep only text.
            return "".join(
                part.text.value for part in message.content if part.type == "text"
            )
    return None


with tabs[1]:
    ASSISTANT_ID = "asst_DjvuWBc7tCvMbAhY7n1em4BZ"

    # Per-session state used by this tab.
    for state_key, initial in (
        ("tech_messages", []),
        ("tech_thread_id", None),
        ("results", []),
        ("lightbox_url", None),
    ):
        if state_key not in st.session_state:
            st.session_state[state_key] = initial

    prompt = st.chat_input("Ask about plans, drawings or components")
    if prompt:
        st.session_state.tech_messages.append({"role": "user", "content": prompt})

    history = st.session_state.tech_messages
    # A trailing user message means a question is still unanswered.
    if history and history[-1]["role"] == "user":
        answer = None
        failure = None
        try:
            with st.spinner("π€ Querying technical documents..."):
                answer = _ask_tech_assistant(history[-1]["content"], ASSISTANT_ID)
        except Exception as e:
            # UI boundary: report any client/network error instead of
            # crashing the page.
            failure = f"β Error: {e}"
        else:
            if answer is None:
                failure = "β οΈ Assistant failed to complete."

        if failure is None:
            history.append({"role": "assistant", "content": answer})
            st.session_state.results = _parse_drawing_results(answer)
            st.rerun()
        else:
            # Recording the failure as the assistant turn stops the same
            # question from being retried on every rerun.
            history.append({"role": "assistant", "content": failure})
            if st.session_state.results:
                # The chat history is hidden while the results grid is shown,
                # so surface the failure directly as well.
                st.error(failure)

    # NOTE(review): the original listing lost its indentation; the trailing
    # `else` is taken to pair with this `if` (chat history is shown only when
    # there are no drawing results) - confirm against the deployed app.
    if st.session_state.results:
        disciplines = sorted({r.get("discipline", "") for r in st.session_state.results})
        selected = st.selectbox("π Filter by discipline", ["All"] + disciplines)
        filtered = [
            r for r in st.session_state.results
            if selected == "All" or r.get("discipline", "") == selected
        ]

        page_size = 8
        # Ceiling division; at least one page so the input stays valid.
        page_count = max(1, -(-len(filtered) // page_size))
        page_num = st.number_input("Page", min_value=1, max_value=page_count, step=1, value=1)
        paged = filtered[(page_num - 1) * page_size : page_num * page_size]

        st.markdown("---")
        st.subheader("π Drawing Results")
        cols = st.columns(4)
        for i, item in enumerate(paged):
            with cols[i % 4]:
                # .get() keeps a partially filled record from raising KeyError.
                st.markdown(f"**{item.get('drawing_number', 'N/A')}**")
                st.markdown(f"_Discipline: {item.get('discipline', '')}_")
                st.caption(item.get("summary", ""))
                # Only the first image of each result gets a viewer button.
                for url in (item.get("images") or [])[:1]:
                    if st.button("πΌοΈ View Image", key=f"view_{i}"):
                        st.session_state.lightbox_url = url

        if st.session_state.lightbox_url:
            st.markdown("---")
            st.image(
                st.session_state.lightbox_url,
                use_container_width=True,
                caption="π Enlarged Preview",
            )
            if st.button("β Close Viewer"):
                st.session_state.lightbox_url = None
                st.rerun()
    else:
        for msg in st.session_state.tech_messages:
            with st.chat_message(msg["role"]):
                # NOTE(review): unsafe_allow_html renders raw HTML from both
                # the user and the assistant; confirm this is intended.
                st.markdown(msg["content"], unsafe_allow_html=True)