|
|
|
from transformers import pipeline |
|
import re |
|
from datetime import datetime, timedelta |
|
import config |
|
|
|
|
|
# Shared summarization pipeline, initialized once at import time.
# NOTE(review): this loads (and on first run may download) the BART-samsum
# model as a module import side effect — importing this module is slow.
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")
|
|
|
class MeetingAnalyzer:
    """Accumulate meeting-transcript chunks and mine them for summaries,
    action items, and decisions.

    Attributes:
        transcript_chunks: raw text chunks in arrival order.
        speakers: speaker name -> list of that speaker's utterances.
        current_speaker: speaker attributed to the most recent chunk.
        action_items: cache of the last extract_action_items() result.
        decisions: cache of the last extract_decisions() result.
    """

    def __init__(self):
        self.transcript_chunks = []
        self.speakers = {}
        self.current_speaker = "Unknown"
        self.action_items = []
        self.decisions = []

    def process_chunk(self, text_chunk):
        """Record a transcript chunk and attribute "Name: utterance" text.

        Args:
            text_chunk: one piece of transcript text. If it contains a
                colon, everything before the first colon is treated as the
                speaker name and everything after it as their utterance.

        NOTE(review): any colon triggers speaker detection, so text like
        "remember: buy milk" is misattributed — acceptable only for
        transcripts formatted strictly as "Speaker: utterance".
        """
        self.transcript_chunks.append(text_chunk)

        if ":" in text_chunk:
            speaker, content = text_chunk.split(":", 1)
            self.current_speaker = speaker.strip()
            # One utterance list per speaker; setdefault avoids the
            # check-then-insert dance.
            self.speakers.setdefault(self.current_speaker, []).append(content.strip())

    def generate_summary(self):
        """Summarize the whole transcript with the module-level `summarizer`.

        Returns:
            A concatenated summary string, or a fallback message when
            fewer than 50 words have been collected (the model produces
            poor output on tiny inputs).
        """
        full_text = " ".join(self.transcript_chunks)

        if len(full_text.split()) < 50:
            return "Not enough content for summary"

        # Chunk by characters so each piece fits the model's input limit.
        # NOTE(review): character slicing can split a word mid-token;
        # token-aware chunking would be better but the summarizer tolerates it.
        max_chunk_size = 1000
        chunks = [full_text[i:i + max_chunk_size]
                  for i in range(0, len(full_text), max_chunk_size)]

        summaries = []
        for chunk in chunks:
            result = summarizer(
                chunk,
                max_length=config.SUMMARY_MAX_LENGTH,
                min_length=config.SUMMARY_MIN_LENGTH,
                do_sample=False,  # deterministic output
            )
            summaries.append(result[0]['summary_text'])

        return " ".join(summaries)

    def extract_action_items(self):
        """Scan the transcript for action items via regex patterns.

        Returns:
            A list of dicts with "task", "owner" and "deadline" keys.
            The list is also cached on self.action_items so that
            detect_urgent_action_items() sees the latest extraction.
        """
        full_text = " ".join(self.transcript_chunks)
        action_items = []

        # Group layouts per pattern:
        #   1 group : (whole phrase)                 -> task only
        #   3 groups: (whole, task, deadline)        -> no owner information
        #   4 groups: (whole, owner, task, deadline) -> fully specified
        patterns = [
            r"(\bwill\b.*?\bby\b\s+\w+\s+\d{1,2})",
            r"(\baction\b:\s*(.*?)(?:\bdeadline\b|\bby\b|\bfor\b)\s*(\w+\s+\d{1,2}))",
            r"(\btodo\b:\s*(.*?)(?:\bdue\b|\bby\b)\s*(\w+\s+\d{1,2}))",
            r"(\bassign(?:ed)? to\b\s+(\w+):\s*(.*?)(?:\bdeadline\b|\bby\b)\s*(\w+\s+\d{1,2}))"
        ]

        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                groups = match.groups()
                if not groups:
                    continue

                if len(groups) == 3:
                    # (whole, task, deadline): this pattern carries no owner.
                    task, deadline = groups[1], groups[2]
                    owner = "Unassigned"
                elif len(groups) == 4:
                    # (whole, owner, task, deadline)
                    owner, task, deadline = groups[1], groups[2], groups[3]
                else:
                    # Single-group pattern: the whole phrase is the task;
                    # no structured deadline was captured.
                    task = groups[0]
                    owner = "Unassigned"
                    deadline = "ASAP"

                action_items.append({
                    "task": task.strip(),
                    "owner": owner.strip(),
                    "deadline": self.normalize_deadline(deadline.strip())
                })

        # Cache for detect_urgent_action_items(); previously this was never
        # stored, so urgency detection always saw an empty list.
        self.action_items = action_items
        return action_items

    def detect_urgent_action_items(self):
        """Return cached action items marked urgent.

        An item is urgent when its task mentions "urgent" or its deadline
        is "asap" (case-insensitive). Call extract_action_items() first to
        refresh the cache.
        """
        urgent_items = []
        for item in self.action_items:
            if "urgent" in item['task'].lower() or "asap" in item['deadline'].lower():
                urgent_items.append(item)
        return urgent_items

    def extract_decisions(self):
        """Scan the transcript for decision statements.

        Returns:
            A list of decision strings (the text following phrases such as
            "decided to" / "agreed that", up to the next period or newline).
            Also cached on self.decisions, mirroring extract_action_items().
        """
        full_text = " ".join(self.transcript_chunks)
        decisions = []

        patterns = [
            r"\bdecided to\b (.*?)[\.\n]",
            r"\bagreed that\b (.*?)[\.\n]",
            r"\bconsensus is\b (.*?)[\.\n]",
            r"\bresolution\b: (.*?)[\.\n]"
        ]

        for pattern in patterns:
            for match in re.finditer(pattern, full_text, re.IGNORECASE):
                decisions.append(match.group(1).strip())

        self.decisions = decisions
        return decisions

    def normalize_deadline(self, deadline_str):
        """Convert relative deadline phrases to "YYYY-MM-DD" strings.

        Recognizes "today", "tomorrow", "next week", "eod" (end of day ->
        today) and "eow" (end of week -> Friday). Anything else is returned
        unchanged.

        NOTE(review): "eow" on a Friday resolves to the *next* Friday
        (days_ahead <= 0 rolls forward a week) — confirm that is intended.
        """
        today = datetime.now()
        lower_str = deadline_str.lower()

        if "today" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "tomorrow" in lower_str:
            return (today + timedelta(days=1)).strftime("%Y-%m-%d")
        elif "next week" in lower_str:
            return (today + timedelta(weeks=1)).strftime("%Y-%m-%d")
        elif "eod" in lower_str:
            return today.strftime("%Y-%m-%d")
        elif "eow" in lower_str:
            # Friday is weekday 4; roll forward a week if already past it.
            days_ahead = 4 - today.weekday()
            if days_ahead <= 0:
                days_ahead += 7
            return (today + timedelta(days=days_ahead)).strftime("%Y-%m-%d")

        return deadline_str