# PLAITO — src/streamlit_app.py
# (Hugging Face Space page chrome removed from the top of this file:
#  author avatar/link, "Update src/streamlit_app.py", commit "c630b43 verified",
#  raw / history / blame links, file size "8.98 kB". None of it was Python.)
from openai import OpenAI
import streamlit as st
import openai
import os
import time
#from roles import *
import io
from pypdf import PdfReader
#from langchain_community.document_loaders import PyPDFLoader
import tempfile
from RAG import load_graph,text_splitter
import torch
from sentence_transformers import SentenceTransformer
import torch
import uuid
import re
import requests
from cloudhands import CloudHandsPayment
from database_center import db_transaction
# Pick the compute device for the sentence encoder.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Explicitly override cache paths (matches Dockerfile ENV).
# NOTE(review): sentence_transformers is imported above this point, so any
# library that reads these variables at import time has already seen the old
# values; the Dockerfile ENV is what actually guarantees the cache location.
os.environ["HF_HOME"] = "/app/hf_cache"
os.environ["TRANSFORMERS_CACHE"] = "/app/hf_cache"

# Shared sentence encoder used for document and query embeddings.
encoder = SentenceTransformer(
    "sentence-transformers/all-MiniLM-L6-v2",
    cache_folder="/app/hf_cache",
).to(device)

# Module-level scratch state. (The original `global chat_messages` statement
# was a no-op at module scope and the duplicate `import os` was redundant;
# both were dropped.)
chat_messages = []
outputs = []

# CloudHands author key, supplied via environment variable.
payment_key = os.getenv('Payment_Key')
def complete_payment():
    """Charge the authenticated user $0.50 via CloudHands and log the charge.

    Requires an OAuth token previously stored in ``st.session_state.token``
    by :func:`pay`; shows an error and does nothing otherwise. On success,
    stores the transaction id in session state and records the transaction
    in the transaction database.
    """
    if not st.session_state.token:
        st.error('Please generate your Tokens.')
        return
    chPay = st.session_state.chPay
    try:
        result = chPay.charge(
            charge=0.5,
            event_name="Sample cloudhands charge",
        )
        # Fixed message ("You payment is succeeded"); no placeholders, so a
        # plain string rather than an f-string.
        st.success("Your payment succeeded")
        st.session_state.transaction_id = result.transaction_id
        st.session_state.db_transaction.add({
            'id': str(uuid.uuid4()),
            # NOTE(review): 'app_title' looks like a placeholder literal —
            # confirm the real application name should be recorded here.
            'app': 'app_title',
            'transaction-id': result.transaction_id,
            'price': 0.5,
        })
    except Exception as e:
        st.error(f"Charge failed: {e}")
@st.dialog("Payment link")
def pay():
    """Modal dialog that walks the user through CloudHands OAuth.

    Step 1 links out to the authorization page; step 2 exchanges the pasted
    one-time code for a token, stored in ``st.session_state.token``.
    """
    payment = st.session_state.chPay
    # Step 1: Show auth link only once
    st.link_button("Authenticate", url=payment.get_authorization_url())
    # Step 2: User pastes the code
    auth_code = st.text_input("Place your code")
    if st.button("Exchange Code"):
        try:
            st.session_state.token = payment.exchange_code_for_token(auth_code)
            st.success("Code exchanged successfully! Token stored.")
        except Exception as e:
            st.error(f"Failed: {e}")
def embed_document(file_text):
    """Split *file_text* into chunks and embed each chunk.

    Args:
        file_text: Full text of the uploaded document.

    Returns:
        tuple: ``(embeddings, chunks)`` where ``embeddings`` is a numpy array
        (one row per chunk, moved off the GPU) and ``chunks`` is the list of
        text chunks produced by the shared ``text_splitter``.
    """
    chunks = text_splitter.split_text(file_text)
    embeddings = st.session_state.encoder.encode(
        chunks, convert_to_tensor=True, show_progress_bar=True
    )
    # Move to CPU / numpy so downstream consumers (RAG graph) can use it.
    return embeddings.cpu().numpy(), chunks
