# Page header captured together with this file: "Spaces: Sleeping".
import html
import os
import re

import streamlit as st
from dotenv import load_dotenv
from langchain.chains.question_answering import load_qa_chain
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_groq import ChatGroq
from PyPDF2 import PdfReader
# Load variables from a local .env file into the process environment.
# GROQ_API_KEY is read with os.getenv() at the point where the chat model is
# built, so no lookup is needed here (a bare os.getenv() call whose result is
# discarded has no effect).
load_dotenv()
# Stylesheet injected once per run through st.markdown(..., unsafe_allow_html=True).
# The opening <style> tag is kept at column 0 so Markdown treats the whole
# string as a raw HTML block.
css_style = """
<style>
.step-number {
font-size: 24px;
font-weight: bold;
}
.response-box {
padding: 20px;
background-color: #f8f9fa;
border-radius: 10px;
border-left: 5px solid #252850;
margin: 20px 0;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.metadata-box {
padding: 20px;
background-color: #f0f2f6;
border-radius: 10px;
margin-bottom: 20px;
}
.metadata-title {
font-weight: bold;
margin-bottom: 8px;
}
.custom-input {
font-size: 16px;
padding: 10px;
border-radius: 5px;
border: 1px solid #ccc;
}
.titulo-principal {
font-size: 24px; /* Reduce el tamaño del título */
}
.boton-enviar {
margin-top: -5px; /* Ajusta la posición vertical del botón */
vertical-align: middle;
}
</style>
"""
def eliminar_proceso_pensamiento(texto):
    """Remove the model's reasoning block and return the final answer line.

    Reasoning models wrap their chain of thought in ``<think>...</think>``
    before the actual answer. That block is stripped, and the last non-empty
    line of what remains is returned.

    Args:
        texto: Raw model output. ``None`` or an empty string is accepted.

    Returns:
        The last non-empty line after the reasoning block has been removed,
        or ``"Respuesta no disponible"`` when nothing is left.
    """
    # DOTALL lets the reasoning span several lines. The ``\Z`` alternative
    # also drops a block that was never closed (for example truncated output)
    # so that raw reasoning text is not returned as the answer.
    texto_limpio = re.sub(
        r'<think>.*?(?:</think>|\Z)', '', texto or '', flags=re.DOTALL
    )
    lineas = [line.strip() for line in texto_limpio.split('\n') if line.strip()]
    return lineas[-1] if lineas else "Respuesta no disponible"
def get_pdf_text(pdf_docs):
    """Concatenate the extracted text of every page of every given PDF.

    Args:
        pdf_docs: Iterable of file paths or binary file-like objects that
            ``PdfReader`` can open (here: Streamlit uploaded files).

    Returns:
        A single string with the text of all pages in document order, with no
        separator between pages. Empty when there are no documents.
    """
    partes = []
    for pdf in pdf_docs:
        # ``or ""`` keeps a page without extractable text (for example a
        # scanned image) from breaking the concatenation if the extractor
        # yields a falsy value for it.
        partes.extend(page.extract_text() or "" for page in PdfReader(pdf).pages)
    return "".join(partes)
def get_text_chunks(text, chunk_size=5000, chunk_overlap=500):
    """Split text into overlapping chunks for embedding.

    Args:
        text: The full document text.
        chunk_size: Maximum size of each chunk, as measured by the splitter
            (characters with its default length function).
        chunk_overlap: Amount shared between consecutive chunks so that
            content cut at a boundary still appears whole in one of them.

    Returns:
        The list of chunk strings produced by the splitter.
    """
    # NOTE(review): the embedding model used elsewhere in this file,
    # all-MiniLM-L6-v2, truncates long inputs, so 5000-character chunks are
    # probably only partly represented in their vectors. Consider a smaller
    # chunk_size; confirm against the model card before changing the default.
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
    return text_splitter.split_text(text)
