broadfield-dev committed
Commit 3aa40bc · verified · 1 Parent(s): a9254a4

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +66 -49
rss_processor.py CHANGED
@@ -1,14 +1,13 @@
-# rss_processor.py
 import os
 import feedparser
 from langchain.vectorstores import Chroma
 from langchain.embeddings import HuggingFaceEmbeddings
 from langchain.docstore.document import Document
 import logging
-from huggingface_hub import HfApi, login
+from huggingface_hub import HfApi, login, snapshot_download
 import shutil
 import rss_feeds
-from datetime import datetime
+from datetime import datetime, date
 import dateutil.parser
 import hashlib
 import re
@@ -19,7 +18,6 @@ logger = logging.getLogger(__name__)
 
 # Constants
 MAX_ARTICLES_PER_FEED = 10
-LOCAL_DB_DIR = "chroma_db"
 RSS_FEEDS = rss_feeds.RSS_FEEDS
 COLLECTION_NAME = "news_articles"
 HF_API_TOKEN = os.getenv("DEMO_HF_API_TOKEN", "YOUR_HF_API_TOKEN")
@@ -29,15 +27,15 @@ REPO_ID = "broadfield-dev/news-rag-db"
 login(token=HF_API_TOKEN)
 hf_api = HfApi()
 
-# Initialize embedding model (global, reusable)
-embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
+def get_embedding_model():
+    """Returns a singleton instance of the embedding model to avoid reloading."""
+    if not hasattr(get_embedding_model, "model"):
+        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
+    return get_embedding_model.model
 
-# Initialize vector DB with a specific collection name
-vector_db = Chroma(
-    persist_directory=LOCAL_DB_DIR,
-    embedding_function=embedding_model,
-    collection_name=COLLECTION_NAME
-)
+def get_daily_db_dir():
+    """Returns the path for today's Chroma DB."""
+    return f"chroma_db_{date.today().isoformat()}"
 
 def clean_text(text):
     """Clean text by removing HTML tags and extra whitespace."""
@@ -119,9 +117,9 @@ def categorize_feed(url):
         logger.warning(f"Invalid URL provided for categorization: {url}")
         return "Uncategorized"
 
-    url = url.lower().strip() # Normalize the URL
+    url = url.lower().strip()
 
-    logger.debug(f"Categorizing URL: {url}") # Add debugging for visibility
+    logger.debug(f"Categorizing URL: {url}")
 
     if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
         return "Academic Papers"
@@ -156,8 +154,21 @@ def categorize_feed(url):
     return "Uncategorized"
 
 def process_and_store_articles(articles):
-    documents = []
-    existing_ids = set(vector_db.get()["ids"]) # Load existing IDs once
+    db_path = get_daily_db_dir()
+    vector_db = Chroma(
+        persist_directory=db_path,
+        embedding_function=get_embedding_model(),
+        collection_name=COLLECTION_NAME
+    )
+
+    try:
+        existing_ids = set(vector_db.get(include=[])["ids"])
+    except Exception:
+        existing_ids = set()
+
+    docs_to_add = []
+    ids_to_add = []
+
     for article in articles:
         try:
             title = clean_text(article["title"])
@@ -165,10 +176,13 @@ def process_and_store_articles(articles):
             description = clean_text(article["description"])
             published = article["published"]
             description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
+
             doc_id = f"{title}|{link}|{published}|{description_hash}"
+
             if doc_id in existing_ids:
-                logger.debug(f"Skipping duplicate in DB: {doc_id}")
+                logger.debug(f"Skipping duplicate in DB {db_path}: {doc_id}")
                 continue
+
             metadata = {
                 "title": article["title"],
                 "link": article["link"],
@@ -177,52 +191,55 @@ def process_and_store_articles(articles):
                 "category": article["category"],
                 "image": article["image"],
             }
-            doc = Document(page_content=description, metadata=metadata, id=doc_id)
-            documents.append(doc)
-            existing_ids.add(doc_id) # Update in-memory set to avoid duplicates within this batch
+            doc = Document(page_content=description, metadata=metadata)
+            docs_to_add.append(doc)
+            ids_to_add.append(doc_id)
+            existing_ids.add(doc_id)
         except Exception as e:
-            logger.error(f"Error processing article {article['title']}: {e}")
+            logger.error(f"Error processing article {article.get('title', 'N/A')}: {e}")
 
-    if documents:
+    if docs_to_add:
         try:
-            vector_db.add_documents(documents)
+            vector_db.add_documents(documents=docs_to_add, ids=ids_to_add)
             vector_db.persist()
-            logger.info(f"Added {len(documents)} new articles to DB. Total documents: {len(vector_db.get()['ids'])}")
+            logger.info(f"Added {len(docs_to_add)} new articles to DB {db_path}. Total in DB: {vector_db._collection.count()}")
         except Exception as e:
-            logger.error(f"Error storing articles: {e}")
+            logger.error(f"Error storing articles in {db_path}: {e}")
 
 def download_from_hf_hub():
-    if not os.path.exists(LOCAL_DB_DIR):
-        try:
-            hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
-            logger.info(f"Downloading Chroma DB from {REPO_ID}...")
-            hf_api.hf_hub_download(repo_id=REPO_ID, filename="chroma_db", local_dir=LOCAL_DB_DIR, repo_type="dataset", token=HF_API_TOKEN)
-        except Exception as e:
-            logger.error(f"Error downloading from Hugging Face Hub: {e}")
-    else:
-        logger.info("Local Chroma DB exists, loading existing data.")
+    try:
+        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
+        logger.info(f"Downloading all DBs from {REPO_ID}...")
+        snapshot_download(
+            repo_id=REPO_ID,
+            repo_type="dataset",
+            local_dir=".",
+            local_dir_use_symlinks=False,
+            allow_patterns="chroma_db_*/**",
+            token=HF_API_TOKEN
+        )
+        logger.info("Finished downloading DBs.")
+    except Exception as e:
+        logger.error(f"Error downloading from Hugging Face Hub: {e}")
 
 def upload_to_hf_hub():
-    if os.path.exists(LOCAL_DB_DIR):
+    db_path = get_daily_db_dir()
+    if os.path.exists(db_path):
         try:
-            logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
-            for root, _, files in os.walk(LOCAL_DB_DIR):
-                for file in files:
-                    local_path = os.path.join(root, file)
-                    remote_path = os.path.relpath(local_path, LOCAL_DB_DIR)
-                    hf_api.upload_file(
-                        path_or_fileobj=local_path,
-                        path_in_repo=remote_path,
-                        repo_id=REPO_ID,
-                        repo_type="dataset",
-                        token=HF_API_TOKEN
-                    )
-            logger.info(f"Database uploaded to: {REPO_ID}")
+            logger.info(f"Uploading updated Chroma DB '{db_path}' to {REPO_ID}...")
+            hf_api.upload_folder(
+                folder_path=db_path,
+                path_in_repo=db_path,
+                repo_id=REPO_ID,
+                repo_type="dataset",
+                token=HF_API_TOKEN
+            )
+            logger.info(f"Database folder '{db_path}' uploaded to: {REPO_ID}")
        except Exception as e:
            logger.error(f"Error uploading to Hugging Face Hub: {e}")
 
 if __name__ == "__main__":
-    download_from_hf_hub() # Ensure DB is initialized
+    download_from_hf_hub()
    articles = fetch_rss_feeds()
    process_and_store_articles(articles)
    upload_to_hf_hub()
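
Taken together, the commit shards the vector store by day: process_and_store_articles writes into a chroma_db_<YYYY-MM-DD> directory returned by get_daily_db_dir(), upload_to_hf_hub pushes only today's folder via upload_folder, and download_from_hf_hub pulls every chroma_db_* folder from the dataset repo with snapshot_download. The minimal sketch below shows one way a downloaded daily DB could be opened and queried after the script has run; the date in the directory name and the query string are illustrative assumptions, not part of this commit.

# Minimal sketch (not part of the commit): open one downloaded daily DB and query it.
# Assumes a folder such as "chroma_db_2025-01-01" exists locally after download_from_hf_hub().
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
daily_db = Chroma(
    persist_directory="chroma_db_2025-01-01",  # hypothetical date; real names come from get_daily_db_dir()
    embedding_function=embedding_model,
    collection_name="news_articles",
)
for doc in daily_db.similarity_search("space telescope launch", k=3):  # example query
    print(doc.metadata.get("title"), doc.metadata.get("link"))

One trade-off of the per-day layout: each run uploads a small, self-contained folder, but searching across several days means opening each chroma_db_* directory as its own Chroma instance.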