marondeau committed on
Commit
fde3910
·
unverified ·
2 Parent(s): 1696c32 a3c0809

Merge pull request #59 from marondeau/schema

Browse files

New schema for the database, including structure information.

buster/documents/sqlite.py DELETED
@@ -1,122 +0,0 @@
1
- import sqlite3
2
- import warnings
3
- import zlib
4
-
5
- import numpy as np
6
- import pandas as pd
7
-
8
- from buster.documents.base import DocumentsManager
9
-
10
# DDL for the documents table: one row per stored document chunk. `current`
# is a soft flag — 1 marks rows from the latest ingestion of a source, 0 older
# generations (see DocumentsDB.add, which demotes previous rows).
documents_table = """CREATE TABLE IF NOT EXISTS documents (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source TEXT NOT NULL,
    title TEXT NOT NULL,
    url TEXT NOT NULL,
    content TEXT NOT NULL,
    n_tokens INTEGER,
    embedding BLOB,
    current INTEGER
)"""

# DDL for question/answer feedback. Each Q/A row keeps up to three references
# to the document rows used to produce the answer, plus human labels and a
# train/test split flag (`testset`).
qa_table = """CREATE TABLE IF NOT EXISTS qa (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    source TEXT NOT NULL,
    prompt TEXT NOT NULL,
    answer TEXT NOT NULL,
    document_id_1 INTEGER,
    document_id_2 INTEGER,
    document_id_3 INTEGER,
    label_question INTEGER,
    label_answer INTEGER,
    testset INTEGER,
    FOREIGN KEY (document_id_1) REFERENCES documents (id),
    FOREIGN KEY (document_id_2) REFERENCES documents (id),
    FOREIGN KEY (document_id_3) REFERENCES documents (id)
)"""
36
-
37
-
38
class DocumentsDB(DocumentsManager):
    """Simple SQLite database for storing documents and questions/answers.

    The database is just a file on disk. It can store documents from different
    sources, and it can store multiple versions of the same document (e.g. if
    the document is updated). Questions/answers refer to the version of the
    document that was used at the time.

    Example:
        >>> db = DocumentsDB("/path/to/the/db.db")
        >>> db.add("source", df)  # df is a DataFrame containing the documents from a given source, obtained e.g. by using buster.docparser.generate_embeddings
        >>> df = db.get_documents("source")
    """

    def __init__(self, filepath: str):
        self.db_path = filepath
        self.conn = sqlite3.connect(filepath)
        self.cursor = self.conn.cursor()

        self.__initialize()

    def __del__(self):
        # Best-effort cleanup when the wrapper is garbage collected.
        self.conn.close()

    def __initialize(self):
        """Create the tables if they do not exist yet."""
        self.cursor.execute(documents_table)
        self.cursor.execute(qa_table)
        self.conn.commit()

    def add(self, source: str, df: pd.DataFrame):
        """Write all documents from the dataframe into the db. All previous documents from that source will be set to `current = 0`."""
        df = df.copy()

        # Prepare the rows
        df["source"] = source
        df["current"] = 1
        columns = ["source", "title", "url", "content", "current"]
        if "embedding" in df.columns:
            columns.extend(
                [
                    "n_tokens",
                    "embedding",
                ]
            )

            # FIX: the dtype check and compression must only run when an
            # embedding column exists — previously they executed
            # unconditionally and raised KeyError for embedding-less frames.
            if not df["embedding"].iloc[0].dtype == np.float32:
                warnings.warn(
                    f"Embeddings are not float32, converting them to float32 from {df['embedding'].iloc[0].dtype}.",
                    RuntimeWarning,
                )
                df["embedding"] = df["embedding"].apply(lambda x: x.astype(np.float32))

            # ZLIB compress the embeddings
            df["embedding"] = df["embedding"].apply(lambda x: sqlite3.Binary(zlib.compress(x.tobytes())))

        data = df[columns].values.tolist()

        # Set `current` to 0 for all previous documents from that source
        self.cursor.execute("UPDATE documents SET current = 0 WHERE source = ?", (source,))

        # Insert the new documents
        insert_statement = f"INSERT INTO documents ({', '.join(columns)}) VALUES ({', '.join(['?']*len(columns))})"
        self.cursor.executemany(insert_statement, data)

        self.conn.commit()

    def get_documents(self, source: str) -> pd.DataFrame:
        """Get all current documents from a given source."""
        # Execute the SQL statement and fetch the results
        if source is not None:
            results = self.cursor.execute("SELECT * FROM documents WHERE source = ? AND current = 1", (source,))
        else:
            results = self.cursor.execute("SELECT * FROM documents WHERE current = 1")
        rows = results.fetchall()

        # Convert the results to a pandas DataFrame
        df = pd.DataFrame(rows, columns=[description[0] for description in results.description])

        # ZLIB decompress the embeddings; rows stored without an embedding
        # have NULL in that column, so skip them instead of crashing.
        df["embedding"] = df["embedding"].apply(
            lambda x: np.frombuffer(zlib.decompress(x), dtype=np.float32).tolist() if x is not None else None
        )

        # Drop the `current` column
        df.drop(columns=["current"], inplace=True)

        return df
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
buster/documents/sqlite/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
from .documents import DocumentsDB

