acecalisto3 committed on
Commit d139998 · verified · 1 Parent(s): 431e17b

Update app.py

Files changed (1)
  1. app.py +175 -18
app.py CHANGED
@@ -12,11 +12,18 @@ from typing import List, Dict, Tuple, Union, Optional
import requests
import validators
import gradio as gr
- import zxing
+ from diskcache import Cache
from bs4 import BeautifulSoup
from fake_useragent import UserAgent
from cleantext import clean
import qrcode
+ import PyPDF2
+ from PIL import Image
+ import pytesseract
+ import cv2
+ import numpy as np
+ import fitz  # PyMuPDF
+ import zipfile

# Setup logging with detailed configuration
logging.basicConfig(
@@ -41,6 +48,13 @@ class URLProcessor:
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        })
+         self.supported_content_types = {
+             'text/html': self._fetch_html_content,
+             'application/pdf': self._fetch_pdf_content,
+             'image': self._fetch_image_content,
+             'application/json': self._fetch_json_content,
+             'text/plain': self._fetch_text_content
+         }

    def advanced_text_cleaning(self, text: str) -> str:
        """Robust text cleaning with version compatibility"""
@@ -80,18 +94,31 @@ class URLProcessor:
            return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}

    def fetch_content(self, url: str) -> Optional[Dict]:
-         """Universal content fetcher with special case handling"""
+         """Universal content fetcher with enhanced content type handling"""
        try:
-             # Google Drive document handling
+             # Special case handling
            if 'drive.google.com' in url:
                return self._handle_google_drive(url)
-
-             # Google Calendar ICS handling
            if 'calendar.google.com' in url and 'ical' in url:
                return self._handle_google_calendar(url)
-
-             # Standard HTML processing
-             return self._fetch_html_content(url)
+
+             # Get content type
+             response = self.session.head(url, timeout=self.timeout)
+             content_type = response.headers.get('Content-Type', '').split(';')[0].lower()
+
+             # Find appropriate handler
+             handler = None
+             for supported_type, type_handler in self.supported_content_types.items():
+                 if content_type.startswith(supported_type):
+                     handler = type_handler
+                     break
+
+             if handler:
+                 return handler(url)
+             else:
+                 logger.warning(f"Unsupported content type: {content_type}")
+                 return self._fetch_text_content(url)
+
        except Exception as e:
            logger.error(f"Content fetch failed: {e}")
            return None
@@ -131,9 +158,8 @@ class URLProcessor:
            logger.error(f"Calendar fetch failed: {e}")
            return None

-
    def _fetch_html_content(self, url: str) -> Optional[Dict]:
-         """Standard HTML content processing"""
+         """Enhanced HTML content processing with metadata extraction"""
        try:
            response = self.session.get(url, timeout=self.timeout)
            response.raise_for_status()
@@ -147,14 +173,13 @@ class URLProcessor:
            # Extract main content
            main_content = soup.find('main') or soup.find('article') or soup.body

-             # Check if main_content is None
-             if main_content is None:
-                 logger.warning(f"No main content found in the HTML for URL: {url}")
-                 return {
-                     'content': "No main content found.",
-                     'content_type': response.headers.get('Content-Type', ''),
-                     'timestamp': datetime.now().isoformat()
-                 }
+             # Extract metadata
+             metadata = {
+                 'title': soup.title.string if soup.title else None,
+                 'description': soup.find('meta', {'name': 'description'})['content'] if soup.find('meta', {'name': 'description'}) else None,
+                 'keywords': soup.find('meta', {'name': 'keywords'})['content'] if soup.find('meta', {'name': 'keywords'}) else None,
+                 'author': soup.find('meta', {'name': 'author'})['content'] if soup.find('meta', {'name': 'author'}) else None
+             }

            # Clean and structure content
            text_content = main_content.get_text(separator='\n', strip=True)
@@ -162,12 +187,144 @@ class URLProcessor:

            return {
                'content': cleaned_content,
+                 'metadata': metadata,
                'content_type': response.headers.get('Content-Type', ''),
                'timestamp': datetime.now().isoformat()
            }
        except Exception as e:
            logger.error(f"HTML processing failed: {e}")
            return None
