import logging import os import json import pickle import hashlib from datetime import datetime, timedelta from typing import Dict, Any, Optional, List import tempfile import sys def setup_logging(): """Setup logging configuration""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', handlers=[ logging.StreamHandler(sys.stdout), logging.FileHandler('news_analyzer.log') ] ) # Reduce noise from transformers and other libraries logging.getLogger("transformers").setLevel(logging.WARNING) logging.getLogger("urllib3").setLevel(logging.WARNING) logging.getLogger("requests").setLevel(logging.WARNING) def load_config() -> Dict[str, Any]: """Load application configuration""" default_config = { 'max_articles': 50, 'cache_ttl_hours': 6, 'supported_languages': ['English', 'Hindi', 'Tamil'], 'sentiment_models': ['VADER', 'Loughran-McDonald', 'FinBERT'], 'summarization_max_length': 150, 'summarization_min_length': 50, 'audio_enabled': True, 'translation_enabled': True, 'keyword_extraction_enabled': True, 'max_keywords': 20, 'debug_mode': False } # Try to load config from file if it exists config_file = 'config.json' if os.path.exists(config_file): try: with open(config_file, 'r') as f: file_config = json.load(f) default_config.update(file_config) except Exception as e: logging.error(f"Failed to load config file: {str(e)}") return default_config class CacheManager: """Simple file-based caching system""" def __init__(self, cache_dir: str = None): self.cache_dir = cache_dir or tempfile.mkdtemp(prefix='news_cache_') self.ensure_cache_dir() logging.info(f"Cache manager initialized with directory: {self.cache_dir}") def ensure_cache_dir(self): """Ensure cache directory exists""" try: os.makedirs(self.cache_dir, exist_ok=True) except Exception as e: logging.error(f"Failed to create cache directory: {str(e)}") def _get_cache_key(self, key: str) -> str: """Generate a safe cache key""" return hashlib.md5(key.encode()).hexdigest() def get(self, key: str, ttl_hours: int = 6) -> Optional[Any]: """Get item from cache""" try: cache_key = self._get_cache_key(key) cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl") if not os.path.exists(cache_file): return None # Check if cache is expired file_age = datetime.now().timestamp() - os.path.getmtime(cache_file) if file_age > ttl_hours * 3600: try: os.remove(cache_file) except: pass return None # Load cached data with open(cache_file, 'rb') as f: data = pickle.load(f) logging.debug(f"Cache hit for key: {key[:50]}...") return data except Exception as e: logging.error(f"Cache get failed for key {key}: {str(e)}") return None def set(self, key: str, value: Any) -> bool: """Set item in cache""" try: cache_key = self._get_cache_key(key) cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl") with open(cache_file, 'wb') as f: pickle.dump(value, f) logging.debug(f"Cache set for key: {key[:50]}...") return True except Exception as e: logging.error(f"Cache set failed for key {key}: {str(e)}") return False def clear_expired(self, ttl_hours: int = 24): """Clear expired cache entries""" try: current_time = datetime.now().timestamp() max_age = ttl_hours * 3600 cleared_count = 0 for filename in os.listdir(self.cache_dir): if filename.endswith('.pkl'): filepath = os.path.join(self.cache_dir, filename) file_age = current_time - os.path.getmtime(filepath) if file_age > max_age: try: os.remove(filepath) cleared_count += 1 except Exception as e: logging.error(f"Failed to remove cache file {filepath}: {str(e)}") if cleared_count > 0: logging.info(f"Cleared {cleared_count} expired cache entries") except Exception as e: logging.error(f"Cache cleanup failed: {str(e)}") # Global cache instance cache_manager = CacheManager() def cache_results(func): """Decorator for caching function results""" def wrapper(*args, **kwargs): # Create cache key from function name and arguments cache_key = f"{func.__name__}_{str(args)}_{str(kwargs)}" # Try to get from cache cached_result = cache_manager.get(cache_key) if cached_result is not None: return cached_result # Execute function and cache result result = func(*args, **kwargs) cache_manager.set(cache_key, result) return result return wrapper def validate_input(text: str, min_length: int = 10, max_length: int = 10000) -> bool: """Validate input text""" if not text or not isinstance(text, str): return False text = text.strip() if len(text) < min_length or len(text) > max_length: return False return True def sanitize_filename(filename: str) -> str: """Sanitize filename for safe file system usage""" import re # Replace invalid characters sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename) # Remove extra spaces and dots sanitized = re.sub(r'\s+', '_', sanitized) sanitized = re.sub(r'\.+', '.', sanitized) # Limit length if len(sanitized) > 200: sanitized = sanitized[:200] return sanitized def format_datetime(dt: datetime = None) -> str: """Format datetime for display""" if dt is None: dt = datetime.now() return dt.strftime("%Y-%m-%d %H:%M:%S") def calculate_processing_stats(start_time: datetime, num_articles: int) -> Dict[str, Any]: """Calculate processing statistics""" end_time = datetime.now() processing_time = (end_time - start_time).total_seconds() return { 'start_time': format_datetime(start_time), 'end_time': format_datetime(end_time), 'processing_time_seconds': processing_time, 'processing_time_formatted': f"{processing_time:.2f} seconds", 'articles_processed': num_articles, 'articles_per_second': round(num_articles / processing_time, 2) if processing_time > 0 else 0 } def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]: """Split text into overlapping chunks""" if len(text) <= chunk_size: return [text] chunks = [] start = 0 while start < len(text): end = start + chunk_size # If this isn't the last chunk, try to break at a sentence boundary if end < len(text): # Look for sentence boundaries in the last 100 characters last_part = text[end-100:end] sentence_end = max( last_part.rfind('.'), last_part.rfind('!'), last_part.rfind('?') ) if sentence_end != -1: end = end - 100 + sentence_end + 1 chunks.append(text[start:end].strip()) start = end - overlap return [chunk for chunk in chunks if chunk.strip()] def extract_domain(url: str) -> str: """Extract domain from URL""" try: from urllib.parse import urlparse parsed = urlparse(url) return parsed.netloc.replace('www.', '') except Exception: return 'unknown' def safe_divide(a: float, b: float, default: float = 0.0) -> float: """Safely divide two numbers""" try: return a / b if b != 0 else default except (TypeError, ZeroDivisionError): return default def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str: """Truncate text to specified length""" if not text or len(text) <= max_length: return text return text[:max_length - len(suffix)] + suffix def get_file_size_mb(filepath: str) -> float: """Get file size in MB""" try: size_bytes = os.path.getsize(filepath) return round(size_bytes / (1024 * 1024), 2) except Exception: return 0.0 def ensure_directory(directory: str): """Ensure directory exists""" try: os.makedirs(directory, exist_ok=True) except Exception as e: logging.error(f"Failed to create directory {directory}: {str(e)}") def load_json_file(filepath: str) -> Optional[Dict]: """Load JSON file safely""" try: with open(filepath, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: logging.error(f"Failed to load JSON file {filepath}: {str(e)}") return None def save_json_file(data: Dict, filepath: str) -> bool: """Save data to JSON file safely""" try: ensure_directory(os.path.dirname(filepath)) with open(filepath, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, default=str) return True except Exception as e: logging.error(f"Failed to save JSON file {filepath}: {str(e)}") return False def merge_dictionaries(*dicts) -> Dict: """Merge multiple dictionaries""" result = {} for d in dicts: if isinstance(d, dict): result.update(d) return result def get_system_info() -> Dict[str, Any]: """Get basic system information""" import platform import psutil try: return { 'platform': platform.platform(), 'python_version': platform.python_version(), 'cpu_count': os.cpu_count(), 'memory_gb': round(psutil.virtual_memory().total / (1024**3), 2), 'available_memory_gb': round(psutil.virtual_memory().available / (1024**3), 2), 'disk_space_gb': round(psutil.disk_usage('/').total / (1024**3), 2) } except Exception as e: logging.error(f"Failed to get system info: {str(e)}") return {'error': str(e)} def format_number(num: float, precision: int = 2) -> str: """Format number for display""" try: if abs(num) >= 1_000_000: return f"{num / 1_000_000:.{precision}f}M" elif abs(num) >= 1_000: return f"{num / 1_000:.{precision}f}K" else: return f"{num:.{precision}f}" except Exception: return str(num) def calculate_sentiment_distribution(articles: List[Dict]) -> Dict[str, Any]: """Calculate sentiment distribution statistics""" try: if not articles: return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0} sentiments = [] for article in articles: sentiment = article.get('sentiment', {}) compound = sentiment.get('compound', 0) sentiments.append(compound) positive_count = sum(1 for s in sentiments if s > 0.1) negative_count = sum(1 for s in sentiments if s < -0.1) neutral_count = len(sentiments) - positive_count - negative_count avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0 return { 'positive': positive_count, 'negative': negative_count, 'neutral': neutral_count, 'total': len(articles), 'average_sentiment': round(avg_sentiment, 3), 'positive_percentage': round((positive_count / len(articles)) * 100, 1), 'negative_percentage': round((negative_count / len(articles)) * 100, 1), 'neutral_percentage': round((neutral_count / len(articles)) * 100, 1) } except Exception as e: logging.error(f"Sentiment distribution calculation failed: {str(e)}") return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0} def create_progress_callback(progress_container=None): """Create a progress callback function for Streamlit""" def callback(progress: int, status: str): if progress_container: try: progress_container.progress(progress) if hasattr(progress_container, 'text'): progress_container.text(status) except Exception as e: logging.error(f"Progress callback error: {str(e)}") else: logging.info(f"Progress: {progress}% - {status}") return callback def validate_url(url: str) -> bool: """Validate if string is a valid URL""" import re url_pattern = re.compile( r'^https?://' # http:// or https:// r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain... r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)', # resource path re.IGNORECASE ) return url_pattern.match(url) is not None class PerformanceTimer: """Context manager for timing operations""" def __init__(self, operation_name: str = "Operation"): self.operation_name = operation_name self.start_time = None self.end_time = None def __enter__(self): self.start_time = datetime.now() logging.info(f"Starting {self.operation_name}") return self def __exit__(self, exc_type, exc_val, exc_tb): self.end_time = datetime.now() duration = (self.end_time - self.start_time).total_seconds() logging.info(f"Completed {self.operation_name} in {duration:.2f} seconds") @property def duration(self) -> float: if self.start_time and self.end_time: return (self.end_time - self.start_time).total_seconds() return 0.0 def retry_operation(func, max_attempts: int = 3, delay: float = 1.0): """Retry an operation with exponential backoff""" import time for attempt in range(max_attempts): try: return func() except Exception as e: if attempt == max_attempts - 1: raise e wait_time = delay * (2 ** attempt) logging.warning(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...") time.sleep(wait_time) return None