Spaces:
Build error
Build error
from typing import List, Dict, Any, Tuple, Optional | |
import yaml | |
import os | |
cwd = os.getcwd() | |
os.environ["PYTORCH_TRANSFORMERS_CACHE"] = os.path.join( | |
cwd, "huggingface/transformers/" | |
) | |
os.environ["TRANSFORMERS_CACHE"] = os.path.join(cwd, "huggingface/transformers/") | |
os.environ["HF_HOME"] = os.path.join(cwd, "huggingface/") | |
import logging | |
import json | |
from time import sleep, time | |
import requests | |
import streamlit as st | |
from pathlib import Path | |
from haystack.pipelines.base import Pipeline | |
from haystack.nodes import EmbeddingRetriever, Shaper | |
from haystack.document_stores import FAISSDocumentStore, InMemoryDocumentStore | |
from haystack.errors import PipelineConfigError | |
# from googletrans import Translator | |
import openai | |
openai.api_key = st.secrets["gpt35_api_key"] | |
logger = logging.getLogger(__name__) | |
pipelines = None | |
system_prompt_template = """You are an US experienced immigration attorney. Your answer should be in your own words, be detailed and be no longer than 350 words. | |
You should ask for more information or clarifications to give a more precise answer for an each client's case. | |
Synthesize a comprehensive answer from your knowledge and the following topk most relevant paragraphs and the given question. | |
Do not tell them to have a consultation with a certified attorney. Clients have been warned that you are our assistant and that's enough. | |
Give an answer in the {0} language. | |
""" | |
users_prompt_template = """ | |
Paragraphs: {0} | |
Question: {1} | |
""" | |
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would | |
# end up with different indices. The same applies for InMemoryDocumentStore. | |
UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore) | |
API_ENDPOINT = os.getenv("API_ENDPOINT", "http://localhost:8000") | |
STATUS = "initialized" | |
HS_VERSION = "hs_version" | |
DOC_REQUEST = "query" | |
DOC_FEEDBACK = "feedback" | |
DOC_UPLOAD = "file-upload" | |
# translator = Translator() | |
def query( | |
pipelines, query, filters={}, language="en", top_k_reader=3, top_k_retriever=5 | |
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]: | |
""" | |
Send a query to the REST API and parse the answer. | |
Returns both a ready-to-use representation of the results and the raw JSON. | |
""" | |
query_pipeline = pipelines.get("query_pipeline", None) | |
start_time = time() | |
params = { | |
"retriever": {"top_k": top_k_retriever}, | |
} | |
lang = language.lower() or "english" | |
response = query_pipeline.run( | |
query=query, | |
params=params, | |
) | |
context = "" | |
sources = [] | |
for doc in response["documents"]: | |
doc = doc.to_dict() | |
doc_name = doc["meta"].get("name") | |
doc_url = doc["meta"].get("url") | |
source = ( | |
"https://www.uscis.gov/sites/default/files/document/forms/" + doc_name | |
if doc_name | |
else doc_url | |
) | |
if not source.endswith('.txt'): | |
sources.append(source) | |
if len(context)<top_k_reader: | |
context += " " + doc.get("content") | |
# Ensure answers and documents exist, even if they're empty lists | |
if not "documents" in response: | |
response["documents"] = [] | |
# prepare openAI api call | |
messages = [] | |
system_prompt = system_prompt_template.format(lang) | |
user_prompt = users_prompt_template.format(context, response["query"]) | |
messages.append({"role": "system", "content": system_prompt}) | |
messages.append({"role": "user", "content": user_prompt}) | |
openai_response = openai.ChatCompletion.create( | |
model="gpt-3.5-turbo", messages=messages | |
) | |
bot_response = openai_response["choices"][0]["message"]["content"] | |
response["answers"] = [bot_response] | |
logger.info( | |
json.dumps( | |
{ | |
"request": query, | |
"response": response, | |
"time": f"{(time() - start_time):.2f}", | |
}, | |
default=str, | |
) | |
) | |
# Format response | |
results = [] | |
answers = response["answers"] | |
documents = response["documents"] | |
for answer, doc in zip(answers, documents): | |
doc = doc.to_dict() | |
if answer: | |
context = doc.get("content") | |
results.append( | |
{ | |
"context": "..." + context if context else "", | |
"answer": answer, | |
"source": "\n".join(sources), | |
"_raw": answer, | |
} | |
) | |
else: | |
results.append({"context": None, "answer": None, "_raw": answer}) | |
return results, response | |
def send_feedback( | |
query, answer_obj, is_correct_answer, is_correct_document, document | |
) -> None: | |
""" | |
Send a feedback (label) to the REST API | |
""" | |
url = f"{API_ENDPOINT}/{DOC_FEEDBACK}" | |
req = { | |
"query": query, | |
"document": document, | |
"is_correct_answer": is_correct_answer, | |
"is_correct_document": is_correct_document, | |
"origin": "user-feedback", | |
"answer": answer_obj, | |
} | |
response_raw = requests.post(url, json=req) | |
if response_raw.status_code >= 400: | |
raise ValueError( | |
f"An error was returned [code {response_raw.status_code}]: {response_raw.json()}" | |
) | |
def upload_doc(file): | |
url = f"{API_ENDPOINT}/{DOC_UPLOAD}" | |
files = [("files", file)] | |
response = requests.post(url, files=files).json() | |
return response | |
def get_backlink(result) -> Tuple[Optional[str], Optional[str]]: | |
if result.get("document", None): | |
doc = result["document"] | |
if isinstance(doc, dict): | |
if doc.get("meta", None): | |
if isinstance(doc["meta"], dict): | |
if doc["meta"].get("url", None) and doc["meta"].get("title", None): | |
return doc["meta"]["url"], doc["meta"]["title"] | |
return None, None | |
def setup_pipelines() -> Dict[str, Any]: | |
# Re-import the configuration variables | |
import config # pylint: disable=reimported | |
pipelines = {} | |
document_store = FAISSDocumentStore( | |
faiss_config_path="faiss.json", faiss_index_path="faiss.index" | |
) | |
retriever = EmbeddingRetriever( | |
document_store=document_store, | |
batch_size=128, | |
embedding_model="ada", | |
api_key=st.secrets["api_key"], | |
max_seq_len=1024, | |
) | |
shaper = Shaper( | |
func="join_documents", inputs={"documents": "documents"}, outputs=["documents"] | |
) | |
pipe = Pipeline() | |
pipe.add_node(component=retriever, name="retriever", inputs=["Query"]) | |
logging.info(f"Loaded pipeline nodes: {pipe.graph.nodes.keys()}") | |
pipelines["query_pipeline"] = pipe | |
# Find document store | |
logging.info(f"Loaded docstore: {document_store}") | |
pipelines["document_store"] = document_store | |
# Load indexing pipeline (if available) | |
try: | |
indexing_pipeline = Pipeline.load_from_yaml( | |
Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME | |
) | |
docstore = indexing_pipeline.get_document_store() | |
if isinstance(docstore, UNSUPPORTED_DOC_STORES): | |
indexing_pipeline = None | |
raise PipelineConfigError( | |
"Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs." | |
) | |
except PipelineConfigError as e: | |
indexing_pipeline = None | |
logger.error(f"{e.message}\nFile Upload API will not be available.") | |
finally: | |
pipelines["indexing_pipeline"] = indexing_pipeline | |
# Create directory for uploaded files | |
os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True) | |
return pipelines | |
def get_pipelines(): | |
global pipelines # pylint: disable=global-statement | |
if not pipelines: | |
pipelines = setup_pipelines() | |
return pipelines | |