Spaces:
Sleeping
Sleeping
from openai import OpenAI | |
import streamlit as st | |
import openai | |
import os | |
import time | |
#from roles import * | |
import io | |
from pypdf import PdfReader | |
#from langchain_community.document_loaders import PyPDFLoader | |
import tempfile | |
from RAG import load_graph,text_splitter | |
import torch | |
from sentence_transformers import SentenceTransformer | |
import torch | |
import uuid | |
import re | |
import requests | |
from cloudhands import CloudHandsPayment | |
from database_center import db_transaction | |
# Run the encoder on the GPU when torch reports one, otherwise on the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'
import os
# Explicitly override cache paths (matches Dockerfile ENV)
# NOTE(review): these assignments run after the sentence_transformers import
# above; presumably the Dockerfile ENV is what is seen at import time - confirm.
os.environ["HF_HOME"] = "/app/hf_cache"
os.environ["TRANSFORMERS_CACHE"] = "/app/hf_cache"


@st.cache_resource(show_spinner=False)
def _load_encoder(target_device):
    """Build the sentence encoder once per server process.

    Streamlit re-executes this script on every widget interaction. Without
    caching, the model would be re-instantiated and moved to the device on
    each rerun; `st.cache_resource` keeps a single shared instance instead.
    """
    return SentenceTransformer(
        "sentence-transformers/all-MiniLM-L6-v2",
        cache_folder="/app/hf_cache",
    ).to(target_device)


encoder = _load_encoder(device)

# Module-level scratch lists. (A `global` statement at module scope has no
# effect, so the names are simply assigned here.)
chat_messages = []
outputs = []

# Author key for the payment client, read from the `Payment_Key` environment
# variable; it is None when the variable is not set.
payment_key = os.getenv('Payment_Key')
def complete_payment():
    """Charge the authenticated user 0.5 and record the transaction.

    Reads `token`, `chPay` and `db_transaction` from `st.session_state`.
    On a successful charge, `st.session_state.transaction_id` is set to the
    ID returned by the payment client; in every other case it is left as
    None, so callers can test it to learn whether *this* call was paid for.

    Returns:
        None. The outcome is reported through Streamlit messages and
        `st.session_state.transaction_id`.
    """
    # Reset first: otherwise the attribute may not exist at all, or may still
    # hold the ID of an earlier, unrelated charge.
    st.session_state.transaction_id = None

    if not st.session_state.token:
        st.error('Please generate your Tokens.')
        return

    chPay = st.session_state.chPay
    try:
        result = chPay.charge(
            charge=0.5,
            event_name="Sample cloudhands charge",
        )
        transaction_id = result.transaction_id
    except Exception as e:
        # The payment client's exception types are not visible here, so this
        # UI boundary catches broadly and reports the failure.
        st.error(f"Charge failed: {e}")
        return

    st.success("You payment is succeeded")
    st.session_state.transaction_id = transaction_id

    # Bookkeeping is kept separate from the charge so that a storage problem
    # is not reported to the user as a failed charge.
    try:
        st.session_state.db_transaction.add({
            'id': str(uuid.uuid4()),
            'app': 'app_title',
            'transaction-id': transaction_id,
            'price': 0.5,
        })
    except Exception as e:
        st.error(f"Payment succeeded but the transaction could not be recorded: {e}")
def pay():
    """Render the payment authorization widgets and store the resulting token.

    Shows a link to the payment client's authorization URL and a text box for
    the code the user receives. When "Exchange Code" is pressed, the code is
    exchanged for a token, which is saved in `st.session_state.token`. Any
    failure is shown as a Streamlit error.
    """
    payment_client = st.session_state.chPay

    # Step 1: link out to the authorization page.
    st.link_button("Authenticate", url=payment_client.get_authorization_url())

    # Step 2: the user pastes the code they were given.
    pasted_code = st.text_input("Place your code")
    if not st.button("Exchange Code"):
        return

    try:
        st.session_state.token = payment_client.exchange_code_for_token(pasted_code)
        st.success("Code exchanged successfully! Token stored.")
    except Exception as exc:
        st.error(f"Failed: {exc}")
def embed_document(file_text):
    """Split a document into chunks and embed every chunk.

    The text is cut up by the module's `text_splitter`, each chunk is encoded
    with the encoder stored in `st.session_state`, and the resulting tensor
    is moved to the CPU and converted to a NumPy array.

    Returns:
        A `(embeddings, chunks)` tuple.
    """
    pieces = text_splitter.split_text(file_text)
    model = st.session_state.encoder
    vectors = model.encode(pieces, convert_to_tensor=True, show_progress_bar=True)
    return vectors.cpu().numpy(), pieces
def embed_sentence(text):
    """Embed a single piece of text with the session encoder.

    `text` is encoded as a one-element batch; the tensor is moved to the CPU
    and returned as a nested Python list.
    """
    model = st.session_state.encoder
    query_tensor = model.encode([text], convert_to_tensor=True, show_progress_bar=True)
    on_cpu = query_tensor.cpu()
    return on_cpu.tolist()
