acecalisto3 committed on
Commit
c70f013
·
verified ·
1 Parent(s): d7b800d

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +534 -269
app.py CHANGED
@@ -1,295 +1,560 @@
1
- import gradio as gr
2
  import os
 
 
3
  import logging
 
4
  import zipfile
5
- import io
6
- from pypdf import PdfReader
7
  import tempfile
8
- import traceback
9
-
10
- logging.basicConfig(level=logging.INFO)
11
-
12
- class FileProcessor:
13
- def __init__(self):
14
- pass
15
 
16
- def process_file(self, file_obj): # Modified to accept file_obj directly
17
- if file_obj is None:
18
- return "Error: No file uploaded."
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
19
 
20
- file_path = file_obj.name
21
- logging.info(f"Processing file: {file_path}")
22
- file_extension = os.path.splitext(file_path)[1].lower()
23
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
24
  try:
25
- if file_extension == '.pdf':
26
- return self._process_pdf_file(file_path)
27
- elif file_extension == '.zip':
28
- return self._process_zip_file(file_path)
29
- elif file_extension == '.txt':
30
- return self._process_txt_file(file_path)
31
- else:
32
- error_message = f"Error: Unsupported file type: {file_extension}. Please upload .pdf, .txt, or .zip files."
33
- logging.warning(error_message)
34
- return error_message
 
 
 
 
 
35
  except Exception as e:
36
- error_message = f"Fatal error processing file: {os.path.basename(file_path)}. Please try again or contact support. Technical details logged."
37
- logging.error(f"Unhandled exception processing file: {file_path} - {e}")
38
- logging.error(traceback.format_exc()) # Log full traceback for debugging
39
- return error_message
40
- finally:
41
- try:
42
- if os.path.exists(file_path):
43
- os.remove(file_path)
44
- logging.info(f"Temporary file removed: {file_path}")
45
- except OSError as e:
46
- logging.error(f"Error removing temporary file {file_path}: {e}")
47
-
48
-
49
- def _process_pdf_file(self, file_path):
50
- text = ""
51
- try:
52
- with open(file_path, 'rb') as f: # Open in binary mode for PdfReader
53
- reader = PdfReader(f)
54
- if not reader.is_encrypted: # Check if PDF is encrypted before processing
55
- for page in reader.pages:
56
- text += page.extract_text()
57
- logging.info(f"Successfully processed PDF file: {file_path}")
58
- if not text.strip(): # Check if extracted text is empty
59
- return "Warning: PDF processed, but no text content found. The PDF might contain images or scanned content."
60
- return text
61
- else:
62
- error_message = f"Error: Encrypted PDF file: {os.path.basename(file_path)}. Processing of encrypted PDFs is not supported."
63
- logging.warning(error_message)
64
- return error_message
65
-
66
- except FileNotFoundError:
67
- error_message = f"Error: PDF file not found: {os.path.basename(file_path)}. Please ensure the file was uploaded correctly."
68
- logging.error(f"File not found: {file_path}")
69
- return error_message
70
- except PdfReader.errors.PdfStreamError as e: # Specific error for corrupted PDF streams
71
- error_message = f"Error: Corrupted PDF file: {os.path.basename(file_path)}. The PDF file appears to be damaged or invalid. Error details: {e}"
72
- logging.error(f"Corrupted PDF stream error: {file_path} - {e}")
73
- return error_message
74
- except Exception as e: # Catch-all for other PDF processing errors
75
- error_message = f"Error processing PDF file: {os.path.basename(file_path)}. It might be corrupted or use unsupported features. Error details logged."
76
- logging.error(f"General PDF processing error: {file_path} - {e}")
77
- logging.error(traceback.format_exc()) # Log full traceback for debugging
78
- return error_message
79
-
80
-
81
- def _process_zip_file(self, file_path):
82
- extracted_text = ""
83
- error_occurred = False
84
  try:
