ai_advisor / utils.py
rodrigomasini's picture
Duplicate from bondares/ai_advisor
b1f727f
from typing import List, Dict, Any, Tuple, Optional
import yaml
import os
# Point the PYTORCH_TRANSFORMERS_CACHE, TRANSFORMERS_CACHE and HF_HOME
# environment variables at a "huggingface" directory under the current
# working directory.
# NOTE(review): this runs ahead of the third-party imports that follow,
# presumably so libraries reading these variables at import time see the
# values — confirm before reordering.
cwd = os.getcwd()
os.environ["PYTORCH_TRANSFORMERS_CACHE"] = os.path.join(
cwd, "huggingface/transformers/"
)
os.environ["TRANSFORMERS_CACHE"] = os.path.join(cwd, "huggingface/transformers/")
os.environ["HF_HOME"] = os.path.join(cwd, "huggingface/")
import logging
import json
from time import sleep, time
import requests
import streamlit as st
from pathlib import Path
from haystack.pipelines.base import Pipeline
from haystack.nodes import EmbeddingRetriever, Shaper
from haystack.document_stores import FAISSDocumentStore, InMemoryDocumentStore
from haystack.errors import PipelineConfigError
# from googletrans import Translator
import openai
# The OpenAI key is read from Streamlit secrets when this module is imported.
openai.api_key = st.secrets["gpt35_api_key"]
logger = logging.getLogger(__name__)
# Module-level cache of loaded pipelines; filled on first call to get_pipelines().
pipelines = None
# System message sent to the chat model; {0} is replaced with the answer language.
system_prompt_template = """You are an US experienced immigration attorney. Your answer should be in your own words, be detailed and be no longer than 350 words.
You should ask for more information or clarifications to give a more precise answer for an each client's case.
Synthesize a comprehensive answer from your knowledge and the following topk most relevant paragraphs and the given question.
Do not tell them to have a consultation with a certified attorney. Clients have been warned that you are our assistant and that's enough.
Give an answer in the {0} language.
"""
# User message sent to the chat model; {0} receives the retrieved paragraphs
# and {1} the question.
users_prompt_template = """
Paragraphs: {0}
Question: {1}
"""
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would
# end up with different indices. The same applies for InMemoryDocumentStore.
UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore)
# Base URL of the REST API; overridable through the API_ENDPOINT environment
# variable, defaulting to a local server on port 8000.
API_ENDPOINT = os.getenv("API_ENDPOINT", "http://localhost:8000")
# Route names. DOC_FEEDBACK and DOC_UPLOAD are joined onto API_ENDPOINT by
# send_feedback() and upload_doc(). STATUS, HS_VERSION and DOC_REQUEST are not
# used in the code visible here — presumably routes as well.
# NOTE(review): confirm they are still needed.
STATUS = "initialized"
HS_VERSION = "hs_version"
DOC_REQUEST = "query"
DOC_FEEDBACK = "feedback"
DOC_UPLOAD = "file-upload"
# translator = Translator()
def query(
    pipelines, query, filters=None, language="en", top_k_reader=3, top_k_retriever=5
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
    """
    Retrieve supporting documents for a question and ask the chat model to answer it.

    The retriever pipeline stored under ``pipelines["query_pipeline"]`` is run
    locally, the content of the best documents is placed into the user prompt,
    and the prompt is sent to the OpenAI chat completion endpoint.

    Args:
        pipelines: mapping that holds the retrieval pipeline under the key
            ``"query_pipeline"``.
        query: the user's question.
        filters: accepted for backward compatibility; not used.
        language: language the answer should be written in; lower-cased before
            it is inserted into the system prompt. Falls back to ``"english"``
            when empty or ``None``.
        top_k_reader: maximum number of retrieved documents whose content is
            included in the prompt.
        top_k_retriever: number of documents requested from the retriever.

    Returns:
        A ``(results, response)`` tuple. ``results`` is a list of dicts with the
        keys ``context``, ``answer``, ``source`` and ``_raw``; ``response`` is
        the raw pipeline output with the model's reply added under ``"answers"``.
    """
    query_pipeline = pipelines.get("query_pipeline")
    start_time = time()
    params = {
        "retriever": {"top_k": top_k_retriever},
    }
    lang = (language or "english").lower()
    response = query_pipeline.run(
        query=query,
        params=params,
    )

    # Ensure the documents entry exists *before* it is iterated, even if empty.
    documents = response.setdefault("documents", [])
    doc_dicts = [doc.to_dict() for doc in documents]

    context_parts = []
    sources = []
    for doc in doc_dicts:
        meta = doc.get("meta") or {}
        doc_name = meta.get("name")
        source = (
            "https://www.uscis.gov/sites/default/files/document/forms/" + doc_name
            if doc_name
            else meta.get("url")
        )
        # A document may carry neither a name nor a url; skip it instead of
        # failing on None. Plain-text sources are not listed.
        if source and not source.endswith(".txt"):
            sources.append(source)
        # Limit the prompt to the first `top_k_reader` documents. The limit is
        # a document count, not a character count.
        content = doc.get("content")
        if isinstance(content, str) and len(context_parts) < top_k_reader:
            context_parts.append(content)
    context = "".join(" " + part for part in context_parts)

    # prepare openAI api call
    messages = [
        {"role": "system", "content": system_prompt_template.format(lang)},
        {
            "role": "user",
            "content": users_prompt_template.format(
                context, response.get("query", query)
            ),
        },
    ]
    openai_response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo", messages=messages
    )
    bot_response = openai_response["choices"][0]["message"]["content"]
    response["answers"] = [bot_response]
    logger.info(
        json.dumps(
            {
                "request": query,
                "response": response,
                "time": f"{(time() - start_time):.2f}",
            },
            default=str,
        )
    )

    # Format response.
    # NOTE: zip pairs the single answer with the first document, so `results`
    # is empty when nothing was retrieved, although response["answers"] still
    # holds the model's reply.
    results = []
    for answer, doc in zip(response["answers"], doc_dicts):
        if answer:
            snippet = doc.get("content")
            results.append(
                {
                    "context": "..." + snippet if snippet else "",
                    "answer": answer,
                    "source": "\n".join(sources),
                    "_raw": answer,
                }
            )
        else:
            results.append({"context": None, "answer": None, "_raw": answer})
    return results, response
