theshresthshukla commited on
Commit
21d27b2
·
verified ·
1 Parent(s): e4e699c

Upload 10 files

Browse files
src/services/__pycache__/appconfig.cpython-310.pyc ADDED
Binary file (1.69 kB). View file
 
src/services/__pycache__/entity_extractor.cpython-310.pyc ADDED
Binary file (5.48 kB). View file
 
src/services/__pycache__/image_downloader.cpython-310.pyc ADDED
Binary file (8.31 kB). View file
 
src/services/__pycache__/logo_downloader.cpython-310.pyc ADDED
Binary file (6.5 kB). View file
 
src/services/appconfig.py ADDED
@@ -0,0 +1,60 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Configuration settings for the Logo Downloader application
3
+ """
4
+ import os
5
+ from pathlib import Path
6
+ from dotenv import load_dotenv
7
+ load_dotenv()
8
+
9
+ # API Configuration
10
+ GEMINI_API_KEY = os.getenv('GEMINI_API_KEY', '')
11
+
12
+ # Directory Configuration
13
+ BASE_DIR = Path(__file__).parent
14
+ # DOWNLOADS_DIR = BASE_DIR / 'downloads'
15
+
16
+ DOWNLOADS_DIR = Path('downloads')
17
+
18
+ TEMP_DIR = BASE_DIR / 'temp'
19
+
20
+ # Download Configuration
21
+ MAX_ENTITIES = 20
22
+ MAX_LOGOS_PER_ENTITY = 15
23
+ DEFAULT_LOGOS_PER_ENTITY = 10
24
+ DOWNLOAD_TIMEOUT = 15
25
+ REQUEST_DELAY = 1 # seconds between requests
26
+
27
+ # File Configuration
28
+ ALLOWED_EXTENSIONS = ['.png', '.jpg', '.jpeg', '.svg', '.webp']
29
+ MIN_FILE_SIZE = 500 # bytes
30
+ MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
31
+
32
+ # HTTP Configuration
33
+ HEADERS = {
34
+ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
35
+ 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
36
+ 'Accept-Language': 'en-US,en;q=0.5',
37
+ 'Accept-Encoding': 'gzip, deflate',
38
+ 'Connection': 'keep-alive',
39
+ 'Upgrade-Insecure-Requests': '1',
40
+ }
41
+
42
+ # Image signatures for validation
43
+ IMAGE_SIGNATURES = [
44
+ b'\x89PNG', # PNG
45
+ b'\xff\xd8\xff', # JPEG
46
+ b'<svg', # SVG
47
+ b'RIFF', # WebP
48
+ b'GIF8', # GIF
49
+ ]
50
+
51
+ # Common tech entities for fallback
52
+ COMMON_TECH_ENTITIES = [
53
+ 'Microsoft', 'Google', 'Apple', 'Amazon', 'Adobe', 'React', 'Angular', 'Vue',
54
+ 'Docker', 'Kubernetes', 'AWS', 'Azure', 'Firebase', 'MongoDB', 'PostgreSQL',
55
+ 'Redis', 'Node.js', 'Python', 'JavaScript', 'TypeScript', 'Figma', 'Sketch',
56
+ 'Photoshop', 'Illustrator', 'AutoCAD', 'Unity', 'Blender', 'GitHub', 'GitLab',
57
+ 'Slack', 'Discord', 'Zoom', 'Teams', 'Spotify', 'Netflix', 'Instagram',
58
+ 'Facebook', 'Twitter', 'LinkedIn', 'TikTok', 'WhatsApp', 'Telegram',
59
+ 'Shopify', 'WordPress', 'Salesforce', 'Microsoft Fabric'
60
+ ]
src/services/entity_extractor.py ADDED
@@ -0,0 +1,195 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Entity extraction module using Gemini AI with fallback methods
3
+ """
4
+ import re
5
+ import logging
6
+ from typing import List, Optional
7
+ import google.generativeai as genai
8
+
9
+ from services.appconfig import GEMINI_API_KEY, COMMON_TECH_ENTITIES, MAX_ENTITIES
10
+
11
+ logger = logging.getLogger(__name__)
12
+
13
+
14
class EntityExtractor:
    """Extract entities from text using Gemini AI or fallback methods"""

    # Leading list markers ("- ", "* ", "1.", "2)") that the model sometimes
    # emits despite the prompt asking for bare lines, one name per line.
    _LIST_MARKER = re.compile(r'^\s*(?:[-*]+|\d+[.)])\s*')

    def __init__(self, api_key: Optional[str] = None):
        """
        Initialize EntityExtractor

        Args:
            api_key (str, optional): Gemini API key; falls back to the
                configured GEMINI_API_KEY when omitted.
        """
        self.api_key = api_key or GEMINI_API_KEY
        self.model = None
        self._setup_gemini()

    def _setup_gemini(self) -> None:
        """Configure the Gemini client; on any failure leave model=None so
        extraction falls back to pattern matching."""
        if not self.api_key:
            logger.warning("No Gemini API key provided, using fallback method")
            return

        try:
            genai.configure(api_key=self.api_key)
            self.model = genai.GenerativeModel('gemini-2.0-flash-exp')
            logger.info("Gemini API initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Gemini API: {e}")
            self.model = None

    def _parse_response_lines(self, response_text: str) -> List[str]:
        """
        Turn a raw model response into a clean list of candidate entities.

        Bug fix: lines prefixed with a bullet ("- Microsoft") used to be
        discarded wholesale; now the list marker is stripped and the name
        kept. Candidates still must pass _is_valid_entity.

        Args:
            response_text (str): Raw text returned by the model

        Returns:
            List[str]: Cleaned candidate entity names (uncapped)
        """
        candidates = []
        for raw_line in response_text.strip().split('\n'):
            name = self._LIST_MARKER.sub('', raw_line).strip()
            if len(name) > 1 and self._is_valid_entity(name):
                candidates.append(name)
        return candidates

    def extract_with_gemini(self, text: str) -> List[str]:
        """
        Extract entities using Gemini AI

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities, capped at MAX_ENTITIES

        Raises:
            Exception: if the model is unavailable or the API call fails
                (caller extract_entities catches this and falls back)
        """
        if not self.model:
            raise Exception("Gemini model not available")

        prompt = """
        Extract company names, product names, software names, tool names, and brand names from this text.
        Only return names that would have recognizable logos (like Microsoft, Adobe, React, etc.).
        Return as a simple list, one name per line, no bullet points or numbers.
        Avoid generic terms like "cloud" or "database".

        Text: {text}
        """.format(text=text)

        try:
            response = self.model.generate_content(prompt)

            if not response.text:
                return []

            filtered_entities = self._parse_response_lines(response.text)

            logger.info(f"Gemini extracted {len(filtered_entities)} entities")
            return filtered_entities[:MAX_ENTITIES]

        except Exception as e:
            logger.error(f"Gemini extraction failed: {e}")
            raise

    def extract_with_fallback(self, text: str) -> List[str]:
        """
        Extract entities using fallback pattern matching (no API needed)

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities, capped at MAX_ENTITIES
        """
        entities = []

        # Known tech brands, matched case-insensitively as substrings.
        for tech_entity in COMMON_TECH_ENTITIES:
            if tech_entity.lower() in text.lower():
                entities.append(tech_entity)

        # Capitalized words of 3+ letters — likely proper nouns.
        cap_words = re.findall(r'\b[A-Z][a-zA-Z]{2,}\b', text)
        for word in cap_words:
            if self._is_valid_entity(word) and word not in entities:
                entities.append(word)

        # Dotted product names such as Node.js / Vue.js.
        pattern_words = re.findall(r'\b[A-Z][a-zA-Z]*\.[a-zA-Z]+\b', text)
        for word in pattern_words:
            if word not in entities:
                entities.append(word)

        # Remove case-insensitive duplicates while preserving order.
        unique_entities = []
        seen = set()
        for entity in entities:
            if entity.lower() not in seen:
                seen.add(entity.lower())
                unique_entities.append(entity)

        logger.info(f"Fallback extracted {len(unique_entities)} entities")
        return unique_entities[:MAX_ENTITIES]

    def _is_valid_entity(self, entity: str) -> bool:
        """
        Check if entity is valid for logo extraction

        Args:
            entity (str): Entity name

        Returns:
            bool: True if valid entity
        """
        # English stop words that capitalization matching may pick up but
        # that are never brand names.
        invalid_words = {
            'the', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with',
            'by', 'from', 'up', 'about', 'into', 'through', 'during', 'before',
            'after', 'above', 'below', 'between', 'among'}

        entity_lower = entity.lower()

        # Reject implausibly short or long names.
        if len(entity) < 2 or len(entity) > 50:
            return False

        if entity_lower in invalid_words:
            return False

        # Must contain at least one letter (rejects pure numbers/symbols).
        if not re.search(r'[a-zA-Z]', entity):
            return False

        return True

    def extract_entities(self, text: str) -> List[str]:
        """
        Extract entities from text using available methods.

        Tries Gemini first when a model was initialized; on any failure or
        an empty result, silently falls back to pattern matching.

        Args:
            text (str): Input text

        Returns:
            List[str]: List of extracted entities (may be empty)
        """
        if not text or not text.strip():
            return []

        logger.info("Starting entity extraction...")

        if self.model:
            try:
                entities = self.extract_with_gemini(text)
                if entities:
                    logger.info(f"Successfully extracted {len(entities)} entities with Gemini")
                    return entities
            except Exception as e:
                logger.warning(f"Gemini extraction failed, using fallback: {e}")

        entities = self.extract_with_fallback(text)
        logger.info(f"Extracted {len(entities)} entities using fallback method")

        return entities
