#!/usr/bin/env python
# coding: utf-8
###### Searching and Downloading Google Images to the local disk ######
import codecs
import datetime
import http.client
import json
import os
import re
import ssl
import sys
import time  # used to pace page scrolling and image downloads
import urllib.request
from http.client import BadStatusLine
from urllib.parse import quote
from urllib.error import HTTPError, URLError
from urllib.request import Request, urlopen
# Import Libraries
from .. import LOGS
from .tools import async_searcher
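# Google's results pages can send a very large number of HTTP headers; raise
# http.client's private header cap so such responses are not rejected.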
http.client._MAXHEADERS = 1000
args_list = [
"keywords",
"keywords_from_file",
"prefix_keywords",
"suffix_keywords",
"limit",
"format",
"color",
"color_type",
"usage_rights",
"size",
"exact_size",
"aspect_ratio",
"type",
"time",
"time_range",
"delay",
"url",
"single_image",
"output_directory",
"image_directory",
"no_directory",
"proxy",
"similar_images",
"specific_site",
"metadata",
"extract_metadata",
"socket_timeout",
"thumbnail",
"thumbnail_only",
"language",
"prefix",
"chromedriver",
"related_images",
"safe_search",
"no_numbering",
"offset",
"no_download",
"save_source",
"ignore_urls",
]
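# Keys recognized in the arguments dict passed to download()/download_executor();
# any key the caller omits is filled in with None before processing.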
class googleimagesdownload:
def __init__(self):
pass
# Downloading entire Web Document (Raw Page Content)
async def download_page(self, url):
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/88.0.4324.104 Safari/537.36"
}
resp = await async_searcher(url, re_content=True, headers=headers)
return str(resp)
except Exception as er:
            LOGS.exception(
                "Could not open URL. Please check your internet connection and/or ssl settings.\n"
                "If you are using a proxy, make sure your proxy settings are configured correctly."
            )
raise er
# Download Page for more than 100 images
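    # Uses Selenium with headless Chrome to scroll the results page, since a
    # plain HTTP fetch only exposes roughly the first hundred results; requires
    # the selenium package and a matching chromedriver binary.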
def download_extended_page(self, url, chromedriver):
from selenium import webdriver
from selenium.webdriver.common.keys import Keys
options = webdriver.ChromeOptions()
options.add_argument("--no-sandbox")
options.add_argument("--headless")
try:
            browser = webdriver.Chrome(chromedriver, options=options)
except Exception as e:
            LOGS.info(
                "Looks like we cannot locate the 'chromedriver' executable (use the "
                "'--chromedriver' argument to specify the path to it), or Google "
                "Chrome is not installed on your machine (exception: %s)" % e
            )
sys.exit()
browser.set_window_size(1024, 768)
# Open the link
browser.get(url)
time.sleep(1)
element = browser.find_element_by_tag_name("body")
# Scroll down
        for _ in range(30):
element.send_keys(Keys.PAGE_DOWN)
time.sleep(0.3)
        try:
            browser.find_element_by_id("smb").click()
            for _ in range(50):
                element.send_keys(Keys.PAGE_DOWN)
                time.sleep(0.3)  # short pause to avoid bot detection
        except Exception:
            for _ in range(10):
                element.send_keys(Keys.PAGE_DOWN)
                time.sleep(0.3)  # short pause to avoid bot detection
time.sleep(0.5)
source = browser.page_source # page source
# close the browser
browser.close()
return source
    # Replacing invalid octal escape sequences in scraped JSON with the bytes they encode
def replace_with_byte(self, match):
return chr(int(match.group(0)[1:], 8))
def repair(self, brokenjson):
# up to 3 digits for byte values up to FF
invalid_escape = re.compile(r"\\[0-7]{1,3}")
return invalid_escape.sub(self.replace_with_byte, brokenjson)
    # Finding the next related-images tab (class "dtviD") in the given raw page
def get_next_tab(self, s):
start_line = s.find('class="dtviD"')
        if start_line == -1:  # no more tabs; return the sentinel below
end_quote = 0
link = "no_tabs"
return link, "", end_quote
        start_content = s.find('href="', start_line + 1)
        end_content = s.find('">', start_content + 1)
        url_item = "https://www.google.com" + str(s[start_content + 6 : end_content])
        url_item = url_item.replace("&amp;", "&")
        start_line_2 = s.find('class="dtviD"')
        s = s.replace("&amp;", "&")
start_content_2 = s.find(":", start_line_2 + 1)
end_content_2 = s.find("&usg=", start_content_2 + 1)
url_item_name = str(s[start_content_2 + 1 : end_content_2])
chars = url_item_name.find(",g_1:")
chars_end = url_item_name.find(":", chars + 6)
if chars_end == -1:
updated_item_name = (url_item_name[chars + 5 :]).replace("+", " ")
else:
updated_item_name = (url_item_name[chars + 5 : chars_end]).replace("+", " ")
return url_item, updated_item_name, end_content
    # Getting all related-images tabs with the help of 'get_next_tab'
def get_all_tabs(self, page):
tabs = {}
while True:
item, item_name, end_content = self.get_next_tab(page)
if item == "no_tabs":
break
if len(item_name) > 100 or item_name == "background-color":
break
            # store the tab link in the 'tabs' dictionary under its name
tabs[item_name] = item
# Timer could be used to slow down the request for image
# downloads
time.sleep(0.1)
page = page[end_content:]
return tabs
# Format the object in readable format
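    # NOTE: the numeric indices and the "2003" key below follow Google's
    # serialized image-metadata layout and will break if that layout changes.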
def format_object(self, object):
data = object[1]
main = data[3]
info = data[9]
return {
"image_height": main[2],
"image_width": main[1],
"image_link": main[0],
"image_format": main[0][-1 * (len(main[0]) - main[0].rfind(".") - 1) :],
"image_description": info["2003"][3],
"image_source": info["2003"][2],
"image_thumbnail_url": data[2][0],
}
# function to download single image
def single_image(self, image_url):
main_directory = "downloads"
extensions = (".jpg", ".gif", ".png", ".bmp", ".svg", ".webp", ".ico")
url = image_url
        os.makedirs(main_directory, exist_ok=True)
req = Request(
url,
headers={
"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
},
)
response = urlopen(req, None, 10)
data = response.read()
response.close()
image_name = str(url[(url.rfind("/")) + 1 :])
if "?" in image_name:
image_name = image_name[: image_name.find("?")]
# if ".jpg" in image_name or ".gif" in image_name or ".png" in
# image_name or ".bmp" in image_name or ".svg" in image_name or ".webp"
# in image_name or ".ico" in image_name:
if any(map(lambda extension: extension in image_name, extensions)):
file_name = main_directory + "/" + image_name
else:
file_name = main_directory + "/" + image_name + ".jpg"
image_name = image_name + ".jpg"
try:
with open(file_name, "wb") as output_file:
output_file.write(data)
except OSError as e:
raise e
def similar_images(self, similar_images):
try:
searchUrl = (
"https://www.google.com/searchbyimage?site=search&sa=X&image_url="
+ similar_images
)
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/41.0.2228.0 Safari/537.36"
}
req1 = urllib.request.Request(searchUrl, headers=headers)
resp1 = urllib.request.urlopen(req1)
content = str(resp1.read())
l1 = content.find("AMhZZ")
l2 = content.find("&", l1)
urll = content[l1:l2]
newurl = (
"https://www.google.com/search?tbs=sbi:" + urll + "&site=search&sa=X"
)
req2 = urllib.request.Request(newurl, headers=headers)
urllib.request.urlopen(req2)
l3 = content.find("/search?sa=X&q=")
l4 = content.find(";", l3 + 19)
return content[l3 + 19 : l4]
        except Exception:
            return "Could not connect to Google Images endpoint"
# Building URL parameters
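    # Google encodes most image filters as comma-separated "key:value" tokens in
    # the "tbs" query parameter, e.g. color="red" with type="photo" builds
    # "&tbs=ic:specific,isc:red,itp:photo".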
def build_url_parameters(self, arguments):
if arguments["language"]:
lang = "&lr="
lang_param = {
"Arabic": "lang_ar",
"Chinese (Simplified)": "lang_zh-CN",
"Chinese (Traditional)": "lang_zh-TW",
"Czech": "lang_cs",
"Danish": "lang_da",
"Dutch": "lang_nl",
"English": "lang_en",
"Estonian": "lang_et",
"Finnish": "lang_fi",
"French": "lang_fr",
"German": "lang_de",
"Greek": "lang_el",
"Hebrew": "lang_iw ",
"Hungarian": "lang_hu",
"Icelandic": "lang_is",
"Italian": "lang_it",
"Japanese": "lang_ja",
"Korean": "lang_ko",
"Latvian": "lang_lv",
"Lithuanian": "lang_lt",
"Norwegian": "lang_no",
"Portuguese": "lang_pt",
"Polish": "lang_pl",
"Romanian": "lang_ro",
"Russian": "lang_ru",
"Spanish": "lang_es",
"Swedish": "lang_sv",
"Turkish": "lang_tr",
}
lang_url = lang + lang_param[arguments["language"]]
else:
lang_url = ""
if arguments["time_range"]:
json_acceptable_string = arguments["time_range"].replace("'", '"')
d = json.loads(json_acceptable_string)
time_range = ",cdr:1,cd_min:" + d["time_min"] + ",cd_max:" + d["time_max"]
else:
time_range = ""
if arguments["exact_size"]:
size_array = [x.strip() for x in arguments["exact_size"].split(",")]
exact_size = (
",isz:ex,iszw:" + str(size_array[0]) + ",iszh:" + str(size_array[1])
)
else:
exact_size = ""
built_url = "&tbs="
counter = 0
params = {
"color": [
arguments["color"],
{
"red": "ic:specific,isc:red",
"orange": "ic:specific,isc:orange",
"yellow": "ic:specific,isc:yellow",
"green": "ic:specific,isc:green",
"teal": "ic:specific,isc:teel",
"blue": "ic:specific,isc:blue",
"purple": "ic:specific,isc:purple",
"pink": "ic:specific,isc:pink",
"white": "ic:specific,isc:white",
"gray": "ic:specific,isc:gray",
"black": "ic:specific,isc:black",
"brown": "ic:specific,isc:brown",
},
],
"color_type": [
arguments["color_type"],
{
"full-color": "ic:color",
"black-and-white": "ic:gray",
"transparent": "ic:trans",
},
],
"usage_rights": [
arguments["usage_rights"],
{
"labeled-for-reuse-with-modifications": "sur:fmc",
"labeled-for-reuse": "sur:fc",
"labeled-for-noncommercial-reuse-with-modification": "sur:fm",
"labeled-for-nocommercial-reuse": "sur:f",
},
],
"size": [
arguments["size"],
{
"large": "isz:l",
"medium": "isz:m",
"icon": "isz:i",
">400*300": "isz:lt,islt:qsvga",
">640*480": "isz:lt,islt:vga",
">800*600": "isz:lt,islt:svga",
">1024*768": "visz:lt,islt:xga",
">2MP": "isz:lt,islt:2mp",
">4MP": "isz:lt,islt:4mp",
">6MP": "isz:lt,islt:6mp",
">8MP": "isz:lt,islt:8mp",
">10MP": "isz:lt,islt:10mp",
">12MP": "isz:lt,islt:12mp",
">15MP": "isz:lt,islt:15mp",
">20MP": "isz:lt,islt:20mp",
">40MP": "isz:lt,islt:40mp",
">70MP": "isz:lt,islt:70mp",
},
],
"type": [
arguments["type"],
{
"face": "itp:face",
"photo": "itp:photo",
"clipart": "itp:clipart",
"line-drawing": "itp:lineart",
"animated": "itp:animated",
},
],
"time": [
arguments["time"],
{
"past-24-hours": "qdr:d",
"past-7-days": "qdr:w",
"past-month": "qdr:m",
"past-year": "qdr:y",
},
],
"aspect_ratio": [
arguments["aspect_ratio"],
{
"tall": "iar:t",
"square": "iar:s",
"wide": "iar:w",
"panoramic": "iar:xw",
},
],
"format": [
arguments["format"],
{
"jpg": "ift:jpg",
"gif": "ift:gif",
"png": "ift:png",
"bmp": "ift:bmp",
"svg": "ift:svg",
"webp": "webp",
"ico": "ift:ico",
"raw": "ift:craw",
},
],
}
for value in params.values():
if value[0] is not None:
ext_param = value[1][value[0]]
# counter will tell if it is first param added or not
if counter == 0:
# add it to the built url
built_url += ext_param
else:
built_url = built_url + "," + ext_param
counter += 1
built_url = lang_url + built_url + exact_size + time_range
return built_url
# building main search URL
def build_search_url(
self, search_term, params, url, similar_images, specific_site, safe_search
):
# check the args and choose the URL
if url:
            pass  # use the caller-supplied URL as-is
elif similar_images:
keywordem = self.similar_images(similar_images)
url = (
"https://www.google.com/search?q="
+ keywordem
+ "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
)
elif specific_site:
url = (
"https://www.google.com/search?q="
+ quote(search_term.encode("utf-8"))
+ "&as_sitesearch="
+ specific_site
+ "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch"
+ params
+ "&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
)
else:
url = (
"https://www.google.com/search?q="
+ quote(search_term.encode("utf-8"))
+ "&espv=2&biw=1366&bih=667&site=webhp&source=lnms&tbm=isch"
+ params
+ "&sa=X&ei=XosDVaCXD8TasATItgE&ved=0CAcQ_AUoAg"
)
# safe search check
        if safe_search:
            url = url + "&safe=active"
return url
# measures the file size
def file_size(self, file_path):
if os.path.isfile(file_path):
file_info = os.stat(file_path)
size = file_info.st_size
for x in ["bytes", "KB", "MB", "GB", "TB"]:
if size < 1024.0:
return "%3.1f %s" % (size, x)
size /= 1024.0
return size
# keywords from file
def keywords_from_file(self, file_name):
search_keyword = []
with codecs.open(file_name, "r", encoding="utf-8-sig") as f:
if ".csv" in file_name or ".txt" in file_name:
for line in f:
if line not in ["\n", "\r\n"]:
search_keyword.append(line.replace("\n", "").replace("\r", ""))
else:
LOGS.info(
"Invalid file type: Valid file types are either .txt or .csv \n"
"exiting..."
)
sys.exit()
return search_keyword
# make directories
def create_directories(self, main_directory, dir_name, thumbnail, thumbnail_only):
dir_name_thumbnail = dir_name + " - thumbnail"
# make a search keyword directory
try:
if not os.path.exists(main_directory):
os.makedirs(main_directory)
time.sleep(0.15)
path = dir_name
sub_directory = os.path.join(main_directory, path)
if not os.path.exists(sub_directory):
os.makedirs(sub_directory)
if thumbnail or thumbnail_only:
sub_directory_thumbnail = os.path.join(
main_directory, dir_name_thumbnail
)
if not os.path.exists(sub_directory_thumbnail):
os.makedirs(sub_directory_thumbnail)
except OSError as e:
if e.errno != 17:
raise
# Download Image thumbnails
def download_image_thumbnail(
self,
image_url,
main_directory,
dir_name,
return_image_name,
socket_timeout,
no_download,
save_source,
img_src,
):
if no_download:
return "success", "Printed url without downloading"
try:
req = Request(
image_url,
headers={
"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
},
)
try:
# timeout time to download an image
timeout = float(socket_timeout) if socket_timeout else 10
response = urlopen(req, None, timeout)
data = response.read()
response.close()
path = (
main_directory
+ "/"
+ dir_name
+ " - thumbnail"
+ "/"
+ return_image_name
)
                try:
                    with open(path, "wb") as output_file:
                        output_file.write(data)
                    if save_source:
                        list_path = main_directory + "/" + save_source + ".txt"
                        with open(list_path, "a") as list_file:
                            list_file.write(path + "\t" + img_src + "\n")
                except OSError as e:
                    download_status = "fail"
                    download_message = (
                        "OSError on an image...trying next one..." + " Error: " + str(e)
                    )
                else:
                    download_status = "success"
                    download_message = (
                        "Completed Image Thumbnail ====> " + return_image_name
                    )
except UnicodeEncodeError as e:
download_status = "fail"
download_message = (
"UnicodeEncodeError on an image...trying next one..."
+ " Error: "
+ str(e)
)
except HTTPError as e: # If there is any HTTPError
download_status = "fail"
download_message = (
"HTTPError on an image...trying next one..." + " Error: " + str(e)
)
except URLError as e:
download_status = "fail"
download_message = (
"URLError on an image...trying next one..." + " Error: " + str(e)
)
except ssl.CertificateError as e:
download_status = "fail"
download_message = (
"CertificateError on an image...trying next one..."
+ " Error: "
+ str(e)
)
except IOError as e: # If there is any IOError
download_status = "fail"
download_message = (
"IOError on an image...trying next one..." + " Error: " + str(e)
)
return download_status, download_message
# Download Images
def download_image(
self,
image_url,
image_format,
main_directory,
dir_name,
count,
socket_timeout,
prefix,
no_numbering,
no_download,
save_source,
img_src,
thumbnail_only,
format,
ignore_urls,
):
if ignore_urls and any(url in image_url for url in ignore_urls.split(",")):
return (
"fail",
"Image ignored due to 'ignore url' parameter",
None,
image_url,
)
if thumbnail_only:
return (
"success",
"Skipping image download...",
str(image_url[(image_url.rfind("/")) + 1 :]),
image_url,
)
if no_download:
return "success", "Printed url without downloading", None, image_url
try:
req = Request(
image_url,
headers={
"User-Agent": "Mozilla/5.0 (X11; Linux i686) AppleWebKit/537.17 (KHTML, like Gecko) Chrome/24.0.1312.27 Safari/537.17"
},
)
try:
# timeout time to download an image
timeout = float(socket_timeout) if socket_timeout else 10
response = urlopen(req, None, timeout)
data = response.read()
response.close()
extensions = [
".jpg",
".jpeg",
".gif",
".png",
".bmp",
".svg",
".webp",
".ico",
]
# keep everything after the last '/'
image_name = str(image_url[(image_url.rfind("/")) + 1 :])
if format and (not image_format or image_format != format):
download_status = "fail"
download_message = "Wrong image format returned. Skipping..."
return_image_name = ""
absolute_path = ""
return (
download_status,
download_message,
return_image_name,
absolute_path,
)
if (
image_format == ""
or not image_format
or "." + image_format not in extensions
):
download_status = "fail"
download_message = "Invalid or missing image format. Skipping..."
return_image_name = ""
absolute_path = ""
return (
download_status,
download_message,
return_image_name,
absolute_path,
)
if image_name.lower().find("." + image_format) < 0:
image_name = image_name + "." + image_format
else:
image_name = image_name[
: image_name.lower().find("." + image_format)
+ (len(image_format) + 1)
]
# prefix name in image
prefix = prefix + " " if prefix else ""
if no_numbering:
path = main_directory + "/" + dir_name + "/" + prefix + image_name
else:
path = (
main_directory
+ "/"
+ dir_name
+ "/"
+ prefix
+ str(count)
+ "."
+ image_name
)
                try:
                    with open(path, "wb") as output_file:
                        output_file.write(data)
                    if save_source:
                        list_path = main_directory + "/" + save_source + ".txt"
                        with open(list_path, "a") as list_file:
                            list_file.write(path + "\t" + img_src + "\n")
                    absolute_path = os.path.abspath(path)
                except OSError as e:
                    download_status = "fail"
                    download_message = (
                        "OSError on an image...trying next one..." + " Error: " + str(e)
                    )
                    return_image_name = ""
                    absolute_path = ""
                else:
                    # return image name back to calling method to use it for
                    # thumbnail downloads
                    download_status = "success"
                    download_message = (
                        "Completed Image ====> " + prefix + str(count) + "." + image_name
                    )
                    return_image_name = prefix + str(count) + "." + image_name
except UnicodeEncodeError as e:
download_status = "fail"
download_message = (
"UnicodeEncodeError on an image...trying next one..."
+ " Error: "
+ str(e)
)
return_image_name = ""
absolute_path = ""
except URLError as e:
download_status = "fail"
download_message = (
"URLError on an image...trying next one..." + " Error: " + str(e)
)
return_image_name = ""
absolute_path = ""
except BadStatusLine as e:
download_status = "fail"
download_message = (
"BadStatusLine on an image...trying next one..."
+ " Error: "
+ str(e)
)
return_image_name = ""
absolute_path = ""
except HTTPError as e: # If there is any HTTPError
download_status = "fail"
download_message = (
"HTTPError on an image...trying next one..." + " Error: " + str(e)
)
return_image_name = ""
absolute_path = ""
except URLError as e:
download_status = "fail"
download_message = (
"URLError on an image...trying next one..." + " Error: " + str(e)
)
return_image_name = ""
absolute_path = ""
except ssl.CertificateError as e:
download_status = "fail"
download_message = (
"CertificateError on an image...trying next one..."
+ " Error: "
+ str(e)
)
return_image_name = ""
absolute_path = ""
except IOError as e: # If there is any IOError
download_status = "fail"
download_message = (
"IOError on an image...trying next one..." + " Error: " + str(e)
)
return_image_name = ""
absolute_path = ""
return download_status, download_message, return_image_name, absolute_path
    # Finding the next image-metadata object ('rg_meta' blocks in the legacy page layout)
def _get_next_item(self, s):
start_line = s.find("rg_meta notranslate")
if start_line == -1: # If no links are found then give an error!
end_quote = 0
link = "no_links"
return link, end_quote
start_line = s.find('class="rg_meta notranslate">')
start_object = s.find("{", start_line + 1)
end_object = s.find("</div>", start_object + 1)
object_raw = str(s[start_object:end_object])
# remove escape characters based on python version
try:
object_decode = bytes(object_raw, "utf-8").decode("unicode_escape")
final_object = json.loads(object_decode)
        except Exception:
final_object = ""
return final_object, end_object
    # Extracting the serialized image objects embedded in the page's AF_initDataCallback payload
def _get_image_objects(self, s):
start_line = s.find("AF_initDataCallback({key: \\'ds:1\\'") - 10
start_object = s.find("[", start_line + 1)
end_object = s.find("</script>", start_object + 1) - 4
object_raw = str(s[start_object:end_object])
object_decode = bytes(object_raw[:-1], "utf-8").decode("unicode_escape")
return json.loads(object_decode[:-15])[31][0][12][2]
def _get_all_items(self, page, main_directory, dir_name, limit, arguments):
items = []
abs_path = []
errorCount = 0
i = 0
count = 1
# LOGS.info(f"page : {_format.paste_text(page)}")
image_objects = self._get_image_objects(page)
        while count <= limit:
            if not image_objects:
                LOGS.info("no_links")
break
else:
# format the item for readability
try:
object = self.format_object(image_objects[i])
# download the images
(
download_status,
download_message,
return_image_name,
absolute_path,
) = self.download_image(
object["image_link"],
object["image_format"],
main_directory,
dir_name,
count,
arguments["socket_timeout"],
arguments["prefix"],
arguments["no_numbering"],
arguments["no_download"],
arguments["save_source"],
object["image_source"],
arguments["thumbnail_only"],
arguments["format"],
arguments["ignore_urls"],
)
except (TypeError, IndexError) as er:
LOGS.debug(er)
download_status = None
if download_status == "success":
# download image_thumbnails
if arguments["thumbnail"] or arguments["thumbnail_only"]:
(
download_status,
download_message_thumbnail,
) = self.download_image_thumbnail(
object["image_thumbnail_url"],
main_directory,
dir_name,
return_image_name,
arguments["socket_timeout"],
arguments["no_download"],
arguments["save_source"],
object["image_source"],
arguments["ignore_urls"],
)
count += 1
object["image_filename"] = return_image_name
                    # collect the downloaded item's metadata and absolute path
items.append(object)
abs_path.append(absolute_path)
else:
errorCount += 1
# delay param
if arguments["delay"]:
time.sleep(int(arguments["delay"]))
i += 1
if count < limit:
            LOGS.info(
                "Unfortunately, all "
                + str(limit)
                + " images could not be downloaded; some were not downloadable. "
                + str(count - 1)
                + " is all we got for this search filter!"
            )
return items, errorCount, abs_path
# Bulk Download
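    # "config_file" points to a JSON file shaped like
    # {"Records": [{"keywords": "...", "limit": ...}, ...]}; each record runs
    # as an independent download job.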
async def download(self, arguments):
paths_agg = {}
        # if the caller passed a config_file param, run every record in it
        if "config_file" in arguments:
            records = []
            with open(arguments["config_file"]) as f:
                json_file = json.load(f)
            for item in json_file["Records"]:
                arguments = {}
                for i in args_list:
                    arguments[i] = None
                for key, value in item.items():
                    arguments[key] = value
                records.append(arguments)
            total_errors = 0
            for rec in records:
                paths, errors = await self.download_executor(rec)
                for i in paths:
                    paths_agg[i] = paths[i]
                total_errors += errors
            return paths_agg, total_errors
        # otherwise the arguments dict holds the params directly
        paths, errors = await self.download_executor(arguments)
        for i in paths:
            paths_agg[i] = paths[i]
        return paths_agg, errors
async def download_executor(self, arguments):
paths = {}
errorCount = None
for arg in args_list:
if arg not in arguments:
arguments[arg] = None
# Initialization and Validation of user arguments
if arguments["keywords"]:
search_keyword = [str(item) for item in arguments["keywords"].split(",")]
if arguments["keywords_from_file"]:
search_keyword = self.keywords_from_file(arguments["keywords_from_file"])
# both time and time range should not be allowed in the same query
if arguments["time"] and arguments["time_range"]:
raise ValueError(
"Either time or time range should be used in a query. Both cannot be used at the same time."
)
        # both size and exact_size should not be allowed in the same query
if arguments["size"] and arguments["exact_size"]:
raise ValueError(
'Either "size" or "exact_size" should be used in a query. Both cannot be used at the same time.'
)
# both image directory and no image directory should not be allowed in
# the same query
if arguments["image_directory"] and arguments["no_directory"]:
raise ValueError(
"You can either specify image directory or specify no image directory, not both!"
)
# Additional words added to keywords
if arguments["suffix_keywords"]:
suffix_keywords = [
" " + str(sk) for sk in arguments["suffix_keywords"].split(",")
]
else:
suffix_keywords = [""]
# Additional words added to keywords
if arguments["prefix_keywords"]:
prefix_keywords = [
str(sk) + " " for sk in arguments["prefix_keywords"].split(",")
]
else:
prefix_keywords = [""]
# Setting limit on number of images to be downloaded
limit = int(arguments["limit"]) if arguments["limit"] else 100
if arguments["url"]:
current_time = str(datetime.datetime.now()).split(".")[0]
search_keyword = [current_time.replace(":", "_")]
if arguments["similar_images"]:
current_time = str(datetime.datetime.now()).split(".")[0]
search_keyword = [current_time.replace(":", "_")]
# If single_image or url argument not present then keywords is
# mandatory argument
if (
arguments["single_image"] is None
and arguments["url"] is None
and arguments["similar_images"] is None
and arguments["keywords"] is None
and arguments["keywords_from_file"] is None
):
LOGS.info(
"-------------------------------\n"
"Uh oh! Keywords is a required argument \n\n"
"Please refer to the documentation on guide to writing queries \n"
"https://github.com/hardikvasa/google-images-download#examples"
"\n\nexiting!\n"
"-------------------------------"
)
sys.exit()
# If this argument is present, set the custom output directory
main_directory = arguments["output_directory"] or "downloads"
# Proxy settings
if arguments["proxy"]:
os.environ["http_proxy"] = arguments["proxy"]
os.environ["https_proxy"] = arguments["proxy"]
# Initialization Complete
total_errors = 0
for pky in prefix_keywords: # 1.for every prefix keywords
for sky in suffix_keywords: # 2.for every suffix keywords
                for e in search_keyword:  # 3. for every main keyword
search_term = pky + e + sky
if arguments["image_directory"]:
dir_name = arguments["image_directory"]
elif arguments["no_directory"]:
dir_name = ""
else:
dir_name = search_term + (
"-" + arguments["color"] if arguments["color"] else ""
) # sub-directory
if not arguments["no_download"]:
self.create_directories(
main_directory,
dir_name,
arguments["thumbnail"],
arguments["thumbnail_only"],
) # create directories in OS
params = self.build_url_parameters(
arguments
) # building URL with params
url = self.build_search_url(
search_term,
params,
arguments["url"],
arguments["similar_images"],
arguments["specific_site"],
arguments["safe_search"],
) # building main search url
if limit < 101:
# download page
raw_html = await self.download_page(url)
else:
raw_html = self.download_extended_page(
url, arguments["chromedriver"]
)
items, errorCount, abs_path = self._get_all_items(
raw_html, main_directory, dir_name, limit, arguments
) # get all image items and download images
paths[pky + e + sky] = abs_path
# dumps into a json file
if arguments["extract_metadata"]:
try:
if not os.path.exists("logs"):
os.makedirs("logs")
                        except OSError as exc:
                            LOGS.exception(exc)
with open("logs/" + e + ".json", "w") as json_file:
json.dump(items, json_file, indent=4, sort_keys=True)
# Related images
if arguments["related_images"]:
tabs = self.get_all_tabs(raw_html)
for key, value in tabs.items():
final_search_term = search_term + " - " + key
if limit < 101:
new_raw_html = await self.download_page(
value
) # download page
else:
new_raw_html = self.download_extended_page(
value, arguments["chromedriver"]
)
self.create_directories(
main_directory,
final_search_term,
arguments["thumbnail"],
arguments["thumbnail_only"],
)
self._get_all_items(
new_raw_html,
main_directory,
search_term + " - " + key,
limit,
arguments,
)
total_errors += errorCount
return paths, total_errors
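
# Minimal usage sketch (illustrative values, not defaults; assumes an asyncio
# event loop is available):
#
#   import asyncio
#
#   downloader = googleimagesdownload()
#   paths, errors = asyncio.run(
#       downloader.download({"keywords": "lion", "limit": 5})
#   )
#   # `paths` maps each search term to the absolute paths of the saved images.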