wekey1998's picture
Rename tts_module.py to tts.py
0e57075 verified
import logging
import os
import tempfile
from typing import Dict, List, Optional
import hashlib
from datetime import datetime
# gTTS is an optional dependency that provides the text-to-speech backend.
# GTTS_AVAILABLE records whether the import succeeded so callers can degrade
# gracefully instead of failing at import time.
try:
    from gtts import gTTS
except ImportError:
    GTTS_AVAILABLE = False
else:
    GTTS_AVAILABLE = True

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class AudioGenerator:
    """Text-to-speech audio generation with multilingual support.

    Audio is synthesized with gTTS (when it is installed) and written as MP3
    files. Files without an explicit directory are stored in a per-instance
    temporary directory (``cache_dir``). Successful results are memoized in
    ``audio_cache`` (cache key -> file path) so that the same text/language
    pair is not synthesized twice.
    """

    def __init__(self):
        """Set up the language table and create the temporary cache directory."""
        # Display name -> gTTS language code.
        self.supported_languages = {
            'English': 'en',
            'Hindi': 'hi',
            'Tamil': 'ta'
        }
        # Audio cache directory
        self.cache_dir = tempfile.mkdtemp(prefix='news_audio_')
        # Cache key (see _create_cache_key) -> path of the generated file.
        self.audio_cache = {}
        logger.info("AudioGenerator initialized with cache directory: %s", self.cache_dir)
        if not GTTS_AVAILABLE:
            logger.warning("gTTS not available. Audio generation will be limited.")

    def generate_audio(self, text: str, language: str = 'English',
                       output_file: Optional[str] = None) -> Optional[str]:
        """Generate an MP3 file from text.

        Args:
            text: Text to speak. Empty or whitespace-only text is rejected.
            language: Display name of the language (a key of
                ``supported_languages``). Unknown names fall back to English.
            output_file: Target file. A bare file name is placed inside
                ``cache_dir``; when omitted a name is generated.

        Returns:
            Path of the audio file, or ``None`` when generation is not
            possible or fails. If the same text/language was generated before
            and the file still exists, the previously generated path is
            returned and ``output_file`` is not written.
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for audio generation")
            return None
        if not GTTS_AVAILABLE:
            logger.error("gTTS not available for audio generation")
            return None
        try:
            # Get language code (English when the name is unknown)
            lang_code = self.supported_languages.get(language, 'en')

            # Check cache first
            cache_key = self._create_cache_key(text, language)
            cached_file = self.audio_cache.get(cache_key)
            if cached_file and os.path.exists(cached_file):
                logger.info("Using cached audio for %s", language)
                return cached_file

            # Generate output filename if not provided. The key prefix keeps
            # names unique when several texts are generated within the same
            # second; a timestamp alone would make them overwrite each other.
            if not output_file:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file = os.path.join(
                    self.cache_dir,
                    f"audio_{lang_code}_{timestamp}_{cache_key[:8]}.mp3",
                )
            elif not os.path.dirname(output_file):
                output_file = os.path.join(self.cache_dir, output_file)

            # Prepare text for TTS
            clean_text = self._prepare_text_for_tts(text)
            if not clean_text:
                logger.warning("No valid text for TTS after cleaning")
                return None

            # Generate audio using gTTS
            if lang_code == 'ta':
                # Try Tamil first and fall back to an English voice if gTTS
                # rejects it. Only ordinary errors are caught so that
                # KeyboardInterrupt/SystemExit still propagate.
                try:
                    tts = gTTS(text=clean_text, lang='ta', slow=False)
                except Exception:
                    logger.warning("Tamil not supported in gTTS, using English")
                    tts = gTTS(text=clean_text, lang='en', slow=False)
            else:
                tts = gTTS(text=clean_text, lang=lang_code, slow=False)

            # Save audio file
            tts.save(output_file)

            # Verify file was created before caching it
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                self.audio_cache[cache_key] = output_file
                logger.info("Audio generated successfully: %s", output_file)
                return output_file
            logger.error("Audio file was not created or is empty")
            return None
        except Exception as e:
            logger.error("Audio generation failed: %s", e)
            return None

    def _create_cache_key(self, text: str, language: str) -> str:
        """Create a cache key for the text and language combination.

        The whole text is hashed: the digest has a fixed length regardless of
        input size, and hashing only a prefix would make different texts that
        share their beginning return each other's cached audio.
        """
        try:
            combined = f"{text}_{language}"
            return hashlib.md5(combined.encode()).hexdigest()
        except Exception as e:
            logger.error("Cache key creation failed: %s", e)
            # Random suffix so two failures within the same second cannot
            # share a key (and therefore a cached file).
            stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            return f"default_{stamp}_{os.urandom(4).hex()}"

    @staticmethod
    def _safe_filename_part(value) -> str:
        """Return ``value`` as text that is safe to embed in a file name."""
        cleaned = "".join(
            ch if ch.isalnum() or ch in "-_." else "_" for ch in str(value)
        )
        return cleaned or "item"

    def _prepare_text_for_tts(self, text: str, max_length: int = 5000) -> str:
        """Prepare text for text-to-speech conversion.

        Removes URLs, email addresses and bracketed/parenthetical asides,
        collapses repeated punctuation and whitespace, and limits the result
        to roughly ``max_length`` characters, cutting at the last sentence
        boundary inside that window when there is one.

        Args:
            text: Raw text.
            max_length: Maximum number of characters to keep.

        Returns:
            The cleaned text, or ``""`` when ``text`` is empty.
        """
        if not text:
            return ""
        import re

        # Remove URLs (stop at whitespace, quotes and angle brackets)
        text = re.sub(r'https?://[^\s"\'<>]+', '', text)
        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)
        # Remove parenthetical citations and references
        text = re.sub(r'\([^)]*\)', '', text)
        text = re.sub(r'\[[^\]]*\]', '', text)
        # Remove excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        # Normalize whitespace last, so the gaps left by the removals above
        # are collapsed too, and drop spaces stranded before punctuation.
        text = re.sub(r'\s+', ' ', text)
        text = re.sub(r'\s+([.,;:!?])', r'\1', text)
        text = text.strip()

        # Limit text length for TTS
        if len(text) > max_length:
            window = text[:max_length]
            # Last sentence terminator in the window ('\u0964' is the
            # Devanagari danda used in Hindi).
            cut = max(window.rfind(mark) for mark in '.!?\u0964')
            if cut > 0:
                text = window[:cut + 1]
            else:
                text = window.rstrip() + '...'
        return text.strip()

    def generate_batch_audio(self, texts: Dict[str, str],
                             language: str = 'English') -> Dict[str, Optional[str]]:
        """Generate audio for multiple texts.

        Args:
            texts: Mapping of caller-chosen key -> text.
            language: Display name of the language used for every text.

        Returns:
            Mapping of the same keys to the generated file path, or ``None``
            for entries that could not be generated.
        """
        results = {}
        safe_language = self._safe_filename_part(language.lower())
        for key, text in texts.items():
            try:
                # Keys are sanitized so that a key containing a path separator
                # cannot move the file out of the cache directory; the digest
                # keeps names distinct when sanitized keys coincide.
                safe_key = self._safe_filename_part(key)
                stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                digest = self._create_cache_key(text, language)[:8]
                output_file = f"audio_{safe_key}_{safe_language}_{stamp}_{digest}.mp3"
                results[key] = self.generate_audio(text, language, output_file)
            except Exception as e:
                logger.error("Batch audio generation failed for %s: %s", key, e)
                results[key] = None
        return results

    def generate_summary_audio(self, articles: List[Dict],
                               languages: Optional[List[str]] = None) -> Dict[str, str]:
        """Generate audio summaries for articles in multiple languages.

        The summary text itself is built once by ``_create_audio_summary`` and
        then synthesized for each requested language.

        Args:
            articles: Article dicts (see ``_create_audio_summary``).
            languages: Display names of the languages; defaults to English.

        Returns:
            Mapping of language -> audio file path for the languages that
            succeeded. Unsupported or failed languages are omitted.
        """
        if languages is None:
            languages = ['English']
        audio_files = {}
        try:
            # Create overall summary text
            summary_text = self._create_audio_summary(articles)
            if not summary_text:
                logger.warning("No summary text created for audio")
                return audio_files

            # Generate audio for each language
            for language in languages:
                if language not in self.supported_languages:
                    logger.warning("Language %s not supported for audio", language)
                    continue
                try:
                    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                    # Digest keeps two different summaries produced within the
                    # same second from sharing one file name.
                    digest = self._create_cache_key(summary_text, language)[:8]
                    output_file = f"summary_{language.lower()}_{timestamp}_{digest}.mp3"
                    audio_file = self.generate_audio(summary_text, language, output_file)
                    if audio_file:
                        audio_files[language] = audio_file
                    else:
                        logger.warning("Failed to generate audio for %s", language)
                except Exception as e:
                    logger.error("Audio generation failed for %s: %s", language, e)
            return audio_files
        except Exception as e:
            logger.error("Summary audio generation failed: %s", e)
            return audio_files

    def _create_audio_summary(self, articles: List[Dict]) -> str:
        """Create a spoken-summary text from articles.

        Each article is a dict; the keys read here are ``'sentiment'`` (a dict
        with a numeric ``'compound'`` score), ``'title'`` and ``'date'``.
        Missing or ``None`` values are tolerated. Scores above 0.1 count as
        positive and below -0.1 as negative.

        Returns:
            The summary text, ``""`` for an empty article list, or a generic
            one-sentence fallback if building the summary fails.
        """
        try:
            if not articles:
                return ""

            def compound(article) -> float:
                # Treat a missing/None sentiment or score as neutral.
                sentiment = article.get('sentiment') or {}
                return sentiment.get('compound') or 0

            def headline(article) -> str:
                # Title limited to 100 characters and closed with sentence
                # punctuation so consecutive parts are not run together.
                title = str(article.get('title') or '').strip()[:100].rstrip()
                if title and title[-1] not in '.!?':
                    title += '.'
                return title

            # Calculate sentiment distribution
            scores = [compound(article) for article in articles]
            positive_count = sum(1 for score in scores if score > 0.1)
            negative_count = sum(1 for score in scores if score < -0.1)
            neutral_count = len(articles) - positive_count - negative_count

            # Opening
            summary_parts = [f"News analysis summary for {len(articles)} articles."]

            # Sentiment overview
            if positive_count > negative_count:
                summary_parts.append(f"Overall sentiment is predominantly positive, with {positive_count} positive articles, {negative_count} negative, and {neutral_count} neutral.")
            elif negative_count > positive_count:
                summary_parts.append(f"Overall sentiment is predominantly negative, with {negative_count} negative articles, {positive_count} positive, and {neutral_count} neutral.")
            else:
                summary_parts.append(f"Sentiment is mixed with balanced coverage across {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles.")

            # Most positive story (first one wins on ties)
            top_positive = max(articles, key=compound)
            if compound(top_positive) > 0.1 and headline(top_positive):
                summary_parts.append(f"Most positive coverage: {headline(top_positive)}")

            # Most negative story (first one wins on ties)
            top_negative = min(articles, key=compound)
            if compound(top_negative) < -0.1 and headline(top_negative):
                summary_parts.append(f"Most concerning coverage: {headline(top_negative)}")

            # Recent developments (if we have dates). Dates are compared by
            # their text form, so mixed value types cannot raise.
            # NOTE(review): this is chronological only for sortable formats
            # such as ISO 8601 - confirm what the caller supplies.
            dated_articles = [a for a in articles if a.get('date')]
            if dated_articles:
                latest = max(dated_articles, key=lambda a: str(a.get('date')))
                if headline(latest):
                    summary_parts.append(f"Latest development: {headline(latest)}")

            # Closing
            summary_parts.append("This concludes the news analysis summary.")

            # Join all parts
            full_summary = " ".join(summary_parts)

            # Ensure reasonable length: keep the first few sentences
            if len(full_summary) > 1000:
                sentences = full_summary.split('. ')
                truncated = '. '.join(sentences[:8])
                if not truncated.endswith(('.', '!', '?')):
                    truncated += '.'
                return truncated
            return full_summary
        except Exception as e:
            logger.error("Audio summary creation failed: %s", e)
            return f"Analysis complete for {len(articles)} articles with mixed sentiment coverage."

    def cleanup_cache(self, max_age_hours: int = 24):
        """Delete files in ``cache_dir`` older than ``max_age_hours``.

        Entries of ``audio_cache`` that point at a deleted file are dropped as
        well. Failures are logged, never raised.
        """
        try:
            if not os.path.exists(self.cache_dir):
                return
            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600
            removed_paths = set()
            for filename in os.listdir(self.cache_dir):
                filepath = os.path.join(self.cache_dir, filename)
                if not os.path.isfile(filepath):
                    continue
                try:
                    file_age = current_time - os.path.getmtime(filepath)
                    if file_age <= max_age_seconds:
                        continue
                    os.remove(filepath)
                except Exception as e:
                    logger.error("Failed to remove old audio file %s: %s", filepath, e)
                else:
                    removed_paths.add(filepath)

            if removed_paths:
                # Remove from cache dict as well (single pass over the cache)
                stale_keys = [k for k, v in self.audio_cache.items() if v in removed_paths]
                for key in stale_keys:
                    del self.audio_cache[key]
                logger.info("Cleaned up %d old audio files", len(removed_paths))
        except Exception as e:
            logger.error("Cache cleanup failed: %s", e)

    def get_cache_info(self) -> Dict[str, object]:
        """Get information about the audio cache.

        Returns:
            A dict with ``cache_directory``, ``cached_files`` (entries in the
            in-memory cache), ``supported_languages`` and ``gtts_available``;
            when the cache directory exists also ``physical_files`` (number of
            ``.mp3`` files), ``total_size_bytes`` and ``total_size_mb``.
            On failure, ``{'error': <message>}``.
        """
        try:
            cache_info = {
                'cache_directory': self.cache_dir,
                'cached_files': len(self.audio_cache),
                'supported_languages': list(self.supported_languages.keys()),
                'gtts_available': GTTS_AVAILABLE
            }
            if os.path.exists(self.cache_dir):
                files = [f for f in os.listdir(self.cache_dir) if f.endswith('.mp3')]
                cache_info['physical_files'] = len(files)
                total_size = sum(os.path.getsize(os.path.join(self.cache_dir, f)) for f in files)
                cache_info['total_size_bytes'] = total_size
                cache_info['total_size_mb'] = round(total_size / (1024 * 1024), 2)
            return cache_info
        except Exception as e:
            logger.error("Cache info retrieval failed: %s", e)
            return {'error': str(e)}

    def is_language_supported(self, language: str) -> bool:
        """Check if a language is supported for audio generation.

        True only when ``language`` is a known display name and gTTS is
        installed.
        """
        return language in self.supported_languages and GTTS_AVAILABLE