Spaces:
Sleeping
Sleeping
File size: 6,226 Bytes
08557bb eb5fde5 08557bb eb5fde5 08557bb eb5fde5 08557bb eb5fde5 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 |
import os
import streamlit as st
import pdfplumber
from concurrent.futures import ThreadPoolExecutor
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from transformers import pipeline, M2M100ForConditionalGeneration, AutoTokenizer
# Configure the browser tab title, icon and page layout for the app.
st.set_page_config(
    page_title="RAG-based PDF Chat",
    page_icon="π",
    layout="centered",
)
# Summarization model loader (wrapped in st.cache_resource)
@st.cache_resource
def load_summarization_pipeline():
    """Build and return the ``facebook/bart-large-cnn`` summarization pipeline."""
    return pipeline("summarization", model="facebook/bart-large-cnn")


# Module-level handle consumed by generate_summary_with_huggingface().
summarizer = load_summarization_pipeline()
# Translation model loader (wrapped in st.cache_resource)
@st.cache_resource
def load_translation_model():
    """Load the ``alirezamsh/small100`` checkpoint.

    Returns:
        A ``(model, tokenizer)`` tuple, both loaded from the same checkpoint.
    """
    checkpoint = "alirezamsh/small100"
    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = M2M100ForConditionalGeneration.from_pretrained(checkpoint)
    return model, tokenizer


# Module-level handles consumed by translate_text().
translation_model, translation_tokenizer = load_translation_model()
# Output languages offered in the UI: display name -> language code passed
# to the translation tokenizer. Insertion order is the dropdown order.
LANGUAGES = dict(
    English="en",
    French="fr",
    Spanish="es",
    Chinese="zh",
    Hindi="hi",
    Urdu="ur",
)
# Chunking of the combined document text
@st.cache_data
def get_text_chunks(text):
    """Split *text* into chunks of up to 10000 characters with 1000 overlap.

    Args:
        text: The full text to split.

    Returns:
        Whatever ``RecursiveCharacterTextSplitter.split_text`` returns for
        *text* (the chunks that are later embedded).
    """
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=10000,
        chunk_overlap=1000,
    )
    return splitter.split_text(text)
# Embedding model loader. Streamlit re-executes this script on every user
# interaction, so the model is built behind st.cache_resource (like the
# summarization and translation loaders) instead of being re-instantiated
# at module level on each rerun.
@st.cache_resource
def load_embedding_function():
    """Return the sentence-transformers MiniLM embedding wrapper."""
    return HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


# Module-level handle consumed by load_or_create_vector_store().
embedding_function = load_embedding_function()
# FAISS index construction from text chunks
@st.cache_resource
def load_or_create_vector_store(text_chunks):
    """Build a FAISS vector store from *text_chunks*.

    Args:
        text_chunks: The text chunks to embed with the module-level
            ``embedding_function``.

    Returns:
        The FAISS store, or ``None`` (after showing an error in the UI) when
        *text_chunks* is empty.
    """
    if text_chunks:
        return FAISS.from_texts(text_chunks, embedding=embedding_function)

    st.error("No valid text chunks found to create a vector store. Please check your PDF files.")
    return None
# Helper function to extract the text of a single PDF
def process_single_pdf(file_path):
    """Extract the text of every page of the PDF at *file_path*.

    Pages that yield no text are skipped. Page texts are joined with a
    newline so the last word of one page is not fused with the first word of
    the next.

    Args:
        file_path: Path of the PDF file to open with pdfplumber.

    Returns:
        The extracted text. If reading fails, the error is shown in the UI
        and whatever was extracted before the failure is returned (possibly
        an empty string).
    """
    page_texts = []
    try:
        with pdfplumber.open(file_path) as pdf:
            for page in pdf.pages:
                page_text = page.extract_text()
                if page_text:
                    page_texts.append(page_text)
    except Exception as e:
        # Deliberately broad: one unreadable PDF must not abort loading of
        # the remaining documents; report it and keep the partial text.
        st.error(f"Failed to read PDF: {file_path} - {e}")
    return "\n".join(page_texts)
