"""RAG chat app: embeds uploaded documents into Weaviate and answers
queries with retrieval + Cohere reranking, behind a Gradio interface."""

import weaviate
import langchain  # noqa: F401  (kept from original; not referenced directly)
import apscheduler  # noqa: F401
import gradio as gr
from langchain.embeddings import CohereEmbeddings
from langchain.document_loaders import UnstructuredFileLoader
from langchain.vectorstores import Weaviate
from langchain.llms import OpenAI  # noqa: F401  (kept from original)
from langchain.chains import RetrievalQA  # noqa: F401  (kept from original)
import os
import urllib.request
import ssl
import bitsandbytes  # noqa: F401  (kept from original; not referenced)
import tempfile  # noqa: F401  (kept from original; no longer needed after fix)
import mimetypes
from dotenv import load_dotenv
import cohere
from apscheduler.schedulers.background import BackgroundScheduler
import time  # noqa: F401  (kept from original; not referenced)

# Load API credentials from the environment (.env file).
load_dotenv()
openai_api_key = os.getenv('OPENAI')
cohere_api_key = os.getenv('COHERE')
weaviate_api_key = os.getenv('WEAVIATE')
weaviate_url = os.getenv('WEAVIATE_URL')
weaviate_username = os.getenv('WEAVIATE_USERNAME')
weaviate_password = os.getenv('WEAVIATE_PASSWORD')


def refresh_authentication():
    """Re-authenticate against Weaviate and rebuild the global `client`.

    Runs once at startup and then every 30 minutes via the background
    scheduler, so the session never goes stale.
    """
    global my_credentials, client
    my_credentials = weaviate.auth.AuthClientPassword(
        username=weaviate_username, password=weaviate_password
    )
    client = weaviate.Client(weaviate_url, auth_client_secret=my_credentials)


# Refresh the Weaviate session periodically in the background.
scheduler = BackgroundScheduler()
scheduler.add_job(refresh_authentication, 'interval', minutes=30)
scheduler.start()

# Initial authentication (defines the global `client`).
refresh_authentication()

# Weaviate class definition for articles.
Article = {
    "class": "Article",
    "description": "A class representing articles in the application",
    "properties": [
        {
            "name": "title",
            "description": "The title of the article",
            "dataType": ["text"]
        },
        {
            "name": "content",
            "description": "The content of the article",
            "dataType": ["text"]
        },
        {
            "name": "author",
            "description": "The author of the article",
            "dataType": ["text"]
        },
        {
            "name": "publishDate",
            "description": "The date the article was published",
            "dataType": ["date"]
        }
    ],
    # "vectorIndexType": "hnsw",
    # "vectorizer": "text2vec-contextionary"
}

# BUG FIX: `schema` must be defined BEFORE any attempt to create it.  The
# original code ran a create-schema block that referenced `schema` prior to
# this assignment (the resulting NameError was silently swallowed by the
# surrounding `except`).
schema = {"classes": [Article]}


def class_exists(class_name):
    """Return True if `class_name` is already present in the Weaviate schema.

    Returns False (and prints the error) on any failure to reach Weaviate.
    """
    try:
        existing_schema = client.schema.get()
        existing_classes = [cls["class"] for cls in existing_schema["classes"]]
        return class_name in existing_classes
    except Exception as e:
        print(f"Error checking if class exists: {e}")
        return False


# Create the 'Article' class exactly once.  The original performed this
# check twice; the first pass ran before `schema` existed and is removed.
if not class_exists("Article"):
    try:
        client.schema.create(schema)
    except Exception as e:
        print(f"Error creating schema: {e}")
else:
    print("Class 'Article' already exists in the schema.")
    try:
        existing_schema = client.schema.get()
        print("Existing schema retrieved:", existing_schema)
    except Exception as e:
        print(f"Error retrieving existing schema: {e}")

# Initialize the vector store used for retrieval.
vectorstore = Weaviate(client, index_name="HereChat", text_key="text")
vectorstore._query_attrs = [
    "text", "title", "url", "views", "lang", "_additional {distance}"
]
vectorstore.embedding = CohereEmbeddings(
    model="embed-multilingual-v2.0", cohere_api_key=cohere_api_key
)

# Cohere client, used for reranking retrieved documents.
co = cohere.Client(api_key=cohere_api_key)


