broadfield-dev committed
Commit e3abb04 · verified · 1 parent: e9d9741

Update rss_processor.py

Files changed (1): rss_processor.py (+67 −112)
rss_processor.py CHANGED

@@ -7,38 +7,29 @@ import logging
 from huggingface_hub import HfApi, login, snapshot_download
 import shutil
 import rss_feeds
-from datetime import datetime, date
+from datetime import datetime
 import dateutil.parser
 import hashlib
 import re
 
-# Setup logging
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
-# Constants
-MAX_ARTICLES_PER_FEED = 10
+LOCAL_DB_DIR = "chroma_db"
 RSS_FEEDS = rss_feeds.RSS_FEEDS
 COLLECTION_NAME = "news_articles"
-HF_API_TOKEN = os.getenv("HF_TOKEN")
+HF_API_TOKEN = os.getenv("DEMO_HF_API_TOKEN", "YOUR_HF_API_TOKEN")
 REPO_ID = "broadfield-dev/news-rag-db"
 
-# Initialize Hugging Face API
 login(token=HF_API_TOKEN)
 hf_api = HfApi()
 
 def get_embedding_model():
-    """Returns a singleton instance of the embedding model to avoid reloading."""
     if not hasattr(get_embedding_model, "model"):
         get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
     return get_embedding_model.model
 
-def get_daily_db_dir():
-    """Returns the path for today's Chroma DB."""
-    return f"chroma_db_{date.today().isoformat()}"
-
 def clean_text(text):
-    """Clean text by removing HTML tags and extra whitespace."""
     if not text or not isinstance(text, str):
         return ""
     text = re.sub(r'<.*?>', '', text)
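The module now targets a single persistent chroma_db directory, reads the token from DEMO_HF_API_TOKEN, and drops the per-day database helper, while get_embedding_model() keeps its function-attribute cache so the sentence-transformer loads only once per process. A minimal standalone sketch of that caching idiom (hypothetical get_resource name, not part of rss_processor.py):

    def get_resource():
        # Expensive setup runs only on the first call; later calls reuse the
        # object stashed on the function itself, mirroring get_embedding_model().
        if not hasattr(get_resource, "cached"):
            print("loading once...")
            get_resource.cached = {"ready": True}
        return get_resource.cached

    a = get_resource()   # loads
    b = get_resource()   # returns the cached object
    assert a is b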
@@ -57,16 +48,15 @@ def fetch_rss_feeds():
             continue
         article_count = 0
         for entry in feed.entries:
-            if article_count >= MAX_ARTICLES_PER_FEED:
+            if article_count >= 10:
                 break
            title = entry.get("title", "No Title")
            link = entry.get("link", "")
            description = entry.get("summary", entry.get("description", ""))
 
-            title = clean_text(title)
-            link = clean_text(link)
-            description = clean_text(description)
-
+            cleaned_title = clean_text(title)
+            cleaned_link = clean_text(link)
+
            published = "Unknown Date"
            for date_field in ["published", "updated", "created", "pubDate"]:
                if date_field in entry:
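The per-feed cap is now a literal 10 in place of the removed MAX_ARTICLES_PER_FEED constant, and only the title and link are cleaned before the dedup key is built. A small sketch of the tag-stripping step clean_text() applies; the regex comes from the hunk above, and any whitespace handling beyond it is not shown in this diff:

    import re

    def strip_tags(text):
        # Same HTML-tag removal clean_text() performs before deduplication.
        return re.sub(r'<.*?>', '', text or "")

    print(strip_tags('<p>Rates <b>rise</b> again</p>'))  # -> "Rates rise again"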
@@ -74,20 +64,16 @@ def fetch_rss_feeds():
                        parsed_date = dateutil.parser.parse(entry[date_field])
                        published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                        break
-                    except (ValueError, TypeError) as e:
-                        logger.debug(f"Failed to parse {date_field} '{entry[date_field]}': {e}")
+                    except (ValueError, TypeError):
                        continue
 
-            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
-            key = f"{title}|{link}|{published}|{description_hash}"
+            key = f"{cleaned_title}|{cleaned_link}|{published}"
            if key not in seen_keys:
                seen_keys.add(key)
                image = "svg"
                for img_source in [
                    lambda e: clean_text(e.get("media_content", [{}])[0].get("url")) if e.get("media_content") else "",
                    lambda e: clean_text(e.get("media_thumbnail", [{}])[0].get("url")) if e.get("media_thumbnail") else "",
-                    lambda e: clean_text(e.get("enclosure", {}).get("url")) if e.get("enclosure") else "",
-                    lambda e: clean_text(next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), "")),
                ]:
                    try:
                        img = img_source(entry)
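The dedup key drops the SHA-256 of the description, so two entries with the same title, link, and timestamp but different summaries now collapse into one item. A sketch of the new key under that assumption (is_new is a hypothetical helper, not in the file):

    # In-memory dedup used while walking the feeds: title|link|published only.
    seen_keys = set()

    def is_new(cleaned_title, cleaned_link, published):
        key = f"{cleaned_title}|{cleaned_link}|{published}"
        if key in seen_keys:
            return False
        seen_keys.add(key)
        return True

    print(is_new("Storm watch", "https://example.com/a", "2024-01-01 00:00:00"))  # True
    print(is_new("Storm watch", "https://example.com/a", "2024-01-01 00:00:00"))  # False (duplicate)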
@@ -112,129 +98,98 @@ def fetch_rss_feeds():
     return articles
 
 def categorize_feed(url):
-    """Categorize an RSS feed based on its URL."""
     if not url or not isinstance(url, str):
-        logger.warning(f"Invalid URL provided for categorization: {url}")
         return "Uncategorized"
-
     url = url.lower().strip()
-
-    logger.debug(f"Categorizing URL: {url}")
-
-    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
-        return "Academic Papers"
-    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
-        return "Business"
-    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
-        return "Stocks & Markets"
-    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
-        return "Federal Government"
-    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
-        return "Weather"
-    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
-        return "Data & Statistics"
-    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
-        return "Space"
-    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
-        return "Science"
-    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
-        return "Tech"
-    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
-        return "Astrology"
-    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
-        return "Politics"
-    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
-        return "Earth Weather"
-    elif "vogue" in url:
-        return "Lifestyle"
-    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
-        return "Physics"
-    else:
-        logger.warning(f"No matching category found for URL: {url}")
-        return "Uncategorized"
+    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "jneurosci.org", "nejm.org", "lancet.com"]): return "Academic Papers"
+    if any(keyword in url for keyword in ["ft.com", "marketwatch.com", "cnbc.com", "wsj.com", "economist.com"]): return "Business"
+    if any(keyword in url for keyword in ["investing.com", "fool.co.uk", "seekingalpha.com", "yahoofinance.com"]): return "Stocks & Markets"
+    if any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space.com", "universetoday.com", "esa.int"]): return "Space"
+    if any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "scientificamerican", "newscientist", "livescience"]): return "Science"
+    if any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]): return "Tech"
+    if any(keyword in url for keyword in ["horoscope", "astrostyle"]): return "Astrology"
+    if any(keyword in url for keyword in ["bbci.co.uk/news/politics", "politico.com", "thehill.com"]): return "Politics"
+    if any(keyword in url for keyword in ["weather.com", "weather.gov", "swpc.noaa.gov", "foxweather"]): return "Earth Weather"
+    if "phys.org" in url or "aps.org" in url: return "Physics"
+    return "Uncategorized"
 
 def process_and_store_articles(articles):
-    db_path = get_daily_db_dir()
     vector_db = Chroma(
-        persist_directory=db_path,
+        persist_directory=LOCAL_DB_DIR,
        embedding_function=get_embedding_model(),
        collection_name=COLLECTION_NAME
    )
 
    try:
        existing_ids = set(vector_db.get(include=[])["ids"])
+        logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
    except Exception:
        existing_ids = set()
+        logger.info("No existing DB found or it is empty. Starting fresh.")
 
    docs_to_add = []
    ids_to_add = []
 
    for article in articles:
-        try:
-            title = clean_text(article["title"])
-            link = clean_text(article["link"])
-            description = clean_text(article["description"])
-            published = article["published"]
-            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
-
-            doc_id = f"{title}|{link}|{published}|{description_hash}"
-
-            if doc_id in existing_ids:
-                logger.debug(f"Skipping duplicate in DB {db_path}: {doc_id}")
-                continue
-
-            metadata = {
-                "title": article["title"],
-                "link": article["link"],
-                "original_description": article["description"],
-                "published": article["published"],
-                "category": article["category"],
-                "image": article["image"],
-            }
-            doc = Document(page_content=description, metadata=metadata)
-            docs_to_add.append(doc)
-            ids_to_add.append(doc_id)
-            existing_ids.add(doc_id)
-        except Exception as e:
-            logger.error(f"Error processing article {article.get('title', 'N/A')}: {e}")
+        cleaned_title = clean_text(article["title"])
+        cleaned_link = clean_text(article["link"])
+        doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}"
+
+        if doc_id in existing_ids:
+            continue
+
+        metadata = {
+            "title": article["title"],
+            "link": article["link"],
+            "original_description": article["description"],
+            "published": article["published"],
+            "category": article["category"],
+            "image": article["image"],
+        }
+        doc = Document(page_content=clean_text(article["description"]), metadata=metadata)
+        docs_to_add.append(doc)
+        ids_to_add.append(doc_id)
+        existing_ids.add(doc_id)
 
    if docs_to_add:
        try:
            vector_db.add_documents(documents=docs_to_add, ids=ids_to_add)
            vector_db.persist()
-            logger.info(f"Added {len(docs_to_add)} new articles to DB {db_path}. Total in DB: {vector_db._collection.count()}")
+            logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {vector_db._collection.count()}")
        except Exception as e:
-            logger.error(f"Error storing articles in {db_path}: {e}")
+            logger.error(f"Error storing articles: {e}")
 
 def download_from_hf_hub():
-    try:
-        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
-        logger.info(f"Downloading all DBs from {REPO_ID}...")
-        snapshot_download(
-            repo_id=REPO_ID,
-            repo_type="dataset",
-            local_dir=".",
-            local_dir_use_symlinks=False,
-            allow_patterns="chroma_db_*/**",
-            token=HF_API_TOKEN
-        )
-        logger.info("Finished downloading DBs.")
-    except Exception as e:
-        logger.error(f"Error downloading from Hugging Face Hub: {e}")
+    if not os.path.exists(LOCAL_DB_DIR):
+        try:
+            logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
+            snapshot_download(
+                repo_id=REPO_ID,
+                repo_type="dataset",
+                local_dir=".",
+                local_dir_use_symlinks=False,
+                allow_patterns=f"{LOCAL_DB_DIR}/**",
+                token=HF_API_TOKEN
+            )
+            logger.info("Finished downloading DB.")
+        except Exception as e:
+            logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
+    else:
+        logger.info("Local Chroma DB exists, loading existing data.")
 
 def upload_to_hf_hub():
-    db_path = get_daily_db_dir()
-    if os.path.exists(db_path):
+    if os.path.exists(LOCAL_DB_DIR):
        try:
-            logger.info(f"Uploading updated Chroma DB '{db_path}' to {REPO_ID}...")
+            logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
            hf_api.upload_folder(
-                folder_path=db_path,
-                path_in_repo=db_path,
+                folder_path=LOCAL_DB_DIR,
+                path_in_repo=LOCAL_DB_DIR,
                repo_id=REPO_ID,
                repo_type="dataset",
-                token=HF_API_TOKEN
+                token=HF_API_TOKEN,
+                commit_message="Update RSS news database"
            )
-            logger.info(f"Database folder '{db_path}' uploaded to: {REPO_ID}")
+            logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}")
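How the pieces are expected to fit together after this commit: download the shared chroma_db/ snapshot if it is not already on disk, fetch and dedupe the feeds, embed and append only unseen documents, then push the folder back to the dataset repo. The actual entry point is not part of this diff, so the driver below is a hypothetical sketch; it assumes DEMO_HF_API_TOKEN resolves to a valid token (login() runs at import time) and that Chroma is importable from langchain_community (the file's own imports sit above the first hunk and are not shown here).

    from langchain_community.vectorstores import Chroma  # assumed import path

    from rss_processor import (
        COLLECTION_NAME,
        LOCAL_DB_DIR,
        download_from_hf_hub,
        fetch_rss_feeds,
        get_embedding_model,
        process_and_store_articles,
        upload_to_hf_hub,
    )

    def refresh_news_db():
        download_from_hf_hub()                 # reuse the single persistent chroma_db/ if the repo has one
        articles = fetch_rss_feeds()           # capped at 10 entries per feed, deduped by title|link|published
        process_and_store_articles(articles)   # embeds and appends only unseen doc_ids
        upload_to_hf_hub()                     # pushes chroma_db/ back to broadfield-dev/news-rag-db

    if __name__ == "__main__":
        refresh_news_db()
        # Quick sanity check against the freshly persisted collection.
        db = Chroma(
            persist_directory=LOCAL_DB_DIR,
            embedding_function=get_embedding_model(),
            collection_name=COLLECTION_NAME,
        )
        for doc in db.similarity_search("interest rates", k=3):
            print(doc.metadata.get("title"), doc.metadata.get("link"))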
195