# rss_processor.py
import os
import feedparser
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
import logging
from huggingface_hub import HfApi, login, snapshot_download
from huggingface_hub.utils import HfHubHTTPError
import json
import dateutil.parser
import hashlib
import re
from datetime import datetime
# Setup logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# --- Constants ---
# Local and repository configuration
LOCAL_DB_DIR = "chroma_db_news"  # Local directory for the persisted Chroma database
FEEDS_FILE = "rss_feeds.json"
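# Expected layout of FEEDS_FILE, as consumed by fetch_rss_feeds() below
# (illustrative example only; the URLs are placeholders):
# {
#     "technology": [{"url": "https://example.com/tech.xml"}],
#     "world":      [{"url": "https://example.com/world.xml"}]
# }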
COLLECTION_NAME = "news_articles"
REPO_ID = "broadfield-dev/news-rag-db"  # Hugging Face Hub dataset repo that stores the DB
# RSS feed fetching configuration
MAX_ARTICLES_PER_FEED = 1000
# Hugging Face credentials
HF_API_TOKEN = os.getenv("HF_TOKEN")
if not HF_API_TOKEN:
raise ValueError("Hugging Face API token not found. Please set the HF_TOKEN environment variable.")
# --- Global Initializations ---
# Initialize Hugging Face API
login(token=HF_API_TOKEN)
hf_api = HfApi()
# Initialize embedding model once to be reused
logger.info("Loading embedding model...")
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
logger.info("Embedding model loaded.")
def setup_local_db():
"""
Ensures the local database directory exists.
If it doesn't, it attempts to download from Hugging Face Hub.
If the Hub repo is empty or doesn't exist, it creates a new local directory.
This function handles the "build a new dataset if one does not exist" requirement.
"""
if os.path.exists(LOCAL_DB_DIR):
logger.info(f"Local database found at '{LOCAL_DB_DIR}'.")
return
logger.info(f"Local database not found. Attempting to download from Hugging Face Hub repo: {REPO_ID}")
try:
# snapshot_download is the correct function for downloading a whole repository/folder
snapshot_download(
repo_id=REPO_ID,
repo_type="dataset",
local_dir=LOCAL_DB_DIR,
token=HF_API_TOKEN,
)
logger.info(f"Database downloaded successfully from {REPO_ID} to {LOCAL_DB_DIR}.")
except HfHubHTTPError as e:
# This error (e.g., 404 Not Found) is expected if the repo is new or empty.
logger.warning(
f"Failed to download from Hub (Repo might be new or empty): {e}. "
f"A new local database will be created at '{LOCAL_DB_DIR}'."
)
os.makedirs(LOCAL_DB_DIR, exist_ok=True)
except Exception as e:
logger.error(f"An unexpected error occurred during DB download: {e}")
logger.info(f"Creating a new local database at '{LOCAL_DB_DIR}'.")
os.makedirs(LOCAL_DB_DIR, exist_ok=True)
def clean_text(text):
"""Clean text by removing HTML tags and extra whitespace."""
if not text or not isinstance(text, str):
return ""
# Remove HTML tags
text = re.sub(r'<.*?>', '', text)
# Normalize whitespace
text = ' '.join(text.split())
return text.strip().lower()
def fetch_rss_feeds():
"""Fetches and parses articles from a list of RSS feeds in a JSON file."""
articles = []
seen_keys = set()
try:
with open(FEEDS_FILE, 'r') as f:
feed_categories = json.load(f)
except FileNotFoundError:
logger.error(f"'{FEEDS_FILE}' not found. Please create it. No feeds to process.")
return []
for category, feeds in feed_categories.items():
for feed_info in feeds:
feed_url = feed_info.get("url")
if not feed_url:
logger.warning(f"Skipping feed with no URL in category '{category}'")
continue
try:
logger.info(f"Fetching {feed_url}")
feed = feedparser.parse(feed_url)
                if feed.bozo and not feed.entries:
                    # feedparser sets bozo for any parsing hiccup; only skip the feed
                    # if nothing usable was recovered from it.
                    logger.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
                    continue
for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
title = clean_text(entry.get("title", "No Title"))
link = entry.get("link", "") # Don't clean link URL
description = clean_text(entry.get("summary", entry.get("description", "")))
if not description: # Skip articles without content
continue
published = "Unknown Date"
for date_field in ["published", "updated", "created", "pubDate"]:
if date_field in entry:
try:
parsed_date = dateutil.parser.parse(entry[date_field])
published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
break
except (ValueError, TypeError):
continue
# Create a unique key to deduplicate articles before processing
description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
key = f"{title}|{link}|{published}|{description_hash}"
if key not in seen_keys:
seen_keys.add(key)
articles.append({
"title": entry.get("title", "No Title"),
"link": link,
"description": description,
"published": published,
"category": category,
})
except Exception as e:
logger.error(f"Error fetching or parsing feed {feed_url}: {e}")
logger.info(f"Total unique articles fetched: {len(articles)}")
return articles
def process_and_store_articles(articles, vector_db):
"""Processes articles and stores them in the Chroma DB, avoiding duplicates."""
new_docs = []
new_doc_ids = []
# Get all existing document IDs from the database once to check for duplicates
try:
existing_ids = set(vector_db.get(include=[])["ids"])
logger.info(f"Found {len(existing_ids)} existing documents in the database.")
except Exception as e:
logger.error(f"Could not retrieve existing IDs from DB. Assuming empty. Error: {e}")
existing_ids = set()
for article in articles:
try:
# Recreate the same unique ID format for checking against the DB
title = clean_text(article["title"])
link = article["link"]
published = article["published"]
description = article["description"]
description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
doc_id = f"{title}|{link}|{published}|{description_hash}"
if doc_id in existing_ids:
logger.debug(f"Skipping duplicate article (ID already in DB): {title[:50]}...")
continue
# Add to our in-memory set to avoid duplicates from the same batch
existing_ids.add(doc_id)
metadata = {
"title": article["title"],
"link": article["link"],
"published": article["published"],
"category": article["category"],
# Store original description if needed, or keep it clean
# "original_description": article["description"],
}
# The Document object itself doesn't take an ID
doc = Document(page_content=description, metadata=metadata)
new_docs.append(doc)
new_doc_ids.append(doc_id)
except Exception as e:
logger.error(f"Error processing article '{article.get('title', 'N/A')}': {e}")
if new_docs:
logger.info(f"Adding {len(new_docs)} new documents to the database...")
try:
# Provide the list of documents and a parallel list of their unique IDs
vector_db.add_documents(documents=new_docs, ids=new_doc_ids)
vector_db.persist() # Save changes to disk
logger.info("Successfully added new documents and persisted the database.")
except Exception as e:
logger.error(f"Failed to add documents to Chroma DB: {e}")
else:
logger.info("No new articles to add to the database.")
def upload_to_hf_hub():
"""Uploads the local Chroma DB directory to the Hugging Face Hub."""
if not os.path.exists(LOCAL_DB_DIR):
logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
return
try:
# Ensure the repo exists before uploading.
hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
logger.info(f"Uploading database from '{LOCAL_DB_DIR}' to Hugging Face repo: {REPO_ID}...")
# upload_folder is the recommended way to upload a directory's contents.
hf_api.upload_folder(
folder_path=LOCAL_DB_DIR,
repo_id=REPO_ID,
repo_type="dataset",
commit_message=f"Update database - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
logger.info(f"Database successfully uploaded to {REPO_ID}.")
except Exception as e:
logger.error(f"Error uploading to Hugging Face Hub: {e}")
def main():
"""Main execution function to run the data pipeline."""
# 1. Ensure local DB exists by downloading from Hub or creating a new one
setup_local_db()
# 2. Initialize the vector DB object *after* the directory is guaranteed to exist
logger.info("Initializing Chroma vector database...")
vector_db = Chroma(
persist_directory=LOCAL_DB_DIR,
embedding_function=embedding_model,
collection_name=COLLECTION_NAME
)
logger.info("Chroma DB initialized.")
# 3. Fetch new articles from RSS feeds
articles = fetch_rss_feeds()
# 4. Process new articles and add them to the DB
if articles:
process_and_store_articles(articles, vector_db)
# 5. Upload the potentially updated database back to the Hub
upload_to_hf_hub()
logger.info("Script finished.")
if __name__ == "__main__":
main()
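# --- Example query (a minimal sketch, not executed by this script) ---
# After a run, the persisted collection can be searched with the same embedding model
# and collection name; the query string below is just an illustration:
#
#   emb = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
#   db = Chroma(persist_directory=LOCAL_DB_DIR, embedding_function=emb,
#               collection_name=COLLECTION_NAME)
#   for doc in db.similarity_search("renewable energy policy", k=5):
#       print(doc.metadata["published"], doc.metadata["title"], doc.metadata["link"])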