"""Utility helpers for the news analyzer application: logging setup,
configuration loading, file-based caching, and assorted text, file and
statistics helpers."""

import functools
import hashlib
import json
import logging
import os
import pickle
import platform
import re
import sys
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

def setup_logging():
    """Set up logging to stdout and to news_analyzer.log."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler('news_analyzer.log')
        ]
    )

    # Quiet down chatty third-party libraries.
    logging.getLogger("transformers").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("requests").setLevel(logging.WARNING)

def load_config() -> Dict[str, Any]:
    """Load application configuration, merging config.json over the defaults."""
    default_config = {
        'max_articles': 50,
        'cache_ttl_hours': 6,
        'supported_languages': ['English', 'Hindi', 'Tamil'],
        'sentiment_models': ['VADER', 'Loughran-McDonald', 'FinBERT'],
        'summarization_max_length': 150,
        'summarization_min_length': 50,
        'audio_enabled': True,
        'translation_enabled': True,
        'keyword_extraction_enabled': True,
        'max_keywords': 20,
        'debug_mode': False
    }

    # Values from an optional config.json override the defaults above.
    config_file = 'config.json'
    if os.path.exists(config_file):
        try:
            with open(config_file, 'r') as f:
                file_config = json.load(f)
                default_config.update(file_config)
        except Exception as e:
            logging.error(f"Failed to load config file: {str(e)}")

    return default_config

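# Example: a config.json alongside the app overrides individual defaults,
# e.g. (illustrative values, not shipped with the project):
#
#     {"max_articles": 100, "debug_mode": true}
#
# load_config() then returns the defaults with those two keys replaced.
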
class CacheManager:
    """Simple file-based caching system backed by pickled files."""

    def __init__(self, cache_dir: str = None):
        self.cache_dir = cache_dir or tempfile.mkdtemp(prefix='news_cache_')
        self.ensure_cache_dir()
        logging.info(f"Cache manager initialized with directory: {self.cache_dir}")

    def ensure_cache_dir(self):
        """Ensure the cache directory exists."""
        try:
            os.makedirs(self.cache_dir, exist_ok=True)
        except Exception as e:
            logging.error(f"Failed to create cache directory: {str(e)}")

    def _get_cache_key(self, key: str) -> str:
        """Hash the key into a filesystem-safe name (MD5 is fine for non-security use)."""
        return hashlib.md5(key.encode()).hexdigest()

    def get(self, key: str, ttl_hours: int = 6) -> Optional[Any]:
        """Get an item from the cache; returns None on a miss or expired entry."""
        try:
            cache_key = self._get_cache_key(key)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")

            if not os.path.exists(cache_file):
                return None

            # Treat entries older than the TTL as misses and delete them.
            file_age = datetime.now().timestamp() - os.path.getmtime(cache_file)
            if file_age > ttl_hours * 3600:
                try:
                    os.remove(cache_file)
                except OSError:
                    pass
                return None

            with open(cache_file, 'rb') as f:
                data = pickle.load(f)

            logging.debug(f"Cache hit for key: {key[:50]}...")
            return data

        except Exception as e:
            logging.error(f"Cache get failed for key {key}: {str(e)}")
            return None

    def set(self, key: str, value: Any) -> bool:
        """Store an item in the cache; returns True on success."""
        try:
            cache_key = self._get_cache_key(key)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")

            with open(cache_file, 'wb') as f:
                pickle.dump(value, f)

            logging.debug(f"Cache set for key: {key[:50]}...")
            return True

        except Exception as e:
            logging.error(f"Cache set failed for key {key}: {str(e)}")
            return False

    def clear_expired(self, ttl_hours: int = 24):
        """Remove cache entries older than ttl_hours."""
        try:
            current_time = datetime.now().timestamp()
            max_age = ttl_hours * 3600
            cleared_count = 0

            for filename in os.listdir(self.cache_dir):
                if filename.endswith('.pkl'):
                    filepath = os.path.join(self.cache_dir, filename)
                    file_age = current_time - os.path.getmtime(filepath)

                    if file_age > max_age:
                        try:
                            os.remove(filepath)
                            cleared_count += 1
                        except Exception as e:
                            logging.error(f"Failed to remove cache file {filepath}: {str(e)}")

            if cleared_count > 0:
                logging.info(f"Cleared {cleared_count} expired cache entries")

        except Exception as e:
            logging.error(f"Cache cleanup failed: {str(e)}")


# Module-level singleton used by the cache_results decorator below.
cache_manager = CacheManager()

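# Illustrative usage sketch (the key names are hypothetical):
#     cache_manager.set("articles:example.com", [{"title": "..."}])
#     articles = cache_manager.get("articles:example.com", ttl_hours=6)  # None on miss/expiry
#     cache_manager.clear_expired(ttl_hours=24)
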
def cache_results(func):
    """Decorator that caches function results via the module-level cache_manager."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Note: the key relies on repr() of the arguments, so it is only
        # stable for simple argument types with deterministic reprs.
        cache_key = f"{func.__name__}_{str(args)}_{str(kwargs)}"

        cached_result = cache_manager.get(cache_key)
        if cached_result is not None:
            return cached_result

        result = func(*args, **kwargs)
        cache_manager.set(cache_key, result)

        return result

    return wrapper

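# Illustrative usage (fetch_headlines is a hypothetical function):
#     @cache_results
#     def fetch_headlines(source: str) -> list:
#         ...  # expensive network call
#
#     fetch_headlines("reuters")  # computed, then cached
#     fetch_headlines("reuters")  # served from the cache
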
def validate_input(text: str, min_length: int = 10, max_length: int = 10000) -> bool:
    """Validate input text"""
    if not text or not isinstance(text, str):
        return False

    text = text.strip()
    if len(text) < min_length or len(text) > max_length:
        return False

    return True

def sanitize_filename(filename: str) -> str:
    """Sanitize filename for safe file system usage"""
    # Replace characters that are illegal on common file systems.
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)

    # Collapse whitespace runs and repeated dots.
    sanitized = re.sub(r'\s+', '_', sanitized)
    sanitized = re.sub(r'\.+', '.', sanitized)

    # Keep names comfortably under typical filesystem limits.
    if len(sanitized) > 200:
        sanitized = sanitized[:200]

    return sanitized

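# Illustrative call:
#     sanitize_filename("my report?.txt")  # -> "my_report_.txt"
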
def format_datetime(dt: datetime = None) -> str:
    """Format datetime for display"""
    if dt is None:
        dt = datetime.now()

    return dt.strftime("%Y-%m-%d %H:%M:%S")

def calculate_processing_stats(start_time: datetime, num_articles: int) -> Dict[str, Any]:
    """Calculate processing statistics"""
    end_time = datetime.now()
    processing_time = (end_time - start_time).total_seconds()

    return {
        'start_time': format_datetime(start_time),
        'end_time': format_datetime(end_time),
        'processing_time_seconds': processing_time,
        'processing_time_formatted': f"{processing_time:.2f} seconds",
        'articles_processed': num_articles,
        'articles_per_second': round(num_articles / processing_time, 2) if processing_time > 0 else 0
    }

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into overlapping chunks, preferring sentence boundaries."""
    if len(text) <= chunk_size:
        return [text]

    chunks = []
    start = 0

    while start < len(text):
        end = start + chunk_size

        if end < len(text):
            # Look back up to 100 characters for a sentence boundary so
            # chunks end on '.', '!' or '?' where possible.
            last_part = text[end - 100:end]
            sentence_end = max(
                last_part.rfind('.'),
                last_part.rfind('!'),
                last_part.rfind('?')
            )

            if sentence_end != -1:
                end = end - 100 + sentence_end + 1

        chunks.append(text[start:end].strip())
        if end >= len(text):
            break
        # Advance by at least one character so the loop always terminates,
        # even if overlap is set close to chunk_size.
        start = max(end - overlap, start + 1)

    return [chunk for chunk in chunks if chunk.strip()]

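# Illustrative usage (article_text is a hypothetical long string):
#     chunks = chunk_text(article_text, chunk_size=1000, overlap=100)
#     # Each chunk is at most ~1000 characters and shares up to 100 characters
#     # with its neighbour, e.g. for summarization models with small input windows.
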
def extract_domain(url: str) -> str:
    """Extract domain from URL"""
    try:
        parsed = urlparse(url)
        return parsed.netloc.replace('www.', '')
    except Exception:
        return 'unknown'

def safe_divide(a: float, b: float, default: float = 0.0) -> float:
    """Safely divide two numbers"""
    try:
        return a / b if b != 0 else default
    except (TypeError, ZeroDivisionError):
        return default

def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text to specified length"""
    if not text or len(text) <= max_length:
        return text

    return text[:max_length - len(suffix)] + suffix

def get_file_size_mb(filepath: str) -> float:
    """Get file size in MB"""
    try:
        size_bytes = os.path.getsize(filepath)
        return round(size_bytes / (1024 * 1024), 2)
    except Exception:
        return 0.0

def ensure_directory(directory: str):
    """Ensure directory exists"""
    try:
        os.makedirs(directory, exist_ok=True)
    except Exception as e:
        logging.error(f"Failed to create directory {directory}: {str(e)}")

def load_json_file(filepath: str) -> Optional[Dict]:
    """Load JSON file safely"""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logging.error(f"Failed to load JSON file {filepath}: {str(e)}")
        return None

def save_json_file(data: Dict, filepath: str) -> bool:
    """Save data to JSON file safely"""
    try:
        # os.path.dirname() is empty for bare filenames; only create
        # the parent directory when there is one.
        parent = os.path.dirname(filepath)
        if parent:
            ensure_directory(parent)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, default=str)
        return True
    except Exception as e:
        logging.error(f"Failed to save JSON file {filepath}: {str(e)}")
        return False

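# Illustrative round trip (output/results.json is a hypothetical path):
#     save_json_file({'run': 1, 'when': datetime.now()}, 'output/results.json')
#     data = load_json_file('output/results.json')
# Non-serializable values such as datetimes are stringified via default=str.
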
def merge_dictionaries(*dicts) -> Dict:
    """Merge multiple dictionaries; later dictionaries override earlier keys."""
    result = {}
    for d in dicts:
        if isinstance(d, dict):
            result.update(d)
    return result

def get_system_info() -> Dict[str, Any]:
    """Get basic system information"""
    try:
        # psutil is a third-party dependency; import it inside the try block
        # so a missing package is reported instead of crashing the caller.
        import psutil

        return {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
            'cpu_count': os.cpu_count(),
            'memory_gb': round(psutil.virtual_memory().total / (1024**3), 2),
            'available_memory_gb': round(psutil.virtual_memory().available / (1024**3), 2),
            'disk_space_gb': round(psutil.disk_usage('/').total / (1024**3), 2)
        }
    except Exception as e:
        logging.error(f"Failed to get system info: {str(e)}")
        return {'error': str(e)}

def format_number(num: float, precision: int = 2) -> str:
    """Format number for display"""
    try:
        if abs(num) >= 1_000_000:
            return f"{num / 1_000_000:.{precision}f}M"
        elif abs(num) >= 1_000:
            return f"{num / 1_000:.{precision}f}K"
        else:
            return f"{num:.{precision}f}"
    except Exception:
        return str(num)

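# Illustrative calls:
#     format_number(1_500_000)  # -> "1.50M"
#     format_number(2500)       # -> "2.50K"
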
def calculate_sentiment_distribution(articles: List[Dict]) -> Dict[str, Any]:
    """Calculate sentiment distribution statistics"""
    try:
        if not articles:
            return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0}

        sentiments = []
        for article in articles:
            sentiment = article.get('sentiment', {})
            compound = sentiment.get('compound', 0)
            sentiments.append(compound)

        # Compound scores within +/-0.1 of zero are treated as neutral.
        positive_count = sum(1 for s in sentiments if s > 0.1)
        negative_count = sum(1 for s in sentiments if s < -0.1)
        neutral_count = len(sentiments) - positive_count - negative_count

        avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0

        return {
            'positive': positive_count,
            'negative': negative_count,
            'neutral': neutral_count,
            'total': len(articles),
            'average_sentiment': round(avg_sentiment, 3),
            'positive_percentage': round((positive_count / len(articles)) * 100, 1),
            'negative_percentage': round((negative_count / len(articles)) * 100, 1),
            'neutral_percentage': round((neutral_count / len(articles)) * 100, 1)
        }

    except Exception as e:
        logging.error(f"Sentiment distribution calculation failed: {str(e)}")
        return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0}

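# Illustrative input shape (compound scores follow the VADER convention of
# values in [-1, 1]):
#     articles = [
#         {'sentiment': {'compound': 0.6}},   # counted as positive
#         {'sentiment': {'compound': -0.4}},  # counted as negative
#         {'sentiment': {'compound': 0.05}},  # counted as neutral
#     ]
#     calculate_sentiment_distribution(articles)
#     # -> {'positive': 1, 'negative': 1, 'neutral': 1, 'total': 3, ...}
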
def create_progress_callback(progress_container=None):
    """Create a progress callback function for Streamlit"""
    def callback(progress: int, status: str):
        if progress_container:
            try:
                progress_container.progress(progress)
                if hasattr(progress_container, 'text'):
                    progress_container.text(status)
            except Exception as e:
                logging.error(f"Progress callback error: {str(e)}")
        else:
            # Without a Streamlit container, fall back to the log.
            logging.info(f"Progress: {progress}% - {status}")

    return callback

def validate_url(url: str) -> bool:
    """Validate if string is a valid URL"""
    url_pattern = re.compile(
        r'^https?://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain
        r'localhost|'                                                    # localhost
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'                           # IPv4
        r'(?::\d+)?'                                                     # optional port
        r'(?:/?|[/?]\S+)$',                                              # path/query (anchored)
        re.IGNORECASE
    )

    return url_pattern.match(url) is not None

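# Illustrative checks:
#     validate_url("https://example.com/news?id=1")  # True
#     validate_url("ftp://example.com")              # False (http/https only)
#     validate_url("not a url")                      # False
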
class PerformanceTimer:
    """Context manager for timing operations"""

    def __init__(self, operation_name: str = "Operation"):
        self.operation_name = operation_name
        self.start_time = None
        self.end_time = None

    def __enter__(self):
        self.start_time = datetime.now()
        logging.info(f"Starting {self.operation_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = datetime.now()
        duration = (self.end_time - self.start_time).total_seconds()
        logging.info(f"Completed {self.operation_name} in {duration:.2f} seconds")

    @property
    def duration(self) -> float:
        """Elapsed seconds, or 0.0 if the timer has not completed."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0

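# Illustrative usage (run_analysis is a hypothetical workload):
#     with PerformanceTimer("sentiment analysis") as timer:
#         run_analysis()
#     print(timer.duration)  # elapsed seconds, available after the block exits
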
def retry_operation(func, max_attempts: int = 3, delay: float = 1.0):
    """Retry a zero-argument callable with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as e:
            # Re-raise on the final attempt so the caller sees the real error.
            if attempt == max_attempts - 1:
                raise

            wait_time = delay * (2 ** attempt)
            logging.warning(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
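# Illustrative usage (fetch is a hypothetical flaky network call):
#     result = retry_operation(lambda: fetch(url), max_attempts=3, delay=1.0)
# Waits 1s, then 2s, between attempts; the final failure is re-raised.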