import re
import string
import logging
from typing import Dict, List, Any, Optional
import pandas as pd
import numpy as np
from collections import Counter

# NLTK imports
import nltk
try:
    from nltk.sentiment import SentimentIntensityAnalyzer
    from nltk.corpus import stopwords
    from nltk.tokenize import word_tokenize, sent_tokenize
    from nltk.stem import PorterStemmer
except ImportError:
    pass

# Download required NLTK data
try:
    nltk.download('vader_lexicon', quiet=True)
    nltk.download('punkt', quiet=True)
    nltk.download('stopwords', quiet=True)
except Exception:
    pass

# Transformers for FinBERT
try:
    from transformers import AutoTokenizer, AutoModelForSequenceClassification, pipeline
    import torch
except ImportError:
    pass

# YAKE for keyword extraction
try:
    import yake
except ImportError:
    pass

logger = logging.getLogger(__name__)

class SentimentAnalyzer:
    """Multi-model sentiment analysis"""

    def __init__(self):
        self.vader_analyzer = None
        self.finbert_pipeline = None
        self.loughran_mcdonald_dict = None
        self._initialize_models()
        logger.info("SentimentAnalyzer initialized")

    def _initialize_models(self):
        """Initialize all sentiment analysis models"""
        # VADER
        try:
            self.vader_analyzer = SentimentIntensityAnalyzer()
            logger.info("VADER model loaded")
        except Exception as e:
            logger.error(f"Failed to load VADER: {str(e)}")

        # FinBERT
        try:
            model_name = "ProsusAI/finbert"
            self.finbert_pipeline = pipeline(
                "sentiment-analysis",
                model=model_name,
                tokenizer=model_name,
                device=0 if torch.cuda.is_available() else -1
            )
            logger.info("FinBERT model loaded")
        except Exception as e:
            logger.warning(f"Failed to load FinBERT, using CPU fallback: {str(e)}")
            try:
                model_name = "ProsusAI/finbert"
                self.finbert_pipeline = pipeline(
                    "sentiment-analysis",
                    model=model_name,
                    tokenizer=model_name,
                    device=-1
                )
                logger.info("FinBERT model loaded on CPU")
            except Exception as e2:
                logger.error(f"Failed to load FinBERT completely: {str(e2)}")

        # Loughran-McDonald Dictionary
        try:
            self.loughran_mcdonald_dict = self._load_loughran_mcdonald()
            logger.info("Loughran-McDonald dictionary loaded")
        except Exception as e:
            logger.error(f"Failed to load Loughran-McDonald dictionary: {str(e)}")

    def _load_loughran_mcdonald(self) -> Dict[str, List[str]]:
        """Load Loughran-McDonald financial sentiment dictionary"""
        # Simplified version with key financial sentiment words
        return {
            'positive': [
                'profit', 'profitable', 'profitability', 'revenue', 'revenues', 'growth',
                'growing', 'increase', 'increased', 'increasing', 'success', 'successful',
                'gain', 'gains', 'benefit', 'benefits', 'improvement', 'improved', 'strong',
                'stronger', 'excellent', 'outstanding', 'exceed', 'exceeded', 'exceeds',
                'beat', 'beats', 'positive', 'optimistic', 'bullish', 'rise', 'rising',
                'surge', 'surged', 'boom', 'booming', 'expand', 'expansion', 'opportunity',
                'opportunities', 'advance', 'advances', 'achievement', 'achieve', 'winner'
            ],
            'negative': [
                'loss', 'losses', 'lose', 'losing', 'decline', 'declining', 'decrease',
                'decreased', 'decreasing', 'fall', 'falling', 'drop', 'dropped', 'plunge',
                'plunged', 'crash', 'crashed', 'failure', 'failed', 'weak', 'weakness',
                'poor', 'worse', 'worst', 'bad', 'terrible', 'crisis', 'problem', 'problems',
                'risk', 'risks', 'risky', 'concern', 'concerns', 'worried', 'worry',
                'negative', 'pessimistic', 'bearish', 'bankruptcy', 'bankrupt', 'deficit',
                'debt', 'lawsuit', 'sue', 'sued', 'investigation', 'fraud', 'scandal',
                'volatility', 'volatile', 'uncertainty', 'uncertain', 'challenge', 'challenges'
            ]
        }

    def analyze_sentiment(self, text: str, models: List[str] = None) -> Dict[str, Any]:
        """Analyze sentiment using multiple models"""
        if models is None:
            models = ['VADER', 'Loughran-McDonald', 'FinBERT']

        results = {}

        # Clean text
        cleaned_text = self._clean_text(text)

        # VADER Analysis
        if 'VADER' in models and self.vader_analyzer:
            try:
                vader_scores = self.vader_analyzer.polarity_scores(cleaned_text)
                results['vader'] = vader_scores['compound']
                results['vader_detailed'] = vader_scores
            except Exception as e:
                logger.error(f"VADER analysis failed: {str(e)}")
                results['vader'] = 0.0

        # Loughran-McDonald Analysis
        if 'Loughran-McDonald' in models and self.loughran_mcdonald_dict:
            try:
                lm_score = self._analyze_loughran_mcdonald(cleaned_text)
                results['loughran_mcdonald'] = lm_score
            except Exception as e:
                logger.error(f"Loughran-McDonald analysis failed: {str(e)}")
                results['loughran_mcdonald'] = 0.0

        # FinBERT Analysis
        if 'FinBERT' in models and self.finbert_pipeline:
            try:
                # FinBERT accepts at most 512 tokens; pre-truncate to ~2000 characters
                # and let the tokenizer truncate anything that still exceeds the limit
                truncated_text = cleaned_text[:2000]
                finbert_result = self.finbert_pipeline(truncated_text, truncation=True)[0]

                # Convert label/confidence pair to a signed numerical score
                label = finbert_result['label'].lower()
                confidence = finbert_result['score']
                if label == 'positive':
                    finbert_score = confidence
                elif label == 'negative':
                    finbert_score = -confidence
                else:  # neutral
                    finbert_score = 0.0

                results['finbert'] = finbert_score
                results['finbert_detailed'] = finbert_result
            except Exception as e:
                logger.error(f"FinBERT analysis failed: {str(e)}")
                results['finbert'] = 0.0
        # Composite score: weighted average over the models that actually produced a result,
        # so a missing model does not drag the compound score toward zero
        weights = {'vader': 0.3, 'loughran_mcdonald': 0.4, 'finbert': 0.3}
        weighted_sum = 0.0
        used_weight = 0.0
        for model in ['vader', 'loughran_mcdonald', 'finbert']:
            if model in results:
                weighted_sum += results[model] * weights[model]
                used_weight += weights[model]
        results['compound'] = weighted_sum / used_weight if used_weight else 0.0

        return results

    def _clean_text(self, text: str) -> str:
        """Clean text for sentiment analysis"""
        if not text:
            return ""

        # Remove URLs
        text = re.sub(r'https?://\S+', '', text)

        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)

        # Remove extra whitespace
        text = re.sub(r'\s+', ' ', text)

        # Remove special characters but keep basic punctuation
        text = re.sub(r'[^\w\s.,!?;:\-\'"()]', '', text)

        return text.strip()

    def _analyze_loughran_mcdonald(self, text: str) -> float:
        """Analyze sentiment using Loughran-McDonald dictionary"""
        try:
            words = word_tokenize(text.lower())
            positive_count = sum(1 for word in words if word in self.loughran_mcdonald_dict['positive'])
            negative_count = sum(1 for word in words if word in self.loughran_mcdonald_dict['negative'])

            total_sentiment_words = positive_count + negative_count
            if total_sentiment_words == 0:
                return 0.0

            # Calculate normalized score
            score = (positive_count - negative_count) / len(words) * 10  # Scale factor

            # Clamp to [-1, 1] range
            return max(-1.0, min(1.0, score))
        except Exception as e:
            logger.error(f"Loughran-McDonald calculation error: {str(e)}")
            return 0.0


class KeywordExtractor:
    """Extract important keywords from text using YAKE"""

    def __init__(self):
        self.stop_words = set()
        try:
            self.stop_words = set(stopwords.words('english'))
        except Exception:
            # Fallback stop words
            self.stop_words = {
                'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for',
                'of', 'with', 'by', 'is', 'are', 'was', 'were', 'be', 'been', 'have',
                'has', 'had', 'do', 'does', 'did', 'will', 'would', 'could', 'should',
                'may', 'might', 'must', 'can', 'this', 'that', 'these', 'those'
            }
        logger.info("KeywordExtractor initialized")

    def extract_keywords(self, text: str, num_keywords: int = 20) -> List[Dict[str, Any]]:
        """Extract keywords using YAKE algorithm"""
        try:
            # Use YAKE if the module-level import succeeded, otherwise fall back to frequency counts
            if 'yake' in globals():
                return self._extract_with_yake(text, num_keywords)
            else:
                return self._extract_with_frequency(text, num_keywords)
        except Exception as e:
            logger.error(f"Keyword extraction failed: {str(e)}")
            return []

    def _extract_with_yake(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Extract keywords using YAKE algorithm"""
        try:
            # YAKE configuration
            kw_extractor = yake.KeywordExtractor(
                lan="en",
                n=3,  # maximum n-gram size
                dedupLim=0.9,
                top=num_keywords,
                features=None
            )
            keywords = kw_extractor.extract_keywords(text)

            # Convert to desired format (lower score = more relevant in YAKE)
            result = []
            for keyword, score in keywords:
                result.append({
                    'keyword': keyword,
                    'score': 1.0 / (1.0 + score),  # Invert score so higher = more relevant
                    'relevance': 'high' if score < 0.1 else 'medium' if score < 0.3 else 'low'
                })
            return result
        except Exception as e:
            logger.error(f"YAKE extraction failed: {str(e)}")
            return self._extract_with_frequency(text, num_keywords)

    def _extract_with_frequency(self, text: str, num_keywords: int) -> List[Dict[str, Any]]:
        """Fallback keyword extraction using frequency analysis"""
        try:
            # Clean and tokenize
            words = word_tokenize(text.lower())

            # Filter words
            filtered_words = [
                word for word in words
                if (word not in self.stop_words and
                    word not in string.punctuation and
                    len(word) > 2 and
                    word.isalpha())
            ]

            # Count frequencies
            word_freq = Counter(filtered_words)

            # Get top keywords
            top_words = word_freq.most_common(num_keywords)

            # Calculate relevance scores
            max_freq = top_words[0][1] if top_words else 1
            result = []
            for word, freq in top_words:
                score = freq / max_freq
                result.append({
                    'keyword': word,
                    'score': score,
                    'relevance': 'high' if score > 0.7 else 'medium' if score > 0.3 else 'low'
                })
            return result
        except Exception as e:
            logger.error(f"Frequency extraction failed: {str(e)}")
            return []


class TextProcessor:
    """Text preprocessing and cleaning utilities"""

    def __init__(self):
        self.stemmer = PorterStemmer()
        logger.info("TextProcessor initialized")

    def clean_article_content(self, content: str) -> str:
        """Clean article content by removing boilerplate"""
        if not content:
            return ""

        # Remove common boilerplate patterns
        boilerplate_patterns = [
            r'Subscribe to our newsletter.*',
            r'Sign up for.*',
            r'Follow us on.*',
            r'Copyright.*',
            r'All rights reserved.*',
            r'Terms of use.*',
            r'Privacy policy.*',
            r'Cookie policy.*',
            r'\d+ comments?',
            r'Share this article.*',
            r'Related articles?.*',
            r'More from.*',
            r'Advertisement.*',
            r'Sponsored content.*'
        ]

        cleaned_content = content
        for pattern in boilerplate_patterns:
            cleaned_content = re.sub(pattern, '', cleaned_content, flags=re.IGNORECASE)

        # Remove extra whitespace
        cleaned_content = re.sub(r'\s+', ' ', cleaned_content)

        # Remove very short sentences (likely navigation/boilerplate)
        sentences = sent_tokenize(cleaned_content)
        meaningful_sentences = [
            sent for sent in sentences
            if len(sent.split()) > 5 and not self._is_boilerplate_sentence(sent)
        ]

        return ' '.join(meaningful_sentences).strip()

    def _is_boilerplate_sentence(self, sentence: str) -> bool:
        """Check if sentence is likely boilerplate"""
        boilerplate_indicators = [
            'click here', 'read more', 'subscribe', 'follow us', 'contact us',
            'terms of service', 'privacy policy', 'copyright', 'all rights reserved',
            'advertisement', 'sponsored', 'related articles'
        ]
        sentence_lower = sentence.lower()
        return any(indicator in sentence_lower for indicator in boilerplate_indicators)

    def extract_entities(self, text: str) -> Dict[str, List[str]]:
        """Extract named entities using simple regex patterns.

        Only companies, money amounts, and dates are detected; the 'people'
        and 'locations' lists are placeholders and remain empty.
        """
        entities = {
            'companies': [],
            'people': [],
            'locations': [],
            'money': [],
            'dates': []
        }
        try:
            # Company patterns (simplified)
            company_pattern = r'\b[A-Z][a-zA-Z]+ (?:Inc|Corp|LLC|Ltd|Company|Co)\b'
            entities['companies'] = list(set(re.findall(company_pattern, text)))

            # Money patterns
            money_pattern = r'\$[\d,]+(?:\.\d{2})?(?:\s?(?:million|billion|trillion|k|M|B|T))?'
            entities['money'] = list(set(re.findall(money_pattern, text)))

            # Date patterns (simplified)
            date_pattern = r'\b(?:January|February|March|April|May|June|July|August|September|October|November|December)\s+\d{1,2},?\s+\d{4}'
            entities['dates'] = list(set(re.findall(date_pattern, text)))
        except Exception as e:
            logger.error(f"Entity extraction failed: {str(e)}")

        return entities

    def calculate_readability(self, text: str) -> Dict[str, float]:
        """Calculate text readability metrics"""
        try:
            sentences = sent_tokenize(text)
            # Keep only alphabetic tokens so punctuation does not skew the word counts
            words = [word for word in word_tokenize(text) if word.isalpha()]

            if not sentences or not words:
                return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

            # Basic metrics
            num_sentences = len(sentences)
            num_words = len(words)
            num_syllables = sum(self._count_syllables(word) for word in words)

            # Average sentence length (words per sentence)
            avg_sentence_length = num_words / num_sentences

            # Average word length (characters per word)
            avg_word_length = sum(len(word) for word in words) / num_words

            # Flesch Reading Ease Score (simplified)
            flesch_score = 206.835 - (1.015 * avg_sentence_length) - (84.6 * (num_syllables / num_words))

            return {
                'flesch_score': max(0.0, min(100.0, flesch_score)),
                'avg_sentence_length': avg_sentence_length,
                'avg_word_length': avg_word_length
            }
        except Exception as e:
            logger.error(f"Readability calculation failed: {str(e)}")
            return {'flesch_score': 0.0, 'avg_sentence_length': 0.0, 'avg_word_length': 0.0}

    def _count_syllables(self, word: str) -> int:
        """Count syllables in a word (simplified)"""
        word = word.lower()
        vowels = 'aeiouy'
        syllable_count = 0
        prev_char_was_vowel = False

        for char in word:
            if char in vowels:
                if not prev_char_was_vowel:
                    syllable_count += 1
                prev_char_was_vowel = True
            else:
                prev_char_was_vowel = False

        # Handle silent e
        if word.endswith('e'):
            syllable_count -= 1

        # Every word has at least one syllable
        return max(1, syllable_count)
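

# Minimal usage sketch (not part of the original module): exercises the three
# classes above on a sample headline to show the expected call pattern and
# return shapes. The sample text is an illustrative assumption; FinBERT is
# skipped automatically if transformers failed to import or the model could
# not be loaded.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    sample = (
        "Acme Corp reported a surge in quarterly revenue, beating estimates, "
        "although management flagged ongoing supply-chain risks."
    )

    analyzer = SentimentAnalyzer()
    scores = analyzer.analyze_sentiment(sample)
    # Print only the numeric per-model scores and the compound score
    print("Sentiment:", {k: v for k, v in scores.items() if isinstance(v, float)})

    extractor = KeywordExtractor()
    print("Keywords:", [kw['keyword'] for kw in extractor.extract_keywords(sample, num_keywords=5)])

    processor = TextProcessor()
    print("Entities:", processor.extract_entities(sample))
    print("Readability:", processor.calculate_readability(sample))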