# RSS_News_1 / app.py
import os
import threading
import logging
from datetime import datetime

from flask import Flask, render_template, request, jsonify
# NOTE: in newer LangChain releases these classes live in langchain_community
# (e.g. langchain_community.vectorstores.Chroma).
from langchain.vectorstores import Chroma
from langchain.embeddings import HuggingFaceEmbeddings

from rss_processor import (
    fetch_rss_feeds,
    process_and_store_articles,
    download_from_hf_hub,
    upload_to_hf_hub,
    LOCAL_DB_DIR,
)
# --- Basic Flask App Setup ---
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# --- Global State Management ---
loading_complete = True
last_update_time = None
update_lock = threading.Lock()  # serializes the check-and-set on loading_complete
# --- Embedding and Vector DB Management ---
def get_embedding_model():
"""Initializes and returns a singleton HuggingFace embedding model."""
# Using a simple hasattr check for a singleton pattern
if not hasattr(get_embedding_model, "model"):
get_embedding_model.model = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")
return get_embedding_model.model
def get_vector_db():
"""Initializes and returns a singleton Chroma DB client."""
if not os.path.exists(LOCAL_DB_DIR):
logger.warning(f"Vector DB not found at {LOCAL_DB_DIR}. It may need to be downloaded or created.")
return None
try:
# Using a simple hasattr check for a singleton pattern
if not hasattr(get_vector_db, "db_instance"):
get_vector_db.db_instance = Chroma(
persist_directory=LOCAL_DB_DIR,
embedding_function=get_embedding_model(),
collection_name="news_articles"
)
return get_vector_db.db_instance
except Exception as e:
logger.error(f"Failed to load vector DB: {e}")
# Invalidate instance on failure
if hasattr(get_vector_db, "db_instance"):
delattr(get_vector_db, "db_instance")
return None
# --- Background Processing ---
def load_feeds_in_background():
"""Fetches RSS feeds, processes articles, and uploads to Hub in a background thread."""
    global loading_complete, last_update_time
    # Atomically check and set the flag so two triggers can't both start an update.
    with update_lock:
        if not loading_complete:
            logger.info("An update is already in progress. Skipping.")
            return
        loading_complete = False
try:
logger.info("Starting background RSS feed fetch and processing...")
articles = fetch_rss_feeds()
logger.info(f"Fetched {len(articles)} articles from RSS feeds.")
if articles:
process_and_store_articles(articles)
upload_to_hf_hub()
last_update_time = datetime.now().isoformat()
logger.info("Background feed processing complete.")
except Exception as e:
logger.error(f"Error in background feed loading: {e}")
finally:
loading_complete = True
# --- Data Transformation Helper ---
def format_articles_from_db(docs):
"""
Takes ChromaDB documents (with metadata) and formats them into a standardized list of article dictionaries.
Handles deduplication based on title and link.
"""
enriched_articles = []
seen_keys = set()
    # 'docs' is either a list of (Document, score) tuples from a similarity
    # search, or a dict of parallel lists returned by Chroma's .get().
    items = []
    if isinstance(docs, dict) and 'metadatas' in docs:
        items = zip(docs['documents'], docs['metadatas'])
    elif isinstance(docs, list):
        items = [(doc.page_content, doc.metadata) for doc, _score in docs]
for doc_content, meta in items:
if not meta: continue
title = meta.get("title", "No Title")
link = meta.get("link", "")
# Use a composite key to identify unique articles
key = f"{title}|{link}"
if key not in seen_keys:
seen_keys.add(key)
# Safely parse the published date
published_str = meta.get("published", "").strip()
try:
published_iso = datetime.strptime(published_str, "%Y-%m-%d %H:%M:%S").isoformat()
except (ValueError, TypeError):
published_iso = datetime.utcnow().isoformat() # Default to now if format is wrong
enriched_articles.append({
"id": meta.get("id", link), # Provide a unique ID
"title": title,
"link": link,
"description": meta.get("original_description", "No Description"),
"category": meta.get("category", "Uncategorized"),
"published": published_iso,
"image": meta.get("image", "svg"),
})
    # Sort newest first
    enriched_articles.sort(key=lambda x: x["published"], reverse=True)
return enriched_articles
# --------------------------------------------------------------------------------
# --- API v1 Endpoints ---
# --------------------------------------------------------------------------------
#
# API Usage Guide:
#
# GET /api/v1/search?q=<query>&limit=<n>
# - Performs semantic search.
# - `q`: The search term (required).
# - `limit`: Max number of results to return (optional, default=20).
#
# GET /api/v1/articles/category/<name>?limit=<n>&offset=<o>
# - Retrieves all articles for a given category.
# - `name`: The category name (e.g., "Technology").
# - `limit`: For pagination (optional, default=20).
# - `offset`: For pagination (optional, default=0).
#
# GET /api/v1/categories
# - Returns a list of all unique article categories.
#
# GET /api/v1/status
# - Checks the status of the background data processing task.
#
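# Example requests (the host/port assume the default `app.run` settings below;
# query values are illustrative):
#
#   curl "http://localhost:7860/api/v1/search?q=climate&limit=5"
#   curl "http://localhost:7860/api/v1/articles/category/Technology?limit=10&offset=0"
#   curl "http://localhost:7860/api/v1/categories"
#   curl "http://localhost:7860/api/v1/status"
#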
# --------------------------------------------------------------------------------
@app.route('/api/v1/search', methods=['GET'])
def api_search():
"""API endpoint for semantic search."""
query = request.args.get('q')
limit = request.args.get('limit', default=20, type=int)
if not query:
return jsonify({"error": "Query parameter 'q' is required."}), 400
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
logger.info(f"API: Performing semantic search for: '{query}'")
results = vector_db.similarity_search_with_relevance_scores(query, k=limit)
formatted_articles = format_articles_from_db(results)
return jsonify(formatted_articles)
except Exception as e:
logger.error(f"API Search error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred during search."}), 500
@app.route('/api/v1/articles/category/<string:category_name>', methods=['GET'])
def api_get_articles_by_category(category_name):
"""API endpoint to get articles filtered by category with pagination."""
limit = request.args.get('limit', default=20, type=int)
offset = request.args.get('offset', default=0, type=int)
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
logger.info(f"API: Fetching articles for category '{category_name}'")
# Use Chroma's metadata filtering for efficiency
results = vector_db.get(
where={"category": category_name},
include=['documents', 'metadatas']
)
formatted_articles = format_articles_from_db(results)
paginated_results = formatted_articles[offset : offset + limit]
return jsonify({
"category": category_name,
"total_articles": len(formatted_articles),
"articles": paginated_results
})
except Exception as e:
logger.error(f"API Category fetch error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred."}), 500
@app.route('/api/v1/categories', methods=['GET'])
def api_get_categories():
"""API endpoint to get a list of all unique categories."""
vector_db = get_vector_db()
if not vector_db:
return jsonify({"error": "Database not available."}), 503
try:
# Fetch only metadata to be efficient
all_metadata = vector_db.get(include=['metadatas'])['metadatas']
if not all_metadata:
return jsonify([])
        unique_categories = sorted({meta['category'] for meta in all_metadata if 'category' in meta})
return jsonify(unique_categories)
except Exception as e:
logger.error(f"API Categories fetch error: {e}", exc_info=True)
return jsonify({"error": "An internal error occurred."}), 500
@app.route('/api/v1/status', methods=['GET'])
def api_get_status():
"""API endpoint to check the data processing status."""
return jsonify({
"status": "complete" if loading_complete else "loading",
"last_update_time": last_update_time
})
# --------------------------------------------------------------------------------
# --- Web Application Routes ---
# --------------------------------------------------------------------------------
@app.route('/')
def index():
"""Renders the main web page. Data is fetched by frontend JavaScript."""
return render_template("index.html")
@app.route('/card')
def card_load():
"""Renders a sample card component."""
return render_template("card.html")
# --- Main Application Runner ---
if __name__ == "__main__":
# On startup, ensure the database exists or download it.
if not os.path.exists(LOCAL_DB_DIR):
logger.info(f"No local DB found at '{LOCAL_DB_DIR}'. Downloading from Hugging Face Hub...")
download_from_hf_hub()
# Initialize the vector DB instance
get_vector_db()
# Start the first background update immediately.
threading.Thread(target=load_feeds_in_background, daemon=True).start()
# Note: For a production environment, use a proper WSGI server like Gunicorn or uWSGI
# instead of Flask's built-in development server.
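    # Example invocation (worker count is illustrative):
    #   gunicorn --workers 2 --bind 0.0.0.0:7860 app:app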
app.run(host="0.0.0.0", port=7860, debug=False)