broadfield-dev committed (verified)
Commit 7e1ea76 · Parent: ebf6a83

Update rss_processor.py

Files changed (1):
  rss_processor.py  +142 -81
rss_processor.py CHANGED
@@ -4,7 +4,7 @@ from langchain.vectorstores import Chroma
 from langchain.embeddings import HuggingFaceEmbeddings
 from langchain.docstore.document import Document
 import logging
-from huggingface_hub import HfApi, login, snapshot_download
+from huggingface_hub import HfApi, login
 import shutil
 import json
 from datetime import datetime
@@ -12,24 +12,34 @@ import dateutil.parser
 import hashlib
 import re
 
+# Setup logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
+# Constants
+MAX_ARTICLES_PER_FEED = 10
 LOCAL_DB_DIR = "chroma_db"
+FEEDS_FILE = "rss_feeds.json"
 COLLECTION_NAME = "news_articles"
 HF_API_TOKEN = os.getenv("HF_TOKEN")
 REPO_ID = "broadfield-dev/news-rag-db"
-FEEDS_FILE = "rss_feeds.json"
 
+# Initialize Hugging Face API
 login(token=HF_API_TOKEN)
 hf_api = HfApi()
 
-def get_embedding_model():
-    if not hasattr(get_embedding_model, "model"):
-        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
-    return get_embedding_model.model
+# Initialize embedding model (global, reusable)
+embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
+
+# Initialize vector DB with a specific collection name
+vector_db = Chroma(
+    persist_directory=LOCAL_DB_DIR,
+    embedding_function=embedding_model,
+    collection_name=COLLECTION_NAME
+)
 
 def clean_text(text):
+    """Clean text by removing HTML tags and extra whitespace."""
     if not text or not isinstance(text, str):
         return ""
     text = re.sub(r'<.*?>', '', text)
@@ -51,25 +61,27 @@ def fetch_rss_feeds():
         for feed_info in feeds:
             feed_url = feed_info.get("url")
             if not feed_url:
+                logger.warning(f"Skipping feed with no URL in category '{category}'")
                 continue
 
             try:
-                logger.info(f"Fetching '{feed_info.get('name', feed_url)}' from category '{category}'")
-                # Add a User-Agent to prevent getting blocked
-                feed = feedparser.parse(feed_url, agent="RSSNewsBot/1.0 (+http://huggingface.co/spaces/broadfield-dev/RSS_News)")
-
+                logger.info(f"Fetching {feed_url}")
+                feed = feedparser.parse(feed_url)
                 if feed.bozo:
                     logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                     continue
-
-                for entry in feed.entries[:10]:  # Process max 10 entries per feed
+                article_count = 0
+                for entry in feed.entries:
+                    if article_count >= MAX_ARTICLES_PER_FEED:
+                        break
                     title = entry.get("title", "No Title")
                     link = entry.get("link", "")
                    description = entry.get("summary", entry.get("description", ""))
 
-                    cleaned_title = clean_text(title)
-                    cleaned_link = clean_text(link)
-
+                    title = clean_text(title)
+                    link = clean_text(link)
+                    description = clean_text(description)
+
                     published = "Unknown Date"
                     for date_field in ["published", "updated", "created", "pubDate"]:
                         if date_field in entry:
@@ -77,104 +89,153 @@ def fetch_rss_feeds():
                                 parsed_date = dateutil.parser.parse(entry[date_field])
                                 published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                 break
-                            except (ValueError, TypeError):
+                            except (ValueError, TypeError) as e:
+                                logger.debug(f"Failed to parse {date_field} '{entry[date_field]}': {e}")
                                 continue
 
-                    key = f"{cleaned_title}|{cleaned_link}|{published}"
+                    description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
+                    key = f"{title}|{link}|{published}|{description_hash}"
                     if key not in seen_keys:
                         seen_keys.add(key)
                         image = "svg"
-                        if 'media_content' in entry and entry.media_content:
-                            image = entry.media_content[0].get('url', 'svg')
-                        elif 'media_thumbnail' in entry and entry.media_thumbnail:
-                            image = entry.media_thumbnail[0].get('url', 'svg')
+                        for img_source in [
+                            lambda e: clean_text(e.get("media_content", [{}])[0].get("url")) if e.get("media_content") else "",
+                            lambda e: clean_text(e.get("media_thumbnail", [{}])[0].get("url")) if e.get("media_thumbnail") else "",
+                            lambda e: clean_text(e.get("enclosure", {}).get("url")) if e.get("enclosure") else "",
+                            lambda e: clean_text(next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), "")),
+                        ]:
+                            try:
+                                img = img_source(entry)
+                                if img and img.strip():
+                                    image = img
+                                    break
+                            except (IndexError, AttributeError, TypeError):
+                                continue
 
                         articles.append({
                             "title": title,
                            "link": link,
                            "description": description,
                            "published": published,
-                            "category": category,  # Directly use category from JSON
+                            "category": category,  # Use JSON category directly
                            "image": image,
                        })
+                        article_count += 1
            except Exception as e:
                logger.error(f"Error fetching {feed_url}: {e}")
-
    logger.info(f"Total articles fetched: {len(articles)}")
    return articles
 
-def process_and_store_articles(articles):
-    vector_db = Chroma(
-        persist_directory=LOCAL_DB_DIR,
-        embedding_function=get_embedding_model(),
-        collection_name=COLLECTION_NAME
-    )
-
-    try:
-        existing_ids = set(vector_db.get(include=[])["ids"])
-    except Exception:
-        existing_ids = set()
+def categorize_feed(url):
+    """Categorize an RSS feed based on its URL."""
+    if not url or not isinstance(url, str):
+        logger.warning(f"Invalid URL provided for categorization: {url}")
+        return "Uncategorized"
+
+    url = url.lower().strip()  # Normalize the URL
+
+    logger.debug(f"Categorizing URL: {url}")  # Add debugging for visibility
+
+    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
+        return "Academic Papers"
+    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
+        return "Business"
+    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
+        return "Stocks & Markets"
+    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
+        return "Federal Government"
+    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
+        return "Weather"
+    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
+        return "Data & Statistics"
+    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
+        return "Space"
+    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
+        return "Science"
+    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
+        return "Tech"
+    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
+        return "Astrology"
+    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
+        return "Politics"
+    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
+        return "Earth Weather"
+    elif "vogue" in url:
+        return "Lifestyle"
+    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
+        return "Physics"
+    else:
+        logger.warning(f"No matching category found for URL: {url}")
+        return "Uncategorized"
 
-    docs_to_add = []
-    ids_to_add = []
-
+def process_and_store_articles(articles):
+    documents = []
+    existing_ids = set(vector_db.get()["ids"])  # Load existing IDs once
     for article in articles:
-        cleaned_title = clean_text(article["title"])
-        cleaned_link = clean_text(article["link"])
-        doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}"
-
-        if doc_id in existing_ids:
-            continue
-
-        metadata = {
-            "title": article["title"],
-            "link": article["link"],
-            "original_description": article["description"],
-            "published": article["published"],
-            "category": article["category"],
-            "image": article["image"],
-        }
-        doc = Document(page_content=clean_text(article["description"]), metadata=metadata)
-        docs_to_add.append(doc)
-        ids_to_add.append(doc_id)
-        existing_ids.add(doc_id)
+        try:
+            title = clean_text(article["title"])
+            link = clean_text(article["link"])
+            description = clean_text(article["description"])
+            published = article["published"]
+            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
+            doc_id = f"{title}|{link}|{published}|{description_hash}"
+            if doc_id in existing_ids:
+                logger.debug(f"Skipping duplicate in DB: {doc_id}")
+                continue
+            metadata = {
+                "title": article["title"],
+                "link": article["link"],
+                "original_description": article["description"],
+                "published": article["published"],
+                "category": article["category"],
+                "image": article["image"],
+            }
+            doc = Document(page_content=description, metadata=metadata, id=doc_id)
+            documents.append(doc)
+            existing_ids.add(doc_id)  # Update in-memory set to avoid duplicates within this batch
+        except Exception as e:
+            logger.error(f"Error processing article {article['title']}: {e}")
 
-    if docs_to_add:
-        vector_db.add_documents(documents=docs_to_add, ids=ids_to_add)
-        vector_db.persist()
-        logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {vector_db._collection.count()}")
+    if documents:
+        try:
+            vector_db.add_documents(documents)
+            vector_db.persist()
+            logger.info(f"Added {len(documents)} new articles to DB. Total documents: {len(vector_db.get()['ids'])}")
+        except Exception as e:
+            logger.error(f"Error storing articles: {e}")
 
 def download_from_hf_hub():
     if not os.path.exists(LOCAL_DB_DIR):
         try:
-            snapshot_download(
-                repo_id=REPO_ID,
-                repo_type="dataset",
-                local_dir=".",
-                local_dir_use_symlinks=False,
-                allow_patterns=f"{LOCAL_DB_DIR}/**",
-                token=HF_API_TOKEN
-            )
+            hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
+            logger.info(f"Downloading Chroma DB from {REPO_ID}...")
+            hf_api.hf_hub_download(repo_id=REPO_ID, filename="chroma_db", local_dir=LOCAL_DB_DIR, repo_type="dataset", token=HF_API_TOKEN)
         except Exception as e:
-            logger.warning(f"Could not download DB from Hub (this is normal on first run): {e}")
+            logger.error(f"Error downloading from Hugging Face Hub: {e}")
+    else:
+        logger.info("Local Chroma DB exists, loading existing data.")
 
 def upload_to_hf_hub():
     if os.path.exists(LOCAL_DB_DIR):
         try:
-            hf_api.upload_folder(
-                folder_path=LOCAL_DB_DIR,
-                path_in_repo=LOCAL_DB_DIR,
-                repo_id=REPO_ID,
-                repo_type="dataset",
-                token=HF_API_TOKEN,
-                commit_message="Update RSS news database"
-            )
+            logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
+            for root, _, files in os.walk(LOCAL_DB_DIR):
+                for file in files:
+                    local_path = os.path.join(root, file)
+                    remote_path = os.path.relpath(local_path, LOCAL_DB_DIR)
+                    hf_api.upload_file(
+                        path_or_fileobj=local_path,
+                        path_in_repo=remote_path,
+                        repo_id=REPO_ID,
+                        repo_type="dataset",
+                        token=HF_API_TOKEN
+                    )
+            logger.info(f"Database uploaded to: {REPO_ID}")
         except Exception as e:
             logger.error(f"Error uploading to Hugging Face Hub: {e}")
 
 if __name__ == "__main__":
-    download_from_hf_hub()
+    download_from_hf_hub()  # Ensure DB is initialized
     articles = fetch_rss_feeds()
-    if articles:
-        process_and_store_articles(articles)
-        upload_to_hf_hub()
+    process_and_store_articles(articles)
+    upload_to_hf_hub()
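For reviewers trying the change locally, here is a minimal, self-contained sketch (not part of the commit) of the deduplication ID scheme this diff introduces: a key of the form f"{title}|{link}|{published}|{description_hash}", where the hash is SHA-256 of the cleaned description. The simplified clean_text below and the sample entry are hypothetical stand-ins for illustration only.

import hashlib
import re

def clean_text(text):
    # Simplified stand-in for clean_text in rss_processor.py:
    # strip HTML tags and collapse extra whitespace.
    if not text or not isinstance(text, str):
        return ""
    text = re.sub(r'<.*?>', '', text)
    return re.sub(r'\s+', ' ', text).strip()

def make_doc_id(title, link, published, description):
    # Mirrors the key format used by the updated fetch_rss_feeds /
    # process_and_store_articles: title|link|published|sha256(description).
    title = clean_text(title)
    link = clean_text(link)
    description = clean_text(description)
    description_hash = hashlib.sha256(description.encode("utf-8")).hexdigest()
    return f"{title}|{link}|{published}|{description_hash}"

if __name__ == "__main__":
    # Hypothetical entry, for illustration only.
    print(make_doc_id(
        title="<b>Sample headline</b>",
        link="https://example.com/article",
        published="2025-01-01 12:00:00",
        description="<p>Sample   summary text.</p>",
    ))

Because the description hash is part of the ID, re-running the pipeline on an unchanged feed should skip documents already in the store, while an edited summary yields a new ID and is added as a fresh document.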