|
import streamlit as st |
|
from transformers import AutoTokenizer, AutoModelForCausalLM, TextIteratorStreamer |
|
from huggingface_hub import login |
|
from threading import Thread |
|
import PyPDF2 |
|
import pandas as pd |
|
import torch |
|
|
|
|
|
# Repository id handed to the tokenizer and model loaders in load_model().
MODEL_NAME = "amiguel/optimizedModelListing6.1"

# Browser-tab title, icon and page layout for the app.
st.set_page_config(page_title="WizNerd Insp", page_icon="π", layout="centered")

# Heading shown at the top of the main page.
st.title("π WizNerd Insp π")
|
|
|
|
|
# --- Sidebar: credentials -------------------------------------------------
st.sidebar.header("Authentication π")
hf_token = st.sidebar.text_input(
    "Hugging Face Token",
    type="password",
    help="Get your token from https://huggingface.co/settings/tokens",
)

# --- Sidebar: optional document used as context for the answers -----------
st.sidebar.header("Upload Documents π")
uploaded_file = st.sidebar.file_uploader(
    "Choose a PDF or XLSX file",
    type=["pdf", "xlsx"],
    label_visibility="collapsed",
)

# The conversation is kept in session state as a list of
# {"role": ..., "content": ...} dicts; create it once per session.
if "messages" not in st.session_state:
    st.session_state["messages"] = []
|
|
|
|
|
@st.cache_data
def process_file(uploaded_file):
    """Extract the text content of an uploaded PDF or XLSX file.

    Args:
        uploaded_file: The value returned by ``st.file_uploader`` (``None``
            when nothing was uploaded). Its ``type`` attribute is compared
            against the PDF and XLSX MIME types.

    Returns:
        str: For a PDF, the text of every page joined with newlines; for an
        XLSX workbook, the DataFrame read by ``pd.read_excel`` rendered with
        ``to_markdown()``. An empty string is returned when there is no file,
        the MIME type is not one of the two handled here, or extraction
        raises (in which case the error is also shown with ``st.error``).
    """
    if uploaded_file is None:
        return ""

    try:
        if uploaded_file.type == "application/pdf":
            pdf_reader = PyPDF2.PdfReader(uploaded_file)
            # Guard against a page for which extract_text() yields None:
            # str.join() would raise TypeError on it.
            return "\n".join(page.extract_text() or "" for page in pdf_reader.pages)

        if uploaded_file.type == "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet":
            df = pd.read_excel(uploaded_file)
            return df.to_markdown()
    except Exception as e:
        st.error(f"π Error processing file: {str(e)}")
        return ""

    # Unrecognised MIME type: return an empty context explicitly instead of
    # falling off the end and handing None to the caller.
    return ""
|
|
|
|
|
@st.cache_resource
def _load_model_cached(hf_token):
    """Log in to the Hub and load the model and tokenizer, raising on failure.

    Kept separate from ``load_model`` so that exceptions propagate out of the
    cached function: only successful loads are stored in the resource cache,
    and a failed attempt can be retried with the same token.
    """
    login(token=hf_token)

    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_NAME,
        token=hf_token,
    )
    model = AutoModelForCausalLM.from_pretrained(
        MODEL_NAME,
        device_map="auto",
        torch_dtype=torch.float16,
        token=hf_token,
    )
    return model, tokenizer


def load_model(hf_token):
    """Return ``(model, tokenizer)`` for ``MODEL_NAME``, or ``(None, None)``.

    Args:
        hf_token: Hugging Face access token. An empty/falsy value is reported
            as an authentication error without attempting to load anything.

    Returns:
        tuple: ``(model, tokenizer)`` on success. ``(None, None)`` when no
        token was supplied or when loading raised; in both cases the problem
        is shown to the user with ``st.error``.
    """
    if not hf_token:
        st.error("π Authentication required!")
        return None, None

    try:
        return _load_model_cached(hf_token)
    except Exception as e:
        # Handled outside the cached function so the failure is not memoised.
        st.error(f"π€ Model loading failed: {str(e)}")
        return None, None
