Tesneem committed on
Commit 899d177 · verified · 1 Parent(s): 1944757

Create document_chunker.py

Files changed (1)
  1. document_chunker.py +175 -0
document_chunker.py ADDED
@@ -0,0 +1,175 @@
import re
from typing import Dict, List, Optional, Union
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass

from docx import Document
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer


@dataclass
class DocumentChunk:
    chunk_id: int
    text: str
    embedding: List[float]
    metadata: Dict


class DocumentChunker:
    def __init__(self):
        # Sentence embedding model used to embed each chunk.
        self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")

        # Regex patterns that map chunk text to grant-application categories.
        self.category_patterns = {
            "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
            "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
            "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
            "Mission Statement": [r"\bmission\b", r"\bvision\b"],
            "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
            "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
            "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
            "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
            "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
            "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
            "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
            "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
            "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
            "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
            "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
        }

        # Per-document-type patterns for detecting section headers and questions.
        self.patterns = {
            'grant_application': {
                'header_patterns': [
                    r'\*\*([^*]+)\*\*',          # **bold** markdown-style headers
                    r'^([A-Z][^a-z]*[A-Z])$',    # lines with no lowercase letters (all caps)
                    r'^([A-Z][A-Za-z\s]+)$',     # capitalized lines of letters and spaces
                ],
                'question_patterns': [
                    r'^.+\?$',
                    r'^\*?Please .+',
                    r'^How .+',
                    r'^What .+',
                    r'^Describe .+',
                ]
            }
        }

    def match_category(self, text: str, return_first: bool = True) -> Union[Optional[str], List[str]]:
        # Score the text against every category's patterns; return the best match or all matches.
        lower_text = text.lower()
        match_scores = defaultdict(int)
        for category, patterns in self.category_patterns.items():
            for pattern in patterns:
                matches = re.findall(pattern, lower_text)
                match_scores[category] += len(matches)

        # The defaultdict gains a key even for zero-count categories, so check the scores themselves.
        if not match_scores or max(match_scores.values()) == 0:
            return None if return_first else []

        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
        return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]

    def extract_text_from_docx(self, file_path: str) -> str:
        doc = Document(file_path)
        return '\n'.join([f"**{p.text}**" if any(r.bold for r in p.runs) else p.text for p in doc.paragraphs])

    def detect_document_type(self, text: str) -> str:
        keywords = ['grant', 'funding', 'mission']
        return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic'

    def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
        lines = text.split('\n')
        headers = []
        patterns = self.patterns.get(doc_type, self.patterns['grant_application'])
        for i, line in enumerate(lines):
            # Strip the bold markers added in extract_text_from_docx before matching.
            line = line.strip("* ")
            if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'})
            elif any(re.match(p, line) for p in patterns['header_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
        return headers

    def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
        lines = text.split('\n')
        chunks = []

        if not headers:
            # Fallback chunking: no headers detected, split the text into fixed-size word windows.
            words = text.split()
            for i in range(0, len(words), max_words):
                piece = ' '.join(words[i:i + max_words])
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': '',
                    'questions': [],
                    'content': piece,
                    'pattern_type': 'auto'
                })
            return chunks

        for i, header in enumerate(headers):
            # A section runs from the line after this header up to the next header (or end of document).
            start = header['line_number']
            end = headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
            content_lines = lines[start + 1:end]
            questions = [l.strip() for l in content_lines if l.strip().endswith('?') and len(l.split()) <= 20]
            content = ' '.join([l.strip() for l in content_lines if l.strip() and l.strip() not in questions])

            # Split long sections into max_words-sized pieces.
            for j in range(0, len(content.split()), max_words):
                chunk_text = ' '.join(content.split()[j:j + max_words])
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': header['text'] if header['pattern_type'] == 'header' else '',
                    'questions': questions if header['pattern_type'] == 'question' else [],
                    'content': chunk_text,
                    'pattern_type': header['pattern_type'],
                    'split_index': j // max_words
                })
        return chunks

    def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
        # Rank terms in this chunk by TF-IDF score and keep the top few as topic labels.
        clean = re.sub(r'[^\w\s]', ' ', text.lower())
        vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
        try:
            tfidf = vectorizer.fit_transform([clean])
        except ValueError:
            # Empty vocabulary (e.g. the text is empty or contains only stop words).
            return []
        terms = vectorizer.get_feature_names_out()
        scores = tfidf.toarray()[0]
        top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0]
        return top_terms[:max_features]

    def calculate_confidence_score(self, chunk: Dict) -> float:
        score = 0.0
        if chunk.get('header'):
            score += 0.3
        if chunk.get('content') and len(chunk['content'].split()) > 20:
            score += 0.3
        if chunk.get('questions'):
            score += 0.2
        return min(score, 1.0)

    def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
        file_path = Path(file_path)
        # .docx files go through python-docx; anything else is read as plain text.
        text = self.extract_text_from_docx(str(file_path)) if file_path.suffix == ".docx" else file_path.read_text()
        doc_type = self.detect_document_type(text)
        headers = self.extract_headers(text, doc_type)
        raw_chunks = self.chunk_by_headers(text, headers)

        final_chunks = []
        for chunk in raw_chunks:
            # Combine header, questions, and content into one string for embedding and categorization.
            full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
            category = self.match_category(full_text, return_first=True)
            categories = self.match_category(full_text, return_first=False)
            embedding = self.embed_model.encode(full_text).tolist()
            topics = self.extract_topics_tfidf(full_text)
            confidence = self.calculate_confidence_score(chunk)

            final_chunks.append({
                "chunk_id": chunk['chunk_id'],
                "text": full_text,
                "embedding": embedding,
                "metadata": {
                    **chunk,
                    "title": title or file_path.name,
                    "category": category,
                    "categories": categories,
                    "topics": topics,
                    "confidence_score": confidence
                }
            })

        return final_chunks
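
A minimal usage sketch (not part of the committed file), assuming a local grant application such as grant_app.docx and the python-docx, sentence-transformers, and scikit-learn packages installed; the file name and title below are only illustrative:

    from document_chunker import DocumentChunker

    chunker = DocumentChunker()
    chunks = chunker.process_document("grant_app.docx", title="Grant Application")
    for c in chunks:
        # Each chunk carries the combined text, its embedding, and metadata
        # such as the matched category and the heuristic confidence score.
        print(c["chunk_id"], c["metadata"]["category"], c["metadata"]["confidence_score"])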