import os import feedparser from langchain.vectorstores import Chroma from langchain.embeddings import HuggingFaceEmbeddings from langchain.docstore.document import Document import logging from huggingface_hub import HfApi, login, snapshot_download import shutil import json from datetime import datetime import dateutil.parser import hashlib import re logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) LOCAL_DB_DIR = "chroma_db" COLLECTION_NAME = "news_articles" HF_API_TOKEN = os.getenv("DEMO_HF_API_TOKEN", "YOUR_HF_API_TOKEN") REPO_ID = "broadfield-dev/news-rag-db" FEEDS_FILE = "rss_feeds.json" login(token=HF_API_TOKEN) hf_api = HfApi() def get_embedding_model(): if not hasattr(get_embedding_model, "model"): get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") return get_embedding_model.model def clean_text(text): if not text or not isinstance(text, str): return "" text = re.sub(r'<.*?>', '', text) text = ' '.join(text.split()) return text.strip().lower() def fetch_rss_feeds(): articles = [] seen_keys = set() try: with open(FEEDS_FILE, 'r') as f: feed_categories = json.load(f) except FileNotFoundError: logger.error(f"{FEEDS_FILE} not found. No feeds to process.") return [] for category, feeds in feed_categories.items(): for feed_info in feeds: feed_url = feed_info.get("url") if not feed_url: continue try: logger.info(f"Fetching '{feed_info.get('name', feed_url)}' from category '{category}'") # Add a User-Agent to prevent getting blocked feed = feedparser.parse(feed_url, agent="RSSNewsBot/1.0 (+http://huggingface.co/spaces/broadfield-dev/RSS_News)") if feed.bozo: logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}") continue for entry in feed.entries[:10]: # Process max 10 entries per feed title = entry.get("title", "No Title") link = entry.get("link", "") description = entry.get("summary", entry.get("description", "")) cleaned_title = clean_text(title) cleaned_link = clean_text(link) published = "Unknown Date" for date_field in ["published", "updated", "created", "pubDate"]: if date_field in entry: try: parsed_date = dateutil.parser.parse(entry[date_field]) published = parsed_date.strftime("%Y-%m-%d %H:%M:%S") break except (ValueError, TypeError): continue key = f"{cleaned_title}|{cleaned_link}|{published}" if key not in seen_keys: seen_keys.add(key) image = "svg" if 'media_content' in entry and entry.media_content: image = entry.media_content[0].get('url', 'svg') elif 'media_thumbnail' in entry and entry.media_thumbnail: image = entry.media_thumbnail[0].get('url', 'svg') articles.append({ "title": title, "link": link, "description": description, "published": published, "category": category, # Directly use category from JSON "image": image, }) except Exception as e: logger.error(f"Error fetching {feed_url}: {e}") logger.info(f"Total articles fetched: {len(articles)}") return articles def process_and_store_articles(articles): vector_db = Chroma( persist_directory=LOCAL_DB_DIR, embedding_function=get_embedding_model(), collection_name=COLLECTION_NAME ) try: existing_ids = set(vector_db.get(include=[])["ids"]) except Exception: existing_ids = set() docs_to_add = [] ids_to_add = [] for article in articles: cleaned_title = clean_text(article["title"]) cleaned_link = clean_text(article["link"]) doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}" if doc_id in existing_ids: continue metadata = { "title": article["title"], "link": article["link"], "original_description": article["description"], "published": article["published"], "category": article["category"], "image": article["image"], } doc = Document(page_content=clean_text(article["description"]), metadata=metadata) docs_to_add.append(doc) ids_to_add.append(doc_id) existing_ids.add(doc_id) if docs_to_add: vector_db.add_documents(documents=docs_to_add, ids=ids_to_add) vector_db.persist() logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {vector_db._collection.count()}") def download_from_hf_hub(): if not os.path.exists(LOCAL_DB_DIR): try: snapshot_download( repo_id=REPO_ID, repo_type="dataset", local_dir=".", local_dir_use_symlinks=False, allow_patterns=f"{LOCAL_DB_DIR}/**", token=HF_API_TOKEN ) except Exception as e: logger.warning(f"Could not download DB from Hub (this is normal on first run): {e}") def upload_to_hf_hub(): if os.path.exists(LOCAL_DB_DIR): try: hf_api.upload_folder( folder_path=LOCAL_DB_DIR, path_in_repo=LOCAL_DB_DIR, repo_id=REPO_ID, repo_type="dataset", token=HF_API_TOKEN, commit_message="Update RSS news database" ) except Exception as e: logger.error(f"Error uploading to Hugging Face Hub: {e}") if __name__ == "__main__": download_from_hf_hub() articles = fetch_rss_feeds() if articles: process_and_store_articles(articles) upload_to_hf_hub()