# Load PDFs with progress display
def load_pdfs_with_progress(folder_path):
    """Read every PDF in *folder_path* and build the vector store.

    Results are published through ``st.session_state``:
    ``'vector_store'`` is set to the FAISS store (or ``None`` when nothing
    could be loaded) and ``'loading'`` is set to ``False`` on every exit path.

    Args:
        folder_path: Directory that contains the PDF documents.

    Returns:
        None.
    """
    try:
        # Sorted so the combined text (and therefore the chunking) does not
        # depend on the arbitrary order os.listdir returns.
        entries = sorted(os.listdir(folder_path))
    except OSError as exc:
        # A missing or unreadable folder must not leave 'loading' stuck at True.
        st.error(f"Could not read folder '{folder_path}': {exc}")
        st.session_state['vector_store'] = None
        st.session_state['loading'] = False
        return

    # Case-insensitive match so files named "*.PDF" are picked up as well.
    pdf_files = [
        os.path.join(folder_path, filename)
        for filename in entries
        if filename.lower().endswith('.pdf')
    ]
    num_files = len(pdf_files)
    if num_files == 0:
        st.error("No PDF files found in the specified folder.")
        st.session_state['vector_store'] = None
        st.session_state['loading'] = False
        return

    st.markdown("### Loading data...")
    progress_bar = st.progress(0)
    status_text = st.empty()

    document_texts = []
    for processed_count, file_path in enumerate(pdf_files, start=1):
        document_text = process_single_pdf(file_path)
        if document_text:
            document_texts.append(document_text)
        fraction_done = processed_count / num_files
        progress_bar.progress(fraction_done)
        status_text.text(f"Loading documents: {int(fraction_done * 100)}% completed")

    progress_bar.empty()
    status_text.text("Document loading completed!")

    # Separate documents with a newline so text from adjacent files is not
    # fused into a single word at the boundary.
    all_text = "\n".join(document_texts)
    if all_text:
        text_chunks = get_text_chunks(all_text)
        st.session_state['vector_store'] = load_or_create_vector_store(text_chunks)
    else:
        st.session_state['vector_store'] = None
    st.session_state['loading'] = False
# Summarize the retrieved context in light of the user's query
def generate_summary_with_huggingface(query, retrieved_text):
    """Summarize *retrieved_text* prefixed by *query*.

    The combined prompt is cut to its first 1024 characters before it is
    handed to the module-level ``summarizer`` pipeline.

    Args:
        query: The user's question.
        retrieved_text: Context text retrieved for the question.

    Returns:
        The ``"summary_text"`` field of the first pipeline result.
    """
    char_limit = 1024
    prompt = f"{query} Related information:{retrieved_text}"[:char_limit]
    results = summarizer(prompt, max_length=500, min_length=50, do_sample=False)
    first_result = results[0]
    return first_result["summary_text"]
# Answer a user question from the indexed documents
def user_input(user_question):
    """Return an answer to *user_question* based on the stored vector index.

    Args:
        user_question: The question typed by the user.

    Returns:
        A summary of the documents most similar to the question, or a fixed
        notice when no vector store is present in ``st.session_state``.
    """
    store = st.session_state.get('vector_store')
    if store is not None:
        matches = store.similarity_search(user_question)
        context = " ".join(match.page_content for match in matches)
        return generate_summary_with_huggingface(user_question, context)

    return "The app is still loading documents or no documents were successfully loaded."
# Translate text to selected language
def translate_text(text, target_lang):
    """Translate *text* into *target_lang* with the module-level model.

    Args:
        text: The text to translate.
        target_lang: Target language code (one of the values of ``LANGUAGES``).

    Returns:
        The first decoded generation for *text*. Empty or whitespace-only
        *text* is returned unchanged without calling the model, since there
        is nothing to translate.
    """
    if not text or not text.strip():
        return text

    translation_tokenizer.tgt_lang = target_lang
    encoded_text = translation_tokenizer(text, return_tensors="pt")
    generated_tokens = translation_model.generate(**encoded_text)
    decoded = translation_tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
    return decoded[0]
# Streamlit entry point
def main():
    """Render the page: header, document loading, question box and answer."""
    header_html = """
<h1 style="font-size:30px; text-align: center;">
π JusticeCompass: Your AI-Powered Legal Navigator for Swift, Accurate Guidance.
</h1>
"""
    st.markdown(header_html, unsafe_allow_html=True)

    # Load the documents when the flag is missing (first run) or still set.
    if st.session_state.get('loading', True):
        st.session_state['loading'] = True
        load_pdfs_with_progress('documents1')

    user_question = st.text_input("Ask a Question:", placeholder="Type your question here...")

    # Output language dropdown
    language_names = list(LANGUAGES.keys())
    selected_language = st.selectbox("Select output language:", language_names)

    # Shown only while the 'loading' flag is (still) truthy or absent.
    if st.session_state.get('loading', True):
        st.info("The app is loading documents in the background. You can type your question now and submit once loading is complete.")

    # The "Get Response" button is rendered only once a question was entered.
    if user_question and st.button("Get Response"):
        with st.spinner("Generating response..."):
            answer = user_input(user_question)
            translated_answer = translate_text(answer, LANGUAGES[selected_language])
            st.markdown(f"**π€ AI ({selected_language}):** {translated_answer}")


if __name__ == "__main__":
    main()
|