# Names exported by `from buster.documents.sqlite import *`.
# __all__ must be a list of *strings*, not the objects themselves.
__all__ = ["DocumentsDB"]
buster/documents/sqlite/backward.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Used to import existing DB as a new DB."""
2
+
3
+ import argparse
4
+ import itertools
5
+ import sqlite3
6
+ from typing import Iterable, NamedTuple
7
+
8
+ import numpy as np
9
+
10
+ import buster.documents.sqlite.documents as dest
11
+ from buster.documents.sqlite import DocumentsDB
12
+
13
# Pull the latest ("current") rows from the legacy single-table schema,
# ordered so itertools.groupby can reassemble documents source by source.
IMPORT_QUERY = (
    r"""SELECT source, url, title, content FROM documents WHERE current = 1 ORDER BY source, url, title, id"""
)
# Same ordering idea, but keeping the chunk-level fields (n_tokens, embedding).
CHUNK_QUERY = r"""SELECT source, url, title, content, n_tokens, embedding FROM documents WHERE current = 1 ORDER BY source, url, id"""
17
+
18
+
19
# Row shape of the legacy db, written in the functional NamedTuple form.
Document = NamedTuple(
    "Document",
    [("source", str), ("url", str), ("title", str), ("content", str)],
)
Document.__doc__ = """Document from the original db."""
27
+
28
# A document section reassembled from the legacy db's chunks.
Section = NamedTuple(
    "Section",
    [("url", str), ("title", str), ("content", str)],
)
Section.__doc__ = """Reassemble section from the original db."""
34
+
35
+
36
# One chunk row from the legacy db, including its token count and embedding.
Chunk = NamedTuple(
    "Chunk",
    [
        ("source", str),
        ("url", str),
        ("title", str),
        ("content", str),
        ("n_tokens", int),
        ("embedding", np.ndarray),
    ],
)
Chunk.__doc__ = """Chunk from the original db."""
45
+
46
+
47
def get_documents(conn: sqlite3.Connection) -> Iterable[tuple[str, Iterable[Section]]]:
    """Reassemble full documents out of the per-chunk rows of the source db."""
    rows = (Document(*fields) for fields in conn.execute(IMPORT_QUERY))
    # Rows arrive ordered by (source, url, title), so consecutive grouping
    # is safe; everything stays lazy and must be consumed in order.
    for source, docs in itertools.groupby(rows, lambda doc: doc.source):
        grouped = itertools.groupby(docs, lambda doc: (doc.url, doc.title))
        sections = (
            Section(url, title, "".join(part.content for part in parts))
            for (url, title), parts in grouped
        )
        yield source, sections
57
+
58
+
59
def get_max_size(conn: sqlite3.Connection) -> int:
    """Return the length of the longest chunk content in the source db."""
    # The aggregate query yields exactly one row with one column.
    [(max_size,)] = conn.execute("select max(length(content)) FROM documents")
    return max_size
64
+
65
+
66
def get_chunks(conn: sqlite3.Connection) -> Iterable[tuple[str, Iterable[Iterable[dest.Chunk]]]]:
    """Retrieve chunks from the source db.

    Yields (source, sections) pairs where `sections` lazily yields, per
    (url, title) section, a generator of destination-schema chunks.

    NOTE: everything here is built on itertools.groupby, whose groups share
    one underlying cursor — consumers must exhaust each group in order before
    advancing to the next, or chunks will be silently skipped.
    """
    chunks = (Chunk(*row) for row in conn.execute(CHUNK_QUERY))
    # CHUNK_QUERY orders rows by (source, url, id), so consecutive grouping
    # by source, then by (url, title), reconstructs the original sections.
    by_sources = itertools.groupby(chunks, lambda chunk: chunk.source)
    for source, chunks in by_sources:
        by_section = itertools.groupby(chunks, lambda chunk: (chunk.url, chunk.title))

        # Re-shape each legacy row into the destination Chunk (content,
        # n_tokens, embedding) — lazily, one section at a time.
        sections = (
            (dest.Chunk(chunk.content, chunk.n_tokens, chunk.embedding) for chunk in chunks)
            for _, chunks in by_section
        )

        yield source, sections
78
+
79
+
80
def main():
    """Import the source db into the destination db.

    Usage: backward.py SOURCE DESTINATION [--size N]

    SOURCE is a legacy single-table db; DESTINATION receives the new schema.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("source")
    parser.add_argument("destination")
    parser.add_argument("--size", type=int, default=2000)
    args = parser.parse_args()
    org = sqlite3.connect(args.source)
    db = DocumentsDB(args.destination)

    # First pass: recreate one parsed version per source from the
    # reassembled documents.
    for source, content in get_documents(org):
        sections = (dest.Section(section.title, section.url, section.content) for section in content)
        db.add_parse(source, sections)

    # Second pass: attach the original chunks to the freshly created
    # versions. The recorded chunking size is at least --size, but never
    # smaller than the longest existing chunk.
    size = max(args.size, get_max_size(org))
    for source, chunks in get_chunks(org):
        sid, vid = db.get_current_version(source)
        db.add_chunking(sid, vid, size, chunks)
        db.conn.commit()


