Spaces:
Paused
Paused
# Ultroid - UserBot
# Copyright (C) 2021-2022 TeamUltroid
#
# This file is a part of < https://github.com/TeamUltroid/Ultroid/ >
# Please read the GNU Affero General Public License in
# <https://github.com/TeamUltroid/pyUltroid/blob/main/LICENSE>.
import time
from io import FileIO
from logging import WARNING
from mimetypes import guess_type
from urllib.parse import parse_qs, urlparse

from apiclient.http import LOGGER, MediaFileUpload, MediaIoBaseDownload
from googleapiclient.discovery import build, logger
from httplib2 import Http
from oauth2client.client import OOB_CALLBACK_URN, OAuth2WebServerFlow
from oauth2client.client import logger as _logger
from oauth2client.file import Storage

from .. import udB
from .helper import humanbytes, time_formatter
# Raise the threshold of the Google client / oauth2client loggers to WARNING
# so their lower-level messages are not emitted.
for log in [LOGGER, logger, _logger]:
    log.setLevel(WARNING)
class GDriveManager:
    """Helper around the Google Drive v2 API: auth, upload, download, list, search.

    Configuration is read from ``udB`` (``GDRIVE_AUTH_TOKEN``,
    ``GDRIVE_FOLDER_ID``, ``GDRIVE_CLIENT_ID``, ``GDRIVE_CLIENT_SECRET``) and
    OAuth credentials are persisted to ``self.token_file``.
    """

    def __init__(self):
        # Keeps the in-progress OAuth flow under the "_" key so that the
        # second call to `_create_token_file` (with the code) can finish it.
        self._flow = {}
        self.gdrive_creds = {
            "oauth_scope": [
                "https://www.googleapis.com/auth/drive",
                "https://www.googleapis.com/auth/drive.file",
                "https://www.googleapis.com/auth/drive.metadata",
            ],
            "dir_mimetype": "application/vnd.google-apps.folder",
            "redirect_uri": OOB_CALLBACK_URN,
        }
        self.auth_token = udB.get_key("GDRIVE_AUTH_TOKEN")
        self.folder_id = udB.get_key("GDRIVE_FOLDER_ID")
        self.token_file = "resources/auth/gdrive_creds.json"

    # ------------------------------------------------------------------ #
    # Pure helpers (no network access)
    # ------------------------------------------------------------------ #

    @staticmethod
    def _create_download_link(fileId: str) -> str:
        """Return the direct-download URL for the file with id *fileId*."""
        return f"https://drive.google.com/uc?id={fileId}&export=download"

    @staticmethod
    def _create_folder_link(folderId: str) -> str:
        """Return the browser URL for the folder with id *folderId*."""
        return f"https://drive.google.com/folderview?id={folderId}"

    @staticmethod
    def _extract_file_id(link: str) -> str:
        """Return the Drive file id contained in *link*.

        Values that do not start with ``http`` are assumed to already be an
        id and are returned unchanged. For URLs, an ``id`` query parameter is
        used when present (regardless of parameter order); otherwise, for
        ``.../<id>/view`` style links, the path segment before ``view`` is
        used. Unrecognised URLs are returned unchanged.
        """
        if not link.startswith("http"):
            return link
        ids = parse_qs(urlparse(link).query).get("id")
        if ids:
            return ids[0]
        if "/view" in link:
            # e.g. https://drive.google.com/file/d/<id>/view?usp=sharing
            return link.split("/")[::-1][1]
        return link

    @staticmethod
    def _escape_query_value(value: str) -> str:
        """Escape backslashes and single quotes for use inside a Drive query string."""
        return value.replace("\\", "\\\\").replace("'", "\\'")

    @staticmethod
    def _progress_stats(completed, total_size, elapsed):
        """Return ``(percentage, speed, eta_ms)`` for a transfer.

        ``speed`` is in bytes per second and ``eta_ms`` in milliseconds. Each
        value falls back to 0 when it cannot be computed (no total size, no
        elapsed time, or zero speed) instead of raising ZeroDivisionError.
        """
        percentage = round((completed / total_size) * 100, 2) if total_size else 0
        speed = round(completed / elapsed, 2) if elapsed > 0 else 0
        if speed and total_size:
            eta = round((total_size - completed) / speed, 2) * 1000
        else:
            eta = 0
        return percentage, speed, eta

    @classmethod
    def _format_progress(cls, headline: str, progress, start: float) -> str:
        """Build the status message shown while a transfer is running.

        *progress* is the progress object yielded by ``next_chunk`` (it is
        read for ``resumable_progress`` and ``total_size``); *start* is the
        ``time.time()`` value taken when the transfer began.
        """
        completed = progress.resumable_progress
        total_size = progress.total_size or 0
        percentage, speed, eta = cls._progress_stats(
            completed, total_size, time.time() - start
        )
        return (
            f"`{headline}\n\n"
            + f"Status: {humanbytes(completed)}/{humanbytes(total_size)} »» {percentage}%\n"
            + f"Speed: {humanbytes(speed)}/s\n"
            + f"ETA: {time_formatter(eta)}`"
        )

    # ------------------------------------------------------------------ #
    # Authentication / service construction
    # ------------------------------------------------------------------ #

    def _create_token_file(self, code: str = None):
        """Run one step of the OAuth flow.

        Without *code* (or when no flow has been started yet): create the
        OAuth flow, remember it, and return the authorization URL the user
        has to visit. With *code* and a pending flow: exchange the code for
        credentials, store them in ``self.token_file`` and save the file's
        content under ``GDRIVE_AUTH_TOKEN``; the result of ``udB.set_key`` is
        returned.
        """
        if code and self._flow:
            _auth_flow = self._flow["_"]
            credentials = _auth_flow.step2_exchange(code)
            Storage(self.token_file).put(credentials)
            # Read through a context manager so the handle is always closed.
            with open(self.token_file) as token_fp:
                token = token_fp.read()
            return udB.set_key("GDRIVE_AUTH_TOKEN", token)
        try:
            # NOTE(review): fallback client id/secret are hard-coded here;
            # consider requiring them from configuration instead.
            _auth_flow = OAuth2WebServerFlow(
                udB.get_key("GDRIVE_CLIENT_ID")
                or "458306970678-jhfbv6o5sf1ar63o1ohp4c0grblp8qba.apps.googleusercontent.com",
                udB.get_key("GDRIVE_CLIENT_SECRET")
                or "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",
                self.gdrive_creds["oauth_scope"],
                redirect_uri=self.gdrive_creds["redirect_uri"],
            )
            self._flow["_"] = _auth_flow
        except KeyError:
            return "Fill GDRIVE client credentials"
        return _auth_flow.step1_get_authorize_url()

    @property
    def _http(self):
        """Return an ``Http`` object authorized with the stored credentials.

        The credentials are loaded from ``self.token_file`` and refreshed on
        every access.
        """
        storage = Storage(self.token_file)
        creds = storage.get()
        http = Http()
        # Stop httplib2 from following 308 responses as redirects
        # (presumably because resumable uploads use 308 as a status code).
        http.redirect_codes = http.redirect_codes - {308}
        creds.refresh(http)
        return creds.authorize(http)

    @property
    def _build(self):
        """Return a freshly built Drive v2 service using `_http`."""
        return build("drive", "v2", http=self._http, cache_discovery=False)

    def _set_permissions(self, fileId: str):
        """Make the file/folder *fileId* readable by anyone who has the link."""
        _permissions = {
            "role": "reader",
            "type": "anyone",
            "value": None,
            "withLink": True,
        }
        self._build.permissions().insert(
            fileId=fileId, body=_permissions, supportsAllDrives=True
        ).execute(http=self._http)

    # ------------------------------------------------------------------ #
    # Transfers
    # ------------------------------------------------------------------ #

    async def _upload_file(
        self, event, path: str, filename: str = None, folder_id: str = None
    ):
        """Upload the local file at *path* and return its ``webContentLink``.

        *event* must provide an awaitable ``edit(text)``; it is used to show
        progress. *filename* defaults to the last ``/``-separated component
        of *path*. The file is placed in *folder_id* if given, otherwise in
        ``self.folder_id`` if that is set.
        """
        last_txt = ""
        if not filename:
            filename = path.split("/")[-1]
        mime_type = guess_type(path)[0] or "application/octet-stream"
        media_body = MediaFileUpload(path, mimetype=mime_type, resumable=True)
        body = {
            "title": filename,
            "description": "Uploaded using Ultroid Userbot",
            "mimeType": mime_type,
        }
        parent_id = folder_id or self.folder_id
        if parent_id:
            body["parents"] = [{"id": parent_id}]
        service = self._build
        upload = service.files().insert(
            body=body, media_body=media_body, supportsAllDrives=True
        )
        start = time.time()
        _status = None
        while not _status:
            _progress, _status = upload.next_chunk(num_retries=3)
            if _progress:
                crnt_txt = self._format_progress(
                    f"Uploading {filename} to GDrive...", _progress, start
                )
                # Only edit when the text changed, so the same message is
                # never re-sent.
                if last_txt != crnt_txt:
                    await event.edit(crnt_txt)
                    last_txt = crnt_txt
        fileId = _status.get("id")
        try:
            self._set_permissions(fileId=fileId)
        except Exception:
            # Sharing is best-effort: the upload itself already succeeded.
            pass
        _url = service.files().get(fileId=fileId, supportsAllDrives=True).execute()
        return _url.get("webContentLink")

    async def _download_file(self, event, fileId: str, filename: str = None):
        """Download a Drive file to the local path *filename*.

        *fileId* may be a raw id or a Drive URL (see `_extract_file_id`).
        When *filename* is not given, the file's Drive title is used.

        Returns ``(True, filename)`` on success, or ``(False, error_text)``
        if looking up the file or preparing the download request failed.
        Errors raised while the data is being transferred are not caught.
        """
        last_txt = ""
        fileId = self._extract_file_id(fileId)
        try:
            service = self._build
            if not filename:
                filename = (
                    service.files()
                    .get(fileId=fileId, supportsAllDrives=True)
                    .execute()["title"]
                )
            downloader = service.files().get_media(
                fileId=fileId, supportsAllDrives=True
            )
        except Exception as ex:
            return False, str(ex)
        with FileIO(filename, "wb") as file:
            start = time.time()
            download = MediaIoBaseDownload(file, downloader)
            _status = None
            while not _status:
                _progress, _status = download.next_chunk(num_retries=3)
                if _progress:
                    crnt_txt = self._format_progress(
                        f"Downloading {filename} from GDrive...", _progress, start
                    )
                    if last_txt != crnt_txt:
                        await event.edit(crnt_txt)
                        last_txt = crnt_txt
        return True, filename

    # ------------------------------------------------------------------ #
    # Listing / searching / folders
    # ------------------------------------------------------------------ #

    def _list_files(self):
        """Return ``{link: title}`` for the files and folders in the drive.

        Folders map to a folder link, everything else to a download link.
        Only the first page of results is returned; ``nextPageToken`` is
        requested but not followed.
        """
        _items = (
            self._build.files()
            .list(
                supportsTeamDrives=True,
                includeTeamDriveItems=True,
                spaces="drive",
                fields="nextPageToken, items(id, title, mimeType)",
                pageToken=None,
            )
            .execute()
        )
        folder_mime = self.gdrive_creds["dir_mimetype"]
        _files = {}
        for item in _items.get("items", []):
            if item["mimeType"] == folder_mime:
                link = self._create_folder_link(item["id"])
            else:
                link = self._create_download_link(item["id"])
            _files[link] = item["title"]
        return _files

    def create_directory(self, directory):
        """Create a folder named *directory*, share it by link and return its id.

        The folder is created inside ``self.folder_id`` when that is set.
        """
        body = {
            "title": directory,
            "mimeType": self.gdrive_creds["dir_mimetype"],
        }
        if self.folder_id:
            body["parents"] = [{"id": self.folder_id}]
        file = self._build.files().insert(body=body, supportsAllDrives=True).execute()
        fileId = file.get("id")
        self._set_permissions(fileId=fileId)
        return fileId

    def search(self, title):
        """Return ``{download_link: title}`` for items whose title contains *title*.

        The search is restricted to ``self.folder_id`` when that is set. Only
        the first page of results is returned.
        """
        # Escape quotes/backslashes so a title such as "it's" cannot break
        # (or alter) the query expression.
        safe_title = self._escape_query_value(title)
        query = f"title contains '{safe_title}'"
        if self.folder_id:
            query = f"'{self.folder_id}' in parents and (title contains '{safe_title}')"
        _items = (
            self._build.files()
            .list(
                supportsTeamDrives=True,
                includeTeamDriveItems=True,
                q=query,
                spaces="drive",
                fields="nextPageToken, items(id, title, mimeType)",
                pageToken=None,
            )
            .execute()
        )
        return {
            self._create_download_link(item["id"]): item["title"]
            for item in _items.get("items", [])
        }