# analyzer.py (Key Point Extraction)
from transformers import pipeline
import re
from datetime import datetime, timedelta
import config
# Load NLP model
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")
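# Note: "philschmid/bart-large-cnn-samsum" is a BART checkpoint fine-tuned on the
# SAMSum dialogue-summarization dataset; the weights are downloaded when this
# module is first imported.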
class MeetingAnalyzer:
    def __init__(self):
        self.transcript_chunks = []
        self.speakers = {}
        self.current_speaker = "Unknown"
        self.action_items = []
        self.decisions = []

    def process_chunk(self, text_chunk):
        self.transcript_chunks.append(text_chunk)
        # Simple speaker detection: treat "Name: text" as a speaker turn
        if ":" in text_chunk:
            speaker, content = text_chunk.split(":", 1)
            self.current_speaker = speaker.strip()
            if self.current_speaker not in self.speakers:
                self.speakers[self.current_speaker] = []
            self.speakers[self.current_speaker].append(content.strip())
    def generate_summary(self):
        full_text = " ".join(self.transcript_chunks)
        # If the transcript is too short, skip summarization
        if len(full_text.split()) < 50:
            return "Not enough content for summary"
        # Summarize in fixed-size chunks so long meetings fit the model input
        max_chunk_size = 1000
        chunks = [full_text[i:i + max_chunk_size] for i in range(0, len(full_text), max_chunk_size)]
        summaries = []
        for chunk in chunks:
            summary = summarizer(
                chunk,
                max_length=config.SUMMARY_MAX_LENGTH,
                min_length=config.SUMMARY_MIN_LENGTH,
                do_sample=False
            )[0]['summary_text']
            summaries.append(summary)
        return " ".join(summaries)
    def extract_action_items(self):
        full_text = " ".join(self.transcript_chunks)
        action_items = []
        # Pattern matching for action items; each pattern captures a different
        # combination of (task, owner, deadline) groups.
        patterns = [
            r"(\bwill\b.*?\bby\b\s+\w+\s+\d{1,2})",
            r"(\baction\b:\s*(.*?)(?:\bdeadline\b|\bby\b|\bfor\b)\s*(\w+\s+\d{1,2}))",
            r"(\btodo\b:\s*(.*?)(?:\bdue\b|\bby\b)\s*(\w+\s+\d{1,2}))",
            r"(\bassign(?:ed)? to\b\s+(\w+):\s*(.*?)(?:\bdeadline\b|\bby\b)\s*(\w+\s+\d{1,2}))"
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                groups = match.groups()
                if not groups:
                    continue
                # Different patterns capture different group structures
                if len(groups) == 4:
                    # "assigned to <owner>: <task> ... by <deadline>"
                    owner, task, deadline = groups[1], groups[2], groups[3]
                elif len(groups) == 3:
                    # "action:/todo: <task> ... by <deadline>", no explicit owner
                    task, deadline = groups[1], groups[2]
                    owner = "Unassigned"
                else:
                    # "will ... by <date>": keep the whole phrase as the task
                    task = groups[0]
                    owner = "Unassigned"
                    deadline = "ASAP"
                action_items.append({
                    "task": task.strip(),
                    "owner": owner.strip(),
                    "deadline": self.normalize_deadline(deadline.strip())
                })
        self.action_items = action_items
        return action_items
    def detect_urgent_action_items(self):
        urgent_items = []
        for item in self.action_items:
            if "urgent" in item['task'].lower() or "asap" in item['deadline'].lower():
                urgent_items.append(item)
        return urgent_items
    def extract_decisions(self):
        full_text = " ".join(self.transcript_chunks)
        decisions = []
        # Pattern matching for decision phrases
        patterns = [
            r"\bdecided to\b (.*?)[\.\n]",
            r"\bagreed that\b (.*?)[\.\n]",
            r"\bconsensus is\b (.*?)[\.\n]",
            r"\bresolution\b: (.*?)[\.\n]"
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                decisions.append(match.group(1).strip())
        self.decisions = decisions
        return decisions
    def normalize_deadline(self, deadline_str):
        today = datetime.now()
        lower_str = deadline_str.lower()
        if "today" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "tomorrow" in lower_str:
            return (today + timedelta(days=1)).strftime("%Y-%m-%d")
        elif "next week" in lower_str:
            return (today + timedelta(weeks=1)).strftime("%Y-%m-%d")
        elif "eod" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "eow" in lower_str:
            # Find next Friday
            days_ahead = 4 - today.weekday()  # 0 = Monday, 4 = Friday
            if days_ahead <= 0:  # If today is Friday or the weekend
                days_ahead += 7
            return (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
        # Fall back to the raw string (e.g. "June 15" or "ASAP")
        return deadline_str
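

# Minimal usage sketch (illustrative, not part of the original module): feed a few
# sample transcript lines through MeetingAnalyzer and print what it extracts.
# The sample lines below are assumptions for demonstration only; running this still
# requires a config.py providing SUMMARY_MIN_LENGTH / SUMMARY_MAX_LENGTH, because
# the summarization pipeline is loaded at import time.
if __name__ == "__main__":
    analyzer = MeetingAnalyzer()
    sample_chunks = [
        "Alice: Action: draft the Q3 roadmap by June 15",
        "Bob: We decided to move the launch to next week.",
        "Alice: Assigned to Bob: update the urgent budget sheet by June 12",
    ]
    for chunk in sample_chunks:
        analyzer.process_chunk(chunk)
    print("Action items:", analyzer.extract_action_items())
    print("Urgent items:", analyzer.detect_urgent_action_items())
    print("Decisions:", analyzer.extract_decisions())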