acecalisto3 committed
Commit 505b1a3 · verified · 1 Parent(s): c70f013

Update app.py

Files changed (1)
  1. app.py +58 -51
app.py CHANGED
@@ -119,7 +119,7 @@ class URLProcessor:
                 'timestamp': datetime.now().isoformat()
             }
         except Exception as e:
-            logger.error(f"Google Drive processing failed: {e}")
+            logger.error (f"Google Drive processing failed: {e}")
             return None
 
     def _handle_google_calendar(self, url: str) -> Optional[Dict]:
@@ -212,6 +212,58 @@ class FileProcessor:
 
         return dataset
 
+    def _process_single_file(self, file) -> List[Dict]:
+        """Process a single file"""
+        try:
+            file_stat = os.stat(file.name)
+
+            # For very large files, read in chunks and summarize
+            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
+                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
+
+                # Read first and last 1MB for extremely large files
+                content = ""
+                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
+                    content = f.read(1 * 1024 * 1024)  # First 1MB
+                    content += "\n...[Content truncated due to large file size]...\n"
+
+                    # Seek to the last 1MB
+                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
+                    content += f.read()  # Last 1MB
+            else:
+                # Regular file processing
+                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
+                    content = f.read()
+
+            return [{
+                'source': 'filename',
+                'filename': os.path.basename(file.name),
+                'file_size': file_stat.st_size,
+                'mime_type': mimetypes.guess_type(file.name)[0],
+                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
+                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
+                'content': content,
+                'timestamp': datetime.now().isoformat()
+            }]
+        except Exception as e:
+            logger.error(f"File processing error: {e}")
+            return []
+
+    def _process_zip_file(self, zip_file_path: str, extract_to: str) -> List[Dict]:
+        """Process a zip file and extract its contents"""
+        dataset = []
+        try:
+            with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
+                zip_ref.extractall(extract_to)
+                for file_info in zip_ref.infolist():
+                    if file_info.is_dir():
+                        continue
+                    extracted_file_path = os.path.join(extract_to, file_info.filename)
+                    dataset.extend(self._process_single_file(open(extracted_file_path, 'rb')))
+        except Exception as e:
+            logger.error(f"Error processing zip file: {e}")
+        return dataset
+
     def chunk_data(self, data, max_size=2953):  # 2953 bytes is the capacity of a version 40 QR code (binary mode, error correction L)
         """Chunk data into smaller pieces if it exceeds max_size."""
         json_str = json.dumps(data, ensure_ascii=False)
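Review note on the hunk above: `_process_zip_file` hands `_process_single_file` a handle from `open(extracted_file_path, 'rb')` that is never closed, and `_process_single_file` only uses `file.name` to reopen the path anyway. A minimal leak-free sketch under the same extraction layout (`iter_zip_members` is an illustrative name, not part of app.py):

```python
import os
import zipfile
from typing import Iterator

def iter_zip_members(zip_file_path: str, extract_to: str) -> Iterator[str]:
    """Yield the path of each extracted regular file; the archive handle
    is closed by the context manager even if extraction fails."""
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_to)
        for info in zip_ref.infolist():
            if not info.is_dir():
                yield os.path.join(extract_to, info.filename)
```

Each yielded path can then be opened in a `with` block (or passed by name) before it reaches `_process_single_file`, so no descriptor leaks.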
@@ -226,54 +278,14 @@ class FileProcessor:
             json_str = json_str[max_size:]
 
         return chunks
-
-    def _process_single_file(self, file) -> List[Dict]:
-        """Process a single file"""
-        try:
-            file_stat = os.stat(file.name)
-
-            # For very large files, read in chunks and summarize
-            if file_stat.st_size > 100 * 1024 * 1024:  # 100MB
-                logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
 
-                # Read first and last 1MB for extremely large files
-                content = ""
-                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
-                    content = f.read(1 * 1024 * 1024)  # First 1MB
-                    content += "\n...[Content truncated due to large file size]...\n"
-
-                    # Seek to the last 1MB
-                    f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
-                    content += f.read()  # Last 1MB
-            else:
-                # Regular file processing
-                with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
-                    content = f.read()
-
-            return [{
-                'source': 'filename',  # Assuming 'source' should be a string value
-                'filename': os.path.basename(file.name),
-                'file_size': file_stat.st_size,
-                'mime_type': mimetypes.guess_type(file.name)[0],
-                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
-                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
-                'content': content,
-                'timestamp': datetime.now().isoformat()
-            }]
-        except Exception as e:
-            logger.error(f"File processing error: {e}")
-            return []
-
 def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
     """Clean and validate JSON data"""
     try:
-        # If it's a string, try to parse it
         if isinstance(data, str):
-            # Remove any existing content and extra whitespace
             data = data.strip()
             data = json.loads(data)
 
-        # Convert to string and back to ensure proper JSON format
         cleaned = json.loads(json.dumps(data))
         return cleaned
     except json.JSONDecodeError as e:
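Note that `chunk_data` slices the serialized JSON string, so individual chunks are not valid JSON on their own; a consumer must concatenate every chunk, in order, before parsing. A round-trip sketch of that contract (`chunk_json` and `reassemble` are illustrative names, not app.py functions); slicing by characters can also overshoot a byte budget when the payload contains multi-byte UTF-8:

```python
import json

def chunk_json(data, max_size: int = 2953) -> list:
    """Serialize data and split the string into max_size-character slices."""
    json_str = json.dumps(data, ensure_ascii=False)
    return [json_str[i:i + max_size] for i in range(0, len(json_str), max_size)]

def reassemble(chunks: list):
    """Concatenate ordered chunks and parse the payload back into Python data."""
    return json.loads("".join(chunks))

payload = {"items": list(range(1000))}
assert reassemble(chunk_json(payload, max_size=64)) == payload
```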
@@ -290,9 +302,8 @@ def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]
     output_dir.mkdir(parents=True, exist_ok=True)
 
     if combined:
-        # Generate single QR code for all data
         cleaned_data = clean_json(data)
-        if cleaned_data is None:  # Check if cleaning failed
+        if cleaned_data is None:
             logger.error("Failed to clean data for QR code generation.")
             return []
 
@@ -311,14 +322,13 @@ def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]
         img.save(str(output_path))
         return [str(output_path)]
     else:
-        # Generate separate QR codes for each item
         if isinstance(data, list):
            paths = []
            for idx, item in enumerate(data):
                cleaned_item = clean_json(item)
-                if cleaned_item is None:  # Check if cleaning failed
+                if cleaned_item is None:
                    logger.error(f"Failed to clean item {idx} for QR code generation.")
-                    continue  # Skip this item
+                    continue
 
                qr = qrcode.QRCode(
                    version=None,
@@ -336,9 +346,8 @@ def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]
                paths.append(str(output_path))
            return paths
         else:
-            # Single item, not combined
             cleaned_item = clean_json(data)
-            if cleaned_item is None:  # Check if cleaning failed
+            if cleaned_item is None:
                 logger.error("Failed to clean single item for QR code generation.")
                 return []
 
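For context, both branches above lean on the `qrcode` package auto-sizing the symbol when `version=None`. A self-contained sketch of one item's trip through that API (the `box_size`/`border` values are the library defaults, not necessarily app.py's, and the output path is illustrative):

```python
import json
from pathlib import Path

import qrcode  # pip install qrcode[pil]

def make_qr(item: dict, output_path: Path) -> Path:
    """Render one dict as a QR code PNG, letting the library pick the version."""
    qr = qrcode.QRCode(
        version=None,  # auto-fit the smallest version that holds the payload
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=10,
        border=4,
    )
    qr.add_data(json.dumps(item, ensure_ascii=False))
    qr.make(fit=True)
    img = qr.make_image(fill_color="black", back_color="white")
    img.save(str(output_path))
    return output_path

output_dir = Path("output/qr_codes")
output_dir.mkdir(parents=True, exist_ok=True)
make_qr({"source": "example"}, output_dir / "qr_example.png")
```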
 
@@ -453,7 +462,6 @@ def create_interface():
         # Process text input first (since it's direct JSON)
         if text and text.strip():
             try:
-                # Try to parse as JSON
                 json_data = json.loads(text)
                 if isinstance(json_data, list):
                     results.extend(json_data)
@@ -490,7 +498,6 @@ def create_interface():
         # Generate QR codes
         if results:
             if combine:
-                # Chunk the data if necessary
                 combined_data = []
                 for item in results:
                     combined_data.extend(file_processor.chunk_data(item))
@@ -505,7 +512,7 @@ def create_interface():
                    f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR code(s)!"
                )
            else:
-                return None, [], "❌ Failed to generate QR codes. Please check the input data."
+                return None, [], " ❌ Failed to generate QR codes. Please check the input data."
        else:
            return None, [], "⚠️ No valid content to process. Please provide some input data."
 
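The diff only covers the write side; reading chunked codes back means decoding each image in generation order and reassembling before `json.loads`. A plausible consumer, assuming OpenCV (`opencv-python`) for decoding, which app.py itself does not ship:

```python
import json
import cv2  # opencv-python; an assumption, not an app.py dependency

def decode_chunked_qr(image_paths: list):
    """Decode each QR image in order, then parse the concatenated payload."""
    detector = cv2.QRCodeDetector()
    pieces = []
    for path in image_paths:
        data, _, _ = detector.detectAndDecode(cv2.imread(path))
        pieces.append(data)
    return json.loads("".join(pieces))

# Usage: pass the paths returned by generate_qr_code(...) in order, e.g.
# data = decode_chunked_qr(["output/qr_codes/qr_0.png", "output/qr_codes/qr_1.png"])
```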