# rss_processor.py
import os
import json
import logging
import re

import dateutil.parser
import feedparser
from huggingface_hub import HfApi, login, snapshot_download
from langchain.docstore.document import Document
# Note: in newer LangChain releases, Chroma and HuggingFaceEmbeddings live in langchain_community.
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

LOCAL_DB_DIR = "chroma_db"
COLLECTION_NAME = "news_articles"
HF_API_TOKEN = os.getenv("DEMO_HF_API_TOKEN", "YOUR_HF_API_TOKEN")
REPO_ID = "broadfield-dev/news-rag-db"
FEEDS_FILE = "rss_feeds.json"

# Only log in when a real token is configured; the placeholder default would make login() fail at import time.
if HF_API_TOKEN and HF_API_TOKEN != "YOUR_HF_API_TOKEN":
    login(token=HF_API_TOKEN)
hf_api = HfApi()

def get_embedding_model():
    """Lazily create and cache the sentence-transformer embedding model."""
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return get_embedding_model.model
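
# Illustration only (not used by the pipeline): the cached model exposes the
# standard LangChain embedding interface, e.g. embed_query() for a single string.
#     vec = get_embedding_model().embed_query("climate summit reaches agreement")
#     len(vec)  # 384 dimensions for all-MiniLM-L6-v2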

def clean_text(text):
    """Strip HTML tags, collapse whitespace, and lowercase for de-duplication keys."""
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<.*?>', '', text)
    text = ' '.join(text.split())
    return text.strip().lower()
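
# For example, clean_text("<p>Breaking:  Mars&nbsp;rover lands</p>") returns
# "breaking: mars&nbsp;rover lands": tags are stripped and whitespace collapsed,
# but HTML entities are left as-is.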

def fetch_rss_feeds():
    """Read feed URLs from FEEDS_FILE and return a de-duplicated list of article dicts."""
    articles = []
    seen_keys = set()
    try:
        with open(FEEDS_FILE, 'r') as f:
            feed_categories = json.load(f)
    except FileNotFoundError:
        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
        return []
    for category, feeds in feed_categories.items():
        for feed_info in feeds:
            feed_url = feed_info.get("url")
            if not feed_url:
                continue
            try:
                logger.info(f"Fetching '{feed_info.get('name', feed_url)}' from category '{category}'")
                # Send a User-Agent so feed servers are less likely to block the request
                feed = feedparser.parse(feed_url, agent="RSSNewsBot/1.0 (+http://huggingface.co/spaces/broadfield-dev/RSS_News)")
                if feed.bozo:
                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                    continue
                for entry in feed.entries[:10]:  # Process at most 10 entries per feed
                    title = entry.get("title", "No Title")
                    link = entry.get("link", "")
                    description = entry.get("summary", entry.get("description", ""))
                    cleaned_title = clean_text(title)
                    cleaned_link = clean_text(link)
                    # Try common date fields in order and normalize to a single format
                    published = "Unknown Date"
                    for date_field in ["published", "updated", "created", "pubDate"]:
                        if date_field in entry:
                            try:
                                parsed_date = dateutil.parser.parse(entry[date_field])
                                published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                break
                            except (ValueError, TypeError):
                                continue
                    # De-duplicate on normalized title, link, and publication date
                    key = f"{cleaned_title}|{cleaned_link}|{published}"
                    if key not in seen_keys:
                        seen_keys.add(key)
                        image = "svg"  # "svg" is the sentinel for "no image found"
                        if 'media_content' in entry and entry.media_content:
                            image = entry.media_content[0].get('url', 'svg')
                        elif 'media_thumbnail' in entry and entry.media_thumbnail:
                            image = entry.media_thumbnail[0].get('url', 'svg')
                        articles.append({
                            "title": title,
                            "link": link,
                            "description": description,
                            "published": published,
                            "category": category,  # Category comes straight from the JSON config
                            "image": image,
                        })
            except Exception as e:
                logger.error(f"Error fetching {feed_url}: {e}")
    logger.info(f"Total articles fetched: {len(articles)}")
    return articles
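
# fetch_rss_feeds() expects FEEDS_FILE to map category names to lists of feeds
# with "name" and "url" keys. The real rss_feeds.json is not shown in this file;
# a minimal example matching the parsing logic above would be:
#     {
#       "Technology": [
#         {"name": "Example Tech Feed", "url": "https://example.com/tech.rss"}
#       ],
#       "Science": [
#         {"name": "Example Science Feed", "url": "https://example.com/science.rss"}
#       ]
#     }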

def process_and_store_articles(articles):
    """Embed new articles and add them to the local Chroma collection, skipping known IDs."""
    vector_db = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=get_embedding_model(),
        collection_name=COLLECTION_NAME
    )
    try:
        # include=[] returns only the stored IDs, which is all we need for de-duplication
        existing_ids = set(vector_db.get(include=[])["ids"])
    except Exception:
        existing_ids = set()
    docs_to_add = []
    ids_to_add = []
    for article in articles:
        cleaned_title = clean_text(article["title"])
        cleaned_link = clean_text(article["link"])
        doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}"
        if doc_id in existing_ids:
            continue
        metadata = {
            "title": article["title"],
            "link": article["link"],
            "original_description": article["description"],
            "published": article["published"],
            "category": article["category"],
            "image": article["image"],
        }
        doc = Document(page_content=clean_text(article["description"]), metadata=metadata)
        docs_to_add.append(doc)
        ids_to_add.append(doc_id)
        existing_ids.add(doc_id)
    if docs_to_add:
        vector_db.add_documents(documents=docs_to_add, ids=ids_to_add)
        vector_db.persist()
    logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {vector_db._collection.count()}")
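
# Illustrative retrieval helper, not called anywhere in this script: it assumes
# the collection above has already been populated and simply runs a semantic
# search over it. The example query in the trailing comment is made up.
def search_articles(query, k=5):
    """Return the k stored articles most similar to the query string."""
    vector_db = Chroma(
        persist_directory=LOCAL_DB_DIR,
        embedding_function=get_embedding_model(),
        collection_name=COLLECTION_NAME
    )
    # Each result is a Document whose metadata holds title, link, category, etc.
    return vector_db.similarity_search(query, k=k)
# e.g. search_articles("central bank interest rate decision")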

def download_from_hf_hub():
    """Pull the persisted Chroma directory from the Hub dataset repo if it is missing locally."""
    if not os.path.exists(LOCAL_DB_DIR):
        try:
            snapshot_download(
                repo_id=REPO_ID,
                repo_type="dataset",
                local_dir=".",
                local_dir_use_symlinks=False,
                allow_patterns=f"{LOCAL_DB_DIR}/**",
                token=HF_API_TOKEN
            )
        except Exception as e:
            logger.warning(f"Could not download DB from Hub (this is normal on first run): {e}")

def upload_to_hf_hub():
    """Push the local Chroma directory back to the Hub dataset repo."""
    if os.path.exists(LOCAL_DB_DIR):
        try:
            hf_api.upload_folder(
                folder_path=LOCAL_DB_DIR,
                path_in_repo=LOCAL_DB_DIR,
                repo_id=REPO_ID,
                repo_type="dataset",
                token=HF_API_TOKEN,
                commit_message="Update RSS news database"
            )
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}")
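
# Assumption, not part of the original flow: upload_folder() requires the
# dataset repo to already exist on the Hub. If it might not, it can be created
# first with:
#     hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)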

if __name__ == "__main__":
    # Pull any existing DB, fetch fresh articles, embed the new ones, then push the updated DB.
    download_from_hf_hub()
    articles = fetch_rss_feeds()
    if articles:
        process_and_store_articles(articles)
        upload_to_hf_hub()