# NOTE: file-viewer residue (size banner, commit hashes, line-number gutter) was captured at the top of this file; it is not part of the program.
import weaviate
import langchain
import apscheduler
import tempfile
import gradio as gr
from langchain.embeddings import CohereEmbeddings
from langchain.document_loaders import UnstructuredFileLoader
from langchain.vectorstores import Weaviate
from langchain.llms import OpenAI
from langchain.chains import RetrievalQA
import os
import urllib.request
import ssl
import mimetypes
from dotenv import load_dotenv
import cohere
from apscheduler.schedulers.background import BackgroundScheduler
import time
# Read API keys and Weaviate connection settings from the environment
# (after load_dotenv() has run). Each value is None when the variable is unset.
load_dotenv()
openai_api_key = os.environ.get('OPENAI')
cohere_api_key = os.environ.get('COHERE')
weaviate_api_key = os.environ.get('WEAVIATE')
weaviate_url = os.environ.get('WEAVIATE_URL')
weaviate_username = os.environ.get('WEAVIATE_USERNAME')
weaviate_password = os.environ.get('WEAVIATE_PASSWORD')
def refresh_authentication():
    """Rebuild the module-level Weaviate credentials and client.

    Rebinds the globals ``my_credentials`` and ``client`` from the username,
    password and URL read from the environment at import time.
    """
    global my_credentials, client
    my_credentials = weaviate.auth.AuthClientPassword(
        username=weaviate_username,
        password=weaviate_password,
    )
    client = weaviate.Client(
        weaviate_url,
        auth_client_secret=my_credentials,
    )
# Background job: call refresh_authentication() every 30 minutes.
scheduler = BackgroundScheduler()
scheduler.add_job(func=refresh_authentication, trigger='interval', minutes=30)
scheduler.start()

# Authenticate once immediately so the global `client` is bound before the
# module-level code below uses it.
refresh_authentication()
# Weaviate class definition for articles. Each row of the table below is
# (property name, description, data type) and becomes one schema property.
Article = {
    "class": "Article",
    "description": "A class representing articles in the application",
    "properties": [
        {
            "name": prop_name,
            "description": prop_description,
            "dataType": [prop_type],
        }
        for prop_name, prop_description, prop_type in (
            ("title", "The title of the article", "text"),
            ("content", "The content of the article", "text"),
            ("author", "The author of the article", "text"),
            ("publishDate", "The date the article was published", "date"),
        )
    ],
    # Optional settings intentionally left unset:
    #   "vectorIndexType": "hnsw"
    #   "vectorizer": "text2vec-contextionary"
}
def class_exists(class_name):
    """Report whether *class_name* appears in the current Weaviate schema.

    Any exception raised while fetching or reading the schema is printed and
    treated as "not found", so ``False`` is returned in that case.
    """
    try:
        known_names = [entry["class"] for entry in client.schema.get()["classes"]]
        found = class_name in known_names
    except Exception as err:
        print(f"Error checking if class exists: {err}")
        found = False
    return found
# Schema payload sent to Weaviate: the classes this application expects.
schema = {
    "classes": [Article]
}

# Create the 'Article' class on first run; otherwise report what is already
# stored. `schema` must be defined before this check because the create call
# uses it -- referencing it any earlier raises a NameError that the broad
# except would misreport as a schema-creation failure.
if not class_exists("Article"):
    try:
        client.schema.create(schema)
    except Exception as e:
        print(f"Error creating schema: {e}")
else:
    print("Class 'Article' already exists in the schema.")
    # Retrieve the existing schema so it is visible in the startup log.
    try:
        existing_schema = client.schema.get()
        print("Existing schema retrieved:", existing_schema)
    except Exception as e:
        print(f"Error retrieving existing schema: {e}")
# Initialize vectorstore
# LangChain wrapper around the Weaviate client, pointed at the "HereChat"
# index with "text" as the content property.
# NOTE(review): the wrapper receives the client object that exists right now;
# refresh_authentication() later rebinds the global `client`, which presumably
# does not update this wrapper -- confirm.
vectorstore = Weaviate(client, index_name="HereChat", text_key="text")
# Overwrites a private attribute of the wrapper with the list of fields to
# request. NOTE(review): these names differ from the Article properties
# defined in this file -- confirm they exist on the "HereChat" class.
vectorstore._query_attrs = ["text", "title", "url", "views", "lang", "_additional {distance}"]
# NOTE(review): this assigns an attribute after construction; verify that the
# installed LangChain version actually reads `embedding` from the instance.
vectorstore.embedding = CohereEmbeddings(model="embed-multilingual-v2.0", cohere_api_key=cohere_api_key)
# Initialize Cohere client
co = cohere.Client(api_key=cohere_api_key)
def embed_pdf(file, filename, collection_name, file_type):
    """Parse a document and store its pieces, with Cohere vectors, in Weaviate.

    Args:
        file: For ``file_type == "URL"``, the URL to download. For
            ``"Binary"``, the raw content as ``bytes`` (a ``str`` is encoded
            to bytes first).
        filename: Path the content is written to before it is parsed.
        collection_name: Weaviate class that receives the objects.
        file_type: Either ``"URL"`` or ``"Binary"``.

    Returns:
        ``{"message": ...}`` on success, or ``{"error": ...}`` when
        ``file_type`` is not recognised or the download fails.

    Side effects:
        A file downloaded from a URL is left at ``filename``. The scratch
        file written for binary input is removed afterwards, even when
        parsing or storing raises.
    """
    if file_type not in ("URL", "Binary"):
        return {"error": "Invalid file type"}

    if file_type == "URL":
        try:
            # Use the default, certificate-verifying TLS context; an
            # unverified context would accept any server certificate.
            context = ssl.create_default_context()
            with urllib.request.urlopen(file, context=context) as response:
                with open(filename, 'wb') as out_file:
                    out_file.write(response.read())
        except Exception as e:
            return {"error": f"Error downloading file from URL: {e}"}
        file_path = filename
        remove_after = False
    else:
        # Binary input: normalise to bytes and write a scratch copy to parse.
        file_content = file.encode() if isinstance(file, str) else file
        file_path = os.path.join('./', filename)
        with open(file_path, 'wb') as f:
            f.write(file_content)
        remove_after = True

    try:
        docs = UnstructuredFileLoader(file_path).load()
        # The loader yields Document objects; their text is `page_content`.
        texts = [doc.page_content for doc in docs]
        if texts:
            embeddings = CohereEmbeddings(
                model="embed-multilingual-v2.0",
                cohere_api_key=cohere_api_key,
            )
            # One batched call returns one vector per text, in order.
            vectors = embeddings.embed_documents(texts)
            for text, vector in zip(texts, vectors):
                # The vector goes in `vector=` so Weaviate indexes it,
                # rather than being stored as an ordinary property.
                client.data_object.create(
                    data_object={"text": text},
                    class_name=collection_name,
                    vector=vector,
                )
    finally:
        if remove_after:
            os.remove(file_path)

    return {"message": f"Documents embedded in Weaviate collection '{collection_name}'"}