def embed_sentence(text):
    """Embed a single query string; returns a nested list (1 x dim)."""
    vector = st.session_state.encoder.encode(
        [text], convert_to_tensor=True, show_progress_bar=True
    )
    return vector.cpu().tolist()
def stream_response():
    """Yield the model's <output> section word by word for st.write_stream."""
    words = extract_output(st.session_state.response).split(" ")
    for word in words:
        yield f"{word} "
        time.sleep(0.1)  # throttle so the streaming effect is visible
def stream_thoughts():
    """Yield the model's <thinking> section word by word for st.write_stream."""
    words = extract_thinking(st.session_state.response).split(" ")
    for word in words:
        yield f"{word} "
        time.sleep(0.1)  # throttle so the streaming effect is visible
def get_text(uploaded_file):
    """Extract all text from an uploaded PDF file-like object.

    The previous implementation wrote the upload to a ``delete=False`` temp
    file (never cleaned up) and called ``PyPDFLoader``, whose import is
    commented out at the top of this file — so it raised ``NameError`` at
    runtime. Parse entirely in memory with ``pypdf`` (already imported).

    Args:
        uploaded_file: File-like object supporting ``.read()`` (e.g. a
            Streamlit ``UploadedFile``).

    Returns:
        str: Text of every page, joined with newlines.
    """
    reader = PdfReader(io.BytesIO(uploaded_file.read()))
    # extract_text() can return None for image-only pages; substitute "".
    return "\n".join((page.extract_text() or "") for page in reader.pages)
def respond_chat(text):
    """POST the prompt to the hosted model endpoint and return its first output.

    Charges the user (via :func:`complete_payment`) when the HTTP call
    succeeds; returns ``None`` when the request fails or no transaction id
    was recorded.

    Args:
        text: Full prompt to send as ``user_prompt``.

    Returns:
        str | None: ``output[0]`` from the endpoint's JSON, or ``None``.
    """
    url = "https://8000-01k3gce7dwxsk16d7dd40n75xb.cloudspaces.litng.ai/predict"
    payload = {"user_prompt": text}
    headers = {"Content-Type": "application/json"}
    # Bug fix: the original sent form-encoded data (data=payload) and never
    # passed the prepared JSON headers; send the payload as JSON as intended.
    # A timeout prevents the Streamlit script from hanging forever.
    response = requests.post(url, json=payload, headers=headers, timeout=120)
    if response.status_code == 200:
        complete_payment()
        # .get() avoids an AttributeError when the charge failed and
        # transaction_id was never stored in session state.
        if st.session_state.get("transaction_id"):
            return response.json()['output'][0]
    return None
def extract_thinking(text: str) -> str:
    """Return the content of the first <thinking>...</thinking> block.

    Matching is case-insensitive and spans newlines; returns "" when the
    tags are absent.
    """
    found = re.search(r"<thinking>(.*?)</thinking>", text, re.IGNORECASE | re.DOTALL)
    if found is None:
        return ""
    return found.group(1).strip()
def extract_output(text: str) -> str:
    """Return the content of the first <output>...</output> block.

    Matching is case-insensitive and spans newlines; returns "" when the
    tags are absent.
    """
    found = re.search(r"<output>(.*?)</output>", text, re.IGNORECASE | re.DOTALL)
    if found is None:
        return ""
    return found.group(1).strip()
# One-time Streamlit session-state initialisation (runs on every rerun,
# but only assigns keys that are still missing).
if 'doc_flag' not in st.session_state:
    st.session_state.doc_flag = False       # True once a document is embedded
if 'flag' not in st.session_state:
    st.session_state.flag = False
if 'encoder' not in st.session_state:
    st.session_state.encoder = encoder      # shared sentence encoder
if 'file_text' not in st.session_state:
    st.session_state.file_text = ""
if "chPay" not in st.session_state:
    st.session_state.chPay = CloudHandsPayment(
        author_key=payment_key
    )
if "token" not in st.session_state:
    st.session_state.token = None           # CloudHands OAuth token
if 'db_transaction' not in st.session_state:
    st.session_state.db_transaction = db_transaction
if 'transaction_id' not in st.session_state:
    # Bug fix: respond_chat reads this key, but it was only ever created by a
    # successful charge — the first read before any charge raised at runtime.
    st.session_state.transaction_id = None
