|
""" |
|
MIT License |
|
|
|
Copyright (C) 2021 ROCKY4546 |
|
https://github.com/rocky4546 |
|
|
|
This file is part of Cabernet |
|
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy of this software |
|
and associated documentation files (the "Software"), to deal in the Software without restriction, |
|
including without limitation the rights to use, copy, modify, merge, publish, distribute, |
|
sublicense, and/or sell copies of the Software, and to permit persons to whom the Software |
|
is furnished to do so, subject to the following conditions: |
|
|
|
The above copyright notice and this permission notice shall be included in all copies or |
|
substantial portions of the Software. |
|
""" |
|
|
|
import gzip |
|
import logging |
|
import os |
|
import pathlib |
|
import shutil |
|
import time |
|
import urllib.request |
|
import zipfile |
|
|
|
|
|
import lib.common.utils as utils |
|
from lib.common.decorators import handle_url_except |
|
|
|
|
|
class TMPMgmt: |
|
|
|
def __init__(self, _config): |
|
self.logger = logging.getLogger(__name__) |
|
self.config = _config |
|
|
|
@handle_url_except() |
|
def download_file(self, _url, _retries, _folder, _filename, _file_type): |
|
if _filename == None: |
|
_filename = '{}{}'.format(time.time(), _file_type) |
|
if _folder is None: |
|
save_path = pathlib.Path( |
|
self.config['paths']['tmp_dir']) \ |
|
.joinpath(_filename) |
|
else: |
|
save_path = pathlib.Path( |
|
self.config['paths']['tmp_dir']) \ |
|
.joinpath(_folder) \ |
|
.joinpath(_filename) |
|
buf_size = 2 * 16 * 16 * 1024 |
|
|
|
if not save_path.parent.is_dir(): |
|
save_path.parent.mkdir() |
|
h = {'User-agent': utils.DEFAULT_USER_AGENT} |
|
req = urllib.request.Request(_url, headers=h) |
|
with urllib.request.urlopen(req) as resp: |
|
with open(save_path, 'wb') as out_file: |
|
while True: |
|
chunk = resp.read(buf_size) |
|
if not chunk: |
|
break |
|
out_file.write(chunk) |
|
return save_path |
|
|
|
def extract_gzip(self, _in_filename): |
|
try: |
|
out_filename = _in_filename.with_suffix('') |
|
with gzip.open(_in_filename, 'rb') as f_in: |
|
with open(out_filename, 'wb') as f_out: |
|
shutil.copyfileobj(f_in, f_out) |
|
return out_filename |
|
except (gzip.BadGzipFile, FileNotFoundError) as ex: |
|
raise exceptions.CabernetException( |
|
'Unable to gunzip File, {} {}' \ |
|
.format(_in_filename, str(ex))) |
|
|
|
def extract_zip(self, _in_filename, outfile=None, is_single_file=False): |
|
try: |
|
if out_file is None: |
|
out_folder = os.path.dirname(_filename) |
|
with zipfile.ZipFile(_filename, 'r') as z: |
|
files = z.namelist() |
|
if is_single_file and len(files) > 1: |
|
raise exceptions.CabernetException( |
|
'Zip file contains more than one file, aborting, {}' \ |
|
.format(files)) |
|
top_folder = files[0] |
|
z.extractall(out_folder) |
|
return pathlib.Path(out_folder, top_folder) |
|
except (zipfile.BadZipFile, FileNotFoundError) as ex: |
|
raise exceptions.CabernetException( |
|
'Unable to unzip File, {} {}' \ |
|
.format(_filename, str(ex))) |
|
|
|
def cleanup_tmp(self, folder=None): |
|
self.logger.debug('Cleaning up tmp folder, subfolder {}'.format(folder)) |
|
if folder is None: |
|
dir = pathlib.Path(self.config['paths']['tmp_dir']) |
|
for files in os.listdir(dir): |
|
path = os.path.join(dir, files) |
|
try: |
|
shutil.rmtree(path) |
|
except OSError: |
|
os.remove(path) |
|
else: |
|
dir = pathlib.Path(self.config['paths']['tmp_dir']) \ |
|
.joinpath(folder) |
|
shutil.rmtree(dir) |
|
|