Spaces:
Sleeping
Sleeping
import os | |
import threading | |
from flask import Flask, render_template, request, jsonify | |
from rss_processor import fetch_rss_feeds, process_and_store_articles, download_from_hf_hub, upload_to_hf_hub, clean_text, LOCAL_DB_DIR | |
import logging | |
import time | |
from datetime import datetime | |
import hashlib | |
from langchain.vectorstores import Chroma | |
from langchain.embeddings import HuggingFaceEmbeddings | |
app = Flask(__name__) | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
loading_complete = True | |
last_update_time = time.time() | |
last_data_hash = None | |
def get_embedding_model(): | |
if not hasattr(get_embedding_model, "model"): | |
get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2") | |
return get_embedding_model.model | |
def get_vector_db(): | |
if not os.path.exists(LOCAL_DB_DIR): | |
return None | |
try: | |
return Chroma( | |
persist_directory=LOCAL_DB_DIR, | |
embedding_function=get_embedding_model(), | |
collection_name="news_articles" | |
) | |
except Exception as e: | |
logger.error(f"Failed to load vector DB: {e}") | |
return None | |
def load_feeds_in_background(): | |
global loading_complete, last_update_time | |
try: | |
logger.info("Starting background RSS feed fetch") | |
articles = fetch_rss_feeds() | |
logger.info(f"Fetched {len(articles)} articles") | |
process_and_store_articles(articles) | |
last_update_time = time.time() | |
logger.info("Background feed processing complete") | |
upload_to_hf_hub() | |
except Exception as e: | |
logger.error(f"Error in background feed loading: {e}") | |
finally: | |
loading_complete = True | |
def get_all_docs_from_db(): | |
vector_db = get_vector_db() | |
if not vector_db or vector_db._collection.count() == 0: | |
return {'documents': [], 'metadatas': []} | |
return vector_db.get(include=['documents', 'metadatas']) | |
def compute_data_hash(categorized_articles): | |
if not categorized_articles: return "" | |
data_str = "" | |
for cat, articles in sorted(categorized_articles.items()): | |
for article in sorted(articles, key=lambda x: x["published"]): | |
data_str += f"{cat}|{article['title']}|{article['link']}|{article['published']}|" | |
return hashlib.sha256(data_str.encode('utf-8')).hexdigest() | |
def process_docs_into_articles(docs_data): | |
enriched_articles = [] | |
seen_keys = set() | |
for doc, meta in zip(docs_data['documents'], docs_data['metadatas']): | |
if not meta: continue | |
title = meta.get("title", "No Title") | |
link = meta.get("link", "") | |
description = meta.get("original_description", "No Description") | |
published = meta.get("published", "Unknown Date").strip() | |
key = f"{title}|{link}|{published}" | |
if key not in seen_keys: | |
seen_keys.add(key) | |
try: | |
published_iso = datetime.strptime(published, "%Y-%m-%d %H:%M:%S").isoformat() | |
except (ValueError, TypeError): | |
published_iso = "1970-01-01T00:00:00" | |
enriched_articles.append({ | |
"title": title, | |
"link": link, | |
"description": description, | |
"category": meta.get("category", "Uncategorized"), | |
"published": published_iso, | |
"image": meta.get("image", "svg"), | |
}) | |
return enriched_articles | |
def index(): | |
global loading_complete, last_update_time, last_data_hash | |
if not os.path.exists(LOCAL_DB_DIR): | |
logger.info(f"No Chroma DB found at '{LOCAL_DB_DIR}', downloading from Hugging Face Hub...") | |
download_from_hf_hub() | |
loading_complete = False | |
threading.Thread(target=load_feeds_in_background, daemon=True).start() | |
try: | |
all_docs = get_all_docs_from_db() | |
if not all_docs['metadatas']: | |
logger.info("No articles in the DB yet") | |
return render_template("index.html", categorized_articles={}, has_articles=False, loading=True) | |
enriched_articles = process_docs_into_articles(all_docs) | |
enriched_articles.sort(key=lambda x: x["published"], reverse=True) | |
categorized_articles = {} | |
for article in enriched_articles: | |
cat = article["category"] | |
categorized_articles.setdefault(cat, []).append(article) | |
categorized_articles = dict(sorted(categorized_articles.items())) | |
for cat in categorized_articles: | |
categorized_articles[cat] = categorized_articles[cat][:10] | |
last_data_hash = compute_data_hash(categorized_articles) | |
return render_template("index.html", categorized_articles=categorized_articles, has_articles=True, loading=True) | |
except Exception as e: | |
logger.error(f"Error retrieving articles at startup: {e}", exc_info=True) | |
return render_template("index.html", categorized_articles={}, has_articles=False, loading=True) | |
def search(): | |
query = request.form.get('search') | |
if not query: | |
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}) | |
try: | |
logger.info(f"Performing semantic search for: '{query}'") | |
vector_db = get_vector_db() | |
if not vector_db: | |
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}) | |
results = vector_db.similarity_search_with_relevance_scores(query, k=50) | |
enriched_articles = [] | |
seen_keys = set() | |
for doc, score in results: | |
meta = doc.metadata | |
title = meta.get("title", "No Title") | |
link = meta.get("link", "") | |
key = f"{title}|{link}|{meta.get('published', '')}" | |
if key not in seen_keys: | |
seen_keys.add(key) | |
enriched_articles.append({ | |
"title": title, | |
"link": link, | |
"description": meta.get("original_description", "No Description"), | |
"category": meta.get("category", "Uncategorized"), | |
"published": meta.get("published", "Unknown Date"), | |
"image": meta.get("image", "svg"), | |
}) | |
categorized_articles = {} | |
for article in enriched_articles: | |
cat = article["category"] | |
categorized_articles.setdefault(cat, []).append(article) | |
return jsonify({ | |
"categorized_articles": categorized_articles, | |
"has_articles": bool(enriched_articles), | |
"loading": False | |
}) | |
except Exception as e: | |
logger.error(f"Semantic search error: {e}", exc_info=True) | |
return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False}), 500 | |
def get_all_articles(category): | |
try: | |
all_docs = get_all_docs_from_db() | |
enriched_articles = process_docs_into_articles(all_docs) | |
category_articles = [ | |
article for article in enriched_articles if article["category"] == category | |
] | |
category_articles.sort(key=lambda x: x["published"], reverse=True) | |
return jsonify({"articles": category_articles, "category": category}) | |
except Exception as e: | |
logger.error(f"Error fetching all articles for category {category}: {e}") | |
return jsonify({"articles": [], "category": category}), 500 | |
def check_loading(): | |
global loading_complete, last_update_time | |
return jsonify({"status": "complete" if loading_complete else "loading", "last_update": last_update_time}) | |
def get_updates(): | |
global last_update_time, last_data_hash | |
try: | |
all_docs = get_all_docs_from_db() | |
if not all_docs['metadatas']: | |
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}) | |
enriched_articles = process_docs_into_articles(all_docs) | |
categorized_articles = {} | |
for article in enriched_articles: | |
cat = article["category"] | |
categorized_articles.setdefault(cat, []).append(article) | |
for cat in categorized_articles: | |
categorized_articles[cat].sort(key=lambda x: x["published"], reverse=True) | |
categorized_articles[cat] = categorized_articles[cat][:10] | |
current_data_hash = compute_data_hash(categorized_articles) | |
has_updates = last_data_hash != current_data_hash | |
if has_updates: | |
logger.info("New RSS data detected, sending updates to frontend") | |
last_data_hash = current_data_hash | |
return jsonify({"articles": categorized_articles, "last_update": last_update_time, "has_updates": True}) | |
else: | |
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}) | |
except Exception as e: | |
logger.error(f"Error fetching updates: {e}") | |
return jsonify({"articles": {}, "last_update": last_update_time, "has_updates": False}), 500 | |
def card_load(): | |
return render_template("card.html") | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860) |