def send_feedback(
    query, answer_obj, is_correct_answer, is_correct_document, document
) -> None:
    """
    Send a feedback (label) to the REST API.

    Args:
        query: the question the feedback refers to.
        answer_obj: the answer being rated.
        is_correct_answer: whether the answer was judged correct.
        is_correct_document: whether the document was judged relevant.
        document: the document the answer was based on.

    Raises:
        ValueError: if the API replies with an HTTP status code of 400 or above.
    """
    url = f"{API_ENDPOINT}/{DOC_FEEDBACK}"
    req = {
        "query": query,
        "document": document,
        "is_correct_answer": is_correct_answer,
        "is_correct_document": is_correct_document,
        "origin": "user-feedback",
        "answer": answer_obj,
    }
    response_raw = requests.post(url, json=req)
    if response_raw.status_code >= 400:
        # Error bodies are not guaranteed to be JSON (e.g. an HTML error page).
        # Fall back to the raw text so the status code is still reported.
        try:
            details = response_raw.json()
        except ValueError:
            details = response_raw.text
        raise ValueError(
            f"An error was returned [code {response_raw.status_code}]: {details}"
        )
def upload_doc(file):
    """
    Post ``file`` to the REST API's file-upload route and return the decoded
    JSON body of the reply.
    """
    endpoint = f"{API_ENDPOINT}/{DOC_UPLOAD}"
    reply = requests.post(endpoint, files=[("files", file)])
    return reply.json()
def get_backlink(result) -> Tuple[Optional[str], Optional[str]]:
    """
    Extract the ``(url, title)`` backlink from a result's document metadata.

    Args:
        result: mapping that may hold a ``"document"`` dict, which in turn may
            hold a ``"meta"`` dict with ``"url"`` and ``"title"`` entries.

    Returns:
        ``(url, title)`` when both values are present and non-empty,
        otherwise ``(None, None)``. This includes a missing or empty
        ``result``.
    """
    if not result:
        return None, None

    doc = result.get("document")
    if not isinstance(doc, dict):
        return None, None

    meta = doc.get("meta")
    if not isinstance(meta, dict):
        return None, None

    url, title = meta.get("url"), meta.get("title")
    # Both parts are required; a link without a title (or vice versa) is dropped.
    if url and title:
        return url, title
    return None, None
def setup_pipelines() -> Dict[str, Any]:
    """
    Build the query pipeline and, when possible, load the indexing pipeline.

    Returns:
        A dict with three entries: ``"query_pipeline"`` (a pipeline holding the
        embedding retriever), ``"document_store"`` (the FAISS document store
        loaded from ``faiss.json`` / ``faiss.index``) and
        ``"indexing_pipeline"`` (the pipeline loaded from the YAML file named in
        ``config``, or ``None`` when it is rejected with a
        ``PipelineConfigError``).

    Side effects:
        Creates ``config.FILE_UPLOAD_PATH`` if it does not exist.
    """
    # Re-import the configuration variables
    import config  # pylint: disable=reimported

    pipelines = {}
    document_store = FAISSDocumentStore(
        faiss_config_path="faiss.json", faiss_index_path="faiss.index"
    )
    retriever = EmbeddingRetriever(
        document_store=document_store,
        batch_size=128,
        embedding_model="ada",
        api_key=st.secrets["api_key"],
        max_seq_len=1024,
    )
    pipe = Pipeline()
    pipe.add_node(component=retriever, name="retriever", inputs=["Query"])
    logger.info("Loaded pipeline nodes: %s", pipe.graph.nodes.keys())
    pipelines["query_pipeline"] = pipe

    # Find document store
    logger.info("Loaded docstore: %s", document_store)
    pipelines["document_store"] = document_store

    # Load indexing pipeline (if available). Start from None so that an
    # exception other than PipelineConfigError propagates as itself, instead of
    # being masked by an unbound-variable error.
    indexing_pipeline = None
    try:
        candidate = Pipeline.load_from_yaml(
            Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME
        )
        docstore = candidate.get_document_store()
        if isinstance(docstore, UNSUPPORTED_DOC_STORES):
            raise PipelineConfigError(
                "Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs."
            )
        indexing_pipeline = candidate
    except PipelineConfigError as e:
        logger.error("%s\nFile Upload API will not be available.", e.message)
    pipelines["indexing_pipeline"] = indexing_pipeline

    # Create directory for uploaded files
    os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True)
    return pipelines
def get_pipelines():
    """
    Return the module-level pipelines, building them with setup_pipelines()
    on the first call (or whenever the cached value is empty).
    """
    global pipelines  # pylint: disable=global-statement
    if pipelines:
        return pipelines
    pipelines = setup_pipelines()
    return pipelines