""" | |
Image downloading module with multiple search providers | |
""" | |
import os | |
import json | |
import logging | |
from typing import List, Tuple | |
from urllib.parse import quote_plus, urlparse | |
import requests | |
from bs4 import BeautifulSoup | |
from services.appconfig import HEADERS, DOWNLOAD_TIMEOUT, REQUEST_DELAY, ALLOWED_EXTENSIONS | |
from utils.utils import is_valid_image_file, get_file_extension, clean_up_file, rate_limit_delay | |
logger = logging.getLogger(__name__) | |


class ImageDownloader:
    """Download images from various search providers"""

    def __init__(self):
        """Initialize ImageDownloader"""
        # A shared Session reuses TCP connections across requests and applies
        # the default headers to every call.
        self.session = requests.Session()
        self.session.headers.update(HEADERS)

    def get_bing_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
        """
        Get image URLs from Bing search

        Args:
            entity (str): Entity name to search for
            num_images (int): Maximum number of URLs to return

        Returns:
            List[str]: List of image URLs
        """
        logger.info(f"Searching Bing for {entity} logos...")
        query = f"{entity} logo png transparent high quality"
        encoded_query = quote_plus(query)
        search_url = f"https://www.bing.com/images/search?q={encoded_query}&form=HDRSC2&first=1&tsc=ImageBasicHover"
        try:
            # verify=False skips TLS certificate checks; tolerable for a
            # best-effort scraper, but it does weaken transport security.
            response = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT, verify=False)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            image_urls = []
            # Bing embeds per-result metadata as JSON in the 'm' attribute of
            # anchors with class 'iusc': 'murl' is the full-size image URL,
            # 'turl' the thumbnail fallback.
            img_containers = soup.find_all('a', {'class': 'iusc'})
            for container in img_containers:
                m_attr = container.get('m')
                if m_attr:
                    try:
                        img_data = json.loads(m_attr)
                        img_url = img_data.get('murl') or img_data.get('turl')
                        if img_url and self._is_valid_image_url(img_url):
                            image_urls.append(img_url)
                    except json.JSONDecodeError:
                        continue
            # Fallback: scrape plain <img> tags if the JSON route found little
            if len(image_urls) < 5:
                img_tags = soup.find_all('img')
                for img in img_tags:
                    src = img.get('src') or img.get('data-src')
                    if src and self._is_valid_image_url(src) and 'logo' in src.lower():
                        if src.startswith('http') and src not in image_urls:
                            image_urls.append(src)
            logger.info(f"Found {len(image_urls)} URLs from Bing")
            return image_urls[:num_images]
        except Exception as e:
            logger.error(f"Bing search failed for {entity}: {e}")
            return []

    def get_duckduckgo_image_urls(self, entity: str, num_images: int = 15) -> List[str]:
        """
        Get image URLs from DuckDuckGo search

        Args:
            entity (str): Entity name to search for
            num_images (int): Maximum number of URLs to return

        Returns:
            List[str]: List of image URLs
        """
        logger.info(f"Searching DuckDuckGo for {entity} logos...")
        query = f"{entity} logo hd png transparent"
        encoded_query = quote_plus(query)
        search_url = f"https://duckduckgo.com/?q={encoded_query}&t=h_&iax=images&ia=images"
        try:
            response = self.session.get(search_url, timeout=DOWNLOAD_TIMEOUT, verify=False)
            response.raise_for_status()
            soup = BeautifulSoup(response.content, 'html.parser')
            image_urls = []
            # DuckDuckGo renders its image grid with JavaScript, so the static
            # HTML usually carries only a few <img> tags; treat this provider
            # as best-effort.
            img_tags = soup.find_all('img')
            for img in img_tags:
                src = img.get('src') or img.get('data-src')
                if src and self._is_valid_image_url(src) and src.startswith('http'):
                    image_urls.append(src)
            logger.info(f"Found {len(image_urls)} URLs from DuckDuckGo")
            return image_urls[:num_images]
        except Exception as e:
            logger.error(f"DuckDuckGo search failed for {entity}: {e}")
            return []

    def get_alternative_logo_sources(self, entity: str) -> List[str]:
        """
        Get URLs from alternative logo sources

        Args:
            entity (str): Entity name

        Returns:
            List[str]: List of alternative logo URLs
        """
        urls = []
        entity_clean = entity.lower().replace(' ', '').replace('.', '')
        entity_hyphen = entity.lower().replace(' ', '-')
        # Probe well-known logo CDNs for a direct hit
        logo_sources = [
            f"https://cdn.worldvectorlogo.com/logos/{entity_hyphen}.svg",
            f"https://logos-world.net/wp-content/uploads/2020/11/{entity.replace(' ', '-')}-Logo.png",
            f"https://logoeps.com/wp-content/uploads/2013/03/vector-{entity_clean}-logo.png",
            f"https://1000logos.net/wp-content/uploads/2016/10/{entity.replace(' ', '-')}-Logo.png",
        ]
        for url in logo_sources:
            try:
                # HEAD avoids downloading the body; note that requests does not
                # follow redirects for HEAD by default, so enable it explicitly
                # or redirecting CDNs would be reported as misses.
                response = self.session.head(url, timeout=5, allow_redirects=True)
                if response.status_code == 200:
                    urls.append(url)
                    logger.info(f"Found alternative logo: {url}")
            except Exception:
                continue
        return urls
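
    # Example: for entity "Coca Cola", the probed URLs would include
    #   https://cdn.worldvectorlogo.com/logos/coca-cola.svg
    #   https://1000logos.net/wp-content/uploads/2016/10/Coca-Cola-Logo.png
    # (illustrative only; whether they resolve depends on each CDN's catalog)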

    def _is_valid_image_url(self, url: str) -> bool:
        """
        Check if URL is a valid image URL

        Args:
            url (str): URL to check

        Returns:
            bool: True if valid image URL
        """
        if not url:
            return False
        # Loose check: any allowed extension appearing anywhere in the URL
        # counts, which also matches query strings such as '?format=png'
        url_lower = url.lower()
        return any(ext in url_lower for ext in ALLOWED_EXTENSIONS)
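
    # For illustration, assuming ALLOWED_EXTENSIONS holds suffixes such as
    # ('.png', '.jpg', '.jpeg', '.svg') -- the real values live in
    # services.appconfig:
    #   _is_valid_image_url("https://cdn.example.com/acme-logo.png")  -> True
    #   _is_valid_image_url("https://cdn.example.com/t.png?w=64")     -> True (substring match)
    #   _is_valid_image_url("https://example.com/about")              -> False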

    def download_image(self, url: str, filepath: str) -> bool:
        """
        Download image from URL

        Args:
            url (str): Image URL
            filepath (str): Local filepath to save image

        Returns:
            bool: True if download successful
        """
        try:
            logger.debug(f"Downloading: {url}")
            response = self.session.get(url, timeout=DOWNLOAD_TIMEOUT, stream=True, verify=False)
            response.raise_for_status()
            # Reject responses that do not declare an image payload
            content_type = response.headers.get('content-type', '').lower()
            if not any(img_type in content_type for img_type in ['image', 'svg']):
                logger.warning(f"Invalid content type for {url}: {content_type}")
                return False
            # Stream to disk in chunks to keep memory use flat
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    if chunk:
                        f.write(chunk)
            # Validate the downloaded file; discard it if it is not an image
            if is_valid_image_file(filepath):
                logger.debug(f"Successfully downloaded: {filepath}")
                return True
            clean_up_file(filepath)
            logger.warning(f"Downloaded invalid image: {url}")
            return False
        except Exception as e:
            clean_up_file(filepath)
            logger.error(f"Download failed for {url}: {e}")
            return False

    def download_logos_for_entity(self, entity: str, entity_folder: str, num_logos: int = 10) -> Tuple[int, List[str]]:
        """
        Download logos for a single entity

        Args:
            entity (str): Entity name
            entity_folder (str): Folder to save logos
            num_logos (int): Number of logos to download

        Returns:
            Tuple[int, List[str]]: (number downloaded, list of downloaded files)
        """
        logger.info(f"Downloading top {num_logos} logos for: {entity}")
        # Collect candidate URLs from all sources, most reliable first
        all_urls = []
        # Alternative logo services
        all_urls.extend(self.get_alternative_logo_sources(entity))
        # Bing search
        all_urls.extend(self.get_bing_image_urls(entity, 20))
        # DuckDuckGo search
        all_urls.extend(self.get_duckduckgo_image_urls(entity, 15))
        # Remove duplicates while preserving order
        unique_urls = []
        seen = set()
        for url in all_urls:
            if url not in seen:
                seen.add(url)
                unique_urls.append(url)
        if not unique_urls:
            logger.warning(f"No URLs found for {entity}")
            return 0, []
        logger.info(f"Found {len(unique_urls)} unique URLs for {entity}")
        # Download until the quota is met or the candidates run out
        downloaded_files = []
        downloaded_count = 0
        for url in unique_urls:
            if downloaded_count >= num_logos:
                break
            try:
                extension = get_file_extension(url)
                filename = f"{entity.replace(' ', '_')}_logo_{downloaded_count + 1}{extension}"
                filepath = os.path.join(entity_folder, filename)
                if self.download_image(url, filepath):
                    downloaded_count += 1
                    downloaded_files.append(filepath)
                    logger.info(f"Downloaded ({downloaded_count}/{num_logos}): {filename}")
                # Be respectful to servers
                rate_limit_delay(REQUEST_DELAY)
            except Exception as e:
                logger.error(f"Error processing URL {url}: {e}")
                continue
        logger.info(f"Successfully downloaded {downloaded_count}/{num_logos} logos for {entity}")
        return downloaded_count, downloaded_files
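

# Minimal usage sketch for running this module standalone. The entity name
# and output folder below are illustrative, not part of the module's API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    downloader = ImageDownloader()
    output_folder = os.path.join("logos", "acme_corp")  # hypothetical path
    os.makedirs(output_folder, exist_ok=True)
    count, files = downloader.download_logos_for_entity("Acme Corp", output_folder, num_logos=5)
    print(f"Downloaded {count} logos: {files}")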