# HereChatBackend / app.py
# Tonic's picture
# Update app.py
# b95c00f
import weaviate
import langchain
import apscheduler
import gradio as gr
from langchain.embeddings import CohereEmbeddings
from langchain.document_loaders import UnstructuredFileLoader
from langchain.vectorstores import Weaviate
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
import os
import urllib.request
import ssl
import bitsandbytes
import tempfile
import mimetypes
from dotenv import load_dotenv
import cohere
from apscheduler.schedulers.background import BackgroundScheduler
import time
# Pull in variables from a .env file (via python-dotenv), then read the API
# keys and Weaviate connection settings from the process environment.
# Each value is None when the corresponding variable is not set.
load_dotenv()
openai_api_key = os.environ.get("OPENAI")
cohere_api_key = os.environ.get("COHERE")
weaviate_api_key = os.environ.get("WEAVIATE")
weaviate_url = os.environ.get("WEAVIATE_URL")
weaviate_username = os.environ.get("WEAVIATE_USERNAME")
weaviate_password = os.environ.get("WEAVIATE_PASSWORD")
def refresh_authentication():
    """Rebuild the module-level Weaviate client with username/password auth.

    Rebinds the globals ``my_credentials`` and ``client``. Objects that were
    given the previous ``client`` instance keep referring to that old
    instance; only the global name is updated.
    """
    global my_credentials, client
    my_credentials = weaviate.auth.AuthClientPassword(
        username=weaviate_username,
        password=weaviate_password,
    )
    client = weaviate.Client(
        weaviate_url,
        auth_client_secret=my_credentials,
    )
# Background job: re-run refresh_authentication every 30 minutes.
scheduler = BackgroundScheduler()
scheduler.add_job(refresh_authentication, trigger="interval", minutes=30)
scheduler.start()

# Authenticate once right away so the global ``client`` exists before the
# code below uses it.
refresh_authentication()
# Weaviate class definition for articles: three text properties and one date.
Article = {
    "class": "Article",
    "description": "A class representing articles in the application",
    "properties": [
        {"name": name, "description": description, "dataType": [data_type]}
        for name, description, data_type in (
            ("title", "The title of the article", "text"),
            ("content", "The content of the article", "text"),
            ("author", "The author of the article", "text"),
            ("publishDate", "The date the article was published", "date"),
        )
    ],
    # Not set here:
    # "vectorIndexType": "hnsw",
    # "vectorizer": "text2vec-contextionary"
}
def class_exists(class_name):
    """Report whether ``class_name`` is listed in the Weaviate schema.

    The schema is fetched through the module-level ``client``. Any exception
    raised while fetching or reading it is printed and treated as
    "class not found".

    Returns:
        True if a schema class with that name exists, otherwise False
        (including when the lookup itself failed).
    """
    try:
        schema_classes = client.schema.get()["classes"]
        return class_name in [entry["class"] for entry in schema_classes]
    except Exception as err:
        print(f"Error checking if class exists: {err}")
        return False
# Schema payload sent to Weaviate. It must be defined before the existence
# check below, because the creation branch passes it to client.schema.create.
schema = {
    "classes": [Article]
}

# Create the 'Article' class once if it is missing; otherwise report that it
# exists and print the schema currently stored in Weaviate.
if not class_exists("Article"):
    try:
        client.schema.create(schema)
    except Exception as e:
        print(f"Error creating schema: {e}")
else:
    print("Class 'Article' already exists in the schema.")
    try:
        existing_schema = client.schema.get()
        print("Existing schema retrieved:", existing_schema)
    except Exception as e:
        print(f"Error retrieving existing schema: {e}")
# LangChain vector store bound to the Weaviate client that exists right now.
vectorstore = Weaviate(client=client, index_name="HereChat", text_key="text")
# Overrides a private attribute of the LangChain wrapper; presumably the list
# of fields requested with each query result -- TODO confirm against the
# installed LangChain version.
vectorstore._query_attrs = [
    "text",
    "title",
    "url",
    "views",
    "lang",
    "_additional {distance}",
]
# Cohere embedding model, attached to the store so embed_pdf can reach it.
vectorstore.embedding = CohereEmbeddings(
    model="embed-multilingual-v2.0",
    cohere_api_key=cohere_api_key,
)

# Cohere API client
co = cohere.Client(api_key=cohere_api_key)
def embed_pdf(file, filename, collection_name, file_type):
    """Parse a document and store its chunks, with embeddings, in Weaviate.

    Args:
        file: For ``file_type == "URL"`` the URL to download. For
            ``"Binary"`` the raw content: ``bytes``, or a ``str`` that is
            encoded to bytes before being written.
        filename: Path the content is written to before it is parsed.
        collection_name: Weaviate class that receives the objects.
        file_type: Either ``"URL"`` or ``"Binary"``.

    Returns:
        ``{"message": ...}`` on success, or ``{"error": ...}`` when
        ``file_type`` is unsupported or the download fails. Errors raised
        while parsing, embedding or storing are not caught here.
    """
    if file_type == "URL":
        try:
            # Verify TLS certificates: with an unverified context anyone on
            # the network path could substitute the downloaded document.
            context = ssl.create_default_context()
            with urllib.request.urlopen(file, context=context) as response, \
                    open(filename, "wb") as out_file:
                out_file.write(response.read())
            file_path = filename
        except Exception as e:
            return {"error": f"Error downloading file from URL: {e}"}
    elif file_type == "Binary":
        if isinstance(file, str):
            # Convert string to bytes if necessary
            file = file.encode()
        file_path = os.path.join("./", filename)
        with open(file_path, "wb") as f:
            f.write(file)
    else:
        return {"error": "Invalid file type"}

    docs = UnstructuredFileLoader(file_path).load()

    # LangChain documents expose their text as ``page_content``; they are not
    # subscriptable, so ``doc['text']`` would raise.
    texts = [doc.page_content for doc in docs]
    # One batched embedding request instead of one request per document.
    vectors = vectorstore.embedding.embed_documents(texts) if texts else []

    for text, vector in zip(texts, vectors):
        # The embedding goes in the dedicated ``vector`` argument rather than
        # being stored as an ordinary object property.
        client.data_object.create(
            data_object={"text": text},
            class_name=collection_name,
            vector=vector,
        )
    return {"message": f"Documents embedded in Weaviate collection '{collection_name}'"}