85
- with zipfile.ZipFile(file_path, 'r') as zf:
86
- if not zf.namelist(): # Check for empty ZIP file
87
- return "Warning: ZIP file is empty and contains no files to process."
88
- for filename in zf.namelist():
89
- try:
90
- if filename.lower().endswith('.pdf'):
91
- with zf.open(filename) as pdf_file:
92
- pdf_content = pdf_file.read()
93
- text = self._process_pdf_content(io.BytesIO(pdf_content), filename=filename) # Pass filename for better error context
94
- extracted_text += f"File: {filename}\nContent:\n{text}\n\n"
95
- logging.info(f"Successfully processed PDF within ZIP: {filename}")
96
- elif filename.lower().endswith('.txt'):
97
- with zf.open(filename) as txt_file:
98
- text = txt_file.read().decode('utf-8', errors='ignore') # Handle potential encoding issues in TXT
99
- extracted_text += f"File: {filename}\nContent:\n{text}\n\n"
100
- logging.info(f"Successfully processed TXT within ZIP: {filename}")
101
- else:
102
- logging.warning(f"Skipping unsupported file type within ZIP: {filename}")
103
- except Exception as e: # Catch errors for individual files within ZIP
104
- error_message = f"Error processing file '{filename}' within ZIP: {os.path.basename(file_path)}. Error: {e}"
105
- logging.error(error_message)
106
- logging.error(traceback.format_exc()) # Log traceback for inner ZIP errors
107
- extracted_text += f"File: {filename}\nError processing file. See logs for details.\n\n" # User-friendly error in output
108
- error_occurred = True # Flag that an error occurred within the zip
109
-
110
- if not error_occurred:
111
- logging.info(f"Successfully processed ZIP file: {file_path}")
112
- else:
113
- logging.warning(f"ZIP file processed with some errors: {file_path}. Check output for details.")
114
- return extracted_text
115
-
116
- except zipfile.BadZipFile: # Specific error for invalid ZIP file
117
- error_message = f"Error: Invalid or corrupted ZIP file: {os.path.basename(file_path)}. Please ensure it is a valid ZIP archive."
118
- logging.error(f"Bad ZIP file error: {file_path}")
119
- return error_message
120
- except Exception as e: # Catch-all for other ZIP processing errors
121
- error_message = f"Error processing ZIP file: {os.path.basename(file_path)}. It might be corrupted or have an unexpected structure. Error details logged."
122
- logging.error(f"General ZIP processing error: {file_path} - {e}")
123
- logging.error(traceback.format_exc()) # Log full traceback for debugging
124
- return error_message
125
-
126
-
127
- def _process_pdf_content(self, pdf_content_stream, filename=""): # Added filename for context
128
- text = ""
129
- try:
130
- reader = PdfReader(pdf_content_stream)
131
- if not reader.is_encrypted:
132
- for page in reader.pages:
133
- text += page.extract_text()
134
- if not text.strip():
135
- logging.warning(f"PDF content processed from '{filename}', but no text found.") # Filename context
136
- return "Warning: PDF content processed, but no text content found."
137
- return text
138
- else:
139
- error_message = f"Error: Encrypted PDF content found in '{filename}'. Processing encrypted PDFs is not supported."
140
- logging.warning(error_message)
141
- return error_message
142
-
143
- except PdfReader.errors.PdfStreamError as e:
144
- error_message = f"Error: Corrupted PDF content in '{filename}'. PDF stream error: {e}" # Filename context
145
- logging.error(error_message)
146
- return error_message
147
- except Exception as e:
148
- error_message = f"Error processing PDF content from '{filename}'. Error details logged." # Filename context
149
- logging.error(f"Error processing PDF content from stream (file: {filename}) - {e}")
150
- logging.error(traceback.format_exc())
151
- return error_message
152
-
153
 
154
- def _process_txt_file(self, file_path):
155
- text = ""
156
- try:
157
- with open(file_path, 'r', encoding='utf-8', errors='ignore') as file: # Handle potential encoding issues
158
- text = file.read()
159
- logging.info(f"Successfully processed TXT file: {file_path}")
160
- if not text.strip(): # Check for empty TXT
161
- return "Warning: TXT file processed, but it is empty."
162
- return text
163
- except FileNotFoundError:
164
- error_message = f"Error: TXT file not found: {os.path.basename(file_path)}. Please ensure the file was uploaded correctly."
165
- logging.error(f"File not found: {file_path}")
166
- return error_message
167
  except Exception as e:
168
- error_message = f"Error processing TXT file: {os.path.basename(file_path)}. Error details logged."
169
- logging.error(f"Error processing TXT file: {file_path} - {e}")
170
- logging.error(traceback.format_exc())
171
- return error_message
172
-
173
-
174
- # Initialize FileProcessor
175
- file_processor = FileProcessor()
176
-
177
- def process_file_and_respond(file_obj): # No change needed here as file_obj is now directly processed
178
- return file_processor.process_file(file_obj)
179
-
180
-
181
- def test_functionality_enhanced():
182
- temp_dir = tempfile.TemporaryDirectory()
183
- test_dir = temp_dir.name
184
-
185
- # --- Create test files in temporary directory ---
186
- def create_test_file(filepath, content, mode='w'): # Helper function for file creation
187
- with open(filepath, mode, encoding='utf-8') as f: # Default text mode
188
- f.write(content)
189
-
190
- def create_binary_test_file(filepath, content_binary, mode='wb'): # Helper for binary file creation
191
- with open(filepath, mode) as f:
192
- f.write(content_binary)
193
-
194
- pdf_content = "This is a test PDF file.\nWith multiple lines."
195
- txt_content = "This is a test TXT file.\nAnother line of text."
196
- zip_content_pdf = "PDF content inside ZIP."
197
- zip_content_txt = "TXT content inside ZIP."
198
- empty_txt_content = ""
199
- encrypted_pdf_content = "%PDF-1.5\n%οΏ½οΏ½οΏ½οΏ½\n1 0 obj\n<< /Type /Catalog /Pages 2 0 R >>\nendobj\n2 0 obj\n<< /Type /Pages /Kids [ 3 0 R ] /Count 1 >>\nendobj\n3 0 obj\n<< /Type /Page /MediaBox [ 0 0 612 792 ] /Contents 4 0 R /Parent 2 0 R >>\nendobj\n4 0 obj\n<< /Length 5 >>\nstream\nBT\n/F1 12 Tf\n72 712 Td\n(This is an encrypted PDF - fake content) Tj\nET\nendstream\nendobj\n5 0 obj\n<< /Length 44 >>\nstream\n/Filter /FlateDecode\n/Length 44\nstream\nxΕ“+οΏ½οΏ½\x0e@E\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\nendstream\nendstream\nendobj\nxref\n0 6\n0000000000 65535 f\n0000000015 00000 n\n0000000062 00000 n\n0000000112 00000 n\n0000000179 00000 n\n0000000259 00000 n\ntrailer\n<< /Size 6 /Root 1 0 R >>\nstartxref\n369\n%%EOF\n" # Minimal PDF structure - not actually encrypted, but enough to test encryption check
200
-
201
- pdf_file_path = os.path.join(test_dir, "test.pdf")
202
- txt_file_path = os.path.join(test_dir, "test.txt")
203
- zip_file_path = os.path.join(test_dir, "test.zip")
204
- unsupported_file_path = os.path.join(test_dir, "test.csv")
205
- corrupted_pdf_path = os.path.join(test_dir, "corrupted.pdf")
206
- empty_txt_path = os.path.join(test_dir, "empty.txt")
207
- empty_zip_path = os.path.join(test_dir, "empty.zip")
208
- encrypted_pdf_path = os.path.join(test_dir, "encrypted.pdf")
209
 
 
 
 
 
 
 
210
 
211
- create_test_file(pdf_file_path, pdf_content)
212
- create_test_file(txt_file_path, txt_content)
213
- create_test_file(unsupported_file_path, "test csv content")
214
- create_test_file(empty_txt_path, empty_txt_content)
215
- create_binary_test_file(encrypted_pdf_path, encrypted_pdf_content.encode('latin-1')) # Encrypted PDF test - use latin-1 to avoid encoding issues with PDF structure
216
 
217
- # Create a "corrupted" PDF by just writing plain text to a .pdf file.
218
- create_test_file(corrupted_pdf_path, "This is NOT a valid PDF file.")
 
 
 
219
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
220
 
221
- with zipfile.ZipFile(zip_file_path, 'w') as zf:
222
- zf.writestr("zip_test.pdf", zip_content_pdf)
223
- zf.writestr("zip_test.txt", zip_content_txt)
 
 
 
 
 
 
 
 
 
 
224
 
225
- with zipfile.ZipFile(empty_zip_path, 'w') as zf: # Create empty zip
226
- pass
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
227
 
 
 
228
 
229
- # --- Test cases ---
230
- test_cases = [
231
- {"name": "PDF Processing", "file_path": pdf_file_path, "expected_content": pdf_content, "expect_error": False},
232
- {"name": "TXT Processing", "file_path": txt_file_path, "expected_content": txt_content, "expect_error": False},
233
- {"name": "ZIP Processing (PDF & TXT)", "file_path": zip_file_path, "expected_content_in": [zip_content_pdf, zip_content_txt], "expect_error": False},
234
- {"name": "Unsupported File Type", "file_path": unsupported_file_path, "expected_content": "Unsupported file type", "expect_error": True},
235
- {"name": "Corrupted PDF Processing", "file_path": corrupted_pdf_path, "expected_content": "Error processing PDF file", "expect_error": True},
236
- {"name": "Empty TXT File", "file_path": empty_txt_path, "expected_content": "Warning: TXT file processed, but it is empty.", "expect_error": False},
237
- {"name": "Empty ZIP File", "file_path": empty_zip_path, "expected_content": "Warning: ZIP file is empty", "expect_error": False},
238
- {"name": "Encrypted PDF File", "file_path": encrypted_pdf_path, "expected_content": "Error: Encrypted PDF file", "expect_error": True},
239
- ]
240
 
241
- all_tests_passed = True
242
- for case in test_cases:
243
- print(f"\n--- Test Case: {case['name']} ---")
244
- result = file_processor.process_file(SimpleFileObject(case['file_path'])) # Use SimpleFileObject to simulate file upload
245
- print(f"Result: {result[:100]}...") # Print first 100 chars of result
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
246
 
