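"""Chat with PDF: a Chainlit app that lets a user upload a PDF, indexes it
into a Chroma vector store, and chats about it with an OpenAI model.

Run it with the Chainlit CLI (assuming this file is saved as app.py):

    chainlit run app.py -w

OPENAI_API_KEY must be set in the environment for both the embedding model
and the chat model.
"""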
from tempfile import NamedTemporaryFile
from typing import List

import chainlit as cl
import chromadb
from chainlit.types import AskFileResponse
from chromadb.config import Settings
from langchain.chains import LLMChain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import PDFPlumberLoader
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.prompts import ChatPromptTemplate
from langchain.schema import Document, StrOutputParser
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import Chroma
from langchain.vectorstores.base import VectorStore

def process_file(*, file: AskFileResponse) -> List[Document]:
    """Processes one PDF file from a Chainlit AskFileResponse object by
    loading the PDF document and then chunking it into sub-documents. Only
    PDF files are supported.

    Args:
        file (AskFileResponse): input file to be processed

    Raises:
        ValueError: when we fail to process the PDF file. We consider PDF
            processing to have failed when no text is returned, e.g. for
            PDFs that contain only images, corrupted PDFs, etc.

    Returns:
        List[Document]: List of Document(s). Each individual document has
            two fields: page_content (str) and metadata (dict).
    """
    if file.type != "application/pdf":
        raise TypeError("Only PDF files are supported")

    with NamedTemporaryFile() as tempfile:
        tempfile.write(file.content)
        # Flush so PDFPlumberLoader sees the full content when it reopens
        # the file by name.
        tempfile.flush()
        loader = PDFPlumberLoader(tempfile.name)
        documents = loader.load()

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=3000,
        chunk_overlap=100
    )
    docs = text_splitter.split_documents(documents)

    if not docs:
        raise ValueError("PDF file parsing failed.")

    # We are adding source_id into the metadata here to denote which
    # source chunk each document came from.
    for i, doc in enumerate(docs):
        doc.metadata["source"] = f"source_{i}"

    return docs

def create_search_engine(*, file: AskFileResponse) -> VectorStore:
    """Creates a Chroma vector store over the chunks of one uploaded PDF."""
    # Process the file and save the resulting chunks in the user session
    docs = process_file(file=file)
    cl.user_session.set("docs", docs)
    encoder = OpenAIEmbeddings(
        model="text-embedding-ada-002"
    )

    # Initialize the Chroma client and settings; reset first to ensure we
    # get a clean search engine
    client = chromadb.EphemeralClient()
    client_settings = Settings(
        allow_reset=True,
        anonymized_telemetry=False
    )
    search_engine = Chroma(
        client=client,
        client_settings=client_settings
    )
    search_engine._client.reset()

    search_engine = Chroma.from_documents(
        client=client,
        documents=docs,
        embedding=encoder,
        client_settings=client_settings
    )
    return search_engine

@cl.on_chat_start
async def on_chat_start():
    """Prepares the environment for the chat-with-PDF application. It should
    be decorated with cl.on_chat_start.

    Returns:
        None
    """
    files = None
    while files is None:
        files = await cl.AskFileMessage(
            content="Please upload the PDF file you want to chat with...",
            accept=["application/pdf"],
            max_size_mb=20,
        ).send()
    file = files[0]

    # Send a message to let the user know we are processing the file
    msg = cl.Message(content=f"Processing `{file.name}`...")
    await msg.send()

    try:
        search_engine = await cl.make_async(create_search_engine)(file=file)
    except Exception as e:
        await cl.Message(content=f"Error: {e}").send()
        raise SystemError from e

    model = ChatOpenAI(
        model="gpt-3.5-turbo-16k-0613",
        streaming=True
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are Chainlit GPT, a helpful assistant.",
            ),
            (
                "human",
                "{question}"
            ),
        ]
    )
    chain = LLMChain(llm=model, prompt=prompt, output_parser=StrOutputParser())

    # We are saving the chain in user_session, so we do not have to rebuild
    # it every single time.
    cl.user_session.set("chain", chain)

@cl.on_message
async def main(message: cl.Message):
    # Load the chain from the user session
    chain = cl.user_session.get("chain")  # type: LLMChain
    response = await chain.arun(
        question=message.content, callbacks=[cl.LangchainCallbackHandler()]
    )
    await cl.Message(content=response).send()
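# A possible next step (a sketch, not part of this file's flow): keep the
# search engine from on_chat_start in the user session and query it at
# message time for context, e.g.:
#
#     cl.user_session.set("search_engine", search_engine)   # in on_chat_start
#     search_engine = cl.user_session.get("search_engine")  # in main
#     context = search_engine.similarity_search(message.content, k=4)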