if __name__ == "__main__":
    main()
buster/documents/sqlite/documents.py ADDED
@@ -0,0 +1,155 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import itertools
2
+ import sqlite3
3
+ import warnings
4
+ import zlib
5
+ from pathlib import Path
6
+ from typing import Iterable, NamedTuple
7
+
8
+ import numpy as np
9
+ import pandas as pd
10
+
11
+ import buster.documents.sqlite.schema as schema
12
+ from buster.documents.base import DocumentsManager
13
+
14
+
15
class Section(NamedTuple):
    """A parsed document section, one row of the `sections` table."""

    title: str
    url: str
    content: str
    # Index of the parent section within the same version, when sections form
    # a hierarchy; None for top-level sections.
    parent: int | None = None
    # Section kind tag, stored in the `type` column.
    type: str = "section"
21
+
22
+
23
# One embeddable chunk of a section, in the functional NamedTuple form.
Chunk = NamedTuple(
    "Chunk",
    [("content", str), ("n_tokens", int), ("emb", np.ndarray)],
)
27
+
28
+
29
class DocumentsDB(DocumentsManager):
    """SQLite-backed store for versioned documents, their sections and chunks.

    The database is just a file on disk (or an externally managed connection).
    It can store documents from different sources, and it can store multiple
    versions of the same document (e.g. if the document is updated).
    Questions/answers refer to the version of the document that was used at
    the time.

    Example:
        >>> db = DocumentsDB("/path/to/the/db.db")
        >>> db.add("source", df)  # df is a DataFrame containing the documents from a given source, obtained e.g. by using buster.docparser.generate_embeddings
        >>> df = db.get_documents("source")
    """

    def __init__(self, db_path: sqlite3.Connection | str | Path):
        # Accept either a path (we own the connection and close it in __del__)
        # or an already-open connection (the caller keeps ownership).
        # PARSE_DECLTYPES lets the VECTOR column converter registered by
        # schema.setup_db kick in.
        if isinstance(db_path, (str, Path)):
            self.db_path = db_path
            self.conn = sqlite3.connect(db_path, detect_types=sqlite3.PARSE_DECLTYPES)
        else:
            self.db_path = None
            self.conn = db_path
        schema.initialize_db(self.conn)
        schema.setup_db(self.conn)

    def __del__(self):
        # Only close connections we created ourselves; getattr guards against
        # a partially-run __init__ (e.g. sqlite3.connect raised).
        if getattr(self, "db_path", None) is not None:
            self.conn.close()

    def get_current_version(self, source: str) -> tuple[int, int]:
        """Return (source id, latest version id) for `source`.

        Raises:
            KeyError: if `source` does not exist in the db.
        """
        cur = self.conn.execute("SELECT source, version FROM latest_version WHERE name = ?", (source,))
        row = cur.fetchone()
        if row is None:
            raise KeyError(f'"{source}" is not a known source')
        sid, vid = row
        return sid, vid

    def get_source(self, source: str) -> int:
        """Return the id of `source`, creating the source row if needed."""
        cur = self.conn.execute("SELECT id FROM sources WHERE name = ?", (source,))
        row = cur.fetchone()
        if row is not None:
            (sid,) = row
        else:
            # `sources.id` is an AUTOINCREMENT primary key, so the cursor's
            # lastrowid is the id of the row just inserted — no need to
            # re-SELECT it.
            cur = self.conn.execute("INSERT INTO sources (name) VALUES (?)", (source,))
            sid = cur.lastrowid

        return sid

    def new_version(self, source: str) -> tuple[int, int]:
        """Create a new version for a source and return (source id, version id)."""
        cur = self.conn.execute("SELECT source, version FROM latest_version WHERE name = ?", (source,))
        row = cur.fetchone()
        if row is None:
            # First version ever recorded for this source.
            sid = self.get_source(source)
            vid = 0
        else:
            sid, vid = row
            vid = vid + 1
        self.conn.execute("INSERT INTO versions (source, version) VALUES (?, ?)", (sid, vid))
        return sid, vid

    def add_parse(self, source: str, sections: Iterable[Section]) -> tuple[int, int]:
        """Create a new version of a source filled with parsed sections."""
        sid, vid = self.new_version(source)
        values = (
            (sid, vid, ind, section.title, section.url, section.content, section.parent, section.type)
            for ind, section in enumerate(sections)
        )
        self.conn.executemany(
            "INSERT INTO sections "
            "(source, version, section, title, url, content, parent, type) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            values,
        )
        return sid, vid

    def new_chunking(self, sid: int, vid: int, size: int, overlap: int = 0, strategy: str = "simple") -> int:
        """Create a new chunking for a version and return its id."""
        self.conn.execute(
            "INSERT INTO chunkings (size, overlap, strategy, source, version) VALUES (?, ?, ?, ?, ?)",
            (size, overlap, strategy, sid, vid),
        )
        cur = self.conn.execute(
            "SELECT chunking FROM chunkings "
            "WHERE size = ? AND overlap = ? AND strategy = ? AND source = ? AND version = ?",
            (size, overlap, strategy, sid, vid),
        )
        # The (size, overlap, strategy, ..., version) tuple is UNIQUE, so
        # exactly one row matches. `cid` avoids shadowing the builtin `id`.
        (cid,) = (chunking for chunking, in cur)
        return cid

    def add_chunking(self, sid: int, vid: int, size: int, sections: Iterable[Iterable[Chunk]]) -> int:
        """Create a new chunking for a source, filled with chunks organized by section."""
        cid = self.new_chunking(sid, vid, size)
        chunks = ((ind, jnd, chunk) for ind, section in enumerate(sections) for jnd, chunk in enumerate(section))
        values = ((sid, vid, ind, cid, jnd, chunk.content, chunk.n_tokens, chunk.emb) for ind, jnd, chunk in chunks)
        self.conn.executemany(
            "INSERT INTO chunks "
            "(source, version, section, chunking, sequence, content, n_tokens, embedding) "
            "VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
            values,
        )
        return cid

    def add(self, source: str, df: pd.DataFrame):
        """Write all documents from the dataframe into the db as a new version."""
        # Group the dataframe's rows into sections keyed by (url, title);
        # groupby requires the rows sorted by that same key.
        data = sorted(df.itertuples(), key=lambda chunk: (chunk.url, chunk.title))
        sections = []
        size = 0
        for (url, title), chunks in itertools.groupby(data, lambda chunk: (chunk.url, chunk.title)):
            chunks = [Chunk(chunk.content, chunk.n_tokens, chunk.embedding) for chunk in chunks]
            size = max(size, max(len(chunk.content) for chunk in chunks))
            content = "".join(chunk.content for chunk in chunks)
            sections.append((Section(title, url, content), chunks))

        sid, vid = self.add_parse(source, (section for section, _ in sections))
        self.add_chunking(sid, vid, size, (chunks for _, chunks in sections))

    def get_documents(self, source: str) -> pd.DataFrame:
        """Get all current documents from a given source as a DataFrame."""
        # The `documents` view already restricts rows to the latest chunking.
        results = self.conn.execute("SELECT * FROM documents WHERE source = ?", (source,))
        rows = results.fetchall()

        # Column names come from the cursor description of the view.
        df = pd.DataFrame(rows, columns=[description[0] for description in results.description])
        return df
