from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
import asyncio
import logging
from datetime import datetime
import json

# Import our modules
from scraper import NewsletterScraper
from nlp import SentimentAnalyzer, KeywordExtractor
from summarizer import TextSummarizer
from translator import MultilingualTranslator
from tts import AudioGenerator
from utils import setup_logging, cache_results

# Setup logging
setup_logging()
logger = logging.getLogger(__name__)

# FastAPI app
app = FastAPI(
    title="Global Business News Intelligence API",
    description="Advanced news analysis with sentiment, summarization, and multilingual support",
    version="1.0.0"
)

# CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

class AnalysisRequest(BaseModel):
    query: str
    num_articles: int = 20
    languages: List[str] = ["English"]
    include_audio: bool = True
    sentiment_models: List[str] = ["VADER", "Loughran-McDonald", "FinBERT"]


class AnalysisResponse(BaseModel):
    query: str
    total_articles: int
    processing_time: float
    average_sentiment: float
    sentiment_distribution: Dict[str, int]
    articles: List[Dict[str, Any]]
    keywords: List[Dict[str, Any]]
    summary: Dict[str, Any]
    languages: List[str]
    audio_files: Optional[Dict[str, str]] = None

class NewsAnalyzer:
    """Main news analysis orchestrator"""

    def __init__(self):
        self.scraper = NewsletterScraper()
        self.sentiment_analyzer = SentimentAnalyzer()
        self.keyword_extractor = KeywordExtractor()
        self.summarizer = TextSummarizer()
        self.translator = MultilingualTranslator()
        self.audio_generator = AudioGenerator()
        logger.info("NewsAnalyzer initialized successfully")

    async def analyze_news_async(self, config: Dict[str, Any], progress_callback=None) -> Dict[str, Any]:
        """Async wrapper around analyze_news"""
        # Run the blocking pipeline in a worker thread so it does not stall the event loop
        return await asyncio.to_thread(self.analyze_news, config, progress_callback)

    def analyze_news(self, config: Dict[str, Any], progress_callback=None) -> Dict[str, Any]:
        """Main analysis pipeline"""
        start_time = datetime.now()
        try:
            query = config['query']
            num_articles = config.get('num_articles', 20)
            languages = config.get('languages', ['English'])
            include_audio = config.get('include_audio', True)
            sentiment_models = config.get('sentiment_models', ['VADER', 'Loughran-McDonald', 'FinBERT'])

            logger.info(f"Starting analysis for query: {query}")
            if progress_callback:
                progress_callback(10, "Scraping articles...")

            # Step 1: Scrape articles
            articles = self.scraper.scrape_news(query, num_articles)
            logger.info(f"Scraped {len(articles)} articles")
            if not articles:
                raise ValueError("No articles found for the given query")

            if progress_callback:
                progress_callback(30, "Analyzing sentiment...")

            # Step 2: Sentiment analysis
            for article in articles:
                article['sentiment'] = self.sentiment_analyzer.analyze_sentiment(
                    article['content'],
                    models=sentiment_models
                )

            if progress_callback:
                progress_callback(50, "Extracting keywords...")

            # Step 3: Keyword extraction
            all_text = ' '.join([article['content'] for article in articles])
            keywords = self.keyword_extractor.extract_keywords(all_text)

            if progress_callback:
                progress_callback(60, "Generating summaries...")

            # Step 4: Summarization
            for article in articles:
                article['summary'] = self.summarizer.summarize(article['content'])

                # Multilingual summaries
                if len(languages) > 1:
                    article['summaries'] = {}
                    for lang in languages:
                        if lang != 'English':
                            article['summaries'][lang] = self.translator.translate(
                                article['summary'],
                                target_lang=lang
                            )
                        else:
                            article['summaries'][lang] = article['summary']

            if progress_callback:
                progress_callback(80, "Generating audio...")

            # Step 5: Audio generation
            audio_files = {}
            if include_audio and languages:
                # Create overall summary for audio
                overall_summary = self.create_overall_summary(articles, keywords)
                for lang in languages:
                    if lang in ['English', 'Hindi', 'Tamil']:
                        try:
                            if lang != 'English':
                                summary_text = self.translator.translate(overall_summary, target_lang=lang)
                            else:
                                summary_text = overall_summary
                            audio_file = self.audio_generator.generate_audio(
                                summary_text,
                                language=lang,
                                output_file=f"summary_{lang.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
                            )
                            audio_files[lang] = audio_file
                        except Exception as e:
                            logger.error(f"Error generating audio for {lang}: {str(e)}")

            if progress_callback:
                progress_callback(90, "Finalizing results...")

            # Step 6: Calculate summary statistics
            sentiments = [article['sentiment']['compound'] for article in articles]
            average_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0.0
            sentiment_distribution = {
                'Positive': sum(1 for s in sentiments if s > 0.1),
                'Negative': sum(1 for s in sentiments if s < -0.1),
                'Neutral': sum(1 for s in sentiments if -0.1 <= s <= 0.1)
            }

            # Step 7: Prepare results
            processing_time = (datetime.now() - start_time).total_seconds()
            results = {
                'query': query,
                'total_articles': len(articles),
                'processing_time': processing_time,
                'average_sentiment': average_sentiment,
                'sentiment_distribution': sentiment_distribution,
                'articles': articles,
                'keywords': keywords,
                'languages': languages,
                'audio_files': audio_files,
                'summary': {
                    'average_sentiment': average_sentiment,
                    'total_articles': len(articles),
                    'sources': len(set([article['source'] for article in articles])),
                    'date_range': self.get_date_range(articles)
                }
            }

            if progress_callback:
                progress_callback(100, "Analysis complete!")

            logger.info(f"Analysis completed successfully in {processing_time:.2f} seconds")
            return results

        except Exception as e:
            logger.error(f"Error in analysis pipeline: {str(e)}")
            raise

    def create_overall_summary(self, articles: List[Dict], keywords: List[Dict]) -> str:
        """Create an overall summary for audio generation"""
        try:
            # Get top keywords
            top_keywords = [kw['keyword'] for kw in keywords[:10]]

            # Calculate sentiment distribution
            positive_count = sum(1 for article in articles if article['sentiment']['compound'] > 0.1)
            negative_count = sum(1 for article in articles if article['sentiment']['compound'] < -0.1)
            neutral_count = len(articles) - positive_count - negative_count

            # Create summary text
            summary = f"Analysis of {len(articles)} articles reveals "
            if positive_count > negative_count:
                summary += f"predominantly positive sentiment with {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles. "
            elif negative_count > positive_count:
                summary += f"predominantly negative sentiment with {negative_count} negative, {positive_count} positive, and {neutral_count} neutral articles. "
            else:
                summary += "mixed sentiment with balanced coverage. "

            if top_keywords:
                summary += f"Key topics include: {', '.join(top_keywords[:5])}. "

            # Add top stories
            top_positive = sorted(articles, key=lambda x: x['sentiment']['compound'], reverse=True)[:2]
            top_negative = sorted(articles, key=lambda x: x['sentiment']['compound'])[:2]
            if top_positive[0]['sentiment']['compound'] > 0.1:
                summary += f"Most positive coverage: {top_positive[0]['title'][:100]}. "
            if top_negative[0]['sentiment']['compound'] < -0.1:
                summary += f"Most concerning coverage: {top_negative[0]['title'][:100]}. "

            return summary
        except Exception as e:
            logger.error(f"Error creating overall summary: {str(e)}")
            return f"Analysis of {len(articles)} articles completed successfully."

    def get_date_range(self, articles: List[Dict]) -> Dict[str, str]:
        """Get the date range of articles"""
        try:
            dates = [article['date'] for article in articles if 'date' in article and article['date']]
            if dates:
                dates = [d for d in dates if d is not None]
                if dates:
                    min_date = min(dates)
                    max_date = max(dates)
                    return {
                        'start': str(min_date),
                        'end': str(max_date)
                    }
            return {'start': 'Unknown', 'end': 'Unknown'}
        except Exception as e:
            logger.error(f"Error getting date range: {str(e)}")
            return {'start': 'Unknown', 'end': 'Unknown'}

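# Usage sketch (illustrative; the query and settings below are arbitrary examples):
# the pipeline can also be driven directly from Python, bypassing the HTTP layer.
# The config keys mirror AnalysisRequest, and progress_callback, if provided, is
# called as progress_callback(percent_complete, status_message).
#
#   analyzer = NewsAnalyzer()
#   results = analyzer.analyze_news(
#       {"query": "Apple", "num_articles": 5, "include_audio": False},
#       progress_callback=lambda pct, msg: logger.info(f"{pct}%: {msg}"),
#   )
#   print(results["summary"])
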
# Initialize the analyzer
analyzer = NewsAnalyzer()

# FastAPI endpoints (route paths on the decorators below are assumed)
@app.get("/")
async def root():
    """API root endpoint"""
    return {
        "message": "Global Business News Intelligence API",
        "version": "1.0.0",
        "docs": "/docs"
    }


@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}

@app.get("/analyze")
async def analyze_news_endpoint(
    query: str = Query(..., description="Company name, ticker, or keyword to analyze"),
    num_articles: int = Query(20, description="Number of articles to analyze (5-50)", ge=5, le=50),
    languages: List[str] = Query(["English"], description="Languages for summaries"),
    include_audio: bool = Query(True, description="Generate audio summaries"),
    sentiment_models: List[str] = Query(["VADER", "Loughran-McDonald", "FinBERT"], description="Sentiment models to use")
):
    """Main analysis endpoint"""
    try:
        config = {
            'query': query,
            'num_articles': num_articles,
            'languages': languages,
            'include_audio': include_audio,
            'sentiment_models': sentiment_models
        }
        results = await analyzer.analyze_news_async(config)
        return AnalysisResponse(**results)
    except Exception as e:
        logger.error(f"Error in analyze endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.post("/analyze")
async def analyze_news_post(request: AnalysisRequest):
    """POST version of analysis endpoint"""
    try:
        config = request.dict()
        results = await analyzer.analyze_news_async(config)
        return AnalysisResponse(**results)
    except Exception as e:
        logger.error(f"Error in analyze POST endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/sources")
async def get_available_sources():
    """Get list of available news sources"""
    return analyzer.scraper.get_available_sources()

@app.get("/models")
async def get_available_models():
    """Get list of available models"""
    return {
        "sentiment_models": ["VADER", "Loughran-McDonald", "FinBERT"],
        "summarization_models": ["distilbart-cnn-12-6"],
        "translation_models": ["Helsinki-NLP/opus-mt-en-hi", "Helsinki-NLP/opus-mt-en-fi"],
        "audio_languages": ["English", "Hindi", "Tamil"]
    }

@app.get("/keywords")
async def extract_keywords_endpoint(
    query: str,
    num_keywords: int = Query(20, description="Number of keywords to extract", ge=5, le=50)
):
    """Extract keywords from a query or text"""
    try:
        # For demo purposes, we'll scrape a few articles and extract keywords
        articles = analyzer.scraper.scrape_news(query, 5)
        if not articles:
            raise HTTPException(status_code=404, detail="No articles found for query")
        all_text = ' '.join([article['content'] for article in articles])
        keywords = analyzer.keyword_extractor.extract_keywords(all_text, num_keywords=num_keywords)
        return keywords
    except HTTPException:
        # Re-raise HTTP errors (such as the 404 above) instead of converting them to 500s
        raise
    except Exception as e:
        logger.error(f"Error in keywords endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
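
# Example call (illustrative): with the server running locally, the POST endpoint can
# be exercised with a JSON body matching AnalysisRequest. The "/analyze" path matches
# the assumed route registration above.
#
#   curl -X POST "http://localhost:8000/analyze" \
#        -H "Content-Type: application/json" \
#        -d '{"query": "Tesla", "num_articles": 10, "languages": ["English", "Hindi"], "include_audio": false}'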