"""Text-to-speech audio generation for news summaries, backed by gTTS."""

import hashlib
import logging
import os
import re
import tempfile
from datetime import datetime
from typing import Any, Dict, List, Optional

# gTTS for text-to-speech
try:
    from gtts import gTTS
    GTTS_AVAILABLE = True
except ImportError:
    GTTS_AVAILABLE = False

logger = logging.getLogger(__name__)

# Patterns used by AudioGenerator._prepare_text_for_tts, compiled once.
_URL_RE = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
)
_EMAIL_RE = re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')
_PARENS_RE = re.compile(r'\([^)]*\)')
_BRACKETS_RE = re.compile(r'\[[^\]]*\]')
_DOTS_RE = re.compile(r'[.]{3,}')
_BANGS_RE = re.compile(r'[!]{2,}')
_QUESTIONS_RE = re.compile(r'[?]{2,}')
_WHITESPACE_RE = re.compile(r'\s+')
# Anything that is not safe to embed in a file name.
_UNSAFE_FILENAME_RE = re.compile(r'[^\w.-]+')


class AudioGenerator:
    """Text-to-speech audio generation with multilingual support.

    Generated files are written to a per-instance temporary directory
    (``cache_dir``) and remembered in ``audio_cache`` so that repeated requests
    for the same text/language pair reuse the existing file. The directory is
    created with ``tempfile.mkdtemp`` and is not removed automatically; use
    ``cleanup_cache`` to prune old files.
    """

    # Upper bound on the number of characters handed to the TTS engine.
    MAX_TTS_CHARS = 5000
    # Upper bound on the length of the spoken summary.
    MAX_SUMMARY_CHARS = 1000
    # Compound sentiment beyond +/- this value counts as positive/negative.
    SENTIMENT_THRESHOLD = 0.1

    def __init__(self):
        self.supported_languages = {
            'English': 'en',
            'Hindi': 'hi',
            'Tamil': 'ta',
        }

        # Audio cache directory
        self.cache_dir = tempfile.mkdtemp(prefix='news_audio_')
        # Maps cache key -> path of the generated audio file.
        self.audio_cache = {}

        logger.info("AudioGenerator initialized with cache directory: %s", self.cache_dir)

        if not GTTS_AVAILABLE:
            logger.warning("gTTS not available. Audio generation will be limited.")

    def generate_audio(self, text: str, language: str = 'English',
                       output_file: Optional[str] = None) -> Optional[str]:
        """Generate an MP3 file speaking ``text``.

        Args:
            text: Text to speak. Blank text yields ``None``.
            language: Display name of the language (a key of
                ``supported_languages``); unknown names fall back to English.
            output_file: Optional target path. A bare file name is placed
                inside ``cache_dir``. When omitted, a unique name is derived
                from the language, the current time and the cache key.

        Returns:
            Path of the audio file, or ``None`` when generation is not
            possible or fails. If the same text/language was generated before
            and the file still exists, that earlier path is returned and
            ``output_file`` is ignored.
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for audio generation")
            return None

        if not GTTS_AVAILABLE:
            logger.error("gTTS not available for audio generation")
            return None

        try:
            lang_code = self.supported_languages.get(language, 'en')
            cache_key = self._create_cache_key(text, language)

            # Check cache first
            cached_file = self.audio_cache.get(cache_key)
            if cached_file and os.path.exists(cached_file):
                logger.info("Using cached audio for %s", language)
                return cached_file

            if not output_file:
                # The key prefix keeps names unique even when several texts
                # are generated within the same second.
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file = os.path.join(
                    self.cache_dir,
                    f"audio_{lang_code}_{timestamp}_{cache_key[:8]}.mp3",
                )
            elif not os.path.dirname(output_file):
                output_file = os.path.join(self.cache_dir, output_file)

            clean_text = self._prepare_text_for_tts(text)
            if not clean_text:
                logger.warning("No valid text for TTS after cleaning")
                return None

            tts = self._build_tts(clean_text, lang_code)
            tts.save(output_file)

            # Verify file was created
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                self.audio_cache[cache_key] = output_file
                logger.info("Audio generated successfully: %s", output_file)
                return output_file

            logger.error("Audio file was not created or is empty")
            return None

        except Exception as e:
            # Boundary handler: callers rely on None rather than an exception.
            logger.error("Audio generation failed: %s", e)
            return None

    def _build_tts(self, clean_text: str, lang_code: str):
        """Create the gTTS object for ``lang_code``.

        Tamil falls back to an English voice if gTTS rejects the language;
        other languages are passed through unchanged.
        """
        if lang_code == 'ta':
            try:
                return gTTS(text=clean_text, lang='ta', slow=False)
            except Exception:
                # Deliberate best-effort fallback, but do not swallow
                # KeyboardInterrupt/SystemExit like a bare except would.
                logger.warning("Tamil not supported in gTTS, using English")
                return gTTS(text=clean_text, lang='en', slow=False)
        return gTTS(text=clean_text, lang=lang_code, slow=False)

    def _create_cache_key(self, text: str, language: str) -> str:
        """Return a stable hex digest identifying a text/language pair.

        The whole text is hashed (the digest has a fixed length regardless of
        input size), so two texts that merely share a prefix never collide.
        """
        combined = f"{language}\0{text}"
        # backslashreplace keeps texts with lone surrogates hashable.
        data = combined.encode('utf-8', errors='backslashreplace')
        return hashlib.md5(data).hexdigest()

    def _prepare_text_for_tts(self, text: str) -> str:
        """Clean ``text`` for speech synthesis.

        Removes URLs, e-mail addresses, parenthetical and bracketed asides,
        collapses repeated punctuation and whitespace, and limits the result
        to ``MAX_TTS_CHARS`` characters, cutting at the last sentence
        terminator when one is available.
        """
        if not text:
            return ""

        text = _URL_RE.sub('', text)
        text = _EMAIL_RE.sub('', text)

        # Remove parenthetical citations and references
        text = _PARENS_RE.sub('', text)
        text = _BRACKETS_RE.sub('', text)

        # Remove excessive punctuation
        text = _DOTS_RE.sub('...', text)
        text = _BANGS_RE.sub('!', text)
        text = _QUESTIONS_RE.sub('?', text)

        # Collapse whitespace last so the removals above leave no gaps.
        text = _WHITESPACE_RE.sub(' ', text).strip()

        max_length = self.MAX_TTS_CHARS
        if len(text) > max_length:
            head = text[:max_length]
            # Cut after the last sentence terminator, keeping the original
            # punctuation intact.
            cut = max(head.rfind(mark) for mark in '.!?')
            if cut > 0:
                text = head[:cut + 1]
            else:
                # No sentence boundary: hard cut, staying within the limit.
                text = head[:max_length - 3].rstrip() + '...'

        return text.strip()

    def generate_batch_audio(self, texts: Dict[str, str],
                             language: str = 'English') -> Dict[str, Optional[str]]:
        """Generate audio for several texts.

        Args:
            texts: Mapping of an identifying key to the text to speak.
            language: Language name used for every entry.

        Returns:
            Mapping of each key to the generated file path, or ``None`` where
            generation failed.
        """
        results = {}

        for key, text in texts.items():
            try:
                # Keys become part of a file name; neutralise path separators
                # and other unsafe characters so files stay in cache_dir.
                safe_key = _UNSAFE_FILENAME_RE.sub('_', str(key))
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                output_file = f"audio_{safe_key}_{language.lower()}_{timestamp}.mp3"
                results[key] = self.generate_audio(text, language, output_file)
            except Exception as e:
                logger.error("Batch audio generation failed for %s: %s", key, e)
                results[key] = None

        return results

    def generate_summary_audio(self, articles: List[Dict],
                               languages: Optional[List[str]] = None) -> Dict[str, str]:
        """Generate a spoken summary of ``articles`` in each requested language.

        Args:
            articles: Article dicts; see ``_create_audio_summary`` for the keys
                that are read.
            languages: Language names; defaults to ``['English']``. Names not
                in ``supported_languages`` are skipped with a warning.

        Returns:
            Mapping of language name to audio file path for every language
            that was generated successfully.
        """
        if languages is None:
            languages = ['English']

        audio_files = {}

        try:
            summary_text = self._create_audio_summary(articles)
            if not summary_text:
                logger.warning("No summary text created for audio")
                return audio_files

            for language in languages:
                if language not in self.supported_languages:
                    logger.warning("Language %s not supported for audio", language)
                    continue

                try:
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    output_file = f"summary_{language.lower()}_{timestamp}.mp3"
                    audio_file = self.generate_audio(summary_text, language, output_file)
                    if audio_file:
                        audio_files[language] = audio_file
                    else:
                        logger.warning("Failed to generate audio for %s", language)
                except Exception as e:
                    logger.error("Audio generation failed for %s: %s", language, e)

            return audio_files

        except Exception as e:
            logger.error("Summary audio generation failed: %s", e)
            return audio_files

    @staticmethod
    def _compound_score(article: Any) -> float:
        """Return the article's compound sentiment, or 0 when it is missing or malformed."""
        sentiment = article.get('sentiment') if isinstance(article, dict) else None
        if not isinstance(sentiment, dict):
            return 0.0
        score = sentiment.get('compound', 0)
        return score if isinstance(score, (int, float)) else 0.0

    @staticmethod
    def _headline(label: str, article: Dict) -> str:
        """Return ``"<label>: <title>."`` for speech, or ``""`` when there is no title."""
        title = str(article.get('title') or '')[:100].strip()
        if not title:
            return ""
        # Terminate the headline so TTS pauses before the next sentence.
        if title[-1] not in '.!?':
            title += '.'
        return f"{label}: {title}"

    def _create_audio_summary(self, articles: List[Dict]) -> str:
        """Build the text of the spoken summary.

        Reads ``sentiment['compound']``, ``title`` and ``date`` from each
        article; missing or malformed values are tolerated.

        Returns:
            The summary text, ``""`` when ``articles`` is empty, or a generic
            one-sentence fallback if building the summary fails unexpectedly.
        """
        try:
            if not articles:
                return ""

            threshold = self.SENTIMENT_THRESHOLD
            scores = [self._compound_score(article) for article in articles]
            positive_count = sum(1 for score in scores if score > threshold)
            negative_count = sum(1 for score in scores if score < -threshold)
            neutral_count = len(articles) - positive_count - negative_count

            summary_parts = [f"News analysis summary for {len(articles)} articles."]

            # Sentiment overview
            if positive_count > negative_count:
                summary_parts.append(f"Overall sentiment is predominantly positive, with {positive_count} positive articles, {negative_count} negative, and {neutral_count} neutral.")
            elif negative_count > positive_count:
                summary_parts.append(f"Overall sentiment is predominantly negative, with {negative_count} negative articles, {positive_count} positive, and {neutral_count} neutral.")
            else:
                summary_parts.append(f"Sentiment is mixed with balanced coverage across {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles.")

            # max()/min() return the first of equally-scored articles, like a
            # stable sort followed by taking element 0.
            top_positive = max(articles, key=self._compound_score)
            if self._compound_score(top_positive) > threshold:
                summary_parts.append(self._headline("Most positive coverage", top_positive))

            top_negative = min(articles, key=self._compound_score)
            if self._compound_score(top_negative) < -threshold:
                summary_parts.append(self._headline("Most concerning coverage", top_negative))

            # Recent developments (if we have dates). Comparing as strings
            # avoids a TypeError when date values have mixed types.
            dated = [a for a in articles if isinstance(a, dict) and a.get('date')]
            if dated:
                latest = max(dated, key=lambda a: str(a['date']))
                summary_parts.append(self._headline("Latest development", latest))

            summary_parts.append("This concludes the news analysis summary.")
            summary_parts = [part for part in summary_parts if part]

            full_summary = " ".join(summary_parts)
            if len(full_summary) <= self.MAX_SUMMARY_CHARS:
                return full_summary

            # Too long: keep whole leading sentences that fit within the cap.
            kept = []
            for part in summary_parts:
                if len(" ".join(kept + [part])) > self.MAX_SUMMARY_CHARS:
                    break
                kept.append(part)
            return " ".join(kept)

        except Exception as e:
            logger.error("Audio summary creation failed: %s", e)
            return f"Analysis complete for {len(articles)} articles with mixed sentiment coverage."

    def cleanup_cache(self, max_age_hours: int = 24):
        """Delete files in ``cache_dir`` older than ``max_age_hours``.

        Cache entries whose file no longer exists are dropped as well. Errors
        are logged, never raised.
        """
        try:
            if not os.path.exists(self.cache_dir):
                return

            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600
            removed_count = 0

            for filename in os.listdir(self.cache_dir):
                filepath = os.path.join(self.cache_dir, filename)
                if not os.path.isfile(filepath):
                    continue
                if current_time - os.path.getmtime(filepath) <= max_age_seconds:
                    continue
                try:
                    os.remove(filepath)
                    removed_count += 1
                except Exception as e:
                    logger.error("Failed to remove old audio file %s: %s", filepath, e)

            # One pass over the cache instead of one scan per removed file.
            self.audio_cache = {
                key: path for key, path in self.audio_cache.items()
                if os.path.exists(path)
            }

            if removed_count > 0:
                logger.info("Cleaned up %s old audio files", removed_count)

        except Exception as e:
            logger.error("Cache cleanup failed: %s", e)

    def get_cache_info(self) -> Dict[str, Any]:
        """Return statistics about the audio cache.

        The result always has ``cache_directory``, ``cached_files``,
        ``supported_languages`` and ``gtts_available``; when the cache
        directory exists it also has ``physical_files``, ``total_size_bytes``
        and ``total_size_mb``. On failure a dict with a single ``error`` key
        is returned.
        """
        try:
            cache_info = {
                'cache_directory': self.cache_dir,
                'cached_files': len(self.audio_cache),
                'supported_languages': list(self.supported_languages.keys()),
                'gtts_available': GTTS_AVAILABLE,
            }

            if os.path.exists(self.cache_dir):
                files = [f for f in os.listdir(self.cache_dir) if f.endswith('.mp3')]
                total_size = sum(
                    os.path.getsize(os.path.join(self.cache_dir, f)) for f in files
                )
                cache_info['physical_files'] = len(files)
                cache_info['total_size_bytes'] = total_size
                cache_info['total_size_mb'] = round(total_size / (1024 * 1024), 2)

            return cache_info

        except Exception as e:
            logger.error("Cache info retrieval failed: %s", e)
            return {'error': str(e)}

    def is_language_supported(self, language: str) -> bool:
        """Return True when ``language`` is known and gTTS is importable."""
        return language in self.supported_languages and GTTS_AVAILABLE