+
+     def _fetch_pdf_content(self, url: str) -> Optional[Dict]:
+         """Process PDF content with enhanced metadata extraction"""
+         try:
+             response = self.session.get(url, timeout=self.timeout)
+             response.raise_for_status()
+
+             with tempfile.NamedTemporaryFile(suffix='.pdf') as temp_file:
+                 temp_file.write(response.content)
+                 temp_file.flush()
+
+                 # Extract text and metadata using PyMuPDF
+                 doc = fitz.open(temp_file.name)
+
+                 # Extract text with formatting preservation
+                 text = ""
+                 metadata = {
+                     'title': doc.metadata.get('title'),
+                     'author': doc.metadata.get('author'),
+                     'subject': doc.metadata.get('subject'),
+                     'keywords': doc.metadata.get('keywords'),
+                     'creator': doc.metadata.get('creator'),
+                     'producer': doc.metadata.get('producer'),
+                     'page_count': len(doc),
+                     'file_size': os.path.getsize(temp_file.name),
+                     'version': doc.version
+                 }
+
+                 # Extract text with layout preservation
+                 for page in doc:
+                     blocks = page.get_text("blocks")
+                     for block in blocks:
+                         if block[6] == 0:  # Text block
+                             text += block[4] + "\n"
+
+                 doc.close()
+                 cleaned_content = self.advanced_text_cleaning(text)
+
+             return {
+                 'content': cleaned_content,
+                 'metadata': metadata,
+                 'content_type': 'application/pdf',
+                 'timestamp': datetime.now().isoformat()
+             }
+         except Exception as e:
+             logger.error(f"PDF processing failed: {e}")
+             return None
+
+     def _fetch_image_content(self, url: str) -> Optional[Dict]:
+         """Process image content with OCR and advanced image processing"""
+         try:
+             response = self.session.get(url, timeout=self.timeout)
+             response.raise_for_status()
+
+             with tempfile.NamedTemporaryFile(suffix='.jpg') as temp_file:
+                 temp_file.write(response.content)
+                 temp_file.flush()
+
+                 # Load image with OpenCV
+                 img = cv2.imread(temp_file.name)
+                 if img is None:
+                     raise ValueError("Failed to load image")
+
+                 # Image preprocessing for better OCR
+                 gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
+                 denoised = cv2.fastNlMeansDenoising(gray)
+                 thresh = cv2.threshold(denoised, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1]
+
+                 # Extract text using Tesseract
+                 text = pytesseract.image_to_string(thresh)
+                 cleaned_text = self.advanced_text_cleaning(text) if text else None
+
+                 # Extract metadata and additional image features
+                 with Image.open(temp_file.name) as pil_img:
+                     exif = pil_img._getexif() if hasattr(pil_img, '_getexif') else None
+                     metadata = {
+                         'format': pil_img.format,
+                         'mode': pil_img.mode,
+                         'size': pil_img.size,
+                         'exif': exif,
+                         'image_features': {
+                             'resolution': img.shape,
+                             'channels': img.shape[2] if len(img.shape) > 2 else 1,
+                             'mean_brightness': np.mean(gray),
+                             'has_text': bool(cleaned_text and cleaned_text.strip())
+                         }
+                     }
+
+             return {
+                 'content': cleaned_text,
+                 'metadata': metadata,
+                 'content_type': response.headers.get('Content-Type', ''),
+                 'timestamp': datetime.now().isoformat()
+             }
+         except Exception as e:
+             logger.error(f"Image processing failed: {e}")
+             return None
+
+     def _fetch_json_content(self, url: str) -> Optional[Dict]:
+         """Process JSON content"""
+         try:
+             response = self.session.get(url, timeout=self.timeout)
+             response.raise_for_status()
+
+             content = response.json()
+
+             return {
+                 'content': json.dumps(content, indent=2),
+                 'content_type': 'application/json',
+                 'timestamp': datetime.now().isoformat()
+             }
+         except Exception as e:
+             logger.error(f"JSON processing failed: {e}")
+             return None
+
+     def _fetch_text_content(self, url: str) -> Optional[Dict]:
+         """Process plain text content"""
+         try:
+             response = self.session.get(url, timeout=self.timeout)
+             response.raise_for_status()
+
+             cleaned_content = self.advanced_text_cleaning(response.text)
+
+             return {
+                 'content': cleaned_content,
+                 'content_type': response.headers.get('Content-Type', ''),
+                 'timestamp': datetime.now().isoformat()
+             }
+         except Exception as e:
+             logger.error(f"Text processing failed: {e}")
+             return None

class FileProcessor:
    """Class to handle file processing"""
 