247
- if case.get("expect_error"):
248
- if case["expected_content"] not in result:
249
- print(f" ❌ FAIL: Expected error message containing '{case['expected_content']}', but got: {result}")
250
- all_tests_passed = False
251
- else:
252
- print(f" βœ… PASS: Expected error message found.")
253
- elif case.get("expected_content_in"): # For cases expecting multiple contents (like ZIP)
254
- all_contents_found = True
255
- for expected_content in case["expected_content_in"]:
256
- if expected_content not in result:
257
- print(f" ❌ FAIL: Expected content '{expected_content}' not found in result for {case['name']}. Got: {result[:100]}...")
258
- all_contents_found = False
259
- all_tests_passed = False
260
- break
261
- if all_contents_found:
262
- print(f" βœ… PASS: All expected contents found.")
263
-
264
- elif case.get("expected_content"):
265
- if case["expected_content"] not in result:
266
- print(f" ❌ FAIL: Expected content '{case['expected_content']}', but got: {result[:100]}...")
267
- all_tests_passed = False
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
268
  else:
269
- print(f" βœ… PASS: Expected content found.")
270
-
271
-
272
- if all_tests_passed:
273
- print("\nπŸŽ‰ All enhanced tests completed successfully! πŸŽ‰")
274
- else:
275
- print("\n⚠️ Some enhanced tests FAILED. See details above. ⚠️")
276
-
277
- temp_dir.cleanup() # Clean up temporary directory and files
278
-
279
-
280
- class SimpleFileObject: # Mock file object for testing
281
- def __init__(self, file_path):
282
- self.name = file_path
283
-
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
284
 
285
- iface = gr.Interface(
286
- fn=process_file_and_respond,
287
- inputs=gr.File(file_types=[".pdf", ".txt", ".zip"]),
288
- outputs="text",
289
- title="Robust File Processing Agent",
290
- description="Upload a PDF, TXT, or ZIP file to process its content. Enhanced for error handling and robustness."
291
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
292
 
293
  if __name__ == "__main__":
294
- test_functionality_enhanced() # Run enhanced tests
295
- iface.launch(debug=True)
 
1
+ import json
2
  import os
3
+ import re
4
+ import time
5
  import logging
6
+ import mimetypes
7
  import zipfile
 
 
8
  import tempfile
9
+ from datetime import datetime
10
+ from typing import List, Dict, Optional, Union
11
+ from pathlib import Path
12
+ from urllib.parse import urlparse
 
 
 
13
 
14
+ import requests
15
+ import validators
16
+ import gradio as gr
17
+ from diskcache import Cache
18
+ from bs4 import BeautifulSoup
19
+ from fake_useragent import UserAgent
20
+ from cleantext import clean
21
+ import qrcode
22
+
23
# Logging: every record goes both to the console and to a UTF-8 encoded
# ``app.log`` file; the format includes the source file and line number.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s',
    level=logging.INFO,
    handlers=[
        logging.StreamHandler(),
        logging.FileHandler('app.log', encoding='utf-8'),
    ],
)
logger = logging.getLogger(__name__)

# Generated QR images are written below this directory; create the whole
# tree at import time so later saves cannot fail on a missing parent.
Path('output/qr_codes').mkdir(parents=True, exist_ok=True)
 
36
 