def get_vector_store(text_chunks):
    """Embed the chunks and index them in an in-memory FAISS store.

    Args:
        text_chunks: Non-empty list of text chunks to index.

    Returns:
        A FAISS vector store built from the chunks.

    Raises:
        ValueError: If ``text_chunks`` is empty or ``None``, which happens
            when no text could be extracted from the uploaded documents.
    """
    # Fail early with a readable message: this check runs before the embedding
    # model is loaded, and the caller shows the exception text to the user.
    if not text_chunks:
        raise ValueError("No se pudo extraer texto del documento para indexar")
    embeddings = HuggingFaceEmbeddings(
        model_name="sentence-transformers/all-MiniLM-L6-v2"
    )
    return FAISS.from_texts(text_chunks, embedding=embeddings)
def get_conversational_chain():
    """Build the question-answering chain used for every query.

    The chain uses the "stuff" strategy (retrieved documents are placed in
    the ``{context}`` slot of the prompt) with a Groq-hosted chat model. The
    API key is read from the ``GROQ_API_KEY`` environment variable.

    Returns:
        The chain returned by ``load_qa_chain``.
    """
    # Spanish instruction prompt; {context} and {question} are filled in by
    # the chain at call time.
    plantilla = (
        "\n"
        "Responde en español exclusivamente con la información solicitada usando el contexto.\n"
        'Formato: Respuesta directa sin prefijos. Si no hay información, di "No disponible".\n'
        "Contexto:\n"
        "{context}\n"
        "Pregunta:\n"
        "{question}\n"
        "Respuesta:\n"
    )
    llm = ChatGroq(
        temperature=0.2,
        model_name="deepseek-r1-distill-llama-70b",
        groq_api_key=os.getenv("GROQ_API_KEY"),
    )
    prompt = PromptTemplate(
        template=plantilla,
        input_variables=["context", "question"],
    )
    return load_qa_chain(llm, chain_type="stuff", prompt=prompt)
def extract_metadata(vector_store):
    """Ask the model three fixed questions to derive document metadata.

    For each field the two most similar chunks are retrieved from the store,
    passed to the QA chain together with the question, and the cleaned answer
    is kept.

    Args:
        vector_store: Store exposing ``similarity_search(query, k=...)``.

    Returns:
        A dict with the keys ``"title"``, ``"entity"`` and ``"date"``, each
        mapped to the answer text (``"No disponible"`` if the cleaned answer
        is empty).
    """
    preguntas = {
        "title": "¿Cual es o podría ser el título del documento? Redacta una sola frase",
        "entity": "¿A qué entidad u organización pertenece este documento?",
        "date": '¿En qué fecha se implantará el contenido? Si no se detalla responde "No se especifica"',
    }
    # One chain instance is shared by all three questions.
    qa_chain = get_conversational_chain()
    resultado = {}
    for campo, pregunta in preguntas.items():
        contexto = vector_store.similarity_search(pregunta, k=2)
        salida = qa_chain(
            {"input_documents": contexto, "question": pregunta},
            return_only_outputs=True,
        )
        respuesta = eliminar_proceso_pensamiento(salida['output_text'])
        resultado[campo] = respuesta or "No disponible"
    return resultado
def mostrar_respuesta(texto):
    """Render an answer inside the styled ``response-box`` container.

    Args:
        texto: Answer text to display. It is converted to ``str`` and
            HTML-escaped before being inserted into the markup.
    """
    # The text comes from the model, which in turn was fed content from an
    # uploaded PDF. Since the markup is rendered with unsafe_allow_html=True,
    # escape it so stray tags cannot inject HTML into the page.
    contenido = html.escape(str(texto))
    with st.container():
        st.markdown(
            f'<div class="response-box">{contenido}</div>',
            unsafe_allow_html=True,
        )
def procesar_consulta(user_question):
    """Answer a user question against the indexed document and display it.

    Shows an error and returns without doing anything else when no vector
    store has been saved in the session yet.

    Args:
        user_question: The question typed by the user.
    """
    if 'vector_store' not in st.session_state:
        st.error("Por favor carga un documento primero")
        return

    qa_chain = get_conversational_chain()
    # Retrieve the chunks most similar to the question to use as context.
    contexto = st.session_state.vector_store.similarity_search(user_question)
    entrada = {"input_documents": contexto, "question": user_question}

    with st.spinner("Analizando documento..."):
        salida = qa_chain(entrada, return_only_outputs=True)

    mostrar_respuesta(eliminar_proceso_pensamiento(salida['output_text']))