src/services/image_downloader.py ADDED
@@ -0,0 +1,278 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Image downloading module with multiple search providers
3
+ """
4
+ import os
5
+ import json
6
+ import logging
7
+ from typing import List, Tuple
8
+ from urllib.parse import quote_plus, urlparse
9
+ import requests
10
+ from bs4 import BeautifulSoup
11
+
12
+ from services.appconfig import HEADERS, DOWNLOAD_TIMEOUT, REQUEST_DELAY, ALLOWED_EXTENSIONS
13
+ from utils.utils import is_valid_image_file, get_file_extension, clean_up_file, rate_limit_delay
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
+
18
class ImageDownloader:
    """Download images from various search providers"""

    def __init__(self):
        """Initialize ImageDownloader with a shared HTTP session carrying
        browser-like headers."""
        self.session = requests.Session()
        self.session.headers.update(HEADERS)

    def get_bing_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
        """
        Get image URLs from Bing search

        Args:
            entity (str): Entity name to search for
            num_images (int): Maximum number of URLs to return

        Returns:
            List[str]: List of image URLs (empty on any failure)
        """
        logger.info(f"Searching Bing for {entity} logos...")

        query = f"{entity} logo png transparent high quality"
        encoded_query = quote_plus(query)
        search_url = f"https://www.bing.com/images/search?q={encoded_query}&form=HDRSC2&first=1&tsc=ImageBasicHover"

        try:
            # SECURITY NOTE(review): verify=False disables TLS certificate
            # validation; kept for behavior compatibility but should be
            # removed or made configurable.
            response = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT, verify=False)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            image_urls = []

            # Bing embeds per-image metadata as JSON in the 'm' attribute
            # of <a class="iusc"> containers.
            img_containers = soup.find_all('a', {'class': 'iusc'})
            for container in img_containers:
                m_attr = container.get('m')
                if m_attr:
                    try:
                        img_data = json.loads(m_attr)
                        # 'murl' is the full-size image, 'turl' the thumbnail.
                        img_url = img_data.get('murl') or img_data.get('turl')
                        if img_url and self._is_valid_image_url(img_url):
                            image_urls.append(img_url)
                    except json.JSONDecodeError:
                        continue

            # Fallback: scrape plain <img> tags when the structured parse
            # yielded too few results.
            if len(image_urls) < 5:
                img_tags = soup.find_all('img')
                for img in img_tags:
                    src = img.get('src') or img.get('data-src')
                    if src and self._is_valid_image_url(src) and 'logo' in src.lower():
                        if src.startswith('http'):
                            image_urls.append(src)

            logger.info(f"Found {len(image_urls)} URLs from Bing")
            return image_urls[:num_images]

        except Exception as e:
            logger.error(f"Bing search failed for {entity}: {e}")
            return []

    def get_duckduckgo_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
        """
        Get image URLs from DuckDuckGo search

        Args:
            entity (str): Entity name to search for
            num_images (int): Maximum number of URLs to return

        Returns:
            List[str]: List of image URLs (empty on any failure)
        """
        logger.info(f"Searching DuckDuckGo for {entity} logos...")

        query = f"{entity} logo hd png transparent"
        encoded_query = quote_plus(query)
        search_url = f"https://duckduckgo.com/?q={encoded_query}&t=h_&iax=images&ia=images"

        try:
            # SECURITY NOTE(review): verify=False — see get_bing_image_urls.
            response = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT, verify=False)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')
            image_urls = []

            img_tags = soup.find_all('img')
            for img in img_tags:
                src = img.get('src') or img.get('data-src')
                if src and self._is_valid_image_url(src) and src.startswith('http'):
                    image_urls.append(src)

            logger.info(f"Found {len(image_urls)} URLs from DuckDuckGo")
            return image_urls[:num_images]

        except Exception as e:
            logger.error(f"DuckDuckGo search failed for {entity}: {e}")
            return []

    def get_alternative_logo_sources(self, entity: str) -> List[str]:
        """
        Get URLs from alternative logo sources

        Probes well-known logo CDNs with a HEAD request and keeps only the
        URLs that respond 200.

        Args:
            entity (str): Entity name

        Returns:
            List[str]: List of alternative logo URLs
        """
        urls = []
        entity_clean = entity.lower().replace(' ', '').replace('.', '')
        entity_hyphen = entity.lower().replace(' ', '-')

        # Candidate URL templates on known logo-hosting services.
        logo_sources = [
            f"https://cdn.worldvectorlogo.com/logos/{entity_hyphen}.svg",
            f"https://logos-world.net/wp-content/uploads/2020/11/{entity.replace(' ', '-')}-Logo.png",
            f"https://logoeps.com/wp-content/uploads/2013/03/vector-{entity_clean}-logo.png",
            f"https://1000logos.net/wp-content/uploads/2016/10/{entity.replace(' ', '-')}-Logo.png",
        ]

        for url in logo_sources:
            try:
                response = self.session.head(url, timeout=5)
                if response.status_code == 200:
                    urls.append(url)
                    logger.info(f"Found alternative logo: {url}")
            except Exception:
                # Best-effort probing: unreachable hosts are simply skipped.
                continue

        return urls

    def _is_valid_image_url(self, url: str) -> bool:
        """
        Check if URL is a valid image URL

        Args:
            url (str): URL to check

        Returns:
            bool: True if the URL contains a known image extension
        """
        if not url:
            return False

        # Substring check (not suffix) so query-string URLs still match.
        url_lower = url.lower()
        return any(ext in url_lower for ext in ALLOWED_EXTENSIONS)

    def download_image(self, url: str, filepath: str) -> bool:
        """
        Download image from URL

        Args:
            url (str): Image URL
            filepath (str): Local filepath to save image

        Returns:
            bool: True if download succeeded and the file validates as an
                image; partial/invalid files are cleaned up
        """
        try:
            logger.debug(f"Downloading: {url}")

            # SECURITY NOTE(review): verify=False — see get_bing_image_urls.
            response = self.session.get(url, timeout=DOWNLOAD_TIMEOUT, stream=True, verify=False)
            response.raise_for_status()

            # Reject obviously non-image responses early.
            content_type = response.headers.get('content-type', '').lower()
            if not any(img_type in content_type for img_type in ['image', 'svg']):
                logger.warning(f"Invalid content type for {url}: {content_type}")
                return False

            # Stream to disk in chunks to bound memory use.
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)

            # Validate magic bytes / size; delete anything that fails.
            if is_valid_image_file(filepath):
                logger.debug(f"Successfully downloaded: {filepath}")
                return True
            else:
                clean_up_file(filepath)
                logger.warning(f"Downloaded invalid image: {url}")
                return False

        except Exception as e:
            clean_up_file(filepath)
            logger.error(f"Download failed for {url}: {e}")
            return False

    def download_logos_for_entity(self, entity: str, entity_folder: str, num_logos: int = 10) -> Tuple[int, List[str]]:
        """
        Download logos for a single entity

        Args:
            entity (str): Entity name
            entity_folder (str): Folder to save logos
            num_logos (int): Number of logos to download

        Returns:
            Tuple[int, List[str]]: (number downloaded, list of downloaded files)
        """
        logger.info(f"Downloading top {num_logos} logos for: {entity}")

        # Collect candidate URLs from all sources, best sources first.
        all_urls = []

        alt_urls = self.get_alternative_logo_sources(entity)
        all_urls.extend(alt_urls)

        bing_urls = self.get_bing_image_urls(entity, 20)
        all_urls.extend(bing_urls)

        ddg_urls = self.get_duckduckgo_image_urls(entity, 15)
        all_urls.extend(ddg_urls)

        # Remove duplicates while preserving order.
        unique_urls = []
        seen = set()
        for url in all_urls:
            if url not in seen:
                seen.add(url)
                unique_urls.append(url)

        if not unique_urls:
            logger.warning(f"No URLs found for {entity}")
            return 0, []

        logger.info(f"Found {len(unique_urls)} unique URLs for {entity}")

        downloaded_files = []
        downloaded_count = 0

        for url in unique_urls:
            if downloaded_count >= num_logos:
                break

            try:
                extension = get_file_extension(url)
                filename = f"{entity.replace(' ', '_')}_logo_{downloaded_count + 1}{extension}"
                filepath = os.path.join(entity_folder, filename)

                if self.download_image(url, filepath):
                    downloaded_count += 1
                    downloaded_files.append(filepath)
                    # Bug fix: previously logged a literal "(unknown)"
                    # instead of the saved filename.
                    logger.info(f"Downloaded ({downloaded_count}/{num_logos}): {filename}")

                # Be respectful to servers.
                rate_limit_delay(REQUEST_DELAY)

            except Exception as e:
                logger.error(f"Error processing URL {url}: {e}")
                continue

        logger.info(f"Successfully downloaded {downloaded_count}/{num_logos} logos for {entity}")
        return downloaded_count, downloaded_files
src/services/logo_downloader.py ADDED
@@ -0,0 +1,228 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Main Logo Downloader class that orchestrates the entire process
3
+ """
4
+ import os
5
+ import zipfile
6
+ import logging
7
+ from pathlib import Path
8
+ from typing import List, Tuple, Dict, Optional
9
+
10
+ from services.appconfig import DOWNLOADS_DIR, DEFAULT_LOGOS_PER_ENTITY
11
+ from utils.utils import create_safe_filename, create_directory, format_file_size
12
+ from .entity_extractor import EntityExtractor
13
+ from .image_downloader import ImageDownloader
14
+
15
+ logger = logging.getLogger(__name__)
16
+
17
class LogoDownloader:
    """Main class for downloading logos based on extracted entities"""

    def __init__(self, gemini_api_key: str, output_dir: Optional[str] = None):
        """
        Initialize LogoDownloader

        Args:
            gemini_api_key (str): Gemini API key for entity extraction
            output_dir (str, optional): Directory to save downloads;
                defaults to the configured DOWNLOADS_DIR
        """
        self.output_dir = Path(output_dir) if output_dir else DOWNLOADS_DIR
        self.entity_extractor = EntityExtractor(gemini_api_key)
        self.image_downloader = ImageDownloader()
        # DRY fix: stats shape was duplicated here and in _reset_stats;
        # _reset_stats is now the single source of truth.
        self._reset_stats()

        # Create output directory
        create_directory(self.output_dir)

    def process_text(self, text: str, logos_per_entity: int = DEFAULT_LOGOS_PER_ENTITY) -> Dict:
        """
        Main processing function: extract entities and download logos

        Args:
            text (str): Input text containing entity references
            logos_per_entity (int): Number of logos to download per entity

        Returns:
            Dict: Processing results and statistics
        """
        logger.info("Starting logo download process...")

        self._reset_stats()

        entities = self.entity_extractor.extract_entities(text)

        if not entities:
            logger.warning("No entities found in text")
            return self._get_results("No entities found in the provided text")

        self.stats['total_entities'] = len(entities)
        logger.info(f"Found {len(entities)} entities: {', '.join(entities)}")

        # Download logos for each entity; failures are recorded, not fatal.
        results = []
        for i, entity in enumerate(entities, 1):
            logger.info(f"Processing [{i}/{len(entities)}]: {entity}")

            try:
                result = self._process_single_entity(entity, logos_per_entity)
                results.append(result)

                if result['downloaded_count'] > 0:
                    self.stats['successful_entities'] += 1
                    self.stats['total_downloads'] += result['downloaded_count']
                else:
                    self.stats['failed_entities'] += 1

            except Exception as e:
                logger.error(f"Failed to process entity {entity}: {e}")
                self.stats['failed_entities'] += 1
                results.append({
                    'entity': entity,
                    'downloaded_count': 0,
                    'files': [],
                    'error': str(e)
                })

        # Create zip package only when something was actually downloaded.
        zip_path = None
        if self.stats['total_downloads'] > 0:
            zip_path = self._create_zip_package()

        return self._get_results(
            "Processing completed successfully",
            entities=entities,
            results=results,
            zip_path=zip_path
        )

    def _process_single_entity(self, entity: str, logos_per_entity: int) -> Dict:
        """
        Process a single entity: create folder and download logos

        Args:
            entity (str): Entity name
            logos_per_entity (int): Number of logos to download

        Returns:
            Dict: Processing result for this entity

        Raises:
            Exception: if the per-entity folder cannot be created
        """
        safe_name = create_safe_filename(entity)
        entity_folder = self.output_dir / safe_name

        if not create_directory(entity_folder):
            raise Exception(f"Failed to create directory for {entity}")

        downloaded_count, downloaded_files = self.image_downloader.download_logos_for_entity(
            entity, str(entity_folder), logos_per_entity
        )

        return {
            'entity': entity,
            'safe_name': safe_name,
            'downloaded_count': downloaded_count,
            'files': downloaded_files,
            'folder': str(entity_folder)
        }

    def _create_zip_package(self) -> str:
        """
        Create ZIP package of all downloaded logos

        The archive is written NEXT TO the output directory (parent), so it
        never includes itself.

        Returns:
            str: Path to created ZIP file

        Raises:
            Exception: re-raised on any archiving failure
        """
        zip_filename = f"{self.output_dir.name}_logos.zip"
        zip_path = self.output_dir.parent / zip_filename

        logger.info(f"Creating ZIP package: {zip_path}")

        try:
            with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _dirs, files in os.walk(self.output_dir):
                    for file in files:
                        file_path = os.path.join(root, file)
                        # Store paths relative to output_dir so the archive
                        # unpacks as entity folders, not absolute paths.
                        arcname = os.path.relpath(file_path, self.output_dir)
                        zipf.write(file_path, arcname)

            file_size = os.path.getsize(zip_path)
            logger.info(f"ZIP package created: {zip_path} ({format_file_size(file_size)})")
            return str(zip_path)

        except Exception as e:
            logger.error(f"Failed to create ZIP package: {e}")
            raise

    def _reset_stats(self) -> None:
        """Reset processing statistics"""
        self.stats = {
            'total_entities': 0,
            'total_downloads': 0,
            'successful_entities': 0,
            'failed_entities': 0
        }

    def _get_results(self, message: str, **kwargs) -> Dict:
        """
        Get formatted results dictionary

        Args:
            message (str): Status message
            **kwargs: Additional result data merged into the dict

        Returns:
            Dict: Formatted results ('success' only when something downloaded)
        """
        return {
            'status': 'success' if self.stats['total_downloads'] > 0 else 'warning',
            'message': message,
            'stats': self.stats.copy(),
            **kwargs
        }

    def get_stats_summary(self) -> str:
        """
        Get human-readable stats summary

        Returns:
            str: Stats summary
        """
        if self.stats['total_entities'] == 0:
            return "No entities processed"

        avg_downloads = (
            self.stats['total_downloads'] / self.stats['successful_entities']
            if self.stats['successful_entities'] > 0 else 0
        )

        return (
            f"Processed {self.stats['total_entities']} entities. "
            f"Successfully downloaded {self.stats['total_downloads']} logos "
            f"({avg_downloads:.1f} average per entity). "
            f"Success rate: {self.stats['successful_entities']}/{self.stats['total_entities']}"
        )
