import os
from typing import List, Optional

from pypdf import PdfReader

from haystack import Pipeline, Document, component
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import (
    SentenceTransformersDocumentEmbedder,
    SentenceTransformersTextEmbedder,
)
from haystack.components.generators import OpenAIGenerator, HuggingFaceTGIGenerator
from haystack.components.generators.chat import OpenAIChatGenerator, HuggingFaceTGIChatGenerator
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.document_stores.types import DuplicatePolicy
from haystack.utils import Secret


# Sentence-transformers model used for both the document embedder and the
# query text embedder, so documents and questions share one vector space.
SENTENCE_RETREIVER_MODEL = "sentence-transformers/all-MiniLM-L6-v2"

# Cap on generated tokens; passed to the generators as ``max_tokens`` /
# ``max_new_tokens``.
MAX_TOKENS = 500

# Jinja-style prompt rendered by the PromptBuilder: ``documents`` are the
# retrieved chunks and ``question`` is the user's query.
template = """
As a professional HR recruiter given the following information, answer the question shortly and concisely in 1 or 2 sentences.

Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}

Question: {{question}}
Answer:
"""


@component
class UploadedFileConverter:
    """
    A component to convert uploaded PDF files to Documents (one per page).
    """

    @component.output_types(documents=List[Document])
    def run(self, uploaded_file):
        """Read a PDF and emit one ``Document`` per page.

        Args:
            uploaded_file: An object that ``PdfReader`` can read and that
                exposes a ``name`` attribute holding the file name
                (presumably a web-framework upload object; confirm with
                the caller).

        Returns:
            ``{"documents": [...]}`` where each document holds the text
            extracted from one page and ``meta["name"]`` is
            ``"<file stem>__<page number>"``.
        """
        reader = PdfReader(uploaded_file)

        # Drop a trailing ".pdf" extension in any letter case.
        # ``str.rstrip('.PDF')`` must not be used here: it strips any trailing
        # run of the characters '.', 'P', 'D', 'F' (so "CV_DFP.PDF" would
        # collapse to "CV_") and never matches a lowercase ".pdf".
        filename = uploaded_file.name
        extension = ".pdf"
        if filename.lower().endswith(extension):
            stem = filename[:-len(extension)]
        else:
            stem = filename

        # The stem is followed by a double underscore before the page number,
        # matching the names this component has always produced.
        prefix = stem + "_"

        documents = [
            Document(
                content=page.extract_text(),
                meta={"name": f"{prefix}_{page.page_number}"},
            )
            for page in reader.pages
        ]
        return {"documents": documents}


def create_ingestion_pipeline(document_store):
    """Build the pipeline that turns an uploaded PDF into stored embeddings.

    Stages, in order: ``converter`` (PDF -> per-page documents), ``cleaner``,
    ``splitter`` (passage splitting, length 100, overlap 10), ``embedder``
    and ``writer``.

    Args:
        document_store: Store the ``writer`` stage writes into. Documents
            that already exist are overwritten
            (``DuplicatePolicy.OVERWRITE``).

    Returns:
        The assembled ``Pipeline``; run it with
        ``{"converter": {"uploaded_file": ...}}``.
    """
    embedder = SentenceTransformersDocumentEmbedder(model=SENTENCE_RETREIVER_MODEL)
    # Load the embedding model at construction time rather than on first run.
    embedder.warm_up()

    # Declared once, in execution order; each stage feeds the next one.
    stages = [
        ("converter", UploadedFileConverter()),
        ("cleaner", DocumentCleaner()),
        ("splitter", DocumentSplitter(split_by="passage", split_length=100, split_overlap=10)),
        ("embedder", embedder),
        ("writer", DocumentWriter(document_store=document_store, policy=DuplicatePolicy.OVERWRITE)),
    ]

    pipeline = Pipeline()
    for stage_name, stage in stages:
        pipeline.add_component(stage_name, stage)

    for (upstream, _), (downstream, _) in zip(stages, stages[1:]):
        pipeline.connect(upstream, downstream)

    return pipeline


