# analyzer.py (Key Point Extraction)
from transformers import pipeline
import re
from datetime import datetime, timedelta

import config

# Load NLP model once at import time (heavyweight: loads the BART
# summarization checkpoint).
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")


class MeetingAnalyzer:
    """Accumulates meeting-transcript chunks and extracts a summary,
    action items, and decisions using regex pattern matching plus an
    NLP summarization model.
    """

    def __init__(self):
        self.transcript_chunks = []      # raw text chunks in arrival order
        self.speakers = {}               # speaker name -> list of utterances
        self.current_speaker = "Unknown"
        self.action_items = []           # cache written by extract_action_items()
        self.decisions = []              # cache written by extract_decisions()

    def process_chunk(self, text_chunk):
        """Store a transcript chunk and attribute it to a speaker.

        Speaker detection is naive: any chunk containing ':' is split on
        the first ':' and the prefix is treated as the speaker name.
        NOTE(review): a bare timestamp like "10:30" would be misread as
        speaker "10" — consider tightening this heuristic.
        """
        self.transcript_chunks.append(text_chunk)
        if ":" in text_chunk:
            speaker, content = text_chunk.split(":", 1)
            self.current_speaker = speaker.strip()
            self.speakers.setdefault(self.current_speaker, []).append(content.strip())

    def generate_summary(self):
        """Summarize the accumulated transcript.

        Long transcripts are summarized in pieces of at most ~1000
        characters each — split on word boundaries so no word is cut in
        half (the old character slicing could split a word across two
        chunks) — and the partial summaries are concatenated.

        Returns:
            The concatenated summary text, or a placeholder string when
            the transcript holds fewer than 50 words.
        """
        full_text = " ".join(self.transcript_chunks)
        words = full_text.split()
        # Too little content to summarize meaningfully.
        if len(words) < 50:
            return "Not enough content for summary"

        # Build chunks on word boundaries, each at most max_chunk_size chars.
        max_chunk_size = 1000
        chunks = []
        current, current_len = [], 0
        for word in words:
            # +1 accounts for the joining space.
            if current and current_len + len(word) + 1 > max_chunk_size:
                chunks.append(" ".join(current))
                current, current_len = [], 0
            current.append(word)
            current_len += len(word) + 1
        if current:
            chunks.append(" ".join(current))

        summaries = []
        for chunk in chunks:
            summary = summarizer(
                chunk,
                max_length=config.SUMMARY_MAX_LENGTH,
                min_length=config.SUMMARY_MIN_LENGTH,
                do_sample=False,
            )[0]["summary_text"]
            summaries.append(summary)
        return " ".join(summaries)

    def extract_action_items(self):
        """Find action items in the transcript via regex patterns.

        Named groups make the task/owner/deadline mapping explicit for
        every pattern (the previous positional-group logic assigned the
        whole match as the owner for the "action:"/"todo:" patterns and
        dropped the real owner and deadline for "assigned to").

        Returns:
            A list of dicts with keys "task", "owner", "deadline".
            The result is also cached on self.action_items so that
            detect_urgent_action_items() can use it (previously the
            cache was never written, so it stayed empty).
        """
        full_text = " ".join(self.transcript_chunks)
        action_items = []

        # Missing <owner>/<deadline> groups default to "Unassigned"/"ASAP".
        patterns = [
            r"(?P<task>\bwill\b.*?\bby\b\s+\w+\s+\d{1,2})",
            r"\baction\b:\s*(?P<task>.*?)(?:\bdeadline\b|\bby\b|\bfor\b)\s*(?P<deadline>\w+\s+\d{1,2})",
            r"\btodo\b:\s*(?P<task>.*?)(?:\bdue\b|\bby\b)\s*(?P<deadline>\w+\s+\d{1,2})",
            # \s+ between "assign(ed)?" and "to" (the old pattern embedded a
            # literal newline here, so it could never match normal text).
            r"\bassign(?:ed)?\s+to\b\s+(?P<owner>\w+):\s*(?P<task>.*?)(?:\bdeadline\b|\bby\b)\s*(?P<deadline>\w+\s+\d{1,2})",
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                groups = match.groupdict()
                task = (groups.get("task") or "").strip()
                if not task:
                    continue
                owner = (groups.get("owner") or "Unassigned").strip()
                deadline = (groups.get("deadline") or "ASAP").strip()
                action_items.append({
                    "task": task,
                    "owner": owner,
                    "deadline": self.normalize_deadline(deadline),
                })

        self.action_items = action_items
        return action_items

    def detect_urgent_action_items(self):
        """Return the subset of self.action_items flagged as urgent.

        An item is urgent when its task mentions "urgent" or its
        deadline contains "asap". Call extract_action_items() first to
        populate the cache this reads.
        """
        urgent_items = []
        for item in self.action_items:
            if "urgent" in item["task"].lower() or "asap" in item["deadline"].lower():
                urgent_items.append(item)
        return urgent_items

    def extract_decisions(self):
        """Find decision statements in the transcript via regex patterns.

        Returns:
            A list of decision strings. Also cached on self.decisions
            for symmetry with extract_action_items() (previously the
            cache was never written).
        """
        full_text = " ".join(self.transcript_chunks)
        decisions = []

        patterns = [
            r"\bdecided to\b (.*?)[\.\n]",
            r"\bagreed that\b (.*?)[\.\n]",
            r"\bconsensus is\b (.*?)[\.\n]",
            r"\bresolution\b: (.*?)[\.\n]",
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                decisions.append(match.group(1).strip())

        self.decisions = decisions
        return decisions

    def normalize_deadline(self, deadline_str):
        """Convert a relative deadline phrase to an ISO date (YYYY-MM-DD).

        Recognizes "today", "tomorrow", "next week", "eod" (end of day,
        mapped to today), and "eow" (end of week, mapped to the next
        Friday). Unrecognized strings are returned unchanged.
        """
        today = datetime.now()
        lower_str = deadline_str.lower()

        if "today" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "tomorrow" in lower_str:
            return (today + timedelta(days=1)).strftime("%Y-%m-%d")
        elif "next week" in lower_str:
            return (today + timedelta(weeks=1)).strftime("%Y-%m-%d")
        elif "eod" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "eow" in lower_str:
            # Find next Friday (weekday(): 0 = Monday, 4 = Friday).
            days_ahead = 4 - today.weekday()
            if days_ahead <= 0:  # today is Friday or the weekend
                days_ahead += 7
            return (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d")

        return deadline_str