File size: 5,555 Bytes
cdd85c7
 
9b7aea8
cdd85c7
 
 
9b7aea8
 
 
 
cdd85c7
 
 
9b7aea8
cdd85c7
 
 
9b7aea8
cdd85c7
 
 
 
 
 
 
9b7aea8
cdd85c7
 
 
e0f90ab
cdd85c7
 
 
9b7aea8
e0f90ab
cdd85c7
 
 
9b7aea8
 
cdd85c7
 
 
 
 
 
 
 
 
e0f90ab
cdd85c7
9b7aea8
e6eebe9
9b7aea8
cdd85c7
 
 
 
 
 
9b7aea8
cdd85c7
 
 
 
 
 
 
 
e6eebe9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9b7aea8
 
cdd85c7
 
 
9b7aea8
 
cdd85c7
 
9b7aea8
cdd85c7
 
 
9b7aea8
cdd85c7
9b7aea8
cdd85c7
 
 
 
 
 
e0f90ab
9b7aea8
 
cdd85c7
 
9b7aea8
cdd85c7
 
 
 
 
 
9b7aea8
cdd85c7
 
 
 
9b7aea8
e0f90ab
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import re
import logging
from typing import List, Dict, Tuple
import chromadb
from chromadb.utils import embedding_functions
from config import EMBEDDING_MODEL, DATABASE_DIR

