# Hugging Face Space application.
# (The "Spaces: / Runtime error" lines here were page residue from the
# Space's error view, not part of the program.)
import atexit
import mimetypes
import os
import re
import ssl
import time
import urllib.request

import apscheduler
import cohere
import gradio as gr
import langchain
import requests
import weaviate
from apscheduler.schedulers.background import BackgroundScheduler
from dotenv import load_dotenv
from langchain.chains import RetrievalQA
from langchain.document_loaders import UnstructuredFileLoader
from langchain.embeddings import CohereEmbeddings
from langchain.llms import OpenAI
from langchain.vectorstores import Weaviate
# Pull configuration from the environment; a local .env file (if any)
# is loaded first so it can supply these values during development.
load_dotenv()

openai_api_key = os.environ.get('OPENAI')
cohere_api_key = os.environ.get('COHERE')
weaviate_api_key = os.environ.get('WEAVIATE')
weaviate_url = os.environ.get('WEAVIATE_URL')
weaviate_username = os.environ.get('WEAVIATE_USERNAME')
weaviate_password = os.environ.get('WEAVIATE_PASSWORD')
def refresh_token():
    """Obtain a fresh OIDC bearer token from the Weaviate instance.

    Uses the resource-owner-password flow when the token issuer advertises
    it; otherwise prints the hybrid-flow login URL so the user can log in
    via a browser and extract the ``id_token`` manually.

    Returns:
        The token string on success, or ``None`` when no token could be
        obtained (OIDC not configured, unsupported grant type, or no
        ``id_token`` found in the redirection URL).
    """
    url = weaviate_url
    # Get Weaviate's OIDC configuration.
    weaviate_open_id_config = requests.get(url + "/v1/.well-known/openid-configuration")
    # status_code is an int — comparing against the string "404" was
    # always False, so the "not configured" case fell through.
    if weaviate_open_id_config.status_code == 404:
        print("Your Weaviate instance is not configured with openid")
        return None
    response_json = weaviate_open_id_config.json()
    client_id = response_json["clientId"]
    href = response_json["href"]

    # Get the token issuer's OIDC configuration.
    auth_json = requests.get(href).json()

    if "grant_types_supported" in auth_json:
        # Resource owner password flow. Validate with a real check rather
        # than `assert` (asserts are stripped under `python -O`).
        if "password" not in auth_json["grant_types_supported"]:
            print("The token issuer does not support the password grant type")
            return None
        # Use the credentials already loaded from the environment; fall
        # back to the original placeholders if they are unset.
        username = weaviate_username or "username"
        password = weaviate_password or "password"
        # Construct the POST request to send to 'token_endpoint'.
        auth_body = {
            "grant_type": "password",
            "client_id": client_id,
            "username": username,
            "password": password,
        }
        response_post = requests.post(auth_json["token_endpoint"], auth_body)
        token = response_post.json().get("access_token")
        print("Your access_token is:")
        print(token)
        return token

    # Hybrid flow: the user must log in interactively in a browser.
    authorization_url = auth_json["authorization_endpoint"]
    parameters = {
        "client_id": client_id,
        "response_type": "code%20id_token",
        "response_mode": "fragment",
        "redirect_url": url,
        "scope": "openid",
        "nonce": "abcd",
    }
    # Construct 'auth_url'.
    parameter_string = "&".join(key + "=" + item for key, item in parameters.items())
    print("Please visit the following url with your browser to login:")
    print(authorization_url + "?" + parameter_string)
    print(
        "After the login you will be redirected, the token is the 'id_token' parameter of the redirection url."
    )
    # Placeholder: paste the post-login redirection URL here. The regex
    # pulls the 'id_token' fragment parameter out of it.
    resp_txt = "Redirection URL"
    match = re.search(r"(?<=id_token=).+(?=&)", resp_txt)
    if match is None:
        # Guard: indexing a failed search (None[0]) raised TypeError.
        print("No id_token found in the redirection URL.")
        return None
    token = match[0]
    print("Set as bearer token in the clients to access Weaviate.")
    return token
# Background token refresh. BackgroundScheduler runs its jobs in a
# separate thread, so the main thread must NOT be parked here: the
# original `while True: time.sleep(2)` loop blocked module execution
# forever, so the Weaviate client and the Gradio app below never started.
# The Gradio server launched at the bottom of the file keeps the process
# alive instead.
scheduler = BackgroundScheduler()
# Schedule the token refresh function; adjust the interval as needed.
scheduler.add_job(refresh_token, 'interval', minutes=30)
scheduler.start()
# Stop the scheduler thread cleanly when the interpreter exits
# (replaces the old KeyboardInterrupt/SystemExit busy-wait handler).
atexit.register(scheduler.shutdown)
# --- Weaviate connection ------------------------------------------------
auth_config = weaviate.auth.AuthApiKey(api_key=weaviate_api_key)
client = weaviate.Client(
    url=weaviate_url,
    auth_client_secret=auth_config,
    additional_headers={"X-Cohere-Api-Key": cohere_api_key},
)

