acecalisto3 committed on
Commit
d7b800d
·
verified ·
1 Parent(s): 89dda1c

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +269 -534
app.py CHANGED
@@ -1,560 +1,295 @@
1
- import json
2
  import os
3
- import re
4
- import time
5
  import logging
6
- import mimetypes
7
  import zipfile
 
 
8
  import tempfile
9
- from datetime import datetime
10
- from typing import List, Dict, Optional, Union
11
- from pathlib import Path
12
- from urllib.parse import urlparse
13
-
14
- import requests
15
- import validators
16
- import gradio as gr
17
- from diskcache import Cache
18
- from bs4 import BeautifulSoup
19
- from fake_useragent import UserAgent
20
- from cleantext import clean
21
- import qrcode
22
-
23
- # Setup logging
24
- logging.basicConfig(
25
- level=logging.INFO,
26
- format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
27
- handlers=[
28
- logging.StreamHandler(),
29
- logging.FileHandler('app.log', encoding='utf-8')
30
- ]
31
- )
32
- logger = logging.getLogger(__name__)
33
 
34
- # Ensure output directories exist
35
- Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
36
 
37
- class URLProcessor:
38
  def __init__(self):
39
- self.session = requests.Session()
40
- self.timeout = 10 # seconds
41
- self.session.headers.update({
42
- 'User -Agent': UserAgent().random,
43
- 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
44
- 'Accept-Language': 'en-US,en;q=0.5',
45
- 'Accept-Encoding': 'gzip, deflate, br',
46
- 'Connection': 'keep-alive',
47
- 'Upgrade-Insecure-Requests': '1'
48
- })
49
-
50
- def advanced_text_cleaning(self, text: str) -> str:
51
- """Robust text cleaning with version compatibility"""
52
- try:
53
- cleaned_text = clean(
54
- text,
55
- fix_unicode=True,
56
- to_ascii=True,
57
- lower=True,
58
- no_line_breaks=True,
59
- no_urls=True,
60
- no_emails=True,
61
- no_phone_numbers=True,
62
- no_numbers=False,
63
- no_digits=False,
64
- no_currency_symbols=True,
65
- no_punct=False
66
- ).strip()
67
- return cleaned_text
68
- except Exception as e:
69
- logger.warning(f"Text cleaning error: {e}. Using fallback method.")
70
- text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text) # Remove control characters
71
- text = text.encode('ascii', 'ignore').decode('ascii') # Remove non-ASCII characters
72
- text = re.sub(r'\s+', ' ', text) # Normalize whitespace
73
- return text.strip()
74
-
75
- def validate_url(self, url: str) -> Dict:
76
- """Validate URL format and accessibility"""
77
- try:
78
- if not validators.url(url):
79
- return {'is_valid': False, 'message': 'Invalid URL format'}
80
-
81
- response = self.session.head(url, timeout=self.timeout)
82
- response.raise_for_status()
83
- return {'is_valid': True, 'message': 'URL is valid and accessible'}
84
- except Exception as e:
85
- return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}
86
 
87
- def fetch_content(self, url: str) -> Optional[Dict]:
88
- """Universal content fetcher with special case handling"""
89
- try:
90
- # Google Drive document handling
91
- if 'drive.google.com' in url:
92
- return self._handle_google_drive(url)
93
 
94
- # Google Calendar ICS handling
95
- if 'calendar.google.com' in url and 'ical' in url:
96
- return self._handle_google_calendar(url)
97
 
98
- # Standard HTML processing
99
- return self._fetch_html_content(url)
100
- except Exception as e:
101
- logger.error(f"Content fetch failed: {e}")
102
- return None
103
-
104
- def _handle_google_drive(self, url: str) -> Optional[Dict]:
105
- """Process Google Drive file links"""
106
  try:
107
- file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
108
- if not file_id:
109
- logger.error(f"Invalid Google Drive URL: {url}")
110
- return None
111
-
112
- direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
113
- response = self.session.get(direct_url, timeout=self.timeout)
114
- response.raise_for_status()
115
-
116
- return {
117
- 'content': response.text,
118
- 'content_type': response.headers.get('Content-Type', ''),
119
- 'timestamp': datetime.now().isoformat()
120
- }
121
  except Exception as e:
122
- logger.error(f"Google Drive processing failed: {e}")
123
- return None
 
 
 
 
 
 
 
 
 
 
124
 
125
- def _handle_google_calendar(self, url: str) -> Optional[Dict]:
126
- """Process Google Calendar ICS feeds"""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
127
  try:
128
- response = self.session.get(url, timeout=self.timeout)
129
- response.raise_for_status()
130
- return {
131
- 'content': response.text,
132
- 'content_type': 'text/calendar',
133
- 'timestamp': datetime.now().isoformat()
134
- }
 
 
 
 
 
 
 
 
 
 
135
  except Exception as e:
136
- logger.error(f"Calendar fetch failed: {e}")
137
- return None
 
 
138
 
139
- def _fetch_html_content(self, url: str) -> Optional[Dict]:
140
- """Standard HTML content processing"""
 
141
  try:
142
- response = self.session.get(url, timeout=self.timeout)
143
- response.raise_for_status()
144
-
145
- soup = BeautifulSoup(response.text, 'html.parser')
146
-
147
- # Remove unwanted elements
148
- for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
149
- element.decompose()
150
-
151
- # Extract main content
152
- main_content = soup.find('main') or soup.find('article') or soup.body
153
-
154
- if main_content is None:
155
- logger.warning(f"No main content found for URL: {url}")
156
- return {
157
- 'content': '',
158
- 'content_type': response.headers.get('Content-Type', ''),
159
- 'timestamp': datetime.now().isoformat()
160
- }
161
-
162
- # Clean and structure content
163
- text_content = main_content.get_text(separator='\n', strip=True)
164
- cleaned_content = self.advanced_text_cleaning(text_content)
165
-
166
- return {
167
- 'content': cleaned_content,
168
- 'content_type': response.headers.get('Content-Type', ''),
169
- 'timestamp': datetime.now().isoformat()
170
- }
171
  except Exception as e:
172
- logger.error(f"HTML processing failed: {e}")
173
- return None
 
 
174
 
175
- class FileProcessor:
176
- """Class to handle file processing"""
177
 
178
- def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024): # 2GB default
179
- self.max_file_size = max_file_size
180
- self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}
181
 
182
- def is_text_file(self, filepath: str) -> bool:
183
- """Check if file is a text file"""
184
- try:
185
- mime_type, _ = mimetypes.guess_type(filepath)
186
- return (mime_type and mime_type.startswith('text/')) or \
187
- (os.path.splitext(filepath)[1].lower() in self.supported_text_extensions)
188
- except Exception:
189
- return False
190
-
191
- def process_file(self, file) -> List[Dict]:
192
- """Process uploaded file with enhanced error handling"""
193
- if not file:
194
- return []
195
-
196
- dataset = []
197
- try:
198
- file_size = os.path.getsize(file.name)
199
- if file_size > self.max_file_size:
200
- logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
201
- return []
202
-
203
- with tempfile.TemporaryDirectory() as temp_dir:
204
- if zipfile.is_zipfile(file.name):
205
- dataset.extend(self._process_zip_file(file.name, temp_dir))
206
- else:
207
- dataset.extend(self._process_single_file(file))
208
 
