broadfield-dev committed
Commit 739c95c · verified · 1 Parent(s): 38db139

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +137 -128
rss_processor.py CHANGED
@@ -7,86 +7,39 @@ import logging
 from huggingface_hub import HfApi, login, snapshot_download
 from huggingface_hub.utils import HfHubHTTPError
 import json
+from datetime import datetime
 import dateutil.parser
 import hashlib
 import re
-from datetime import datetime

-# Setup logging
 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)

-# --- Constants ---
-# Local and repository configuration
-LOCAL_DB_DIR = "chroma_db_news" # Using a more descriptive local directory name
+MAX_ARTICLES_PER_FEED = 1000
+LOCAL_DB_DIR = "chroma_db"
 FEEDS_FILE = "rss_feeds.json"
 COLLECTION_NAME = "news_articles"
-REPO_ID = "broadfield-dev/news-rag-db" # Your Hugging Face Hub repo ID
-
-# RSS feed fetching configuration
-MAX_ARTICLES_PER_FEED = 1000
-
-# Hugging Face credentials
 HF_API_TOKEN = os.getenv("HF_TOKEN")
+REPO_ID = "broadfield-dev/news-rag-db"
+
 if not HF_API_TOKEN:
-    raise ValueError("Hugging Face API token not found. Please set the HF_TOKEN environment variable.")
+    raise ValueError("HF_TOKEN environment variable not set.")

-# --- Global Initializations ---
-# Initialize Hugging Face API
 login(token=HF_API_TOKEN)
 hf_api = HfApi()

-# Initialize embedding model once to be reused
-logger.info("Loading embedding model...")
 embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
-logger.info("Embedding model loaded.")

-
-def setup_local_db():
-    """
-    Ensures the local database directory exists.
-    If it doesn't, it attempts to download from Hugging Face Hub.
-    If the Hub repo is empty or doesn't exist, it creates a new local directory.
-    This function handles the "build a new dataset if one does not exist" requirement.
-    """
-    if os.path.exists(LOCAL_DB_DIR):
-        logger.info(f"Local database found at '{LOCAL_DB_DIR}'.")
-        return
-
-    logger.info(f"Local database not found. Attempting to download from Hugging Face Hub repo: {REPO_ID}")
-    try:
-        # snapshot_download is the correct function for downloading a whole repository/folder
-        snapshot_download(
-            repo_id=REPO_ID,
-            repo_type="dataset",
-            local_dir=LOCAL_DB_DIR,
-            token=HF_API_TOKEN,
-        )
-        logger.info(f"Database downloaded successfully from {REPO_ID} to {LOCAL_DB_DIR}.")
-    except HfHubHTTPError as e:
-        # This error (e.g., 404 Not Found) is expected if the repo is new or empty.
-        logger.warning(
-            f"Failed to download from Hub (Repo might be new or empty): {e}. "
-            f"A new local database will be created at '{LOCAL_DB_DIR}'."
-        )
-        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
-    except Exception as e:
-        logger.error(f"An unexpected error occurred during DB download: {e}")
-        logger.info(f"Creating a new local database at '{LOCAL_DB_DIR}'.")
-        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
+vector_db = None

 def clean_text(text):
-    """Clean text by removing HTML tags and extra whitespace."""
     if not text or not isinstance(text, str):
         return ""
-    # Remove HTML tags
     text = re.sub(r'<.*?>', '', text)
-    # Normalize whitespace
     text = ' '.join(text.split())
     return text.strip().lower()

 def fetch_rss_feeds():
-    """Fetches and parses articles from a list of RSS feeds in a JSON file."""
     articles = []
     seen_keys = set()

@@ -94,7 +47,7 @@ def fetch_rss_feeds():
         with open(FEEDS_FILE, 'r') as f:
             feed_categories = json.load(f)
     except FileNotFoundError:
-        logger.error(f"'{FEEDS_FILE}' not found. Please create it. No feeds to process.")
+        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
         return []

     for category, feeds in feed_categories.items():
@@ -108,15 +61,21 @@ def fetch_rss_feeds():
                 logger.info(f"Fetching {feed_url}")
                 feed = feedparser.parse(feed_url)
                 if feed.bozo:
-                    logger.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
+                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
                     continue
-
-                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
-                    title = clean_text(entry.get("title", "No Title"))
-                    link = entry.get("link", "") # Don't clean link URL
-                    description = clean_text(entry.get("summary", entry.get("description", "")))
+                article_count = 0
+                for entry in feed.entries:
+                    if article_count >= MAX_ARTICLES_PER_FEED:
+                        break

-                    if not description: # Skip articles without content
+                    title_raw = entry.get("title", "No Title")
+                    link = entry.get("link", "")
+                    description = entry.get("summary", entry.get("description", ""))
+
+                    clean_title_val = clean_text(title_raw)
+                    clean_desc_val = clean_text(description)
+
+                    if not clean_desc_val:
                         continue

                     published = "Unknown Date"
@@ -129,132 +88,182 @@ def fetch_rss_feeds():
                     except (ValueError, TypeError):
                         continue

-                    # Create a unique key to deduplicate articles before processing
-                    description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
-                    key = f"{title}|{link}|{published}|{description_hash}"
+                    description_hash = hashlib.sha256(clean_desc_val.encode('utf-8')).hexdigest()
+                    key = f"{clean_title_val}|{link}|{published}|{description_hash}"

                     if key not in seen_keys:
                         seen_keys.add(key)
+
+                        image = "svg"
+                        for img_source in [
+                            lambda e: e.get("media_content", [{}])[0].get("url") if e.get("media_content") else "",
+                            lambda e: e.get("media_thumbnail", [{}])[0].get("url") if e.get("media_thumbnail") else "",
+                            lambda e: e.get("enclosure", {}).get("url") if e.get("enclosure") else "",
+                            lambda e: next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), ""),
+                        ]:
+                            try:
+                                img = img_source(entry)
+                                if img and img.strip():
+                                    image = img
+                                    break
+                            except (IndexError, AttributeError, TypeError):
+                                continue
+
                         articles.append({
-                            "title": entry.get("title", "No Title"),
+                            "title": title_raw,
                             "link": link,
-                            "description": description,
+                            "description": clean_desc_val,
                             "published": published,
                             "category": category,
+                            "image": image,
                         })
+                        article_count += 1
             except Exception as e:
-                logger.error(f"Error fetching or parsing feed {feed_url}: {e}")
-
+                logger.error(f"Error fetching {feed_url}: {e}")
     logger.info(f"Total unique articles fetched: {len(articles)}")
     return articles

-def process_and_store_articles(articles, vector_db):
-    """Processes articles and stores them in the Chroma DB, avoiding duplicates."""
-    new_docs = []
-    new_doc_ids = []
+def categorize_feed(url):
+    if not url or not isinstance(url, str):
+        logger.warning(f"Invalid URL provided for categorization: {url}")
+        return "Uncategorized"
+    url = url.lower().strip()
+    logger.debug(f"Categorizing URL: {url}")
+    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
+        return "Academic Papers"
+    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
+        return "Business"
+    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
+        return "Stocks & Markets"
+    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
+        return "Federal Government"
+    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
+        return "Weather"
+    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
+        return "Data & Statistics"
+    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
+        return "Space"
+    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
+        return "Science"
+    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
+        return "Tech"
+    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
+        return "Astrology"
+    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
+        return "Politics"
+    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
+        return "Earth Weather"
+    elif "vogue" in url:
+        return "Lifestyle"
+    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
+        return "Physics"
+    else:
+        logger.warning(f"No matching category found for URL: {url}")
+        return "Uncategorized"
+
+def process_and_store_articles(articles):
+    if not vector_db:
+        logger.error("Vector database is not initialized. Cannot process articles.")
+        return
+
+    documents = []
+    doc_ids = []

-    # Get all existing document IDs from the database once to check for duplicates
     try:
         existing_ids = set(vector_db.get(include=[])["ids"])
-        logger.info(f"Found {len(existing_ids)} existing documents in the database.")
-    except Exception as e:
-        logger.error(f"Could not retrieve existing IDs from DB. Assuming empty. Error: {e}")
+        logger.info(f"Found {len(existing_ids)} existing document IDs in the database.")
+    except Exception:
         existing_ids = set()
+        logger.info("No existing documents found or error retrieving them. Starting fresh.")

     for article in articles:
         try:
-            # Recreate the same unique ID format for checking against the DB
             title = clean_text(article["title"])
             link = article["link"]
-            published = article["published"]
             description = article["description"]
+            published = article["published"]
             description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
             doc_id = f"{title}|{link}|{published}|{description_hash}"
-
+
             if doc_id in existing_ids:
-                logger.debug(f"Skipping duplicate article (ID already in DB): {title[:50]}...")
                 continue
-
-            # Add to our in-memory set to avoid duplicates from the same batch
-            existing_ids.add(doc_id)

             metadata = {
                 "title": article["title"],
                 "link": article["link"],
                 "published": article["published"],
                 "category": article["category"],
-                # Store original description if needed, or keep it clean
-                # "original_description": article["description"],
+                "image": article["image"],
             }
-            # The Document object itself doesn't take an ID
             doc = Document(page_content=description, metadata=metadata)
-            new_docs.append(doc)
-            new_doc_ids.append(doc_id)
-
+            documents.append(doc)
+            doc_ids.append(doc_id)
+            existing_ids.add(doc_id)
         except Exception as e:
-            logger.error(f"Error processing article '{article.get('title', 'N/A')}': {e}")
+            logger.error(f"Error processing article {article['title']}: {e}")

-    if new_docs:
-        logger.info(f"Adding {len(new_docs)} new documents to the database...")
+    if documents:
         try:
-            # Provide the list of documents and a parallel list of their unique IDs
-            vector_db.add_documents(documents=new_docs, ids=new_doc_ids)
-            vector_db.persist() # Save changes to disk
-            logger.info("Successfully added new documents and persisted the database.")
+            vector_db.add_documents(documents=documents, ids=doc_ids)
+            vector_db.persist()
+            logger.info(f"Added {len(documents)} new articles to DB. Total documents now: {len(vector_db.get()['ids'])}")
         except Exception as e:
-            logger.error(f"Failed to add documents to Chroma DB: {e}")
+            logger.error(f"Error storing articles: {e}")
     else:
-        logger.info("No new articles to add to the database.")
+        logger.info("No new articles to add.")
+
+def download_from_hf_hub():
+    if os.path.exists(LOCAL_DB_DIR):
+        logger.info(f"Local database directory '{LOCAL_DB_DIR}' already exists. Skipping download.")
+        return
+
+    logger.info(f"Attempting to download database from Hugging Face Hub repo: {REPO_ID}")
+    try:
+        snapshot_download(
+            repo_id=REPO_ID,
+            repo_type="dataset",
+            local_dir=LOCAL_DB_DIR,
+            token=HF_API_TOKEN,
+        )
+        logger.info(f"Database successfully downloaded to '{LOCAL_DB_DIR}'.")
+    except HfHubHTTPError as e:
+        logger.warning(f"Failed to download from Hub (repo may be new or empty): {e}. Building new dataset locally.")
+        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
+    except Exception as e:
+        logger.error(f"An unexpected error occurred during download: {e}. Creating new local directory.")
+        os.makedirs(LOCAL_DB_DIR, exist_ok=True)

 def upload_to_hf_hub():
-    """Uploads the local Chroma DB directory to the Hugging Face Hub."""
     if not os.path.exists(LOCAL_DB_DIR):
         logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
         return
-
+
     try:
-        # Ensure the repo exists before uploading.
         hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
-
-        logger.info(f"Uploading database from '{LOCAL_DB_DIR}' to Hugging Face repo: {REPO_ID}...")
-        # upload_folder is the recommended way to upload a directory's contents.
+        logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
         hf_api.upload_folder(
             folder_path=LOCAL_DB_DIR,
             repo_id=REPO_ID,
             repo_type="dataset",
-            commit_message=f"Update database - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+            commit_message=f"Update database - {datetime.now().isoformat()}"
         )
-        logger.info(f"Database successfully uploaded to {REPO_ID}.")
+        logger.info(f"Database uploaded successfully to Hugging Face Hub.")
     except Exception as e:
         logger.error(f"Error uploading to Hugging Face Hub: {e}")

-
-def main():
-    """Main execution function to run the data pipeline."""
-    # 1. Ensure local DB exists by downloading from Hub or creating a new one
-    setup_local_db()
-
-    # 2. Initialize the vector DB object *after* the directory is guaranteed to exist
-    logger.info("Initializing Chroma vector database...")
+if __name__ == "__main__":
+    download_from_hf_hub()
+
+    global vector_db
     vector_db = Chroma(
         persist_directory=LOCAL_DB_DIR,
         embedding_function=embedding_model,
         collection_name=COLLECTION_NAME
     )
-    logger.info("Chroma DB initialized.")
-
-    # 3. Fetch new articles from RSS feeds
+
     articles = fetch_rss_feeds()
-
-    # 4. Process new articles and add them to the DB
     if articles:
-        process_and_store_articles(articles, vector_db)
+        process_and_store_articles(articles)

-    # 5. Upload the potentially updated database back to the Hub
     upload_to_hf_hub()
-
-    logger.info("Script finished.")
-
-
-if __name__ == "__main__":
-    main()
+    logger.info("Script finished.")
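For context, here is a minimal sketch of how the dataset this script maintains could be read back and queried after this change. It is illustrative only and not part of the commit: it assumes the unshown imports at the top of rss_processor.py resolve to the langchain_community Chroma and HuggingFaceEmbeddings classes, and that a populated chroma_db directory already exists locally (either from a prior run or pulled from the broadfield-dev/news-rag-db dataset repo).

# Illustrative sketch (not part of this commit): query the news database
# that rss_processor.py builds. Import paths are assumed, not taken from the diff.
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import Chroma

embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_db = Chroma(
    persist_directory="chroma_db",       # LOCAL_DB_DIR in the script
    embedding_function=embedding_model,
    collection_name="news_articles",     # COLLECTION_NAME in the script
)

# Each hit is a Document whose page_content is the cleaned description and whose
# metadata carries the title, link, published, category, and image fields stored above.
for doc in vector_db.similarity_search("space weather forecast", k=5):
    print(doc.metadata.get("title"), "->", doc.metadata.get("link"))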