# Logging configuration: set up the root logger to emit INFO and above,
# formatted as "<timestamp> - <level name> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class KodeksProcessor:
    """Index statute ("kodeks") text files in a persistent ChromaDB collection.

    A document is cut into one chunk per article (``Art. N``) or, when the
    article contains numbered paragraphs (``§ N.``), one chunk per paragraph.
    Every chunk is stored in the ``kodeksy`` collection together with the
    metadata extracted from the document.
    """

    _COLLECTION_NAME = "kodeksy"

    # Number of chunks handed to ChromaDB per ``add`` call.
    _ADD_BATCH_SIZE = 100

    # The paragraph sign is written as an escape so the patterns below keep
    # working even if this source file is re-encoded incorrectly.
    _PARAGRAPH_SIGN = "\u00a7"  # §

    _DZ_U_RE = re.compile(r'Dz\.U\.(\d{4})\.(\d+)\.(\d+)')
    _TITLE_RE = re.compile(r'USTAWA\s+z dnia(.*?)\n(.*?)\n')
    # The optional trailing dot keeps "Art. 5." together as the heading, so the
    # article body does not start with a stray ".".
    _ARTICLE_SPLIT_RE = re.compile(r'(Art\.\s*\d+\.?)')
    _ARTICLE_NUM_RE = re.compile(r'Art\.\s*(\d+)')
    # A paragraph runs until the next "§ N." heading.  The look-ahead requires
    # the dot as well, so an in-text reference such as "§ 2 stosuje się" does
    # not cut the paragraph short.
    _PARAGRAPH_RE = re.compile(
        _PARAGRAPH_SIGN + r'\s*(\d+)\.\s*(.*?)(?='
        + _PARAGRAPH_SIGN + r'\s*\d+\.|Art\.\s*\d+|$)',
        re.DOTALL,
    )

    def __init__(self):
        """Open the persistent client at ``DATABASE_DIR`` and bind the collection.

        The same sentence-transformer embedding function is supplied whether
        the collection already exists or has to be created, so an existing
        collection is used with the model it is created with.
        """
        logging.info("Inicjalizacja klienta bazy danych...")
        self.client = chromadb.PersistentClient(path=DATABASE_DIR)
        embedding_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
            model_name=EMBEDDING_MODEL
        )
        try:
            self.collection = self.client.get_collection(
                name=self._COLLECTION_NAME,
                embedding_function=embedding_fn,
            )
            logging.info("Pobrano istniejącą kolekcję 'kodeksy'.")
        except Exception:
            # NOTE(review): the exception type raised for a missing collection
            # appears to differ between chromadb releases, hence the broad
            # catch.  A genuine backend failure will surface again from
            # create_collection() below instead of being swallowed.
            self.collection = self.client.create_collection(
                name=self._COLLECTION_NAME,
                embedding_function=embedding_fn,
            )
            logging.info("Utworzono nową kolekcję 'kodeksy'.")

    def extract_metadata(self, text: str) -> Dict:
        """Extract document-level metadata from *text*.

        Returns:
            A dict with any of the following keys, depending on what is found:
            ``dz_u`` (the full ``Dz.U.YYYY.N.N`` identifier), ``rok`` (its
            year part), ``data_ustawy`` (the rest of the ``USTAWA z dnia``
            line) and ``nazwa`` (the line that follows it).
        """
        metadata = {}

        dz_u_match = self._DZ_U_RE.search(text)
        if dz_u_match:
            # The pattern has no optional parts, so the whole match is exactly
            # "Dz.U.<year>.<n>.<n>".
            metadata['dz_u'] = dz_u_match.group(0)
            metadata['rok'] = dz_u_match.group(1)

        title_match = self._TITLE_RE.search(text)
        if title_match:
            metadata['data_ustawy'] = title_match.group(1).strip()
            metadata['nazwa'] = title_match.group(2).strip()

        logging.info("Wydobyto metadane: %s", metadata)
        return metadata

    def split_header_and_content(self, text: str) -> Tuple[str, str]:
        """Split *text* at the first occurrence of ``USTAWA``.

        Returns:
            ``(header, content)`` where ``content`` starts with ``USTAWA``.
            When the marker is absent the header is empty and the content is
            the whole text.
        """
        header, marker, rest = text.partition("USTAWA")
        if not marker:
            return "", text
        return header, marker + rest

    def process_article(self, article_text: str) -> Dict:
        """Parse one article into its number and its numbered paragraphs.

        Args:
            article_text: Text of a single article, starting with ``Art. N``.

        Returns:
            ``{"article_num", "paragraphs", "has_paragraphs": True}`` when at
            least one ``§ N.`` paragraph is found, where ``paragraphs`` is a
            list of ``(number, text)`` tuples; otherwise
            ``{"article_num", "content", "has_paragraphs": False}`` with the
            stripped article text.  ``article_num`` is ``""`` when the text
            does not start with an article heading.
        """
        num_match = self._ARTICLE_NUM_RE.match(article_text)
        article_num = num_match.group(1) if num_match else ""

        paragraphs = self._PARAGRAPH_RE.findall(article_text)
        if not paragraphs:
            return {
                "article_num": article_num,
                "content": article_text.strip(),
                "has_paragraphs": False,
            }

        return {
            "article_num": article_num,
            "paragraphs": paragraphs,
            "has_paragraphs": True,
        }

    def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
        """Split statute *text* into per-article or per-paragraph chunks.

        Args:
            text: Document text containing ``Art. N`` headings.
            metadata: Document-level metadata copied into every chunk.

        Returns:
            A list of ``{"text", "metadata"}`` dicts.  Chunk metadata extends
            *metadata* with ``article`` and, for paragraph chunks,
            ``paragraph``.  Text before the first article heading is ignored.
        """
        # Splitting on a capturing group yields
        # [preamble, heading, body, heading, body, ...].
        pieces = self._ARTICLE_SPLIT_RE.split(text)
        chunks = []

        for heading, body in zip(pieces[1::2], pieces[2::2]):
            article_title = heading.strip()
            processed = self.process_article(f"{article_title} {body.strip()}")
            chunk_metadata = {**metadata, "article": processed["article_num"]}

            if processed["has_paragraphs"]:
                for par_num, par_content in processed["paragraphs"]:
                    chunks.append({
                        # The explicit paragraph sign keeps the article and
                        # paragraph numbers from running together.
                        "text": (
                            f"{article_title} {self._PARAGRAPH_SIGN} "
                            f"{par_num}. {par_content.strip()}"
                        ),
                        "metadata": {**chunk_metadata, "paragraph": par_num},
                    })
            else:
                chunks.append({
                    "text": processed["content"],
                    "metadata": chunk_metadata,
                })

        logging.info("Podzielono tekst na %d chunków.", len(chunks))
        return chunks

    def process_file(self, filepath: str) -> None:
        """Read one UTF-8 text file, chunk it and add the chunks to the collection.

        Chunk ids have the form ``<filename>_<article>_<index>``, where
        ``index`` is the chunk's position within the file.
        """
        logging.info("Przetwarzanie pliku: %s", filepath)

        with open(filepath, 'r', encoding='utf-8') as file:
            content = file.read()

        _, main_content = self.split_header_and_content(content)
        # Metadata is searched for in the whole document so that an identifier
        # placed in the header (before "USTAWA") is not missed.
        metadata = self.extract_metadata(content)
        filename = os.path.basename(filepath)
        metadata['filename'] = filename

        chunks = self.split_into_chunks(main_content, metadata)

        # Add in batches rather than issuing one collection call per chunk.
        for start in range(0, len(chunks), self._ADD_BATCH_SIZE):
            batch = chunks[start:start + self._ADD_BATCH_SIZE]
            self.collection.add(
                documents=[chunk["text"] for chunk in batch],
                metadatas=[chunk["metadata"] for chunk in batch],
                ids=[
                    f"{filename}_{chunk['metadata']['article']}_{index}"
                    for index, chunk in enumerate(batch, start)
                ],
            )
            for chunk in batch:
                logging.info("Dodano chunk: %s", chunk["text"])

        logging.info("Dodano %d chunków z pliku %s", len(chunks), filename)

    def process_all_files(self, directory: str) -> None:
        """Process every ``.txt`` file directly inside *directory*.

        Files are handled in sorted name order so that runs are reproducible.
        """
        logging.info("Rozpoczęcie przetwarzania wszystkich plików w katalogu: %s", directory)
        for filename in sorted(os.listdir(directory)):
            if filename.endswith('.txt'):
                self.process_file(os.path.join(directory, filename))

    def search(self, query: str, n_results: int = 3) -> Dict:
        """Query the collection for the chunks closest to *query*.

        Args:
            query: Free-text query.
            n_results: Number of results requested from the collection.

        Returns:
            The result object returned by the collection's ``query`` call.
        """
        logging.info("Wyszukiwanie w bazie danych dla zapytania: %s", query)
        results = self.collection.query(
            query_texts=[query],
            n_results=n_results,
        )
        logging.info("Znaleziono %d wyników dla zapytania: %s", len(results['documents'][0]), query)
        return results

    def list_all_documents(self) -> None:
        """Log every document stored in the collection.

        Uses ``get`` instead of a similarity query: a query for an empty
        string only returns the nearest ``n_results`` entries, not all of them.
        """
        stored = self.collection.get(include=["documents"])
        for doc in stored.get("documents") or []:
            logging.info("Dokument: %s", doc)