buster/documents/sqlite/schema.py ADDED
@@ -0,0 +1,133 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import sqlite3
2
+ import zlib
3
+
4
+ import numpy as np
5
+
6
# Source of documents (e.g. one library's docs); names are unique.
SOURCE_TABLE = r"""CREATE TABLE IF NOT EXISTS sources (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    name TEXT NOT NULL,
    note TEXT,
    UNIQUE(name)
)"""


# One row per (version, source, parser) ingestion of a source.
# FIX: the comma between the PRIMARY KEY and FOREIGN KEY table constraints
# was missing — it only parsed because SQLite tolerates an omitted comma
# between table constraints, which is undocumented and non-portable.
VERSION_TABLE = r"""CREATE TABLE IF NOT EXISTS versions (
    source INTEGER,
    version INTEGER,
    parser TEXT,
    note TEXT,
    PRIMARY KEY (version, source, parser),
    FOREIGN KEY (source) REFERENCES sources (id)
)"""


# A chunking is one way of splitting a version into chunks (size/overlap/strategy).
CHUNKING_TABLE = r"""CREATE TABLE IF NOT EXISTS chunkings (
    chunking INTEGER PRIMARY KEY AUTOINCREMENT,
    size INTEGER,
    overlap INTEGER,
    strategy TEXT,
    chunker TEXT,
    source INTEGER,
    version INTEGER,
    UNIQUE (size, overlap, strategy, chunker, source, version),
    FOREIGN KEY (source, version) REFERENCES versions (source, version)
)"""


