from typing import List, Dict, Any, Tuple, Optional import yaml import os cwd = os.getcwd() os.environ["PYTORCH_TRANSFORMERS_CACHE"] = os.path.join( cwd, "huggingface/transformers/" ) os.environ["TRANSFORMERS_CACHE"] = os.path.join(cwd, "huggingface/transformers/") os.environ["HF_HOME"] = os.path.join(cwd, "huggingface/") import logging import json from time import sleep, time import requests import streamlit as st from pathlib import Path from haystack.pipelines.base import Pipeline from haystack.nodes import EmbeddingRetriever, Shaper from haystack.document_stores import FAISSDocumentStore, InMemoryDocumentStore from haystack.errors import PipelineConfigError # from googletrans import Translator import openai openai.api_key = st.secrets["gpt35_api_key"] logger = logging.getLogger(__name__) pipelines = None system_prompt_template = """You are an US experienced immigration attorney. Your answer should be in your own words, be detailed and be no longer than 350 words. You should ask for more information or clarifications to give a more precise answer for an each client's case. Synthesize a comprehensive answer from your knowledge and the following topk most relevant paragraphs and the given question. Do not tell them to have a consultation with a certified attorney. Clients have been warned that you are our assistant and that's enough. Give an answer in the {0} language. """ users_prompt_template = """ Paragraphs: {0} Question: {1} """ # Since each instance of FAISSDocumentStore creates an in-memory FAISS index, the Indexing & Query Pipelines would # end up with different indices. The same applies for InMemoryDocumentStore. UNSUPPORTED_DOC_STORES = (FAISSDocumentStore, InMemoryDocumentStore) API_ENDPOINT = os.getenv("API_ENDPOINT", "http://localhost:8000") STATUS = "initialized" HS_VERSION = "hs_version" DOC_REQUEST = "query" DOC_FEEDBACK = "feedback" DOC_UPLOAD = "file-upload" # translator = Translator() def query( pipelines, query, filters={}, language="en", top_k_reader=3, top_k_retriever=5 ) -> Tuple[List[Dict[str, Any]], Dict[str, str]]: """ Send a query to the REST API and parse the answer. Returns both a ready-to-use representation of the results and the raw JSON. """ query_pipeline = pipelines.get("query_pipeline", None) start_time = time() params = { "retriever": {"top_k": top_k_retriever}, } lang = language.lower() or "english" response = query_pipeline.run( query=query, params=params, ) context = "" sources = [] for doc in response["documents"]: doc = doc.to_dict() doc_name = doc["meta"].get("name") doc_url = doc["meta"].get("url") source = ( "https://www.uscis.gov/sites/default/files/document/forms/" + doc_name if doc_name else doc_url ) if not source.endswith('.txt'): sources.append(source) if len(context) None: """ Send a feedback (label) to the REST API """ url = f"{API_ENDPOINT}/{DOC_FEEDBACK}" req = { "query": query, "document": document, "is_correct_answer": is_correct_answer, "is_correct_document": is_correct_document, "origin": "user-feedback", "answer": answer_obj, } response_raw = requests.post(url, json=req) if response_raw.status_code >= 400: raise ValueError( f"An error was returned [code {response_raw.status_code}]: {response_raw.json()}" ) def upload_doc(file): url = f"{API_ENDPOINT}/{DOC_UPLOAD}" files = [("files", file)] response = requests.post(url, files=files).json() return response def get_backlink(result) -> Tuple[Optional[str], Optional[str]]: if result.get("document", None): doc = result["document"] if isinstance(doc, dict): if doc.get("meta", None): if isinstance(doc["meta"], dict): if doc["meta"].get("url", None) and doc["meta"].get("title", None): return doc["meta"]["url"], doc["meta"]["title"] return None, None def setup_pipelines() -> Dict[str, Any]: # Re-import the configuration variables import config # pylint: disable=reimported pipelines = {} document_store = FAISSDocumentStore( faiss_config_path="faiss.json", faiss_index_path="faiss.index" ) retriever = EmbeddingRetriever( document_store=document_store, batch_size=128, embedding_model="ada", api_key=st.secrets["api_key"], max_seq_len=1024, ) shaper = Shaper( func="join_documents", inputs={"documents": "documents"}, outputs=["documents"] ) pipe = Pipeline() pipe.add_node(component=retriever, name="retriever", inputs=["Query"]) logging.info(f"Loaded pipeline nodes: {pipe.graph.nodes.keys()}") pipelines["query_pipeline"] = pipe # Find document store logging.info(f"Loaded docstore: {document_store}") pipelines["document_store"] = document_store # Load indexing pipeline (if available) try: indexing_pipeline = Pipeline.load_from_yaml( Path(config.PIPELINE_YAML_PATH), pipeline_name=config.INDEXING_PIPELINE_NAME ) docstore = indexing_pipeline.get_document_store() if isinstance(docstore, UNSUPPORTED_DOC_STORES): indexing_pipeline = None raise PipelineConfigError( "Indexing pipelines with FAISSDocumentStore or InMemoryDocumentStore are not supported by the REST APIs." ) except PipelineConfigError as e: indexing_pipeline = None logger.error(f"{e.message}\nFile Upload API will not be available.") finally: pipelines["indexing_pipeline"] = indexing_pipeline # Create directory for uploaded files os.makedirs(config.FILE_UPLOAD_PATH, exist_ok=True) return pipelines def get_pipelines(): global pipelines # pylint: disable=global-statement if not pipelines: pipelines = setup_pipelines() return pipelines