def main():
    """Run the Streamlit page: upload PDFs, show metadata, answer questions.

    Session state keys used:
        documento_cargado: True once a set of documents has been indexed.
        firma_documentos: (name, size) signature of the indexed upload, used
            to detect that the user changed the uploaded files.
        vector_store: Store built from the uploaded documents.
        metadata: Dict with the "title", "entity" and "date" answers.
    """
    st.set_page_config(page_title="PDF Consultor 🔍", page_icon="🔍", layout="wide")
    st.markdown(css_style, unsafe_allow_html=True)

    # Main title with custom styling.
    st.markdown('<h1 class="titulo-principal">PDF Consultor 🔍</h1>', unsafe_allow_html=True)

    # Initialise session state.
    if 'documento_cargado' not in st.session_state:
        st.session_state.documento_cargado = False

    # Sidebar: document upload.
    with st.sidebar:
        st.markdown('<p class="step-number">1 Subir archivos</p>', unsafe_allow_html=True)
        pdf_docs = st.file_uploader(
            "Subir PDF(s)",
            accept_multiple_files=True,
            type=["pdf"],
            label_visibility="collapsed",
        )

    # Process automatically when documents are uploaded. The signature makes
    # a changed upload get re-indexed instead of silently reusing the index
    # of the first upload.
    firma_actual = tuple((doc.name, doc.size) for doc in pdf_docs) if pdf_docs else ()
    necesita_procesar = bool(pdf_docs) and (
        not st.session_state.documento_cargado
        or st.session_state.get("firma_documentos") != firma_actual
    )
    if necesita_procesar:
        with st.spinner("Analizando documento..."):
            try:
                raw_text = get_pdf_text(pdf_docs)
                text_chunks = get_text_chunks(raw_text)
                vector_store = get_vector_store(text_chunks)
                metadata = extract_metadata(vector_store)
            except Exception as e:
                # Top-level UI boundary: report the failure instead of crashing.
                st.error(f"Error procesando documento: {str(e)}")
            else:
                # State is stored only after every step succeeded, and the
                # rerun is requested outside the try body so its control-flow
                # exception can never be caught by the handler above.
                st.session_state.metadata = metadata
                st.session_state.vector_store = vector_store
                st.session_state.documento_cargado = True
                st.session_state.firma_documentos = firma_actual
                st.rerun()

    # Show metadata.
    if 'metadata' in st.session_state:
        st.markdown("---")
        cols = st.columns(3)
        campos = [
            ("📄 Título", "title"),
            ("🏛️ Entidad", "entity"),
            ("📅 Fecha Implantación", "date"),
        ]
        for col, (icono, key) in zip(cols, campos):
            # Model-generated text: escape before embedding it in raw HTML.
            valor = html.escape(str(st.session_state.metadata[key]))
            bloque = (
                '\n<div class="metadata-box">\n'
                f'<div class="metadata-title">{icono}</div>\n'
                f'{valor}\n'
                '</div>\n'
            )
            with col:
                st.markdown(bloque, unsafe_allow_html=True)
        st.markdown("---")

    # Query interface.
    if st.session_state.documento_cargado:
        with st.form("consulta_form"):
            col1, col2 = st.columns([5, 1])
            with col1:
                user_question = st.text_input(
                    "Escribe tu pregunta:",
                    placeholder="Ej: ¿Qué normativa regula este proceso?",
                    label_visibility="collapsed",
                )
            with col2:
                # st.form_submit_button accepts no CSS-class argument; passing
                # one raises TypeError, so the button is created without it.
                enviar = st.form_submit_button("Enviar ▶")
        pregunta = user_question.strip() if user_question else ""
        if enviar and pregunta:
            procesar_consulta(pregunta)
    else:
        st.write("Por favor, sube un documento para continuar.")


if __name__ == "__main__":
    main()