Create speedtest.py
Browse files- Akeno/plugins/speedtest.py +167 -0
Akeno/plugins/speedtest.py
ADDED
@@ -0,0 +1,167 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
1 |
+
# Ported From DarkCobra Originally By UNIBORG
|
2 |
+
# Ultroid - UserBot
|
3 |
+
# converter telethon to pyrogram by @xtdevs
|
4 |
+
|
5 |
+
from datetime import datetime as dt
|
6 |
+
from speedtest import Speedtest
|
7 |
+
import speedtest
|
8 |
+
from pyrogram import *
|
9 |
+
from pyrogram.types import *
|
10 |
+
from pyrogram import Client as ren
|
11 |
+
|
12 |
+
from Akeno.utils.handler import *
|
13 |
+
from Akeno.utils.logger import LOGS
|
14 |
+
from config import CMD_HANDLER
|
15 |
+
|
16 |
+
temxt = (
|
17 |
+
"**Akeno Ubot Speedtest completed in {0} seconds.**\n\n" \
|
18 |
+
"**Download:** `{1}` \n" \
|
19 |
+
"**Upload:** `{2}` \n" \
|
20 |
+
"**Ping:** `{3} ms` \n" \
|
21 |
+
"**Internet Provider:** `{4} ({5})` \n"
|
22 |
+
)
|
23 |
+
|
24 |
+
def humanbytes(size: float) -> str:
|
25 |
+
""" humanize size """
|
26 |
+
if not size:
|
27 |
+
return "0 B"
|
28 |
+
power = 1024
|
29 |
+
t_n = 0
|
30 |
+
power_dict = {
|
31 |
+
0: '',
|
32 |
+
1: 'Ki',
|
33 |
+
2: 'Mi',
|
34 |
+
3: 'Gi',
|
35 |
+
4: 'Ti',
|
36 |
+
5: 'Pi',
|
37 |
+
6: 'Ei',
|
38 |
+
7: 'Zi',
|
39 |
+
8: 'Yi'}
|
40 |
+
while size > power:
|
41 |
+
size /= power
|
42 |
+
t_n += 1
|
43 |
+
return "{:.2f} {}B".format(size, power_dict[t_n]) # pylint: disable=consider-using-f-string
|
44 |
+
|
45 |
+
|
46 |
+
def convert_from_bytes(size):
|
47 |
+
power = 2 ** 10
|
48 |
+
n = 0
|
49 |
+
units = {0: "", 1: "Kilobytes/s", 2: "Megabytes/s", 3: "Gigabytes/s", 4: "Terabytes/s"}
|
50 |
+
while size > power:
|
51 |
+
size /= power
|
52 |
+
n += 1
|
53 |
+
return f"{round(size, 2)} {units[n]}"
|
54 |
+
|
55 |
+
@Akeno(
|
56 |
+
~filters.scheduled
|
57 |
+
& command(["speedtest"], CMD_HANDLER)
|
58 |
+
& filters.me
|
59 |
+
& ~filters.forwarded
|
60 |
+
)
|
61 |
+
async def speedtest_test(client: Client, message: Message):
|
62 |
+
args = message.text.split()
|
63 |
+
xx = await message.reply_text("`Calculating your Akeno's Server Speed ...`")
|
64 |
+
start = dt.now()
|
65 |
+
s = Speedtest()
|
66 |
+
s.get_best_server()
|
67 |
+
s.download()
|
68 |
+
s.upload() # dchehe
|
69 |
+
end = dt.now()
|
70 |
+
ms = (end - start).seconds
|
71 |
+
response = s.results.dict()
|
72 |
+
|
73 |
+
download_speed = response.get("download")
|
74 |
+
upload_speed = response.get("upload")
|
75 |
+
ping_time = response.get("ping")
|
76 |
+
client_infos = response.get("client")
|
77 |
+
i_s_p = client_infos.get("isp")
|
78 |
+
i_s_p_rating = client_infos.get("isprating")
|
79 |
+
|
80 |
+
if args and args == "text":
|
81 |
+
await xx.edit_text(temxt.format(
|
82 |
+
ms,
|
83 |
+
convert_from_bytes(download_speed),
|
84 |
+
convert_from_bytes(upload_speed),
|
85 |
+
ping_time,
|
86 |
+
i_s_p,
|
87 |
+
i_s_p_rating,
|
88 |
+
))
|
89 |
+
else:
|
90 |
+
try:
|
91 |
+
speedtest_image = s.results.share()
|
92 |
+
await client.send_photo(
|
93 |
+
message.chat.id,
|
94 |
+
photo=speedtest_image, # heeehe
|
95 |
+
caption="**SpeedTest** completed in {} seconds".format(ms),
|
96 |
+
reply_to_message_id=message.id
|
97 |
+
)
|
98 |
+
await xx.delete()
|
99 |
+
except Exception as exc: # dc
|
100 |
+
xx2 = temxt.format(
|
101 |
+
ms,
|
102 |
+
convert_from_bytes(download_speed),
|
103 |
+
convert_from_bytes(upload_speed),
|
104 |
+
ping_time,
|
105 |
+
i_s_p,
|
106 |
+
i_s_p_rating,
|
107 |
+
)
|
108 |
+
return await xx.edit_text(
|
109 |
+
f"{xx2} \n**Exception:** `{exc}`"
|
110 |
+
)
|
111 |
+
|
112 |
+
# Copyright (C) 2020-2022 by UsergeTeam@Github, < https://github.com/UsergeTeam >.
|
113 |
+
|
114 |
+
@Akeno(
|
115 |
+
~filters.scheduled
|
116 |
+
& filters.command(["speedtest2"], CMD_HANDLER)
|
117 |
+
& filters.me
|
118 |
+
& ~filters.forwarded
|
119 |
+
)
|
120 |
+
async def speedtst_2(client: Client, message: Message):
|
121 |
+
pro = await message.reply_text("`Running speed test . . .`")
|
122 |
+
try:
|
123 |
+
test = speedtest.Speedtest()
|
124 |
+
test.get_best_server()
|
125 |
+
await pro.edit_text("`Performing download test . . .`")
|
126 |
+
test.download()
|
127 |
+
await pro.edit_text("`Performing upload test . . . .`")
|
128 |
+
test.upload()
|
129 |
+
try:
|
130 |
+
test.results.share()
|
131 |
+
except speedtest.ShareResultsConnectFailure:
|
132 |
+
pass
|
133 |
+
result = test.results.dict()
|
134 |
+
except Exception as e:
|
135 |
+
await pro.edit_text(str(e))
|
136 |
+
return
|
137 |
+
output = f"""**--Started at {result['timestamp']}--
|
138 |
+
|
139 |
+
Client:
|
140 |
+
|
141 |
+
ISP: `{result['client']['isp']}`
|
142 |
+
Country: `{result['client']['country']}`
|
143 |
+
|
144 |
+
Server:
|
145 |
+
|
146 |
+
Name: `{result['server']['name']}`
|
147 |
+
Country: `{result['server']['country']}, {result['server']['cc']}`
|
148 |
+
Sponsor: `{result['server']['sponsor']}`
|
149 |
+
Latency: `{result['server']['latency']}`
|
150 |
+
|
151 |
+
Ping: `{result['ping']}`
|
152 |
+
Sent: `{humanbytes(result['bytes_sent'])}`
|
153 |
+
Received: `{humanbytes(result['bytes_received'])}`
|
154 |
+
Download: `{humanbytes(result['download'] / 8)}/s`
|
155 |
+
Upload: `{humanbytes(result['upload'] / 8)}/s`**"""
|
156 |
+
if result['share']:
|
157 |
+
await client.send_photo(
|
158 |
+
message.chat.id,
|
159 |
+
photo=result['share'],
|
160 |
+
caption=output
|
161 |
+
)
|
162 |
+
else:
|
163 |
+
await client.send_message(
|
164 |
+
message.chat.id,
|
165 |
+
text=output
|
166 |
+
)
|
167 |
+
await pro.delete()
|