# ---
# jupyter:
#   jupytext:
#     formats: ipynb,py:light
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#       jupytext_version: 1.14.6
#   kernelspec:
#     display_name: Python 3 (ipykernel)
#     language: python
#     name: python3
# ---
# # Ingest website to FAISS
# ## Install/import stuff we need
import os
from pathlib import Path
import re
import pandas as pd
from typing import TypeVar, List
from langchain.embeddings import HuggingFaceEmbeddings # HuggingFaceInstructEmbeddings also available
from langchain.vectorstores.faiss import FAISS
from langchain.vectorstores import Chroma
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.docstore.document import Document
#from bs4 import BeautifulSoup
#from docx import Document as Doc
#from pypdf import PdfReader
PandasDataFrame = TypeVar('pd.core.frame.DataFrame')
# -
split_strat = ["\n\n", "\n", ". ", "! ", "? "]
chunk_size = 500
chunk_overlap = 0
start_index = True
# ## Parse files
def determine_file_type(file_path):
    """
    Determine the file type based on its extension.

    Parameters:
        file_path (str): Path to the file.

    Returns:
        str: File extension (e.g., '.pdf', '.docx', '.txt', '.html').
    """
    return os.path.splitext(file_path)[1].lower()
def parse_file(file_paths, text_column='text'):
    """
    Accepts a list of file paths, determines each file's type based on its extension,
    and passes it to the relevant parsing function.

    Parameters:
        file_paths (list): List of uploaded file objects, each with a .name attribute holding the path.
        text_column (str): Name of the column in CSV/Excel files that contains the text content.

    Returns:
        dict: A dictionary with file paths as keys and their parsed content (or error message) as values.
        list: The file names (without directory) of the parsed files.
    """
    if not isinstance(file_paths, list):
        raise ValueError("Expected a list of file paths.")

    extension_to_parser = {
        # '.pdf': parse_pdf,
        # '.docx': parse_docx,
        # '.txt': parse_txt,
        # '.html': parse_html,
        # '.htm': parse_html,  # Considering both .html and .htm for HTML files
        '.csv': lambda file_path: parse_csv_or_excel(file_path, text_column),
        '.xlsx': lambda file_path: parse_csv_or_excel(file_path, text_column)
    }

    parsed_contents = {}
    file_names = []

    for file_path in file_paths:
        print(file_path.name)
        #file = open(file_path.name, 'r')
        #print(file)
        file_extension = determine_file_type(file_path.name)
        if file_extension in extension_to_parser:
            # Pass the file object itself - the parsers read the path from its .name attribute
            parsed_contents[file_path.name] = extension_to_parser[file_extension](file_path)
        else:
            parsed_contents[file_path.name] = f"Unsupported file type: {file_extension}"

        filename_end = get_file_path_end(file_path.name)
        file_names.append(filename_end)

    return parsed_contents, file_names
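# Illustrative (hypothetical) call to parse_file. The functions above read each path from a
# `.name` attribute, as provided by Gradio upload objects, so a plain string path won't work
# directly; the tiny wrapper class below is a stand-in for such an upload object, and the
# file name used here is made up purely for the example:
# class _Upload:
#     def __init__(self, name):
#         self.name = name
#
# parsed_contents, file_names = parse_file([_Upload("lambeth_census.csv")], text_column="text")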
def text_regex_clean(text):
    # Merge hyphenated words
    text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)
    # If a double newline ends in a letter, add a full stop.
    text = re.sub(r'(?<=[a-zA-Z])\n\n', '.\n\n', text)
    # Fix newlines in the middle of sentences
    text = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", text.strip())
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    # Collapse double spaces into single spaces
    text = re.sub(r"  ", " ", text)
    # Add full stops and new lines between words with no space between them, where the second word starts with a capital letter
    text = re.sub(r'(?<=[a-z])(?=[A-Z])', '. \n\n', text)

    return text
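# A couple of illustrative checks of the rules above, using made-up strings (not taken from
# any real document) to show the intended behaviour:
assert re.sub(r"(\w+)-\n(\w+)", r"\1\2", "popu-\nlation") == "population"
assert re.sub(r'(?<=[a-z])(?=[A-Z])', '. \n\n', "growthLambeth") == "growth. \n\nLambeth"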
def parse_csv_or_excel(file_path, text_column = "text"):
    """
    Read in a CSV or Excel file.

    Parameters:
        file_path: Uploaded file object whose .name attribute is the path to the CSV/Excel file.
        text_column (str): Name of the column in the file that contains the text content.

    Returns:
        Pandas DataFrame: Dataframe output from file read
        list: List containing the file name
    """
    #out_df = pd.DataFrame()
    #for file_path in file_paths:
    file_extension = determine_file_type(file_path.name)
    file_name = get_file_path_end(file_path.name)
    file_names = [file_name]

    if file_extension == ".csv":
        df = pd.read_csv(file_path.name, low_memory=False)
        if text_column not in df.columns:
            return pd.DataFrame(), ['Please choose a valid column name']
        df['source'] = file_name
        df['page_section'] = ""
    elif file_extension == ".xlsx":
        df = pd.read_excel(file_path.name, engine='openpyxl')
        if text_column not in df.columns:
            return pd.DataFrame(), ['Please choose a valid column name']
        df['source'] = file_name
        df['page_section'] = ""
    else:
        print(f"Unsupported file type: {file_extension}")
        return pd.DataFrame(), ['Please choose a valid file type']

    # file_names.append(file_name)
    # out_df = pd.concat([out_df, df])

    #if text_column not in df.columns:
    #    return f"Column '{text_column}' not found in {file_path}"
    #text_out = " ".join(df[text_column].dropna().astype(str))

    return df, file_names
def parse_excel(file_path, text_column):
    """
    Read text from an Excel file.

    Parameters:
        file_path (str): Path to the Excel file.
        text_column (str): Name of the column in the Excel file that contains the text content.

    Returns:
        Pandas DataFrame: Dataframe output from file read
    """
    df = pd.read_excel(file_path, engine='openpyxl')

    #if text_column not in df.columns:
    #    return f"Column '{text_column}' not found in {file_path}"
    #text_out = " ".join(df[text_column].dropna().astype(str))

    return df
def get_file_path_end(file_path):
    # Strip any directory component, keeping only the final file name
    match = re.search(r'(.*[\/\\])?(.+)$', file_path)
    filename_end = match.group(2) if match else ''

    return filename_end
# +
# Convert parsed text to docs
# -
def text_to_docs(text_dict: dict, chunk_size: int = chunk_size) -> List[Document]:
    """
    Converts the output of parse_file (a dictionary of file paths to content)
    to a list of Documents with metadata.
    """
    doc_sections = []
    parent_doc_sections = []

    for file_path, content in text_dict.items():
        ext = os.path.splitext(file_path)[1].lower()

        # Depending on the file extension, handle the content
        # if ext == '.pdf':
        #     docs, page_docs = pdf_text_to_docs(content, chunk_size)
        # elif ext in ['.html', '.htm', '.txt', '.docx']:
        #     docs = html_text_to_docs(content, chunk_size)
        if ext in ['.csv', '.xlsx']:
            # parse_csv_or_excel returns (dataframe, file_names); keep just the dataframe if so
            if isinstance(content, tuple):
                content = content[0]
            docs = csv_excel_text_to_docs(content, chunk_size=chunk_size)
        else:
            print(f"Unsupported file type {ext} for {file_path}. Skipping.")
            continue

        filename_end = get_file_path_end(file_path)
        #match = re.search(r'(.*[\/\\])?(.+)$', file_path)
        #filename_end = match.group(2) if match else ''

        # Add filename as metadata
        for doc in docs: doc.metadata["source"] = filename_end
        #for parent_doc in parent_docs: parent_doc.metadata["source"] = filename_end

        doc_sections.extend(docs)
        #parent_doc_sections.extend(parent_docs)

    return doc_sections #, page_docs
def write_out_metadata_as_string(metadata_in):
    # If metadata_in is a single dictionary, wrap it in a list
    if isinstance(metadata_in, dict):
        metadata_in = [metadata_in]

    metadata_string = [f"{' '.join(f'{k}: {v}' for k, v in d.items() if k != 'page_section')}" for d in metadata_in] # ['metadata']

    return metadata_string
def csv_excel_text_to_docs(df, text_column='text', chunk_size=None) -> List[Document]:
    """Converts a DataFrame's content to a list of Documents with metadata."""
    doc_sections = []
    df[text_column] = df[text_column].astype(str) # Ensure column is a string column

    # For each row in the dataframe
    for idx, row in df.iterrows():
        # Extract the text content for the document
        doc_content = row[text_column]

        # Generate metadata containing other columns' data
        metadata = {"row": idx + 1}
        for col, value in row.items():
            if col != text_column:
                metadata[col] = value

        metadata_string = write_out_metadata_as_string(metadata)[0]

        # If chunk_size is provided, split the text into chunks
        if chunk_size:
            # Assuming you have a text splitter function similar to the PDF handling
            text_splitter = RecursiveCharacterTextSplitter(
                chunk_size=chunk_size,
                # Other arguments as required by the splitter
            )
            sections = text_splitter.split_text(doc_content)

            # For each section, create a Document object
            for i, section in enumerate(sections):
                #section = '. '.join([metadata_string, section])
                doc = Document(page_content=section,
                               metadata={**metadata, "section": i, "row_section": f"{metadata['row']}-{i}"})
                doc_sections.append(doc)
        else:
            # If no chunk_size is provided, create a single Document object for the row
            #doc_content = '. '.join([metadata_string, doc_content])
            doc = Document(page_content=doc_content, metadata=metadata)
            doc_sections.append(doc)

    return doc_sections
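# A minimal usage sketch of csv_excel_text_to_docs on a toy DataFrame. The column names
# and text below are invented purely for illustration, not taken from any real dataset:
example_df = pd.DataFrame({
    "text": ["Lambeth has a growing population.", "Most residents live in flats."],
    "ward": ["Brixton", "Clapham"]
})
example_docs = csv_excel_text_to_docs(example_df, text_column="text", chunk_size=chunk_size)
print(example_docs[0].page_content)
print(example_docs[0].metadata)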
# # Functions for working with documents after loading them back in
def pull_out_data(series):
    # define a lambda function to convert each string into a tuple
    to_tuple = lambda x: eval(x)

    # apply the lambda function to each element of the series
    series_tup = series.apply(to_tuple)

    series_tup_content = list(zip(*series_tup))[1]

    series = pd.Series(list(series_tup_content))#.str.replace("^Main post content", "", regex=True).str.strip()

    return series
def docs_from_csv(df):
    import ast

    documents = []

    page_content = pull_out_data(df["0"])
    metadatas = pull_out_data(df["1"])

    for x in range(0,len(df)):
        new_doc = Document(page_content=page_content[x], metadata=metadatas[x])
        documents.append(new_doc)

    return documents
def docs_from_lists(docs, metadatas):
    documents = []

    for x, doc in enumerate(docs):
        new_doc = Document(page_content=doc, metadata=metadatas[x])
        documents.append(new_doc)

    return documents
def docs_elements_from_csv_save(docs_path="documents.csv"):
    documents = pd.read_csv(docs_path)

    docs_out = docs_from_csv(documents)

    out_df = pd.DataFrame(docs_out)

    docs_content = pull_out_data(out_df[0].astype(str))
    docs_meta = pull_out_data(out_df[1].astype(str))

    doc_sources = [d['source'] for d in docs_meta]

    return out_df, docs_content, docs_meta, doc_sources
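# Hypothetical round trip: assuming the Document list was previously written out with
# something like pd.DataFrame(docs_out).to_csv("documents.csv"), it can be reloaded as:
# out_df, docs_content, docs_meta, doc_sources = docs_elements_from_csv_save("documents.csv")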
# ## Create embeddings and save faiss vector store to the path specified in `save_to`
def load_embeddings(model_name = "BAAI/bge-base-en-v1.5"):
    #if model_name == "hkunlp/instructor-large":
    #    embeddings_func = HuggingFaceInstructEmbeddings(model_name=model_name,
    #        embed_instruction="Represent the paragraph for retrieval: ",
    #        query_instruction="Represent the question for retrieving supporting documents: "
    #    )
    #else:
    embeddings_func = HuggingFaceEmbeddings(model_name=model_name)

    global embeddings
    embeddings = embeddings_func

    return embeddings_func
def embed_faiss_save_to_zip(docs_out, save_to="faiss_lambeth_census_embedding", model_name = "BAAI/bge-base-en-v1.5"):
    load_embeddings(model_name=model_name)
    #embeddings_fast = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

    print(f"> Total split documents: {len(docs_out)}")

    vectorstore = FAISS.from_documents(documents=docs_out, embedding=embeddings)

    # save_local creates the folder at save_to if it doesn't already exist
    vectorstore.save_local(folder_path=save_to)

    print("> DONE")
    print(f"> Saved to: {save_to}")

    ### Save as zip, then remove faiss/pkl files to allow for upload to huggingface
    import shutil
    shutil.make_archive(save_to, 'zip', save_to)

    os.remove(save_to + "/index.faiss")
    os.remove(save_to + "/index.pkl")

    shutil.move(save_to + '.zip', save_to + "/" + save_to + '.zip')

    return vectorstore
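# End-to-end sketch, commented out because it downloads the embedding model and writes to
# disk. The CSV file name is hypothetical, and _Upload is the stand-in wrapper from the
# parse_file example above:
# census_df, _ = parse_csv_or_excel(_Upload("lambeth_census.csv"), text_column="text")
# census_docs = csv_excel_text_to_docs(census_df, text_column="text", chunk_size=chunk_size)
# embed_faiss_save_to_zip(census_docs, save_to="faiss_lambeth_census_embedding")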
def docs_to_chroma_save(embeddings, docs_out: List[Document], save_to: str):
    print(f"> Total split documents: {len(docs_out)}")

    vectordb = Chroma.from_documents(documents=docs_out,
                                     embedding=embeddings,
                                     persist_directory=save_to)

    # persist the db to disk
    vectordb.persist()

    print("> DONE")
    print(f"> Saved to: {save_to}")

    return vectordb
def sim_search_local_saved_vec(query, k_val, save_to="faiss_lambeth_census_embedding"):
    from IPython.display import display, Markdown  # used to show the query below

    load_embeddings()

    docsearch = FAISS.load_local(folder_path=save_to, embeddings=embeddings)

    display(Markdown(query))

    search = docsearch.similarity_search_with_score(query, k=k_val)

    for item in search:
        print(item[0].page_content)
        print(f"Page: {item[0].metadata['source']}")
        print(f"Date: {item[0].metadata['date']}")
        print(f"Score: {item[1]}")
        print("---")