def retrieve_info(query):
    """Fetch documents matching *query* and return the top reranked hits.

    Up to 25 candidates are pulled from the vector store, reranked with
    Cohere, and the best three are formatted for display.

    Args:
        query: The user's search text.

    Returns:
        ``{"results": [...]}``. Each entry has the keys "Document Rank",
        "Title", "Content", "Author", "Publish Date" and "Relevance Score"
        (a string with two decimals). Metadata fields missing from a
        document come back as ``None``; "Content" falls back to the
        document text. The list is empty when nothing was retrieved.
    """
    # Query the vector store directly: reranking needs the documents
    # themselves, whereas a RetrievalQA call returns an answer dict that
    # cannot be sliced into a candidate list.
    candidates = vectorstore.similarity_search(query, k=25)
    if not candidates:
        return {"results": []}

    reranked = co.rerank(
        query=query,
        documents=[doc.page_content for doc in candidates],
        top_n=3,
        model='rerank-english-v2.0',
    )
    # Some SDK versions expose the hits as `.results`; otherwise the
    # response itself is iterated.
    hits = getattr(reranked, "results", reranked)

    formatted_results = []
    for rank, hit in enumerate(hits, start=1):
        # `hit.index` points back into the submitted list, so this works
        # whether or not the response echoes the documents.
        source = candidates[hit.index]
        metadata = source.metadata or {}
        formatted_results.append({
            "Document Rank": rank,
            "Title": metadata.get("title"),
            "Content": metadata.get("content", source.page_content),
            "Author": metadata.get("author"),
            "Publish Date": metadata.get("publishDate"),
            "Relevance Score": f"{hit.relevance_score:.2f}",
        })

    # NOTE: a follow-up chat-completion step used to sit after this return.
    # It was unreachable and called the Weaviate client rather than an
    # OpenAI client, so it has been dropped instead of being revived.
    return {"results": formatted_results}
def _read_upload(file):
    """Return ``(content, filename)`` for an upload in any supported shape."""
    if isinstance(file, (tuple, list)):
        # Legacy shape: (content, filename).
        return file[0], file[1]
    # Otherwise expect what Gradio's File input supplies: a path string or a
    # temp-file object exposing the path as `.name`.
    path = file if isinstance(file, str) else file.name
    with open(path, 'rb') as handle:
        return handle.read(), os.path.basename(path)


def combined_interface(query, file, collection_name):
    """Gradio entry point: answer a query, or embed an uploaded file.

    Args:
        query: Search text. When non-empty it takes priority and the upload
            is ignored.
        file: The uploaded file (a path, a file object with ``.name``, or a
            ``(content, filename)`` pair), or ``None``.
        collection_name: Weaviate class to embed the upload into.

    Returns:
        The result of ``retrieve_info`` for a query, the result of
        ``embed_pdf`` for an upload, or a help message when neither a query
        nor a file plus collection name was given.
    """
    if query:
        return retrieve_info(query)
    if file is None or not collection_name:
        return "Please enter a query or upload a PDF file and specify a collection name."

    file_content, filename = _read_upload(file)
    # Content that is an http(s) string is a link to download; anything else
    # is raw data.
    is_url = isinstance(file_content, str) and file_content.startswith("http")
    file_type = "URL" if is_url else "Binary"
    # embed_pdf takes the content (or URL) itself and handles its own scratch
    # file, so nothing is written to disk here.
    return embed_pdf(file_content, filename, collection_name, file_type)
# Gradio UI: a query box, a file upload and a collection-name box, all routed
# through combined_interface; the result is shown as text.
iface = gr.Interface(
    fn=combined_interface,
    inputs=[
        gr.Textbox(label="Query"),
        gr.File(label="PDF File"),
        gr.Textbox(label="Collection Name"),
    ],
    outputs="text",
)
# Start the web server (the stray trailing "|" that made this line a syntax
# error has been removed).
iface.launch()