import sys

try:
    # Prefer pysqlite3 when available; chromadb requires sqlite3 >= 3.35.
    import pysqlite3

    sys.modules["sqlite3"] = sys.modules.pop("pysqlite3")
except ImportError:
    pass

import chromadb
from langchain.vectorstores import Chroma

# from chromadb.api.fastapi import requests
from langchain.schema import Document
from langchain.chains import RetrievalQA
from langchain.embeddings import HuggingFaceBgeEmbeddings
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.chroma import ChromaTranslator
from llm.llmFactory import LLMFactory
from datetime import datetime
import baseInfra.dropbox_handler as dbh
from baseInfra.dbInterface import DbInterface
from uuid import UUID
from langchain.text_splitter import RecursiveCharacterTextSplitter
import logging
import asyncio

logger = logging.getLogger("root")


class myChromaTranslator(ChromaTranslator):
    allowed_operators = ["$and", "$or"]
    """Subset of allowed logical operators."""

    allowed_comparators = [
        "$eq",
        "$ne",
        "$gt",
        "$gte",
        "$lt",
        "$lte",
        "$contains",
        "$not_contains",
        "$in",
        "$nin",
    ]


class ChromaIntf:
    def __init__(self):
        self.db_interface = DbInterface()

        model_name = "BAAI/bge-large-en-v1.5"
        # Normalize embeddings so similarity scores behave like cosine similarity.
        encode_kwargs = {"normalize_embeddings": True}
        self.embedding = HuggingFaceBgeEmbeddings(
            model_name=model_name,
            model_kwargs={"device": "cpu"},
            encode_kwargs=encode_kwargs,
        )

        self.persist_db_directory = "db"
        self.persist_docs_directory = "persistence-docs"
        self.logger_file = "persistence.log"

        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(dbh.restoreFolder(self.persist_db_directory))
            loop.run_until_complete(dbh.restoreFolder(self.persist_docs_directory))
        except Exception:
            print("Folders probably don't exist yet, as this is a brand-new setup")

        docs = [
            Document(
                page_content="this is test doc",
                metadata={
                    "timestamp": 1696743148.474055,
                    "ID": "2000-01-01 15:57:11::664165-test",
                    "source": "test",
                },
                id="2000-01-01 15:57:11::664165-test",
            ),
        ]
        self.vectorstore = Chroma.from_documents(
            documents=docs,
            embedding=self.embedding,
            persist_directory=self.persist_db_directory,
        )
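        # Chroma.from_documents needs at least one document to create the
        # collection, so the placeholder "test doc" above seeds a brand-new
        # store before any real entries arrive.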
        # Metadata kept with every entry:
        #   timestamp --> time when added
        #   source    --> notes/references/web/youtube/book/conversation; defaults to conversation
        #   title     --> title of the document; "conversation" when source is conversation; defaults to blank
        #   author    --> defaults to blank
        #   Year/Month/Day/Hour/Minute --> e.g. 2024 / 1 / 3 / 11 / 29
        self.metadata_field_info = [
            AttributeInfo(
                name="timestamp",
                description="Python datetime.timestamp of the document in isoformat, should not be used for query",
                type="str",
            ),
            AttributeInfo(
                name="Year",
                description="Year from the date when the entry was added in YYYY format",
                type="int",
            ),
            AttributeInfo(
                name="Month",
                description="Month from the date when the entry was added, from 1-12",
                type="int",
            ),
            AttributeInfo(
                name="Day",
                description="Day of month from the date-time stamp when the entry was added, from 1-31",
                type="int",
            ),
            AttributeInfo(
                name="Hour",
                description="Hour from the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="Minute",
                description="Minute from the timestamp when the entry was added",
                type="int",
            ),
            AttributeInfo(
                name="source",
                description="Type of entry",
                type="string or list[string]",
            ),
            AttributeInfo(
                name="title",
                description="Title or Subject of the entry",
                type="string",
            ),
            AttributeInfo(
                name="author",
                description="Author of the entry",
                type="string",
            ),
        ]
        self.document_content_description = (
            "Information to store for retrieval from LLM based chatbot"
        )
        lf = LLMFactory()
        # self.llm = lf.get_llm("executor2")
        self.llm = lf.get_llm("executor3")
        self.retriever = SelfQueryRetriever.from_llm(
            self.llm,
            self.vectorstore,
            self.document_content_description,
            self.metadata_field_info,
            structured_query_translator=ChromaTranslator(),
            verbose=True,
        )

    async def getRelevantDocs(self, query: str, kwargs: dict):
        """Fetch relevant documents for query; should also post the result to firebase."""
        print("retriever state", self.retriever.search_kwargs)
        print("retriever state", self.retriever.search_type)
        try:
            # "search_type" switches the retrieval mode; any other key is passed
            # through to the underlying search (e.g. k, filter).
            for key in kwargs.keys():
                if "search_type" in key:
                    self.retriever.search_type = kwargs[key]
                else:
                    self.retriever.search_kwargs[key] = kwargs[key]
        except Exception:
            print("setting search args failed")

        retVal = []  # default to empty so a failed retrieval doesn't leave it unbound
        try:
            retVal = self.retriever.get_relevant_documents(query)
        except Exception:
            logger.exception("Exception occurred:", exc_info=True)

        value = []
        excludeMeta = True
        print(str(len(retVal)))
        try:
            # Normal path: results are langchain Documents with attribute access.
            for item in retVal:
                if excludeMeta:
                    v = item.page_content + " \n"
                else:
                    v = "Info:" + item.page_content + " "
                    for key in item.metadata.keys():
                        if key != "ID":
                            v += key + ":" + str(item.metadata[key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        except Exception:
            # Fallback path: results arrived as plain dicts.
            for item in retVal:
                if excludeMeta:
                    v = item["page_content"] + " \n"
                else:
                    v = "Info:" + item["page_content"] + " "
                    for key in item["metadata"].keys():
                        if key != "ID":
                            v += key + ":" + str(item["metadata"][key]) + " "
                value.append(v)
            self.db_interface.add_to_cache(input=query, value=value)
        return retVal

    async def addText(self, inStr: str, metadata):
        # Expected metadata fields (all optional):
        #   timestamp --> time when added
        #   source    --> notes/references/web/youtube/book/conversation; defaults to conversation
        #   title     --> title of the document; "conversation" when source is conversation; defaults to blank
        #   author    --> defaults to blank
        # TODO: Preprocess inStr to remove any html, markdown tags etc.
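        # For illustration (hypothetical caller): metadata arrives as a Pydantic
        # object, e.g. NoteMeta(source="notes", title="embeddings"); only its
        # .dict() output is used below.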
        metadata = metadata.dict()
        if "timestamp" not in metadata.keys():
            # Keep a datetime object for now; it is converted to isoformat once
            # the Year/Month/Day/Hour/Minute fields have been extracted.
            metadata["timestamp"] = datetime.now()
        else:
            metadata["timestamp"] = datetime.fromisoformat(metadata["timestamp"])
        if "source" not in metadata.keys():
            metadata["source"] = "conversation"
        if "title" not in metadata.keys():
            metadata["title"] = ""
        if metadata["source"] == "conversation":
            metadata["title"] = "conversation"
        if "author" not in metadata.keys():
            metadata["author"] = ""

        # TODO: If a url is present in the input, or when splitting is needed,
        # the ID (and maybe the filename used for storage) will need to be
        # formulated differently.
        metadata["ID"] = (
            metadata["timestamp"].strftime("%Y-%m-%d %H-%M-%S") + "-" + metadata["title"]
        )
        metadata["Year"] = metadata["timestamp"].year
        metadata["Month"] = metadata["timestamp"].month
        metadata["Day"] = int(metadata["timestamp"].strftime("%d"))
        metadata["Hour"] = metadata["timestamp"].hour
        metadata["Minute"] = metadata["timestamp"].minute
        metadata["timestamp"] = metadata["timestamp"].isoformat()
        print("Metadata is:")
        print(metadata)

        # Persist the raw text alongside the vector store.
        with open("./docs/" + metadata["ID"] + ".txt", "w") as fd:
            fd.write(inStr)
        print("written to file", inStr)

        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=800,
            chunk_overlap=50,
            length_function=len,
            is_separator_regex=False,
        )
        # docs = [Document(page_content=inStr, metadata=metadata)]
        docs = text_splitter.create_documents([inStr], [metadata])
        # Give every chunk after the first a distinct ID suffix.
        partNumber = 0
        for doc in docs:
            if partNumber > 0:
                doc.metadata["ID"] += f"__{partNumber}"
            partNumber += 1
            print(f"{partNumber} follows:")
            print(doc)
        try:
            print(metadata["ID"])
            ids = [doc.metadata["ID"] for doc in docs]
            print("ids are:")
            print(ids)
            return await self.vectorstore.aadd_documents(docs, ids=ids)
        except Exception as ex:
            logger.exception("exception in adding", exc_info=True)
            print("inside except of addText")
            return await self.vectorstore.aadd_documents(docs, ids=[metadata["ID"]])

    async def listDocs(self):
        collection = self.vectorstore._client.get_collection(
            self.vectorstore._LANGCHAIN_DEFAULT_COLLECTION_NAME,
            embedding_function=self.embedding,
        )
        return collection.get()
        # return self.vectorstore._client._get(collection_id=self._uuid(collectionInfo.id))

    async def persist(self):
        self.vectorstore.persist()
        await dbh.backupFile(self.logger_file)
        await dbh.backupFolder(self.persist_db_directory)
        return await dbh.backupFolder(self.persist_docs_directory)

    def _uuid(self, uuid_str: str) -> UUID:
        try:
            return UUID(uuid_str)
        except ValueError:
            print("Error generating uuid")
            raise ValueError(f"Could not parse {uuid_str} as a UUID")
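
# A minimal smoke-test sketch of how this class might be driven end to end.
# Assumptions: the llm and baseInfra packages are importable and configured, a
# local ./docs directory exists, and pydantic is installed. NoteMeta is a
# hypothetical stand-in for whatever model callers normally pass to addText
# (only its .dict() method is used).
if __name__ == "__main__":
    from pydantic import BaseModel

    class NoteMeta(BaseModel):
        source: str = "notes"
        title: str = "demo"
        author: str = ""

    # Construct outside asyncio.run(): __init__ drives its own event loop for
    # the Dropbox restore step.
    intf = ChromaIntf()

    async def _demo():
        await intf.addText(
            "Self-query retrievers translate natural-language questions "
            "into a semantic query plus structured metadata filters.",
            NoteMeta(),
        )
        # "k" is forwarded to the retriever's search kwargs by getRelevantDocs.
        for doc in await intf.getRelevantDocs("what is a self-query retriever?", {"k": 2}):
            print(doc)
        await intf.persist()

    asyncio.run(_demo())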