Spaces:
Runtime error
Runtime error
import urllib.request | |
import fitz | |
import re | |
import numpy as np | |
import tensorflow_hub as hub | |
import openai | |
import gradio as gr | |
import os | |
from sklearn.neighbors import NearestNeighbors | |
def download_pdf(url, output_path): | |
urllib.request.urlretrieve(url, output_path) | |
def preprocess(text): | |
text = text.replace('\n', ' ') | |
text = re.sub('\s+', ' ', text) | |
return text | |
def pdf_to_text(path, start_page=1, end_page=None): | |
doc = fitz.open(path) | |
total_pages = doc.page_count | |
if end_page is None: | |
end_page = total_pages | |
text_list = [] | |
for i in range(start_page - 1, end_page): | |
text = doc.load_page(i).get_text("text") | |
text = preprocess(text) | |
text_list.append(text) | |
doc.close() | |
return text_list | |
def text_to_chunks(texts, word_length=150, start_page=1): | |
text_toks = [t.split(' ') for t in texts] | |
page_nums = [] | |
chunks = [] | |
for idx, words in enumerate(text_toks): | |
for i in range(0, len(words), word_length): | |
chunk = words[i:i + word_length] | |
if (i + word_length) > len(words) and ( | |
len(chunk) < word_length) and (len(text_toks) | |
!= (idx + 1)): | |
text_toks[idx + 1] = chunk + text_toks[idx + 1] | |
continue | |
chunk = ' '.join(chunk).strip() | |
chunk = f'[Page no. {idx+start_page}]' + ' ' + '"' + chunk + '"' | |
chunks.append(chunk) | |
return chunks | |
class SemanticSearch: | |
def __init__(self): | |
self.use = hub.load( | |
"https://tfhub.dev/google/universal-sentence-encoder/4") | |
self.fitted = False | |
def fit(self, data, batch=1000, n_neighbors=5): | |
self.data = data | |
self.embeddings = self.get_text_embedding(data, batch=batch) | |
n_neighbors = min(n_neighbors, len(self.embeddings)) | |
self.nn = NearestNeighbors(n_neighbors=n_neighbors) | |
self.nn.fit(self.embeddings) | |
self.fitted = True | |
def __call__(self, text, return_data=True): | |
inp_emb = self.use([text]) | |
neighbors = self.nn.kneighbors(inp_emb, return_distance=False)[0] | |
if return_data: | |
return [self.data[i] for i in neighbors] | |
else: | |
return neighbors | |
def get_text_embedding(self, texts, batch=1000): | |
embeddings = [] | |
for i in range(0, len(texts), batch): | |
text_batch = texts[i:(i + batch)] | |
emb_batch = self.use(text_batch) | |
embeddings.append(emb_batch) | |
embeddings = np.vstack(embeddings) | |
return embeddings | |
def load_recommender(path, start_page=1): | |
global recommender | |
texts = pdf_to_text(path, start_page=start_page) | |
chunks = text_to_chunks(texts, start_page=start_page) | |
recommender.fit(chunks) | |
return 'Corpus Loaded.' | |
#################### | |
def generate_text(openAI_key, | |
openAI_base, | |
openAI_API_version, | |
prompt, | |
engine="chatgpt"): | |
openai.api_type = "azure" | |
openai.api_base = openAI_base | |
openai.api_version = openAI_API_version | |
openai.api_key = openAI_key | |
completions = openai.ChatCompletion.create(engine="chatgpt", | |
max_tokens=1024, | |
n=1, | |
stop=None, | |
temperature=1.0, | |
messages=[{ | |
"role": "user", | |
"content": prompt | |
}]) | |
print(completions) | |
message = completions['choices'][0]['message']['content'] | |
return message | |
def generate_answer(question, openAI_key, openAI_base, openAI_API_version): | |
topn_chunks = recommender(question) | |
# print(len(topn_chunks)) | |
# print(*topn_chunks, sep="\n") | |
prompt = "" | |
prompt += 'search results:\n\n' | |
for c in topn_chunks: | |
prompt += c + '\n\n' | |
# prompt += "Instructions: Compose a comprehensive reply to the query using the search results given. "\ | |
# "Cite each reference using [ Page Number] notation (every result has this number at the beginning). "\ | |
# "Citation should be done at the end of each sentence. "\ | |
# "If the search results mention multiple subjects with the same name, create separate answers for each. "\ | |
# "Make sure the answer is correct and don't output false content. "\ | |
# "Only answer what is asked. The answer should be in details."\ | |
# "\n\nQuery: {question}\nAnswer: " | |
prompt += "Instructions: Compose a comprehensive reply to the query using the search results given. "\ | |
"Cite each reference using [ Page Number] notation (every result has this number at the beginning). "\ | |
"Citation should be done at the end of each sentence. "\ | |
"If the search results mention multiple subjects with the same name, create separate answers for each. "\ | |
"Only include information found in the results and don't add any additional information."\ | |
"Make sure the answer is correct and don't output false content. "\ | |
"If the text does not relate to the query, simply state 'Found Nothing'. "\ | |
"Ignore outlier search results which has nothing to do with the question."\ | |
"Only answer what is asked. The answer should be short and concise."\ | |
"\n\nQuery: {question}\nAnswer: " | |
prompt += f"Query: {question}\nAnswer:" | |
print(prompt) | |
answer = generate_text(openAI_key, openAI_base, openAI_API_version, prompt, | |
"chatgpt") | |
return answer | |
def question_answer(url, file, question, openAI_key, openAI_base, | |
openAI_API_version): | |
if openAI_key.strip() == '': | |
return '[ERROR]: Please enter you Open AI Key. Get your key here : https://platform.openai.com/account/api-keys' | |
if url.strip() == '' and file == None: | |
return '[ERROR]: Both URL and PDF is empty. Provide atleast one.' | |
if url.strip() != '' and file != None: | |
return '[ERROR]: Both URL and PDF is provided. Please provide only one (eiter URL or PDF).' | |
if url.strip() != '': | |
glob_url = url | |
download_pdf(glob_url, 'corpus.pdf') | |
load_recommender('corpus.pdf') | |
else: | |
old_file_name = file.name | |
file_name = file.name | |
file_name = file_name[:-12] + file_name[-4:] | |
os.rename(old_file_name, file_name) | |
load_recommender(file_name) | |
if question.strip() == '': | |
return '[ERROR]: Question field is empty' | |
return generate_answer(question, openAI_key, openAI_base, | |
openAI_API_version) | |
def chatbot_respond(question, chat_history, url, file, openAI_key, openAI_base, | |
openAI_API_version): | |
bot_message = question_answer(url, file, question, openAI_key, openAI_base, | |
openAI_API_version) | |
chat_history.append((message, bot_message)) | |
return "", chat_history | |
recommender = SemanticSearch() | |
title = 'PDF GPT Azure' | |
description = """ | |
PDF GPT allows you to chat with your PDF file using Universal Sentence Encoder and Open AI. | |
It gives hallucination free response than other tools as the embeddings are better than OpenAI. | |
The returned response can even cite the page number in square brackets([]) where the information is located, | |
adding credibility to the responses and helping to locate pertinent information quickly. | |
""" | |
with gr.Blocks() as demo: | |
gr.Markdown(f'<center><h1>{title}</h1></center>') | |
gr.Markdown(description) | |
with gr.Row(): | |
with gr.Group(): | |
gr.Markdown( | |
f'<p style="text-align:center">Get your Open AI API key <a href="https://platform.openai.com/account/api-keys">here</a></p>' | |
) | |
gr.Dropdown(label="API Type", | |
choices=["azure", "OpenAI"], | |
info="Azure or Open AI", | |
value="azure"), | |
##################### | |
## | |
## REMEMBER to remove the key before public deploy | |
## | |
##################### | |
openAI_key = gr.Textbox( | |
label='Enter your Azure OpenAI API key here') | |
openAI_base = gr.Textbox(label='api_base', | |
value="https://api.hku.hk") | |
openAI_API_version = gr.Textbox(label='API version', | |
value="2023-03-15-preview") | |
url = gr.Textbox(label='Enter PDF URL here') | |
gr.Markdown("<center><h4>OR<h4></center>") | |
file = gr.File(label='Upload your PDF/ Research Paper / Book here', | |
file_types=['.pdf']) | |
with gr.Group(): | |
chatbot = gr.Chatbot() | |
question = gr.Textbox() | |
clear = gr.ClearButton([question, chatbot]) | |
question.submit( | |
chatbot_respond, | |
inputs=[ | |
question, chatbot, url, file, openAI_key, openAI_base, | |
openAI_API_version | |
], | |
outputs=[question, chatbot], | |
) | |
demo.launch() |