# app.py (Gradio UI Application) import gradio as gr import time import threading import queue from audio_capture import AudioRecorder from transcriber import SpeechTranscriber from analyzer import MeetingAnalyzer from integrations import Notifier import config class MeetingProcessor: def __init__(self): self.recorder = AudioRecorder() self.transcriber = SpeechTranscriber() self.analyzer = MeetingAnalyzer() self.notifier = Notifier() self.transcript_queue = queue.Queue() self.running = False self.start_time = None self.transcript_history = [] self.summary = "" self.action_items = [] self.urgent_alerts = [] def start_processing(self): if self.running: return "Already running!" self.running = True self.start_time = time.time() self.transcript_history = [] self.summary = "" self.action_items = [] self.urgent_alerts = [] # Start processing threads threading.Thread(target=self._audio_capture_thread, daemon=True).start() threading.Thread(target=self._transcription_thread, daemon=True).start() threading.Thread(target=self._analysis_thread, daemon=True).start() return "Meeting processing started! 🎤" def _audio_capture_thread(self): self.recorder.start_recording() while self.running: chunk = self.recorder.get_audio_chunk() if chunk: self.transcriber.add_audio_chunk(chunk) time.sleep(0.1) def _transcription_thread(self): while self.running: transcript = self.transcriber.get_transcript_chunk() if transcript: self.transcript_queue.put(transcript) self.transcript_history.append(transcript) time.sleep(0.5) def _analysis_thread(self): while self.running: try: transcript = self.transcript_queue.get(timeout=1.0) self.analyzer.process_chunk(transcript) # Check for urgent items periodically if len(self.transcript_history) % 5 == 0: urgent_items = self.analyzer.detect_urgent_action_items() if urgent_items: self.urgent_alerts.extend(urgent_items) self.notifier.send_urgent_alert(urgent_items) except queue.Empty: continue def stop_processing(self): if not self.running: return "Not running!" self.running = False self.recorder.stop_recording() # Generate final analysis self.summary = self.analyzer.generate_summary() self.action_items = self.analyzer.extract_action_items() # Send final report self.notifier.send_comprehensive_report( summary=self.summary, action_items=self.action_items, decisions=self.analyzer.extract_decisions(), transcript="\n".join(self.transcript_history), recipients=config.NOTIFICATION_RECIPIENTS ) return "Meeting processing stopped! Report sent. ✅" def get_current_status(self): if not self.running: return { "status": "Stopped", "duration": "00:00", "transcript": "", "summary": self.summary, "action_items": self.action_items, "alerts": self.urgent_alerts } elapsed = time.time() - self.start_time mins, secs = divmod(int(elapsed), 60) # Only show last 5 transcript entries recent_transcript = "\n".join(self.transcript_history[-5:]) return { "status": "Recording", "duration": f"{mins:02d}:{secs:02d}", "transcript": recent_transcript, "summary": self.summary if self.summary else "Summary will appear after meeting ends", "action_items": self.action_items, "alerts": self.urgent_alerts } # Initialize processor processor = MeetingProcessor() # Create Gradio interface with gr.Blocks(title="Real-Time Meeting Summarizer", theme="soft") as app: gr.Markdown("# 🎙️ Real-Time Meeting Summarizer") gr.Markdown("Start this during any meeting to get live transcription and automatic summaries") with gr.Row(): start_btn = gr.Button("Start Meeting", variant="primary") stop_btn = gr.Button("Stop Meeting", variant="stop") status_text = gr.Textbox(label="Status", interactive=False) with gr.Row(): with gr.Column(): duration_display = gr.Textbox(label="Duration", interactive=False) transcript_box = gr.Textbox(label="Live Transcript", lines=8, interactive=False) with gr.Column(): alerts_box = gr.Textbox(label="Urgent Alerts", lines=3, interactive=False) summary_box = gr.Textbox(label="Meeting Summary", lines=5, interactive=False) action_items_box = gr.Textbox(label="Action Items", lines=5, interactive=False) # State for live updates state = gr.State(value=processor.get_current_status()) # Update function for live components def update_components(): current_status = processor.get_current_status() return { duration_display: current_status["duration"], transcript_box: current_status["transcript"], summary_box: current_status["summary"], action_items_box: "\n".join( f"• {item['task']} (Owner: {item['owner']}, Deadline: {item['deadline']})" for item in current_status["action_items"] ), alerts_box: "\n".join( f"🚨 {item['task']} (Owner: {item['owner']}, Deadline: {item['deadline']})" for item in current_status["alerts"] ) if current_status["alerts"] else "No urgent alerts", state: current_status } # Button actions start_btn.click( fn=processor.start_processing, inputs=[], outputs=[status_text] ) stop_btn.click( fn=processor.stop_processing, inputs=[], outputs=[status_text] ) # Live updates app.load(update_components, inputs=[state], outputs=[ duration_display, transcript_box, summary_box, action_items_box, alerts_box, state ], every=1) if __name__ == "__main__": app.launch()