37
class URLProcessor:
    """Validate URLs and fetch their content through one shared HTTP session.

    Google Drive file links and Google Calendar iCal feeds get dedicated
    handlers; every other URL is fetched as HTML and reduced to cleaned text.
    """

    def __init__(self):
        self.session = requests.Session()
        self.timeout = 10  # seconds
        self.session.headers.update(self._build_headers(UserAgent().random))

    @staticmethod
    def _build_headers(user_agent: str) -> Dict[str, str]:
        """Return the browser-like default headers for the session.

        ``Accept-Encoding`` is deliberately not set here: the session's own
        default already advertises exactly the encodings the installed HTTP
        stack can decode, whereas a hard-coded ``br`` would invite Brotli
        responses that cannot be decoded when no Brotli package is present.
        """
        return {
            # The key must be spelled exactly 'User-Agent' to replace the
            # library's default user agent.
            'User-Agent': user_agent,
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.5',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }

    def advanced_text_cleaning(self, text: str) -> str:
        """Normalise *text* with ``cleantext.clean``; fall back to regexes on error.

        The fallback strips control characters, drops non-ASCII characters and
        collapses runs of whitespace to a single space.
        """
        try:
            cleaned_text = clean(
                text,
                fix_unicode=True,
                to_ascii=True,
                lower=True,
                no_line_breaks=True,
                no_urls=True,
                no_emails=True,
                no_phone_numbers=True,
                no_numbers=False,
                no_digits=False,
                no_currency_symbols=True,
                no_punct=False
            ).strip()
            return cleaned_text
        except Exception as e:
            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)  # Remove control characters
            text = text.encode('ascii', 'ignore').decode('ascii')  # Remove non-ASCII characters
            text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
            return text.strip()

    def validate_url(self, url: str) -> Dict:
        """Check that *url* is well-formed and answers a request.

        Returns a dict with ``is_valid`` (bool) and ``message`` (str). A HEAD
        request is tried first; if the server answers it with an error status
        the check is repeated with a streamed GET, because some servers do
        not implement HEAD.
        """
        try:
            if not validators.url(url):
                return {'is_valid': False, 'message': 'Invalid URL format'}

            response = self.session.head(url, timeout=self.timeout)
            if response.status_code >= 400:
                # stream=True: only the status line/headers are needed, so
                # the body is never downloaded before the response is closed.
                with self.session.get(url, timeout=self.timeout, stream=True) as fallback:
                    fallback.raise_for_status()
            return {'is_valid': True, 'message': 'URL is valid and accessible'}
        except Exception as e:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}

    def fetch_content(self, url: str) -> Optional[Dict]:
        """Fetch *url*, dispatching on its hostname.

        Returns the handler's result dict (``content``, ``content_type``,
        ``timestamp``) or ``None`` when fetching fails.
        """
        try:
            # Dispatch on the parsed hostname rather than a substring of the
            # whole URL, so a Google hostname that merely appears in a path
            # or query string does not trigger the special handlers.
            host = (urlparse(url).hostname or '').lower()

            # Google Drive document handling
            if host == 'drive.google.com':
                return self._handle_google_drive(url)

            # Google Calendar ICS handling
            if host == 'calendar.google.com' and 'ical' in url:
                return self._handle_google_calendar(url)

            # Standard HTML processing
            return self._fetch_html_content(url)
        except Exception as e:
            logger.error(f"Content fetch failed: {e}")
            return None

    def _handle_google_drive(self, url: str) -> Optional[Dict]:
        """Download a Google Drive file given a ``/file/d/<id>`` style link.

        Returns ``None`` when the link carries no file id or the download fails.
        """
        try:
            file_id = re.search(r'/file/d/([a-zA-Z0-9_-]+)', url)
            if not file_id:
                logger.error(f"Invalid Google Drive URL: {url}")
                return None

            direct_url = f"https://drive.google.com/uc?export=download&id={file_id.group(1)}"
            response = self.session.get(direct_url, timeout=self.timeout)
            response.raise_for_status()

            return {
                'content': response.text,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Google Drive processing failed: {e}")
            return None

    def _handle_google_calendar(self, url: str) -> Optional[Dict]:
        """Fetch a Google Calendar ICS feed and return its raw text."""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
            return {
                'content': response.text,
                'content_type': 'text/calendar',
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"Calendar fetch failed: {e}")
            return None

    def _fetch_html_content(self, url: str) -> Optional[Dict]:
        """Fetch an HTML page and return the cleaned text of its main content.

        Boilerplate elements are removed first; the text is then taken from
        ``<main>``, else ``<article>``, else ``<body>``. When none of those
        exists an empty ``content`` is returned.
        """
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()

            soup = BeautifulSoup(response.text, 'html.parser')

            # Remove unwanted elements
            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
                element.decompose()

            # Extract main content
            main_content = soup.find('main') or soup.find('article') or soup.body

            if main_content is None:
                logger.warning(f"No main content found for URL: {url}")
                return {
                    'content': '',
                    'content_type': response.headers.get('Content-Type', ''),
                    'timestamp': datetime.now().isoformat()
                }

            # Clean and structure content
            text_content = main_content.get_text(separator='\n', strip=True)
            cleaned_content = self.advanced_text_cleaning(text_content)

            return {
                'content': cleaned_content,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
174
 
175
class FileProcessor:
    """Turn an uploaded text file or ZIP archive into content/metadata records."""

    # Files larger than this are not read in full; only an excerpt of
    # _EXCERPT_BYTES from each end is kept.
    _LARGE_FILE_THRESHOLD = 100 * 1024 * 1024  # 100MB
    _EXCERPT_BYTES = 1 * 1024 * 1024  # 1MB

    def __init__(self, max_file_size: int = 2 * 1024 * 1024 * 1024):  # 2GB default
        self.max_file_size = max_file_size
        self.supported_text_extensions = {'.txt', '.md', '.csv', '.json', '.xml'}

    def is_text_file(self, filepath: str) -> bool:
        """Return True when *filepath* has a ``text/*`` MIME type or a supported extension."""
        try:
            mime_type, _ = mimetypes.guess_type(filepath)
            if mime_type and mime_type.startswith('text/'):
                return True
            return os.path.splitext(filepath)[1].lower() in self.supported_text_extensions
        except Exception:
            return False

    def process_file(self, file) -> List[Dict]:
        """Process an upload and return one record per readable text file.

        *file* may be an object exposing the path as ``.name`` or a plain
        path (``str`` / ``os.PathLike``). ZIP archives are unpacked and each
        text member becomes a record; anything else is read as a single file.
        Returns ``[]`` for empty input, oversized files, or on any error.
        """
        if not file:
            return []

        try:
            path = self._resolve_path(file)
            file_size = os.path.getsize(path)
            if file_size > self.max_file_size:
                logger.warning(f"File size ({file_size} bytes) exceeds maximum allowed size")
                return []

            if zipfile.is_zipfile(path):
                # The scratch directory only has to outlive the extraction;
                # every record is fully read before it is removed.
                with tempfile.TemporaryDirectory() as temp_dir:
                    return self._process_zip_file(path, temp_dir)
            return self._process_single_file(file)
        except Exception as e:
            logger.error(f"Error processing file: {str(e)}")
            return []

    def chunk_data(self, data, max_size=2953):
        """Serialise *data* to JSON and split it into pieces of at most *max_size* characters.

        The default of 2953 is the byte-mode capacity of the largest QR code
        (version 40) at error-correction level L. Pieces are cut by character
        count, so a piece that was split from a larger document is generally
        not valid JSON on its own.
        """
        json_str = json.dumps(data, ensure_ascii=False)
        if len(json_str) <= max_size:
            return [json_str]
        return [json_str[start:start + max_size] for start in range(0, len(json_str), max_size)]

    def _process_single_file(self, file) -> List[Dict]:
        """Read one uploaded file (object with ``.name`` or plain path) into a record."""
        try:
            return self._read_path(self._resolve_path(file))
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []

    def _process_zip_file(self, zip_path: str, temp_dir: str) -> List[Dict]:
        """Extract the text members of *zip_path* into *temp_dir* and read each one.

        Directories, members that ``is_text_file`` rejects, and members whose
        uncompressed size exceeds ``max_file_size`` are skipped.
        """
        records: List[Dict] = []
        with zipfile.ZipFile(zip_path, 'r') as archive:
            for member in archive.infolist():
                if member.is_dir() or not self.is_text_file(member.filename):
                    continue
                if member.file_size > self.max_file_size:
                    logger.warning(
                        f"Skipping {member.filename}: uncompressed size "
                        f"({member.file_size} bytes) exceeds maximum allowed size"
                    )
                    continue
                # ZipFile.extract returns the path it actually wrote to.
                extracted_path = archive.extract(member, temp_dir)
                records.extend(self._read_path(extracted_path))
        return records

    @staticmethod
    def _resolve_path(file) -> str:
        """Return the filesystem path for an upload given as a path or as an object with ``.name``."""
        if isinstance(file, (str, os.PathLike)):
            return os.fspath(file)
        return file.name

    def _read_path(self, path: str) -> List[Dict]:
        """Read the file at *path* and wrap its text and metadata in a one-element list."""
        try:
            file_stat = os.stat(path)

            if file_stat.st_size > self._LARGE_FILE_THRESHOLD:
                logger.info(f"Processing large file: {path} ({file_stat.st_size} bytes)")
                # Binary mode: seeking to an arbitrary byte offset is only
                # well-defined on a binary handle. Decoding with
                # errors='ignore' drops a multi-byte character that the cut
                # happens to split.
                with open(path, 'rb') as f:
                    head = f.read(self._EXCERPT_BYTES)
                    f.seek(max(0, file_stat.st_size - self._EXCERPT_BYTES))
                    tail = f.read()
                content = (
                    head.decode('utf-8', errors='ignore')
                    + "\n...[Content truncated due to large file size]...\n"
                    + tail.decode('utf-8', errors='ignore')
                )
            else:
                with open(path, 'r', encoding='utf-8', errors='ignore') as f:
                    content = f.read()

            return [{
                'source': 'filename',
                'filename': os.path.basename(path),
                'file_size': file_stat.st_size,
                'mime_type': mimetypes.guess_type(path)[0],
                'created': datetime.fromtimestamp(file_stat.st_ctime).isoformat(),
                'modified': datetime.fromtimestamp(file_stat.st_mtime).isoformat(),
                'content': content,
                'timestamp': datetime.now().isoformat()
            }]
        except Exception as e:
            logger.error(f"File processing error: {e}")
            return []
266
+
267
def clean_json(data: Union[str, Dict]) -> Optional[Dict]:
    """Parse and normalise JSON data.

    A string is stripped and parsed as JSON; any other value is used as-is.
    The value is then serialised and re-parsed so the result contains only
    plain JSON types. Returns ``None`` (after logging) when the input cannot
    be parsed or serialised.
    """
    try:
        parsed = json.loads(data.strip()) if isinstance(data, str) else data
        # Round-trip through a JSON string to guarantee a JSON-clean structure.
        return json.loads(json.dumps(parsed))
    except json.JSONDecodeError as err:
        logger.error(f"JSON cleaning error: {err}")
    except Exception as err:
        logger.error(f"Unexpected error while cleaning JSON: {err}")
    return None
285
+
286
def generate_qr_code(data: Union[str, Dict], combined: bool = True) -> List[str]:
    """Render *data* as QR code PNG file(s) under ``output/qr_codes``.

    With ``combined=True`` the whole of *data* goes into one QR code. With
    ``combined=False`` a list yields one QR code per item (items that fail
    JSON cleaning are logged and skipped) and any other value yields a single
    QR code.

    Returns the list of written file paths; ``[]`` when cleaning fails for a
    combined/single payload or when any error is raised while rendering.
    """

    def _render(payload, stem: str) -> Optional[str]:
        """Clean *payload*, encode it as a QR image and return the saved path (None if cleaning fails)."""
        cleaned = clean_json(payload)
        if cleaned is None:
            return None

        qr = qrcode.QRCode(
            version=None,  # let make(fit=True) choose the smallest version that fits
            error_correction=qrcode.constants.ERROR_CORRECT_L,
            box_size=10,
            border=4,
        )
        qr.add_data(json.dumps(cleaned, ensure_ascii=False))
        qr.make(fit=True)

        img = qr.make_image(fill_color="black", back_color="white")
        # Nanosecond timestamp: a one-second resolution would let two calls
        # made within the same second overwrite each other's image.
        output_path = output_dir / f'{stem}_qr_{time.time_ns()}.png'
        img.save(str(output_path))
        return str(output_path)

    try:
        output_dir = Path('output/qr_codes')
        output_dir.mkdir(parents=True, exist_ok=True)

        if combined:
            # Generate single QR code for all data
            path = _render(data, 'combined')
            if path is None:
                logger.error("Failed to clean data for QR code generation.")
                return []
            return [path]

        if isinstance(data, list):
            # Generate separate QR codes for each item
            paths = []
            for idx, item in enumerate(data):
                path = _render(item, f'item_{idx}')
                if path is None:
                    logger.error(f"Failed to clean item {idx} for QR code generation.")
                    continue  # Skip this item
                paths.append(path)
            return paths

        # Single item, not combined
        path = _render(data, 'single')
        if path is None:
            logger.error("Failed to clean single item for QR code generation.")
            return []
        return [path]
    except Exception as e:
        logger.error(f"QR generation error: {e}")
        return []
364
+
365
def create_interface():
    """Build the Gradio Blocks UI and wire up its event handlers.

    The UI has three input tabs (URLs, file upload, JSON notepad), a
    "combine" checkbox and a process button. Processing gathers records from
    all provided inputs and renders them as QR codes. Returns the
    ``gr.Blocks`` instance without launching it.
    """

    css = """
    .container { max-width: 1200px; margin: auto; }
    .warning { background-color: #fff3cd; color: #856404; padding: 10px; border-radius: 4px; }
    .error { background-color: #f8d7da; color: #721c24; padding: 10px; border-radius: 4px; }
    .success { background-color: #d4edda; color: #155724; padding: 10px; border-radius: 4px; }
    """

    with gr.Blocks(css=css, title="Advanced Data Processor & QR Generator") as interface:
        gr.Markdown("# 🌐 Advanced Data Processing & QR Code Generator")

        with gr.Tab("URL Processing"):
            url_input = gr.Textbox(
                label="Enter URLs (comma or newline separated)",
                lines=5,
                placeholder="https://example1.com\nhttps://example2.com",
                value=""
            )

        with gr.Tab("File Input"):
            file_input = gr.File(
                label="Upload text file or ZIP archive",
                file_types=[".txt", ".zip", ".md", ".csv", ".json", ".xml"]
            )

        with gr.Tab("Notepad"):
            text_input = gr.TextArea(
                label="JSON Data Input",
                lines=15,
                placeholder="Paste your JSON data here...",
                value=""
            )

            with gr.Row():
                example_btn = gr.Button("📝 Load Example JSON", variant="secondary")
                clear_btn = gr.Button("🗑️ Clear Input", variant="secondary")

        with gr.Row():
            combine_data = gr.Checkbox(
                label="Combine all data into single QR code",
                value=True,
                info="Generate one QR code for all data, or separate QR codes for each item"
            )
            process_btn = gr.Button("🔄 Process & Generate QR", variant="primary", scale=2)

        output_json = gr.JSON(label="Processed JSON Data")
        output_gallery = gr.Gallery(label="Generated QR Codes", columns=2, height=400)
        output_text = gr.Textbox(label="Processing Status", interactive=False)

        def load_example():
            """Return a pretty-printed sample JSON document for the notepad."""
            example_json = {
                "type": "product_catalog",
                "items": [
                    {
                        "id": "123",
                        "name": "Test Product",
                        "description": "This is a test product description",
                        "price": 29.99,
                        "category": "electronics",
                        "tags": ["test", "sample", "demo"]
                    },
                    {
                        "id": "456",
                        "name": "Another Product",
                        "description": "Another test product description",
                        "price": 49.99,
                        "category": "accessories",
                        "tags": ["sample", "test"]
                    }
                ],
                "metadata": {
                    "timestamp": datetime.now().isoformat(),
                    "version": "1.0",
                    "source": "example"
                }
            }
            return json.dumps(example_json, indent=2)

        def clear_input():
            """Return the empty string used to reset the notepad."""
            return ""

        def process_all_inputs(urls, file, text, combine):
            """Collect records from all inputs and generate QR codes.

            Returns a ``(records, qr_image_paths, status_message)`` tuple
            matching the three output components.
            """
            try:
                results = []
                # Created unconditionally: chunk_data is needed for the
                # combine check below even when no file was uploaded.
                file_processor = FileProcessor()

                # Process text input first (since it's direct JSON)
                if text and text.strip():
                    try:
                        json_data = json.loads(text)
                        if isinstance(json_data, list):
                            results.extend(json_data)
                        else:
                            results.append(json_data)
                    except json.JSONDecodeError as e:
                        return None, [], f"❌ Invalid JSON format: {str(e)}"

                # Process URLs if provided
                if urls and urls.strip():
                    processor = URLProcessor()
                    url_list = [url.strip() for url in re.split(r'[,\n]', urls) if url.strip()]

                    for url in url_list:
                        validation = processor.validate_url(url)
                        if not validation.get('is_valid'):
                            logger.warning(f"Skipping URL {url}: {validation.get('message')}")
                            continue
                        content = processor.fetch_content(url)
                        if content:
                            results.append({
                                'source': 'url',
                                'url': url,
                                'content': content,
                                'timestamp': datetime.now().isoformat()
                            })

                # Process files if provided
                if file:
                    file_results = file_processor.process_file(file)
                    if file_results:
                        results.extend(file_results)

                if not results:
                    return None, [], "⚠️ No valid content to process. Please provide some input data."

                # Generate QR codes
                if combine and len(file_processor.chunk_data(results)) == 1:
                    # Everything fits into one QR payload.
                    qr_paths = generate_qr_code(results, combined=True)
                else:
                    if combine:
                        # A fragment cut out of a larger JSON document is not
                        # valid JSON and would be rejected by the generator,
                        # so fall back to one QR code per item instead.
                        logger.warning(
                            "Combined data exceeds single QR capacity; "
                            "generating one QR code per item instead."
                        )
                    qr_paths = generate_qr_code(results, combined=False)

                if qr_paths:
                    return (
                        results,
                        [str(path) for path in qr_paths],
                        f"✅ Successfully processed {len(results)} items and generated {len(qr_paths)} QR code(s)!"
                    )
                return None, [], "❌ Failed to generate QR codes. Please check the input data."

            except Exception as e:
                logger.error(f"Processing error: {e}")
                return None, [], f"❌ Error: {str(e)}"

        # Set up event handlers
        example_btn.click(load_example, outputs=[text_input])
        clear_btn.click(clear_input, outputs=[text_input])
        process_btn.click(
            process_all_inputs,
            inputs=[url_input, file_input, text_input, combine_data],
            outputs=[output_json, output_gallery, output_text]
        )

        gr.Markdown("""
        ### Features
        - **URL Processing**: Extract content from websites
        - **File Processing**: Handle text files and archives
        - **Notepad**: Direct JSON data input/manipulation
        - **JSON Cleaning**: Automatic JSON validation and formatting
        - **QR Generation**: Generate QR codes with embedded JSON data
        - **Flexible Output**: Choose between combined or separate QR codes

        ### Usage Tips
        1. Use the **Notepad** tab for direct JSON input
        2. Click "Load Example JSON" to see a sample format
        3. Choose whether to combine all data into a single QR code
        4. The generated QR codes will contain the complete JSON data
        """)

    return interface
542
+
543
def main():
    """Prepare the runtime environment, then build and launch the Gradio app."""
    # Load the system MIME type tables used for file type detection.
    mimetypes.init()

    # The QR output tree must exist before any image is saved.
    Path('output/qr_codes').mkdir(parents=True, exist_ok=True)

    app = create_interface()

    # No public share link and no debug mode: settings for a hosted
    # (Hugging Face) deployment.
    app.launch(share=False, debug=False)


if __name__ == "__main__":
    main()