# File size: 14,529 Bytes
# 8f8d0f6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
import logging
import os
import tempfile
from typing import Dict, List, Optional
import hashlib
from datetime import datetime

# gTTS for text-to-speech
try:
    from gtts import gTTS
    GTTS_AVAILABLE = True
except ImportError:
    GTTS_AVAILABLE = False

logger = logging.getLogger(__name__)

class AudioGenerator:
    """Text-to-speech audio generation with multilingual support"""
    
    def __init__(self):
        """Set up the language table, an empty cache index and a temp directory."""
        # Display name -> language code handed to gTTS.
        self.supported_languages = dict(English='en', Hindi='hi', Tamil='ta')

        # In-memory index: cache key -> path of a generated audio file.
        self.audio_cache = {}
        # Generated files are written to a freshly created temporary directory.
        self.cache_dir = tempfile.mkdtemp(prefix='news_audio_')

        logger.info(f"AudioGenerator initialized with cache directory: {self.cache_dir}")

        if GTTS_AVAILABLE:
            return
        logger.warning("gTTS not available. Audio generation will be limited.")
    
    def generate_audio(self, text: str, language: str = 'English', output_file: Optional[str] = None) -> Optional[str]:
        """Generate an MP3 file speaking ``text`` and return its path.

        A file already generated by this instance for the same cache key is
        returned as-is (``output_file`` is then ignored) as long as it still
        exists on disk.

        Args:
            text: Text to convert to speech.
            language: Key of ``self.supported_languages``. A name missing from
                that mapping is rendered with the English voice.
            output_file: Optional target path. A bare file name is placed in
                ``self.cache_dir``; when omitted, a name is generated there.

        Returns:
            Path of the audio file, or ``None`` when the text is empty, gTTS is
            not installed, or generation fails (failures are logged, not raised).
        """
        if not text or not text.strip():
            logger.warning("Empty text provided for audio generation")
            return None

        if not GTTS_AVAILABLE:
            logger.error("gTTS not available for audio generation")
            return None

        try:
            # Get language code
            lang_code = self.supported_languages.get(language, 'en')

            # Create cache key
            cache_key = self._create_cache_key(text, language)

            # Check cache first; a stale entry (file deleted) falls through
            # and is overwritten after a successful regeneration.
            cached_file = self.audio_cache.get(cache_key)
            if cached_file and os.path.exists(cached_file):
                logger.info(f"Using cached audio for {language}")
                return cached_file

            # Generate output filename if not provided
            if not output_file:
                # A second-resolution timestamp alone is not unique: two
                # different texts rendered in the same second would share one
                # file and the cache would then serve the wrong audio. The
                # cache-key prefix keeps the generated names distinct.
                timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                output_file = os.path.join(
                    self.cache_dir,
                    f"audio_{lang_code}_{timestamp}_{cache_key[:8]}.mp3",
                )
            elif not os.path.dirname(output_file):
                output_file = os.path.join(self.cache_dir, output_file)

            # Prepare text for TTS
            clean_text = self._prepare_text_for_tts(text)

            if not clean_text:
                logger.warning("No valid text for TTS after cleaning")
                return None

            # Generate audio using gTTS
            if lang_code in ('en', 'hi'):
                tts = gTTS(text=clean_text, lang=lang_code, slow=False)
            elif lang_code == 'ta':
                # Try Tamil first and fall back to the English voice.
                # ``except Exception`` (not a bare ``except``) so that
                # KeyboardInterrupt / SystemExit are never swallowed here.
                try:
                    tts = gTTS(text=clean_text, lang='ta', slow=False)
                except Exception:
                    logger.warning("Tamil not supported in gTTS, using English")
                    tts = gTTS(text=clean_text, lang='en', slow=False)
            else:
                # Any other code is rendered with the English voice.
                tts = gTTS(text=clean_text, lang='en', slow=False)

            # Save audio file
            tts.save(output_file)

            # Verify file was created before advertising it through the cache
            if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
                self.audio_cache[cache_key] = output_file

                logger.info(f"Audio generated successfully: {output_file}")
                return output_file

            logger.error("Audio file was not created or is empty")
            return None

        except Exception as e:
            # Best-effort API: callers receive None instead of an exception.
            logger.error(f"Audio generation failed: {str(e)}")
            return None
    
    def _create_cache_key(self, text: str, language: str) -> str:
        """Create a cache key for the text and language combination"""
        try:
            combined = f"{text[:500]}_{language}"  # Use first 500 chars to avoid very long keys
            return hashlib.md5(combined.encode()).hexdigest()
        except Exception as e:
            logger.error(f"Cache key creation failed: {str(e)}")
            return f"default_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    
    def _prepare_text_for_tts(self, text: str) -> str:
        """Prepare text for text-to-speech conversion"""
        if not text:
            return ""
        
        # Remove or replace problematic characters
        import re
        
        # Remove URLs
        text = re.sub(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\\(\\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', '', text)
        
        # Remove email addresses
        text = re.sub(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b', '', text)
        
        # Replace multiple spaces with single space
        text = re.sub(r'\s+', ' ', text)
        
        # Remove excessive punctuation
        text = re.sub(r'[.]{3,}', '...', text)
        text = re.sub(r'[!]{2,}', '!', text)
        text = re.sub(r'[?]{2,}', '?', text)
        
        # Remove parenthetical citations and references
        text = re.sub(r'\([^)]*\)', '', text)
        text = re.sub(r'\[[^\]]*\]', '', text)
        
        # Limit text length for TTS (gTTS has limits)
        max_length = 5000  # Characters
        if len(text) > max_length:
            # Try to cut at sentence boundary
            sentences = re.split(r'[.!?]+', text[:max_length])
            if len(sentences) > 1:
                text = '. '.join(sentences[:-1]) + '.'
            else:
                text = text[:max_length] + '...'
        
        return text.strip()
    
    def generate_batch_audio(self, texts: Dict[str, str], language: str = 'English') -> Dict[str, str]:
        """Generate audio for multiple texts"""
        results = {}
        
        for key, text in texts.items():
            try:
                output_file = f"audio_{key}_{language.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3"
                audio_file = self.generate_audio(text, language, output_file)
                results[key] = audio_file
            except Exception as e:
                logger.error(f"Batch audio generation failed for {key}: {str(e)}")
                results[key] = None
        
        return results
    
    def generate_summary_audio(self, articles: List[Dict], languages: List[str] = None) -> Dict[str, str]:
        """Generate audio summaries for articles in multiple languages"""
        if languages is None:
            languages = ['English']
        
        audio_files = {}
        
        try:
            # Create overall summary text
            summary_text = self._create_audio_summary(articles)
            
            if not summary_text:
                logger.warning("No summary text created for audio")
                return audio_files
            
            # Generate audio for each language
            for language in languages:
                if language in self.supported_languages:
                    try:
                        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
                        output_file = f"summary_{language.lower()}_{timestamp}.mp3"
                        
                        audio_file = self.generate_audio(summary_text, language, output_file)
                        
                        if audio_file:
                            audio_files[language] = audio_file
                        else:
                            logger.warning(f"Failed to generate audio for {language}")
                            
                    except Exception as e:
                        logger.error(f"Audio generation failed for {language}: {str(e)}")
                        continue
                else:
                    logger.warning(f"Language {language} not supported for audio")
            
            return audio_files
            
        except Exception as e:
            logger.error(f"Summary audio generation failed: {str(e)}")
            return audio_files
    
    def _create_audio_summary(self, articles: List[Dict]) -> str:
        """Create a comprehensive audio summary from articles"""
        try:
            if not articles:
                return ""
            
            # Calculate sentiment distribution
            positive_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) > 0.1)
            negative_count = sum(1 for article in articles if article.get('sentiment', {}).get('compound', 0) < -0.1)
            neutral_count = len(articles) - positive_count - negative_count
            
            # Start building summary
            summary_parts = []
            
            # Opening
            summary_parts.append(f"News analysis summary for {len(articles)} articles.")
            
            # Sentiment overview
            if positive_count > negative_count:
                summary_parts.append(f"Overall sentiment is predominantly positive, with {positive_count} positive articles, {negative_count} negative, and {neutral_count} neutral.")
            elif negative_count > positive_count:
                summary_parts.append(f"Overall sentiment is predominantly negative, with {negative_count} negative articles, {positive_count} positive, and {neutral_count} neutral.")
            else:
                summary_parts.append(f"Sentiment is mixed with balanced coverage across {positive_count} positive, {negative_count} negative, and {neutral_count} neutral articles.")
            
            # Top stories
            # Most positive story
            positive_articles = sorted(articles, key=lambda x: x.get('sentiment', {}).get('compound', 0), reverse=True)
            if positive_articles and positive_articles[0].get('sentiment', {}).get('compound', 0) > 0.1:
                top_positive = positive_articles[0]
                summary_parts.append(f"Most positive coverage: {top_positive.get('title', '')[:100]}")
            
            # Most negative story
            negative_articles = sorted(articles, key=lambda x: x.get('sentiment', {}).get('compound', 0))
            if negative_articles and negative_articles[0].get('sentiment', {}).get('compound', 0) < -0.1:
                top_negative = negative_articles[0]
                summary_parts.append(f"Most concerning coverage: {top_negative.get('title', '')[:100]}")
            
            # Recent developments (if we have dates)
            recent_articles = [a for a in articles if a.get('date')]
            if recent_articles:
                recent_articles.sort(key=lambda x: x.get('date', ''), reverse=True)
                if len(recent_articles) > 0:
                    summary_parts.append(f"Latest development: {recent_articles[0].get('title', '')[:100]}")
            
            # Closing
            summary_parts.append("This concludes the news analysis summary.")
            
            # Join all parts
            full_summary = " ".join(summary_parts)
            
            # Ensure reasonable length
            if len(full_summary) > 1000:
                # Truncate to first few sentences
                sentences = full_summary.split('. ')
                truncated = '. '.join(sentences[:8]) + '.'
                return truncated
            
            return full_summary
            
        except Exception as e:
            logger.error(f"Audio summary creation failed: {str(e)}")
            return f"Analysis complete for {len(articles)} articles with mixed sentiment coverage."
    
    def cleanup_cache(self, max_age_hours: int = 24):
        """Clean up old audio files from cache"""
        try:
            if not os.path.exists(self.cache_dir):
                return
            
            current_time = datetime.now().timestamp()
            max_age_seconds = max_age_hours * 3600
            
            removed_count = 0
            
            for filename in os.listdir(self.cache_dir):
                filepath = os.path.join(self.cache_dir, filename)
                
                if os.path.isfile(filepath):
                    file_age = current_time - os.path.getmtime(filepath)
                    
                    if file_age > max_age_seconds:
                        try:
                            os.remove(filepath)
                            removed_count += 1
                            
                            # Remove from cache dict as well
                            cache_keys_to_remove = [k for k, v in self.audio_cache.items() if v == filepath]
                            for key in cache_keys_to_remove:
                                del self.audio_cache[key]
                                
                        except Exception as e:
                            logger.error(f"Failed to remove old audio file {filepath}: {str(e)}")
            
            if removed_count > 0:
                logger.info(f"Cleaned up {removed_count} old audio files")
                
        except Exception as e:
            logger.error(f"Cache cleanup failed: {str(e)}")
    
    def get_cache_info(self) -> Dict[str, object]:
        """Report the state of the audio cache.

        Returns:
            A dict with ``cache_directory``, ``cached_files`` (entries in the
            in-memory index), ``supported_languages`` and ``gtts_available``.
            When the cache directory exists it also contains
            ``physical_files`` (number of ``.mp3`` files in it),
            ``total_size_bytes`` and ``total_size_mb`` (rounded to two
            decimals). On failure the dict is ``{'error': <message>}``.
        """
        # NOTE: the annotation used to be Dict[str, any]; ``any`` is the
        # builtin function, not a type. ``object`` states "values of mixed
        # type" without needing an extra typing import.
        try:
            cache_info = {
                'cache_directory': self.cache_dir,
                'cached_files': len(self.audio_cache),
                'supported_languages': list(self.supported_languages.keys()),
                'gtts_available': GTTS_AVAILABLE
            }

            if os.path.exists(self.cache_dir):
                files = [f for f in os.listdir(self.cache_dir) if f.endswith('.mp3')]
                cache_info['physical_files'] = len(files)

                total_size = sum(os.path.getsize(os.path.join(self.cache_dir, f)) for f in files)
                cache_info['total_size_bytes'] = total_size
                cache_info['total_size_mb'] = round(total_size / (1024 * 1024), 2)

            return cache_info

        except Exception as e:
            logger.error(f"Cache info retrieval failed: {str(e)}")
            return {'error': str(e)}
    
    def is_language_supported(self, language: str) -> bool:
        """Check if a language is supported for audio generation"""
        return language in self.supported_languages and GTTS_AVAILABLE