"""Plaito: a Streamlit chat front-end for a remote reasoning model.

The app lets a user optionally upload a PDF (embedded in memory for retrieval),
authenticate with CloudHands, send a question to the remote ``/predict``
endpoint, charge for the answer, and stream the model's thoughts and response.
"""
import os

# Set the cache locations before importing the Hugging Face stack: those
# libraries resolve their cache directories when they are first imported, so
# assigning the variables afterwards may have no effect (matches Dockerfile ENV).
os.environ["HF_HOME"] = "/app/hf_cache"
os.environ["TRANSFORMERS_CACHE"] = "/app/hf_cache"

import io
import re
import tempfile
import time
import uuid
from typing import Iterator

import openai
import requests
import streamlit as st
import torch
from cloudhands import CloudHandsPayment
from openai import OpenAI
from pypdf import PdfReader
from sentence_transformers import SentenceTransformer

from database_center import db_transaction
from RAG import load_graph, text_splitter

device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Endpoint of the hosted reasoning model.
PREDICT_URL = "https://8000-01k3gce7dwxsk16d7dd40n75xb.cloudspaces.litng.ai/predict"
# Generous upper bound so a stalled request cannot hang the UI forever.
REQUEST_TIMEOUT_S = 300
# Price charged for every generated response.
CHARGE_AMOUNT = 0.5


@st.cache_resource
def _load_encoder():
    """Load the sentence encoder once per server process.

    Streamlit re-executes this script on every interaction; caching the model
    avoids rebuilding it (and re-moving it to the device) on each rerun.
    """
    return SentenceTransformer(
        "sentence-transformers/all-MiniLM-L6-v2",
        cache_folder="/app/hf_cache",
    ).to(device)


encoder = _load_encoder()

chat_messages = []
outputs = []

# Author key for CloudHands, read from the environment.
payment_key = os.getenv('Payment_Key')


def complete_payment():
    """Charge the authenticated user and record the transaction.

    On success ``st.session_state.transaction_id`` holds the new transaction
    id; in every other case it is ``None``, so callers can rely on it to tell
    whether *this* charge went through.
    """
    # Forget any earlier transaction: a failed charge must never be mistaken
    # for a successful one because of a stale id left in the session.
    st.session_state.transaction_id = None

    if not st.session_state.token:
        st.error('Please generate your Tokens.')
        return

    chPay = st.session_state.chPay
    try:
        result = chPay.charge(
            charge=CHARGE_AMOUNT,
            event_name="Sample cloudhands charge",
        )
        st.success("You payment is succeeded")
        st.session_state.transaction_id = result.transaction_id
        st.session_state.db_transaction.add({
            'id': str(uuid.uuid4()),
            'app': 'app_title',
            'transaction-id': result.transaction_id,
            'price': CHARGE_AMOUNT,
        })
    except Exception as e:
        st.error(f"Charge failed: {e}")


@st.dialog("Payment link")
def pay():
    """Dialog that walks the user through the CloudHands authorization flow."""
    chPay = st.session_state.chPay

    # Step 1: offer the authorization link.
    auth_url = chPay.get_authorization_url()
    st.link_button("Authenticate", url=auth_url)

    # Step 2: the user pastes the code obtained from that page.
    code = st.text_input("Place your code")
    if st.button("Exchange Code"):
        try:
            token = chPay.exchange_code_for_token(code)
            st.session_state.token = token
            st.success("Code exchanged successfully! Token stored.")
        except Exception as e:
            st.error(f"Failed: {e}")


def embed_document(file_text):
    """Split *file_text* into chunks and embed them.

    Returns:
        A ``(embeddings, chunks)`` tuple: the encoder output converted to a
        NumPy array, and the list of text chunks it was computed from.
    """
    chunks = text_splitter.split_text(file_text)
    embeddings = st.session_state.encoder.encode(
        chunks, convert_to_tensor=True, show_progress_bar=True
    )
    return embeddings.cpu().numpy(), chunks


def embed_sentence(text):
    """Embed a single query string and return the result as nested lists."""
    embeddings = st.session_state.encoder.encode(
        [text], convert_to_tensor=True, show_progress_bar=True
    )
    return embeddings.cpu().tolist()


def _stream_words(text: str) -> Iterator[str]:
    """Yield *text* one space-separated word at a time with a short delay."""
    for word in text.split(" "):
        yield word + " "
        time.sleep(0.1)  # Simulate a typing delay


def stream_response():
    """Stream the answer part of the stored model response."""
    return _stream_words(extract_output(st.session_state.response))


def stream_thoughts():
    """Stream the reasoning part of the stored model response."""
    return _stream_words(extract_thinking(st.session_state.response))


def extract_pdf_text_from_bytes(file_bytes: bytes) -> str:
    """Extract the text of every page of a PDF held in memory.

    Pages for which no text is extracted contribute an empty string; pages are
    joined with newlines.
    """
    reader = PdfReader(io.BytesIO(file_bytes))
    return "\n".join(page.extract_text() or "" for page in reader.pages)


def get_text(uploaded_file):
    """Return the text of an uploaded PDF.

    Parses the upload in memory with pypdf. The previous implementation wrote
    a temporary file that was never removed and called ``PyPDFLoader``, which
    is not imported in this module.
    """
    return extract_pdf_text_from_bytes(uploaded_file.read())


