Spaces:
Running
Running
File size: 3,039 Bytes
3e8a166 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
import aiohttp
import json
import asyncio
# --- کد از فایل send_requests.py ---
async def send_request(endpoint, token, method, data=None):
panel_address = token["panel_address"]
token_type = token["token_type"]
access_token = token["access_token"]
request_address = f"{panel_address}/api/{endpoint}"
headers = {
"Content-Type": "application/json",
"Authorization": f"{token_type} {access_token}",
}
async with aiohttp.request(
method=method,
url=request_address,
headers=headers,
data=json.dumps(data) if data else None,
raise_for_status=True
) as response:
result = await response.json()
return result
# --- کد از فایل user.py ---
class User:
def __init__(self, username, **kwargs):
self.username = username
self.proxies = kwargs.get('proxies', {})
self.inbounds = kwargs.get('inbounds', {})
self.data_limit = kwargs.get('data_limit', 0)
self.data_limit_reset_strategy = kwargs.get('data_limit_reset_strategy', "no_reset")
self.status = kwargs.get('status', "")
self.expire = kwargs.get('expire', 0)
self.used_traffic = kwargs.get('used_traffic', 0)
self.lifetime_used_traffic = kwargs.get('lifetime_used_traffic', 0)
self.created_at = kwargs.get('created_at', "")
self.links = kwargs.get('links', [])
self.subscription_url = kwargs.get('subscription_url', "")
self.excluded_inbounds = kwargs.get('excluded_inbounds', {})
self.full_name = kwargs.get('full_name', "")
class UserMethods:
async def get_all_users(self, token: dict, username=None, status=None):
endpoint = "users"
if username:
endpoint += f"?username={username}"
if status:
if "?" in endpoint:
endpoint += f"&status={status}"
else:
endpoint += f"?status={status}"
request = await send_request(endpoint, token, "get")
user_list = [User(**user) for user in request["users"]]
return user_list
# --- کد از فایل admin.py ---
class Admin:
def __init__(self, username: str, password: str, panel_address: str):
self.username = username
self.password = password
self.panel_address = panel_address
async def get_token(self):
try:
async with aiohttp.request(
"post",
url=f"{self.panel_address}/api/admin/token",
data={"username": self.username, "password": self.password},
raise_for_status=True
) as response:
result = await response.json()
result["panel_address"] = self.panel_address
return result
except aiohttp.exceptions.RequestException as ex:
print(f"Request Exception: {ex}")
return None
except json.JSONDecodeError as ex:
print(f"JSON Decode Error: {ex}")
return None
|