import os
import logging
import shutil

import feedparser
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document
from huggingface_hub import HfApi, login, snapshot_download

import rss_feeds

# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Constants
MAX_ARTICLES_PER_FEED = 5
LOCAL_DB_DIR = "chroma_db"
RSS_FEEDS = rss_feeds.RSS_FEEDS
COLLECTION_NAME = "news_articles"  # Explicitly name the collection
HF_API_TOKEN = os.getenv("DEMO_HF_API_TOKEN", "YOUR_HF_API_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"

# Initialize Hugging Face API
login(token=HF_API_TOKEN)
hf_api = HfApi()

# Initialize embedding model (global, reusable)
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Initialize vector DB with a specific collection name
vector_db = Chroma(
    persist_directory=LOCAL_DB_DIR,
    embedding_function=embedding_model,
    collection_name=COLLECTION_NAME,
)


def fetch_rss_feeds():
    articles = []
    seen_keys = set()
    for feed_url in RSS_FEEDS:
        try:
            logger.info(f"Fetching {feed_url}")
            feed = feedparser.parse(feed_url)
            if feed.bozo:
                logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                continue
            article_count = 0
            for entry in feed.entries:
                if article_count >= MAX_ARTICLES_PER_FEED:
                    break
                title = entry.get("title", "No Title").strip()
                link = entry.get("link", "").strip()
                description = entry.get("summary", entry.get("description", "No Description"))
                published = entry.get("published", "Unknown Date").strip()
                key = f"{title}|{link}|{published}"
                if key not in seen_keys:
                    seen_keys.add(key)
                    image = (
                        entry.get("media_content", [{}])[0].get("url")
                        or entry.get("media_thumbnail", [{}])[0].get("url")
                        or "svg"
                    )
                    articles.append({
                        "title": title,
                        "link": link,
                        "description": description,
                        "published": published,
                        "category": categorize_feed(feed_url),
                        "image": image,
                    })
                    article_count += 1
        except Exception as e:
            logger.error(f"Error fetching {feed_url}: {e}")
    logger.info(f"Total articles fetched: {len(articles)}")
    return articles


def categorize_feed(url):
    # (Unchanged, keeping your existing categorization logic)
    # ...
    return "Uncategorized"  # Placeholder return so the module stays runnable; the real logic is elided here
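# Illustrative sketch only: the project's real categorize_feed logic is elided above.
# One common approach is simple keyword matching on the feed URL; the category names
# and keywords below are hypothetical examples, not taken from the original code.
def categorize_feed_sketch(url):
    url = url.lower()
    if "tech" in url:
        return "Technology"
    if "sport" in url:
        return "Sports"
    if "business" in url or "finance" in url:
        return "Business"
    return "Uncategorized"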
def process_and_store_articles(articles):
    documents = []
    doc_ids = []
    existing_ids = set(vector_db.get()["ids"])  # Get existing document IDs to avoid duplicates
    for article in articles:
        try:
            # Create a unique ID for deduplication
            doc_id = f"{article['title']}|{article['link']}|{article['published']}"
            if doc_id in existing_ids:
                continue  # Skip if already in DB
            metadata = {
                "title": article["title"],
                "link": article["link"],
                "original_description": article["description"],
                "published": article["published"],
                "category": article["category"],
                "image": article["image"],
            }
            documents.append(Document(page_content=article["description"], metadata=metadata))
            doc_ids.append(doc_id)
        except Exception as e:
            logger.error(f"Error processing article {article['title']}: {e}")
    if documents:
        try:
            # Pass the constructed IDs explicitly so the dedup check above matches on later runs
            # (otherwise Chroma assigns random UUIDs and duplicates are never detected)
            vector_db.add_documents(documents, ids=doc_ids)
            vector_db.persist()  # Explicitly persist changes
            logger.info(f"Added {len(documents)} new articles to DB")
        except Exception as e:
            logger.error(f"Error storing articles: {e}")


def download_from_hf_hub():
    # Only download if the local DB doesn't exist (initial setup)
    if not os.path.exists(LOCAL_DB_DIR):
        try:
            hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
            logger.info(f"Downloading Chroma DB from {REPO_ID}...")
            # snapshot_download pulls the whole dataset repo into the local DB directory
            snapshot_download(repo_id=REPO_ID, repo_type="dataset", local_dir=LOCAL_DB_DIR, token=HF_API_TOKEN)
        except Exception as e:
            logger.error(f"Error downloading from Hugging Face Hub: {e}")
            raise
    else:
        logger.info("Local Chroma DB already exists, skipping download.")


def upload_to_hf_hub():
    if os.path.exists(LOCAL_DB_DIR):
        try:
            logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
            for root, _, files in os.walk(LOCAL_DB_DIR):
                for file in files:
                    local_path = os.path.join(root, file)
                    remote_path = os.path.relpath(local_path, LOCAL_DB_DIR)
                    hf_api.upload_file(
                        path_or_fileobj=local_path,
                        path_in_repo=remote_path,
                        repo_id=REPO_ID,
                        repo_type="dataset",
                        token=HF_API_TOKEN,
                    )
            logger.info(f"Database uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}")
            raise


if __name__ == "__main__":
    articles = fetch_rss_feeds()
    process_and_store_articles(articles)
    upload_to_hf_hub()
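
# Illustrative usage sketch (not part of the original script): querying the persisted
# collection with LangChain's standard similarity_search. The query string and k value
# are arbitrary examples.
def search_articles(query, k=5):
    """Return title/link metadata for the k stored articles most similar to the query."""
    results = vector_db.similarity_search(query, k=k)
    return [
        {"title": doc.metadata.get("title"), "link": doc.metadata.get("link")}
        for doc in results
    ]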