import re
from typing import List, Dict, Optional, Union
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass

import fitz  # PyMuPDF
from docx import Document
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer


@dataclass
class DocumentChunk:
    chunk_id: int
    text: str
    embedding: List[float]
    metadata: Dict


class DocumentChunker:
    def __init__(self):
        self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")
        # Regex cues for assigning each chunk to a grant-application section category.
        self.category_patterns = {
            "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
            "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
            "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
            "Mission Statement": [r"\bmission\b", r"\bvision\b"],
            "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
            "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
            "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
            "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
            "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
            "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
            "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
            "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
            "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
            "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
            "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
        }
        # Structural patterns used to detect headers and question prompts per document type.
        self.patterns = {
            'grant_application': {
                'header_patterns': [
                    r'\*\*([^*]+)\*\*',
                    r'^([A-Z][^a-z]*[A-Z])$',
                    r'^([A-Z][A-Za-z\s]+)$',
                ],
                'question_patterns': [
                    r'^.+\?$',
                    r'^\*?Please .+',
                    r'^How .+',
                    r'^What .+',
                    r'^Describe .+',
                ],
            }
        }

    def match_category(self, text: str, return_first: bool = True) -> Union[Optional[str], List[str]]:
        """Score the text against each category's patterns; return the best match, or all matches ranked."""
        lower_text = text.lower()
        match_scores = defaultdict(int)
        for category, patterns in self.category_patterns.items():
            count = sum(len(re.findall(pattern, lower_text)) for pattern in patterns)
            if count:  # only record categories that actually matched
                match_scores[category] += count
        if not match_scores:
            return None if return_first else []
        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
        if return_first:
            return sorted_categories[0][0]
        return [cat for cat, _ in sorted_categories]

    def extract_text(self, file_path: str) -> str:
        """Extract raw text; bold .docx paragraphs are wrapped in ** so they can later be detected as headers."""
        if file_path.endswith(".docx"):
            doc = Document(file_path)
            return '\n'.join(
                f"**{p.text}**" if any(r.bold for r in p.runs) else p.text
                for p in doc.paragraphs
            )
        elif file_path.endswith(".pdf"):
            text = ""
            with fitz.open(file_path) as doc:
                for page in doc:
                    text += page.get_text("text")  # "text" mode gives a more accurate reading order
            return text
        else:
            return Path(file_path).read_text()

    def detect_document_type(self, text: str) -> str:
        """Classify as a grant application when at least two of the grant keywords appear."""
        keywords = ['grant', 'funding', 'mission']
        return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic'

    def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
        """Find header and question lines, recording their line numbers for later chunk boundaries."""
        lines = text.split('\n')
        headers = []
        patterns = self.patterns.get(doc_type, self.patterns['grant_application'])
        for i, raw in enumerate(lines):
            line = raw.strip("* ")
            if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'})
            # Header patterns run against the unstripped line so the **bold** pattern can match.
            elif any(re.match(p, raw.strip()) for p in patterns['header_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
        return headers

    def fallback_chunking(self, text: str, max_words=150, stride=100) -> List[Dict]:
        """Sliding-window chunking used when no headers are found: max_words-word windows, stride-word steps."""
        words = text.split()
        chunks = []
        for i in range(0, len(words), stride):
            chunk_text = ' '.join(words[i:i + max_words])
            if len(chunk_text.split()) < 20:
                continue  # skip fragments too short to be useful
            chunks.append({
                'chunk_id': len(chunks) + 1,
                'header': '',
                'questions': [],
                'content': chunk_text,
                'pattern_type': 'fallback',
                'split_index': i // stride,
            })
        return chunks

    def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
        """Split the text at detected headers, capping each chunk at max_words words."""
        lines = text.split('\n')
        chunks = []
        for i, header in enumerate(headers):
            start = header['line_number']
            end = headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
            content_lines = lines[start + 1:end]
            questions = [l.strip() for l in content_lines
                         if l.strip().endswith('?') and len(l.split()) <= 20]
            content = ' '.join(l.strip() for l in content_lines
                               if l.strip() and l.strip() not in questions)
            content_words = content.split()
            for j in range(0, len(content_words), max_words):
                chunk_text = ' '.join(content_words[j:j + max_words])
                if len(chunk_text.split()) < 20:
                    continue  # skip fragments too short to be useful
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': header['text'] if header['pattern_type'] == 'header' else '',
                    'questions': questions if header['pattern_type'] == 'question' else [],
                    'content': chunk_text,
                    'pattern_type': header['pattern_type'],
                    'split_index': j // max_words,
                })
        return chunks

    def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
        """Return the highest-scoring TF-IDF terms as lightweight topic labels."""
        clean = re.sub(r'[^a-z0-9\s]', ' ', text.lower())
        vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
        try:
            tfidf = vectorizer.fit_transform([clean])
        except ValueError:
            # Raised when the chunk contains only stop words or no usable tokens.
            return []
        terms = vectorizer.get_feature_names_out()
        scores = tfidf.toarray()[0]
        top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0]
        return top_terms[:max_features]

    def calculate_confidence_score(self, chunk: Dict) -> float:
        """Heuristic score: header (+0.3), substantial content (+0.3), attached questions (+0.2)."""
        score = 0.0
        if chunk.get('header'):
            score += 0.3
        if chunk.get('content') and len(chunk['content'].split()) > 20:
            score += 0.3
        if chunk.get('questions'):
            score += 0.2
        return min(score, 1.0)

    def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
        """Extract, chunk, categorize, embed, and score a document; returns a list of chunk dicts."""
        file_path = Path(file_path)
        text = self.extract_text(str(file_path))
        doc_type = self.detect_document_type(text)
        headers = self.extract_headers(text, doc_type)
        chunks = self.chunk_by_headers(text, headers)
        if not chunks:
            chunks = self.fallback_chunking(text)

        final_chunks = []
        for chunk in chunks:
            full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
            category = self.match_category(full_text, return_first=True)
            categories = self.match_category(full_text, return_first=False)
            embedding = self.embed_model.encode(full_text).tolist()
            topics = self.extract_topics_tfidf(full_text)
            confidence = self.calculate_confidence_score(chunk)
            final_chunks.append({
                "chunk_id": chunk['chunk_id'],
                "text": full_text,
                "embedding": embedding,
                "metadata": {
                    **chunk,
                    "title": title or file_path.name,
                    "category": category,
                    "categories": categories,
                    "topics": topics,
                    "chunking_strategy": chunk['pattern_type'],
                    "confidence_score": confidence,
                },
            })
        return final_chunks
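

# Example usage: a minimal sketch of driving the chunker from a script. The file name
# "grant_proposal.docx" below is a placeholder, not a file shipped with this module;
# substitute any .docx, .pdf, or plain-text path. Note that constructing DocumentChunker
# loads the all-MiniLM-L6-v2 SentenceTransformer, which downloads weights on first run.
if __name__ == "__main__":
    chunker = DocumentChunker()
    chunks = chunker.process_document("grant_proposal.docx", title="Sample Grant Proposal")
    for chunk in chunks:
        meta = chunk["metadata"]
        print(f"[{chunk['chunk_id']}] {meta['category']} "
              f"(confidence={meta['confidence_score']:.1f}, strategy={meta['chunking_strategy']})")
        print(f"    topics: {', '.join(meta['topics'])}")
        print(f"    {chunk['text'][:80]}...")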