prawnikai / app.py
adowu's picture
Update app.py
514204a verified
raw
history blame
9.86 kB
import os
import re
import logging
import streamlit as st
import chromadb
from chromadb.utils import embedding_functions
from huggingface_hub import InferenceClient
from dotenv import load_dotenv
from typing import List, Dict, Tuple
# Konfiguracja logowania
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Ładowanie zmiennych środowiskowych
load_dotenv()
# Konfiguracja API
HF_TOKEN = os.getenv('HF_TOKEN')
MODEL_NAME = "Qwen/Qwen2.5-72B-Instruct"
# Konfiguracja bazy danych
DATABASE_DIR = "chroma_db"
# Konfiguracja modelu embeddings
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2"
# System prompt
SYSTEM_PROMPT = """Jesteś asystentem prawniczym specjalizującym się w polskim prawie.
Twoje odpowiedzi opierają się na aktualnych przepisach prawnych.
Zawsze cytuj konkretne artykuły i paragrafy z odpowiednich ustaw."""
class KodeksProcessor:
def __init__(self):
logging.info("Inicjalizacja klienta bazy danych...")
self.client = chromadb.PersistentClient(path=DATABASE_DIR)
try:
self.collection = self.client.get_collection("kodeksy")
logging.info("Pobrano istniejącą kolekcję 'kodeksy'.")
except:
self.collection = self.client.create_collection(
name="kodeksy",
embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=EMBEDDING_MODEL
)
)
logging.info("Utworzono nową kolekcję 'kodeksy'.")
def extract_metadata(self, text: str) -> Dict:
metadata = {}
dz_u_match = re.search(r'Dz\.U\.(\d{4})\.(\d+)\.(\d+)', text)
if dz_u_match:
metadata['dz_u'] = f"Dz.U.{dz_u_match.group(1)}.{dz_u_match.group(2)}.{dz_u_match.group(3)}"
metadata['rok'] = dz_u_match.group(1)
nazwa_match = re.search(r'USTAWA\s+z dnia(.*?)\n(.*?)\n', text)
if nazwa_match:
metadata['data_ustawy'] = nazwa_match.group(1).strip()
metadata['nazwa'] = nazwa_match.group(2).strip()
logging.info("Wydobyto metadane: %s", metadata)
return metadata
def split_header_and_content(self, text: str) -> Tuple[str, str]:
parts = text.split("USTAWA", 1)
if len(parts) > 1:
return parts[0], "USTAWA" + parts[1]
return "", text
def process_article(self, article_text: str) -> Dict:
art_num_match = re.match(r'Art\.\s*(\d+)', article_text)
article_num = art_num_match.group(1) if art_num_match else ""
paragraphs = re.findall(r'§\s*(\d+)\.\s*(.*?)(?=§\s*\d+|Art\.\s*\d+|$)', article_text, re.DOTALL)
if not paragraphs:
return {
"article_num": article_num,
"content": article_text.strip(),
"has_paragraphs": False
}
return {
"article_num": article_num,
"paragraphs": paragraphs,
"has_paragraphs": True
}
def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
chunks = []
articles = re.split(r'(Art\.\s*\d+)', text) # Podział na artykuły
for i in range(1, len(articles), 2): # Przechodzimy przez artykuły
article_title = articles[i].strip()
article_content = articles[i + 1].strip() if i + 1 < len(articles) else ""
processed_article = self.process_article(article_title + " " + article_content)
chunk_metadata = {
**metadata,
"article": processed_article["article_num"]
}
if processed_article["has_paragraphs"]:
for par_num, par_content in processed_article["paragraphs"]:
chunks.append({
"text": f"{article_title} §{par_num}. {par_content.strip()}",
"metadata": {**chunk_metadata, "paragraph": par_num}
})
else:
chunks.append({
"text": processed_article["content"],
"metadata": chunk_metadata
})
logging.info("Podzielono tekst na %d chunków.", len(chunks))
return chunks
def process_file(self, filepath: str) -> None:
logging.info("Przetwarzanie pliku: %s", filepath)
with open(filepath, 'r', encoding='utf-8') as file:
content = file.read()
header, main_content = self.split_header_and_content(content)
metadata = self.extract_metadata(main_content)
metadata['filename'] = os.path.basename(filepath)
chunks = self.split_into_chunks(main_content, metadata)
if chunks: # Sprawdzenie, czy są jakieś chunk'i do dodania
for i, chunk in enumerate(chunks):
self.collection.add(
documents=[chunk["text"]],
metadatas=[chunk["metadata"]],
ids=[f"{metadata['filename']}_{chunk['metadata']['article']}_{i}"]
)
logging.info("Dodano chunk: %s", chunk["text"]) # Logowanie dodawanych chunków
else:
logging.warning("Brak chunków do dodania z pliku: %s", filepath) # Logowanie braku chunków
logging.info("Dodano %d chunków z pliku %s", len(chunks), metadata['filename'])
def process_all_files(self, directory: str) -> None:
logging.info("Rozpoczęcie przetwarzania wszystkich plików w katalogu: %s", directory)
for filename in os.listdir(directory):
if filename.endswith('.txt'):
filepath = os.path.join(directory, filename)
logging.info("Przetwarzanie pliku: %s", filepath) # Logowanie przetwarzania pliku
self.process_file(filepath)
logging.info("Zakończono przetwarzanie plików.")
def search(self, query: str, n_results: int = 3) -> Dict:
logging.info("Wyszukiwanie w bazie danych dla zapytania: %s", query)
results = self.collection.query(
query_texts=[query],
n_results=n_results
)
logging.info("Znaleziono %d wyników dla zapytania: %s", len(results['documents'][0]), query)
return results
class Chatbot:
def __init__(self):
self.client = InferenceClient(api_key=HF_TOKEN)
self.conversation_history = [
{"role": "system", "content": SYSTEM_PROMPT}
]
def generate_context(self, relevant_chunks: List[Dict]) -> str:
context = "Kontekst z przepisów prawnych:\n\n"
for chunk in relevant_chunks:
context += f"{chunk['text']}\n\n"
return context
def get_response(self, user_input: str, context: str) -> str:
messages = self.conversation_history + [
{"role": "user", "content": f"Kontekst: {context}\n\nPytanie: {user_input}"}
]
response = ""
stream = self.client.chat.completions.create(
model=MODEL_NAME,
messages=messages,
temperature=0.5,
max_tokens=8192,
top_p=0.7,
stream=True
)
for chunk in stream:
content = chunk.choices[0].delta.content
if content:
response += content
yield content
self.conversation_history.append({"role": "user", "content": user_input})
self.conversation_history.append({"role": "assistant", "content": response})
def clear_history(self):
self.conversation_history = [
{"role": "system", "content": SYSTEM_PROMPT}
]
def initialize_session_state():
if 'chatbot' not in st.session_state:
st.session_state.chatbot = Chatbot()
if 'messages' not in st.session_state:
st.session_state.messages = []
def main():
st.title("Asystent Prawny")
initialize_session_state()
# Inicjalizacja bazy danych (jeśli potrzebna)
if 'db_initialized' not in st.session_state:
with st.spinner("Inicjalizacja bazy danych..."):
processor = KodeksProcessor()
if not os.path.exists("chroma_db"):
processor.process_all_files("data/kodeksy")
st.session_state.db_initialized = True
# Przycisk do czyszczenia historii
if st.sidebar.button("Wyczyść historię"):
st.session_state.chatbot.clear_history()
st.session_state.messages = []
st.rerun()
# Wyświetlenie historii czatu
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# Input użytkownika
if prompt := st.chat_input("Zadaj pytanie dotyczące prawa..."):
# Dodaj pytanie użytkownika do historii
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# Wyszukaj odpowiednie fragmenty w bazie
processor = KodeksProcessor()
relevant_chunks = processor.search(prompt)
# Wygeneruj odpowiedź
with st.chat_message("assistant"):
message_placeholder = st.empty()
full_response = ""
context = st.session_state.chatbot.generate_context(
[{"text": doc} for doc in relevant_chunks['documents'][0]]
)
for response_chunk in st.session_state.chatbot.get_response(prompt, context):
full_response += response_chunk
message_placeholder.markdown(full_response + "▌")
message_placeholder.markdown(full_response)
# Dodaj odpowiedź asystenta do historii
st.session_state.messages.append({"role": "assistant", "content": full_response})
if __name__ == "__main__":
main()