imseldrith's picture
Upload 4 files
8e0c2ab
# ===================================================================== #
# Copyright (c) 2022 Itz-fork #
# #
# This program is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. #
# See the GNU General Public License for more details. #
# #
# You should have received a copy of the GNU General Public License #
# along with this program. If not, see <http://www.gnu.org/licenses/> #
# ===================================================================== #
# Credits: SpEcHiDe's AnyDL-Bot for progress_for_pyrogram, humanbytes and TimeFormatter
from re import sub
from time import time
from math import floor
from json import loads
from os import path, walk
from functools import partial
from subprocess import Popen, PIPE
from asyncio import get_running_loop
async def progress_for_pyrogram(current, total, ud_type, message, start):
now = time()
diff = now - start
speed = current / diff
if total:
if round(diff % 10.00) == 0 or current == total:
percentage = current * 100 / total
elapsed_time = round(diff) * 1000
estimated_total_time = elapsed_time + \
round((total - current) / speed) * 1000
elapsed_time = TimeFormatter(elapsed_time)
estimated_total_time = TimeFormatter(estimated_total_time)
progress = "[{0}{1}] \n**πŸ“Š Progress**: {2}%\n".format(
# Filled
''.join(["β—‰" for i in range(floor(percentage / 5))]),
# Empty
''.join(["β—Ž" for i in range(20 - floor(percentage / 5))]),
round(percentage, 2))
tmp = progress + "{0} of {1}\n**πŸƒ Speed:** {2}/s\n**⏰ ETA:** {3}\n".format(
humanbytes(current),
humanbytes(total),
humanbytes(speed),
estimated_total_time if estimated_total_time != '' else "0 s"
)
try:
await message.edit("{}\n {} \n\n**Powered by @NexaBotsUpdates**".format(ud_type, tmp))
except:
pass
else:
tmp = "**πŸ“Š Progress:** {0} of {1}\n**πŸƒ Speed:** {2}/s\n**⏰ ETA:** {3}\n".format(
humanbytes(current),
"?",
humanbytes(speed),
"unknown"
)
try:
await message.edit("{}\n {} \n\n**Powered by @NexaBotsUpdates**".format(ud_type, tmp))
except:
pass
def humanbytes(size: int):
if not size:
return "N/A"
power = 2**10
n = 0
Dic_powerN = {0: " ", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
while size > power:
size /= power
n += 1
return f"{round(size, 2)} {Dic_powerN[n]}B"
def TimeFormatter(milliseconds: int) -> str:
seconds, milliseconds = divmod(int(milliseconds), 1000)
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
tmp = ((str(days) + "d, ") if days else "") + \
((str(hours) + "h, ") if hours else "") + \
((str(minutes) + "m, ") if minutes else "") + \
((str(seconds) + "s, ") if seconds else "") + \
((str(milliseconds) + "ms, ") if milliseconds else "")
return tmp[:-2] if tmp[:-2] else "0ms"
def run_shell_cmds(command):
"""
Execute shell commands and returns the output
"""
run = Popen(command, stdout=PIPE,
stderr=PIPE, shell=True)
shell_ouput = run.stdout.read()[:-1].decode("utf-8")
return shell_ouput
async def run_cmds_on_cr(func, *args, **kwargs):
"""
Execute blocking functions asynchronously
"""
loop = get_running_loop()
return await loop.run_in_executor(
None,
partial(func, *args, **kwargs)
)
async def get_files(fpath: str, filter_fn=None):
"""
Returns files in a folder
Parameters:
- `fpath` - Path to the folder
- `filter_fn` - Function to filter elements in the array
"""
path_list = [val for sublist in [
[path.join(i[0], j) for j in i[2]] for i in walk(fpath)] for val in sublist]
if filter_fn:
path_list = list(filter(filter_fn, path_list))
return sorted(path_list)
def read_json_sync(name: str, as_items: bool = False) -> dict:
"""
Reads json file and returns a dict
Parameters:
- `name` - File path
- `as_items` - Pass "True" if you want to return items of the dict
"""
with open(name) as fs:
return loads(fs.read()).items() if as_items else loads(fs.read())
async def rm_mark_chars(text: str):
"""
Remove basic markdown characters
Parameters:
- `text` - Text
"""
return sub("[*`_]", "", text)