if 'embeddings' not in st.session_state:
    st.session_state.embeddings = None
if 'chunks' not in st.session_state:
    st.session_state.chunks = None
if 'response' not in st.session_state:
    st.session_state.response = ''
# Sidebar document upload.
# Bug fix: the original rendered TWO file uploaders plus an unused
# "Upload Document" button; the first uploader's value was immediately
# overwritten by the second, so only the keyed uploader is kept.
st.sidebar.title("Uploading your document 📄")
uploaded_file = st.sidebar.file_uploader(
    "Upload your PDF",
    type=["pdf"],
    key="pdf_uploader",
)
def extract_pdf_text_from_bytes(file_bytes: bytes) -> str:
    """Extract the text of every page of a PDF given as raw bytes.

    Pages with no extractable text contribute an empty string; pages are
    joined with newlines.
    """
    reader = PdfReader(io.BytesIO(file_bytes))
    return "\n".join((page.extract_text() or "") for page in reader.pages)
# Process a freshly uploaded PDF: extract its text and embed its chunks.
if uploaded_file is not None:
    with st.spinner("Reading & embedding your PDF..."):
        # Important: read bytes once on this rerun.
        # (If uploaded_file is ever re-used later, call uploaded_file.seek(0).)
        file_bytes = uploaded_file.read()
        # Extract text purely in-memory (no /tmp files, no PyPDFLoader).
        file_text = extract_pdf_text_from_bytes(file_bytes)
        st.session_state.file_text = file_text
        # Reuse the shared helper instead of duplicating its split+encode
        # logic inline (same text_splitter, same encoder arguments).
        embeddings, chunks = embed_document(file_text)
        st.session_state.embeddings = embeddings
        st.session_state.chunks = chunks
        st.session_state.doc_flag = True
    st.success(f"Loaded: {uploaded_file.name} — {len(st.session_state.chunks)} chunks")
# NOTE(review): this sidebar copy ("faviorate charecter sound") reads like a
# typo-ridden leftover from another app — confirm the intended wording before
# changing the user-facing string.
st.sidebar.write("Before making the your faviorate charecter sound, authenicate your code")
auth_clicked = st.sidebar.button('Authenicate')
if auth_clicked:
    pay()  # open the CloudHands OAuth dialog
# Main chat UI: title, question box, optional document-grounded mode.
st.title("Plaito")
# Fixed user-facing grammar ("The model show you it's" -> "shows you its").
st.write("Chat with our reasoning model and ask your questions. The model shows you its chain of thoughts and final answer.")
text = st.text_area("Ask your question:", height=100)
# A single selectable pill toggles RAG mode (truthy when 'search' is picked).
document_button = st.pills("Ask based on Documents", ['search'], selection_mode="single")
generate_button = st.button("Generate Response")
# Handle a click on "Generate Response".
if generate_button:
    # Fixed copy-paste spinner text (was "Generating code..." in a chat app).
    with st.spinner("Generating response..."):
        try:
            prompt = text
            if document_button:
                # RAG path: run the retrieval graph over the embedded
                # document and prepend its summary to the user's question.
                graph = load_graph(st.session_state.embeddings, st.session_state.chunks)
                graph = graph.compile()
                initial_state = {
                    "embedded_query": embed_sentence(text),
                    "knowledge": [],
                    "summary": "",
                    "final_response": None,
                }
                final_state = graph.invoke(initial_state)
                prompt = f"""
Then respond to the client. Also follow the retrived information in the ##Summary section.
## Instructions:
{text}
## Summary:
{final_state['summary']}
"""
            # Single call site replaces the original's duplicated
            # if/else db_transaction + respond_chat branches.
            if st.session_state.db_transaction:
                st.session_state.response = respond_chat(prompt)
        except Exception as e:
            # Fixed copy-paste error text (was "code generation").
            st.error(f"Error during response generation: {e}")
# Two-column layout: wide response column, narrow thought-process column.
col_answer, col_thoughts = st.columns([2, 1])
# Render order matches the original: thoughts first, then the response.
with col_thoughts:
    st.write("### Thought Process")
    st.write_stream(stream_thoughts())
with col_answer:
    st.write("### Response")
    st.write_stream(stream_response())