import logging from typing import Dict, List, Any, Optional, Tuple import io from datetime import datetime import base64 logger = logging.getLogger(__name__) # ------------------------------- # Optional PDF backends # ------------------------------- try: from reportlab.lib.pagesizes import A4 from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle from reportlab.lib.units import inch from reportlab.lib import colors REPORTLAB_AVAILABLE = True except ImportError: REPORTLAB_AVAILABLE = False try: from fpdf import FPDF FPDF_AVAILABLE = True except ImportError: FPDF_AVAILABLE = False # Optional plotting for chart images (base64) try: import matplotlib.pyplot as plt import matplotlib matplotlib.use('Agg') MATPLOTLIB_AVAILABLE = True except ImportError: MATPLOTLIB_AVAILABLE = False # ------------------------------- # Small helpers # ------------------------------- def _safe_div(a: float, b: float) -> float: try: return (a / b) if b else 0.0 except Exception: return 0.0 def _norm_dist_from_results(results: Dict[str, Any]) -> Tuple[int, Dict[str, int], float]: """ Normalize fields from both the legacy structure and the new API structure. Returns: total_articles, counts dict {'Positive': int, 'Negative': int, 'Neutral': int}, average_sentiment (float) """ # Prefer the new API shape: results["summary"]["distribution"] etc. articles = results.get("articles", []) or [] total = results.get("total_articles") or len(articles) # backfill if missing avg = 0.0 if "summary" in results: avg = results["summary"].get("average_sentiment", 0.0) or 0.0 dist = results["summary"].get("distribution", {}) or {} pos = dist.get("positive") or dist.get("Positive") or 0 neg = dist.get("negative") or dist.get("Negative") or 0 neu = dist.get("neutral") or dist.get("Neutral") or 0 else: # Legacy keys (if present) avg = results.get("average_sentiment", 0.0) or 0.0 legacy = results.get("sentiment_distribution", {}) or {} pos = legacy.get("Positive") or legacy.get("positive") or 0 neg = legacy.get("Negative") or legacy.get("negative") or 0 neu = legacy.get("Neutral") or legacy.get("neutral") or 0 # If counts are 0 but we have articles, compute from article sentiments if (pos + neg + neu == 0) and articles: for a in articles: c = (a.get("sentiment") or {}).get("compound", 0.0) if c > 0.1: pos += 1 elif c < -0.1: neg += 1 else: neu += 1 return total, {"Positive": pos, "Negative": neg, "Neutral": neu}, float(avg) def _get_processing_time(results: Dict[str, Any]) -> float: # New structure: results["summary"]["processing"]["processing_time_seconds"] try: return float(results.get("summary", {}).get("processing", {}).get("processing_time_seconds", 0.0)) except Exception: pass # Legacy field try: return float(results.get("processing_time", 0.0)) except Exception: return 0.0 # ------------------------------- # Public API # ------------------------------- def generate_pdf_report(results: Dict[str, Any]) -> io.BytesIO: """ Generate a comprehensive PDF report. Returns a BytesIO buffer so Streamlit can download directly. """ if REPORTLAB_AVAILABLE: try: return _generate_pdf_with_reportlab(results) except Exception as e: logger.exception(f"ReportLab PDF generation failed: {e}") # Fallback if FPDF_AVAILABLE: return _generate_simple_pdf_fallback(results) # Last resort: a tiny text buffer buf = io.BytesIO() buf.write(b"PDF generation is unavailable (ReportLab/FPDF not installed).") buf.seek(0) return buf # ------------------------------- # ReportLab implementation # ------------------------------- def _generate_pdf_with_reportlab(results: Dict[str, Any]) -> io.BytesIO: buffer = io.BytesIO() doc = SimpleDocTemplate( buffer, pagesize=A4, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=18, ) styles = getSampleStyleSheet() title_style = ParagraphStyle( 'CustomTitle', parent=styles['Heading1'], fontSize=22, spaceAfter=24, textColor=colors.HexColor('#2E86AB'), alignment=1 # Center ) heading_style = ParagraphStyle( 'CustomHeading', parent=styles['Heading2'], fontSize=14, spaceAfter=10, spaceBefore=18, textColor=colors.HexColor('#2E86AB') ) story: List[Any] = [] # Title query = results.get('query', 'N/A') story.append(Paragraph(f"Global Business News Intelligence Report", title_style)) story.append(Spacer(1, 0.35 * inch)) story.append(Paragraph(f"Analysis Target: {query}", styles['Normal'])) story.append(Paragraph(f"Report Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}", styles['Normal'])) total, dist_counts, avg = _norm_dist_from_results(results) proc_time = _get_processing_time(results) story.append(Paragraph(f"Total Articles Analyzed: {total}", styles['Normal'])) story.append(Paragraph(f"Processing Time: {proc_time:.2f} seconds", styles['Normal'])) story.append(Spacer(1, 0.25 * inch)) # Executive Summary story.append(Paragraph("Executive Summary", heading_style)) story.append(Paragraph(_create_executive_summary(query, total, avg, dist_counts), styles['Normal'])) story.append(Spacer(1, 0.2 * inch)) # Sentiment Analysis story.append(Paragraph("Sentiment Analysis", heading_style)) story.extend(_create_sentiment_section(total, dist_counts, styles)) # Key Stories story.append(Paragraph("Key Stories", heading_style)) story.extend(_create_stories_section(results, styles)) # Keywords keywords = results.get('keywords') or [] if keywords: story.append(Paragraph("Key Topics and Themes", heading_style)) story.extend(_create_keywords_section(keywords, styles)) # Sources story.append(Paragraph("News Sources", heading_style)) story.extend(_create_sources_section(results, styles)) # Methodology story.append(Paragraph("Methodology", heading_style)) story.append(Paragraph(_create_methodology_section(results, total, proc_time), styles['Normal'])) doc.build(story) buffer.seek(0) return buffer def _create_executive_summary(query: str, total: int, avg_sentiment: float, dist_counts: Dict[str, int]) -> str: try: if total == 0: return f"No articles were available to analyze for “{query}”." label = "positive" if avg_sentiment > 0.1 else "negative" if avg_sentiment < -0.1 else "neutral" pos = dist_counts.get("Positive", 0) neg = dist_counts.get("Negative", 0) neu = dist_counts.get("Neutral", 0) pct_pos = _safe_div(pos, total) * 100.0 pct_neg = _safe_div(neg, total) * 100.0 pct_neu = _safe_div(neu, total) * 100.0 summary = ( f"This report analyzes {total} news articles related to “{query}”. " f"The overall sentiment reveals a {label} tone with an average sentiment score of {avg_sentiment:.3f}. " f"The analysis shows {pos} positive articles ({pct_pos:.1f}%), " f"{neg} negative articles ({pct_neg:.1f}%), and {neu} neutral articles ({pct_neu:.1f}%). " ) if avg_sentiment > 0.2: summary += "Predominantly positive coverage suggests favorable market conditions or public perception." elif avg_sentiment < -0.2: summary += "Predominantly negative coverage indicates concerns or challenges that may require attention." else: summary += "Balanced coverage suggests a mixed outlook with both opportunities and challenges." return summary except Exception as e: logger.exception(f"Executive summary creation failed: {e}") return "Analysis completed successfully with comprehensive sentiment evaluation across multiple news sources." def _create_sentiment_section(total: int, dist_counts: Dict[str, int], styles) -> List[Any]: story: List[Any] = [] try: pos = dist_counts.get("Positive", 0) neg = dist_counts.get("Negative", 0) neu = dist_counts.get("Neutral", 0) data = [ ['Sentiment', 'Count', 'Percentage'], ['Positive', str(pos), f"{_safe_div(pos, total) * 100:.1f}%"], ['Negative', str(neg), f"{_safe_div(neg, total) * 100:.1f}%"], ['Neutral', str(neu), f"{_safe_div(neu, total) * 100:.1f}%"], ] table = Table(data) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2E86AB')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'CENTER'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 12), ('BOTTOMPADDING', (0, 0), (-1, 0), 10), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) story.append(table) story.append(Spacer(1, 0.2 * inch)) explanation = ( "Sentiment analysis was performed using multiple models including VADER, " "Loughran–McDonald (financial), and FinBERT. Scores range from -1.0 (most negative) " "to +1.0 (most positive), with -0.1 to +0.1 considered neutral." ) story.append(Paragraph(explanation, styles['Normal'])) story.append(Spacer(1, 0.1 * inch)) except Exception as e: logger.exception(f"Sentiment section creation failed: {e}") story.append(Paragraph("Sentiment analysis data unavailable.", styles['Normal'])) return story def _create_stories_section(results: Dict[str, Any], styles) -> List[Any]: story: List[Any] = [] try: articles = results.get('articles', []) or [] if not articles: story.append(Paragraph("No articles available for analysis.", styles['Normal'])) return story # Sort by compound sentiment sorted_by_pos = sorted(articles, key=lambda x: (x.get('sentiment') or {}).get('compound', 0.0), reverse=True) sorted_by_neg = sorted(articles, key=lambda x: (x.get('sentiment') or {}).get('compound', 0.0)) # Most positive if sorted_by_pos and (sorted_by_pos[0].get('sentiment') or {}).get('compound', 0.0) > 0.1: a = sorted_by_pos[0] story.append(Paragraph("Most Positive Coverage:", styles['Heading3'])) story.append(Paragraph(f"Title: {a.get('title','N/A')}", styles['Normal'])) story.append(Paragraph(f"Source: {a.get('source','N/A')}", styles['Normal'])) story.append(Paragraph(f"Sentiment Score: {(a.get('sentiment') or {}).get('compound', 0.0):.3f}", styles['Normal'])) if a.get('summary'): story.append(Paragraph(f"Summary: {a['summary'][:300]}{'...' if len(a['summary'])>300 else ''}", styles['Normal'])) story.append(Spacer(1, 0.15 * inch)) # Most negative if sorted_by_neg and (sorted_by_neg[0].get('sentiment') or {}).get('compound', 0.0) < -0.1: a = sorted_by_neg[0] story.append(Paragraph("Most Negative Coverage:", styles['Heading3'])) story.append(Paragraph(f"Title: {a.get('title','N/A')}", styles['Normal'])) story.append(Paragraph(f"Source: {a.get('source','N/A')}", styles['Normal'])) story.append(Paragraph(f"Sentiment Score: {(a.get('sentiment') or {}).get('compound', 0.0):.3f}", styles['Normal'])) if a.get('summary'): story.append(Paragraph(f"Summary: {a['summary'][:300]}{'...' if len(a['summary'])>300 else ''}", styles['Normal'])) # Latest coverage (if dates are present) recent = [a for a in articles if a.get('date')] if recent: try: recent.sort(key=lambda x: x.get('date'), reverse=True) r = recent[0] story.append(Spacer(1, 0.15 * inch)) story.append(Paragraph("Most Recent Coverage:", styles['Heading3'])) story.append(Paragraph(f"Title: {r.get('title','N/A')}", styles['Normal'])) story.append(Paragraph(f"Source: {r.get('source','N/A')}", styles['Normal'])) story.append(Paragraph(f"Date: {r.get('date')}", styles['Normal'])) story.append(Paragraph(f"Sentiment Score: {(r.get('sentiment') or {}).get('compound', 0.0):.3f}", styles['Normal'])) except Exception: pass except Exception as e: logger.exception(f"Stories section creation failed: {e}") story.append(Paragraph("Story analysis data unavailable.", styles['Normal'])) return story def _create_keywords_section(keywords: List[Dict[str, Any]], styles) -> List[Any]: story: List[Any] = [] try: top = keywords[:15] if not top: story.append(Paragraph("No keywords extracted.", styles['Normal'])) return story data = [['Keyword', 'Score', 'Category']] for kw in top: score = kw.get('score', 0.0) relevance = kw.get('relevance', 'medium') data.append([kw.get('keyword', 'N/A'), f"{score:.3f}", str(relevance).title()]) table = Table(data) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2E86AB')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 10), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) story.append(table) story.append(Spacer(1, 0.15 * inch)) expl = ("Keywords were extracted using the YAKE algorithm, which identifies relevant terms and phrases " "based on statistical features of the text corpus.") story.append(Paragraph(expl, styles['Normal'])) except Exception as e: logger.exception(f"Keywords section creation failed: {e}") story.append(Paragraph("Keyword analysis data unavailable.", styles['Normal'])) return story def _create_sources_section(results: Dict[str, Any], styles) -> List[Any]: story: List[Any] = [] try: articles = results.get('articles', []) or [] if not articles: story.append(Paragraph("No source data available.", styles['Normal'])) return story # Count sources counts: Dict[str, int] = {} for a in articles: src = a.get('source', 'Unknown') counts[src] = counts.get(src, 0) + 1 total = len(articles) data = [['News Source', 'Article Count', 'Percentage']] for src, ct in sorted(counts.items(), key=lambda x: x[1], reverse=True): data.append([src, str(ct), f"{_safe_div(ct, total) * 100:.1f}%"]) table = Table(data) table.setStyle(TableStyle([ ('BACKGROUND', (0, 0), (-1, 0), colors.HexColor('#2E86AB')), ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke), ('ALIGN', (0, 0), (-1, -1), 'LEFT'), ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'), ('FONTSIZE', (0, 0), (-1, 0), 10), ('BOTTOMPADDING', (0, 0), (-1, 0), 10), ('BACKGROUND', (0, 1), (-1, -1), colors.beige), ('GRID', (0, 0), (-1, -1), 1, colors.black), ])) story.append(table) story.append(Spacer(1, 0.15 * inch)) expl = (f"Articles were collected from {len(counts)} different sources, providing diverse perspectives. " "Source diversity helps ensure comprehensive coverage and reduces bias.") story.append(Paragraph(expl, styles['Normal'])) except Exception as e: logger.exception(f"Sources section creation failed: {e}") story.append(Paragraph("Source analysis data unavailable.", styles['Normal'])) return story def _create_methodology_section(results: Dict[str, Any], total: int, proc_time: float) -> str: meth = ( "This analysis employed a comprehensive NLP pipeline:\n\n" "1. Data Collection: Articles were gathered from multiple RSS/business feeds. " "Content was filtered for relevance and deduplicated.\n\n" "2. Sentiment Analysis: VADER (general), Loughran–McDonald (finance), and FinBERT (finance) were combined. " "Final scores reflect a weighted composite.\n\n" "3. Summarization & Keywords: Articles were cleaned and summarized (transformer models when available), " "and key themes extracted with YAKE.\n\n" "4. Quality Controls: English-only filtering, minimum length checks, and relevance filters.\n\n" ) try: meth += f"Processed {total} articles in {proc_time:.2f} seconds." except Exception: pass return meth # ------------------------------- # FPDF fallback # ------------------------------- def _generate_simple_pdf_fallback(results: Dict[str, Any]) -> io.BytesIO: total, dist_counts, avg = _norm_dist_from_results(results) query = results.get('query', 'N/A') pdf = FPDF() pdf.add_page() pdf.set_font('Arial', 'B', 16) pdf.cell(0, 10, 'News Analysis Report', ln=True) pdf.ln(5) pdf.set_font('Arial', '', 12) pdf.cell(0, 8, f"Query: {query}", ln=True) pdf.cell(0, 8, f"Articles: {total}", ln=True) pdf.cell(0, 8, f"Average Sentiment: {avg:.3f}", ln=True) pdf.ln(5) pos, neg, neu = dist_counts.get("Positive", 0), dist_counts.get("Negative", 0), dist_counts.get("Neutral", 0) pdf.cell(0, 8, "Sentiment Distribution:", ln=True) pdf.cell(0, 8, f" Positive: {pos} ({_safe_div(pos, total)*100:.1f}%)", ln=True) pdf.cell(0, 8, f" Negative: {neg} ({_safe_div(neg, total)*100:.1f}%)", ln=True) pdf.cell(0, 8, f" Neutral: {neu} ({_safe_div(neu, total)*100:.1f}%)", ln=True) buf = io.BytesIO() pdf_bytes = pdf.output(dest='S').encode('latin1') buf.write(pdf_bytes) buf.seek(0) return buf # ------------------------------- # Optional chart image (base64) # ------------------------------- def create_chart_image(data: Dict, chart_type: str = 'pie') -> Optional[str]: if not MATPLOTLIB_AVAILABLE: return None try: plt.figure(figsize=(6, 4)) if chart_type == 'pie': # Support both shapes total, dist_counts, _ = _norm_dist_from_results(data if 'articles' in data else {'summary': {'distribution': data}}) labels = ['Positive', 'Negative', 'Neutral'] sizes = [ dist_counts.get('Positive', 0), dist_counts.get('Negative', 0), dist_counts.get('Neutral', 0), ] plt.pie(sizes, labels=labels, autopct='%1.1f%%', startangle=90) plt.title('Sentiment Distribution') elif chart_type == 'bar' and 'articles' in data: sources: Dict[str, int] = {} for a in data.get('articles', []): s = a.get('source', 'Unknown') sources[s] = sources.get(s, 0) + 1 top = dict(sorted(sources.items(), key=lambda x: x[1], reverse=True)[:10]) plt.bar(range(len(top)), list(top.values())) plt.xticks(range(len(top)), list(top.keys()), rotation=45, ha='right') plt.title('Articles by Source') plt.ylabel('Count') plt.tight_layout() buf = io.BytesIO() plt.savefig(buf, format='png', dpi=150, bbox_inches='tight') buf.seek(0) img64 = base64.b64encode(buf.getvalue()).decode() plt.close() return img64 except Exception as e: logger.exception(f"Chart creation failed: {e}") return None # ------------------------------- # CSV / JSON helpers (unchanged public API) # ------------------------------- def generate_csv_report(results: Dict[str, Any]) -> str: try: import csv import io as _io out = _io.StringIO() w = csv.writer(out) w.writerow(['Title', 'Source', 'URL', 'Date', 'Sentiment_Score', 'Sentiment_Label', 'VADER_Score', 'LM_Score', 'FinBERT_Score', 'Summary']) for a in results.get('articles', []): s = a.get('sentiment', {}) or {} compound = s.get('compound', 0.0) if compound > 0.1: label = 'Positive' elif compound < -0.1: label = 'Negative' else: label = 'Neutral' w.writerow([ a.get('title', ''), a.get('source', ''), a.get('url', ''), a.get('date', ''), compound, label, s.get('vader', ''), s.get('loughran_mcdonald', ''), s.get('finbert', ''), (a.get('summary', '')[:200] + '...') if len(a.get('summary', '') or '') > 200 else a.get('summary', '') ]) return out.getvalue() except Exception as e: logger.exception(f"CSV generation failed: {e}") return "Error generating CSV report" def generate_json_report(results: Dict[str, Any]) -> str: try: import json meta = { 'report_generated': datetime.now().isoformat(), 'query': results.get('query', ''), 'languages': results.get('languages', ['English']), } total, dist_counts, avg = _norm_dist_from_results(results) summary = { 'total_articles': total, 'average_sentiment': avg, 'sentiment_distribution': dist_counts, 'top_sources': _get_top_sources(results), } report = { 'metadata': meta, 'summary': summary, 'articles': results.get('articles', []), 'keywords': (results.get('keywords', []) or [])[:20], 'analysis_methods': { 'sentiment_models': ['VADER', 'Loughran-McDonald', 'FinBERT'], 'summarization_model': 'BART/DistilBART/T5 (when available)', 'keyword_extraction': 'YAKE', 'translation_models': ['Helsinki-NLP Opus-MT'] } } return json.dumps(report, indent=2, default=str, ensure_ascii=False) except Exception as e: logger.exception(f"JSON generation failed: {e}") try: import json return json.dumps({'error': str(e)}, indent=2) except Exception: return '{"error":"JSON generation failed"}' def _get_top_sources(results: Dict[str, Any]) -> List[Dict[str, Any]]: try: arts = results.get('articles', []) or [] total = len(arts) counts: Dict[str, int] = {} for a in arts: src = a.get('source', 'Unknown') counts[src] = counts.get(src, 0) + 1 items = [ {'source': s, 'count': c, 'percentage': round(_safe_div(c, total) * 100.0, 1)} for s, c in counts.items() ] return sorted(items, key=lambda x: x['count'], reverse=True)[:10] except Exception as e: logger.exception(f"Top sources calculation failed: {e}") return [] def validate_report_data(results: Dict[str, Any]) -> bool: """ Validate that results contain required data for reporting. We’re lenient now: require 'articles' and 'query'. """ if 'query' not in results or 'articles' not in results: logger.error("Missing required keys: 'query' and/or 'articles'") return False if not isinstance(results['articles'], list) or len(results['articles']) == 0: logger.error("No articles available for reporting") return False return True __all__ = [ 'generate_pdf_report', 'generate_csv_report', 'generate_json_report', 'create_chart_image', 'validate_report_data', ]