def create_query_pipeline(document_store, model_name: str, api_key: Optional[str]):
    """Build the retrieval-augmented question-answering pipeline.

    The question is embedded, the 3 closest documents are retrieved from
    ``document_store``, rendered into ``template`` and sent to a generator
    chosen from ``model_name``:

    * ``"local LLM"`` -> OpenAI-compatible server at ``http://localhost:1234/v1``
    * any name containing ``"gpt"`` -> OpenAI
    * anything else -> Hugging Face TGI

    Args:
        document_store: In-memory store the retriever searches.
        model_name: Model identifier; also selects the generator backend.
        api_key: Credential for the selected backend. May be ``None`` or
            empty, in which case the generator's own default credential
            lookup is used (for the local server a placeholder is sent).

    Returns:
        The assembled ``Pipeline``; run it with
        ``{"text_embedder": {"text": q}, "prompt_builder": {"question": q}}``.
    """
    if model_name == "local LLM":
        # The OpenAI client always wants some key, even for a local server.
        # Keep using OPENAI_API_KEY when it is set (the previous behaviour),
        # otherwise fall back to the caller's key or a harmless placeholder
        # instead of failing on the missing environment variable.
        local_key = os.environ.get("OPENAI_API_KEY") or api_key or "not-needed"
        generator = OpenAIGenerator(
            api_key=Secret.from_token(local_key),
            model=model_name,
            api_base_url="http://localhost:1234/v1",
            generation_kwargs={"max_tokens": MAX_TOKENS},
        )
    elif "gpt" in model_name:
        # ``Secret.from_token`` rejects None/empty tokens, so only pass the
        # credential when one was actually supplied.
        credentials = {"api_key": Secret.from_token(api_key)} if api_key else {}
        generator = OpenAIGenerator(
            model=model_name,
            generation_kwargs={"max_tokens": MAX_TOKENS},
            **credentials,
        )
    else:
        credentials = {"token": Secret.from_token(api_key)} if api_key else {}
        generator = HuggingFaceTGIGenerator(
            model=model_name,
            generation_kwargs={"max_new_tokens": MAX_TOKENS},
            **credentials,
        )

    query_pipeline = Pipeline()
    query_pipeline.add_component(
        "text_embedder",
        SentenceTransformersTextEmbedder(model=SENTENCE_RETREIVER_MODEL),
    )
    query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store, top_k=3))
    query_pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    query_pipeline.add_component("generator", generator)

    query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder", "generator")

    return query_pipeline


class DocumentQAEngine:
    """Question answering over uploaded PDFs.

    Holds an ingestion pipeline and a query pipeline that are both built
    around the same in-memory document store, so whatever is ingested
    becomes retrievable by later questions.
    """

    def __init__(self, model_name: str, api_key: Optional[str] = None):
        """Create the document store and both pipelines.

        Args:
            model_name: Forwarded to ``create_query_pipeline``, which uses it
                to choose the generator.
            api_key: Credential forwarded to ``create_query_pipeline``.
        """
        self.api_key = api_key
        self.model_name = model_name
        # Not read by any method of this class.
        self.chunks = []

        # One store instance is handed to both pipelines on purpose: the
        # ingestion side writes into it and the query side retrieves from it.
        document_store = InMemoryDocumentStore()
        self.query_pipeline = create_query_pipeline(document_store, model_name, api_key)
        self.pdf_ingestion_pipeline = create_ingestion_pipeline(document_store)

    def ingest_pdf(self, uploaded_file) -> None:
        """Run the ingestion pipeline on one uploaded PDF.

        Args:
            uploaded_file: Passed unchanged to the pipeline's ``converter``
                component.
        """
        self.pdf_ingestion_pipeline.run({"converter": {"uploaded_file": uploaded_file}})

    def process_message(self, query: str):
        """Answer a question using the ingested documents.

        Args:
            query: The question; it is used both as the text to embed for
                retrieval and as the question rendered into the prompt.

        Returns:
            The first entry of the generator's ``replies``.
        """
        pipeline_input = {
            "text_embedder": {"text": query},
            "prompt_builder": {"question": query},
        }
        response = self.query_pipeline.run(pipeline_input)
        return response["generator"]["replies"][0]