"""
FastAPI backend for the News Sentiment Analyzer.

- Orchestrates scraping, NLP, summarization, translation, and TTS.
- Safe for Hugging Face Spaces (CPU-only, lazy model loading, CORS open).
"""

from __future__ import annotations

import logging
import os
from datetime import datetime
from typing import Any, Callable, Dict, List, Optional

from fastapi import FastAPI, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field

from utils import (
    setup_logging,
    load_config,
    calculate_processing_stats,
    calculate_sentiment_distribution,
)
from scraper import NewsletterScraper
from summarizer import TextSummarizer, extract_key_sentences
from translator import MultilingualTranslator
from tts import AudioGenerator
from nlp import SentimentAnalyzer, KeywordExtractor, TextProcessor

setup_logging()
logger = logging.getLogger("api")

app = FastAPI(
    title="News Intelligence API",
    version="1.0.0",
    description="Backend for News Sentiment Analyzer (Hugging Face deploy-ready)",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


class AnalyzeRequest(BaseModel):
    query: str = Field(..., description="Company / stock / keyword to analyze")
    num_articles: int = Field(20, ge=5, le=50, description="Number of articles (5-50)")
    languages: List[str] = Field(default_factory=lambda: ["English"])
    include_audio: bool = True
    sentiment_models: List[str] = Field(
        default_factory=lambda: ["VADER", "Loughran-McDonald", "FinBERT"]
    )


class AnalyzeResponse(BaseModel):
    query: str
    summary: Dict[str, Any]
    articles: List[Dict[str, Any]]
    keywords: List[Dict[str, Any]]
    audio_files: Optional[Dict[str, Optional[str]]] = None
    languages: List[str]
    config: Dict[str, Any]
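
# Illustrative request body for POST /api/analyze (field names follow AnalyzeRequest
# above; the query value and model choice are made up for the example):
#
#   {
#       "query": "Acme Corp",
#       "num_articles": 10,
#       "languages": ["English"],
#       "include_audio": false,
#       "sentiment_models": ["VADER", "FinBERT"]
#   }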


class NewsAnalyzer:
    """
    All heavy components are created lazily to avoid high cold-start memory usage
    and to play nice with Hugging Face CPU-only Spaces.
    """

    def __init__(self) -> None:
        self._cfg = load_config()

        self._scraper: Optional[NewsletterScraper] = None
        self._summarizer: Optional[TextSummarizer] = None
        self._translator: Optional[MultilingualTranslator] = None
        self._audio: Optional[AudioGenerator] = None
        self._sentiment: Optional[SentimentAnalyzer] = None
        self._keywords: Optional[KeywordExtractor] = None
        self._textproc: Optional[TextProcessor] = None

        logger.info("NewsAnalyzer initialized with lazy components.")

    @property
    def scraper(self) -> NewsletterScraper:
        if self._scraper is None:
            self._scraper = NewsletterScraper()
        return self._scraper

    @property
    def summarizer(self) -> TextSummarizer:
        if self._summarizer is None:
            self._summarizer = TextSummarizer()
        return self._summarizer

    @property
    def translator(self) -> MultilingualTranslator:
        if self._translator is None:
            self._translator = MultilingualTranslator()
        return self._translator

    @property
    def audio(self) -> AudioGenerator:
        if self._audio is None:
            self._audio = AudioGenerator()
        return self._audio

    @property
    def sentiment(self) -> SentimentAnalyzer:
        if self._sentiment is None:
            self._sentiment = SentimentAnalyzer()
        return self._sentiment

    @property
    def keyword_extractor(self) -> KeywordExtractor:
        if self._keywords is None:
            self._keywords = KeywordExtractor()
        return self._keywords

    @property
    def textproc(self) -> TextProcessor:
        if self._textproc is None:
            self._textproc = TextProcessor()
        return self._textproc

    def analyze_news(
        self,
        config: Dict[str, Any],
        progress_callback: Optional[Callable[[int, str], None]] = None,
    ) -> Dict[str, Any]:
        """
        Synchronous pipeline used by the Streamlit UI.
        (The FastAPI endpoints below call it synchronously as well.)
        """
        start_time = datetime.now()
        query: str = config["query"].strip()
        num_articles: int = int(config.get("num_articles", 20))
        languages: List[str] = config.get("languages", ["English"]) or ["English"]
        include_audio: bool = bool(config.get("include_audio", True))
        sentiment_models: List[str] = config.get(
            "sentiment_models", ["VADER", "Loughran-McDonald", "FinBERT"]
        )

        if progress_callback:
            progress_callback(5, "Initializing pipeline...")

        if progress_callback:
            progress_callback(10, "Scraping articles...")
        articles = self.scraper.scrape_news(query, max_articles=num_articles)

        if not articles:
            return {
                "query": query,
                "summary": {
                    "average_sentiment": 0.0,
                    "distribution": {"positive": 0, "negative": 0, "neutral": 0, "total": 0},
                    "processing": calculate_processing_stats(start_time, 0),
                },
                "articles": [],
                "keywords": [],
                "audio_files": {},
                "languages": languages,
                "config": config,
            }

        for a in articles:
            if not a.get("content"):
                a["content"] = a.get("summary") or a.get("title") or ""

        if progress_callback:
            progress_callback(30, "Analyzing sentiment...")
        for a in articles:
            try:
                a["sentiment"] = self.sentiment.analyze_sentiment(
                    a["content"], models=sentiment_models
                )
            except Exception as e:
                logger.exception(f"Sentiment failed for '{a.get('title', '')[:60]}': {e}")
                a["sentiment"] = {"compound": 0.0}

        if progress_callback:
            progress_callback(50, "Generating summaries...")
        for a in articles:
            try:
                a["summary"] = self.summarizer.summarize(a["content"])
            except Exception as e:
                logger.exception(f"Summarization failed: {e}")
                a["summary"] = self.textproc.clean_text(a["content"])[:300] + "..."

        if len(languages) > 1:
            if progress_callback:
                progress_callback(60, "Translating summaries...")
            for a in articles:
                a["summaries"] = {}
                for lang in languages:
                    try:
                        if lang == "English":
                            a["summaries"][lang] = a["summary"]
                        else:
                            a["summaries"][lang] = self.translator.translate(
                                a["summary"], target_lang=lang, source_lang="English"
                            )
                    except Exception as e:
                        logger.exception(f"Translation failed ({lang}): {e}")
                        a["summaries"][lang] = a["summary"]

        if progress_callback:
            progress_callback(70, "Extracting keywords...")
        joined = " ".join(a.get("content", "") for a in articles)
        keywords = self.keyword_extractor.extract_keywords(joined) if joined else []

        audio_files: Dict[str, Optional[str]] = {}
        if include_audio and languages:
            if progress_callback:
                progress_callback(80, "Creating audio summaries...")
            overall_summary = self._overall_summary_text(articles, keywords)
            for lang in languages:
                try:
                    summary_text = (
                        self.translator.translate(overall_summary, target_lang=lang)
                        if lang != "English"
                        else overall_summary
                    )
                    audio_files[lang] = self.audio.generate_audio(
                        summary_text,
                        language=lang,
                        output_file=f"summary_{lang.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3",
                    )
                except Exception as e:
                    logger.exception(f"Audio failed ({lang}): {e}")
                    audio_files[lang] = None

        if progress_callback:
            progress_callback(90, "Finalizing results...")
        dist = calculate_sentiment_distribution(articles)
        processing = calculate_processing_stats(start_time, len(articles))

        results: Dict[str, Any] = {
            "query": query,
            "summary": {
                "average_sentiment": dist.get("average_sentiment", 0.0),
                "distribution": dist,
                "processing": processing,
                "top_sentences": extract_key_sentences(joined, num_sentences=3),
            },
            "articles": articles,
            "keywords": keywords,
            "audio_files": audio_files,
            "languages": languages,
            "config": config,
        }

        if progress_callback:
            progress_callback(100, "Done.")
        return results

    def _overall_summary_text(self, articles: List[Dict[str, Any]], keywords: List[Dict[str, Any]]) -> str:
        """Create a concise, human-friendly overall summary to read out in audio."""
        pos = sum(1 for a in articles if a.get("sentiment", {}).get("compound", 0) > 0.1)
        neg = sum(1 for a in articles if a.get("sentiment", {}).get("compound", 0) < -0.1)
        neu = len(articles) - pos - neg

        top_kw = ", ".join(kw["keyword"] for kw in keywords[:8]) if keywords else ""

        latest_title = ""
        try:
            latest = sorted(
                [a for a in articles if a.get("date")],
                key=lambda x: x.get("date"),
                reverse=True,
            )
            if latest:
                latest_title = latest[0].get("title", "")[:120]
        except Exception:
            pass

        parts = [
            f"News analysis summary for {len(articles)} articles.",
            f"Overall sentiment: {pos} positive, {neg} negative, and {neu} neutral articles.",
        ]
        if latest_title:
            parts.append(f"Latest development: {latest_title}.")
        if top_kw:
            parts.append(f"Top themes include: {top_kw}.")
        parts.append("This concludes the summary.")
        return " ".join(parts)


analyzer = NewsAnalyzer()
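
# The pipeline can also be driven directly (e.g. from the Streamlit UI mentioned in
# analyze_news); the callback receives (percent: int, message: str), matching the
# progress_callback calls above. Illustrative only:
#
#   results = analyzer.analyze_news(
#       {"query": "Acme Corp", "num_articles": 10, "languages": ["English"]},
#       progress_callback=lambda pct, msg: print(f"{pct:3d}% {msg}"),
#   )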


@app.get("/health")
def health() -> Dict[str, Any]:
    return {
        "status": "ok",
        "time": datetime.utcnow().isoformat(),
        "config": load_config(),
    }


@app.get("/api/analyze", response_model=AnalyzeResponse)
def analyze_get(
    query: str = Query(..., description="Company / stock / keyword"),
    num_articles: int = Query(20, ge=5, le=50),
    languages: str = Query("English", description="Comma-separated languages"),
    include_audio: bool = Query(True),
    sentiment_models: str = Query("VADER,Loughran-McDonald,FinBERT"),
):
    req = AnalyzeRequest(
        query=query.strip(),
        num_articles=num_articles,
        languages=[x.strip() for x in languages.split(",") if x.strip()],
        include_audio=include_audio,
        sentiment_models=[x.strip() for x in sentiment_models.split(",") if x.strip()],
    )
    result = analyzer.analyze_news(req.dict())
    return AnalyzeResponse(**result)


@app.post("/api/analyze", response_model=AnalyzeResponse)
def analyze_post(payload: AnalyzeRequest):
    result = analyzer.analyze_news(payload.dict())
    return AnalyzeResponse(**result)
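
# Illustrative client call (assumes the server is running with the defaults from the
# __main__ block below and that the `requests` package is available on the client
# side; the query value is made up):
#
#   import requests
#
#   resp = requests.get(
#       "http://localhost:8000/api/analyze",
#       params={"query": "Acme Corp", "num_articles": 5, "include_audio": False},
#   )
#   data = resp.json()
#   print(data["summary"]["distribution"], len(data["articles"]))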


if __name__ == "__main__":
    import uvicorn

    host = os.getenv("FASTAPI_HOST", "0.0.0.0")
    port = int(os.getenv("FASTAPI_PORT", "8000"))
    uvicorn.run("api:app", host=host, port=port, reload=False)