|
import os |
|
import re |
|
import logging |
|
import streamlit as st |
|
import chromadb |
|
from chromadb.utils import embedding_functions |
|
from huggingface_hub import InferenceClient |
|
from dotenv import load_dotenv |
|
from typing import List, Dict, Tuple |
|
|
|
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
HF_TOKEN = os.getenv('HF_TOKEN') |
|
MODEL_NAME = "Qwen/Qwen2.5-72B-Instruct" |
|
|
|
|
|
DATABASE_DIR = "chroma_db" |
|
|
|
|
|
EMBEDDING_MODEL = "sentence-transformers/paraphrase-multilingual-mpnet-base-v2" |
|
|
|
|
|
SYSTEM_PROMPT = """Jesteś asystentem prawniczym specjalizującym się w polskim prawie. |
|
Twoje odpowiedzi opierają się na aktualnych przepisach prawnych. |
|
Zawsze cytuj konkretne artykuły i paragrafy z odpowiednich ustaw.""" |
|
|
|
class KodeksProcessor: |
|
def __init__(self): |
|
logging.info("Inicjalizacja klienta bazy danych...") |
|
self.client = chromadb.PersistentClient(path=DATABASE_DIR) |
|
try: |
|
self.collection = self.client.get_collection("kodeksy") |
|
logging.info("Pobrano istniejącą kolekcję 'kodeksy'.") |
|
except: |
|
self.collection = self.client.create_collection( |
|
name="kodeksy", |
|
embedding_function=embedding_functions.SentenceTransformerEmbeddingFunction( |
|
model_name=EMBEDDING_MODEL |
|
) |
|
) |
|
logging.info("Utworzono nową kolekcję 'kodeksy'.") |
|
|
|
def extract_metadata(self, text: str) -> Dict: |
|
metadata = {} |
|
dz_u_match = re.search(r'Dz\.U\.(\d{4})\.(\d+)\.(\d+)', text) |
|
if dz_u_match: |
|
metadata['dz_u'] = f"Dz.U.{dz_u_match.group(1)}.{dz_u_match.group(2)}.{dz_u_match.group(3)}" |
|
metadata['rok'] = dz_u_match.group(1) |
|
|
|
nazwa_match = re.search(r'USTAWA\s+z dnia(.*?)\n(.*?)\n', text) |
|
if nazwa_match: |
|
metadata['data_ustawy'] = nazwa_match.group(1).strip() |
|
metadata['nazwa'] = nazwa_match.group(2).strip() |
|
|
|
logging.info("Wydobyto metadane: %s", metadata) |
|
return metadata |
|
|
|
def split_header_and_content(self, text: str) -> Tuple[str, str]: |
|
parts = text.split("USTAWA", 1) |
|
if len(parts) > 1: |
|
return parts[0], "USTAWA" + parts[1] |
|
return "", text |
|
|
|
def process_article(self, article_text: str) -> Dict: |
|
art_num_match = re.match(r'Art\.\s*(\d+)', article_text) |
|
article_num = art_num_match.group(1) if art_num_match else "" |
|
|
|
paragraphs = re.findall(r'§\s*(\d+)\.\s*(.*?)(?=§\s*\d+|Art\.\s*\d+|$)', article_text, re.DOTALL) |
|
|
|
if not paragraphs: |
|
return { |
|
"article_num": article_num, |
|
"content": article_text.strip(), |
|
"has_paragraphs": False |
|
} |
|
|
|
return { |
|
"article_num": article_num, |
|
"paragraphs": paragraphs, |
|
"has_paragraphs": True |
|
} |
|
|
|
def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]: |
|
chunks = [] |
|
articles = re.split(r'(Art\.\s*\d+)', text) |
|
|
|
for i in range(1, len(articles), 2): |
|
article_title = articles[i].strip() |
|
article_content = articles[i + 1].strip() if i + 1 < len(articles) else "" |
|
|
|
processed_article = self.process_article(article_title + " " + article_content) |
|
|
|
chunk_metadata = { |
|
**metadata, |
|
"article": processed_article["article_num"] |
|
} |
|
|
|
if processed_article["has_paragraphs"]: |
|
for par_num, par_content in processed_article["paragraphs"]: |
|
chunks.append({ |
|
"text": f"{article_title} §{par_num}. {par_content.strip()}", |
|
"metadata": {**chunk_metadata, "paragraph": par_num} |
|
}) |
|
else: |
|
chunks.append({ |
|
"text": processed_article["content"], |
|
"metadata": chunk_metadata |
|
}) |
|
|
|
logging.info("Podzielono tekst na %d chunków.", len(chunks)) |
|
return chunks |
|
|
|
def process_file(self, filepath: str) -> None: |
|
logging.info("Przetwarzanie pliku: %s", filepath) |
|
|
|
with open(filepath, 'r', encoding='utf-8') as file: |
|
content = file.read() |
|
|
|
header, main_content = self.split_header_and_content(content) |
|
metadata = self.extract_metadata(main_content) |
|
metadata['filename'] = os.path.basename(filepath) |
|
|
|
chunks = self.split_into_chunks(main_content, metadata) |
|
|
|
if chunks: |
|
for i, chunk in enumerate(chunks): |
|
self.collection.add( |
|
documents=[chunk["text"]], |
|
metadatas=[chunk["metadata"]], |
|
ids=[f"{metadata['filename']}_{chunk['metadata']['article']}_{i}"] |
|
) |
|
logging.info("Dodano chunk: %s", chunk["text"]) |
|
else: |
|
logging.warning("Brak chunków do dodania z pliku: %s", filepath) |
|
|
|
logging.info("Dodano %d chunków z pliku %s", len(chunks), metadata['filename']) |
|
|
|
def process_all_files(self, directory: str) -> None: |
|
logging.info("Rozpoczęcie przetwarzania wszystkich plików w katalogu: %s", directory) |
|
for filename in os.listdir(directory): |
|
if filename.endswith('.txt'): |
|
filepath = os.path.join(directory, filename) |
|
logging.info("Przetwarzanie pliku: %s", filepath) |
|
self.process_file(filepath) |
|
logging.info("Zakończono przetwarzanie plików.") |
|
|
|
def search(self, query: str, n_results: int = 3) -> Dict: |
|
logging.info("Wyszukiwanie w bazie danych dla zapytania: %s", query) |
|
results = self.collection.query( |
|
query_texts=[query], |
|
n_results=n_results |
|
) |
|
logging.info("Znaleziono %d wyników dla zapytania: %s", len(results['documents'][0]), query) |
|
return results |
|
|
|
class Chatbot: |
|
def __init__(self): |
|
self.client = InferenceClient(api_key=HF_TOKEN) |
|
self.conversation_history = [ |
|
{"role": "system", "content": SYSTEM_PROMPT} |
|
] |
|
|
|
def generate_context(self, relevant_chunks: List[Dict]) -> str: |
|
context = "Kontekst z przepisów prawnych:\n\n" |
|
for chunk in relevant_chunks: |
|
context += f"{chunk['text']}\n\n" |
|
return context |
|
|
|
def get_response(self, user_input: str, context: str) -> str: |
|
messages = self.conversation_history + [ |
|
{"role": "user", "content": f"Kontekst: {context}\n\nPytanie: {user_input}"} |
|
] |
|
|
|
response = "" |
|
stream = self.client.chat.completions.create( |
|
model=MODEL_NAME, |
|
messages=messages, |
|
temperature=0.5, |
|
max_tokens=8192, |
|
top_p=0.7, |
|
stream=True |
|
) |
|
|
|
for chunk in stream: |
|
content = chunk.choices[0].delta.content |
|
if content: |
|
response += content |
|
yield content |
|
|
|
self.conversation_history.append({"role": "user", "content": user_input}) |
|
self.conversation_history.append({"role": "assistant", "content": response}) |
|
|
|
def clear_history(self): |
|
self.conversation_history = [ |
|
{"role": "system", "content": SYSTEM_PROMPT} |
|
] |
|
|
|
def initialize_session_state(): |
|
if 'chatbot' not in st.session_state: |
|
st.session_state.chatbot = Chatbot() |
|
if 'messages' not in st.session_state: |
|
st.session_state.messages = [] |
|
|
|
def main(): |
|
st.title("Asystent Prawny") |
|
|
|
initialize_session_state() |
|
|
|
|
|
if 'db_initialized' not in st.session_state: |
|
with st.spinner("Inicjalizacja bazy danych..."): |
|
processor = KodeksProcessor() |
|
if not os.path.exists("chroma_db"): |
|
processor.process_all_files("data/kodeksy") |
|
st.session_state.db_initialized = True |
|
|
|
|
|
if st.sidebar.button("Wyczyść historię"): |
|
st.session_state.chatbot.clear_history() |
|
st.session_state.messages = [] |
|
st.rerun() |
|
|
|
|
|
for message in st.session_state.messages: |
|
with st.chat_message(message["role"]): |
|
st.markdown(message["content"]) |
|
|
|
|
|
if prompt := st.chat_input("Zadaj pytanie dotyczące prawa..."): |
|
|
|
st.session_state.messages.append({"role": "user", "content": prompt}) |
|
|
|
with st.chat_message("user"): |
|
st.markdown(prompt) |
|
|
|
|
|
processor = KodeksProcessor() |
|
relevant_chunks = processor.search(prompt) |
|
|
|
|
|
with st.chat_message("assistant"): |
|
message_placeholder = st.empty() |
|
full_response = "" |
|
|
|
context = st.session_state.chatbot.generate_context( |
|
[{"text": doc} for doc in relevant_chunks['documents'][0]] |
|
) |
|
|
|
for response_chunk in st.session_state.chatbot.get_response(prompt, context): |
|
full_response += response_chunk |
|
message_placeholder.markdown(full_response + "▌") |
|
|
|
message_placeholder.markdown(full_response) |
|
|
|
|
|
st.session_state.messages.append({"role": "assistant", "content": full_response}) |
|
|
|
if __name__ == "__main__": |
|
main() |