import os
import json
import hashlib
import logging
import re
from datetime import datetime

import dateutil.parser
import feedparser
from huggingface_hub import HfApi, login, snapshot_download
from huggingface_hub.utils import HfHubHTTPError
# Note: on newer LangChain versions these classes live in langchain_community
# (langchain_community.vectorstores / langchain_community.embeddings).
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Constants ---
# Local and repository configuration
LOCAL_DB_DIR = "chroma_db_news"  # Using a more descriptive local directory name
FEEDS_FILE = "rss_feeds.json"
COLLECTION_NAME = "news_articles"
REPO_ID = "broadfield-dev/news-rag-db"  # Your Hugging Face Hub repo ID
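# The feeds file is expected to map category names to lists of feed objects,
# as read by fetch_rss_feeds() below. The URLs here are placeholders:
#
# {
#     "technology": [
#         {"url": "https://example.com/tech/rss.xml"},
#         {"url": "https://example.com/ai/rss.xml"}
#     ],
#     "science": [
#         {"url": "https://example.com/science/rss.xml"}
#     ]
# }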
# RSS feed fetching configuration
MAX_ARTICLES_PER_FEED = 1000

# Hugging Face credentials
HF_API_TOKEN = os.getenv("HF_TOKEN")
if not HF_API_TOKEN:
    raise ValueError("Hugging Face API token not found. Please set the HF_TOKEN environment variable.")
# --- Global Initializations ---
# Initialize Hugging Face API
login(token=HF_API_TOKEN)
hf_api = HfApi()

# Initialize embedding model once to be reused
logger.info("Loading embedding model...")
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
logger.info("Embedding model loaded.")
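# For reference: all-MiniLM-L6-v2 produces 384-dimensional embeddings. A quick
# sanity check (illustrative only, not part of the pipeline):
#
#     vec = embedding_model.embed_query("sample headline")
#     assert len(vec) == 384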
def setup_local_db():
    """
    Ensures the local database directory exists.
    If it doesn't, it attempts to download from Hugging Face Hub.
    If the Hub repo is empty or doesn't exist, it creates a new local directory.
    This function handles the "build a new dataset if one does not exist" requirement.
    """
    if os.path.exists(LOCAL_DB_DIR):
        logger.info(f"Local database found at '{LOCAL_DB_DIR}'.")
        return

    logger.info(f"Local database not found. Attempting to download from Hugging Face Hub repo: {REPO_ID}")
    try:
        # snapshot_download is the correct function for downloading a whole repository/folder
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=LOCAL_DB_DIR,
            token=HF_API_TOKEN,
        )
        logger.info(f"Database downloaded successfully from {REPO_ID} to {LOCAL_DB_DIR}.")
    except HfHubHTTPError as e:
        # This error (e.g., 404 Not Found) is expected if the repo is new or empty.
        logger.warning(
            f"Failed to download from Hub (repo might be new or empty): {e}. "
            f"A new local database will be created at '{LOCAL_DB_DIR}'."
        )
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred during DB download: {e}")
        logger.info(f"Creating a new local database at '{LOCAL_DB_DIR}'.")
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
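# Note: snapshot_download() mirrors the entire dataset repo into LOCAL_DB_DIR,
# so deleting that folder locally forces a fresh download on the next run.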
def clean_text(text):
    """Clean text by removing HTML tags and extra whitespace."""
    if not text or not isinstance(text, str):
        return ""
    # Remove HTML tags
    text = re.sub(r'<.*?>', '', text)
    # Normalize whitespace
    text = ' '.join(text.split())
    return text.strip().lower()
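# Illustrative behavior:
#
#     clean_text("  <p>Breaking <b>News</b>!</p>  ")  ->  "breaking news!"
#
# Note the output is lowercased, which also normalizes the deduplication keys
# built from titles below.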
def fetch_rss_feeds():
    """Fetches and parses articles from a list of RSS feeds in a JSON file."""
    articles = []
    seen_keys = set()
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"'{FEEDS_FILE}' not found. Please create it. No feeds to process.")
        return []

    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue
            try:
                logger.info(f"Fetching {feed_url}")
                feed = feedparser.parse(feed_url)
                if feed.bozo:
                    logger.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
                    continue
                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
                    title = clean_text(entry.get("title", "No Title"))
                    link = entry.get("link", "")  # Don't clean the link URL
                    description = clean_text(entry.get("summary", entry.get("description", "")))
                    if not description:  # Skip articles without content
                        continue
                    published = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                break
                            except (ValueError, TypeError):
                                continue
                    # Create a unique key to deduplicate articles before processing
                    description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
                    key = f"{title}|{link}|{published}|{description_hash}"
                    if key not in seen_keys:
                        seen_keys.add(key)
                        articles.append({
                            "title": entry.get("title", "No Title"),
                            "link": link,
                            "description": description,
                            "published": published,
                            "category": category,
                        })
            except Exception as e:
                logger.error(f"Error fetching or parsing feed {feed_url}: {e}")

    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles
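# Each returned article is a plain dict of the form (values illustrative):
#
#     {
#         "title": "Some Headline",
#         "link": "https://example.com/story",
#         "description": "cleaned, lowercased summary text...",
#         "published": "2024-01-01 12:00:00",
#         "category": "technology",
#     }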
def process_and_store_articles(articles, vector_db):
    """Processes articles and stores them in the Chroma DB, avoiding duplicates."""
    new_docs = []
    new_doc_ids = []

    # Get all existing document IDs from the database once to check for duplicates
    try:
        existing_ids = set(vector_db.get(include=[])["ids"])
        logger.info(f"Found {len(existing_ids)} existing documents in the database.")
    except Exception as e:
        logger.error(f"Could not retrieve existing IDs from DB. Assuming empty. Error: {e}")
        existing_ids = set()

    for article in articles:
        try:
            # Recreate the same unique ID format for checking against the DB
            title = clean_text(article["title"])
            link = article["link"]
            published = article["published"]
            description = article["description"]
            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
            doc_id = f"{title}|{link}|{published}|{description_hash}"

            if doc_id in existing_ids:
                logger.debug(f"Skipping duplicate article (ID already in DB): {title[:50]}...")
                continue

            # Add to our in-memory set to avoid duplicates from the same batch
            existing_ids.add(doc_id)

            metadata = {
                "title": article["title"],
                "link": article["link"],
                "published": article["published"],
                "category": article["category"],
                # Store the original description if needed, or keep it clean:
                # "original_description": article["description"],
            }
            # The Document object itself doesn't take an ID
            doc = Document(page_content=description, metadata=metadata)
            new_docs.append(doc)
            new_doc_ids.append(doc_id)
        except Exception as e:
            logger.error(f"Error processing article '{article.get('title', 'N/A')}': {e}")

    if new_docs:
        logger.info(f"Adding {len(new_docs)} new documents to the database...")
        try:
            # Provide the list of documents and a parallel list of their unique IDs
            vector_db.add_documents(documents=new_docs, ids=new_doc_ids)
            vector_db.persist()  # Save changes to disk
            logger.info("Successfully added new documents and persisted the database.")
        except Exception as e:
            logger.error(f"Failed to add documents to Chroma DB: {e}")
    else:
        logger.info("No new articles to add to the database.")
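# Document IDs take the form "<clean title>|<link>|<published>|<sha256 of description>",
# e.g. (illustrative):
#
#     "breaking news!|https://example.com/story|2024-01-01 12:00:00|9f86d081..."
#
# Because fetch_rss_feeds() builds the same key, an article is deduplicated both
# within a run and against documents already stored in the DB.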
def upload_to_hf_hub():
    """Uploads the local Chroma DB directory to the Hugging Face Hub."""
    if not os.path.exists(LOCAL_DB_DIR):
        logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
        return
    try:
        # Ensure the repo exists before uploading.
        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
        logger.info(f"Uploading database from '{LOCAL_DB_DIR}' to Hugging Face repo: {REPO_ID}...")
        # upload_folder is the recommended way to upload a directory's contents.
        hf_api.upload_folder(
            folder_path=LOCAL_DB_DIR,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"Update database - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
        )
        logger.info(f"Database successfully uploaded to {REPO_ID}.")
    except Exception as e:
        logger.error(f"Error uploading to Hugging Face Hub: {e}")
def main():
    """Main execution function to run the data pipeline."""
    # 1. Ensure the local DB exists by downloading from the Hub or creating a new one
    setup_local_db()

    # 2. Initialize the vector DB object *after* the directory is guaranteed to exist
    logger.info("Initializing Chroma vector database...")
    vector_db = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=embedding_model,
        collection_name=COLLECTION_NAME
    )
    logger.info("Chroma DB initialized.")

    # 3. Fetch new articles from RSS feeds
    articles = fetch_rss_feeds()

    # 4. Process new articles and add them to the DB
    if articles:
        process_and_store_articles(articles, vector_db)

    # 5. Upload the potentially updated database back to the Hub
    upload_to_hf_hub()
    logger.info("Script finished.")


if __name__ == "__main__":
    main()
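# Typical invocation (assuming this file is saved as app.py; the filename is
# not specified in the source):
#
#     HF_TOKEN=hf_xxxxxxxx python app.py
#
# The token must have write access to the REPO_ID dataset repo for the
# upload step to succeed.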