import re
from typing import List, Dict, Optional, Union
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass
import fitz  # PyMuPDF
from docx import Document
from sentence_transformers import SentenceTransformer
from sklearn.feature_extraction.text import TfidfVectorizer

@dataclass
class DocumentChunk:
    chunk_id: int
    text: str
    embedding: List[float]
    metadata: Dict

class DocumentChunker:
    def __init__(self):
        self.embed_model = SentenceTransformer("all-MiniLM-L6-v2")
        self.category_patterns = {
            "Project Summary": [r"\bsummary\b", r"\bproject overview\b"],
            "Contact Information": [r"\bcontact\b", r"\bemail\b", r"\bphone\b", r"\baddress\b"],
            "Problem/ Need": [r"\bproblem\b", r"\bneed\b", r"\bchallenge\b"],
            "Mission Statement": [r"\bmission\b", r"\bvision\b"],
            "Fit or Alignment to Grant": [r"\balignment\b", r"\bfit\b", r"\bgrant (focus|priority)\b"],
            "Goals/ Vision / Objectives": [r"\bgoals?\b", r"\bobjectives?\b", r"\bvision\b"],
            "Our Solution *PROGRAMS* and Approach": [r"\bsolution\b", r"\bprogram\b", r"\bapproach\b"],
            "Impact, Results, or Outcomes": [r"\bimpact\b", r"\bresults?\b", r"\boutcomes?\b"],
            "Beneficiaries": [r"\bbeneficiaries\b", r"\bwho we serve\b", r"\btarget audience\b"],
            "Differentiation with Competitors": [r"\bcompetitor\b", r"\bdifferent\b", r"\bvalue proposition\b"],
            "Plan and Timeline": [r"\btimeline\b", r"\bschedule\b", r"\bmilestone\b"],
            "Budget and Funding": [r"\bbudget\b", r"\bfunding\b", r"\bcost\b"],
            "Sustainability and Strategy": [r"\bsustainability\b", r"\bexit strategy\b"],
            "Organization's History": [r"\bhistory\b", r"\borganization background\b"],
            "Team Member Descriptions": [r"\bteam\b", r"\bstaff\b", r"\blived experience\b"],
        }
        self.patterns = {
            'grant_application': {
                'header_patterns': [r'\*\*([^*]+)\*\*', r'^([A-Z][^a-z]*[A-Z])$', r'^([A-Z][A-Za-z\s]+)$'],
                'question_patterns': [r'^.+\?$', r'^\*?Please .+', r'^How .+', r'^What .+', r'^Describe .+']
            }
        }

    def match_category(self, text: str, return_first: bool = True) -> Union[Optional[str], List[str]]:
        """Score the text against every category's regexes and return either the
        best-matching category or all matching categories, ranked by score."""
        lower_text = text.lower()
        match_scores = defaultdict(int)
        for category, patterns in self.category_patterns.items():
            for pattern in patterns:
                matches = re.findall(pattern, lower_text)
                if matches:
                    match_scores[category] += len(matches)
        if not match_scores:
            return None if return_first else []
        sorted_categories = sorted(match_scores.items(), key=lambda x: -x[1])
        return sorted_categories[0][0] if return_first else [cat for cat, _ in sorted_categories if match_scores[cat] > 0]

    def extract_text(self, file_path: str) -> str:
        """Read a .docx, .pdf, or plain-text file into a single string. Bold .docx
        paragraphs are wrapped in ** so the header regexes can recognize them."""
        if file_path.endswith(".docx"):
            doc = Document(file_path)
            return '\n'.join([f"**{p.text}**" if any(r.bold for r in p.runs) else p.text for p in doc.paragraphs])
        elif file_path.endswith(".pdf"):
            text = ""
            with fitz.open(file_path) as doc:
                for page in doc:
                    text += page.get_text("text")  # plain-text mode gives a more accurate reading order
            return text
        else:
            return Path(file_path).read_text()

    def detect_document_type(self, text: str) -> str:
        # Treat the document as a grant application if at least two keywords appear.
        keywords = ['grant', 'funding', 'mission']
        return 'grant_application' if sum(k in text.lower() for k in keywords) >= 2 else 'generic'

    def extract_headers(self, text: str, doc_type: str) -> List[Dict]:
        """Scan each line for header- or question-style patterns and record where they occur."""
        lines = text.split('\n')
        headers = []
        patterns = self.patterns.get(doc_type, self.patterns['grant_application'])
        for i, line in enumerate(lines):
            line = line.strip("* ")
            if any(re.match(p, line, re.IGNORECASE) for p in patterns['question_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'question'})
            elif any(re.match(p, line) for p in patterns['header_patterns']):
                headers.append({'text': line, 'line_number': i, 'pattern_type': 'header'})
        return headers

    def fallback_chunking(self, text: str, max_words=150, stride=100) -> List[Dict]:
        """Overlapping sliding-window chunking, used when no headers are detected."""
        words = text.split()
        chunks = []
        for i in range(0, len(words), stride):
            chunk_text = ' '.join(words[i:i + max_words])
            if len(chunk_text.split()) < 20:
                continue  # skip fragments too short to be useful
            chunks.append({
                'chunk_id': len(chunks) + 1,
                'header': '',
                'questions': [],
                'content': chunk_text,
                'pattern_type': 'fallback',
                'split_index': i // stride
            })
        return chunks

    def chunk_by_headers(self, text: str, headers: List[Dict], max_words=150) -> List[Dict]:
        """Split the text at each detected header and break long sections into max_words-sized pieces."""
        lines = text.split('\n')
        chunks = []
        for i, header in enumerate(headers):
            start = header['line_number']
            end = headers[i + 1]['line_number'] if i + 1 < len(headers) else len(lines)
            content_lines = lines[start + 1:end]
            # Short question-like lines are collected separately from the body text.
            questions = [l.strip() for l in content_lines if l.strip().endswith('?') and len(l.split()) <= 20]
            content = ' '.join([l.strip() for l in content_lines if l.strip() and l.strip() not in questions])
            words = content.split()
            for j in range(0, len(words), max_words):
                chunk_text = ' '.join(words[j:j + max_words])
                if len(chunk_text.split()) < 20:
                    continue
                chunks.append({
                    'chunk_id': len(chunks) + 1,
                    'header': header['text'] if header['pattern_type'] == 'header' else '',
                    'questions': questions if header['pattern_type'] == 'question' else [],
                    'content': chunk_text,
                    'pattern_type': header['pattern_type'],
                    'split_index': j // max_words
                })
        return chunks

    def extract_topics_tfidf(self, text: str, max_features: int = 3) -> List[str]:
        """Return the top TF-IDF terms for the chunk as lightweight topic labels."""
        clean = re.sub(r'[^a-z0-9\s]', ' ', text.lower())
        vectorizer = TfidfVectorizer(max_features=max_features * 2, stop_words='english')
        tfidf = vectorizer.fit_transform([clean])
        terms = vectorizer.get_feature_names_out()
        scores = tfidf.toarray()[0]
        top_terms = [term for term, score in sorted(zip(terms, scores), key=lambda x: -x[1]) if score > 0]
        return top_terms[:max_features]

    def calculate_confidence_score(self, chunk: Dict) -> float:
        """Heuristic score rewarding chunks that carry a header, enough content, and questions."""
        score = 0.0
        if chunk.get('header'):
            score += 0.3
        if chunk.get('content') and len(chunk['content'].split()) > 20:
            score += 0.3
        if chunk.get('questions'):
            score += 0.2
        return min(score, 1.0)

    def process_document(self, file_path: str, title: Optional[str] = None) -> List[Dict]:
        """End-to-end pipeline: extract text, detect the document type, chunk it, then
        enrich each chunk with a category, TF-IDF topics, an embedding, and a confidence score."""
        file_path = Path(file_path)
        text = self.extract_text(str(file_path))
        doc_type = self.detect_document_type(text)
        headers = self.extract_headers(text, doc_type)
        chunks = self.chunk_by_headers(text, headers)
        if not chunks:
            # No usable headers: fall back to sliding-window chunking.
            chunks = self.fallback_chunking(text)
        final_chunks = []
        for chunk in chunks:
            full_text = f"{chunk['header']} {' '.join(chunk['questions'])} {chunk['content']}".strip()
            category = self.match_category(full_text, return_first=True)
            categories = self.match_category(full_text, return_first=False)
            embedding = self.embed_model.encode(full_text).tolist()
            topics = self.extract_topics_tfidf(full_text)
            confidence = self.calculate_confidence_score(chunk)
            final_chunks.append({
                "chunk_id": chunk['chunk_id'],
                "text": full_text,
                "embedding": embedding,
                "metadata": {
                    **chunk,
                    "title": title or file_path.name,
                    "category": category,
                    "categories": categories,
                    "topics": topics,
                    "chunking_strategy": chunk['pattern_type'],
                    "confidence_score": confidence
                }
            })
        return final_chunks
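
# Minimal usage sketch (not part of the original script): run the full pipeline on one
# document and print a summary of each chunk. The file name "grant_application.docx"
# and the title below are hypothetical example values.
if __name__ == "__main__":
    chunker = DocumentChunker()
    results = chunker.process_document("grant_application.docx", title="Sample Grant Application")
    for c in results:
        meta = c["metadata"]
        print(c["chunk_id"], meta["category"], meta["topics"], round(meta["confidence_score"], 2))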