# NOTE(review): export artifact — the original capture began with a UI status
# header reading "Spaces: / Running / Running" (not Python code).
import json
import logging
import os
import re
import time
from pathlib import Path

import ipywidgets as widgets
import llama_cpp
import weaviate
from IPython.display import display, clear_output
from langchain_community.document_loaders import BSHTMLLoader
from llama_cpp import Llama
from lxml import html
from semantic_text_splitter import HuggingFaceTextSplitter
from sentence_transformers import SentenceTransformer
from tokenizers import Tokenizer
# Quiet httpx (the HTTP layer used by the Weaviate client) so per-request
# logs don't flood the INFO output; keep this module's logger at INFO.
weaviate_logger = logging.getLogger("httpx")
weaviate_logger.setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)
################################################################# | |
# Connect to Weaviate vector database. | |
################################################################# | |
#################################################################
# Connect to Weaviate vector database.
#################################################################
client = ""
def connectToWeaviateDB():
    """Connect to the local Weaviate instance.

    Binds the module-level ``client`` (the original assigned a function
    local, so the global placeholder above was never replaced) and also
    returns the connection for callers that want an explicit handle.

    Returns:
        The connected ``weaviate`` client object.
    """
    global client
    logger.info("#### Create Weaviate db client connection.")
    client = weaviate.connect_to_custom(
        http_host="127.0.0.1",
        http_port=8080,
        http_secure=False,
        grpc_host="127.0.0.1",
        grpc_port=50051,
        grpc_secure=False
    )
    client.connect()
    return client
####################################################### | |
# Read each text input file, parse it into a document, | |
# chunk it, collect chunks and document name. | |
####################################################### | |
#######################################################
# Read each text input file, parse it into a document,
# chunk it, collect chunks and document name.
#######################################################
webpageDocNames = []
page_contentArray = []
webpageTitles = []
webpageChunks = []
webpageChunksDocNames = []
def readParseChunkFiles():
    """Read every HTML file under ``pathString``, parse, clean, and chunk it.

    Appends, per file and in lockstep: the document name (extension
    stripped) to ``webpageDocNames``, the page text to
    ``page_contentArray``, the HTML title to ``webpageTitles``, the
    chunk list to ``webpageChunks``, and "<name>Chunks" to
    ``webpageChunksDocNames``.
    """
    logger.info("#### Read and chunk input text files.")
    # Build the tokenizer/splitter once — the original recreated both for
    # every file inside the loop, which was pure repeated overhead.
    tokenizer = Tokenizer.from_pretrained("bert-base-uncased")
    logger.debug(f"### tokenizer: {tokenizer}")
    splitter = HuggingFaceTextSplitter(tokenizer, trim_chunks=True)
    for filename in os.listdir(pathString):
        logger.info(filename)
        path = Path(pathString) / filename
        # BUG FIX: the original used filename.rstrip(".html"), which strips
        # any trailing '.', 'h', 't', 'm', 'l' characters (e.g. "math.html"
        # -> "ma"). Remove exactly the ".html" suffix instead.
        if filename.endswith(".html"):
            filename = filename[:-len(".html")]
        webpageDocNames.append(filename)
        htmlLoader = BSHTMLLoader(path, "utf-8")
        htmlData = htmlLoader.load()
        title = htmlData[0].metadata['title']
        page_content = htmlData[0].page_content
        # Clean data: collapse runs of newlines into a single newline.
        page_content = re.sub(r'\n+', '\n', page_content)
        page_contentArray.append(page_content)
        webpageTitles.append(title)
        # chunk_capacity=50 bounds each chunk's token count.
        chunks = list(splitter.chunks(page_content, chunk_capacity=50))
        for chnk in chunks:
            logger.debug(f"#### chnk in file: {chnk}")
        logger.debug(f"chunks: {chunks}")
        webpageChunks.append(chunks)
        webpageChunksDocNames.append(filename + "Chunks")
        # BUG FIX: original log line hardcoded "(unknown)" for the filename.
        logger.debug(f"### filename, title: {filename}, {title}")
    logger.debug(f"### webpageDocNames: {webpageDocNames}")
