Update database.py
database.py  +38 -26  CHANGED
@@ -1,15 +1,21 @@
 import os
 import re
+import logging
 from typing import List, Dict, Tuple
 import chromadb
 from chromadb.utils import embedding_functions
+from config import EMBEDDING_MODEL, DATABASE_DIR
+
+# Logging configuration
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 
 class KodeksProcessor:
     def __init__(self):
+        logging.info("Initializing the database client...")
         self.client = chromadb.PersistentClient(path=DATABASE_DIR)
         try:
             self.collection = self.client.get_collection("kodeksy")
+            logging.info("Retrieved existing collection 'kodeksy'.")
         except:
             self.collection = self.client.create_collection(
                 name="kodeksy",
@@ -17,19 +23,21 @@ class KodeksProcessor:
                     model_name=EMBEDDING_MODEL
                 )
             )
+            logging.info("Created a new collection 'kodeksy'.")
 
     def extract_metadata(self, text: str) -> Dict:
         metadata = {}
+        dz_u_match = re.search(r'Dz\.U\.(\d{4})\.(\d+)\.(\d+)', text)
         if dz_u_match:
             metadata['dz_u'] = f"Dz.U.{dz_u_match.group(1)}.{dz_u_match.group(2)}.{dz_u_match.group(3)}"
             metadata['rok'] = dz_u_match.group(1)
+
+        nazwa_match = re.search(r'USTAWA\s+z dnia(.*?)\n(.*?)\n', text)
         if nazwa_match:
             metadata['data_ustawy'] = nazwa_match.group(1).strip()
             metadata['nazwa'] = nazwa_match.group(2).strip()
+
+        logging.info("Extracted metadata: %s", metadata)
         return metadata
 
     def split_header_and_content(self, text: str) -> Tuple[str, str]:
@@ -39,18 +47,18 @@ class KodeksProcessor:
         return "", text
 
     def process_article(self, article_text: str) -> Dict:
+        art_num_match = re.match(r'Art\.\s*(\d+)', article_text)
         article_num = art_num_match.group(1) if art_num_match else ""
+
+        paragraphs = re.findall(r'§\s*(\d+)[.\s]+(.*?)(?=§\s*\d+|$)', article_text, re.DOTALL)
+
         if not paragraphs:
             return {
                 "article_num": article_num,
                 "content": article_text.strip(),
                 "has_paragraphs": False
             }
+
         return {
             "article_num": article_num,
             "paragraphs": paragraphs,
@@ -59,29 +67,29 @@ class KodeksProcessor:
 
     def split_into_chunks(self, text: str, metadata: Dict) -> List[Dict]:
         chunks = []
+        chapters = re.split(r'(Rozdział \d+\n\n[^\n]+)\n', text)
         current_chapter = ""
+
         for i, section in enumerate(chapters):
             if section.startswith('Rozdział'):
                 current_chapter = section.strip()
                 continue
+
+            articles = re.split(r'(Art\.\s*\d+.*?)(?=Art\.\s*\d+|$)', section)
+
             for article in articles:
                 if not article.strip():
                     continue
+
                 if article.startswith('Art.'):
                     processed_article = self.process_article(article)
+
                     chunk_metadata = {
                         **metadata,
                         "chapter": current_chapter,
                         "article": processed_article["article_num"]
                     }
+
                     if processed_article["has_paragraphs"]:
                         for par_num, par_content in processed_article["paragraphs"]:
                             chunks.append({
@@ -93,39 +101,43 @@ class KodeksProcessor:
                             "text": processed_article["content"],
                             "metadata": chunk_metadata
                         })
+
+        logging.info("Split the text into %d chunks.", len(chunks))
         return chunks
 
     def process_file(self, filepath: str) -> None:
+        logging.info("Processing file: %s", filepath)
+
         with open(filepath, 'r', encoding='utf-8') as file:
             content = file.read()
+
         header, main_content = self.split_header_and_content(content)
         metadata = self.extract_metadata(main_content)
         metadata['filename'] = os.path.basename(filepath)
+
        chunks = self.split_into_chunks(main_content, metadata)
+
         for i, chunk in enumerate(chunks):
             self.collection.add(
                 documents=[chunk["text"]],
                 metadatas=[chunk["metadata"]],
                 ids=[f"{metadata['filename']}_{chunk['metadata']['article']}_{i}"]
             )
+
+        logging.info("Added %d chunks from file %s", len(chunks), metadata['filename'])
 
     def process_all_files(self, directory: str) -> None:
+        logging.info("Starting to process all files in directory: %s", directory)
         for filename in os.listdir(directory):
             if filename.endswith('.txt'):
                 filepath = os.path.join(directory, filename)
                 self.process_file(filepath)
 
     def search(self, query: str, n_results: int = 3) -> Dict:
+        logging.info("Searching the database for query: %s", query)
         results = self.collection.query(
             query_texts=[query],
             n_results=n_results
         )
+        logging.info("Found %d results for query: %s", len(results['documents'][0]), query)
         return results
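
The paragraph regex introduced in process_article pairs each "§ n" marker with the text running up to the next marker (re.DOTALL lets a paragraph span line breaks). A quick sanity check of that behavior, using an invented two-paragraph snippet rather than a real statute:

import re

# Invented sample text; real statute articles are longer but have the same shape.
sample = "Art. 5. § 1. Pierwszy przepis. § 2. Drugi przepis."
paragraphs = re.findall(r'§\s*(\d+)[.\s]+(.*?)(?=§\s*\d+|$)', sample, re.DOTALL)
print(paragraphs)  # [('1', 'Pierwszy przepis. '), ('2', 'Drugi przepis.')]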
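
To exercise the whole pipeline after this change, a minimal sketch: it assumes config.py defines DATABASE_DIR and EMBEDDING_MODEL as the new import requires, and the "data" directory of UTF-8 .txt statute files and the sample query are placeholders:

from database import KodeksProcessor

processor = KodeksProcessor()           # logs client and collection initialization
processor.process_all_files("data")     # "data" is a placeholder directory of .txt files

# Chroma returns parallel lists per query; pair each document with its metadata.
results = processor.search("umowa o pracę", n_results=3)
for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
    print(meta.get("article"), meta.get("chapter"), doc[:80])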
|