# Parsed sections of one version of a source document.
SECTION_TABLE = r"""CREATE TABLE IF NOT EXISTS sections (
    source INTEGER,
    version INTEGER,
    section INTEGER,
    title TEXT NOT NULL,
    url TEXT NOT NULL,
    content TEXT NOT NULL,
    parent INTEGER,
    type TEXT,
    PRIMARY KEY (version, source, section),
    FOREIGN KEY (source) REFERENCES versions (source),
    FOREIGN KEY (version) REFERENCES versions (version)
)"""


# Chunks of a section under a given chunking. `embedding` uses the custom
# VECTOR declared type handled by the adapter/converter below.
# NOTE(review): the second FOREIGN KEY references chunkings(source, version,
# chunking), which is not the parent table's primary key — confirm intended.
CHUNK_TABLE = r"""CREATE TABLE IF NOT EXISTS chunks (
    source INTEGER,
    version INTEGER,
    section INTEGER,
    chunking INTEGER,
    sequence INTEGER,
    content TEXT NOT NULL,
    n_tokens INTEGER,
    embedding VECTOR,
    PRIMARY KEY (source, version, section, chunking, sequence),
    FOREIGN KEY (source, version, section) REFERENCES sections (source, version, section),
    FOREIGN KEY (source, version, chunking) REFERENCES chunkings (source, version, chunking)
)"""


