broadfield-dev committed
Commit 4f6bd49 · verified · 1 Parent(s): 24dab84

Update rss_processor.py

Files changed (1)
  1. rss_processor.py +65 -83
rss_processor.py CHANGED
@@ -159,104 +159,86 @@ def categorize_feed(url):
     logger.warning(f"No matching category found for URL: {url}")
     return "Uncategorized"
 
-def process_and_store_articles(articles, vector_db):
-    documents = []
-    doc_ids = []
+def process_and_store_articles(articles):
+    vector_db = Chroma(
+        persist_directory=LOCAL_DB_DIR,
+        embedding_function=get_embedding_model(),
+        collection_name=COLLECTION_NAME
+    )
 
     try:
         existing_ids = set(vector_db.get(include=[])["ids"])
-        logger.info(f"Found {len(existing_ids)} existing document IDs in the database.")
+        logger.info(f"Loaded {len(existing_ids)} existing document IDs from {LOCAL_DB_DIR}.")
     except Exception:
         existing_ids = set()
-        logger.info("No existing documents found or error retrieving them. Starting fresh.")
+        logger.info("No existing DB found or it is empty. Starting fresh.")
+
+    docs_to_add = []
+    ids_to_add = []
 
     for article in articles:
-        try:
-            title = clean_text(article["title"])
-            link = article["link"]
-            description = article["description"]
-            published = article["published"]
-            description_hash = hashlib.sha256(description.encode('utf-8')).hexdigest()
-            doc_id = f"{title}|{link}|{published}|{description_hash}"
-
-            if doc_id in existing_ids:
-                continue
-
-            metadata = {
-                "title": article["title"],
-                "link": article["link"],
-                "published": article["published"],
-                "category": article["category"],
-                "image": article["image"],
-            }
-            doc = Document(page_content=description, metadata=metadata)
-            documents.append(doc)
-            doc_ids.append(doc_id)
-            existing_ids.add(doc_id)
-        except Exception as e:
-            logger.error(f"Error processing article {article['title']}: {e}")
+        cleaned_title = clean_text(article["title"])
+        cleaned_link = clean_text(article["link"])
+        doc_id = f"{cleaned_title}|{cleaned_link}|{article['published']}"
+
+        if doc_id in existing_ids:
+            continue
+
+        metadata = {
+            "title": article["title"],
+            "link": article["link"],
+            "original_description": article["description"],
+            "published": article["published"],
+            "category": article["category"],
+            "image": article["image"],
+        }
+        doc = Document(page_content=clean_text(article["description"]), metadata=metadata)
+        docs_to_add.append(doc)
+        ids_to_add.append(doc_id)
+        existing_ids.add(doc_id)
 
-    if documents:
+    if docs_to_add:
         try:
-            vector_db.add_documents(documents=documents, ids=doc_ids)
+            vector_db.add_documents(documents=docs_to_add, ids=ids_to_add)
             vector_db.persist()
-            logger.info(f"Added {len(documents)} new articles to DB. Total documents now: {len(vector_db.get()['ids'])}")
+            logger.info(f"Added {len(docs_to_add)} new articles to DB. Total in DB: {vector_db._collection.count()}")
         except Exception as e:
             logger.error(f"Error storing articles: {e}")
-    else:
-        logger.info("No new articles to add.")
 
 def download_from_hf_hub():
-    if os.path.exists(LOCAL_DB_DIR):
-        logger.info(f"Local database directory '{LOCAL_DB_DIR}' already exists. Skipping download.")
-        return
-
-    logger.info(f"Attempting to download database from Hugging Face Hub repo: {REPO_ID}")
-    try:
-        snapshot_download(
-            repo_id=REPO_ID,
-            repo_type="dataset",
-            local_dir=LOCAL_DB_DIR,
-            token=HF_API_TOKEN,
-        )
-        logger.info(f"Database successfully downloaded to '{LOCAL_DB_DIR}'.")
-    except HfHubHTTPError as e:
-        logger.warning(f"Failed to download from Hub (repo may be new or empty): {e}. Building new dataset locally.")
-        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
-    except Exception as e:
-        logger.error(f"An unexpected error occurred during download: {e}. Creating new local directory.")
-        os.makedirs(LOCAL_DB_DIR, exist_ok=True)
+    if not os.path.exists(LOCAL_DB_DIR):
+        try:
+            logger.info(f"Downloading Chroma DB from {REPO_ID} to {LOCAL_DB_DIR}...")
+            snapshot_download(
+                repo_id=REPO_ID,
+                repo_type="dataset",
+                local_dir=".",
+                local_dir_use_symlinks=False,
+                allow_patterns=f"{LOCAL_DB_DIR}/**",
+                token=HF_API_TOKEN
+            )
+            logger.info("Finished downloading DB.")
+        except Exception as e:
+            logger.warning(f"Could not download from Hugging Face Hub (this is normal on first run): {e}")
+    else:
+        logger.info("Local Chroma DB exists, loading existing data.")
 
 def upload_to_hf_hub():
-    if not os.path.exists(LOCAL_DB_DIR):
-        logger.warning(f"Local database directory '{LOCAL_DB_DIR}' not found. Nothing to upload.")
-        return
-
-    try:
-        hf_api.create_repo(repo_id=REPO_ID, repo_type="dataset", exist_ok=True)
-        logger.info(f"Uploading updated Chroma DB to {REPO_ID}...")
-        hf_api.upload_folder(
-            folder_path=LOCAL_DB_DIR,
-            repo_id=REPO_ID,
-            repo_type="dataset",
-            commit_message=f"Update database - {datetime.now().isoformat()}"
-        )
-        logger.info(f"Database uploaded successfully to Hugging Face Hub.")
-    except Exception as e:
-        logger.error(f"Error uploading to Hugging Face Hub: {e}")
+    if os.path.exists(LOCAL_DB_DIR):
+        try:
+            logger.info(f"Uploading updated Chroma DB '{LOCAL_DB_DIR}' to {REPO_ID}...")
+            hf_api.upload_folder(
+                folder_path=LOCAL_DB_DIR,
+                path_in_repo=LOCAL_DB_DIR,
+                repo_id=REPO_ID,
+                repo_type="dataset",
+                token=HF_API_TOKEN,
+                commit_message="Update RSS news database"
+            )
+            logger.info(f"Database folder '{LOCAL_DB_DIR}' uploaded to: {REPO_ID}")
+        except Exception as e:
+            logger.error(f"Error uploading to Hugging Face Hub: {e}")
 
-def run_update_pipeline():
-    download_from_hf_hub()
-
-    vector_db = Chroma(
-        persist_directory=LOCAL_DB_DIR,
-        embedding_function=embedding_model,
-        collection_name=COLLECTION_NAME
-    )
-
-    articles = fetch_rss_feeds()
-    if articles:
-        process_and_store_articles(articles, vector_db)
-
-    upload_to_hf_hub()
-    logger.info("Update pipeline finished.")
 
 
 
 
 
 
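Note on the removed driver: this hunk deletes run_update_pipeline() without a replacement, and process_and_store_articles() no longer takes a vector_db argument. A caller adapted to the new signatures might look like this hypothetical sketch (fetch_rss_feeds and logger are assumed to be defined elsewhere in rss_processor.py):

    def run_update_pipeline():
        download_from_hf_hub()                    # pull the existing DB if absent
        articles = fetch_rss_feeds()
        if articles:
            process_and_store_articles(articles)  # builds its own Chroma handle now
        upload_to_hf_hub()                        # push the updated DB back to the Hub
        logger.info("Update pipeline finished.")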