209
- except Exception as e:
210
- logger.error(f"Error processing file: {str(e)}")
211
- return []
212
-
213
- return dataset
214
-
215
- def chunk_data(self, data, max_size=2953): # 2953 is the max size for version 1 QR code
216
- """Chunk data into smaller pieces if it exceeds max_size."""
217
- json_str = json.dumps(data, ensure_ascii=False)
218
- if len(json_str) <= max_size:
219
- return [json_str]
220
-
221
- # Split into chunks
222
- chunks = []
223
- while json_str:
224
- chunk = json_str[:max_size]
225
- chunks.append(chunk)
226
- json_str = json_str[max_size:]
227
-
228
- return chunks
229
-
230
- def _process_single_file(self, file) -> List[Dict]:
231
- """Process a single file"""
232
- try:
233
- file_stat = os.stat(file.name)
234
-
235
- # For very large files, read in chunks and summarize
236
- if file_stat.st_size > 100 * 1024 * 1024: # 100MB
237
- logger.info(f"Processing large file: {file.name} ({file_stat.st_size} bytes)")
238
-
239
- # Read first and last 1MB for extremely large files
240
- content = ""
241
- with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
242
- content = f.read(1 * 1024 * 1024) # First 1MB
243
- content += "\n...[Content truncated due to large file size]...\n"
244
-
245
- # Seek to the last 1MB
246
- f.seek(max(0, file_stat.st_size - 1 * 1024 * 1024))
247
- content += f.read() # Last 1MB
248
- else:
249
- # Regular file processing
250
- with open(file.name, 'r', encoding='utf-8', errors='ignore') as f:
251
- content = f.read()
252
-
253
- return [{
254
- 'source': 'filename', # Assuming 'source' should be a string value
255
- 'filename': os.path.basename(file.name),
256
- 'file_size': file_stat.st_size,
257
- 'mime_type': mimetypes.guess_type(file.name)[0],
258
- 'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
259
- 'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
260
- 'content': content,
261
- 'timestamp': datetime.now().isoformat()
262
- }]
263
- except Exception as e:
264
- logger.error(f"File processing error: {e}")
265
- return []
266
-
267
- def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
268
- """Clean and validate JSON data"""
269
- try:
270
- # If it's a string, try to parse it
271
- if isinstance(data, str):
272
- # Remove any existing content and extra whitespace
273
- data = data.strip()
274
- data = json.loads(data)
275
-
276
- # Convert to string and back to ensure proper JSON format
277
- cleaned = json.loads(json.dumps(data))
278
- return cleaned
279
- except json.JSONDecodeError as e:
280
- logger.error(f"JSON cleaning error: {e}")
281
- return None
282
- except Exception as e:
283
- logger.error(f"Unexpected error while cleaning JSON: {e}")
284
- return None
285
-
286
- def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]:
287
- """Generate QR code(s) from data"""
288
- try:
289
- output_dir = Path('output/qr_codes')
290
- output_dir.mkdir(parents=True, exist_ok=True)
291
-
292
- if combined:
293
- # Generate single QR code for all data
294
- cleaned_data = clean_json(data)
295
- if cleaned_data is None: # Check if cleaning failed
296
- logger.error("Failed to clean data for QR code generation.")
297
- return []
298
-
299
- qr = qrcode.QRCode(
300
- version=None,
301
- error_correction=qrcode.constants.ERROR_CORRECT_L,
302
- box_size=10,
303
- border=4,
304
- )
305
- json_str = json.dumps(cleaned_data, ensure_ascii=False)
306
- qr.add_data(json_str)
307
- qr.make(fit=True)
308
-
309
- img = qr.make_image(fill_color="black", back_color="white")
310
- output_path = output_dir / f'combined_qr_{int(time.time())}.png'
311
- img.save(str(output_path))
312
- return [str(output_path)]
313
- else:
314
- # Generate separate QR codes for each item
315
- if isinstance(data, list):
316
- paths = []
317
- for idx, item in enumerate(data):
318
- cleaned_item = clean_json(item)
319
- if cleaned_item is None: # Check if cleaning failed
320
- logger.error(f"Failed to clean item {idx} for QR code generation.")
321
- continue # Skip this item
322
-
323
- qr = qrcode.QRCode(
324
- version=None,
325
- error_correction=qrcode.constants.ERROR_CORRECT_L,
326
- box_size=10,
327
- border=4,
328
- )
329
- json_str = json.dumps(cleaned_item, ensure_ascii=False)
330
- qr.add_data(json_str)
331
- qr.make(fit=True)
332
-
333
- img = qr.make_image(fill_color="black", back_color="white")
334
- output_path = output_dir / f'item_{idx}_qr_{int(time.time())}.png'
335
- img.save(str(output_path))
336
- paths.append(str(output_path))
337
- return paths
338
  else:
339
- # Single item, not combined
340
- cleaned_item = clean_json(data)
341
- if cleaned_item is None: # Check if cleaning failed
342
- logger.error("Failed to clean single item for QR code generation.")
343
- return []
344
-
345
- qr = qrcode.QRCode(
346
- version=None,
347
- error_correction=qrcode.constants.ERROR_CORRECT_L,
348
- box_size=10,
349
- border=4,
350
- )
351
- json_str = json.dumps(cleaned_item, ensure_ascii=False)
352
- qr.add_data(json_str)
353
- qr.make(fit=True)
354
-
355
- img = qr.make_image(fill_color="black", back_color="white")
356
- output_path = output_dir / f'single_qr_{int(time.time())}.png'
357
- img.save(str(output_path))
358
- return [str(output_path)]
359
-
360
- return []
361
- except Exception as e:
362
- logger.error(f"QR generation error: {e}")
363
- return []
364
-
365
- def create_interface():
366
- """Create a comprehensive Gradio interface with advanced features"""
367
-
368
- css = """
369
- .container { max-width: 1200px; margin: auto; }
370
- .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
371
- .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
372
- .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
373
- """
374
-
375
- with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
376
- gr.Markdown("# 🌐 Advanced Data Processing & QR Code Generator")
377
-
378
- with gr.Tab("URL Processing"):
379
- url_input = gr.Textbox(
380
- label="Enter URLs (comma or newline separated)",
381
- lines=5,
382
- placeholder="https://example1.com\nhttps://example2.com",
383
- value=""
384
- )
385
-
386
- with gr.Tab("File Input"):
387
- file_input = gr.File(
388
- label="Upload text file or ZIP archive",
389
- file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
390
- )
391
-
392
- with gr.Tab("Notepad"):
393
- text_input = gr.TextArea(
394
- label="JSON Data Input",
395
- lines=15,
396
- placeholder="Paste your JSON data here...",
397
- value=""
398
- )
399
-
400
- with gr.Row():
401
- example_btn = gr.Button("📝 Load Example JSON", variant="secondary")
402
- clear_btn = gr.Button("🗑️ Clear Input", variant="secondary")
403
-
404
- with gr.Row():
405
- combine_data = gr.Checkbox(
406
- label="Combine all data into single QR code",
407
- value=True,
408
- info="Generate one QR code for all data, or separate QR codes for each item"
409
- )
410
- process_btn = gr.Button("🔄 Process & Generate QR", variant="primary", scale=2)
411
-
412
- output_json = gr.JSON(label="Processed JSON Data")
413
- output_gallery = gr.Gallery(label="Generated QR Codes", columns=2, height=400)
414
- output_text = gr.Textbox(label="Processing Status", interactive=False)
415
-
416
- def load_example():
417
- example_json = {
418
- "type": "product_catalog",
419
- "items": [
420
- {
421
- "id": "123",
422
- "name": "Test Product",
423
- "description": "This is a test product description",
424
- "price": 29.99,
425
- "category": "electronics",
426
- "tags": ["test", "sample", "demo"]
427
- },
428
- {
429
- "id": "456",
430
- "name": "Another Product",
431
- "description": "Another test product description",
432
- "price": 49.99,
433
- "category": "accessories",
434
- "tags": ["sample", "test"]
435
- }
436
- ],
437
- "metadata": {
438
- "timestamp": datetime.now().isoformat(),
439
- "version": "1.0",
440
- "source": "example"
441
- }
442
- }
443
- return json.dumps(example_json, indent=2)
444
-
445
- def clear_input():
446
- return ""
447
-
448
- def process_all_inputs(urls, file, text, combine):
449
- """Process all input types and generate QR codes"""
450
- try:
451
- results = []
452
 
453
- # Process text input first (since it's direct JSON)
454
- if text and text.strip():
455
- try:
456
- # Try to parse as JSON
457
- json_data = json.loads(text)
458
- if isinstance(json_data, list):
459
- results.extend(json_data)
460
- else:
461
- results.append(json_data)
462
- except json.JSONDecodeError as e:
463
- return None, [], f"❌ Invalid JSON format: {str(e)}"
464
-
465
- # Process URLs if provided
466
- if urls and urls.strip():
467
- processor = URLProcessor()
468
- url_list = re.split(r'[,\n]', urls)
469
- url_list = [url.strip() for url in url_list if url.strip()]
470
-
471
- for url in url_list:
472
- validation = processor.validate_url(url)
473
- if validation.get('is_valid'):
474
- content = processor.fetch_content(url)
475
- if content:
476
- results.append({
477
- 'source': 'url',
478
- 'url': url,
479
- 'content': content,
480
- 'timestamp': datetime.now().isoformat()
481
- })
482
-
483
- # Process files if provided
484
- if file:
485
- file_processor = FileProcessor()
486
- file_results = file_processor.process_file(file)
487
- if file_results:
488
- results.extend(file_results)
489
-
490
- # Generate QR codes
491
- if results:
492
- if combine:
493
- # Chunk the data if necessary
494
- combined_data = []
495
- for item in results:
496
- combined_data.extend(file_processor.chunk_data(item))
497
- qr_paths = generate_qr_code(combined_data, combined=False)
498
- else:
499
- qr_paths = generate_qr_code(results, combined=combine)
500
-
501
- if qr_paths:
502
- return (
503
- results,
504
- [str(path) for path in qr_paths],
505
- f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR code(s)!"
506
- )
507
- else:
508
- return None, [], "❌ Failed to generate QR codes. Please check the input data."
509
- else:
510
- return None, [], "⚠️ No valid content to process. Please provide some input data."
511
-
512
- except Exception as e:
513
- logger.error(f"Processing error: {e}")
514
- return None, [], f"❌ Error: {str(e)}"
515
-
516
- # Set up event handlers
517
- example_btn.click(load_example, outputs=[text_input])
518
- clear_btn.click(clear_input, outputs=[text_input])
519
- process_btn.click(
520
- process_all_inputs,
521
- inputs=[url_input, file_input, text_input, combine_data],
522
- outputs=[output_json, output_gallery, output_text]
523
- )
524
-
525
- gr.Markdown("""
526
- ### Features
527
- - **URL Processing**: Extract content from websites
528
- - **File Processing**: Handle text files and archives
529
- - **Notepad**: Direct JSON data input/manipulation
530
- - **JSON Cleaning**: Automatic JSON validation and formatting
531
- - **QR Generation**: Generate QR codes with embedded JSON data
532
- - **Flexible Output**: Choose between combined or separate QR codes
533
-
534
- ### Usage Tips
535
- 1. Use the **Notepad** tab for direct JSON input
536
- 2. Click "Load Example JSON" to see a sample format
537
- 3. Choose whether to combine all data into a single QR code
538
- 4. The generated QR codes will contain the complete JSON data
539
- """)
540
-
541
- return interface
542
-
543
- def main():
544
- # Configure system settings
545
- mimetypes.init()
546
-
547
- # Create output directories
548
- Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
549
-
550
- # Create and launch interface
551
- interface = create_interface()
552
-
553
- # Launch with proper configuration for Hugging Face
554
- interface.launch(
555
- share=False,
556
- debug=False # Set to False for production
557
- )
558
 