################################################################# | |
# Create the chunks collection for the Weaviate database. | |
################################################################# | |
#################################################################
# Create the chunks collection for the Weaviate database.
#################################################################
def createChunksCollection():
    """Drop any stale "Chunks" collection and recreate it from a dict schema.

    Returns the collection handle produced by the Weaviate client.
    """
    logger.info("#### createChunksCollection() entered.")
    if client.collections.exists("Chunks"):
        client.collections.delete("Chunks")
    # Property: the chunk text itself, vectorized by text2vec-transformers.
    chunk_property = {
        "name": "chunk",
        "dataType": ["text"],
        "description": "Single webpage chunk.",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "vectorizePropertyName": False,
                "skip": False,
                "tokenization": "lowercase"
            }
        }
    }
    # Property: position of the chunk within its source document.
    chunk_index_property = {
        "name": "chunk_index",
        "dataType": ["int"]
    }
    # Cross-reference back to the owning "Documents" object.
    webpage_reference = {
        "name": "webpage",
        "dataType": ["Documents"],
        "description": "Webpage content chunks.",
        "invertedIndexConfig": {
            "bm25": {
                "b": 0.75,
                "k1": 1.2
            }
        }
    }
    schema = {
        "class": "Chunks",
        "description": "Collection for document chunks.",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "vectorizeClassName": True
            }
        },
        "vectorIndexType": "hnsw",
        "vectorIndexConfig": {
            "distance": "cosine",
        },
        "properties": [chunk_property, chunk_index_property, webpage_reference]
    }
    return client.collections.create_from_dict(schema)
##################################################################### | |
# Create the document collection for the Weaviate database. | |
##################################################################### | |
#####################################################################
# Create the document collection for the Weaviate database.
#####################################################################
def createWebpageCollection():
    """Drop any stale "Documents" collection and recreate it from a dict schema.

    Returns the collection handle produced by the Weaviate client.
    """
    logger.info("#### createWebpageCollection() entered.")
    if client.collections.exists("Documents"):
        client.collections.delete("Documents")
    # Property: the HTML document title, vectorized and BM25-indexed.
    title_property = {
        "name": "title",
        "dataType": ["text"],
        "description": "HTML doc title.",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "vectorizePropertyName": True,
                "skip": False,
                "tokenization": "lowercase"
            }
        },
        "invertedIndexConfig": {
            "bm25": {
                "b": 0.75,
                "k1": 1.2
            },
        }
    }
    # Property: the full page text, whitespace-tokenized for vectorization.
    content_property = {
        "name": "content",
        "dataType": ["text"],
        "description": "HTML page content.",
        "moduleConfig": {
            "text2vec-transformers": {
                "vectorizePropertyName": True,
                "tokenization": "whitespace"
            }
        }
    }
    schema = {
        "class": "Documents",
        "description": "For first attempt at loading a Weviate database.",
        "vectorizer": "text2vec-transformers",
        "moduleConfig": {
            "text2vec-transformers": {
                "vectorizeClassName": False
            }
        },
        "vectorIndexType": "hnsw",
        "vectorIndexConfig": {
            "distance": "cosine",
        },
        "properties": [title_property, content_property]
    }
    return client.collections.create_from_dict(schema)
################################################################# | |
# Create document and chunk objects in database. | |
################################################################# | |
#################################################################
# Create document and chunk objects in database.
#################################################################
def createDatabaseObjects():
    """Insert one Documents object per page, then its chunk objects.

    Walks the parallel module-level lists built by readParseChunkFiles()
    and stores each chunk with a back-reference to its document's UUID.
    """
    logger.info("#### Create page/doc and chunk db objects.")
    pages = zip(webpageDocNames, webpageTitles, page_contentArray, webpageChunks)
    for className, title, page_content, chunks in pages:
        logger.debug(f"## className, title: {className}, {title}")
        # Insert the document and keep its UUID for the chunk references.
        doc_uuid = wpCollection.data.insert(
            {
                "name": className,
                "title": title,
                "content": page_content
            }
        )
        # Insert every chunk of this document, tagged with its position.
        for chunk_index, chunk in enumerate(chunks):
            wpChunkCollection.data.insert(
                {
                    "title": title,
                    "chunk": chunk,
                    "chunk_index": chunk_index,
                    "references":
                    {
                        "webpage": doc_uuid
                    }
                }
            )
################################################################# | |
# Create display widgets. | |
################################################################# | |
#################################################################
# Create display widgets.
#################################################################
output_widget = ""
systemTextArea = ""
userTextArea = ""
ragPromptTextArea = ""
responseTextArea = ""
selectRag = ""
submitButton = ""
def createWidgets():
    """Build the ipywidgets UI elements for the RAG demo.

    Declares the widget names ``global`` — the original bound them as
    function locals, so the module-level placeholders above were never
    replaced and every widget was discarded when the function returned.
    """
    global output_widget, systemTextArea, userTextArea, ragPromptTextArea
    global responseTextArea, selectRag, submitButton
    output_widget = widgets.Output()
    with output_widget:
        print("### Create widgets entered.")
    systemTextArea = widgets.Textarea(
        value='',
        placeholder='Enter System Prompt.',
        description='Sys Prompt: ',
        disabled=False,
        layout=widgets.Layout(width='300px', height='80px')
    )
    userTextArea = widgets.Textarea(
        value='',
        placeholder='Enter User Prompt.',
        description='User Prompt: ',
        disabled=False,
        layout=widgets.Layout(width='435px', height='110px')
    )
    ragPromptTextArea = widgets.Textarea(
        value='',
        placeholder='App generated prompt with RAG information.',
        description='RAG Prompt: ',
        disabled=False,
        layout=widgets.Layout(width='580px', height='180px')
    )
    responseTextArea = widgets.Textarea(
        value='',
        placeholder='LLM generated response.',
        description='LLM Resp: ',
        disabled=False,
        layout=widgets.Layout(width='780px', height='200px')
    )
    selectRag = widgets.Checkbox(
        value=False,
        description='Use RAG',
        disabled=False
    )
    submitButton = widgets.Button(
        description='Run Model.',
        disabled=False,
        button_style='',  # 'success', 'info', 'warning', 'danger' or ''
        tooltip='Click',
        icon='check'  # (FontAwesome names without the `fa-` prefix)
    )
