Spaces:
Runtime error
Runtime error
File size: 7,821 Bytes
3006f55 bdc75a0 3006f55 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 |
from typing import List, Dict, Any, Tuple, Optional
import yaml
import os
cwd = os.getcwd()
os.environ["PYTORCH_TRANSFORMERS_CACHE"] = os.path.join(
cwd, "huggingface/transformers/"
)
os.environ["TRANSFORMERS_CACHE"] = os.path.join(cwd, "huggingface/transformers/")
os.environ["HF_HOME"] = os.path.join(cwd, "huggingface/")
import logging
import json
from time import sleep, time
import requests
import streamlit as st
from pathlib import Path
from haystack.pipelines.base import Pipeline
from haystack.nodes import EmbeddingRetriever, Shaper
from haystack.document_stores import FAISSDocumentStore, InMemoryDocumentStore
from haystack.errors import PipelineConfigError
# from googletrans import Translator
import openai
openai.api_key = st.secrets["gpt35_api_key"]
logger = logging.getLogger(__name__)
pipelines = None
system_prompt_template = """You are an US experienced immigration attorney. Your answer should be in your own words, be detailed and be no longer than 350 words.
You should ask for more information or clarifications to give a more precise answer for an each client's case.
Synthesize a comprehensive answer from your knowledge and the following topk most relevant paragraphs and the given question.
Do not tell them to have a consultation with a certified attorney. Clients have been warned that you are our assistant and that's enough.
Give an answer in the {0} language.
"""
users_prompt_template = """
Paragraphs: {0}
Question: {1}
"""
# Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would
# end up with different indices. The same applies for InMemoryDocumentStore.
UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore)
API_ENDPOINT = os.getenv("API_ENDPOINT", "http://localhost:8000")
STATUS = "initialized"
HS_VERSION = "hs_version"
DOC_REQUEST = "query"
DOC_FEEDBACK = "feedback"
DOC_UPLOAD = "file-upload"
# translator = Translator()
def query(
pipelines, query, filters={}, language="en", top_k_reader=3, top_k_retriever=5
) -> Tuple[List[Dict[str, Any]], Dict[str, str]]:
"""
Send a query to the REST API and parse the answer.
Returns both a ready-to-use representation of the results and the raw JSON.
"""
query_pipeline = pipelines.get("query_pipeline", None)
start_time = time()
params = {
"retriever": {"top_k": top_k_retriever},
}
lang = language.lower() or "english"
response = query_pipeline.run(
query=query,
params=params,
)
context = ""
sources = []
for doc in response["documents"]:
doc = doc.to_dict()
doc_name = doc["meta"].get("name")
doc_url = doc["meta"].get("url")
source = (
"https://www.uscis.gov/sites/default/files/document/forms/" + doc_name
if doc_name
else doc_url
)
if not source.endswith('.txt'):
sources.append(source)
if len(context)<top_k_reader:
context += " " + doc.get("content")
# Ensure answers and documents exist, even if they're empty lists
if not "documents" in response:
response["documents"] = []
# prepare openAI api call
messages = []
system_prompt = system_prompt_template.format(lang)
user_prompt = users_prompt_template.format(context, response["query"])
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": user_prompt})
openai_response = openai.ChatCompletion.create(
model="gpt-3.5-turbo", messages=messages
)
bot_response = openai_response["choices"][0]["message"]["content"]
response["answers"] = [bot_response]
logger.info(
json.dumps(
{
"request": query,
"response": response,
"time": f"{(time() - start_time):.2f}",
},
default=str,
)
)
# Format response
results = []
answers = response["answers"]
documents = response["documents"]
for answer, doc in zip(answers, documents):
doc = doc.to_dict()
if answer:
context = doc.get("content")
results.append(
{
"context": "..." + context if context else "",
"answer": answer,
"source": "\n".join(sources),
"_raw": answer,
}
)
else:
results.append({"context": None, "answer": None, "_raw": answer})
return results, response
def send_feedback(
query, answer_obj, is_correct_answer, is_correct_document, document
) -> None:
"""
Send a feedback (label) to the REST API
"""
url = f"{API_ENDPOINT}/{DOC_FEEDBACK}"
req = {
"query": query,
"document": document,
"is_correct_answer": is_correct_answer,
"is_correct_document": is_correct_document,
"origin": "user-feedback",
"answer": answer_obj,
}
response_raw = requests.post(url, json=req)
if response_raw.status_code >= 400:
raise ValueError(
f"An error was returned [code {response_raw.status_code}]: {response_raw.json()}"
)
def upload_doc(file):
url = f"{API_ENDPOINT}/{DOC_UPLOAD}"
files = [("files", file)]
response = requests.post(url, files=files).json()
return response
def get_backlink(result) -> Tuple[Optional[str], Optional[str]]:
if result.get("document", None):
doc = result["document"]
if isinstance(doc, dict):
if doc.get("meta", None):
if isinstance(doc["meta"], dict):
if doc["meta"].get("url", None) and doc["meta"].get("title", None):
return doc["meta"]["url"], doc["meta"]["title"]
return None, None
def setup_pipelines() -> Dict[str, Any]:
# Re-import the configuration variables
import config # pylint: disable=reimported
pipelines = {}
document_store = FAISSDocumentStore(
faiss_config_path="faiss.json", faiss_index_path="faiss.index"
)
retriever = EmbeddingRetriever(
document_store=document_store,
batch_size=128,
embedding_model="ada",
api_key=st.secrets["api_key"],
max_seq_len=1024,
)
shaper = Shaper(
func="join_documents", inputs={"documents": "documents"}, outputs=["documents"]
)
pipe = Pipeline()
pipe.add_node(component=retriever, name="retriever", inputs=["Query"])
logging.info(f"Loaded pipeline nodes: {pipe.graph.nodes.keys()}")
pipelines["query_pipeline"] = pipe
# Find document store
logging.info(f"Loaded docstore: {document_store}")
pipelines["document_store"] = document_store
# Load indexing pipeline (if available)
try:
indexing_pipeline = Pipeline.load_from_yaml(
Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME
)
docstore = indexing_pipeline.get_document_store()
if isinstance(docstore, UNSUPPORTED_DOC_STORES):
indexing_pipeline = None
raise PipelineConfigError(
"Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs."
)
except PipelineConfigError as e:
indexing_pipeline = None
logger.error(f"{e.message}\nFile Upload API will not be available.")
finally:
pipelines["indexing_pipeline"] = indexing_pipeline
# Create directory for uploaded files
os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True)
return pipelines
def get_pipelines():
global pipelines # pylint: disable=global-statement
if not pipelines:
pipelines = setup_pipelines()
return pipelines
|