broadfield-dev committed
Commit 38db139 · verified · 1 Parent(s): 30e01c8

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +158 -139
rss_processor.py CHANGED
@@ -4,49 +4,89 @@ from langchain.vectorstores import Chroma
 from langchain.embeddings import HuggingFaceEmbeddings
 from langchain.docstore.document import Document
 import logging
-from huggingface_hub import HfApi, login
-import shutil
+from huggingface_hub import HfApi, login, snapshot_download
+from huggingface_hub.utils import HfHubHTTPError
 import json
-from datetime import datetime
 import dateutil.parser
 import hashlib
 import re
+from datetime import datetime
 
 # Setup logging
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
 logger = logging.getLogger(__name__)
 
-# Constants
-MAX_ARTICLES_PER_FEED = 1000
-LOCAL_DB_DIR = "chroma_db"
+# --- Constants ---
+# Local and repository configuration
+LOCAL_DB_DIR = "chroma_db_news" # Using a more descriptive local directory name
 FEEDS_FILE = "rss_feeds.json"
 COLLECTION_NAME = "news_articles"
+REPO_ID = "broadfield-dev/news-rag-db" # Your Hugging Face Hub repo ID
+
+# RSS feed fetching configuration
+MAX_ARTICLES_PER_FEED = 1000
+
+# Hugging Face credentials
 HF_API_TOKEN = os.getenv("HF_TOKEN")
-REPO_ID = "broadfield-dev/news-rag-db"
+if not HF_API_TOKEN:
+    raise ValueError("Hugging Face API token not found. Please set the HF_TOKEN environment variable.")
 
+# --- Global Initializations ---
 # Initialize Hugging Face API
 login(token=HF_API_TOKEN)
 hf_api = HfApi()
 
-# Initialize embedding model (global, reusable)
+# Initialize embedding model once to be reused
+logger.info("Loading embedding model...")
 embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
+logger.info("Embedding model loaded.")
 
-# Initialize vector DB with a specific collection name
-vector_db = Chroma(
-    persist_directory=LOCAL_DB_DIR,
-    embedding_function=embedding_model,
-    collection_name=COLLECTION_NAME
-)
+
+def setup_local_db():
+    """
+    Ensures the local database directory exists.
+    If it doesn't, it attempts to download from Hugging Face Hub.
+    If the Hub repo is empty or doesn't exist, it creates a new local directory.
+    This function handles the "build a new dataset if one does not exist" requirement.
+    """
+    if os.path.exists(LOCAL_DB_DIR):
+        logger.info(f"Local database found at '{LOCAL_DB_DIR}'.")
+        return
+
+    logger.info(f"Local database not found. Attempting to download from Hugging Face Hub repo: {REPO_ID}")
+    try:
+        # snapshot_download is the correct function for downloading a whole repository/folder
+        snapshot_download(
+            repo_id=REPO_ID,
+            repo_type="dataset",
+            local_dir=LOCAL_DB_DIR,
+            token=HF_API_TOKEN,
+        )
+        logger.info(f"Database downloaded successfully from {REPO_ID} to {LOCAL_DB_DIR}.")
+    except HfHubHTTPError as e:
+        # This error (e.g., 404 Not Found) is expected if the repo is new or empty.
+        logger.warning(
+            f"Failed to download from Hub (Repo might be new or empty): {e}. "
+            f"A new local database will be created at '{LOCAL_DB_DIR}'."
+        )
+        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
+    except Exception as e:
+        logger.error(f"An unexpected error occurred during DB download: {e}")
+        logger.info(f"Creating a new local database at '{LOCAL_DB_DIR}'.")
+        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
 
 def clean_text(text):
     """Clean text by removing HTML tags and extra whitespace."""
     if not text or not isinstance(text, str):
         return ""
+    # Remove HTML tags
     text = re.sub(r'<.*?>', '', text)
+    # Normalize whitespace
     text = ' '.join(text.split())
     return text.strip().lower()
 
 def fetch_rss_feeds():
+    """Fetches and parses articles from a list of RSS feeds in a JSON file."""
     articles = []
     seen_keys = set()
 
@@ -54,7 +94,7 @@ def fetch_rss_feeds():
         with open(FEEDS_FILE, 'r') as f:
             feed_categories = json.load(f)
     except FileNotFoundError:
-        logger.error(f"{FEEDS_FILE} not found. No feeds to process.")
+        logger.error(f"'{FEEDS_FILE}' not found. Please create it. No feeds to process.")
        return []
 
     for category, feeds in feed_categories.items():
@@ -68,19 +108,16 @@
                 logger.info(f"Fetching {feed_url}")
                 feed = feedparser.parse(feed_url)
                 if feed.bozo:
-                    logger.warning(f"Parse error for {feed_url}: {feed.bozo_exception}")
+                    logger.warning(f"Feed parsing error for {feed_url}: {feed.bozo_exception}")
                     continue
-                article_count = 0
-                for entry in feed.entries:
-                    if article_count >= MAX_ARTICLES_PER_FEED:
-                        break
-                    title = entry.get("title", "No Title")
-                    link = entry.get("link", "")
-                    description = entry.get("summary", entry.get("description", ""))
+
+                for entry in feed.entries[:MAX_ARTICLES_PER_FEED]:
+                    title = clean_text(entry.get("title", "No Title"))
+                    link = entry.get("link", "") # Don't clean link URL
+                    description = clean_text(entry.get("summary", entry.get("description", "")))
 
-                    title = clean_text(title)
-                    link = clean_text(link)
-                    description = clean_text(description)
+                    if not description: # Skip articles without content
+                        continue
 
                     published = "Unknown Date"
                     for date_field in ["published", "updated", "created", "pubDate"]:
@@ -89,153 +126,135 @@
                                 parsed_date = dateutil.parser.parse(entry[date_field])
                                 published = parsed_date.strftime("%Y-%m-%d %H:%M:%S")
                                 break
-                            except (ValueError, TypeError) as e:
-                                logger.debug(f"Failed to parse {date_field} '{entry[date_field]}': {e}")
+                            except (ValueError, TypeError):
                                 continue
 
+                    # Create a unique key to deduplicate articles before processing
                     description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
                     key = f"{title}|{link}|{published}|{description_hash}"
+
                     if key not in seen_keys:
                         seen_keys.add(key)
-                        image = "svg"
-                        for img_source in [
-                            lambda e: clean_text(e.get("media_content", [{}])[0].get("url")) if e.get("media_content") else "",
-                            lambda e: clean_text(e.get("media_thumbnail", [{}])[0].get("url")) if e.get("media_thumbnail") else "",
-                            lambda e: clean_text(e.get("enclosure", {}).get("url")) if e.get("enclosure") else "",
-                            lambda e: clean_text(next((lnk.get("href") for lnk in e.get("links", []) if lnk.get("type", "").startswith("image")), "")),
-                        ]:
-                            try:
-                                img = img_source(entry)
-                                if img and img.strip():
-                                    image = img
-                                    break
-                            except (IndexError, AttributeError, TypeError):
-                                continue
-
                         articles.append({
-                            "title": title,
+                            "title": entry.get("title", "No Title"),
                             "link": link,
                             "description": description,
                             "published": published,
-                            "category": category, # Use JSON category directly
-                            "image": image,
+                            "category": category,
                         })
-                        article_count += 1
             except Exception as e:
-                logger.error(f"Error fetching {feed_url}: {e}")
-    logger.info(f"Total articles fetched: {len(articles)}")
+                logger.error(f"Error fetching or parsing feed {feed_url}: {e}")
+
+    logger.info(f"Total unique articles fetched: {len(articles)}")
     return articles
 
-def categorize_feed(url):
-    """Categorize an RSS feed based on its URL."""
-    if not url or not isinstance(url, str):
-        logger.warning(f"Invalid URL provided for categorization: {url}")
-        return "Uncategorized"
-
-    url = url.lower().strip() # Normalize the URL
-
-    logger.debug(f"Categorizing URL: {url}") # Add debugging for visibility
-
-    if any(keyword in url for keyword in ["nature", "science.org", "arxiv.org", "plos.org", "annualreviews.org", "journals.uchicago.edu", "jneurosci.org", "cell.com", "nejm.org", "lancet.com"]):
-        return "Academic Papers"
-    elif any(keyword in url for keyword in ["reuters.com/business", "bloomberg.com", "ft.com", "marketwatch.com", "cnbc.com", "foxbusiness.com", "wsj.com", "bworldonline.com", "economist.com", "forbes.com"]):
-        return "Business"
-    elif any(keyword in url for keyword in ["investing.com", "cnbc.com/market", "marketwatch.com/market", "fool.co.uk", "zacks.com", "seekingalpha.com", "barrons.com", "yahoofinance.com"]):
-        return "Stocks & Markets"
-    elif any(keyword in url for keyword in ["whitehouse.gov", "state.gov", "commerce.gov", "transportation.gov", "ed.gov", "dol.gov", "justice.gov", "federalreserve.gov", "occ.gov", "sec.gov", "bls.gov", "usda.gov", "gao.gov", "cbo.gov", "fema.gov", "defense.gov", "hhs.gov", "energy.gov", "interior.gov"]):
-        return "Federal Government"
-    elif any(keyword in url for keyword in ["weather.gov", "metoffice.gov.uk", "accuweather.com", "weatherunderground.com", "noaa.gov", "wunderground.com", "climate.gov", "ecmwf.int", "bom.gov.au"]):
-        return "Weather"
-    elif any(keyword in url for keyword in ["data.worldbank.org", "imf.org", "un.org", "oecd.org", "statista.com", "kff.org", "who.int", "cdc.gov", "bea.gov", "census.gov", "fdic.gov"]):
-        return "Data & Statistics"
-    elif any(keyword in url for keyword in ["nasa", "spaceweatherlive", "space", "universetoday", "skyandtelescope", "esa"]):
-        return "Space"
-    elif any(keyword in url for keyword in ["sciencedaily", "quantamagazine", "smithsonianmag", "popsci", "discovermagazine", "scientificamerican", "newscientist", "livescience", "atlasobscura"]):
-        return "Science"
-    elif any(keyword in url for keyword in ["wired", "techcrunch", "arstechnica", "gizmodo", "theverge"]):
-        return "Tech"
-    elif any(keyword in url for keyword in ["horoscope", "astrostyle"]):
-        return "Astrology"
-    elif any(keyword in url for keyword in ["cnn_allpolitics", "bbci.co.uk/news/politics", "reuters.com/arc/outboundfeeds/newsletter-politics", "politico.com/rss/politics", "thehill"]):
-        return "Politics"
-    elif any(keyword in url for keyword in ["weather", "swpc.noaa.gov", "foxweather"]):
-        return "Earth Weather"
-    elif "vogue" in url:
-        return "Lifestyle"
-    elif any(keyword in url for keyword in ["phys.org", "aps.org", "physicsworld"]):
-        return "Physics"
-    else:
-        logger.warning(f"No matching category found for URL: {url}")
-        return "Uncategorized"
+def process_and_store_articles(articles, vector_db):
+    """Processes articles and stores them in the Chroma DB, avoiding duplicates."""
+    new_docs = []
+    new_doc_ids = []
+
+    # Get all existing document IDs from the database once to check for duplicates
+    try:
+        existing_ids = set(vector_db.get(include=[])["ids"])
+        logger.info(f"Found {len(existing_ids)} existing documents in the database.")
+    except Exception as e:
+        logger.error(f"Could not retrieve existing IDs from DB. Assuming empty. Error: {e}")
+        existing_ids = set()
 
-def process_and_store_articles(articles):
-    documents = []
-    existing_ids = set(vector_db.get()["ids"]) # Load existing IDs once
     for article in articles:
         try:
+            # Recreate the same unique ID format for checking against the DB
             title = clean_text(article["title"])
-            link = clean_text(article["link"])
-            description = clean_text(article["description"])
+            link = article["link"]
             published = article["published"]
+            description = article["description"]
             description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
             doc_id = f"{title}|{link}|{published}|{description_hash}"
+
             if doc_id in existing_ids:
-                logger.debug(f"Skipping duplicate in DB: {doc_id}")
+                logger.debug(f"Skipping duplicate article (ID already in DB): {title[:50]}...")
                 continue
+
+            # Add to our in-memory set to avoid duplicates from the same batch
+            existing_ids.add(doc_id)
+
             metadata = {
                 "title": article["title"],
                 "link": article["link"],
-                "original_description": article["description"],
                 "published": article["published"],
                 "category": article["category"],
-                "image": article["image"],
+                # Store original description if needed, or keep it clean
+                # "original_description": article["description"],
             }
-            doc = Document(page_content=description, metadata=metadata, id=doc_id)
-            documents.append(doc)
-            existing_ids.add(doc_id) # Update in-memory set to avoid duplicates within this batch
+            # The Document object itself doesn't take an ID
+            doc = Document(page_content=description, metadata=metadata)
+            new_docs.append(doc)
+            new_doc_ids.append(doc_id)
+
        except Exception as e:
-            logger.error(f"Error processing article {article['title']}: {e}")
+            logger.error(f"Error processing article '{article.get('title', 'N/A')}': {e}")
 
-    if documents:
+    if new_docs:
+        logger.info(f"Adding {len(new_docs)} new documents to the database...")
         try:
-            vector_db.add_documents(documents)
-            vector_db.persist()
-            logger.info(f"Added {len(documents)} new articles to DB. Total documents: {len(vector_db.get()['ids'])}")
+            # Provide the list of documents and a parallel list of their unique IDs
+            vector_db.add_documents(documents=new_docs, ids=new_doc_ids)
+            vector_db.persist() # Save changes to disk
+            logger.info("Successfully added new documents and persisted the database.")
         except Exception as e:
-            logger.error(f"Error storing articles: {e}")
-
-def download_from_hf_hub():
-    if not os.path.exists(LOCAL_DB_DIR):
-        try:
-            hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True, token=HF_API_TOKEN)
-            logger.info(f"Downloading Chroma DB from {REPO_ID}...")
-            hf_api.hf_hub_download(repo_id=REPO_ID, filename="chroma_db", local_dir=LOCAL_DB_DIR, repo_type="dataset", token=HF_API_TOKEN)
-        except Exception as e:
-            logger.error(f"Error downloading from Hugging Face Hub: {e}")
+            logger.error(f"Failed to add documents to Chroma DB: {e}")
     else:
-        logger.info("Local Chroma DB exists, loading existing data.")
+        logger.info("No new articles to add to the database.")
 
 def upload_to_hf_hub():
-    if os.path.exists(LOCAL_DB_DIR):
-        try:
-            logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
-            for root, _, files in os.walk(LOCAL_DB_DIR):
-                for file in files:
-                    local_path = os.path.join(root, file)
-                    remote_path = os.path.relpath(local_path, LOCAL_DB_DIR)
-                    hf_api.upload_file(
-                        path_or_fileobj=local_path,
-                        path_in_repo=remote_path,
-                        repo_id=REPO_ID,
-                        repo_type="dataset",
-                        token=HF_API_TOKEN
-                    )
-            logger.info(f"Database uploaded to: {REPO_ID}")
-        except Exception as e:
-            logger.error(f"Error uploading to Hugging Face Hub: {e}")
+    """Uploads the local Chroma DB directory to the Hugging Face Hub."""
+    if not os.path.exists(LOCAL_DB_DIR):
+        logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
+        return
+
+    try:
+        # Ensure the repo exists before uploading.
+        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
 
-if __name__ == "__main__":
-    download_from_hf_hub() # Ensure DB is initialized
+        logger.info(f"Uploading database from '{LOCAL_DB_DIR}' to Hugging Face repo: {REPO_ID}...")
+        # upload_folder is the recommended way to upload a directory's contents.
+        hf_api.upload_folder(
+            folder_path=LOCAL_DB_DIR,
+            repo_id=REPO_ID,
+            repo_type="dataset",
+            commit_message=f"Update database - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
+        )
+        logger.info(f"Database successfully uploaded to {REPO_ID}.")
+    except Exception as e:
+        logger.error(f"Error uploading to Hugging Face Hub: {e}")
+
+
+def main():
+    """Main execution function to run the data pipeline."""
+    # 1. Ensure local DB exists by downloading from Hub or creating a new one
+    setup_local_db()
+
+    # 2. Initialize the vector DB object *after* the directory is guaranteed to exist
+    logger.info("Initializing Chroma vector database...")
+    vector_db = Chroma(
+        persist_directory=LOCAL_DB_DIR,
+        embedding_function=embedding_model,
+        collection_name=COLLECTION_NAME
+    )
+    logger.info("Chroma DB initialized.")
+
+    # 3. Fetch new articles from RSS feeds
     articles = fetch_rss_feeds()
-    process_and_store_articles(articles)
-    upload_to_hf_hub()
+
+    # 4. Process new articles and add them to the DB
+    if articles:
+        process_and_store_articles(articles, vector_db)
+
+    # 5. Upload the potentially updated database back to the Hub
+    upload_to_hf_hub()
+
+    logger.info("Script finished.")
+
+
+if __name__ == "__main__":
+    main()
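
The updated script assumes an rss_feeds.json file that maps category names to lists of feed URLs (fetch_rss_feeds() iterates feed_categories.items() and then each feeds list). A minimal sketch of creating such a file is below; the category names and feed URLs are illustrative placeholders, not part of this commit.

import json

# Hypothetical example of the rss_feeds.json layout the script expects:
# a JSON object whose keys are category names and whose values are lists of RSS feed URLs.
example_feeds = {
    "Space": ["https://www.nasa.gov/rss/dyn/breaking_news.rss"],
    "Tech": ["https://feeds.arstechnica.com/arstechnica/index"],
}

with open("rss_feeds.json", "w") as f:
    json.dump(example_feeds, f, indent=2)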
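Once main() has run, the persisted collection can be reopened and queried with the same LangChain Chroma wrapper this commit uses. A minimal sketch, reusing the commit's constants; the query string is only an example.

from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

# Reopen the collection written by rss_processor.py.
embedding_model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
vector_db = Chroma(
    persist_directory="chroma_db_news",
    embedding_function=embedding_model,
    collection_name="news_articles",
)

# Return the three stored descriptions closest to the query, with the metadata
# (title, link, published, category) attached in process_and_store_articles().
for doc in vector_db.similarity_search("space weather forecast", k=3):
    print(doc.metadata.get("title"), "->", doc.metadata.get("link"))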