File size: 5,053 Bytes
8e0c2ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
# ===================================================================== #
#                      Copyright (c) 2022 Itz-fork                      #
#                                                                       #
# This program is distributed in the hope that it will be useful,       #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.                  #
# See the GNU General Public License for more details.                  #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with this program. If not, see <http://www.gnu.org/licenses/>   #
# ===================================================================== #

# Credits: SpEcHiDe's AnyDL-Bot for progress_for_pyrogram, humanbytes and TimeFormatter

from re import sub
from time import time
from math import floor
from json import loads
from os import path, walk
from functools import partial
from subprocess import Popen, PIPE
from asyncio import get_running_loop


async def progress_for_pyrogram(current, total, ud_type, message, start):
    now = time()
    diff = now - start
    speed = current / diff
    if total:
        if round(diff % 10.00) == 0 or current == total:
            percentage = current * 100 / total
            elapsed_time = round(diff) * 1000
            estimated_total_time = elapsed_time + \
                round((total - current) / speed) * 1000

            elapsed_time = TimeFormatter(elapsed_time)
            estimated_total_time = TimeFormatter(estimated_total_time)

            progress = "[{0}{1}] \n**πŸ“Š Progress**: {2}%\n".format(
                # Filled
                ''.join(["β—‰" for i in range(floor(percentage / 5))]),
                # Empty
                ''.join(["β—Ž" for i in range(20 - floor(percentage / 5))]),
                round(percentage, 2))

            tmp = progress + "{0} of {1}\n**πŸƒ Speed:** {2}/s\n**⏰ ETA:** {3}\n".format(
                humanbytes(current),
                humanbytes(total),
                humanbytes(speed),
                estimated_total_time if estimated_total_time != '' else "0 s"
            )
            try:
                await message.edit("{}\n {} \n\n**Powered by @NexaBotsUpdates**".format(ud_type, tmp))
            except:
                pass
    else:
        tmp = "**πŸ“Š Progress:** {0} of {1}\n**πŸƒ Speed:** {2}/s\n**⏰ ETA:** {3}\n".format(
            humanbytes(current),
            "?",
            humanbytes(speed),
            "unknown"
        )
        try:
            await message.edit("{}\n {} \n\n**Powered by @NexaBotsUpdates**".format(ud_type, tmp))
        except:
            pass


def humanbytes(size: int):
    if not size:
        return "N/A"
    power = 2**10
    n = 0
    Dic_powerN = {0: " ", 1: "Ki", 2: "Mi", 3: "Gi", 4: "Ti"}
    while size > power:
        size /= power
        n += 1
    return f"{round(size, 2)} {Dic_powerN[n]}B"


def TimeFormatter(milliseconds: int) -> str:
    seconds, milliseconds = divmod(int(milliseconds), 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    days, hours = divmod(hours, 24)
    tmp = ((str(days) + "d, ") if days else "") + \
        ((str(hours) + "h, ") if hours else "") + \
        ((str(minutes) + "m, ") if minutes else "") + \
        ((str(seconds) + "s, ") if seconds else "") + \
        ((str(milliseconds) + "ms, ") if milliseconds else "")
    return tmp[:-2] if tmp[:-2] else "0ms"


def run_shell_cmds(command):
    """
    Execute shell commands and returns the output
    """
    run = Popen(command, stdout=PIPE,
                stderr=PIPE, shell=True)
    shell_ouput = run.stdout.read()[:-1].decode("utf-8")
    return shell_ouput


async def run_cmds_on_cr(func, *args, **kwargs):
    """
    Execute blocking functions asynchronously
    """
    loop = get_running_loop()
    return await loop.run_in_executor(
        None,
        partial(func, *args, **kwargs)
    )


async def get_files(fpath: str, filter_fn=None):
    """
    Returns files in a folder

    Parameters:

        - `fpath` - Path to the folder
        - `filter_fn` - Function to filter elements in the array
    """
    path_list = [val for sublist in [
        [path.join(i[0], j) for j in i[2]] for i in walk(fpath)] for val in sublist]
    if filter_fn:
        path_list = list(filter(filter_fn, path_list))
    return sorted(path_list)


def read_json_sync(name: str, as_items: bool = False) -> dict:
    """
    Reads json file and returns a dict

    Parameters:

        - `name` - File path
        - `as_items` - Pass "True" if you want to return items of the dict
    """
    with open(name) as fs:
        return loads(fs.read()).items() if as_items else loads(fs.read())


async def rm_mark_chars(text: str):
    """
    Remove basic markdown characters

    Parameters:

        - `text` - Text
    """
    return sub("[*`_]", "", text)