|
import hashlib
import logging
import os
import re
import tempfile
from datetime import datetime
from typing import Any, Dict, List, Optional
|
|
|
|
|
try: |
|
from gtts import gTTS |
|
GTTS_AVAILABLE = True |
|
except ImportError: |
|
GTTS_AVAILABLE = False |
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
class AudioGenerator:
    """Text-to-speech audio generation with multilingual support.

    Wraps gTTS to synthesize MP3 files for English, Hindi and Tamil text.
    Generated files live in a per-process temporary directory and are
    reused across calls via an in-memory cache keyed on (text, language).
    """

    def __init__(self):
        # Display name -> gTTS language code.
        self.supported_languages: Dict[str, str] = {
            'English': 'en',
            'Hindi': 'hi',
            'Tamil': 'ta'
        }

        # Scratch directory holding every MP3 this instance generates.
        self.cache_dir: str = tempfile.mkdtemp(prefix='news_audio_')
        # Cache key (see _create_cache_key) -> path of a generated MP3.
        self.audio_cache: Dict[str, str] = {}

        logger.info("AudioGenerator initialized with cache directory: %s", self.cache_dir)

        if not GTTS_AVAILABLE:
            logger.warning("gTTS not available. Audio generation will be limited.")

    def generate_audio(self, text: str, language: str = 'English',
                       output_file: Optional[str] = None) -> Optional[str]:
        """Generate an MP3 file from *text*.

        Args:
            text: Text to synthesize; cleaned via _prepare_text_for_tts.
            language: Display name ('English', 'Hindi', 'Tamil'); unknown
                names fall back to English.
            output_file: Target path. A bare filename is placed inside the
                cache directory; when omitted a timestamped name is used.

        Returns:
            Path of the generated (or cached) MP3, or None on any failure.
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for audio generation")
            return None

        if not GTTS_AVAILABLE:
            logger.error("gTTS not available for audio generation")
            return None

        try:
            lang_code = self.supported_languages.get(language, 'en')
            cache_key = self._create_cache_key(text, language)

            # Serve from cache only when the file still exists on disk.
            if cache_key in self.audio_cache:
                cached_file = self.audio_cache[cache_key]
                if os.path.exists(cached_file):
                    logger.info("Using cached audio for %s", language)
                    return cached_file

            if not output_file:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file = os.path.join(self.cache_dir, f"audio_{lang_code}_{timestamp}.mp3")
            elif not os.path.dirname(output_file):
                # Bare filename: keep it under the cache directory.
                output_file = os.path.join(self.cache_dir, output_file)

            clean_text = self._prepare_text_for_tts(text)
            if not clean_text:
                logger.warning("No valid text for TTS after cleaning")
                return None

            if lang_code == 'ta':
                # Tamil support in gTTS is unreliable; fall back to English.
                # Narrowed from a bare `except:` so KeyboardInterrupt and
                # SystemExit are no longer swallowed.
                try:
                    tts = gTTS(text=clean_text, lang='ta', slow=False)
                except Exception:
                    logger.warning("Tamil not supported in gTTS, using English")
                    tts = gTTS(text=clean_text, lang='en', slow=False)
            elif lang_code in ('en', 'hi'):
                tts = gTTS(text=clean_text, lang=lang_code, slow=False)
            else:
                tts = gTTS(text=clean_text, lang='en', slow=False)

            tts.save(output_file)

            # gTTS can leave an empty file behind on network errors.
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                self.audio_cache[cache_key] = output_file
                logger.info("Audio generated successfully: %s", output_file)
                return output_file

            logger.error("Audio file was not created or is empty")
            return None

        except Exception as e:
            logger.error("Audio generation failed: %s", str(e))
            return None

    def _create_cache_key(self, text: str, language: str) -> str:
        """Create a cache key for the text and language combination.

        MD5 is used purely as a fast fingerprint, not for security; only
        the first 500 characters of the text participate in the key.
        """
        try:
            combined = f"{text[:500]}_{language}"
            return hashlib.md5(combined.encode()).hexdigest()
        except Exception as e:
            logger.error("Cache key creation failed: %s", str(e))
            # Degrade to a timestamped key so the caller can still proceed.
            return f"default_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def _prepare_text_for_tts(self, text: str) -> str:
        """Prepare text for text-to-speech conversion.

        Strips URLs, e-mail addresses and parenthesised/bracketed asides,
        normalizes repeated punctuation and whitespace, and truncates to
        ~5000 chars on a sentence boundary where possible.
        """
        if not text:
            return ""

        # Remove URLs and e-mail addresses. (E-mail TLD class fixed: it
        # previously read [A-Z|a-z] with a stray literal '|'.)
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)

        # Drop parenthesised / bracketed asides that read poorly aloud.
        text = re.sub(r'\([^)]*\)', '', text)
        text = re.sub(r'\[[^\]]*\]', '', text)

        # Normalize runs of repeated punctuation.
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)

        # Collapse whitespace LAST so the gaps left by the removals above
        # do not survive as double spaces (previously done before them).
        text = re.sub(r'\s+', ' ', text)

        # Keep TTS input bounded; prefer cutting on a sentence boundary.
        max_length = 5000
        if len(text) > max_length:
            sentences = re.split(r'[.!?]+', text[:max_length])
            if len(sentences) > 1:
                text = '. '.join(sentences[:-1]) + '.'
            else:
                text = text[:max_length] + '...'

        return text.strip()

    def generate_batch_audio(self, texts: Dict[str, str], language: str = 'English') -> Dict[str, Optional[str]]:
        """Generate audio for multiple texts.

        Args:
            texts: Mapping of an arbitrary key -> text to synthesize.
            language: Display-name language used for every entry.

        Returns:
            Mapping of the same keys -> generated MP3 path, or None for
            entries that failed.
        """
        results: Dict[str, Optional[str]] = {}

        for key, text in texts.items():
            try:
                output_file = f"audio_{key}_{language.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
                results[key] = self.generate_audio(text, language, output_file)
            except Exception as e:
                logger.error("Batch audio generation failed for %s: %s", key, str(e))
                results[key] = None

        return results

    def generate_summary_audio(self, articles: List[Dict], languages: Optional[List[str]] = None) -> Dict[str, str]:
        """Generate audio summaries for articles in multiple languages.

        Args:
            articles: Article dicts (see _create_audio_summary for the
                fields consulted).
            languages: Display-name languages; defaults to ['English'].

        Returns:
            Mapping of language -> generated MP3 path. Languages that fail
            or are unsupported are omitted.
        """
        if languages is None:
            languages = ['English']

        audio_files: Dict[str, str] = {}

        try:
            summary_text = self._create_audio_summary(articles)
            if not summary_text:
                logger.warning("No summary text created for audio")
                return audio_files

            for language in languages:
                if language not in self.supported_languages:
                    logger.warning("Language %s not supported for audio", language)
                    continue

                try:
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    output_file = f"summary_{language.lower()}_{timestamp}.mp3"

                    audio_file = self.generate_audio(summary_text, language, output_file)
                    if audio_file:
                        audio_files[language] = audio_file
                    else:
                        logger.warning("Failed to generate audio for %s", language)
                except Exception as e:
                    # One failed language must not abort the others.
                    logger.error("Audio generation failed for %s: %s", language, str(e))
                    continue

            return audio_files

        except Exception as e:
            logger.error("Summary audio generation failed: %s", str(e))
            return audio_files

    def _create_audio_summary(self, articles: List[Dict]) -> str:
        """Create a comprehensive audio summary from articles.

        Each article dict may carry 'title', 'date' and a 'sentiment'
        sub-dict with a numeric 'compound' score (> 0.1 counts positive,
        < -0.1 negative, else neutral).
        """
        try:
            if not articles:
                return ""

            # Bucket articles by compound sentiment score.
            positive_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) > 0.1)
            negative_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) < -0.1)
            neutral_count = len(articles) - positive_count - negative_count

            summary_parts = [f"News analysis summary for {len(articles)} articles."]

            if positive_count > negative_count:
                summary_parts.append(f"Overall sentiment is predominantly positive, with {positive_count} positive articles, {negative_count} negative, and {neutral_count} neutral.")
            elif negative_count > positive_count:
                summary_parts.append(f"Overall sentiment is predominantly negative, with {negative_count} negative articles, {positive_count} positive, and {neutral_count} neutral.")
            else:
                summary_parts.append(f"Sentiment is mixed with balanced coverage across {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles.")

            # Highlight the single most positive / most negative article.
            # max/min replace two full sorts; stable-tie behavior matches.
            most_positive = max(articles, key=lambda a: a.get('sentiment', {}).get('compound', 0))
            if most_positive.get('sentiment', {}).get('compound', 0) > 0.1:
                summary_parts.append(f"Most positive coverage: {most_positive.get('title', '')[:100]}")

            most_negative = min(articles, key=lambda a: a.get('sentiment', {}).get('compound', 0))
            if most_negative.get('sentiment', {}).get('compound', 0) < -0.1:
                summary_parts.append(f"Most concerning coverage: {most_negative.get('title', '')[:100]}")

            # Mention the most recent dated article.
            # NOTE(review): dates are compared as strings — assumes a
            # sortable (e.g. ISO) format; confirm with the producer.
            dated_articles = [a for a in articles if a.get('date')]
            if dated_articles:
                latest = max(dated_articles, key=lambda a: a.get('date', ''))
                summary_parts.append(f"Latest development: {latest.get('title', '')[:100]}")

            summary_parts.append("This concludes the news analysis summary.")

            full_summary = " ".join(summary_parts)

            # Keep the spoken summary short: cap at roughly 8 sentences.
            if len(full_summary) > 1000:
                sentences = full_summary.split('. ')
                return '. '.join(sentences[:8]) + '.'

            return full_summary

        except Exception as e:
            logger.error("Audio summary creation failed: %s", str(e))
            # Best-effort fallback so callers still get speakable text.
            return f"Analysis complete for {len(articles)} articles with mixed sentiment coverage."

    def cleanup_cache(self, max_age_hours: int = 24) -> None:
        """Remove cached audio files older than *max_age_hours*.

        Also drops in-memory cache entries that pointed at removed files.
        """
        try:
            if not os.path.exists(self.cache_dir):
                return

            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600
            removed_count = 0

            for filename in os.listdir(self.cache_dir):
                filepath = os.path.join(self.cache_dir, filename)
                if not os.path.isfile(filepath):
                    continue
                if current_time - os.path.getmtime(filepath) <= max_age_seconds:
                    continue
                try:
                    os.remove(filepath)
                    removed_count += 1

                    # Invalidate in-memory entries pointing at this file.
                    stale_keys = [k for k, v in self.audio_cache.items() if v == filepath]
                    for key in stale_keys:
                        del self.audio_cache[key]
                except Exception as e:
                    logger.error("Failed to remove old audio file %s: %s", filepath, str(e))

            if removed_count > 0:
                logger.info("Cleaned up %d old audio files", removed_count)

        except Exception as e:
            logger.error("Cache cleanup failed: %s", str(e))

    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about the audio cache.

        Returns:
            Dict with the cache directory, entry counts, supported
            languages, gTTS availability and on-disk size, or
            {'error': message} on failure.
        """
        try:
            cache_info: Dict[str, Any] = {
                'cache_directory': self.cache_dir,
                'cached_files': len(self.audio_cache),
                'supported_languages': list(self.supported_languages.keys()),
                'gtts_available': GTTS_AVAILABLE
            }

            if os.path.exists(self.cache_dir):
                files = [f for f in os.listdir(self.cache_dir) if f.endswith('.mp3')]
                cache_info['physical_files'] = len(files)

                total_size = sum(os.path.getsize(os.path.join(self.cache_dir, f)) for f in files)
                cache_info['total_size_bytes'] = total_size
                cache_info['total_size_mb'] = round(total_size / (1024 * 1024), 2)

            return cache_info

        except Exception as e:
            logger.error("Cache info retrieval failed: %s", str(e))
            return {'error': str(e)}

    def is_language_supported(self, language: str) -> bool:
        """Check if a language is supported for audio generation."""
        return language in self.supported_languages and GTTS_AVAILABLE