import os
import threading
import logging
import time
from datetime import datetime

from flask import Flask, render_template, request, jsonify
# Note: in newer LangChain releases these classes live in langchain_community
# (langchain_community.vectorstores / langchain_community.embeddings); the
# paths below are the older import locations and emit deprecation warnings.
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

from rss_processor import fetch_rss_feeds, process_and_store_articles, download_from_hf_hub, upload_to_hf_hub, clean_text, LOCAL_DB_DIR
# --- Basic Flask App Setup ---
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# --- Global State Management ---
loading_complete = True          # True when no background update is running
last_update_time = None          # ISO timestamp of the last completed update
update_lock = threading.Lock()   # Guards against concurrent background updates
# --- Embedding and Vector DB Management ---
def get_embedding_model():
    """Initializes and returns a singleton HuggingFace embedding model."""
    if not hasattr(get_embedding_model, "model"):
        get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
    return get_embedding_model.model
def get_vector_db():
    """Initializes and returns a singleton Chroma DB client, or None if unavailable."""
    if not os.path.exists(LOCAL_DB_DIR):
        logger.warning(f"Vector DB not found at {LOCAL_DB_DIR}. It may need to be downloaded or created.")
        return None
    try:
        if not hasattr(get_vector_db, "db_instance"):
            get_vector_db.db_instance = Chroma(
                persist_directory=LOCAL_DB_DIR,
                embedding_function=get_embedding_model(),
                collection_name="news_articles"
            )
        return get_vector_db.db_instance
    except Exception as e:
        logger.error(f"Failed to load vector DB: {e}")
        # Drop the cached instance so the next call retries a clean load.
        if hasattr(get_vector_db, "db_instance"):
            delattr(get_vector_db, "db_instance")
        return None
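
# A minimal caller sketch (illustrative, not part of the app): always guard
# against the None return, since the DB may not exist until the Hub download
# in index() has run.
#
#   db = get_vector_db()
#   if db is not None:
#       hits = db.similarity_search_with_relevance_scores("climate policy", k=5)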

# --- Background Processing ---
def load_feeds_in_background():
    """Fetches RSS feeds, processes articles, and uploads to Hub in a background thread."""
    global loading_complete, last_update_time
    # Non-blocking acquire: concurrent triggers skip instead of queueing, which
    # also closes the check-then-set race a bare boolean flag would have.
    if not update_lock.acquire(blocking=False):
        logger.info("An update is already in progress. Skipping.")
        return
    loading_complete = False
    try:
        logger.info("Starting background RSS feed fetch and processing...")
        articles = fetch_rss_feeds()
        logger.info(f"Fetched {len(articles)} articles from RSS feeds.")
        if articles:
            process_and_store_articles(articles)
            upload_to_hf_hub()
        last_update_time = datetime.now().isoformat()
        logger.info("Background feed processing complete.")
    except Exception as e:
        logger.error(f"Error in background feed loading: {e}")
    finally:
        loading_complete = True
        update_lock.release()

# --- Data Transformation Helper (Used by both SSR and API) ---
def format_articles_from_db(docs):
    """
    Takes ChromaDB documents and formats them into a standardized, de-duplicated
    list of article dictionaries, sorted newest-first.
    """
    enriched_articles = []
    seen_keys = set()
    items = []
    # Handle .get() results (a dict of parallel lists)
    if isinstance(docs, dict) and 'metadatas' in docs:
        items = zip(docs.get('documents', []), docs.get('metadatas', []))
    # Handle similarity_search_with_relevance_scores results (a list of (Document, score) tuples)
    elif isinstance(docs, list):
        items = [(doc.page_content, doc.metadata) for doc, score in docs]
    for doc_content, meta in items:
        if not meta:
            continue
        title = meta.get("title", "No Title")
        link = meta.get("link", "")
        key = f"{title}|{link}"
        if key not in seen_keys:
            seen_keys.add(key)
            # Guard against a missing or None "published" field before stripping.
            published_str = (meta.get("published") or "").strip()
            try:
                # Matches the timestamp format written by process_and_store_articles.
                published_iso = datetime.strptime(published_str, "%Y-%m-%d %H:%M:%S").isoformat()
            except (ValueError, TypeError):
                published_iso = datetime.utcnow().isoformat()
            enriched_articles.append({
                "id": meta.get("id", link),
                "title": title,
                "link": link,
                "description": meta.get("original_description", "No Description"),
                "category": meta.get("category", "Uncategorized"),
                "published": published_iso,
                "image": meta.get("image", "svg"),
            })
    enriched_articles.sort(key=lambda x: x["published"], reverse=True)
    return enriched_articles
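
# Illustrative output record (fields mirror the dict built above; values are
# placeholders, and "svg" is the fallback image sentinel):
#   {"id": "<link>", "title": "...", "link": "...", "description": "...",
#    "category": "...", "published": "2025-01-01T12:00:00", "image": "svg"}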

# --------------------------------------------------------------------------------
# --- Web Application Route (Server-Side Rendered) ---
# --------------------------------------------------------------------------------
@app.route("/")
def index():
    """
    Renders the main web page by fetching, processing, and passing data
    to the template on the server side. This preserves the original functionality.
    """
    # Perform startup checks
    if not os.path.exists(LOCAL_DB_DIR):
        logger.info(f"No Chroma DB found at '{LOCAL_DB_DIR}', downloading from Hugging Face Hub...")
        download_from_hf_hub()
    # Trigger background update
    threading.Thread(target=load_feeds_in_background, daemon=True).start()
    try:
        # Fetch all data from the DB for rendering
        vector_db = get_vector_db()
        if not vector_db:
            raise ConnectionError("Database could not be loaded.")
        all_docs = vector_db.get(include=['documents', 'metadatas'])
        if not all_docs or not all_docs['metadatas']:
            logger.info("No articles in the DB yet for initial render.")
            return render_template("index.html", categorized_articles={}, has_articles=False, loading=True)
        # Process and categorize articles for the template
        enriched_articles = format_articles_from_db(all_docs)
        categorized_articles = {}
        for article in enriched_articles:
            cat = article["category"]
            categorized_articles.setdefault(cat, []).append(article)
        categorized_articles = dict(sorted(categorized_articles.items()))
        # Limit to 10 articles per category for the main page view
        for cat in categorized_articles:
            categorized_articles[cat] = categorized_articles[cat][:10]
        return render_template(
            "index.html",
            categorized_articles=categorized_articles,
            has_articles=True,
            # Surface the background-update state so the template can show progress.
            loading=not loading_complete
        )
    except Exception as e:
        logger.error(f"Error rendering index page: {e}", exc_info=True)
        # Fallback render in case of error
        return render_template("index.html", categorized_articles={}, has_articles=False, loading=True, error="Could not load articles.")

# The original search route, also server-side; the frontend can keep using it
# or switch to the JSON API below.
@app.route("/search", methods=["POST"])  # Path assumed from the form-based request handling.
def search():
    # This route returns a JSON payload to be handled by JavaScript.
    # It functions like an API endpoint and is a good example of a hybrid approach.
    query = request.form.get('search')
    if not query:
        return jsonify({"categorized_articles": {}, "has_articles": False, "loading": False})
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available"}), 503
    results = vector_db.similarity_search_with_relevance_scores(query, k=50)
    enriched_articles = format_articles_from_db(results)
    categorized_articles = {}
    for article in enriched_articles:
        cat = article["category"]
        categorized_articles.setdefault(cat, []).append(article)
    return jsonify({
        "categorized_articles": categorized_articles,
        "has_articles": bool(enriched_articles),
        "loading": False
    })
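
# Example call (illustrative; assumes the /search path above and the default
# dev server address):
#   import requests
#   resp = requests.post("http://localhost:7860/search", data={"search": "elections"})
#   payload = resp.json()  # {"categorized_articles": {...}, "has_articles": ..., "loading": false}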

# --------------------------------------------------------------------------------
# --- NEW: Standalone API v1 Endpoints (Return only JSON) ---
# --------------------------------------------------------------------------------
@app.route("/api/v1/search")  # Path assumed from the "API v1" naming above.
def api_search():
    """API endpoint for semantic search."""
    query = request.args.get('q')
    limit = request.args.get('limit', default=20, type=int)
    if not query:
        return jsonify({"error": "Query parameter 'q' is required."}), 400
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        results = vector_db.similarity_search_with_relevance_scores(query, k=limit)
        formatted_articles = format_articles_from_db(results)
        return jsonify(formatted_articles)
    except Exception as e:
        logger.error(f"API Search error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred during search."}), 500
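
# Example call (illustrative; assumes the route above):
#   import requests
#   resp = requests.get("http://localhost:7860/api/v1/search",
#                       params={"q": "climate", "limit": 5})
#   articles = resp.json()  # flat list of article dicts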

@app.route("/api/v1/categories/<category_name>")  # Path assumed; any route exposing <category_name> works.
def api_get_articles_by_category(category_name):
    """API endpoint to get articles filtered by category, with pagination."""
    limit = request.args.get('limit', default=20, type=int)
    offset = request.args.get('offset', default=0, type=int)
    vector_db = get_vector_db()
    if not vector_db:
        return jsonify({"error": "Database not available."}), 503
    try:
        # Chroma metadata filter: exact match on the stored "category" field.
        results = vector_db.get(where={"category": category_name}, include=['documents', 'metadatas'])
        formatted_articles = format_articles_from_db(results)
        paginated_results = formatted_articles[offset : offset + limit]
        return jsonify({
            "category": category_name,
            "total_articles": len(formatted_articles),
            "articles": paginated_results
        })
    except Exception as e:
        logger.error(f"API Category fetch error: {e}", exc_info=True)
        return jsonify({"error": "An internal error occurred."}), 500
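
# Example call (illustrative; assumes the route above):
#   import requests
#   resp = requests.get("http://localhost:7860/api/v1/categories/Technology",
#                       params={"limit": 10, "offset": 0})
#   page = resp.json()  # {"category": ..., "total_articles": ..., "articles": [...]}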

# Other routes like /card, /get_updates, etc. from the original file would go here.

# --- Main Application Runner ---
if __name__ == "__main__":
    # Port 7860 is the port Hugging Face Spaces expects an app to listen on.
    app.run(host="0.0.0.0", port=7860, debug=False)
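
# For anything beyond local testing, a WSGI server is the usual choice, e.g.
# (assuming this module is named app.py):
#   gunicorn -b 0.0.0.0:7860 app:app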