broadfield-dev committed on
Commit 54046f4 · verified · 1 Parent(s): 9a08a0f

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +100 -121
rss_processor.py CHANGED
@@ -1,19 +1,17 @@
 import os
 import feedparser
 from chromadb import PersistentClient
-from langchain.embeddings import HuggingFaceEmbeddings
-from langchain.docstore.document import Document
+from langchain_community.embeddings import HuggingFaceEmbeddings
+from langchain_core.documents import Document
 import logging
 from huggingface_hub import HfApi, login, snapshot_download
-import shutil
-import rss_feeds
 from datetime import datetime
 import dateutil.parser
 import hashlib
 import json
 import re

-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)

 LOCAL_DB_DIR = "chroma_db"
@@ -22,8 +20,17 @@ COLLECTION_NAME = "news_articles"
 HF_API_TOKEN = os.getenv("HF_TOKEN")
 REPO_ID = "broadfield-dev/news-rag-db"
 MAX_ARTICLES_PER_FEED = 1000
-login(token=HF_API_TOKEN)
-hf_api = HfApi()
+
+def initialize_hf_api():
+    if not HF_API_TOKEN:
+        logger.error("Hugging Face API token (HF_TOKEN) not set.")
+        raise ValueError("HF_TOKEN environment variable is not set.")
+    try:
+        login(token=HF_API_TOKEN)
+        return HfApi()
+    except Exception as e:
+        logger.error(f"Failed to login to Hugging Face Hub: {e}")
+        raise

 def get_embedding_model():
     if not hasattr(get_embedding_model, "model"):
@@ -35,11 +42,10 @@ def clean_text(text):
         return ""
     text = re.sub(r'<.*?>', '', text)
     text = ' '.join(text.split())
-    return text.strip().lower()
+    return text.strip()

 def fetch_rss_feeds():
     articles = []
-    seen_keys = set()

     try:
         with open(FEEDS_FILE, 'r') as f:
@@ -61,104 +67,56 @@ def fetch_rss_feeds():
                 if feed.bozo:
                     logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                     continue
-                article_count = 0
-                for entry in feed.entries:
-                    if article_count >= MAX_ARTICLES_PER_FEED:
-                        break
-
-                    title_raw = entry.get("title", "No Title")
+
+                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
+                    title = entry.get("title", "No Title")
                     link = entry.get("link", "")
-                    description = entry.get("summary", entry.get("description", ""))
-
-                    clean_title_val = clean_text(title_raw)
-                    clean_desc_val = clean_text(description)
+                    description_raw = entry.get("summary", entry.get("description", ""))
+                    description = clean_text(description_raw)

-                    if not clean_desc_val:
+                    if not description:
                         continue

-                    published = "Unknown Date"
+                    published_str = "Unknown Date"
                     for date_field in ["published", "updated", "created", "pubDate"]:
                         if date_field in entry:
                             try:
                                 parsed_date = dateutil.parser.parse(entry[date_field])
-                                published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
+                                published_str = parsed_date.isoformat()
                                 break
                             except (ValueError, TypeError):
                                 continue
-
-                    description_hash = hashlib.sha256(clean_desc_val.encode('utf-8')).hexdigest()
-                    key = f"{clean_title_val}|{link}|{published}|{description_hash}"

-                    if key not in seen_keys:
-                        seen_keys.add(key)
-
-                        image = "svg"
-                        for img_source in [
-                            lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else "",
-                            lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else "",
-                            lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") else "",
-                            lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), ""),
-                        ]:
-                            try:
-                                img = img_source(entry)
-                                if img and img.strip():
-                                    image = img
-                                    break
-                            except (IndexError, AttributeError, TypeError):
-                                continue
-
-                        articles.append({
-                            "title": title_raw,
-                            "link": link,
-                            "description": clean_desc_val,
-                            "published": published,
-                            "category": category,
-                            "image": image,
-                        })
-                        article_count += 1
+                    image = "svg"
+                    image_sources = [
+                        lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else None,
+                        lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else None,
+                        lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") and e.get("enclosure", {}).get('type', '').startswith('image') else None,
+                        lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), None),
+                    ]
+                    for source_func in image_sources:
+                        try:
+                            img_url = source_func(entry)
+                            if img_url and isinstance(img_url, str) and img_url.strip():
+                                image = img_url
+                                break
+                        except (IndexError, AttributeError, TypeError):
+                            continue
+
+                    articles.append({
+                        "title": title,
+                        "link": link,
+                        "description": description,
+                        "published": published_str,
+                        "category": category,
+                        "image": image,
+                    })
     except Exception as e:
-        logger.error(f"Error fetching {feed_url}: {e}")
-    logger.info(f"Total unique articles fetched: {len(articles)}")
+        logger.error(f"Error fetching or parsing {feed_url}: {e}")
+
+    logger.info(f"Total articles fetched: {len(articles)}")
     return articles

-def categorize_feed(url):
-    if not url or not isinstance(url, str):
-        logger.warning(f"Invalid URL provided for categorization: {url}")
-        return "Uncategorized"
-    url = url.lower().strip()
-    logger.debug(f"Categorizing URL: {url}")
-    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
-        return "Academic Papers"
-    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
-        return "Business"
-    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
-        return "Stocks & Markets"
-    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
-        return "Federal Government"
-    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
-        return "Weather"
-    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
-        return "Data & Statistics"
-    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
-        return "Space"
-    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
-        return "Science"
-    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
-        return "Tech"
-    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
-        return "Astrology"
-    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
-        return "Politics"
-    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
-        return "Earth Weather"
-    elif "vogue" in url:
-        return "Lifestyle"
-    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
-        return "Physics"
-    else:
-        logger.warning(f"No matching category found for URL: {url}")
-        return "Uncategorized"
-
 def process_and_store_articles(articles):
     if not os.path.exists(LOCAL_DB_DIR):
         os.makedirs(LOCAL_DB_DIR)
@@ -169,17 +127,19 @@ def process_and_store_articles(articles):
     try:
         existing_ids = set(collection.get(include=[])["ids"])
         logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
-    except Exception as e:
-        logger.info(f"No existing DB found or error loading IDs: {e}. Starting fresh.")
+    except Exception:
+        logger.info("No existing DB found or it is empty. Starting fresh.")
         existing_ids = set()

-    docs_to_add = []
+    contents_to_add = []
+    metadatas_to_add = []
     ids_to_add = []
-
+
     for article in articles:
-        cleaned_title = clean_text(article["title"])
-        cleaned_link = clean_text(article["link"])
-        doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}"
+        if not article.get('link'):
+            continue
+
+        doc_id = hashlib.sha256(article['link'].encode('utf-8')).hexdigest()

         if doc_id in existing_ids:
             continue
@@ -187,32 +147,35 @@ def process_and_store_articles(articles):
         metadata = {
             "title": article["title"],
             "link": article["link"],
-            "original_description": article["description"],
             "published": article["published"],
             "category": article["category"],
             "image": article["image"],
         }
-        doc = Document(page_content=clean_text(article["description"]), metadata=metadata)
-        docs_to_add.append(doc)
+
+        contents_to_add.append(article["description"])
+        metadatas_to_add.append(metadata)
         ids_to_add.append(doc_id)

-    if docs_to_add:
+    if ids_to_add:
+        logger.info(f"Found {len(ids_to_add)} new articles to add to the database.")
         try:
-            embeddings = get_embedding_model()
-            for doc, doc_id in zip(docs_to_add, ids_to_add):
-                collection.add(
-                    documents=[doc.page_content],
-                    metadatas=[doc.metadata],
-                    ids=[doc_id],
-                    embeddings=[embeddings.embed_query(doc.page_content)]
-                )
-            client.persist()
-            logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {collection.count()}")
+            embedding_model = get_embedding_model()
+            embeddings_to_add = embedding_model.embed_documents(contents_to_add)
+
+            collection.add(
+                embeddings=embeddings_to_add,
+                documents=contents_to_add,
+                metadatas=metadatas_to_add,
+                ids=ids_to_add
+            )
+            logger.info(f"Successfully added {len(ids_to_add)} new articles to DB. Total in DB: {collection.count()}")
         except Exception as e:
-            logger.error(f"Error storing articles: {e}")
+            logger.error(f"Error storing articles in ChromaDB: {e}", exc_info=True)
+    else:
+        logger.info("No new articles to add to the database.")

 def download_from_hf_hub():
-    if not os.path.exists(LOCAL_DB_DIR):
+    if not os.path.exists(os.path.join(LOCAL_DB_DIR, "chroma.sqlite3")):
         try:
             logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
             snapshot_download(
@@ -220,16 +183,16 @@ def download_from_hf_hub():
                 repo_type="dataset",
                 local_dir=".",
                 local_dir_use_symlinks=False,
-                allow_patterns=f"{LOCAL_DB_DIR}/**",
+                allow_patterns=[f"{LOCAL_DB_DIR}/**"],
                 token=HF_API_TOKEN
             )
             logger.info("Finished downloading DB.")
         except Exception as e:
             logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
     else:
-        logger.info("Local Chroma DB exists, loading existing data.")
+        logger.info(f"Local Chroma DB found at '{LOCAL_DB_DIR}', skipping download.")

-def upload_to_hf_hub():
+def upload_to_hf_hub(hf_api):
     if os.path.exists(LOCAL_DB_DIR):
         try:
             logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
@@ -238,9 +201,25 @@
                 path_in_repo=LOCAL_DB_DIR,
                 repo_id=REPO_ID,
                 repo_type="dataset",
-                token=HF_API_TOKEN,
-                commit_message=f"Update RSS news database {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+                commit_message=f"Update RSS news database {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
+                ignore_patterns=["*.bak", "*.tmp"]
             )
             logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
         except Exception as e:
-            logger.error(f"Error uploading to Hugging Face Hub: {e}")
+            logger.error(f"Error uploading to Hugging Face Hub: {e}", exc_info=True)
+
+def main():
+    try:
+        hf_api = initialize_hf_api()
+        download_from_hf_hub()
+        articles_to_process = fetch_rss_feeds()
+        if articles_to_process:
+            process_and_store_articles(articles_to_process)
+            upload_to_hf_hub(hf_api)
+        else:
+            logger.info("No articles fetched, skipping database processing and upload.")
+    except Exception as e:
+        logger.critical(f"An unhandled error occurred in main execution: {e}", exc_info=True)
+
+if __name__ == "__main__":
+    main()
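
Note: the diff only shows the first line of get_embedding_model(), which lazily caches the embedder on the function object. A minimal sketch of that memoization pattern is below; the model name is a hypothetical placeholder, since the real value never appears in these hunks.

# Hypothetical sketch of the lazy-init pattern implied by
# `if not hasattr(get_embedding_model, "model"):`; the model name is a placeholder assumption.
from langchain_community.embeddings import HuggingFaceEmbeddings

def get_embedding_model():
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(
            model_name="sentence-transformers/all-MiniLM-L6-v2"  # placeholder, not from the diff
        )
    return get_embedding_model.model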
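For context on how the refreshed database is consumed downstream, here is a minimal query sketch against the collection this script maintains. It assumes the names defined above (chroma_db, news_articles) and an embedder matching whatever get_embedding_model() actually returns; the model name is again a placeholder.

# Minimal sketch: query the news_articles collection produced by rss_processor.py.
from chromadb import PersistentClient
from langchain_community.embeddings import HuggingFaceEmbeddings

client = PersistentClient(path="chroma_db")
collection = client.get_or_create_collection("news_articles")

embedder = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")  # placeholder
query_embedding = embedder.embed_query("space weather alerts")

results = collection.query(
    query_embeddings=[query_embedding],
    n_results=5,
    include=["documents", "metadatas"],
)
for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
    print(meta.get("title"), "->", meta.get("link"))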