import functools
import hashlib
import json
import logging
import os
import pickle
import re
import sys
import tempfile
import time
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

def setup_logging():
    """Set up logging configuration."""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
        handlers=[
            logging.StreamHandler(sys.stdout),
            logging.FileHandler('news_analyzer.log')
        ]
    )
    # Reduce noise from transformers and other third-party libraries
    logging.getLogger("transformers").setLevel(logging.WARNING)
    logging.getLogger("urllib3").setLevel(logging.WARNING)
    logging.getLogger("requests").setLevel(logging.WARNING)

def load_config() -> Dict[str, Any]:
    """Load application configuration."""
    default_config = {
        'max_articles': 50,
        'cache_ttl_hours': 6,
        'supported_languages': ['English', 'Hindi', 'Tamil'],
        'sentiment_models': ['VADER', 'Loughran-McDonald', 'FinBERT'],
        'summarization_max_length': 150,
        'summarization_min_length': 50,
        'audio_enabled': True,
        'translation_enabled': True,
        'keyword_extraction_enabled': True,
        'max_keywords': 20,
        'debug_mode': False
    }
    # Try to load config from file if it exists
    config_file = 'config.json'
    if os.path.exists(config_file):
        try:
            with open(config_file, 'r') as f:
                file_config = json.load(f)
            default_config.update(file_config)
        except Exception as e:
            logging.error(f"Failed to load config file: {str(e)}")
    return default_config
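
# A config.json placed next to the application overrides any subset of the
# defaults above. A minimal sketch of such a file (the chosen values are
# illustrative; the keys come from default_config):
#
#     {
#         "max_articles": 100,
#         "debug_mode": true
#     }
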
class CacheManager:
    """Simple file-based caching system."""

    def __init__(self, cache_dir: Optional[str] = None):
        self.cache_dir = cache_dir or tempfile.mkdtemp(prefix='news_cache_')
        self.ensure_cache_dir()
        logging.info(f"Cache manager initialized with directory: {self.cache_dir}")

    def ensure_cache_dir(self):
        """Ensure cache directory exists."""
        try:
            os.makedirs(self.cache_dir, exist_ok=True)
        except Exception as e:
            logging.error(f"Failed to create cache directory: {str(e)}")

    def _get_cache_key(self, key: str) -> str:
        """Generate a safe cache key (md5 here only names files, not security)."""
        return hashlib.md5(key.encode()).hexdigest()
    def get(self, key: str, ttl_hours: int = 6) -> Optional[Any]:
        """Get item from cache, or None on a miss or expired entry."""
        try:
            cache_key = self._get_cache_key(key)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
            if not os.path.exists(cache_file):
                return None
            # Check if the cache entry has expired
            file_age = datetime.now().timestamp() - os.path.getmtime(cache_file)
            if file_age > ttl_hours * 3600:
                try:
                    os.remove(cache_file)
                except OSError:
                    pass
                return None
            # Load cached data
            with open(cache_file, 'rb') as f:
                data = pickle.load(f)
            logging.debug(f"Cache hit for key: {key[:50]}...")
            return data
        except Exception as e:
            logging.error(f"Cache get failed for key {key}: {str(e)}")
            return None
    def set(self, key: str, value: Any) -> bool:
        """Set item in cache."""
        try:
            cache_key = self._get_cache_key(key)
            cache_file = os.path.join(self.cache_dir, f"{cache_key}.pkl")
            with open(cache_file, 'wb') as f:
                pickle.dump(value, f)
            logging.debug(f"Cache set for key: {key[:50]}...")
            return True
        except Exception as e:
            logging.error(f"Cache set failed for key {key}: {str(e)}")
            return False
    def clear_expired(self, ttl_hours: int = 24):
        """Clear expired cache entries."""
        try:
            current_time = datetime.now().timestamp()
            max_age = ttl_hours * 3600
            cleared_count = 0
            for filename in os.listdir(self.cache_dir):
                if filename.endswith('.pkl'):
                    filepath = os.path.join(self.cache_dir, filename)
                    file_age = current_time - os.path.getmtime(filepath)
                    if file_age > max_age:
                        try:
                            os.remove(filepath)
                            cleared_count += 1
                        except Exception as e:
                            logging.error(f"Failed to remove cache file {filepath}: {str(e)}")
            if cleared_count > 0:
                logging.info(f"Cleared {cleared_count} expired cache entries")
        except Exception as e:
            logging.error(f"Cache cleanup failed: {str(e)}")

# Global cache instance
cache_manager = CacheManager()
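
# Usage sketch for the shared cache instance (illustrative; the key string
# and the `articles` payload are hypothetical):
#
#     cache_manager.set("articles:ACME", articles)             # pickled to disk
#     cached = cache_manager.get("articles:ACME", ttl_hours=6)
#     if cached is None:
#         ...  # miss or expired entry: recompute, then cache_manager.set(...)
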
def cache_results(func):
    """Decorator for caching function results."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Build a cache key from the function name and its arguments
        cache_key = f"{func.__name__}_{str(args)}_{str(kwargs)}"
        # Try to serve the result from cache first
        cached_result = cache_manager.get(cache_key)
        if cached_result is not None:
            return cached_result
        # Cache miss: execute the function and cache the result
        result = func(*args, **kwargs)
        cache_manager.set(cache_key, result)
        return result
    return wrapper
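
# Usage sketch for the decorator (illustrative; `fetch_headlines` is a
# hypothetical function, not part of this module). The key is built from
# str(args), so arguments should have stable string representations:
#
#     @cache_results
#     def fetch_headlines(company: str) -> List[Dict]:
#         ...  # expensive scraping / API call
#
#     fetch_headlines("ACME")   # executes and caches the result
#     fetch_headlines("ACME")   # served from the file cache
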
def validate_input(text: str, min_length: int = 10, max_length: int = 10000) -> bool:
    """Validate input text."""
    if not text or not isinstance(text, str):
        return False
    text = text.strip()
    if len(text) < min_length or len(text) > max_length:
        return False
    return True

def sanitize_filename(filename: str) -> str:
    """Sanitize filename for safe file system usage."""
    # Replace characters that are invalid on common file systems
    sanitized = re.sub(r'[<>:"/\\|?*]', '_', filename)
    # Collapse whitespace runs and repeated dots
    sanitized = re.sub(r'\s+', '_', sanitized)
    sanitized = re.sub(r'\.+', '.', sanitized)
    # Limit length
    if len(sanitized) > 200:
        sanitized = sanitized[:200]
    return sanitized

def format_datetime(dt: Optional[datetime] = None) -> str:
    """Format a datetime for display, defaulting to now."""
    if dt is None:
        dt = datetime.now()
    return dt.strftime("%Y-%m-%d %H:%M:%S")

def calculate_processing_stats(start_time: datetime, num_articles: int) -> Dict[str, Any]:
    """Calculate processing statistics."""
    end_time = datetime.now()
    processing_time = (end_time - start_time).total_seconds()
    return {
        'start_time': format_datetime(start_time),
        'end_time': format_datetime(end_time),
        'processing_time_seconds': processing_time,
        'processing_time_formatted': f"{processing_time:.2f} seconds",
        'articles_processed': num_articles,
        'articles_per_second': round(num_articles / processing_time, 2) if processing_time > 0 else 0
    }

def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 100) -> List[str]:
    """Split text into overlapping chunks, preferring sentence boundaries."""
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        # If this isn't the last chunk, try to break at a sentence boundary
        if end < len(text):
            # Look for sentence boundaries in the last 100 characters
            last_part = text[end-100:end]
            sentence_end = max(
                last_part.rfind('.'),
                last_part.rfind('!'),
                last_part.rfind('?')
            )
            if sentence_end != -1:
                end = end - 100 + sentence_end + 1
        chunks.append(text[start:end].strip())
        # Advance by at least one character so small chunk_size/overlap
        # combinations cannot loop forever
        start = max(end - overlap, start + 1)
    return [chunk for chunk in chunks if chunk.strip()]
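
# Usage sketch (illustrative; `summarize` and `article_text` are hypothetical):
# split a long article into overlapping pieces so each fits a summarization
# model's input window, then summarize piece by piece:
#
#     summaries = [summarize(piece) for piece in chunk_text(article_text)]
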
def extract_domain(url: str) -> str:
    """Extract domain from URL."""
    try:
        parsed = urlparse(url)
        return parsed.netloc.replace('www.', '')
    except Exception:
        return 'unknown'

def safe_divide(a: float, b: float, default: float = 0.0) -> float:
    """Safely divide two numbers."""
    try:
        return a / b if b != 0 else default
    except (TypeError, ZeroDivisionError):
        return default


def truncate_text(text: str, max_length: int = 100, suffix: str = "...") -> str:
    """Truncate text to the specified length, appending a suffix."""
    if not text or len(text) <= max_length:
        return text
    return text[:max_length - len(suffix)] + suffix

def get_file_size_mb(filepath: str) -> float:
    """Get file size in MB."""
    try:
        size_bytes = os.path.getsize(filepath)
        return round(size_bytes / (1024 * 1024), 2)
    except Exception:
        return 0.0


def ensure_directory(directory: str):
    """Ensure directory exists."""
    try:
        os.makedirs(directory, exist_ok=True)
    except Exception as e:
        logging.error(f"Failed to create directory {directory}: {str(e)}")

def load_json_file(filepath: str) -> Optional[Dict]:
    """Load JSON file safely."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return json.load(f)
    except Exception as e:
        logging.error(f"Failed to load JSON file {filepath}: {str(e)}")
        return None


def save_json_file(data: Dict, filepath: str) -> bool:
    """Save data to JSON file safely."""
    try:
        parent = os.path.dirname(filepath)
        if parent:  # avoid os.makedirs('') when saving to the current directory
            ensure_directory(parent)
        with open(filepath, 'w', encoding='utf-8') as f:
            json.dump(data, f, indent=2, default=str)
        return True
    except Exception as e:
        logging.error(f"Failed to save JSON file {filepath}: {str(e)}")
        return False

def merge_dictionaries(*dicts) -> Dict:
    """Merge multiple dictionaries; later dictionaries win on key collisions."""
    result = {}
    for d in dicts:
        if isinstance(d, dict):
            result.update(d)
    return result

def get_system_info() -> Dict[str, Any]:
    """Get basic system information."""
    try:
        import platform
        import psutil  # imported inside the try so a missing dependency is reported, not raised
        return {
            'platform': platform.platform(),
            'python_version': platform.python_version(),
            'cpu_count': os.cpu_count(),
            'memory_gb': round(psutil.virtual_memory().total / (1024**3), 2),
            'available_memory_gb': round(psutil.virtual_memory().available / (1024**3), 2),
            'disk_space_gb': round(psutil.disk_usage('/').total / (1024**3), 2)
        }
    except Exception as e:
        logging.error(f"Failed to get system info: {str(e)}")
        return {'error': str(e)}

def format_number(num: float, precision: int = 2) -> str:
    """Format number for display with K/M suffixes."""
    try:
        if abs(num) >= 1_000_000:
            return f"{num / 1_000_000:.{precision}f}M"
        elif abs(num) >= 1_000:
            return f"{num / 1_000:.{precision}f}K"
        else:
            return f"{num:.{precision}f}"
    except Exception:
        return str(num)

def calculate_sentiment_distribution(articles: List[Dict]) -> Dict[str, Any]:
    """Calculate sentiment distribution statistics."""
    try:
        if not articles:
            return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0}
        sentiments = []
        for article in articles:
            sentiment = article.get('sentiment', {})
            compound = sentiment.get('compound', 0)
            sentiments.append(compound)
        positive_count = sum(1 for s in sentiments if s > 0.1)
        negative_count = sum(1 for s in sentiments if s < -0.1)
        neutral_count = len(sentiments) - positive_count - negative_count
        avg_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0
        return {
            'positive': positive_count,
            'negative': negative_count,
            'neutral': neutral_count,
            'total': len(articles),
            'average_sentiment': round(avg_sentiment, 3),
            'positive_percentage': round((positive_count / len(articles)) * 100, 1),
            'negative_percentage': round((negative_count / len(articles)) * 100, 1),
            'neutral_percentage': round((neutral_count / len(articles)) * 100, 1)
        }
    except Exception as e:
        logging.error(f"Sentiment distribution calculation failed: {str(e)}")
        return {'positive': 0, 'negative': 0, 'neutral': 0, 'total': 0}
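
# Input shape sketch (illustrative): each article dict is expected to carry a
# VADER-style sentiment block with a 'compound' score in [-1, 1], e.g.:
#
#     articles = [{"title": "...", "sentiment": {"compound": 0.42}}, ...]
#     dist = calculate_sentiment_distribution(articles)
#     dist["positive_percentage"]  # share of articles with compound > 0.1
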
def create_progress_callback(progress_container=None):
    """Create a progress callback function for Streamlit."""
    def callback(progress: int, status: str):
        if progress_container:
            try:
                progress_container.progress(progress)
                if hasattr(progress_container, 'text'):
                    progress_container.text(status)
            except Exception as e:
                logging.error(f"Progress callback error: {str(e)}")
        else:
            logging.info(f"Progress: {progress}% - {status}")
    return callback
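
# Usage sketch (illustrative; assumes the function runs inside a Streamlit
# app, where st.progress returns an element that accepts .progress() updates):
#
#     import streamlit as st
#     bar = st.progress(0)
#     callback = create_progress_callback(bar)
#     callback(50, "Analyzing sentiment...")
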
def validate_url(url: str) -> bool:
    """Validate if string is a valid URL."""
    url_pattern = re.compile(
        r'^https?://'  # http:// or https://
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'  # domain...
        r'localhost|'  # localhost...
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'  # ...or ip
        r'(?::\d+)?'  # optional port
        r'(?:/?|[/?]\S+)',  # resource path
        re.IGNORECASE
    )
    return url_pattern.match(url) is not None

class PerformanceTimer:
    """Context manager for timing operations."""

    def __init__(self, operation_name: str = "Operation"):
        self.operation_name = operation_name
        self.start_time = None
        self.end_time = None

    def __enter__(self):
        self.start_time = datetime.now()
        logging.info(f"Starting {self.operation_name}")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.end_time = datetime.now()
        duration = (self.end_time - self.start_time).total_seconds()
        logging.info(f"Completed {self.operation_name} in {duration:.2f} seconds")

    def duration(self) -> float:
        """Return the elapsed time in seconds, or 0.0 if not yet complete."""
        if self.start_time and self.end_time:
            return (self.end_time - self.start_time).total_seconds()
        return 0.0
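
# Usage sketch (illustrative; `run_pipeline` is a hypothetical workload):
#
#     with PerformanceTimer("article pipeline") as timer:
#         run_pipeline()
#     elapsed = timer.duration()  # seconds, available after the block exits
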
def retry_operation(func, max_attempts: int = 3, delay: float = 1.0):
    """Retry an operation with exponential backoff."""
    for attempt in range(max_attempts):
        try:
            return func()
        except Exception as e:
            if attempt == max_attempts - 1:
                raise  # out of attempts: re-raise with the original traceback
            wait_time = delay * (2 ** attempt)
            logging.warning(f"Attempt {attempt + 1} failed: {str(e)}. Retrying in {wait_time} seconds...")
            time.sleep(wait_time)
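
# Usage sketch (illustrative; `flaky_request` and `url` are hypothetical).
# Waits delay, 2*delay, 4*delay, ... between attempts and re-raises the last
# exception once attempts are exhausted:
#
#     result = retry_operation(lambda: flaky_request(url), max_attempts=3)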