def stream_response():
    """Yield the `<output>` part of the stored response word by word.

    Reads `st.session_state.response`, extracts the `<output>` section and
    yields each space-separated piece followed by a space, pausing briefly
    between pieces so the text appears to be typed out.
    """
    # The stored response can be None when no reply was produced; the regex
    # helper only accepts strings, so fall back to empty text.
    answer = extract_output(st.session_state.response or "")
    for word in answer.split(" "):
        yield word + " "
        time.sleep(0.1)  # Simulate a delay
def stream_thoughts():
    """Yield the `<thinking>` part of the stored response word by word.

    Reads `st.session_state.response`, extracts the `<thinking>` section and
    yields each space-separated piece followed by a space, pausing briefly
    between pieces so the text appears to be typed out.
    """
    # The stored response can be None when no reply was produced; the regex
    # helper only accepts strings, so fall back to empty text.
    thoughts = extract_thinking(st.session_state.response or "")
    for word in thoughts.split(" "):
        yield word + " "
        time.sleep(0.1)  # Simulate a delay
def get_text(uploaded_file):
    """Return the text of an uploaded PDF, one page per line group.

    The previous implementation called `PyPDFLoader`, whose import is
    commented out in this file, and wrote the upload to a temporary file
    that was never removed. The PDF is now parsed in memory with the
    `PdfReader` that the file already imports.

    Args:
        uploaded_file: A file-like object whose `read()` returns the PDF
            bytes.

    Returns:
        The text of all pages joined with newlines; a page with no
        extractable text contributes an empty string.
    """
    reader = PdfReader(io.BytesIO(uploaded_file.read()))
    return "\n".join(page.extract_text() or "" for page in reader.pages)
def respond_chat(text):
    """Send a prompt to the model endpoint and return its reply once paid for.

    The prompt is posted as JSON to the `/predict` endpoint. When the
    endpoint answers with HTTP 200, the user is charged via
    `complete_payment()`, and the first element of the reply's `output`
    field is returned only if that charge produced a transaction ID.

    Args:
        text: The prompt to send as `user_prompt`.

    Returns:
        The model reply, or an empty string when the request did not succeed
        or the payment did not go through.

    Raises:
        requests.RequestException: If the HTTP request itself fails or times
            out; the caller is expected to report it.
    """
    url = "https://8000-01k3gce7dwxsk16d7dd40n75xb.cloudspaces.litng.ai/predict"
    payload = {"user_prompt": text}
    headers = {"Content-Type": "application/json"}
    # `json=` sends the body as JSON to match the declared content type
    # (`data=` with a dict would form-encode it). The timeout is
    # (connect, read); the read limit is generous because generation may be
    # slow - adjust if the endpoint's latency is known.
    response = requests.post(url, json=payload, headers=headers, timeout=(10, 600))
    if response.status_code != 200:
        st.error(f"Model request failed with status {response.status_code}")
        return ""

    # Forget any earlier transaction so only the charge made for this
    # request can unlock the reply.
    st.session_state.transaction_id = None
    complete_payment()
    if not st.session_state.get("transaction_id"):
        return ""
    return response.json()['output'][0]