# --- Vector store over the "HereChat" class -----------------------------
vectorstore = Weaviate(client, index_name="HereChat", text_key="text")
vectorstore._query_attrs = [
    "text",
    "title",
    "url",
    "views",
    "lang",
    "_additional {distance}",
]
vectorstore.embedding = CohereEmbeddings(
    model="embed-multilingual-v2.0",
    cohere_api_key=cohere_api_key,
)

# --- Cohere client (used below for reranking) ---------------------------
co = cohere.Client(api_key=cohere_api_key)
def embed_pdf(file, collection_name):
    """Parse an uploaded file and store its text (with Cohere embeddings) in Weaviate.

    Args:
        file: uploaded file object from Gradio (exposes ``.name`` and ``.read()``).
        collection_name: Weaviate class name to store the objects under.

    Returns:
        A dict with a human-readable status message.
    """
    # Persist the upload to a local copy so UnstructuredFileLoader can
    # read it. basename() strips any path components from the upload name.
    filename = os.path.basename(file.name)
    file_path = os.path.join('./', filename)
    with open(file_path, 'wb') as f:
        f.write(file.read())
    try:
        loader = UnstructuredFileLoader(file_path)
        docs = loader.load()
        # Generate embeddings and store the documents in Weaviate.
        embeddings = CohereEmbeddings(model="embed-multilingual-v2.0", cohere_api_key=cohere_api_key)
        for doc in docs:
            # loader.load() yields langchain Document objects, not dicts:
            # subscripting (doc['text']) raised TypeError. The text lives
            # on .page_content.
            text = doc.page_content
            # CohereEmbeddings has no `.embed`; embed_documents is the
            # batch API — take the single vector back out of the list.
            embedding = embeddings.embed_documents([text])[0]
            weaviate_document = {
                "text": text,
                "embedding": embedding,
            }
            client.data_object.create(data_object=weaviate_document, class_name=collection_name)
    finally:
        # Remove the temp copy even when parsing or embedding fails
        # (previously the file leaked on any exception).
        os.remove(file_path)
    return {"message": f"Documents embedded in Weaviate collection '{collection_name}'"}
def retrieve_info(query):
    """Answer *query* from the documents stored in Weaviate.

    Pipeline: fetch up to 25 candidate documents from the vector store,
    rerank them with Cohere, then ask the OpenAI LLM to respond using the
    query plus the top three reranked documents.

    Returns:
        The LLM's answer as a string (suitable for the Gradio text output).
    """
    # Retrieve candidate documents. The previous code ran RetrievalQA and
    # sliced its result dict ([:25] — a TypeError); we query the retriever
    # directly so we actually get a list of documents to rerank.
    retriever = vectorstore.as_retriever(search_kwargs={"k": 25})
    docs = retriever.get_relevant_documents(query)
    # Cohere's reranker wants raw strings, not Document objects.
    top_docs = [doc.page_content for doc in docs]
    if not top_docs:
        return "No documents found for this query."

    # Rerank the candidates and keep the best three.
    reranked_results = co.rerank(
        query=query,
        documents=top_docs,
        top_n=3,
        model='rerank-english-v2.0',
    )

    # Build the final prompt: the user query followed by the reranked
    # documents and their relevance scores.
    prompt_parts = [f"User: {query}\n"]
    for idx, r in enumerate(reranked_results):
        prompt_parts.append(
            f"Document {idx + 1}: {r.document['text']}\n"
            f"Relevance Score: {r.relevance_score:.2f}\n\n"
        )
    user_prompt = "".join(prompt_parts)

    # Final LLM call. NOTE: the module-level `client` is the *Weaviate*
    # client, so the old `client.chat.completions.create(...)` call (which
    # was also unreachable dead code after an earlier `return`) could never
    # work; use the langchain OpenAI wrapper already imported by this file.
    llm = OpenAI(temperature=0, openai_api_key=openai_api_key)
    return llm(user_prompt)
def combined_interface(query, file, collection_name):
    """Dispatch the Gradio inputs.

    A non-empty query takes priority and runs retrieval; otherwise a file
    plus a collection name triggers ingestion; anything else returns a
    usage hint.
    """
    if query:
        return retrieve_info(query)
    if file is not None and collection_name:
        return embed_pdf(file, collection_name)
    return "Please enter a query or upload a PDF file."
# Gradio UI: a single endpoint that either answers a query or ingests an
# uploaded file into the named Weaviate collection.
iface = gr.Interface(
    fn=combined_interface,
    inputs=[
        gr.Textbox(label="Query"),
        gr.File(label="PDF File"),
        gr.Textbox(label="Collection Name"),
    ],
    outputs="text",
)

iface.launch()