import os
import json
import logging
import hashlib
import re
from datetime import datetime

import dateutil.parser
import feedparser
from huggingface_hub import HfApi, login, snapshot_download
from huggingface_hub.utils import HfHubHTTPError
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.docstore.document import Document

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

MAX_ARTICLES_PER_FEED = 1000
LOCAL_DB_DIR = "chroma_db"
FEEDS_FILE = "rss_feeds.json"
COLLECTION_NAME = "news_articles"
HF_API_TOKEN = os.getenv("HF_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"

if not HF_API_TOKEN:
    raise ValueError("HF_TOKEN environment variable not set.")

login(token=HF_API_TOKEN)
hf_api = HfApi()

embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")


def clean_text(text):
    """Strip HTML tags, collapse whitespace, and lowercase (used for dedup keys)."""
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<.*?>', '', text)
    text = ' '.join(text.split())
    return text.strip().lower()


def fetch_rss_feeds():
    """Fetch and deduplicate articles from every feed listed in FEEDS_FILE.

    FEEDS_FILE is expected to map category names to lists of feed objects,
    e.g. {"Tech": [{"url": "https://example.com/rss"}, ...]} (shape inferred
    from how it is read below).
    """
    articles = []
    seen_keys = set()
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
        return []

    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                logger.warning(f"Skipping feed with no URL in category '{category}'")
                continue
            try:
                logger.info(f"Fetching {feed_url}")
                feed = feedparser.parse(feed_url)
                if feed.bozo:
                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                    continue
                article_count = 0
                for entry in feed.entries:
                    if article_count >= MAX_ARTICLES_PER_FEED:
                        break
                    title_raw = entry.get("title", "No Title")
                    link = entry.get("link", "")
                    description = entry.get("summary", entry.get("description", ""))
                    clean_title_val = clean_text(title_raw)
                    clean_desc_val = clean_text(description)
                    if not clean_desc_val:
                        continue

                    # Try the common date fields in order; fall back to "Unknown Date".
                    published = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                break
                            except (ValueError, TypeError):
                                continue

                    # Deduplicate on title + link + date + hash of the cleaned description.
                    description_hash = hashlib.sha256(clean_desc_val.encode('utf-8')).hexdigest()
                    key = f"{clean_title_val}|{link}|{published}|{description_hash}"
                    if key not in seen_keys:
                        seen_keys.add(key)

                        # Probe the usual RSS image locations; "svg" is the placeholder default.
                        image = "svg"
                        for img_source in [
                            lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else "",
                            lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else "",
                            lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") else "",
                            lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), ""),
                        ]:
                            try:
                                img = img_source(entry)
                                if img and img.strip():
                                    image = img
                                    break
                            except (IndexError, AttributeError, TypeError):
                                continue

                        articles.append({
                            "title": title_raw,
                            "link": link,
                            "description": clean_desc_val,
                            "published": published,
                            "category": category,
                            "image": image,
                        })
                        article_count += 1
            except Exception as e:
                logger.error(f"Error fetching {feed_url}: {e}")

    logger.info(f"Total unique articles fetched: {len(articles)}")
    return articles


def categorize_feed(url):
    """Map a feed URL to a category via keyword matching."""
    if not url or not isinstance(url, str):
        logger.warning(f"Invalid URL provided for categorization: {url}")
        return "Uncategorized"
    url = url.lower().strip()
    logger.debug(f"Categorizing URL: {url}")
    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
        return "Academic Papers"
    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
        return "Business"
    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
        return "Stocks & Markets"
    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
        return "Federal Government"
    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
        return "Weather"
    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
        return "Data & Statistics"
    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
        return "Space"
    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
        return "Science"
    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
        return "Tech"
    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
        return "Astrology"
    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
        return "Politics"
    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
        return "Earth Weather"
    elif "vogue" in url:
        return "Lifestyle"
    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
        return "Physics"
    else:
        logger.warning(f"No matching category found for URL: {url}")
        return "Uncategorized"


def process_and_store_articles(articles, vector_db):
    """Embed new articles and add them to the Chroma collection, skipping duplicates."""
    documents = []
    doc_ids = []
    try:
        existing_ids = set(vector_db.get(include=[])["ids"])
        logger.info(f"Found {len(existing_ids)} existing document IDs in the database.")
    except Exception:
        existing_ids = set()
        logger.info("No existing documents found or error retrieving them. Starting fresh.")
    for article in articles:
        try:
            title = clean_text(article["title"])
            link = article["link"]
            description = article["description"]
            published = article["published"]

            # Mirror the deduplication key built in fetch_rss_feeds().
            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
            doc_id = f"{title}|{link}|{published}|{description_hash}"
            if doc_id in existing_ids:
                continue

            metadata = {
                "title": article["title"],
                "link": article["link"],
                "published": article["published"],
                "category": article["category"],
                "image": article["image"],
            }
            doc = Document(page_content=description, metadata=metadata)
            documents.append(doc)
            doc_ids.append(doc_id)
            existing_ids.add(doc_id)
        except Exception as e:
            logger.error(f"Error processing article {article['title']}: {e}")

    if documents:
        try:
            vector_db.add_documents(documents=documents, ids=doc_ids)
            vector_db.persist()
            logger.info(f"Added {len(documents)} new articles to DB. Total documents now: {len(vector_db.get()['ids'])}")
        except Exception as e:
            logger.error(f"Error storing articles: {e}")
    else:
        logger.info("No new articles to add.")


def download_from_hf_hub():
    """Pull the persisted Chroma DB from the Hub unless a local copy already exists."""
    if os.path.exists(LOCAL_DB_DIR):
        logger.info(f"Local database directory '{LOCAL_DB_DIR}' already exists. Skipping download.")
        return
    logger.info(f"Attempting to download database from Hugging Face Hub repo: {REPO_ID}")
    try:
        snapshot_download(
            repo_id=REPO_ID,
            repo_type="dataset",
            local_dir=LOCAL_DB_DIR,
            token=HF_API_TOKEN,
        )
        logger.info(f"Database successfully downloaded to '{LOCAL_DB_DIR}'.")
    except HfHubHTTPError as e:
        logger.warning(f"Failed to download from Hub (repo may be new or empty): {e}. Building new dataset locally.")
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
    except Exception as e:
        logger.error(f"An unexpected error occurred during download: {e}. Creating new local directory.")
        os.makedirs(LOCAL_DB_DIR, exist_ok=True)


def upload_to_hf_hub():
    """Push the local Chroma DB directory to the Hub dataset repo."""
    if not os.path.exists(LOCAL_DB_DIR):
        logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
        return
    try:
        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
        logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
        hf_api.upload_folder(
            folder_path=LOCAL_DB_DIR,
            repo_id=REPO_ID,
            repo_type="dataset",
            commit_message=f"Update database - {datetime.now().isoformat()}"
        )
        logger.info("Database uploaded successfully to Hugging Face Hub.")
    except Exception as e:
        logger.error(f"Error uploading to Hugging Face Hub: {e}")


def run_update_pipeline():
    """Download the existing DB, ingest new articles, and re-upload the result."""
    download_from_hf_hub()
    vector_db = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=embedding_model,
        collection_name=COLLECTION_NAME
    )
    articles = fetch_rss_feeds()
    if articles:
        process_and_store_articles(articles, vector_db)
        upload_to_hf_hub()
    logger.info("Update pipeline finished.")
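

# Minimal entry point (an assumption, not part of the original module): run the
# full pipeline when this file is executed directly, e.g. `python pipeline.py`.
# The filename here is hypothetical; adjust to however this script is invoked.
if __name__ == "__main__":
    run_update_pipeline()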