Spaces:
Sleeping
Sleeping
import streamlit as st | |
import os | |
import time | |
import re | |
import requests | |
from PIL import Image | |
from io import BytesIO | |
from urllib.parse import quote | |
from openai import OpenAI | |
# ------------------ App Configuration ------------------ | |
st.set_page_config(page_title="Schlaeger Forrestdale TechDocAIA", layout="wide", initial_sidebar_state="collapsed") | |
st.title("π Schlaeger Forrestdale Document Assistant") | |
st.caption("Explore City of Armadale construction documents using AI + OCR π§ ") | |
# ------------------ Load API Key and Assistant ID ------------------ | |
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY") | |
ASSISTANT_ID = os.environ.get("ASSISTANT_ID") | |
if not OPENAI_API_KEY or not ASSISTANT_ID: | |
st.error("β Missing secrets. Please set both OPENAI_API_KEY and ASSISTANT_ID in Hugging Face Space secrets.") | |
st.stop() | |
client = OpenAI(api_key=OPENAI_API_KEY) | |
# ------------------ Session State Initialization ------------------ | |
if "messages" not in st.session_state: | |
st.session_state.messages = [] | |
if "thread_id" not in st.session_state: | |
st.session_state.thread_id = None | |
if "image_url" not in st.session_state: | |
st.session_state.image_url = None | |
if "image_updated" not in st.session_state: | |
st.session_state.image_updated = False | |
if "pending_prompt" not in st.session_state: | |
st.session_state.pending_prompt = None | |
# ------------------ Sidebar ------------------ | |
st.sidebar.header("βΉοΈ Information") | |
if st.sidebar.button("π§Ή Clear Chat"): | |
st.session_state.messages = [] | |
st.session_state.thread_id = None | |
st.session_state.image_url = None | |
st.session_state.image_updated = False | |
st.session_state.pending_prompt = None | |
st.rerun() | |
show_image = st.sidebar.toggle("π Show Page Image", value=True) | |
st.sidebar.subheader("π Document Tools") | |
st.sidebar.markdown("Use the tools below to locate relevant clauses and actions:") | |
keyword = st.sidebar.text_input("Search by Keyword", placeholder="e.g. defects, WHS, delay") | |
if st.sidebar.button("π Search Keyword") and keyword: | |
st.session_state.pending_prompt = f"Find clauses or references related to: {keyword}" | |
section_options = [ | |
"Select a section...", | |
"1. Formal Instrument of Contract", | |
"2. Offer and Acceptance", | |
"3. Key Personnel", | |
"4. Contract Pricing", | |
"5. Specifications", | |
"6. WHS Policies", | |
"7. Penalties and Delays", | |
"8. Dispute Resolution", | |
"9. Principal Obligations" | |
] | |
section_select = st.sidebar.selectbox("π Jump to Section", section_options) | |
if section_select != section_options[0]: | |
st.session_state.pending_prompt = f"Summarize or list key points from section: {section_select}" | |
actions = [ | |
"Select an action...", | |
"List all contractual obligations", | |
"Summarize payment terms", | |
"List WHS responsibilities", | |
"Find delay-related penalties", | |
"Extract dispute resolution steps" | |
] | |
action_select = st.sidebar.selectbox("βοΈ Common Contract Queries", actions) | |
if action_select != actions[0]: | |
st.session_state.pending_prompt = action_select | |
# ------------------ Layout: Chat + Image ------------------ | |
chat_col, image_col = st.columns([2, 1]) | |
# ------------------ Chat Interface ------------------ | |
with chat_col: | |
st.markdown("### π§ Ask a Document-Specific Question") | |
user_prompt = st.chat_input("Example: What is the defects liability period?") | |
# Use pending prompt from sidebar if no new chat prompt | |
if user_prompt: | |
st.session_state.messages.append({"role": "user", "content": user_prompt}) | |
elif st.session_state.pending_prompt: | |
st.session_state.messages.append({"role": "user", "content": st.session_state.pending_prompt}) | |
st.session_state.pending_prompt = None | |
if st.session_state.messages and st.session_state.messages[-1]["role"] == "user": | |
try: | |
if st.session_state.thread_id is None: | |
thread = client.beta.threads.create() | |
st.session_state.thread_id = thread.id | |
client.beta.threads.messages.create( | |
thread_id=st.session_state.thread_id, | |
role="user", | |
content=st.session_state.messages[-1]["content"] | |
) | |
run = client.beta.threads.runs.create( | |
thread_id=st.session_state.thread_id, | |
assistant_id=ASSISTANT_ID | |
) | |
with st.spinner("π€ Parsing and responding with referenced content..."): | |
while True: | |
run_status = client.beta.threads.runs.retrieve( | |
thread_id=st.session_state.thread_id, | |
run_id=run.id | |
) | |
if run_status.status in ("completed", "failed", "cancelled"): | |
break | |
time.sleep(1) | |
if run_status.status != "completed": | |
st.error(f"β οΈ Assistant failed: {run_status.status}") | |
else: | |
messages = client.beta.threads.messages.list(thread_id=st.session_state.thread_id) | |
for message in reversed(messages.data): | |
if message.role == "assistant": | |
assistant_reply = message.content[0].text.value | |
st.session_state.messages.append({"role": "assistant", "content": assistant_reply}) | |
# Parse Document Reference and Page, then construct image URL with encoding | |
match = re.search(r'Document Reference:\s*(.*?),\s*Page\s*(\d+)', assistant_reply) | |
if match: | |
doc_name = match.group(1).strip() | |
page = int(match.group(2)) | |
page_str = f"{page:04d}" | |
folder = quote(doc_name) | |
image_url = ( | |
f"https://raw.githubusercontent.com/AndrewLORTech/c2ozschlaegerforrestdale/main/" | |
f"{folder}/{folder}_page_{page_str}.png" | |
) | |
st.session_state.image_url = image_url | |
st.session_state.image_updated = True | |
break | |
st.rerun() | |
except Exception as e: | |
st.error(f"β Error: {e}") | |
for msg in st.session_state.messages: | |
with st.chat_message(msg["role"]): | |
st.markdown(msg["content"], unsafe_allow_html=True) | |
# ------------------ Image Display ------------------ | |
with image_col: | |
if show_image and st.session_state.image_url: | |
with st.spinner("Loading document preview..."): | |
try: | |
response = requests.get(st.session_state.image_url) | |
response.raise_for_status() | |
img = Image.open(BytesIO(response.content)) | |
st.image(img, caption="π OCR Page Image", use_container_width=True) | |
st.session_state.image_updated = False | |
except Exception as e: | |
st.error(f"β Failed to load image: {e}") | |