milwright committed
Commit b1f7185 · verified · 1 Parent(s): 7bd3594

Delete document_processor.py

Files changed (1)
  1. document_processor.py +0 -205
document_processor.py DELETED
@@ -1,205 +0,0 @@
- import os
- import json
- from typing import List, Dict, Any, Tuple
- from pathlib import Path
- import hashlib
-
- # Document parsing imports
- try:
-     import fitz  # PyMuPDF
-     HAS_PYMUPDF = True
- except ImportError:
-     HAS_PYMUPDF = False
-
- try:
-     from docx import Document
-     HAS_DOCX = True
- except ImportError:
-     HAS_DOCX = False
-
- # Text processing
- import re
- from dataclasses import dataclass
-
-
- @dataclass
- class DocumentChunk:
-     text: str
-     metadata: Dict[str, Any]
-     chunk_id: str
-
-     def to_dict(self):
-         return {
-             'text': self.text,
-             'metadata': self.metadata,
-             'chunk_id': self.chunk_id
-         }
-
-
- class DocumentProcessor:
-     def __init__(self, chunk_size: int = 800, chunk_overlap: int = 100):
-         self.chunk_size = chunk_size
-         self.chunk_overlap = chunk_overlap
-         self.supported_extensions = ['.pdf', '.docx', '.txt', '.md']
-
-     def process_file(self, file_path: str) -> List[DocumentChunk]:
-         """Process a single file and return chunks"""
-         path = Path(file_path)
-
-         if not path.exists():
-             raise FileNotFoundError(f"File not found: {file_path}")
-
-         extension = path.suffix.lower()
-         if extension not in self.supported_extensions:
-             raise ValueError(f"Unsupported file type: {extension}")
-
-         # Extract text based on file type
-         if extension == '.pdf':
-             text = self._extract_pdf_text(file_path)
-         elif extension == '.docx':
-             text = self._extract_docx_text(file_path)
-         elif extension in ['.txt', '.md']:
-             text = self._extract_text_file(file_path)
-         else:
-             raise ValueError(f"Unsupported file type: {extension}")
-
-         # Create chunks
-         chunks = self._create_chunks(text, file_path)
-
-         return chunks
-
-     def _extract_pdf_text(self, file_path: str) -> str:
-         """Extract text from PDF file"""
-         if not HAS_PYMUPDF:
-             raise ImportError("PyMuPDF not installed. Install with: pip install PyMuPDF")
-
-         text_parts = []
-
-         try:
-             with fitz.open(file_path) as pdf:
-                 for page_num in range(len(pdf)):
-                     page = pdf[page_num]
-                     text = page.get_text()
-                     if text.strip():
-                         text_parts.append(f"[Page {page_num + 1}]\n{text}")
-         except Exception as e:
-             raise Exception(f"Error processing PDF: {str(e)}")
-
-         return "\n\n".join(text_parts)
-
-     def _extract_docx_text(self, file_path: str) -> str:
-         """Extract text from DOCX file"""
-         if not HAS_DOCX:
-             raise ImportError("python-docx not installed. Install with: pip install python-docx")
-
-         text_parts = []
-
-         try:
-             doc = Document(file_path)
-
-             for paragraph in doc.paragraphs:
-                 if paragraph.text.strip():
-                     text_parts.append(paragraph.text)
-
-             # Also extract text from tables
-             for table in doc.tables:
-                 for row in table.rows:
-                     row_text = []
-                     for cell in row.cells:
-                         if cell.text.strip():
-                             row_text.append(cell.text.strip())
-                     if row_text:
-                         text_parts.append(" | ".join(row_text))
-
-         except Exception as e:
-             raise Exception(f"Error processing DOCX: {str(e)}")
-
-         return "\n\n".join(text_parts)
-
-     def _extract_text_file(self, file_path: str) -> str:
-         """Extract text from plain text or markdown file"""
-         try:
-             with open(file_path, 'r', encoding='utf-8') as f:
-                 return f.read()
-         except Exception as e:
-             raise Exception(f"Error reading text file: {str(e)}")
-
-     def _create_chunks(self, text: str, file_path: str) -> List[DocumentChunk]:
-         """Create overlapping chunks from text"""
-         chunks = []
-
-         # Clean and normalize text
-         text = re.sub(r'\s+', ' ', text)
-         text = text.strip()
-
-         if not text:
-             return chunks
-
-         # Simple word-based chunking
-         words = text.split()
-
-         for i in range(0, len(words), self.chunk_size - self.chunk_overlap):
-             chunk_words = words[i:i + self.chunk_size]
-             chunk_text = ' '.join(chunk_words)
-
-             # Create chunk ID
-             chunk_id = hashlib.md5(f"{file_path}_{i}_{chunk_text[:50]}".encode()).hexdigest()[:8]
-
-             # Create metadata
-             metadata = {
-                 'file_path': file_path,
-                 'file_name': Path(file_path).name,
-                 'chunk_index': len(chunks),
-                 'start_word': i,
-                 'word_count': len(chunk_words)
-             }
-
-             chunk = DocumentChunk(
-                 text=chunk_text,
-                 metadata=metadata,
-                 chunk_id=chunk_id
-             )
-
-             chunks.append(chunk)
-
-         return chunks
-
-     def process_multiple_files(self, file_paths: List[str]) -> Tuple[List[DocumentChunk], Dict[str, Any]]:
-         """Process multiple files and return chunks with summary"""
-         all_chunks = []
-         summary = {
-             'total_files': 0,
-             'total_chunks': 0,
-             'files_processed': [],
-             'errors': []
-         }
-
-         for file_path in file_paths:
-             try:
-                 chunks = self.process_file(file_path)
-                 all_chunks.extend(chunks)
-
-                 summary['files_processed'].append({
-                     'path': file_path,
-                     'name': Path(file_path).name,
-                     'chunks': len(chunks)
-                 })
-
-             except Exception as e:
-                 summary['errors'].append({
-                     'path': file_path,
-                     'error': str(e)
-                 })
-
-         summary['total_files'] = len(summary['files_processed'])
-         summary['total_chunks'] = len(all_chunks)
-
-         return all_chunks, summary
-
-
- # Utility function for file size validation
- def validate_file_size(file_path: str, max_size_mb: float = 10.0) -> bool:
-     """Check if file size is within limits"""
-     size_bytes = os.path.getsize(file_path)
-     size_mb = size_bytes / (1024 * 1024)
-     return size_mb <= max_size_mb
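
For reference, a minimal sketch of how the removed module was typically driven, assuming document_processor.py is still on the import path; the input file names are hypothetical:

from document_processor import DocumentProcessor, validate_file_size

# Skip files over the 10 MB default limit before processing
paths = [p for p in ["report.pdf", "notes.md"] if validate_file_size(p)]

processor = DocumentProcessor(chunk_size=800, chunk_overlap=100)
chunks, summary = processor.process_multiple_files(paths)

print(f"{summary['total_chunks']} chunks from {summary['total_files']} file(s)")
for err in summary['errors']:
    print(f"Failed {err['path']}: {err['error']}")

# Each chunk serializes to a plain dict for downstream indexing
records = [chunk.to_dict() for chunk in chunks]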