def extract_thinking(text: str) -> str:
    """Return the text enclosed by the first `<thinking>...</thinking>` pair.

    Tag matching ignores case and the enclosed text may span several lines.
    Surrounding whitespace is stripped. An empty string is returned when no
    such pair exists.
    """
    found = re.search(
        r"<thinking>(.*?)</thinking>",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if found is None:
        return ""
    return found.group(1).strip()
def extract_output(text: str) -> str:
    """Return the text enclosed by the first `<output>...</output>` pair.

    Tag matching ignores case and the enclosed text may span several lines.
    Surrounding whitespace is stripped. An empty string is returned when no
    such pair exists.
    """
    found = re.search(
        r"<output>(.*?)</output>",
        text,
        flags=re.IGNORECASE | re.DOTALL,
    )
    if found is None:
        return ""
    return found.group(1).strip()
# Seed st.session_state with a starting value for every key the app reads.
# A key is only written when it is absent, so values survive script reruns.
_plain_defaults = {
    'doc_flag': False,
    'flag': False,
    'encoder': encoder,
    'file_text': "",
    'token': None,
    'db_transaction': db_transaction,
    'embeddings': None,
    'chunks': None,
    'response': '',
}
for _key, _value in _plain_defaults.items():
    if _key not in st.session_state:
        st.session_state[_key] = _value

# The payment client is constructed only when the key is missing, so it is
# handled outside the table of ready-made values above.
if "chPay" not in st.session_state:
    st.session_state.chPay = CloudHandsPayment(author_key=payment_key)
# Sidebar document upload
st.sidebar.title("Uploading your document π")
# NOTE(review): nothing visible in this file reads `upload_button`; the
# upload is processed as soon as a file is present. Kept so the name stays
# defined.
upload_button = st.sidebar.button("Upload Document")
# A second, label-collapsed uploader used to be rendered before this one, but
# its result was immediately overwritten by this assignment, so a file
# dropped into it was silently ignored. Only the uploader whose value is
# actually used is rendered now.
uploaded_file = st.sidebar.file_uploader(
    "Upload your PDF",
    type=["pdf"],
    key="pdf_uploader",
)
def extract_pdf_text_from_bytes(file_bytes: bytes) -> str:
    """Extract the text of a PDF held entirely in memory.

    The bytes are wrapped in a `BytesIO` and handed to `PdfReader`. Each
    page's extracted text (or an empty string when extraction yields nothing)
    is joined with newlines.
    """
    pdf = PdfReader(io.BytesIO(file_bytes))
    return "\n".join(page.extract_text() or "" for page in pdf.pages)
import hashlib

# Process the uploaded PDF: extract its text, split it into chunks, embed the
# chunks, and keep everything in st.session_state.
#
# Streamlit re-executes this script on every widget interaction while the
# uploader keeps returning the same file. A digest of the file's bytes is
# remembered so that parsing and embedding run only when the content changes,
# instead of on every rerun.
if uploaded_file is not None:
    # Important: read bytes once on this rerun
    file_bytes = uploaded_file.read()
    file_digest = hashlib.sha256(file_bytes).hexdigest()

    if st.session_state.get("embedded_file_digest") != file_digest:
        with st.spinner("Reading & embedding your PDF..."):
            # Extract text purely in-memory (no /tmp files, no PyPDFLoader)
            file_text = extract_pdf_text_from_bytes(file_bytes)
            # Build embeddings (uses the existing text_splitter + encoder)
            chunks = text_splitter.split_text(file_text)
            embeddings = st.session_state.encoder.encode(
                chunks, convert_to_tensor=True, show_progress_bar=True
            ).cpu().numpy()

        # Persist to session state; the digest is written last so a failure
        # above leaves the file marked as not yet processed.
        st.session_state.file_text = file_text
        st.session_state.embeddings = embeddings
        st.session_state.chunks = chunks
        st.session_state.doc_flag = True
        st.session_state.embedded_file_digest = file_digest

    st.success(f"Loaded: {uploaded_file.name} β {len(st.session_state.chunks)} chunks")
st.sidebar.write("Before making the your faviorate charecter sound, authenicate your code")
Authenication = st.sidebar.button('Authenicate')
# A Streamlit button returns True only for the single rerun triggered by its
# click. pay() renders a text input and a second button; interacting with
# either causes another rerun in which this button is False again, so calling
# pay() directly under `if Authenication:` made those widgets disappear before
# the code could be exchanged. The click is therefore remembered in session
# state and the flow stays rendered on later reruns.
if Authenication:
    st.session_state.auth_flow_open = True
if st.session_state.get("auth_flow_open"):
    pay()
#subject=st.pills('Select your subject',list(roles.keys()),selection_mode='single') | |
# Main page: heading, short description, and the three input widgets whose
# values drive the generate step further down.
st.title(body="Plaito")
st.write("Chat with our reasoning model and ask your questions. The model show you it's chain of thoughts and final answer.")
text = st.text_area(
    label="Ask your question:",
    height=100,
)
document_button = st.pills(
    label="Ask based on Documents",
    options=['search'],
    selection_mode="single",
)
generate_button = st.button(label="Generate Response")
# Handle a click on "Generate Response".
#
# With the 'search' pill selected, the question is embedded, the retrieval
# graph is run over the uploaded document's chunks, and the resulting summary
# is appended to the prompt. The prompt is then sent through respond_chat()
# and the reply is stored in st.session_state.response for the display code.
if generate_button:
    if not text.strip():
        # Do not call (and bill for) the model with an empty question.
        st.error("Please enter a question first.")
    elif document_button and st.session_state.embeddings is None:
        # Document search needs embeddings from an uploaded PDF; say so
        # explicitly instead of failing inside the retrieval graph.
        st.error("Please upload a document before using document search.")
    else:
        with st.spinner("Generating code..."):
            try:
                prompt = text
                if document_button:
                    graph = load_graph(st.session_state.embeddings, st.session_state.chunks)
                    graph = graph.compile()
                    initial_state = {
                        "embedded_query": embed_sentence(text),
                        "knowledge": [],
                        "summary": "",
                        "final_response": None,
                    }
                    final_state = graph.invoke(initial_state)
                    updated_text = f"""
Then respond to the client. Also follow the retrived information in the ##Summary section.
## Instructions:
{text}
## Summary:
{final_state['summary']}
"""
                    prompt = updated_text
                if st.session_state.db_transaction:
                    response = respond_chat(prompt)
                    # Store a string even when no reply came back, because
                    # the display code runs regex extraction on this value.
                    st.session_state.response = response or ''
            except Exception as e:
                st.error(f"Error during code generation: {e}")
# Two-column result area: the wide left column shows the answer and the
# narrow right column shows the model's reasoning. The reasoning is streamed
# first, then the answer.
col1, col2 = st.columns([2, 1])

col2.write("### Thought Process")
col2.write_stream(stream_thoughts())

col1.write("### Response")
col1.write_stream(stream_response())