from motor.motor_asyncio import AsyncIOMotorClient from pymongo import MongoClient import logging from variables import * import os from quart import Quart from pyrogram import * from variables import * import asyncio logging.basicConfig( format="[OneApi] %(name)s - %(levelname)s - %(message)s", handlers=[logging.StreamHandler()], level=logging.INFO, ) MONGO_DB_URI = os.environ.get("MONGO_DB_URL") or VAR_MONGO_DB_URI API_ID = os.environ.get("API_ID") or VAR_API_ID API_HASH = os.environ.get("API_HASH") or VAR_API_HASH HANDLER = ["/"] TOKEN = os.environ.get("TOKEN") or VAR_TOKEN MY_VERSION = 0.1 DEVS = os.environ.get("DEVS") or VAR_DEVS # _______________________________________ if not API_ID or not API_HASH or not TOKEN or not MONGO_DB_URI: raise ValueError("Bro thought he can run anything lol, i mean you forgot some vars put on variables.py") exit() # _-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_| if len(TOKEN) > 50: bot = Client("OneApi", session_string=TOKEN, api_id=API_ID, api_hash=API_HASH, plugins=dict(root="OneApi/pyro")) else: bot = Client("OneApi", bot_token=TOKEN, api_id=API_ID, api_hash=API_HASH, plugins=dict(root="OneApi/pyro"), workdir="/tmp") # Bot command handlers @bot.on_message(filters.command("start")) async def start_command(client, message): user_id = message.from_user.id user_name = message.from_user.first_name welcome_text = f""" 🤖 **Welcome to OneApi Cloud Hosting!** Hi {user_name}! I'm your cloud hosting assistant. **Available Commands:** • `/start` - Show this help message • `/info` - Get your account info • `/projects` - List your projects • `/create ` - Create new project • `/connect` - Connect GitHub repository **Plans Available:** • Free - 0 coins • Basic - 99 coins • Advance - 199 coins • Pro - 269 coins **Your User ID:** `{user_id}` Start by creating a project with `/create myapp free` """ await message.reply_text(welcome_text) @bot.on_message(filters.command("info")) async def info_command(client, message): user_id = message.from_user.id from .database import user user_db = user() user_info = await user_db.find(user_id) if user_info: info_text = f""" 📊 **Account Information** **Name:** {user_info.get('name', 'Not set')} **Coins:** {user_info.get('coins', 0)} **Projects:** {len(user_info.get('projects', []))} **GitHub:** {user_info.get('git', 'Not connected')} **User ID:** {user_id} """ else: info_text = f""" ❌ **User Not Found** You need to register first. Use the API to create your account: ```bash curl -X POST https://your-api-url/create_user/ \\ -H "Content-Type: application/json" \\ -d '{{"user_id": {user_id}, "name": "Your Name"}}' ``` **Your User ID:** `{user_id}` """ await message.reply_text(info_text) @bot.on_message(filters.command("projects")) async def projects_command(client, message): user_id = message.from_user.id from .database import user user_db = user() projects = await user_db.get_projects(user_id) if projects == "not exists": await message.reply_text("❌ User not found. Please register first.") return elif projects == "projects not found": await message.reply_text("📁 No projects found. Create one with `/create myapp free`") return projects_text = "📁 **Your Projects:**\n\n" for i, project in enumerate(projects, 1): projects_text += f"{i}. **{project.get('name')}** (ID: {project.get('project_id')})\n" await message.reply_text(projects_text) @bot.on_message(filters.command("create")) async def create_project_command(client, message): user_id = message.from_user.id from .database import user user_db = user() try: args = message.text.split()[1:] if len(args) < 2: await message.reply_text("❌ Usage: `/create `\n\nExample: `/create myapp free`") return name = args[0] plan = args[1] result = await user_db.create_project(name, user_id, plan) if result == "ok": await message.reply_text(f"✅ Project '{name}' created successfully with {plan} plan!") elif result == "not exists": await message.reply_text("❌ User not found. Please register first.") elif result == "Name too short": await message.reply_text("❌ Project name must be at least 4 characters long.") elif result == "Name already used": await message.reply_text("❌ A project with this name already exists.") elif result == "Plan not found": await message.reply_text("❌ Invalid plan. Available: free, basic, advance, pro") elif result == "insufficient coins": await message.reply_text("❌ Insufficient coins for this plan.") else: await message.reply_text(f"❌ Error: {result}") except Exception as e: await message.reply_text(f"❌ Error creating project: {str(e)}") # Start the bot async def start_bot(): try: await bot.start() logging.info("Telegram bot started successfully!") except Exception as e: logging.error(f"Failed to start bot: {e}") # ———— R U N —————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————————— async def run(command): try: process = await asyncio.create_subprocess_shell( command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, start_new_session=True ) stdout, stderr = await process.communicate() if stdout: return stdout.decode().strip() if stderr: return stderr.decode().strip() except Exception as e: logging.error(f"Failed to run command '{command}': {e}") return False # _______________________________________________________________ DATABASE = AsyncIOMotorClient(MONGO_DB_URI)["OneApi"] app = Quart(__name__)