import re
import string
import logging
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
from collections import Counter

# NLTK imports
import nltk
try:
    from nltk.sentiment import SentimentIntensityAnalyzer
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize, sent_tokenize
    from nltk.stem import PorterStemmer
except ImportError:
    pass

# Download required NLTK data
try:
    nltk.download('vader_lexicon', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    pass

# Transformers for FinBERT
try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
    import torch
except ImportError:
    pass

# YAKE for keyword extraction
try:
    import yake
except ImportError:
    yake = None

logger = logging.getLogger(__name__)


class SentimentAnalyzer:
    """Multi-model sentiment analysis"""

    def __init__(self):
        self.vader_analyzer = None
        self.finbert_pipeline = None
        self.loughran_mcdonald_dict = None
        self._initialize_models()
        logger.info("SentimentAnalyzer initialized")

    def _initialize_models(self):
        """Initialize all sentiment analysis models"""
        # VADER
        try:
            self.vader_analyzer = SentimentIntensityAnalyzer()
            logger.info("VADER model loaded")
        except Exception as e:
            logger.error(f"Failed to load VADER: {str(e)}")

        # FinBERT
        try:
            model_name = "ProsusAI/finbert"
            self.finbert_pipeline = pipeline(
                "sentiment-analysis",
                model=model_name,
                tokenizer=model_name,
                device=0 if torch.cuda.is_available() else -1
            )
            logger.info("FinBERT model loaded")
        except Exception as e:
            logger.warning(f"Failed to load FinBERT, using CPU fallback: {str(e)}")
            try:
                model_name = "ProsusAI/finbert"
                self.finbert_pipeline = pipeline(
                    "sentiment-analysis",
                    model=model_name,
                    tokenizer=model_name,
                    device=-1
                )
                logger.info("FinBERT model loaded on CPU")
            except Exception as e2:
                logger.error(f"Failed to load FinBERT completely: {str(e2)}")

        # Loughran-McDonald Dictionary
        try:
            self.loughran_mcdonald_dict = self._load_loughran_mcdonald()
            logger.info("Loughran-McDonald dictionary loaded")
        except Exception as e:
            logger.error(f"Failed to load Loughran-McDonald dictionary: {str(e)}")

    def _load_loughran_mcdonald(self) -> Dict[str, List[str]]:
        """Load Loughran-McDonald financial sentiment dictionary"""
        # Simplified version with key financial sentiment words
        return {
            'positive': [
                'profit', 'profitable', 'profitability', 'revenue', 'revenues',
                'growth', 'growing', 'increase', 'increased', 'increasing',
                'success', 'successful', 'gain', 'gains', 'benefit', 'benefits',
                'improvement', 'improved', 'strong', 'stronger', 'excellent',
                'outstanding', 'exceed', 'exceeded', 'exceeds', 'beat', 'beats',
                'positive', 'optimistic', 'bullish', 'rise', 'rising', 'surge',
                'surged', 'boom', 'booming', 'expand', 'expansion', 'opportunity',
                'opportunities', 'advance', 'advances', 'achievement', 'achieve',
                'winner'
            ],
            'negative': [
                'loss', 'losses', 'lose', 'losing', 'decline', 'declining',
                'decrease', 'decreased', 'decreasing', 'fall', 'falling', 'drop',
                'dropped', 'plunge', 'plunged', 'crash', 'crashed', 'failure',
                'failed', 'weak', 'weakness', 'poor', 'worse', 'worst', 'bad',
                'terrible', 'crisis', 'problem', 'problems', 'risk', 'risks',
                'risky', 'concern', 'concerns', 'worried', 'worry', 'negative',
                'pessimistic', 'bearish', 'bankruptcy', 'bankrupt', 'deficit',
                'debt', 'lawsuit', 'sue', 'sued', 'investigation', 'fraud',
                'scandal', 'volatility', 'volatile', 'uncertainty', 'uncertain',
                'challenge', 'challenges'
            ]
        }
    def analyze_sentiment(self, text: str, models: Optional[List[str]] = None) -> Dict[str, Any]:
        """Analyze sentiment using multiple models"""
        if models is None:
            models = ['VADER', 'Loughran-McDonald', 'FinBERT']

        results = {}

        # Clean text
        cleaned_text = self._clean_text(text)

        # VADER Analysis
        if 'VADER' in models and self.vader_analyzer:
            try:
                vader_scores = self.vader_analyzer.polarity_scores(cleaned_text)
                results['vader'] = vader_scores['compound']
                results['vader_detailed'] = vader_scores
            except Exception as e:
                logger.error(f"VADER analysis failed: {str(e)}")
                results['vader'] = 0.0

        # Loughran-McDonald Analysis
        if 'Loughran-McDonald' in models and self.loughran_mcdonald_dict:
            try:
                lm_score = self._analyze_loughran_mcdonald(cleaned_text)
                results['loughran_mcdonald'] = lm_score
            except Exception as e:
                logger.error(f"Loughran-McDonald analysis failed: {str(e)}")
                results['loughran_mcdonald'] = 0.0

        # FinBERT Analysis
        if 'FinBERT' in models and self.finbert_pipeline:
            try:
                # Truncate text for FinBERT (max 512 tokens);
                # ~2000 characters is used as a rough proxy for that limit
                truncated_text = cleaned_text[:2000]
                finbert_result = self.finbert_pipeline(truncated_text)[0]

                # Convert to numerical score
                label = finbert_result['label'].lower()
                confidence = finbert_result['score']

                if label == 'positive':
                    finbert_score = confidence
                elif label == 'negative':
                    finbert_score = -confidence
                else:  # neutral
                    finbert_score = 0.0

                results['finbert'] = finbert_score
                results['finbert_detailed'] = finbert_result
            except Exception as e:
                logger.error(f"FinBERT analysis failed: {str(e)}")
                results['finbert'] = 0.0

        # Calculate composite score
        scores = []
        weights = {'vader': 0.3, 'loughran_mcdonald': 0.4, 'finbert': 0.3}

        for model in ['vader', 'loughran_mcdonald', 'finbert']:
            if model in results:
                scores.append(results[model] * weights[model])

        results['compound'] = sum(scores) if scores else 0.0

        return results

    def _clean_text(self, text: str) -> str:
        """Clean text for sentiment analysis"""
        if not text:
            return ""

        # Remove URLs
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove special characters but keep basic punctuation
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)

        return text.strip()

    def _analyze_loughran_mcdonald(self, text: str) -> float:
        """Analyze sentiment using Loughran-McDonald dictionary"""
        try:
            words = word_tokenize(text.lower())

            positive_count = sum(1 for word in words if word in self.loughran_mcdonald_dict['positive'])
            negative_count = sum(1 for word in words if word in self.loughran_mcdonald_dict['negative'])

            total_sentiment_words = positive_count + negative_count
            if total_sentiment_words == 0:
                return 0.0

            # Calculate normalized score
            score = (positive_count - negative_count) / len(words) * 10  # Scale factor

            # Clamp to [-1, 1] range
            return max(-1.0, min(1.0, score))
        except Exception as e:
            logger.error(f"Loughran-McDonald calculation error: {str(e)}")
            return 0.0


class KeywordExtractor:
    """Extract important keywords from text using YAKE"""

    def __init__(self):
        self.stop_words = set()
        try:
            self.stop_words = set(stopwords.words('english'))
        except Exception:
            # Fallback stop words
            self.stop_words = {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to',
                'for', 'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be',
                'been', 'have', 'has', 'had', 'do', 'does', 'did', 'will',
                'would', 'could', 'should', 'may', 'might', 'must', 'can',
                'this', 'that', 'these', 'those'
            }
        logger.info("KeywordExtractor initialized")
    def extract_keywords(self, text: str, num_keywords: int = 20) -> List[Dict[str, Any]]:
        """Extract keywords using YAKE algorithm"""
        try:
            # Use YAKE if available, otherwise fall back to frequency analysis
            if yake is not None:
                return self._extract_with_yake(text, num_keywords)
            else:
                return self._extract_with_frequency(text, num_keywords)
        except Exception as e:
            logger.error(f"Keyword extraction failed: {str(e)}")
            return []

    def _extract_with_yake(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Extract keywords using YAKE algorithm"""
        try:
            # YAKE configuration
            kw_extractor = yake.KeywordExtractor(
                lan="en",
                n=3,  # n-gram size
                dedupLim=0.9,
                top=num_keywords,
                features=None
            )

            keywords = kw_extractor.extract_keywords(text)

            # Convert to desired format (lower score = more relevant in YAKE)
            result = []
            for keyword, score in keywords:
                result.append({
                    'keyword': keyword,
                    'score': 1.0 / (1.0 + score),  # Invert score so higher = more relevant
                    'relevance': 'high' if score < 0.1 else 'medium' if score < 0.3 else 'low'
                })

            return result
        except Exception as e:
            logger.error(f"YAKE extraction failed: {str(e)}")
            return self._extract_with_frequency(text, num_keywords)

    def _extract_with_frequency(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Fallback keyword extraction using frequency analysis"""
        try:
            # Clean and tokenize
            words = word_tokenize(text.lower())

            # Filter words
            filtered_words = [
                word for word in words
                if (word not in self.stop_words and
                    word not in string.punctuation and
                    len(word) > 2 and
                    word.isalpha())
            ]

            # Count frequencies
            word_freq = Counter(filtered_words)

            # Get top keywords
            top_words = word_freq.most_common(num_keywords)

            # Calculate relevance scores
            max_freq = top_words[0][1] if top_words else 1

            result = []
            for word, freq in top_words:
                score = freq / max_freq
                result.append({
                    'keyword': word,
                    'score': score,
                    'relevance': 'high' if score > 0.7 else 'medium' if score > 0.3 else 'low'
                })

            return result
        except Exception as e:
            logger.error(f"Frequency extraction failed: {str(e)}")
            return []


class TextProcessor:
    """Text preprocessing and cleaning utilities"""

    def __init__(self):
        self.stemmer = PorterStemmer()
        logger.info("TextProcessor initialized")

    def clean_article_content(self, content: str) -> str:
        """Clean article content by removing boilerplate"""
        if not content:
            return ""

        # Remove common boilerplate patterns
        boilerplate_patterns = [
            r'Subscribe to our newsletter.*',
            r'Sign up for.*',
            r'Follow us on.*',
            r'Copyright.*',
            r'All rights reserved.*',
            r'Terms of use.*',
            r'Privacy policy.*',
            r'Cookie policy.*',
            r'\d+ comments?',
            r'Share this article.*',
            r'Related articles?.*',
            r'More from.*',
            r'Advertisement.*',
            r'Sponsored content.*'
        ]

        cleaned_content = content
        for pattern in boilerplate_patterns:
            cleaned_content = re.sub(pattern, '', cleaned_content, flags=re.IGNORECASE)

        # Remove extra whitespace
        cleaned_content = re.sub(r'\s+', ' ', cleaned_content)

        # Remove very short sentences (likely navigation/boilerplate)
        sentences = sent_tokenize(cleaned_content)
        meaningful_sentences = [
            sent for sent in sentences
            if len(sent.split()) > 5 and not self._is_boilerplate_sentence(sent)
        ]

        return ' '.join(meaningful_sentences).strip()

    def _is_boilerplate_sentence(self, sentence: str) -> bool:
        """Check if sentence is likely boilerplate"""
        boilerplate_indicators = [
            'click here', 'read more', 'subscribe', 'follow us', 'contact us',
            'terms of service', 'privacy policy', 'copyright', 'all rights reserved',
            'advertisement', 'sponsored', 'related articles'
        ]

        sentence_lower = sentence.lower()
        return any(indicator in sentence_lower for indicator in boilerplate_indicators)
    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract named entities (companies, people, locations)"""
        # Simple regex-based entity extraction
        entities = {
            'companies': [],
            'people': [],
            'locations': [],
            'money': [],
            'dates': []
        }

        try:
            # Company patterns (simplified)
            company_pattern = r'\b[A-Z][a-zA-Z]+ (?:Inc|Corp|LLC|Ltd|Company|Co)\b'
            entities['companies'] = list(set(re.findall(company_pattern, text)))

            # Money patterns
            money_pattern = r'\$[\d,]+(?:\.\d{2})?(?:\s?(?:million|billion|trillion|k|M|B|T))?'
            entities['money'] = list(set(re.findall(money_pattern, text)))

            # Date patterns (simplified)
            date_pattern = r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}'
            entities['dates'] = list(set(re.findall(date_pattern, text)))
        except Exception as e:
            logger.error(f"Entity extraction failed: {str(e)}")

        return entities

    def calculate_readability(self, text: str) -> Dict[str, float]:
        """Calculate text readability metrics"""
        try:
            sentences = sent_tokenize(text)
            words = word_tokenize(text)

            if not sentences or not words:
                return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

            # Basic metrics
            num_sentences = len(sentences)
            num_words = len(words)
            num_syllables = sum(self._count_syllables(word) for word in words if word.isalpha())

            # Average sentence length
            avg_sentence_length = num_words / num_sentences

            # Average word length (alphabetic tokens only, so punctuation does not skew the mean)
            alpha_words = [word for word in words if word.isalpha()]
            avg_word_length = (sum(len(word) for word in alpha_words) / len(alpha_words)) if alpha_words else 0.0

            # Flesch Reading Ease Score (simplified)
            flesch_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * (num_syllables / num_words))

            return {
                'flesch_score': max(0.0, min(100.0, flesch_score)),
                'avg_sentence_length': avg_sentence_length,
                'avg_word_length': avg_word_length
            }
        except Exception as e:
            logger.error(f"Readability calculation failed: {str(e)}")
            return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

    def _count_syllables(self, word: str) -> int:
        """Count syllables in a word (simplified)"""
        word = word.lower()
        vowels = 'aeiouy'
        syllable_count = 0
        prev_char_was_vowel = False

        for char in word:
            if char in vowels:
                if not prev_char_was_vowel:
                    syllable_count += 1
                prev_char_was_vowel = True
            else:
                prev_char_was_vowel = False

        # Handle silent e
        if word.endswith('e'):
            syllable_count -= 1

        # Every word has at least one syllable
        return max(1, syllable_count)
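

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only): a minimal example of how the three classes
# above might be wired together. The sample headline below is invented for
# demonstration and is not part of any dataset; actual output depends on which
# optional dependencies (NLTK data, transformers/torch, yake) are installed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample_text = (
        "Acme Corp reported record revenue growth of $2.5 billion, "
        "exceeding analyst expectations despite ongoing supply chain concerns."
    )

    analyzer = SentimentAnalyzer()
    sentiment = analyzer.analyze_sentiment(sample_text)
    print("Composite sentiment:", sentiment.get('compound'))

    extractor = KeywordExtractor()
    keywords = extractor.extract_keywords(sample_text, num_keywords=5)
    print("Top keywords:", [kw['keyword'] for kw in keywords])

    processor = TextProcessor()
    print("Entities:", processor.extract_entities(sample_text))
    print("Readability:", processor.calculate_readability(sample_text))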