# Latest version per source.
VERSION_VIEW = r"""CREATE VIEW IF NOT EXISTS latest_version (
    name, source, version) AS
    SELECT sources.name, versions.source, max(versions.version)
    FROM sources INNER JOIN versions on sources.id = versions.source
    GROUP BY sources.id
"""

# Latest chunking of the latest version per source.
CHUNKING_VIEW = r"""CREATE VIEW IF NOT EXISTS latest_chunking (
    name, source, version, chunking) AS
    SELECT name, source, version, max(chunking) FROM
    chunkings INNER JOIN latest_version USING (source, version)
    GROUP by source, version
"""

# Flat, legacy-shaped view of the current chunks of every source.
DOCUMENT_VIEW = r"""CREATE VIEW IF NOT EXISTS documents (
    source, title, url, content, n_tokens, embedding)
    AS SELECT latest_chunking.name, sections.title, sections.url,
    chunks.content, chunks.n_tokens, chunks.embedding
    FROM chunks INNER JOIN sections USING (source, version, section)
    INNER JOIN latest_chunking USING (source, version, chunking)
"""


# Executed in order by initialize_db: tables first, then the views built on them.
INIT_STATEMENTS = [
    SOURCE_TABLE,
    VERSION_TABLE,
    CHUNKING_TABLE,
    SECTION_TABLE,
    CHUNK_TABLE,
    VERSION_VIEW,
    CHUNKING_VIEW,
    DOCUMENT_VIEW,
]
100
+
101
+
102
+ def initialize_db(connection: sqlite3.Connection):
103
+ for statement in INIT_STATEMENTS:
104
+ try:
105
+ connection.execute(statement)
106
+ except sqlite3.Error as error:
107
+ connection.rollback()
108
+ raise
109
+ connection.commit()
110
+ return connection
111
+
112
+
113
def adapt_vector(vector: np.ndarray) -> bytes:
    """Serialize an array as a zlib-compressed float32 buffer for SQLite."""
    as_float32 = vector.astype(np.float32)
    compressed = zlib.compress(as_float32.tobytes())
    return sqlite3.Binary(compressed)
115
+
116
+
117
def convert_vector(buffer: bytes) -> np.ndarray:
    """Inverse of adapt_vector: decompress a buffer into a float32 array."""
    raw = zlib.decompress(buffer)
    return np.frombuffer(raw, dtype=np.float32)
119
+
120
+
121
def cosine_similarity(a: bytes, b: bytes) -> float:
    """Similarity of two stored vectors, rescaled from [-1, 1] onto [0, 1]."""
    va = convert_vector(a)
    vb = convert_vector(b)
    va = va / np.linalg.norm(va)
    vb = vb / np.linalg.norm(vb)
    # 0.5 * cos + 0.5 maps the cosine of the angle onto [0, 1].
    return float(0.5 * np.dot(va, vb) + 0.5)
128
+
129
+
130
def setup_db(connection: sqlite3.Connection):
    # Register the (process-global) adapter/converter pair so np.ndarray
    # round-trips transparently through VECTOR-declared columns — the
    # converter only fires on connections opened with
    # detect_types=sqlite3.PARSE_DECLTYPES — and expose cosine similarity
    # to SQL as the deterministic two-argument function `sim(a, b)`.
    sqlite3.register_adapter(np.ndarray, adapt_vector)
    sqlite3.register_converter("vector", convert_vector)
    connection.create_function("sim", 2, cosine_similarity, deterministic=True)