Spaces:
Running
Running
Update app.py
Browse files
app.py
CHANGED
@@ -151,35 +151,67 @@ class URLProcessor:
|
|
151 |
|
152 |
class FileProcessor:
|
153 |
"""Class to handle file processing"""
|
154 |
-
|
155 |
def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024): # 2GB default
|
156 |
self.max_file_size = max_file_size
|
157 |
self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}
|
|
|
|
|
|
|
|
|
|
|
|
|
158 |
|
159 |
-
def
|
160 |
-
"""Process uploaded
|
161 |
-
if not
|
162 |
return []
|
163 |
|
164 |
-
|
|
|
|
|
165 |
try:
|
166 |
-
|
167 |
-
|
168 |
-
|
169 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
170 |
|
171 |
-
|
172 |
-
if zipfile.is_zipfile(
|
173 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
174 |
else:
|
175 |
-
|
176 |
|
177 |
except Exception as e:
|
178 |
-
logger.error(f"Error processing
|
179 |
-
|
180 |
-
|
181 |
-
return dataset
|
182 |
-
|
183 |
def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
|
184 |
"""Process ZIP file contents"""
|
185 |
results = []
|
|
|
151 |
|
152 |
class FileProcessor:
|
153 |
"""Class to handle file processing"""
|
154 |
+
|
155 |
def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024): # 2GB default
|
156 |
self.max_file_size = max_file_size
|
157 |
self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}
|
158 |
+
self.processed_zip_count = 0
|
159 |
+
self.max_zip_files = 5
|
160 |
+
|
161 |
+
def is_text_file(self, file_path: str) -> bool:
    """Tell whether *file_path* ends with a supported text extension.

    The path is lower-cased before the comparison, so ``NOTES.TXT`` and
    ``notes.txt`` are treated alike. Only the suffix of the path string
    is inspected; the file itself is never opened.

    Args:
        file_path: Path (or bare file name) to classify.

    Returns:
        ``True`` when the lower-cased path ends with any entry of
        ``self.supported_text_extensions``, ``False`` otherwise.
    """
    lowered_path = file_path.lower()
    # str.endswith accepts a tuple of candidate suffixes and matches any.
    return lowered_path.endswith(tuple(self.supported_text_extensions))
|
164 |
|
165 |
+
def process_files(self, files: Union[List[gr.File], List[str]]) -> List[Dict]:
    """Process uploaded files and return all extracted records in one list.

    Every entry of *files* is resolved to a filesystem path, validated
    (must exist, must not be a directory, must not exceed
    ``self.max_file_size``) and then dispatched by type:

    * ZIP archives go to ``_process_zip_file``; at most
      ``self.max_zip_files`` archives are handled per call.
    * Files accepted by ``is_text_file`` go to ``_process_single_file``.
    * Anything else is logged as unsupported and skipped.

    A failure while handling one file is logged and does not prevent the
    remaining files from being processed.

    Args:
        files: Path strings, or upload objects that expose their path
            through a ``.name`` attribute.

    Returns:
        The concatenated results of every file that was processed; an
        empty list when *files* is empty or nothing could be processed.
    """
    # Local import: the file's top-level import block is not visible from
    # this method, so the dependency is brought into scope here.
    import tempfile

    if not files:
        return []

    combined_data = []
    self.processed_zip_count = 0  # the ZIP quota applies per call

    for file in files:
        file_path = None
        # Errors are contained per file so that one bad upload cannot
        # discard the files that follow it.
        try:
            # Plain paths are used as-is; any other object is expected to
            # carry its path in ``.name``.
            # NOTE(review): upload wrappers are not necessarily instances
            # of ``gr.File`` - confirm against the Gradio version in use.
            if isinstance(file, (str, os.PathLike)):
                file_path = file
            else:
                file_path = file.name

            # Log the file path being processed
            logger.info(f"Processing file: {file_path}")

            # Skip if it's a directory
            if os.path.isdir(file_path):
                logger.warning(f"Skipping directory: {file_path}")
                continue

            # Skip if file doesn't exist
            if not os.path.exists(file_path):
                logger.warning(f"File does not exist: {file_path}")
                continue

            # Check file size
            file_size = os.path.getsize(file_path)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                continue  # Skip this file

            # Process based on file type
            if zipfile.is_zipfile(file_path):
                if self.processed_zip_count >= self.max_zip_files:
                    logger.warning(f"Maximum number of ZIP files ({self.max_zip_files}) reached, skipping {file_path}")
                    continue
                self.processed_zip_count += 1
                # ``_process_zip_file`` declares a required ``temp_dir``
                # parameter, so a scratch directory is supplied here.
                # NOTE(review): the directory is deleted when the ``with``
                # block exits - confirm the returned records do not refer
                # to paths inside it.
                with tempfile.TemporaryDirectory() as temp_dir:
                    zip_results = self._process_zip_file(file_path, temp_dir)
                combined_data.extend(zip_results)
            elif self.is_text_file(file_path):
                file_results = self._process_single_file(file_path)
                combined_data.extend(file_results)
            else:
                logger.warning(f"Unsupported file type: {file_path}")
        except Exception as e:
            logger.error(f"Error processing file {file_path}: {str(e)}")

    return combined_data
|
|
|
|
|
215 |
def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
|
216 |
"""Process ZIP file contents"""
|
217 |
results = []
|