###################################################################### | |
# MAINLINE | |
###################################################################### | |
logger.info("#### MAINLINE ENTERED.")
#pathString = "/Users/660565/KPSAllInOne/ProgramFilesX86/WebCopy/DownloadedWebSites/LLMPOC_HTML"
pathString = "/app/inputDocs"
# Reset the module-level accumulators that readParseChunkFiles() appends to.
chunks = []
webpageDocNames = []
page_contentArray = []
webpageChunks = []
webpageTitles = []
webpageChunksDocNames = []
#connectToWeaviateDB()
# Inlined copy of connectToWeaviateDB() — the call above is commented out,
# presumably because the function binds `client` locally; verify before
# re-enabling it.
logger.info("#### Create Weaviate db client connection.")
client = weaviate.connect_to_custom(
    http_host="127.0.0.1",
    http_port=8080,
    http_secure=False,
    grpc_host="127.0.0.1",
    grpc_port=50051,
    grpc_secure=False
)
client.connect()
# Load, clean and chunk every HTML file under pathString.
readParseChunkFiles()
# Recreate both collections (drops any existing "Documents"/"Chunks").
wpCollection = createWebpageCollection()
wpChunkCollection = createChunksCollection()
#createDatabaseObjects()
# Inlined copy of createDatabaseObjects() — same caveat as above.
logger.info("#### Create page/doc and chunk db objects.")
for i, className in enumerate(webpageDocNames):
    title = webpageTitles[i]
    logger.debug(f"## className, title: {className}, {title}")
    # Create Webpage Object
    page_content = page_contentArray[i]
    # Insert the document.
    wpCollectionObj_uuid = wpCollection.data.insert(
        {
            "name": className,
            "title": title,
            "content": page_content
        }
    )
    # Insert the chunks for the document, each carrying the parent doc UUID.
    for i2, chunk in enumerate(webpageChunks[i]):
        chunk_uuid = wpChunkCollection.data.insert(
            {
                "title": title,
                "chunk": chunk,
                "chunk_index": i2,
                "references":
                {
                    "webpage": wpCollectionObj_uuid
                }
            }
        )
###############################################################################
# text contains prompt for vector DB.
text = "human-made computer cognitive ability"
###############################################################################
# Initialize the sentence transformer and encode the query prompt.
logger.info(f"#### Encode text query prompt to create vectors. {text}")
model = SentenceTransformer('/app/multi-qa-MiniLM-L6-cos-v1')
vector = model.encode(text)
# Copy the encoded vector into a plain list for the near_vector query.
vectorList = []
logger.debug("#### Print vectors.")
for vec in vector:
    vectorList.append(vec)
logger.debug(f"vectorList: {vectorList[2]}")
# Fetch chunks and print chunks.
logger.info("#### Retrieve semchunks from db using vectors from prompt.")
# distance=0.7 is the maximum cosine distance; limit=3 caps the result count.
semChunks = wpChunkCollection.query.near_vector(
    near_vector=vectorList,
    distance=0.7,
    limit=3
)
logger.debug(f"### semChunks[0]: {semChunks}")
# Print chunks, corresponding document and document title.
logger.info("#### Print individual retrieved chunks.")
# NOTE(review): enumerate() makes `chunk` an (index, object) tuple, hence
# the chunk[1] accesses below.
for chunk in enumerate(semChunks.objects):
    logger.info(f"#### chunk: {chunk}")
    webpage_uuid = chunk[1].properties['references']['webpage']
    logger.info(f"webpage_uuid: {webpage_uuid}")
    # Resolve the parent document from the UUID stored on the chunk.
    wpFromChunk = wpCollection.query.fetch_object_by_id(webpage_uuid)
    logger.info(f"### wpFromChunk title: {wpFromChunk.properties['title']}")
logger.info("#### Closing client db connection.")
client.close()
logger.info("#### Program terminating.")