def respond_chat(text):
    """Send *text* to the model endpoint, charge the user, return the answer.

    Returns:
        The first element of the endpoint's ``output`` field when the request
        succeeded and the charge went through; otherwise ``None``.

    Raises:
        requests.RequestException: on connection problems or timeout.
    """
    payload = {"user_prompt": text}
    headers = {"Content-Type": "application/json"}
    # Send the payload as a JSON body (``data=`` would form-encode it and the
    # JSON content-type header was previously never attached).
    response = requests.post(
        PREDICT_URL, json=payload, headers=headers, timeout=REQUEST_TIMEOUT_S
    )
    if response.status_code != 200:
        st.error(f"Model request failed with status {response.status_code}")
        return None

    complete_payment()
    if st.session_state.get("transaction_id"):
        return response.json()['output'][0]
    return None


def extract_thinking(text: str) -> str:
    """Return the reasoning section of *text*, or "" when there is none.

    NOTE(review): the pattern below has no delimiters, so it matches the empty
    string at position 0 and this function currently always returns "". The
    tag names appear to have been lost from the pattern; restore the tags the
    model actually emits.
    """
    if not text:  # also covers None when no response was produced
        return ""
    match = re.search(r"(.*?)", text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else ""


def extract_output(text: str) -> str:
    """Return the final-answer section of *text*, or "" when there is none.

    NOTE(review): the pattern below has no delimiters, so it matches the empty
    string at position 0 and this function currently always returns "". The
    tag names appear to have been lost from the pattern; restore the tags the
    model actually emits.
    """
    if not text:  # also covers None when no response was produced
        return ""
    match = re.search(r"(.*?)", text, re.DOTALL | re.IGNORECASE)
    return match.group(1).strip() if match else ""


def _build_prompt(question):
    """Augment *question* with a summary retrieved from the uploaded document."""
    graph = load_graph(st.session_state.embeddings, st.session_state.chunks)
    graph = graph.compile()
    initial_state = {
        "embedded_query": embed_sentence(question),
        "knowledge": [],
        "summary": "",
        "final_response": None,
    }
    final_state = graph.invoke(initial_state)
    return (
        "\nThen respond to the client. Also follow the retrived information"
        " in the ##Summary section.\n"
        "## Instructions:\n"
        f"{question}\n"
        "## Summary:\n"
        f"{final_state['summary']}\n"
    )


# ---------------------------------------------------------------------------
# Session state defaults
# ---------------------------------------------------------------------------
_SESSION_DEFAULTS = {
    'doc_flag': False,
    'flag': False,
    'encoder': encoder,
    'file_text': "",
    'token': None,
    'db_transaction': db_transaction,
    'embeddings': None,
    'chunks': None,
    'response': '',
    'transaction_id': None,
}
for _key, _default in _SESSION_DEFAULTS.items():
    if _key not in st.session_state:
        st.session_state[_key] = _default

# Built separately so the payment client is only constructed once per session.
if "chPay" not in st.session_state:
    st.session_state.chPay = CloudHandsPayment(author_key=payment_key)

# ---------------------------------------------------------------------------
# Sidebar: document upload
# ---------------------------------------------------------------------------
uploaded_file = st.sidebar.file_uploader(
    "Upload your PDF",
    type=["pdf"],
    key="pdf_uploader",
)
upload_button = st.sidebar.button("Uploading your document 📄")

if upload_button and uploaded_file is not None:
    with st.spinner("Reading & embedding your PDF..."):
        # Read the bytes once on this rerun and parse them purely in memory.
        file_text = extract_pdf_text_from_bytes(uploaded_file.read())
        st.session_state.file_text = file_text

        if file_text.strip():
            embeddings, chunks = embed_document(file_text)
            st.session_state.embeddings = embeddings
            st.session_state.chunks = chunks
            st.session_state.doc_flag = True
        else:
            # e.g. a scanned PDF: there is nothing to split or embed.
            st.session_state.embeddings = None
            st.session_state.chunks = None
            st.session_state.doc_flag = False

    if st.session_state.doc_flag:
        st.success(
            f"Loaded: {uploaded_file.name} — {len(st.session_state.chunks)} chunks"
        )
    else:
        st.warning("No extractable text was found in this PDF.")

# ---------------------------------------------------------------------------
# Sidebar: authentication
# ---------------------------------------------------------------------------
st.sidebar.write("Before making the your faviorate charecter sound, authenicate your code")
Authenication = st.sidebar.button('Authenicate')
if Authenication:
    pay()

# ---------------------------------------------------------------------------
# Main page
# ---------------------------------------------------------------------------
st.title("Plaito")
st.write("Chat with our reasoning model and ask your questions. The model show you it's chain of thoughts and final answer.")
text = st.text_area("Ask your question:", height=100)
document_button = st.pills("Ask based on Documents", ['search'], selection_mode="single")
generate_button = st.button("Generate Response")

if generate_button:
    with st.spinner("Generating code..."):
        try:
            if document_button and st.session_state.embeddings is None:
                # Retrieval needs an embedded document; fail with a clear
                # message instead of passing None into the graph.
                raise ValueError("no document has been uploaded for search")
            prompt = _build_prompt(text) if document_button else text
            if st.session_state.db_transaction:
                response = respond_chat(prompt)
                # Keep the stored value a string so the display code below
                # never receives None.
                st.session_state.response = response or ''
        except Exception as e:
            st.error(f"Error during code generation: {e}")

col1, col2 = st.columns([2, 1])
with col2:
    st.write("### Thought Process")
    st.write_stream(stream_thoughts())
with col1:
    st.write("### Response")
    st.write_stream(stream_response())