|
|
|
|
|
def generate_response(prompt, file_context):
    """Start generating an answer in a background thread and return its streamer.

    Relies on the module-level ``model`` and ``tokenizer`` names being set
    before it is called.

    Args:
        prompt: The user's question.
        file_context: Text extracted from the uploaded document; ``None`` or
            an empty string means no context.

    Returns:
        TextIteratorStreamer: Iterating over it yields decoded text chunks
        (prompt and special tokens skipped) as ``model.generate`` produces
        them.
    """
    max_input_tokens = 4096
    template = "Analyze this context:\n{context}\n\nQuestion: {question}\nAnswer:"
    file_context = file_context or ""

    # The tokenizer call below truncates the end of the input, which is
    # where the question and the "Answer:" cue live. Trim the document
    # context to the token budget left over by the rest of the prompt so a
    # long document can never push the question out of the model input.
    frame_ids = tokenizer(template.format(context="", question=prompt))["input_ids"]
    context_budget = max(max_input_tokens - len(frame_ids), 0)
    context_ids = tokenizer(file_context, add_special_tokens=False)["input_ids"]
    if len(context_ids) > context_budget:
        file_context = tokenizer.decode(
            context_ids[:context_budget],
            skip_special_tokens=True,
        )

    full_prompt = template.format(context=file_context, question=prompt)

    streamer = TextIteratorStreamer(
        tokenizer,
        skip_prompt=True,
        skip_special_tokens=True,
    )

    # Truncation stays on as a safety net: re-tokenising the decoded context
    # may not give exactly the same token count as the slice above.
    inputs = tokenizer(
        full_prompt,
        return_tensors="pt",
        max_length=max_input_tokens,
        truncation=True,
    ).to(model.device)

    generation_kwargs = dict(
        inputs,
        streamer=streamer,
        max_new_tokens=1024,
        temperature=0.7,
        top_p=0.9,
        repetition_penalty=1.1,
        do_sample=True,
        use_cache=True,
    )

    # generate() blocks until done, so it runs in its own thread while the
    # caller consumes the streamer.
    Thread(target=model.generate, kwargs=generation_kwargs).start()
    return streamer
|
|
|
|
|
# Replay every stored message so the conversation is visible on each run.
for message in st.session_state.messages:
    try:
        # NOTE(review): both avatar literals look identical in this copy of
        # the file (possibly mis-encoded emoji) — confirm the intended icons.
        avatar = "π€" if message["role"] == "user" else "π€"
        with st.chat_message(message["role"], avatar=avatar):
            st.markdown(message["content"])
    except Exception:
        # Best-effort fallback: render the message without a custom avatar.
        # Narrowed from a bare ``except`` so BaseException-derived signals
        # (KeyboardInterrupt, SystemExit, ...) are no longer swallowed here.
        with st.chat_message(message["role"]):
            st.markdown(message["content"])
|
|
|
|
|
# Handle a newly submitted question: ensure the model is available, record
# the user message, then stream the assistant's answer into the page.
if prompt := st.chat_input("Ask your inspection question..."):
    if not hf_token:
        st.error("π Authentication required!")
        st.stop()

    # Load on first use, and try again if an earlier attempt left None in the
    # session (checking only for the key's presence would make a failed load
    # permanent for the whole session).
    if st.session_state.get("model") is None or st.session_state.get("tokenizer") is None:
        st.session_state.model, st.session_state.tokenizer = load_model(hf_token)

    # generate_response() reads these two module-level names.
    model = st.session_state.model
    tokenizer = st.session_state.tokenizer

    with st.chat_message("user", avatar="π€"):
        st.markdown(prompt)
    st.session_state.messages.append({"role": "user", "content": prompt})

    # Normalise a missing result to "" so it is never rendered as "None".
    file_context = process_file(uploaded_file) or ""

    if model is not None and tokenizer is not None:
        try:
            with st.chat_message("assistant", avatar="π€"):
                streamer = generate_response(prompt, file_context)
                response_container = st.empty()
                raw_response = ""
                full_response = ""

                for chunk in streamer:
                    # Keep chunks verbatim: stripping each one and re-joining
                    # with spaces drops newlines and breaks Markdown output.
                    raw_response += chunk
                    # Remove the tags from the accumulated text so a tag that
                    # arrives split across two chunks is still removed.
                    full_response = raw_response.replace("<think>", "").replace("</think>", "")
                    # Model output is rendered as plain Markdown (no raw
                    # HTML), matching the final render below.
                    response_container.markdown(full_response + "β")

                full_response = full_response.strip()
                response_container.markdown(full_response)
                st.session_state.messages.append({"role": "assistant", "content": full_response})
        except Exception as e:
            st.error(f"β‘ Generation error: {str(e)}")
    else:
        st.error("π€ Model not loaded!")