213
+
214
+
215
def download_logos(text: str, gemini_api_key: str, logos_per_entity: int = DEFAULT_LOGOS_PER_ENTITY) -> Dict:
    """
    Run the full extract-and-download pipeline in a single call.

    Args:
        text (str): Text containing entity references
        gemini_api_key (str): Gemini API key
        logos_per_entity (int): Number of logos per entity

    Returns:
        Dict: Processing results
    """
    return LogoDownloader(gemini_api_key).process_text(text, logos_per_entity)
src/utils/__pycache__/utils.cpython-310.pyc ADDED
Binary file (4.52 kB). View file
 
src/utils/utils.py ADDED
@@ -0,0 +1,178 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Utility functions for the Logo Downloader application
3
+ """
4
+ import os
5
+ import re
6
+ import json
7
+ import time
8
+ from pathlib import Path
9
+ from typing import List, Optional
10
+ from urllib.parse import urlparse
11
+ import logging
12
+
13
+ from services.appconfig import IMAGE_SIGNATURES, MIN_FILE_SIZE, MAX_FILE_SIZE
14
+
15
+ # Setup logging
16
+ logging.basicConfig(level=logging.INFO)
17
+ logger = logging.getLogger(__name__)
18
+
19
+
20
def create_safe_filename(name: str) -> str:
    """
    Create a safe filename from entity name

    Drops every character that is not a word character, whitespace, or a
    hyphen, then collapses runs of hyphens/whitespace into single
    underscores.

    Args:
        name (str): Entity name

    Returns:
        str: Safe filename
    """
    stripped = re.sub(r'[^\w\s-]', '', name).strip()
    return re.sub(r'[-\s]+', '_', stripped)
33
+
34
+
35
def get_file_extension(url: str) -> str:
    """
    Extract file extension from URL

    Only extensions the downloader can handle are kept; anything else
    (or a missing extension) falls back to '.png'. The original case of
    the extension is preserved.

    Args:
        url (str): Image URL

    Returns:
        str: File extension (including the leading dot)
    """
    suffix = os.path.splitext(urlparse(url).path)[1]
    if suffix and suffix.lower() in ('.png', '.jpg', '.jpeg', '.svg', '.webp'):
        return suffix
    return '.png'
52
+
53
+
54
def is_valid_image_file(filepath: str) -> bool:
    """
    Validate if file is a proper image

    Checks existence, a sane size range, and the leading magic bytes
    against IMAGE_SIGNATURES. Bug fix: SVG files commonly begin with an
    '<?xml ...?>' declaration (or whitespace/DOCTYPE) before the '<svg'
    tag, so a 12-byte startswith check rejected valid SVGs; we now read a
    256-byte prefix and also search it for '<svg'.

    Args:
        filepath (str): Path to image file

    Returns:
        bool: True if valid image
    """
    try:
        if not os.path.exists(filepath):
            return False

        file_size = os.path.getsize(filepath)
        if file_size < MIN_FILE_SIZE or file_size > MAX_FILE_SIZE:
            logger.warning(f"Invalid file size: {file_size}")
            return False

        # Binary formats are identified by their first bytes; a larger
        # prefix lets us also find an '<svg' tag after an XML prolog.
        with open(filepath, 'rb') as f:
            header = f.read(256)

        for signature in IMAGE_SIGNATURES:
            if header.startswith(signature):
                return True

        # SVG with XML declaration / leading whitespace.
        if b'<svg' in header:
            return True

        return False

    except Exception as e:
        logger.error(f"Error validating image: {e}")
        return False
87
+
88
+
89
def create_directory(path: Path) -> bool:
    """
    Create directory if it doesn't exist

    Parents are created as needed; an already-existing directory is not
    an error.

    Args:
        path (Path): Directory path

    Returns:
        bool: True if successful
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
    except Exception as e:
        logger.error(f"Error creating directory {path}: {e}")
        return False
    return True
