deepsite / management /marzban_client.py
coerxso's picture
Upload 73 files
3e8a166 verified
raw
history blame
3.04 kB
import asyncio
import json
from urllib.parse import urlencode

import aiohttp
# --- Code from send_requests.py ---
async def send_request(endpoint, token, method, data=None):
    """Send an authenticated JSON request to the panel API and return the reply.

    Args:
        endpoint: Path appended after ``{panel_address}/api/``; it may already
            carry a query string.
        token: Mapping that provides the keys ``panel_address``,
            ``token_type`` and ``access_token``.
        method: HTTP method name, passed straight to ``aiohttp.request``.
        data: Optional JSON-serialisable payload. ``None`` means "send no
            body"; any other value (including ``{}``, ``[]`` or ``0``) is
            serialised and sent.

    Returns:
        The response body as decoded by ``response.json()``.

    Raises:
        KeyError: If ``token`` lacks one of the required keys.
        aiohttp.ClientError: On transport failures, and on HTTP error
            statuses because ``raise_for_status=True`` is set.
    """
    panel_address = token["panel_address"]
    token_type = token["token_type"]
    access_token = token["access_token"]

    request_address = f"{panel_address}/api/{endpoint}"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"{token_type} {access_token}",
    }

    # Compare against None explicitly: a truthiness test would silently drop
    # legitimate but falsy payloads such as an empty object.
    body = json.dumps(data) if data is not None else None

    async with aiohttp.request(
        method=method,
        url=request_address,
        headers=headers,
        data=body,
        raise_for_status=True,
    ) as response:
        return await response.json()
# --- Code from user.py ---
class User:
    """Plain container for one user record.

    ``username`` is required. Every other attribute is read from the keyword
    arguments and falls back to a default when the key is absent. Keyword
    arguments that do not match a known attribute are ignored.
    """

    def __init__(self, username, **kwargs):
        self.username = username

        # Built on every call so that the mutable fallbacks ({} and []) are
        # fresh objects for each instance rather than shared between them.
        fallbacks = {
            "proxies": {},
            "inbounds": {},
            "data_limit": 0,
            "data_limit_reset_strategy": "no_reset",
            "status": "",
            "expire": 0,
            "used_traffic": 0,
            "lifetime_used_traffic": 0,
            "created_at": "",
            "links": [],
            "subscription_url": "",
            "excluded_inbounds": {},
            "full_name": "",
        }
        for attr_name, fallback in fallbacks.items():
            setattr(self, attr_name, kwargs.get(attr_name, fallback))
class UserMethods:
    """User-related calls against the panel API."""

    async def get_all_users(self, token: dict, username=None, status=None):
        """Fetch users from the ``users`` endpoint, optionally filtered.

        Args:
            token: Token mapping forwarded unchanged to ``send_request``.
            username: Optional username filter; omitted from the query when
                falsy. A list or tuple is emitted as repeated ``username=``
                parameters (NOTE(review): confirm the server accepts
                repeated parameters).
            status: Optional status filter; omitted from the query when falsy.

        Returns:
            A list of ``User`` objects, one per entry of the ``"users"`` key
            in the response.

        Raises:
            KeyError: If the response has no ``"users"`` key.
        """
        filters = {}
        if username:
            filters["username"] = username
        if status:
            filters["status"] = status

        endpoint = "users"
        if filters:
            # urlencode percent-escapes the values, so characters such as
            # '&', '#', spaces or non-ASCII text cannot corrupt the query.
            endpoint += f"?{urlencode(filters, doseq=True)}"

        response = await send_request(endpoint, token, "get")
        return [User(**entry) for entry in response["users"]]
# --- Code from admin.py ---
class Admin:
    """Admin credentials for one panel, plus the call that requests a token."""

    def __init__(self, username: str, password: str, panel_address: str):
        self.username = username
        self.password = password
        self.panel_address = panel_address

    async def get_token(self):
        """Request a token from ``{panel_address}/api/admin/token``.

        The credentials are posted as form fields. On success the decoded
        JSON response is returned with an extra ``"panel_address"`` key set
        to this admin's panel address.
        NOTE(review): the response presumably also carries ``token_type``
        and ``access_token``, which ``send_request`` reads; confirm against
        the server.

        Returns:
            The response mapping on success. ``None`` if the request fails
            (connection problem, HTTP error status, timeout) or the body is
            not valid JSON; the error is printed in both cases.
        """
        try:
            async with aiohttp.request(
                "post",
                url=f"{self.panel_address}/api/admin/token",
                data={"username": self.username, "password": self.password},
                raise_for_status=True,
            ) as response:
                result = await response.json()
                result["panel_address"] = self.panel_address
                return result
        # aiohttp has no ``exceptions.RequestException`` (that is the
        # ``requests`` API); ClientError is aiohttp's base class for client
        # failures. Timeouts are caught separately as asyncio.TimeoutError.
        except (aiohttp.ClientError, asyncio.TimeoutError) as ex:
            print(f"Request Exception: {ex}")
            return None
        except json.JSONDecodeError as ex:
            print(f"JSON Decode Error: {ex}")
            return None