559
  if __name__ == "__main__":
560
- main()
 
 
1
import io
import logging
import os
import tempfile
import traceback
import zipfile

import gradio as gr
from pypdf import PdfReader
from pypdf.errors import PdfStreamError
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9
 
10
+ logging.basicConfig(level=logging.INFO)
 
11
 
12
class FileProcessor:
    """Dispatch uploaded files (.pdf, .txt, .zip) to type-specific text extractors.

    Every method returns a string: either the extracted text or a user-facing
    error/warning message. Technical details are written to the log; callers
    (the Gradio UI) only ever see the returned string.
    """

    def __init__(self):
        pass

    def process_file(self, file_obj):  # Accepts the Gradio file object directly
        """Process an uploaded file object (must expose ``.name`` as a filesystem path).

        Returns the extracted text, or an error/warning message string.
        The uploaded temporary file is always deleted afterwards, even on failure.
        """
        if file_obj is None:
            return "Error: No file uploaded."

        file_path = file_obj.name
        logging.info(f"Processing file: {file_path}")
        file_extension = os.path.splitext(file_path)[1].lower()

        try:
            if file_extension == '.pdf':
                return self._process_pdf_file(file_path)
            elif file_extension == '.zip':
                return self._process_zip_file(file_path)
            elif file_extension == '.txt':
                return self._process_txt_file(file_path)
            else:
                error_message = f"Error: Unsupported file type: {file_extension}. Please upload .pdf, .txt, or .zip files."
                logging.warning(error_message)
                return error_message
        except Exception as e:
            error_message = f"Fatal error processing file: {os.path.basename(file_path)}. Please try again or contact support. Technical details logged."
            logging.error(f"Unhandled exception processing file: {file_path} - {e}")
            logging.error(traceback.format_exc())  # Full traceback for debugging
            return error_message
        finally:
            # Always remove the uploaded temp file, even when processing failed.
            try:
                if os.path.exists(file_path):
                    os.remove(file_path)
                    # BUG FIX: only log removal when a file was actually removed.
                    logging.info(f"Temporary file removed: {file_path}")
            except OSError as e:
                logging.error(f"Error removing temporary file {file_path}: {e}")

    def _process_pdf_file(self, file_path):
        """Extract text from a PDF on disk; returns text or an error/warning string."""
        text = ""
        try:
            with open(file_path, 'rb') as f:  # PdfReader needs a binary stream
                reader = PdfReader(f)
                if not reader.is_encrypted:  # Encrypted PDFs are not supported
                    for page in reader.pages:
                        # BUG FIX: extract_text() may return None (e.g. image-only
                        # pages); '+= None' raised TypeError before.
                        text += page.extract_text() or ""
                    logging.info(f"Successfully processed PDF file: {file_path}")
                    if not text.strip():  # Extracted text is empty
                        return "Warning: PDF processed, but no text content found. The PDF might contain images or scanned content."
                    return text
                else:
                    error_message = f"Error: Encrypted PDF file: {os.path.basename(file_path)}. Processing of encrypted PDFs is not supported."
                    logging.warning(error_message)
                    return error_message

        except FileNotFoundError:
            error_message = f"Error: PDF file not found: {os.path.basename(file_path)}. Please ensure the file was uploaded correctly."
            logging.error(f"File not found: {file_path}")
            return error_message
        # BUG FIX: was ``except PdfReader.errors.PdfStreamError`` — PdfReader is a
        # class with no ``errors`` attribute, so matching the clause raised
        # AttributeError and masked the real failure. The exception lives in
        # ``pypdf.errors`` (imported at top of file).
        except PdfStreamError as e:
            error_message = f"Error: Corrupted PDF file: {os.path.basename(file_path)}. The PDF file appears to be damaged or invalid. Error details: {e}"
            logging.error(f"Corrupted PDF stream error: {file_path} - {e}")
            return error_message
        except Exception as e:  # Catch-all for other PDF processing errors
            error_message = f"Error processing PDF file: {os.path.basename(file_path)}. It might be corrupted or use unsupported features. Error details logged."
            logging.error(f"General PDF processing error: {file_path} - {e}")
            logging.error(traceback.format_exc())
            return error_message

    def _process_zip_file(self, file_path):
        """Extract text from every .pdf/.txt member of a ZIP archive.

        Per-member failures are reported inline in the output and logged; the
        remaining members are still processed.
        """
        extracted_text = ""
        error_occurred = False
        try:
            with zipfile.ZipFile(file_path, 'r') as zf:
                if not zf.namelist():  # Empty archive: nothing to process
                    return "Warning: ZIP file is empty and contains no files to process."
                for filename in zf.namelist():
                    try:
                        if filename.lower().endswith('.pdf'):
                            with zf.open(filename) as pdf_file:
                                pdf_content = pdf_file.read()
                                text = self._process_pdf_content(io.BytesIO(pdf_content), filename=filename)  # Pass filename for error context
                                # BUG FIX: report the actual member name instead of
                                # the literal "(unknown)" placeholder.
                                extracted_text += f"File: {filename}\nContent:\n{text}\n\n"
                                logging.info(f"Successfully processed PDF within ZIP: {filename}")
                        elif filename.lower().endswith('.txt'):
                            with zf.open(filename) as txt_file:
                                text = txt_file.read().decode('utf-8', errors='ignore')  # Tolerate encoding issues in TXT
                                extracted_text += f"File: {filename}\nContent:\n{text}\n\n"
                                logging.info(f"Successfully processed TXT within ZIP: {filename}")
                        else:
                            logging.warning(f"Skipping unsupported file type within ZIP: {filename}")
                    except Exception as e:  # Errors for individual members must not abort the archive
                        error_message = f"Error processing file '{filename}' within ZIP: {os.path.basename(file_path)}. Error: {e}"
                        logging.error(error_message)
                        logging.error(traceback.format_exc())
                        extracted_text += f"File: {filename}\nError processing file. See logs for details.\n\n"  # User-friendly error in output
                        error_occurred = True  # Flag that an error occurred within the zip

            if not error_occurred:
                logging.info(f"Successfully processed ZIP file: {file_path}")
            else:
                logging.warning(f"ZIP file processed with some errors: {file_path}. Check output for details.")
            return extracted_text

        except zipfile.BadZipFile:  # Invalid/corrupted archive
            error_message = f"Error: Invalid or corrupted ZIP file: {os.path.basename(file_path)}. Please ensure it is a valid ZIP archive."
            logging.error(f"Bad ZIP file error: {file_path}")
            return error_message
        except Exception as e:  # Catch-all for other ZIP processing errors
            error_message = f"Error processing ZIP file: {os.path.basename(file_path)}. It might be corrupted or have an unexpected structure. Error details logged."
            logging.error(f"General ZIP processing error: {file_path} - {e}")
            logging.error(traceback.format_exc())
            return error_message

    def _process_pdf_content(self, pdf_content_stream, filename=""):
        """Extract text from an in-memory PDF stream (used for ZIP members).

        ``filename`` is only used for log/error context.
        """
        text = ""
        try:
            reader = PdfReader(pdf_content_stream)
            if not reader.is_encrypted:
                for page in reader.pages:
                    # BUG FIX: extract_text() may return None; guard against TypeError.
                    text += page.extract_text() or ""
                if not text.strip():
                    # BUG FIX: include the real member name (was a literal "(unknown)").
                    logging.warning(f"PDF content processed from '{filename}', but no text found.")
                    return "Warning: PDF content processed, but no text content found."
                return text
            else:
                error_message = f"Error: Encrypted PDF content found in '{filename}'. Processing encrypted PDFs is not supported."
                logging.warning(error_message)
                return error_message

        # BUG FIX: was ``PdfReader.errors.PdfStreamError`` — see _process_pdf_file.
        except PdfStreamError as e:
            error_message = f"Error: Corrupted PDF content in '{filename}'. PDF stream error: {e}"
            logging.error(error_message)
            return error_message
        except Exception as e:
            error_message = f"Error processing PDF content from '{filename}'. Error details logged."
            logging.error(f"Error processing PDF content from stream (file: {filename}) - {e}")
            logging.error(traceback.format_exc())
            return error_message

    def _process_txt_file(self, file_path):
        """Read a UTF-8 text file; returns its content or an error/warning string."""
        text = ""
        try:
            with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:  # Tolerate encoding issues
                text = file.read()
            logging.info(f"Successfully processed TXT file: {file_path}")
            if not text.strip():  # Empty TXT gets a warning, not raw ""
                return "Warning: TXT file processed, but it is empty."
            return text
        except FileNotFoundError:
            error_message = f"Error: TXT file not found: {os.path.basename(file_path)}. Please ensure the file was uploaded correctly."
            logging.error(f"File not found: {file_path}")
            return error_message
        except Exception as e:
            error_message = f"Error processing TXT file: {os.path.basename(file_path)}. Error details logged."
            logging.error(f"Error processing TXT file: {file_path} - {e}")
            logging.error(traceback.format_exc())
            return error_message
