import logging
import os
import re
import hashlib
import tempfile
from datetime import datetime
from typing import Any, Dict, List, Optional

# gTTS for text-to-speech
try:
    from gtts import gTTS
    GTTS_AVAILABLE = True
except ImportError:
    GTTS_AVAILABLE = False

logger = logging.getLogger(__name__)
class AudioGenerator:
    """Text-to-speech audio generation with multilingual support"""

    def __init__(self):
        self.supported_languages = {
            'English': 'en',
            'Hindi': 'hi',
            'Tamil': 'ta'
        }

        # Audio cache directory
        self.cache_dir = tempfile.mkdtemp(prefix='news_audio_')
        self.audio_cache = {}

        logger.info(f"AudioGenerator initialized with cache directory: {self.cache_dir}")

        if not GTTS_AVAILABLE:
            logger.warning("gTTS not available. Audio generation will be limited.")
    def generate_audio(self, text: str, language: str = 'English', output_file: Optional[str] = None) -> Optional[str]:
        """Generate audio from text"""
        if not text or not text.strip():
            logger.warning("Empty text provided for audio generation")
            return None

        if not GTTS_AVAILABLE:
            logger.error("gTTS not available for audio generation")
            return None

        try:
            # Get language code
            lang_code = self.supported_languages.get(language, 'en')

            # Create cache key
            cache_key = self._create_cache_key(text, language)

            # Check cache first
            if cache_key in self.audio_cache:
                cached_file = self.audio_cache[cache_key]
                if os.path.exists(cached_file):
                    logger.info(f"Using cached audio for {language}")
                    return cached_file

            # Generate output filename if not provided
            if not output_file:
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file = os.path.join(self.cache_dir, f"audio_{lang_code}_{timestamp}.mp3")
            elif not os.path.dirname(output_file):
                output_file = os.path.join(self.cache_dir, output_file)

            # Prepare text for TTS
            clean_text = self._prepare_text_for_tts(text)
            if not clean_text:
                logger.warning("No valid text for TTS after cleaning")
                return None
            # Generate audio using gTTS
            if lang_code in ['en', 'hi']:
                # gTTS supports English and Hindi directly
                tts = gTTS(text=clean_text, lang=lang_code, slow=False)
            elif lang_code == 'ta':
                # For Tamil, try Tamil first and fall back to English if unavailable
                try:
                    tts = gTTS(text=clean_text, lang='ta', slow=False)
                except Exception:
                    logger.warning("Tamil not supported in gTTS, using English")
                    tts = gTTS(text=clean_text, lang='en', slow=False)
            else:
                # Default to English
                tts = gTTS(text=clean_text, lang='en', slow=False)
            # Save audio file
            tts.save(output_file)

            # Verify file was created
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                # Cache the result
                self.audio_cache[cache_key] = output_file
                logger.info(f"Audio generated successfully: {output_file}")
                return output_file
            else:
                logger.error("Audio file was not created or is empty")
                return None

        except Exception as e:
            logger.error(f"Audio generation failed: {str(e)}")
            return None
    def _create_cache_key(self, text: str, language: str) -> str:
        """Create a cache key for the text and language combination"""
        try:
            combined = f"{text[:500]}_{language}"  # Use first 500 chars to avoid very long keys
            return hashlib.md5(combined.encode()).hexdigest()
        except Exception as e:
            logger.error(f"Cache key creation failed: {str(e)}")
            return f"default_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    def _prepare_text_for_tts(self, text: str) -> str:
        """Prepare text for text-to-speech conversion"""
        if not text:
            return ""

        # Remove URLs
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*(),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)

        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', '', text)

        # Replace multiple spaces with single space
        text = re.sub(r'\s+', ' ', text)

        # Remove excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)

        # Remove parenthetical citations and references
        text = re.sub(r'\([^)]*\)', '', text)
        text = re.sub(r'\[[^\]]*\]', '', text)

        # Limit text length for TTS (gTTS has limits)
        max_length = 5000  # Characters
        if len(text) > max_length:
            # Try to cut at a sentence boundary
            sentences = re.split(r'[.!?]+', text[:max_length])
            if len(sentences) > 1:
                text = '. '.join(sentences[:-1]) + '.'
            else:
                text = text[:max_length] + '...'

        return text.strip()
    def generate_batch_audio(self, texts: Dict[str, str], language: str = 'English') -> Dict[str, Optional[str]]:
        """Generate audio for multiple texts"""
        results = {}

        for key, text in texts.items():
            try:
                output_file = f"audio_{key}_{language.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
                audio_file = self.generate_audio(text, language, output_file)
                results[key] = audio_file
            except Exception as e:
                logger.error(f"Batch audio generation failed for {key}: {str(e)}")
                results[key] = None

        return results
    def generate_summary_audio(self, articles: List[Dict], languages: Optional[List[str]] = None) -> Dict[str, str]:
        """Generate audio summaries for articles in multiple languages"""
        if languages is None:
            languages = ['English']

        audio_files = {}

        try:
            # Create overall summary text
            summary_text = self._create_audio_summary(articles)
            if not summary_text:
                logger.warning("No summary text created for audio")
                return audio_files

            # Generate audio for each language
            for language in languages:
                if language in self.supported_languages:
                    try:
                        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                        output_file = f"summary_{language.lower()}_{timestamp}.mp3"
                        audio_file = self.generate_audio(summary_text, language, output_file)

                        if audio_file:
                            audio_files[language] = audio_file
                        else:
                            logger.warning(f"Failed to generate audio for {language}")
                    except Exception as e:
                        logger.error(f"Audio generation failed for {language}: {str(e)}")
                        continue
                else:
                    logger.warning(f"Language {language} not supported for audio")

            return audio_files

        except Exception as e:
            logger.error(f"Summary audio generation failed: {str(e)}")
            return audio_files
    def _create_audio_summary(self, articles: List[Dict]) -> str:
        """Create a comprehensive audio summary from articles"""
        try:
            if not articles:
                return ""

            # Calculate sentiment distribution
            positive_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) > 0.1)
            negative_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) < -0.1)
            neutral_count = len(articles) - positive_count - negative_count

            # Start building summary
            summary_parts = []

            # Opening
            summary_parts.append(f"News analysis summary for {len(articles)} articles.")

            # Sentiment overview
            if positive_count > negative_count:
                summary_parts.append(f"Overall sentiment is predominantly positive, with {positive_count} positive articles, {negative_count} negative, and {neutral_count} neutral.")
            elif negative_count > positive_count:
                summary_parts.append(f"Overall sentiment is predominantly negative, with {negative_count} negative articles, {positive_count} positive, and {neutral_count} neutral.")
            else:
                summary_parts.append(f"Sentiment is mixed with balanced coverage across {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles.")

            # Top stories
            # Most positive story
            positive_articles = sorted(articles, key=lambda x: x.get('sentiment', {}).get('compound', 0), reverse=True)
            if positive_articles and positive_articles[0].get('sentiment', {}).get('compound', 0) > 0.1:
                top_positive = positive_articles[0]
                summary_parts.append(f"Most positive coverage: {top_positive.get('title', '')[:100]}")

            # Most negative story
            negative_articles = sorted(articles, key=lambda x: x.get('sentiment', {}).get('compound', 0))
            if negative_articles and negative_articles[0].get('sentiment', {}).get('compound', 0) < -0.1:
                top_negative = negative_articles[0]
                summary_parts.append(f"Most concerning coverage: {top_negative.get('title', '')[:100]}")

            # Recent developments (if we have dates)
            recent_articles = [a for a in articles if a.get('date')]
            if recent_articles:
                recent_articles.sort(key=lambda x: x.get('date', ''), reverse=True)
                summary_parts.append(f"Latest development: {recent_articles[0].get('title', '')[:100]}")

            # Closing
            summary_parts.append("This concludes the news analysis summary.")

            # Join all parts
            full_summary = " ".join(summary_parts)

            # Ensure reasonable length
            if len(full_summary) > 1000:
                # Truncate to the first few sentences
                sentences = full_summary.split('. ')
                return '. '.join(sentences[:8]) + '.'

            return full_summary

        except Exception as e:
            logger.error(f"Audio summary creation failed: {str(e)}")
            return f"Analysis complete for {len(articles)} articles with mixed sentiment coverage."
    def cleanup_cache(self, max_age_hours: int = 24):
        """Clean up old audio files from cache"""
        try:
            if not os.path.exists(self.cache_dir):
                return

            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600
            removed_count = 0

            for filename in os.listdir(self.cache_dir):
                filepath = os.path.join(self.cache_dir, filename)
                if os.path.isfile(filepath):
                    file_age = current_time - os.path.getmtime(filepath)
                    if file_age > max_age_seconds:
                        try:
                            os.remove(filepath)
                            removed_count += 1

                            # Remove from cache dict as well
                            cache_keys_to_remove = [k for k, v in self.audio_cache.items() if v == filepath]
                            for key in cache_keys_to_remove:
                                del self.audio_cache[key]
                        except Exception as e:
                            logger.error(f"Failed to remove old audio file {filepath}: {str(e)}")

            if removed_count > 0:
                logger.info(f"Cleaned up {removed_count} old audio files")

        except Exception as e:
            logger.error(f"Cache cleanup failed: {str(e)}")
    def get_cache_info(self) -> Dict[str, Any]:
        """Get information about the audio cache"""
        try:
            cache_info = {
                'cache_directory': self.cache_dir,
                'cached_files': len(self.audio_cache),
                'supported_languages': list(self.supported_languages.keys()),
                'gtts_available': GTTS_AVAILABLE
            }

            if os.path.exists(self.cache_dir):
                files = [f for f in os.listdir(self.cache_dir) if f.endswith('.mp3')]
                cache_info['physical_files'] = len(files)

                total_size = sum(os.path.getsize(os.path.join(self.cache_dir, f)) for f in files)
                cache_info['total_size_bytes'] = total_size
                cache_info['total_size_mb'] = round(total_size / (1024 * 1024), 2)

            return cache_info

        except Exception as e:
            logger.error(f"Cache info retrieval failed: {str(e)}")
            return {'error': str(e)}
    def is_language_supported(self, language: str) -> bool:
        """Check if a language is supported for audio generation"""
        return language in self.supported_languages and GTTS_AVAILABLE
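

# Example usage: a minimal sketch of how this module might be driven.
# The sample article dicts below are hypothetical placeholders; in the real
# pipeline they would come from the news scraping/sentiment stages, which are
# not part of this file. Requires gTTS to be installed for actual audio output;
# without it, generate_audio() logs an error and returns None.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    generator = AudioGenerator()

    # Single text-to-speech conversion
    audio_path = generator.generate_audio("Markets closed higher today.", language='English')
    print(f"Generated audio file: {audio_path}")

    # Multi-language summary audio from hypothetical pre-analysed articles
    sample_articles = [
        {'title': 'Company reports record quarterly profits', 'sentiment': {'compound': 0.6}, 'date': '2024-01-15'},
        {'title': 'Regulator opens inquiry into data practices', 'sentiment': {'compound': -0.4}, 'date': '2024-01-14'},
    ]
    summaries = generator.generate_summary_audio(sample_articles, languages=['English', 'Hindi'])
    print(f"Summary audio files: {summaries}")
    print(f"Cache info: {generator.get_cache_info()}")

    # Remove cached files older than one hour
    generator.cleanup_cache(max_age_hours=1)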