# File: data_ingestion.py
"""Fetch arXiv papers, chunk their PDF text, and ingest the chunks into a
Qdrant collection and a Hugging Face dataset."""
import logging
from typing import Any, Dict, List, Optional

import arxiv
from datasets import Dataset, concatenate_datasets, load_dataset
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_openai import OpenAIEmbeddings
from langchain_qdrant import Qdrant

# Expected to supply QDRANT_API_URL, QDRANT_API_KEY, COLLECTION_NAME and
# DATASET_NAME, which are used below but not defined in this module.
from config import *  # noqa: F401,F403

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)


def fetch_arxiv_metadata(query: str, max_results: int = 10) -> List[Dict[str, Any]]:
    """Search arXiv for *query* and return one metadata dict per result.

    Each dict has the keys ``title``, ``authors`` (list of names),
    ``published``/``updated`` (ISO-8601 strings), ``pdf_url``, ``entry_id``
    and ``summary``. At most *max_results* papers are returned.
    """
    logger.info("Fetching arXiv metadata for query: %s", query)
    # delay_seconds/num_retries throttle and retry requests to the arXiv API.
    client = arxiv.Client(page_size=max_results, delay_seconds=3, num_retries=3)
    search = arxiv.Search(query=query, max_results=max_results)
    results = [
        {
            "title": result.title,
            "authors": [author.name for author in result.authors],
            "published": result.published.isoformat(),
            "updated": result.updated.isoformat(),
            "pdf_url": result.pdf_url,
            "entry_id": result.entry_id,
            "summary": result.summary,
        }
        for result in client.results(search)
    ]
    logger.info("Fetched metadata for %d papers", len(results))
    return results


def process_pdf(pdf_url: str) -> str:
    """Load the PDF at *pdf_url* and return the text of all pages joined by newlines."""
    logger.info("Processing PDF from URL: %s", pdf_url)
    loader = PyMuPDFLoader(pdf_url)
    pages = loader.load()
    return "\n".join(page.page_content for page in pages)


def _index_chunks(qdrant: Optional[Qdrant], chunks: List[str], metadata: Dict[str, Any]) -> Qdrant:
    """Add *chunks* (all tagged with *metadata*) to Qdrant and return the store.

    The store is created lazily from the first non-empty batch, because
    ``from_texts`` sizes the collection by embedding its first text and so
    cannot be called with an empty list. ``from_texts`` reuses the collection
    when it already exists.
    """
    metadatas = [metadata] * len(chunks)
    if qdrant is None:
        return Qdrant.from_texts(
            chunks,
            embeddings,
            metadatas=metadatas,
            url=QDRANT_API_URL,
            api_key=QDRANT_API_KEY,
            collection_name=COLLECTION_NAME,
        )
    qdrant.add_texts(chunks, metadatas=metadatas)
    return qdrant


def _append_rows_to_dataset(existing: Dataset, rows: List[Dict[str, Any]]) -> None:
    """Append *rows* to *existing* and push the combined dataset to the Hub."""
    new_dataset = Dataset.from_list(rows)
    # NOTE(review): concatenation requires matching features; assumes the Hub
    # dataset was created by this pipeline with the same text/metadata/embedding
    # schema — confirm if the dataset was seeded elsewhere.
    combined = concatenate_datasets([existing, new_dataset])
    combined.push_to_hub(DATASET_NAME)


def ingest_documents(metadata_list: List[Dict[str, Any]]) -> str:
    """Download, chunk and index every paper in *metadata_list*.

    Each paper's PDF text is split into chunks. The chunks are added to the
    Qdrant collection ``COLLECTION_NAME``, and appended (text, metadata,
    embedding) to the ``train`` split of the Hub dataset ``DATASET_NAME``.
    Failures on individual papers are logged and skipped.

    Returns:
        A human-readable summary of how many papers and chunks were ingested.
    """
    total = len(metadata_list)
    logger.info("Starting ingestion of %d documents", total)
    # Load up front so a missing/inaccessible dataset fails before any indexing.
    existing_dataset = load_dataset(DATASET_NAME, split="train")

    qdrant: Optional[Qdrant] = None
    new_data: List[Dict[str, Any]] = []
    succeeded = 0

    for i, metadata in enumerate(metadata_list, start=1):
        try:
            pdf_text = process_pdf(metadata["pdf_url"])
            chunks = text_splitter.split_text(pdf_text)
            if not chunks:
                logger.warning("Document %d produced no text chunks; skipping", i)
                continue
            qdrant = _index_chunks(qdrant, chunks, metadata)
            # One batched request instead of one request per chunk.
            # TODO: Qdrant already embedded these chunks internally; reuse
            # those vectors to avoid this second embedding pass.
            vectors = embeddings.embed_documents(chunks)
        except Exception as e:
            # Best-effort per document: record the failure (with traceback)
            # and move on to the next paper.
            logger.exception("Error processing document %d: %s", i, e)
            continue

        new_data.extend(
            {"text": chunk, "metadata": metadata, "embedding": vector}
            for chunk, vector in zip(chunks, vectors)
        )
        succeeded += 1
        logger.info("Processed document %d/%d", i, total)

    if new_data:
        _append_rows_to_dataset(existing_dataset, new_data)
    else:
        logger.warning("No chunks were produced; dataset %s left unchanged", DATASET_NAME)

    result_message = (
        f"Ingested {succeeded} of {total} documents, "
        f"adding {len(new_data)} chunks to the dataset."
    )
    logger.info(result_message)
    return result_message


def run_ingestion_pipeline(query: str, max_results: int = 10) -> str:
    """Fetch arXiv results for *query* and ingest them.

    Any exception is logged with its traceback and converted into an error
    string, so callers always receive a message instead of an exception.
    """
    try:
        metadata_list = fetch_arxiv_metadata(query, max_results)
        return ingest_documents(metadata_list)
    except Exception as e:
        error_message = f"Error in ingestion pipeline: {str(e)}"
        logger.exception(error_message)
        return error_message