172
 
 
 
173
 
174
# Single shared processor instance; FileProcessor holds no per-call state,
# so one module-level object serves every Gradio request.
file_processor = FileProcessor()


def process_file_and_respond(file_obj):
    """Gradio handler: hand the uploaded file object straight to the processor."""
    result = file_processor.process_file(file_obj)
    return result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
 
180
+
181
def test_functionality_enhanced():
    """Smoke-test FileProcessor against PDF/TXT/ZIP fixtures, including error cases.

    Prints a PASS/FAIL line per case. All fixtures live in a temporary
    directory that is removed even when a case raises (the original only
    called ``temp_dir.cleanup()`` on the success path).
    """

    # --- Helpers for creating test fixtures ---
    def create_test_file(filepath, content, mode='w'):  # Text-mode fixture writer
        with open(filepath, mode, encoding='utf-8') as f:
            f.write(content)

    def create_binary_test_file(filepath, content_binary, mode='wb'):  # Binary fixture writer
        with open(filepath, mode) as f:
            f.write(content_binary)

    # BUG FIX: use the context manager so cleanup also happens when a test raises.
    with tempfile.TemporaryDirectory() as test_dir:
        pdf_content = "This is a test PDF file.\nWith multiple lines."
        txt_content = "This is a test TXT file.\nAnother line of text."
        zip_content_pdf = "PDF content inside ZIP."
        zip_content_txt = "TXT content inside ZIP."
        empty_txt_content = ""
        # Minimal PDF structure - not actually encrypted, but enough to exercise the encryption check.
        encrypted_pdf_content = "%PDF-1.5\n%����\n1 0 obj\n<< /Type /Catalog /Pages 2 0 R >>\nendobj\n2 0 obj\n<< /Type /Pages /Kids [ 3 0 R ] /Count 1 >>\nendobj\n3 0 obj\n<< /Type /Page /MediaBox [ 0 0 612 792 ] /Contents 4 0 R /Parent 2 0 R >>\nendobj\n4 0 obj\n<< /Length 5 >>\nstream\nBT\n/F1 12 Tf\n72 712 Td\n(This is an encrypted PDF - fake content) Tj\nET\nendstream\nendobj\n5 0 obj\n<< /Length 44 >>\nstream\n/Filter /FlateDecode\n/Length 44\nstream\nxœ+��\x0e@E\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\nendstream\nendstream\nendobj\nxref\n0 6\n0000000000 65535 f\n0000000015 00000 n\n0000000062 00000 n\n0000000112 00000 n\n0000000179 00000 n\n0000000259 00000 n\ntrailer\n<< /Size 6 /Root 1 0 R >>\nstartxref\n369\n%%EOF\n"

        pdf_file_path = os.path.join(test_dir, "test.pdf")
        txt_file_path = os.path.join(test_dir, "test.txt")
        zip_file_path = os.path.join(test_dir, "test.zip")
        unsupported_file_path = os.path.join(test_dir, "test.csv")
        corrupted_pdf_path = os.path.join(test_dir, "corrupted.pdf")
        empty_txt_path = os.path.join(test_dir, "empty.txt")
        empty_zip_path = os.path.join(test_dir, "empty.zip")
        encrypted_pdf_path = os.path.join(test_dir, "encrypted.pdf")

        create_test_file(pdf_file_path, pdf_content)
        create_test_file(txt_file_path, txt_content)
        create_test_file(unsupported_file_path, "test csv content")
        create_test_file(empty_txt_path, empty_txt_content)
        # BUG FIX: the fixture literal contains characters outside latin-1
        # ('œ' U+0153 and U+FFFD replacement characters), so the original strict
        # .encode('latin-1') raised UnicodeEncodeError and aborted the whole
        # suite before any case ran. 'replace' keeps the fixture usable.
        create_binary_test_file(encrypted_pdf_path, encrypted_pdf_content.encode('latin-1', errors='replace'))

        # Create a "corrupted" PDF by just writing plain text to a .pdf file.
        create_test_file(corrupted_pdf_path, "This is NOT a valid PDF file.")

        with zipfile.ZipFile(zip_file_path, 'w') as zf:
            zf.writestr("zip_test.pdf", zip_content_pdf)
            zf.writestr("zip_test.txt", zip_content_txt)

        with zipfile.ZipFile(empty_zip_path, 'w') as zf:  # Create empty zip
            pass

        # --- Test cases ---
        test_cases = [
            {"name": "PDF Processing", "file_path": pdf_file_path, "expected_content": pdf_content, "expect_error": False},
            {"name": "TXT Processing", "file_path": txt_file_path, "expected_content": txt_content, "expect_error": False},
            {"name": "ZIP Processing (PDF & TXT)", "file_path": zip_file_path, "expected_content_in": [zip_content_pdf, zip_content_txt], "expect_error": False},
            {"name": "Unsupported File Type", "file_path": unsupported_file_path, "expected_content": "Unsupported file type", "expect_error": True},
            {"name": "Corrupted PDF Processing", "file_path": corrupted_pdf_path, "expected_content": "Error processing PDF file", "expect_error": True},
            {"name": "Empty TXT File", "file_path": empty_txt_path, "expected_content": "Warning: TXT file processed, but it is empty.", "expect_error": False},
            {"name": "Empty ZIP File", "file_path": empty_zip_path, "expected_content": "Warning: ZIP file is empty", "expect_error": False},
            {"name": "Encrypted PDF File", "file_path": encrypted_pdf_path, "expected_content": "Error: Encrypted PDF file", "expect_error": True},
        ]

        all_tests_passed = True
        for case in test_cases:
            print(f"\n--- Test Case: {case['name']} ---")
            # SimpleFileObject stands in for the Gradio upload object (exposes .name).
            result = file_processor.process_file(SimpleFileObject(case['file_path']))
            print(f"Result: {result[:100]}...")  # First 100 chars of result

            if case.get("expect_error"):
                if case["expected_content"] not in result:
                    print(f" ❌ FAIL: Expected error message containing '{case['expected_content']}', but got: {result}")
                    all_tests_passed = False
                else:
                    print(f" ✅ PASS: Expected error message found.")
            elif case.get("expected_content_in"):  # Cases expecting multiple contents (like ZIP)
                all_contents_found = True
                for expected_content in case["expected_content_in"]:
                    if expected_content not in result:
                        print(f" ❌ FAIL: Expected content '{expected_content}' not found in result for {case['name']}. Got: {result[:100]}...")
                        all_contents_found = False
                        all_tests_passed = False
                        break
                if all_contents_found:
                    print(f" ✅ PASS: All expected contents found.")

            elif case.get("expected_content"):
                if case["expected_content"] not in result:
                    print(f" ❌ FAIL: Expected content '{case['expected_content']}', but got: {result[:100]}...")
                    all_tests_passed = False
                else:
                    print(f" ✅ PASS: Expected content found.")

        if all_tests_passed:
            print("\n🎉 All enhanced tests completed successfully! 🎉")
        else:
            print("\n⚠️ Some enhanced tests FAILED. See details above. ⚠️")
278
+
279
+
280
class SimpleFileObject:
    """Minimal stand-in for a Gradio upload object used by the test harness.

    Only the ``name`` attribute (a filesystem path) is needed by
    ``FileProcessor.process_file``.
    """

    def __init__(self, file_path):
        # Mirror the single attribute real Gradio file objects expose.
        self.name = file_path
283
+
284
+
285
# Gradio UI: a single file-upload input mapped to a plain-text output.
# The accepted extensions mirror the dispatch in FileProcessor.process_file.
iface = gr.Interface(
    fn=process_file_and_respond,
    inputs=gr.File(file_types=[".pdf", ".txt", ".zip"]),
    outputs="text",
    title="Robust File Processing Agent",
    description="Upload a PDF, TXT, or ZIP file to process its content. Enhanced for error handling and robustness."
)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292
 
293
if __name__ == "__main__":
    test_functionality_enhanced()  # Run the self-test suite before serving the UI
    iface.launch(debug=True)  # NOTE(review): debug=True is development-only — confirm before deploying