File size: 3,661 Bytes
8c3a73e
 
 
 
 
 
 
 
 
b4c442a
 
 
8c3a73e
 
 
 
 
b4c442a
8c3a73e
 
 
 
 
 
 
 
 
 
 
 
 
 
b4c442a
8c3a73e
 
 
b4c442a
8c3a73e
 
 
 
b4c442a
 
8c3a73e
 
 
 
 
 
 
 
 
 
 
b4c442a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c3a73e
 
 
 
b4c442a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# File: data_ingestion.py
import arxiv
from typing import List, Dict, Any
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import Qdrant
from datasets import load_dataset, Dataset
from langchain_community.document_loaders import PyMuPDFLoader
from config import *
import logging

# Root logger: INFO and above, each record prefixed with timestamp and level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# Module-wide embedding model; the ingestion code below uses it both for the
# vector store and for the per-chunk vectors stored in the dataset.
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")

# Module-wide splitter for extracted PDF text (chunk_size=1000 with an
# overlap of 200 between consecutive chunks).
text_splitter = RecursiveCharacterTextSplitter(
    chunk_overlap=200,
    chunk_size=1000,
)

def fetch_arxiv_metadata(query: str, max_results: int = 10) -> List[Dict[str, Any]]:
    """Search arXiv for *query* and return one metadata dict per result.

    Args:
        query: Search string handed to ``arxiv.Search``.
        max_results: Maximum number of results requested; also used as the
            client's page size.

    Returns:
        A list of dicts with the keys ``title``, ``authors`` (list of author
        names), ``published`` and ``updated`` (ISO-8601 strings), ``pdf_url``,
        ``entry_id`` and ``summary``.
    """
    logging.info(f"Fetching arXiv metadata for query: {query}")
    client = arxiv.Client(page_size=max_results, delay_seconds=3, num_retries=3)
    search = arxiv.Search(query=query, max_results=max_results)

    # One flat record per paper; timestamps are serialised so the records
    # contain only plain strings and lists of strings.
    papers = [
        {
            "title": paper.title,
            "authors": [person.name for person in paper.authors],
            "published": paper.published.isoformat(),
            "updated": paper.updated.isoformat(),
            "pdf_url": paper.pdf_url,
            "entry_id": paper.entry_id,
            "summary": paper.summary
        }
        for paper in client.results(search)
    ]

    logging.info(f"Fetched metadata for {len(papers)} papers")
    return papers

def process_pdf(pdf_url: str) -> str:
    """Load the PDF at *pdf_url* and return its text as a single string.

    The document is loaded with ``PyMuPDFLoader``; the ``page_content`` of
    each loaded page is joined with a newline separator.
    """
    logging.info(f"Processing PDF from URL: {pdf_url}")
    pages = PyMuPDFLoader(pdf_url).load()
    page_texts = (page.page_content for page in pages)
    return "\n".join(page_texts)

def _merge_into_hub_dataset(existing: Any, new_data: List[Dict[str, Any]]) -> Any:
    """Return *existing* with the rows in *new_data* appended."""
    # Function-scope import: ``datasets`` is already a dependency of this file,
    # and these names are only needed when there is something to append.
    from datasets import DatasetDict, concatenate_datasets

    columns = {
        key: [row[key] for row in new_data]
        for key in ("text", "metadata", "embedding")
    }
    new_rows = Dataset.from_dict(columns)

    # ``load_dataset`` without ``split`` yields a DatasetDict, which cannot be
    # concatenated directly: append to one split and keep the others intact.
    # NOTE(review): assumes the existing split has the same columns/features
    # as the new rows - confirm against the Hub dataset's schema.
    if isinstance(existing, DatasetDict):
        split_name = "train" if "train" in existing else next(iter(existing))
        existing[split_name] = concatenate_datasets([existing[split_name], new_rows])
        return existing
    return concatenate_datasets([existing, new_rows])


def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Chunk, embed and store the PDFs described by *metadata_list*.

    For every entry the PDF at ``metadata["pdf_url"]`` is converted to text,
    split into chunks, written to the Qdrant collection ``COLLECTION_NAME``
    and collected as ``{"text", "metadata", "embedding"}`` rows that are
    appended to the Hugging Face dataset ``DATASET_NAME`` and pushed back to
    the Hub.

    Args:
        metadata_list: Paper metadata dicts; each must contain ``"pdf_url"``.

    Returns:
        A summary string with the number of documents actually ingested and
        the number of chunks added to the dataset.

    Raises:
        Exception: Whatever loading, merging or pushing the Hub dataset
            raises. Failures of individual documents are logged and skipped.
    """
    logging.info(f"Starting ingestion of {len(metadata_list)} documents")
    if not metadata_list:
        # Nothing to ingest: do not contact Qdrant or the Hub at all.
        result_message = "Ingested 0 documents, adding 0 chunks to the dataset."
        logging.info(result_message)
        return result_message

    # Load up front so a missing/inaccessible dataset fails before any
    # embedding work or vector-store writes happen.
    dataset = load_dataset(DATASET_NAME)

    # The store is created lazily from the first non-empty batch of chunks:
    # building it from an empty document list gives it no text from which to
    # infer the vector size.
    qdrant = None
    new_data: List[Dict[str, Any]] = []
    ingested = 0
    total = len(metadata_list)

    for position, metadata in enumerate(metadata_list, start=1):
        try:
            pdf_text = process_pdf(metadata["pdf_url"])
            chunks = text_splitter.split_text(pdf_text)
            if not chunks:
                logging.warning(f"No text extracted from document {position}/{total}; skipping")
                continue

            # One batched embedding call instead of one request per chunk.
            vectors = embeddings.embed_documents(chunks)

            # Add to Qdrant
            chunk_metadatas = [metadata] * len(chunks)
            if qdrant is None:
                # NOTE(review): relies on from_texts reusing an existing
                # collection rather than recreating it - confirm.
                qdrant = Qdrant.from_texts(
                    chunks,
                    embeddings,
                    metadatas=chunk_metadatas,
                    url=QDRANT_API_URL,
                    api_key=QDRANT_API_KEY,
                    collection_name=COLLECTION_NAME,
                )
            else:
                qdrant.add_texts(chunks, metadatas=chunk_metadatas)

            # Prepare data for Hugging Face dataset
            new_data.extend(
                {"text": chunk, "metadata": metadata, "embedding": vector}
                for chunk, vector in zip(chunks, vectors)
            )
            ingested += 1
            logging.info(f"Processed document {position}/{total}")
        except Exception as e:
            # Best-effort per document: keep going, but keep the traceback.
            logging.exception(f"Error processing document {position}: {str(e)}")

    # Update Hugging Face dataset (skipped when no chunk was produced).
    if new_data:
        dataset = _merge_into_hub_dataset(dataset, new_data)
        dataset.push_to_hub(DATASET_NAME)

    result_message = f"Ingested {ingested} documents, adding {len(new_data)} chunks to the dataset."
    logging.info(result_message)
    return result_message

def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv metadata for *query* and ingest the matching papers.

    Any exception raised by either step is logged and converted into an
    ``"Error in ingestion pipeline: ..."`` string, so this function always
    returns a message instead of raising.
    """
    try:
        papers = fetch_arxiv_metadata(query, max_results)
        summary = ingest_documents(papers)
    except Exception as exc:
        failure = f"Error in ingestion pipeline: {str(exc)}"
        logging.error(failure)
        return failure
    else:
        return summary