def retrieve_info(query):
    """Retrieve documents for ``query`` and return the best three after reranking.

    The RetrievalQA chain is run with ``return_source_documents=True`` so the
    retrieved documents are available; up to 25 of them are sent to Cohere
    rerank, and the top three hits are formatted.

    Args:
        query: The user's question.

    Returns:
        ``{"results": [...]}`` where each entry has the keys
        "Document Rank", "Title", "Content", "Author", "Publish Date" and
        "Relevance Score" (formatted to two decimals). The list is empty
        when nothing was retrieved.
    """
    llm = OpenAI(temperature=0, openai_api_key=openai_api_key)
    qa = RetrievalQA.from_chain_type(
        llm,
        retriever=vectorstore.as_retriever(),
        return_source_documents=True,
    )

    # The chain returns a dict, which cannot be sliced; the retrieved
    # documents are under "source_documents".
    initial_results = qa({"query": query})
    top_docs = initial_results["source_documents"][:25]
    if not top_docs:
        # Nothing to rerank; avoid calling the rerank API with no documents.
        return {"results": []}

    reranked_results = co.rerank(
        query=query,
        documents=[doc.page_content for doc in top_docs],
        top_n=3,
        model="rerank-english-v2.0",
    )
    # Use ``.results`` when the response object provides it, otherwise
    # iterate the response itself (the shape differs between SDK versions).
    hits = getattr(reranked_results, "results", reranked_results)

    formatted_results = []
    for rank, hit in enumerate(hits, start=1):
        # ``hit.index`` points back into the list passed to rerank, which is
        # how the original document and its metadata are recovered.
        source = top_docs[hit.index]
        metadata = source.metadata or {}
        formatted_results.append({
            "Document Rank": rank,
            "Title": metadata.get("title"),
            "Content": metadata.get("content", source.page_content),
            "Author": metadata.get("author"),
            "Publish Date": metadata.get("publishDate"),
            "Relevance Score": f"{hit.relevance_score:.2f}",
        })
    return {"results": formatted_results}
def _split_upload(file):
    """Normalise an uploaded-file value into a ``(content, filename)`` pair."""
    if isinstance(file, (tuple, list)):
        # Legacy shape: (content, filename)
        return file[0], file[1]
    if isinstance(file, (bytes, bytearray)):
        # Raw bytes carry no name; use a neutral placeholder.
        return bytes(file), "upload"

    # Either a path/URL string or a file-like object exposing ``.name``.
    path = file if isinstance(file, str) else getattr(file, "name", None)
    if not isinstance(path, str):
        raise TypeError(f"Unsupported upload value: {type(file).__name__}")

    if path.startswith("http"):
        from urllib.parse import urlsplit

        # Name the download after the last path segment of the URL.
        return path, os.path.basename(urlsplit(path).path) or "download"

    with open(path, "rb") as handle:
        return handle.read(), os.path.basename(path)


def combined_interface(query, file, collection_name):
    """Gradio handler: answer a query, or embed an uploaded document.

    A non-empty ``query`` takes priority and is passed to ``retrieve_info``.
    Otherwise, when both ``file`` and ``collection_name`` are given, the
    upload is passed to ``embed_pdf`` with all four of its arguments.

    Args:
        query: Question text; may be empty or None.
        file: Uploaded file value. Accepted shapes are a
            ``(content, filename)`` pair, raw bytes, a path or URL string,
            or an object with a ``.name`` path.
        collection_name: Weaviate class to store the document in.

    Returns:
        The result of ``retrieve_info`` or ``embed_pdf``, or a usage message
        when neither a query nor a file plus collection name was supplied.
    """
    if query:
        return retrieve_info(query)

    if file is None or not collection_name:
        return "Please enter a query or upload a PDF file and specify a collection name."

    file_content, filename = _split_upload(file)

    # Check if file_content is a URL or binary data
    if isinstance(file_content, str) and file_content.startswith("http"):
        file_type = "URL"
    else:
        file_type = "Binary"

    # embed_pdf writes the content to disk itself, so no temporary file is
    # needed here. Only the base name is passed on, so a directory component
    # in the upload name is not carried into the write path.
    return embed_pdf(
        file_content,
        os.path.basename(str(filename)),
        collection_name,
        file_type,
    )
# Gradio UI: a query box, a file upload and a collection-name box, all
# handled by combined_interface; the result is shown as text.
iface = gr.Interface(
    combined_interface,
    [
        gr.Textbox(label="Query"),
        gr.File(label="PDF File"),
        gr.Textbox(label="Collection Name"),
    ],
    "text",
)
iface.launch()