# analyzer.py (Key Point Extraction)
from transformers import pipeline
import re
from datetime import datetime, timedelta
import config
# Load NLP model
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")
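
# Note: the config module imported above is expected to define
# SUMMARY_MAX_LENGTH and SUMMARY_MIN_LENGTH, which are not shown in this
# file. A minimal config.py consistent with this usage might look like
# (values are illustrative assumptions, not taken from the repo):
#
#     SUMMARY_MAX_LENGTH = 150
#     SUMMARY_MIN_LENGTH = 30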

class MeetingAnalyzer:
    def __init__(self):
        self.transcript_chunks = []
        self.speakers = {}
        self.current_speaker = "Unknown"
        self.action_items = []
        self.decisions = []

    def process_chunk(self, text_chunk):
        self.transcript_chunks.append(text_chunk)
        # Simple speaker detection: treat "Name: text" lines as speaker turns
        if ":" in text_chunk:
            speaker, content = text_chunk.split(":", 1)
            self.current_speaker = speaker.strip()
            if self.current_speaker not in self.speakers:
                self.speakers[self.current_speaker] = []
            self.speakers[self.current_speaker].append(content.strip())
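
    # Example (illustrative): process_chunk("Alice: hello team") sets
    # current_speaker to "Alice" and appends "hello team" to
    # self.speakers["Alice"]; a line without ":" keeps the previous speaker.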

    def generate_summary(self):
        full_text = " ".join(self.transcript_chunks)
        # If the transcript is too short, skip summarization
        if len(full_text.split()) < 50:
            return "Not enough content for summary"
        # Summarize long meetings in fixed-size character chunks so each
        # piece stays within the model's input limit
        max_chunk_size = 1000
        chunks = [full_text[i:i + max_chunk_size] for i in range(0, len(full_text), max_chunk_size)]
        summaries = []
        for chunk in chunks:
            summary = summarizer(
                chunk,
                max_length=config.SUMMARY_MAX_LENGTH,
                min_length=config.SUMMARY_MIN_LENGTH,
                do_sample=False
            )[0]['summary_text']
            summaries.append(summary)
        return " ".join(summaries)

    def extract_action_items(self):
        full_text = " ".join(self.transcript_chunks)
        action_items = []
        # Pattern matching for action items; each pattern wraps the whole
        # match in an outer group, so group counts differ per pattern
        patterns = [
            r"(\bwill\b.*?\bby\b\s+\w+\s+\d{1,2})",
            r"(\baction\b:\s*(.*?)(?:\bdeadline\b|\bby\b|\bfor\b)\s*(\w+\s+\d{1,2}))",
            r"(\btodo\b:\s*(.*?)(?:\bdue\b|\bby\b)\s*(\w+\s+\d{1,2}))",
            r"(\bassign(?:ed)? to\b\s+(\w+):\s*(.*?)(?:\bdeadline\b|\bby\b)\s*(\w+\s+\d{1,2}))"
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                groups = match.groups()
                if not groups:
                    continue
                # Map groups according to each pattern's structure:
                #   1 group  -> (task)
                #   3 groups -> (full match, task, deadline)
                #   4 groups -> (full match, owner, task, deadline)
                if len(groups) == 1:
                    task, owner, deadline = groups[0], "Unassigned", "ASAP"
                elif len(groups) == 3:
                    task, owner, deadline = groups[1], "Unassigned", groups[2]
                elif len(groups) == 4:
                    task, owner, deadline = groups[2], groups[1], groups[3]
                else:
                    task, owner, deadline = groups[0], "Unassigned", "ASAP"
                action_items.append({
                    "task": task.strip(),
                    "owner": owner.strip(),
                    "deadline": self.normalize_deadline(deadline.strip())
                })
        # Cache the results so detect_urgent_action_items can use them
        self.action_items = action_items
        return action_items

    def detect_urgent_action_items(self):
        # Relies on extract_action_items() having been called first,
        # which populates self.action_items
        urgent_items = []
        for item in self.action_items:
            if "urgent" in item['task'].lower() or "asap" in item['deadline'].lower():
                urgent_items.append(item)
        return urgent_items

    def extract_decisions(self):
        full_text = " ".join(self.transcript_chunks)
        decisions = []
        # Pattern matching for decision language
        patterns = [
            r"\bdecided to\b (.*?)[\.\n]",
            r"\bagreed that\b (.*?)[\.\n]",
            r"\bconsensus is\b (.*?)[\.\n]",
            r"\bresolution\b: (.*?)[\.\n]"
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                decisions.append(match.group(1).strip())
        self.decisions = decisions
        return decisions

    def normalize_deadline(self, deadline_str):
        today = datetime.now()
        lower_str = deadline_str.lower()
        if "today" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "tomorrow" in lower_str:
            return (today + timedelta(days=1)).strftime("%Y-%m-%d")
        elif "next week" in lower_str:
            return (today + timedelta(weeks=1)).strftime("%Y-%m-%d")
        elif "eod" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "eow" in lower_str:
            # End of week: find the next Friday (weekday 0 = Monday, 4 = Friday)
            days_ahead = 4 - today.weekday()
            if days_ahead <= 0:  # today is Friday or the weekend
                days_ahead += 7
            return (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d")
        # Fall back to the raw string if no known phrase matched
        return deadline_str
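

if __name__ == "__main__":
    # Minimal usage sketch (not part of the original module): feeds a few
    # invented transcript lines through the analyzer and prints the
    # extracted items. generate_summary() is omitted to keep the demo
    # short; note the summarization pipeline above is loaded at import
    # time, so running this file still downloads the model.
    analyzer = MeetingAnalyzer()
    for line in [
        "Alice: We decided to ship the beta next week.",
        "Bob: Action: update the landing page by June 15",
        "Carol: TODO: urgent fix for the login bug due June 12",
    ]:
        analyzer.process_chunk(line)

    print("Action items:", analyzer.extract_action_items())
    print("Urgent:", analyzer.detect_urgent_action_items())
    print("Decisions:", analyzer.extract_decisions())
    # normalize_deadline maps relative phrases to ISO dates, e.g.
    # "tomorrow" -> (today + 1 day).strftime("%Y-%m-%d")
    print("EOW resolves to:", analyzer.normalize_deadline("eow"))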