import re
import string
import logging
from collections import Counter
from typing import Dict, List, Any, Optional

import pandas as pd
import numpy as np

import nltk

try:
    from nltk.sentiment import SentimentIntensityAnalyzer
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize, sent_tokenize
    from nltk.stem import PorterStemmer
except ImportError:
    pass

# Fetch the NLTK data packages used below; failures (e.g. offline) are tolerated.
try:
    nltk.download('vader_lexicon', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    pass

# FinBERT support is optional; sentiment analysis degrades gracefully without it.
try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
    import torch
except ImportError:
    pass

# YAKE support is optional; keyword extraction falls back to frequency analysis.
try:
    import yake
except ImportError:
    pass

logger = logging.getLogger(__name__)

class SentimentAnalyzer:
    """Multi-model sentiment analysis"""

    def __init__(self):
        self.vader_analyzer = None
        self.finbert_pipeline = None
        self.loughran_mcdonald_dict = None

        self._initialize_models()
        logger.info("SentimentAnalyzer initialized")

    def _initialize_models(self):
        """Initialize all sentiment analysis models"""

        # VADER (rule-based, general-purpose)
        try:
            self.vader_analyzer = SentimentIntensityAnalyzer()
            logger.info("VADER model loaded")
        except Exception as e:
            logger.error(f"Failed to load VADER: {str(e)}")

        # FinBERT (transformer fine-tuned on financial text): try GPU first, then CPU
        try:
            model_name = "ProsusAI/finbert"
            self.finbert_pipeline = pipeline(
                "sentiment-analysis",
                model=model_name,
                tokenizer=model_name,
                device=0 if torch.cuda.is_available() else -1
            )
            logger.info("FinBERT model loaded")
        except Exception as e:
            logger.warning(f"Failed to load FinBERT, using CPU fallback: {str(e)}")
            try:
                model_name = "ProsusAI/finbert"
                self.finbert_pipeline = pipeline(
                    "sentiment-analysis",
                    model=model_name,
                    tokenizer=model_name,
                    device=-1
                )
                logger.info("FinBERT model loaded on CPU")
            except Exception as e2:
                logger.error(f"Failed to load FinBERT completely: {str(e2)}")

        # Loughran-McDonald financial word lists
        try:
            self.loughran_mcdonald_dict = self._load_loughran_mcdonald()
            logger.info("Loughran-McDonald dictionary loaded")
        except Exception as e:
            logger.error(f"Failed to load Loughran-McDonald dictionary: {str(e)}")

    def _load_loughran_mcdonald(self) -> Dict[str, List[str]]:
        """Return a hardcoded subset of the Loughran-McDonald financial sentiment word lists"""
        return {
            'positive': [
                'profit', 'profitable', 'profitability', 'revenue', 'revenues', 'growth',
                'growing', 'increase', 'increased', 'increasing', 'success', 'successful',
                'gain', 'gains', 'benefit', 'benefits', 'improvement', 'improved', 'strong',
                'stronger', 'excellent', 'outstanding', 'exceed', 'exceeded', 'exceeds',
                'beat', 'beats', 'positive', 'optimistic', 'bullish', 'rise', 'rising',
                'surge', 'surged', 'boom', 'booming', 'expand', 'expansion', 'opportunity',
                'opportunities', 'advance', 'advances', 'achievement', 'achieve', 'winner'
            ],
            'negative': [
                'loss', 'losses', 'lose', 'losing', 'decline', 'declining', 'decrease',
                'decreased', 'decreasing', 'fall', 'falling', 'drop', 'dropped', 'plunge',
                'plunged', 'crash', 'crashed', 'failure', 'failed', 'weak', 'weakness',
                'poor', 'worse', 'worst', 'bad', 'terrible', 'crisis', 'problem', 'problems',
                'risk', 'risks', 'risky', 'concern', 'concerns', 'worried', 'worry',
                'negative', 'pessimistic', 'bearish', 'bankruptcy', 'bankrupt', 'deficit',
                'debt', 'lawsuit', 'sue', 'sued', 'investigation', 'fraud', 'scandal',
                'volatility', 'volatile', 'uncertainty', 'uncertain', 'challenge', 'challenges'
            ]
        }

    def analyze_sentiment(self, text: str, models: Optional[List[str]] = None) -> Dict[str, Any]:
        """Analyze sentiment with the selected models and combine them into a compound score"""
        if models is None:
            models = ['VADER', 'Loughran-McDonald', 'FinBERT']

        results = {}
        cleaned_text = self._clean_text(text)

        # VADER: compound score in [-1, 1]
        if 'VADER' in models and self.vader_analyzer:
            try:
                vader_scores = self.vader_analyzer.polarity_scores(cleaned_text)
                results['vader'] = vader_scores['compound']
                results['vader_detailed'] = vader_scores
            except Exception as e:
                logger.error(f"VADER analysis failed: {str(e)}")
                results['vader'] = 0.0

        # Loughran-McDonald: dictionary-based score in [-1, 1]
        if 'Loughran-McDonald' in models and self.loughran_mcdonald_dict:
            try:
                lm_score = self._analyze_loughran_mcdonald(cleaned_text)
                results['loughran_mcdonald'] = lm_score
            except Exception as e:
                logger.error(f"Loughran-McDonald analysis failed: {str(e)}")
                results['loughran_mcdonald'] = 0.0

        # FinBERT: signed model confidence in [-1, 1]
        if 'FinBERT' in models and self.finbert_pipeline:
            try:
                # Rough character cap plus tokenizer truncation keeps the input
                # within FinBERT's 512-token limit.
                truncated_text = cleaned_text[:2000]
                finbert_result = self.finbert_pipeline(truncated_text, truncation=True)[0]

                label = finbert_result['label'].lower()
                confidence = finbert_result['score']

                if label == 'positive':
                    finbert_score = confidence
                elif label == 'negative':
                    finbert_score = -confidence
                else:
                    finbert_score = 0.0

                results['finbert'] = finbert_score
                results['finbert_detailed'] = finbert_result

            except Exception as e:
                logger.error(f"FinBERT analysis failed: {str(e)}")
                results['finbert'] = 0.0

        # Weighted compound score, renormalized over the models that produced a score
        weights = {'vader': 0.3, 'loughran_mcdonald': 0.4, 'finbert': 0.3}
        weighted_scores = [results[m] * weights[m] for m in weights if m in results]
        used_weight = sum(weights[m] for m in weights if m in results)
        results['compound'] = sum(weighted_scores) / used_weight if used_weight else 0.0

        return results

    def _clean_text(self, text: str) -> str:
        """Clean text for sentiment analysis"""
        if not text:
            return ""

        # Strip URLs
        text = re.sub(r'https?://\S+', '', text)

        # Strip email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)

        # Collapse whitespace
        text = re.sub(r'\s+', ' ', text)

        # Drop everything except word characters, whitespace, and basic punctuation
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)

        return text.strip()

    def _analyze_loughran_mcdonald(self, text: str) -> float:
        """Analyze sentiment using the Loughran-McDonald word lists"""
        try:
            words = word_tokenize(text.lower())

            positive_words = set(self.loughran_mcdonald_dict['positive'])
            negative_words = set(self.loughran_mcdonald_dict['negative'])
            positive_count = sum(1 for word in words if word in positive_words)
            negative_count = sum(1 for word in words if word in negative_words)

            if positive_count + negative_count == 0:
                return 0.0

            # Net sentiment per word, scaled up and clipped to [-1, 1]
            score = (positive_count - negative_count) / len(words) * 10
            return max(-1.0, min(1.0, score))

        except Exception as e:
            logger.error(f"Loughran-McDonald calculation error: {str(e)}")
            return 0.0

class KeywordExtractor:
    """Extract important keywords from text using YAKE"""

    def __init__(self):
        self.stop_words = set()
        try:
            self.stop_words = set(stopwords.words('english'))
        except Exception:
            # Minimal fallback stop-word list when the NLTK corpus is unavailable
            self.stop_words = {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
                'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should',
                'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
            }

        logger.info("KeywordExtractor initialized")

    def extract_keywords(self, text: str, num_keywords: int = 20) -> List[Dict[str, Any]]:
        """Extract keywords with YAKE when available, otherwise by word frequency"""
        try:
            # 'yake' is only in the module globals if the optional import succeeded
            if 'yake' in globals():
                return self._extract_with_yake(text, num_keywords)
            else:
                return self._extract_with_frequency(text, num_keywords)

        except Exception as e:
            logger.error(f"Keyword extraction failed: {str(e)}")
            return []

    def _extract_with_yake(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Extract keywords using the YAKE algorithm"""
        try:
            kw_extractor = yake.KeywordExtractor(
                lan="en",
                n=3,
                dedupLim=0.9,
                top=num_keywords,
                features=None
            )

            keywords = kw_extractor.extract_keywords(text)

            # YAKE scores are "lower is better"; invert them so higher means more relevant
            result = []
            for keyword, score in keywords:
                result.append({
                    'keyword': keyword,
                    'score': 1.0 / (1.0 + score),
                    'relevance': 'high' if score < 0.1 else 'medium' if score < 0.3 else 'low'
                })

            return result

        except Exception as e:
            logger.error(f"YAKE extraction failed: {str(e)}")
            return self._extract_with_frequency(text, num_keywords)

    def _extract_with_frequency(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Fallback keyword extraction using frequency analysis"""
        try:
            words = word_tokenize(text.lower())

            # Keep alphabetic words of 3+ characters that are not stop words
            filtered_words = [
                word for word in words
                if (word not in self.stop_words and
                    word not in string.punctuation and
                    len(word) > 2 and
                    word.isalpha())
            ]

            word_freq = Counter(filtered_words)
            top_words = word_freq.most_common(num_keywords)

            # Normalize scores against the most frequent word
            max_freq = top_words[0][1] if top_words else 1

            result = []
            for word, freq in top_words:
                score = freq / max_freq
                result.append({
                    'keyword': word,
                    'score': score,
                    'relevance': 'high' if score > 0.7 else 'medium' if score > 0.3 else 'low'
                })

            return result

        except Exception as e:
            logger.error(f"Frequency extraction failed: {str(e)}")
            return []

class TextProcessor:
    """Text preprocessing and cleaning utilities"""

    def __init__(self):
        self.stemmer = PorterStemmer()
        logger.info("TextProcessor initialized")

    def clean_article_content(self, content: str) -> str:
        """Clean article content by removing boilerplate"""
        if not content:
            return ""

        # Common boilerplate fragments in scraped article text
        boilerplate_patterns = [
            r'Subscribe to our newsletter.*',
            r'Sign up for.*',
            r'Follow us on.*',
            r'Copyright.*',
            r'All rights reserved.*',
            r'Terms of use.*',
            r'Privacy policy.*',
            r'Cookie policy.*',
            r'\d+ comments?',
            r'Share this article.*',
            r'Related articles?.*',
            r'More from.*',
            r'Advertisement.*',
            r'Sponsored content.*'
        ]

        cleaned_content = content
        for pattern in boilerplate_patterns:
            cleaned_content = re.sub(pattern, '', cleaned_content, flags=re.IGNORECASE)

        # Collapse whitespace
        cleaned_content = re.sub(r'\s+', ' ', cleaned_content)

        # Keep only sentences long enough to carry meaning and not flagged as boilerplate
        sentences = sent_tokenize(cleaned_content)
        meaningful_sentences = [
            sent for sent in sentences
            if len(sent.split()) > 5 and not self._is_boilerplate_sentence(sent)
        ]

        return ' '.join(meaningful_sentences).strip()

    def _is_boilerplate_sentence(self, sentence: str) -> bool:
        """Check whether a sentence is likely boilerplate"""
        boilerplate_indicators = [
            'click here', 'read more', 'subscribe', 'follow us', 'contact us',
            'terms of service', 'privacy policy', 'copyright', 'all rights reserved',
            'advertisement', 'sponsored', 'related articles'
        ]

        sentence_lower = sentence.lower()
        return any(indicator in sentence_lower for indicator in boilerplate_indicators)

    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract entities with simple regex patterns.

        Only companies, money amounts, and dates are currently populated;
        'people' and 'locations' are returned as empty placeholders.
        """
        entities = {
            'companies': [],
            'people': [],
            'locations': [],
            'money': [],
            'dates': []
        }

        try:
            # Company names ending in a corporate suffix
            company_pattern = r'\b[A-Z][a-zA-Z]+ (?:Inc|Corp|LLC|Ltd|Company|Co)\b'
            entities['companies'] = list(set(re.findall(company_pattern, text)))

            # Dollar amounts, optionally followed by a magnitude suffix
            money_pattern = r'\$[\d,]+(?:\.\d{2})?(?:\s?(?:million|billion|trillion|k|M|B|T))?'
            entities['money'] = list(set(re.findall(money_pattern, text)))

            # Dates like "January 5, 2024"
            date_pattern = r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}'
            entities['dates'] = list(set(re.findall(date_pattern, text)))

        except Exception as e:
            logger.error(f"Entity extraction failed: {str(e)}")

        return entities

    def calculate_readability(self, text: str) -> Dict[str, float]:
        """Calculate text readability metrics"""
        try:
            sentences = sent_tokenize(text)
            words = word_tokenize(text)

            if not sentences or not words:
                return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

            num_sentences = len(sentences)
            num_words = len(words)
            num_syllables = sum(self._count_syllables(word) for word in words if word.isalpha())

            avg_sentence_length = num_words / num_sentences
            avg_word_length = sum(len(word) for word in words if word.isalpha()) / num_words

            # Flesch Reading Ease, clamped to [0, 100]
            flesch_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * (num_syllables / num_words))

            return {
                'flesch_score': max(0.0, min(100.0, flesch_score)),
                'avg_sentence_length': avg_sentence_length,
                'avg_word_length': avg_word_length
            }

        except Exception as e:
            logger.error(f"Readability calculation failed: {str(e)}")
            return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

    def _count_syllables(self, word: str) -> int:
        """Count syllables in a word (simplified heuristic)"""
        word = word.lower()
        vowels = 'aeiouy'
        syllable_count = 0
        prev_char_was_vowel = False

        # Count groups of consecutive vowels
        for char in word:
            if char in vowels:
                if not prev_char_was_vowel:
                    syllable_count += 1
                prev_char_was_vowel = True
            else:
                prev_char_was_vowel = False

        # A trailing silent 'e' usually does not add a syllable
        if word.endswith('e'):
            syllable_count -= 1

        # Every word has at least one syllable
        return max(1, syllable_count)
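

if __name__ == "__main__":
    # Minimal usage sketch, assuming NLTK data is available; the sample text below is
    # illustrative only. FinBERT and YAKE are optional: missing models simply drop out
    # of the sentiment results, and keyword extraction falls back to word frequency.
    logging.basicConfig(level=logging.INFO)

    sample = ("Acme Corp reported strong revenue growth of $2.5 billion on January 15, 2024, "
              "beating estimates despite concerns about rising debt. Subscribe to our newsletter.")

    processor = TextProcessor()
    cleaned = processor.clean_article_content(sample)
    print("Cleaned:", cleaned)
    print("Entities:", processor.extract_entities(sample))
    print("Readability:", processor.calculate_readability(cleaned))

    analyzer = SentimentAnalyzer()
    print("Sentiment:", analyzer.analyze_sentiment(cleaned))

    extractor = KeywordExtractor()
    print("Keywords:", extractor.extract_keywords(cleaned, num_keywords=5))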