def embed_pdf(file, filename, collection_name, file_type):
    """Persist `file` locally, parse it, and store its embeddings in Weaviate.

    Parameters:
        file: a URL string when file_type == "URL", otherwise raw bytes
            (a str is encoded to bytes) when file_type == "Binary".
        filename: name to write the downloaded/saved file under.
        collection_name: Weaviate class the documents are stored in.
        file_type: "URL" or "Binary"; anything else is rejected.

    Returns a dict with a "message" key on success or an "error" key on
    failure.
    """
    if file_type == "URL":
        try:
            # NOTE(security): certificate verification is disabled here —
            # acceptable only for trusted internal URLs; consider removing.
            context = ssl._create_unverified_context()
            with urllib.request.urlopen(file, context=context) as response, \
                    open(filename, 'wb') as out_file:
                out_file.write(response.read())
            file_path = filename
        except Exception as e:
            return {"error": f"Error downloading file from URL: {e}"}
    elif file_type == "Binary":
        if isinstance(file, str):
            # Convert string to bytes if necessary.
            file = file.encode()
        file_path = os.path.join('./', filename)
        with open(file_path, 'wb') as f:
            f.write(file)
    else:
        return {"error": "Invalid file type"}

    # Parse the document (format detected by unstructured from the path).
    loader = UnstructuredFileLoader(file_path)
    docs = loader.load()

    # BUG FIX: LangChain loaders return Document objects exposing
    # `.page_content`, not dicts with a 'text' key; CohereEmbeddings
    # exposes `embed_documents`, not `.embed`.
    for doc in docs:
        text = doc.page_content
        embedding = vectorstore.embedding.embed_documents([text])
        weaviate_document = {
            "text": text,
            "embedding": embedding
        }
        client.data_object.create(
            data_object=weaviate_document, class_name=collection_name
        )

    return {"message": f"Documents embedded in Weaviate collection '{collection_name}'"}


def retrieve_info(query):
    """Retrieve candidate documents for `query`, rerank them with Cohere,
    and return the top 3 formatted results.

    Returns: {"results": [{"Document Rank", "Content", "Relevance Score"}, ...]}
    """
    # BUG FIX: the original built a RetrievalQA chain and then sliced its
    # dict result (`initial_results[:25]`), which raises TypeError.  Fetch
    # candidate documents from the retriever directly instead.
    retriever = vectorstore.as_retriever()
    candidates = retriever.get_relevant_documents(query)[:25]
    doc_texts = [d.page_content for d in candidates]

    # Rerank the candidates; with plain-string documents, Cohere returns
    # each document as {'text': ...}.
    reranked_results = co.rerank(
        query=query, documents=doc_texts, top_n=3, model='rerank-english-v2.0'
    )

    formatted_results = []
    for idx, r in enumerate(reranked_results):
        formatted_results.append({
            "Document Rank": idx + 1,
            "Content": r.document['text'],
            "Relevance Score": f"{r.relevance_score:.2f}"
        })

    # NOTE(review): the original contained unreachable code after this
    # return that called `client.chat.completions.create` — but `client`
    # is the Weaviate client, which has no such attribute.  That dead
    # block has been removed; wire in an OpenAI client here if an LLM
    # summarization step is actually wanted.
    return {"results": formatted_results}


def combined_interface(query, file, collection_name):
    """Gradio entry point: answer a query, or embed an uploaded file.

    Exactly one of the two paths runs: a non-empty `query` triggers
    retrieval; otherwise `file` + `collection_name` triggers embedding.
    """
    if query:
        return retrieve_info(query)
    elif file is not None and collection_name:
        # Assumes `file` arrives as (content, name) — TODO confirm against
        # the gradio version in use; newer versions pass a tempfile path.
        file_content = file[0]
        filename = file[1]

        if isinstance(file_content, str) and file_content.startswith("http"):
            file_type = "URL"
        else:
            file_type = "Binary"

        # BUG FIX: embed_pdf takes (file, filename, collection_name,
        # file_type); the original passed only two positional arguments
        # (TypeError), and its URL branch never called embed_pdf at all.
        # embed_pdf writes the content to disk itself, so no temp file is
        # needed here.
        return embed_pdf(file_content, filename, collection_name, file_type)
    else:
        return "Please enter a query or upload a PDF file and specify a collection name."


# Wire the combined handler into a simple Gradio UI and serve it.
iface = gr.Interface(
    fn=combined_interface,
    inputs=[
        gr.Textbox(label="Query"),
        gr.File(label="PDF File"),
        gr.Textbox(label="Collection Name")
    ],
    outputs="text"
)

iface.launch()