105
+
106
+
107
def clean_up_file(filepath: str) -> None:
    """
    Remove file if it exists

    Failures are logged, never raised — used for best-effort cleanup of
    partial downloads.

    Args:
        filepath (str): Path to file to remove
    """
    if not os.path.exists(filepath):
        return
    try:
        os.remove(filepath)
    except Exception as e:
        logger.error(f"Error removing file {filepath}: {e}")
119
+
120
+
121
def parse_json_safely(json_string: str) -> Optional[dict]:
    """
    Safely parse JSON string

    Args:
        json_string (str): JSON string to parse

    Returns:
        dict or None: Parsed JSON, or None when the input is not valid JSON
    """
    try:
        parsed = json.loads(json_string)
    except json.JSONDecodeError:
        return None
    return parsed
135
+
136
+
137
def rate_limit_delay(delay: float = 1.0) -> None:
    """
    Sleep between requests so we stay polite to remote servers.

    Args:
        delay (float): Delay in seconds
    """
    time.sleep(delay)
145
+
146
+
147
def format_file_size(size_bytes: int) -> str:
    """
    Format file size in human readable format

    Renders bytes as B, KB, or MB with one decimal place for the scaled
    units.

    Args:
        size_bytes (int): Size in bytes

    Returns:
        str: Formatted size string
    """
    kb = 1024
    mb = kb * 1024
    if size_bytes < kb:
        return f"{size_bytes} B"
    if size_bytes < mb:
        return f"{size_bytes / kb:.1f} KB"
    return f"{size_bytes / mb:.1f} MB"
163
+
164
+
165
def truncate_text(text: str, max_length: int = 100) -> str:
    """
    Truncate text to specified length

    When truncation occurs the result ends in "..." and is exactly
    max_length characters long.

    Args:
        text (str): Text to truncate
        max_length (int): Maximum length

    Returns:
        str: Truncated text
    """
    if len(text) > max_length:
        return text[:max_length - 3] + "..."
    return text