Spaces:
Paused
Paused
# File: data_ingestion.py | |
import arxiv | |
import io | |
import requests | |
from typing import List, Dict, Any | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain_openai import OpenAIEmbeddings | |
from langchain_qdrant import Qdrant | |
from datasets import load_dataset, Dataset | |
from langchain_community.document_loaders import PyMuPDFLoader | |
from config import * | |
embeddings = OpenAIEmbeddings(model="text-embedding-3-small") | |
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200) | |
def fetch_arxiv_metadata(query: str, max_results: int = 10) -> List[Dict[str, Any]]: | |
client = arxiv.Client(page_size=max_results, delay_seconds=3, num_retries=3) | |
search = arxiv.Search(query=query, max_results=max_results) | |
results = [] | |
for result in client.results(search): | |
metadata = { | |
"title": result.title, | |
"authors": [author.name for author in result.authors], | |
"published": result.published.isoformat(), | |
"updated": result.updated.isoformat(), | |
"pdf_url": result.pdf_url, | |
"entry_id": result.entry_id, | |
"summary": result.summary | |
} | |
results.append(metadata) | |
return results | |
def process_pdf(pdf_url: str) -> str: | |
loader = PyMuPDFLoader(pdf_url) | |
data = loader.load() | |
return "\n".join([page.page_content for page in data]) | |
def ingest_documents(metadata_list: List[Dict[str, Any]]): | |
qdrant = Qdrant.from_documents( | |
[], # We'll add documents one by one | |
embeddings, | |
url=QDRANT_API_URL, | |
api_key=QDRANT_API_KEY, | |
collection_name=COLLECTION_NAME, | |
) | |
dataset = load_dataset(DATASET_NAME) | |
new_data = [] | |
for metadata in metadata_list: | |
pdf_text = process_pdf(metadata["pdf_url"]) | |
chunks = text_splitter.split_text(pdf_text) | |
# Add to Qdrant | |
qdrant.add_texts(chunks, metadatas=[metadata] * len(chunks)) | |
# Prepare data for Hugging Face dataset | |
for chunk in chunks: | |
new_data.append({ | |
"text": chunk, | |
"metadata": metadata, | |
"embedding": embeddings.embed_query(chunk) | |
}) | |
# Update Hugging Face dataset | |
new_dataset = Dataset.from_dict({k: [d[k] for d in new_data] for k in new_data[0]}) | |
dataset = dataset.add_item(new_dataset) | |
dataset.push_to_hub(DATASET_NAME) |