Spaces:
Running
Running
import aiohttp | |
import json | |
import asyncio | |
# --- کد از فایل send_requests.py --- | |
async def send_request(endpoint, token, method, data=None): | |
panel_address = token["panel_address"] | |
token_type = token["token_type"] | |
access_token = token["access_token"] | |
request_address = f"{panel_address}/api/{endpoint}" | |
headers = { | |
"Content-Type": "application/json", | |
"Authorization": f"{token_type} {access_token}", | |
} | |
async with aiohttp.request( | |
method=method, | |
url=request_address, | |
headers=headers, | |
data=json.dumps(data) if data else None, | |
raise_for_status=True | |
) as response: | |
result = await response.json() | |
return result | |
# --- کد از فایل user.py --- | |
class User: | |
def __init__(self, username, **kwargs): | |
self.username = username | |
self.proxies = kwargs.get('proxies', {}) | |
self.inbounds = kwargs.get('inbounds', {}) | |
self.data_limit = kwargs.get('data_limit', 0) | |
self.data_limit_reset_strategy = kwargs.get('data_limit_reset_strategy', "no_reset") | |
self.status = kwargs.get('status', "") | |
self.expire = kwargs.get('expire', 0) | |
self.used_traffic = kwargs.get('used_traffic', 0) | |
self.lifetime_used_traffic = kwargs.get('lifetime_used_traffic', 0) | |
self.created_at = kwargs.get('created_at', "") | |
self.links = kwargs.get('links', []) | |
self.subscription_url = kwargs.get('subscription_url', "") | |
self.excluded_inbounds = kwargs.get('excluded_inbounds', {}) | |
self.full_name = kwargs.get('full_name', "") | |
class UserMethods: | |
async def get_all_users(self, token: dict, username=None, status=None): | |
endpoint = "users" | |
if username: | |
endpoint += f"?username={username}" | |
if status: | |
if "?" in endpoint: | |
endpoint += f"&status={status}" | |
else: | |
endpoint += f"?status={status}" | |
request = await send_request(endpoint, token, "get") | |
user_list = [User(**user) for user in request["users"]] | |
return user_list | |
# --- کد از فایل admin.py --- | |
class Admin: | |
def __init__(self, username: str, password: str, panel_address: str): | |
self.username = username | |
self.password = password | |
self.panel_address = panel_address | |
async def get_token(self): | |
try: | |
async with aiohttp.request( | |
"post", | |
url=f"{self.panel_address}/api/admin/token", | |
data={"username": self.username, "password": self.password}, | |
raise_for_status=True | |
) as response: | |
result = await response.json() | |
result["panel_address"] = self.panel_address | |
return result | |
except aiohttp.exceptions.RequestException as ex: | |
print(f"Request Exception: {ex}") | |
return None | |
except json.JSONDecodeError as ex: | |
print(f"JSON Decode Error: {ex}") | |
return None | |