acecalisto3 committed
Commit 58c3484 · verified · 1 Parent(s): a27e1d0

Update app.py

Files changed (1)
  1. app.py +57 -55
app.py CHANGED
@@ -39,37 +39,13 @@ class URLProcessor:
         self.session = requests.Session()
         self.timeout = 10  # seconds
         self.session.headers.update({
-            'User -Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3',
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
             'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
             'Accept-Language': 'en-US,en;q=0.5',
             'Accept-Encoding': 'gzip, deflate, br',
             'Connection': 'keep-alive',
             'Upgrade-Insecure-Requests': '1'
         })
-    def advanced_text_cleaning(self, text: str) -> str:
-        """Robust text cleaning with version compatibility"""
-        try:
-            cleaned_text = clean(
-                text,
-                fix_unicode=True,
-                to_ascii=True,
-                lower=True,
-                no_line_breaks=True,
-                no_urls=False,
-                no_emails=True,
-                no_phone_numbers=True,
-                no_numbers=False,
-                no_digits=False,
-                no_currency_symbols=True,
-                no_punct=False
-            ).strip()
-            return cleaned_text
-        except Exception as e:
-            logger.warning(f"Text cleaning error: {e}. Using fallback method.")
-            text = re.sub(r'[\x00-\x1F\x7F-\x9F]', '', text)  # Remove control characters
-            text = text.encode('ascii', 'ignore').decode('ascii')  # Remove non-ASCII characters
-            text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
-            return text.strip()
 
     def validate_url(self, url: str) -> Dict:
         """Validate URL format and accessibility"""
@@ -77,51 +53,77 @@ class URLProcessor:
         if not validators.url(url):
             return {'is_valid': False, 'message': 'Invalid URL format'}
 
-            response = self.session.head(url, timeout=self.timeout)
+            # Try a simple GET request to check if the URL is accessible
+            response = self.session.get(url, timeout=self.timeout)
             response.raise_for_status()
             return {'is_valid': True, 'message': 'URL is valid and accessible'}
         except Exception as e:
             return {'is_valid': False, 'message': f'URL validation failed: {str(e)}'}
 
     def fetch_content(self, url: str) -> Optional[Dict]:
-        """Universal content fetcher with special case handling"""
+        """Simple content fetcher that returns the raw HTML"""
         try:
-            logger.info(f"Fetching content from URL: {url}")  # Log the URL being fetched
+            logger.info(f"Fetching content from URL: {url}")
             response = self.session.get(url, timeout=self.timeout)
-            response.raise_for_status()  # Raise an error for bad responses
-
-            soup = BeautifulSoup(response.text, 'html.parser')
-
-            # Remove unwanted elements
-            for element in soup(['script', 'style', 'nav', 'footer', 'header', 'meta', 'link']):
-                element.decompose()
-
-            # Extract main content
-            main_content = soup.find('main') or soup.find('article') or soup.body
-
-            if main_content is None:
-                logger.warning(f"No main content found for URL: {url}")
-                return {
-                    'content': response.text,  # Return the full HTML if no main content found
-                    'content_type': response.headers.get('Content-Type', ''),
-                    'timestamp': datetime.now().isoformat()
-                }
-
-            # Clean and structure content
-            text_content = main_content.get_text(separator='\n', strip=True)
-            cleaned_content = self.advanced_text_cleaning(text_content)
-
+            response.raise_for_status()
+
+            # Return the raw HTML content
             return {
-                'content': cleaned_content,
+                'content': response.text,
                 'content_type': response.headers.get('Content-Type', ''),
                 'timestamp': datetime.now().isoformat()
             }
-        except requests.RequestException as e:
-            logger.error(f"Request failed: {e}")
-            return None
         except Exception as e:
             logger.error(f"Content fetch failed: {e}")
             return None
+
+    def process_all_inputs(urls, file, text, combine):
+        """Process all input types and generate QR codes"""
+        try:
+            results = []
+            file_processor = FileProcessor()  # Initialize file_processor here
+
+            # Process text input first (since it's direct JSON)
+            if text and text.strip():
+                try:
+                    json_data = json.loads(text)
+                    if isinstance(json_data, list):
+                        results.extend(json_data)
+                    else:
+                        results.append(json_data)
+                except json.JSONDecodeError as e:
+                    return None, [], f"❌ Invalid JSON format: {str(e)}"
+
+            # Process URLs if provided
+            if urls and urls.strip():
+                processor = URLProcessor()
+                url_list = re.split(r'[,\n]', urls)
+                url_list = [url.strip() for url in url_list if url.strip()]
+
+                for url in url_list:
+                    logger.info(f"Processing URL: {url}")
+                    validation = processor.validate_url(url)
+
+                    if validation.get('is_valid'):
+                        logger.info(f"URL {url} is valid, fetching content...")
+                        content = processor.fetch_content(url)
+
+                        if content:
+                            logger.info(f"Content fetched successfully from {url}")
+                            results.append(content['content'])
+                        else:
+                            logger.warning(f"Failed to fetch content from {url}")
+                    else:
+                        logger.error(f"Invalid URL: {validation.get('message')}")
+
+            # Combine results if needed
+            if combine:
+                combined_content = "\n".join(results)
+                return combined_content, results, None
+
+            return results, [], None
+        except Exception as e:
+            logger.error(f"Error processing inputs: {e}")
+            return None, [], f"❌ An error occurred: {str(e)}"
 
     def _handle_google_drive(self, url: str) -> Optional[Dict]:
         """Process Google Drive file links"""
 