oneapi / OneApi /__init__.py
taslim19
Fix bot database file issue and clean up Dockerfile
dc005eb
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
import logging
from variables import *
import os
from quart import Quart
from pyrogram import *
from variables import *
import asyncio
logging.basicConfig(
format="[OneApi] %(name)s - %(levelname)s - %(message)s",
handlers=[logging.StreamHandler()],
level=logging.INFO,
)
MONGO_DB_URI = os.environ.get("MONGO_DB_URL") or VAR_MONGO_DB_URI
API_ID = os.environ.get("API_ID") or VAR_API_ID
API_HASH = os.environ.get("API_HASH") or VAR_API_HASH
HANDLER = ["/"]
TOKEN = os.environ.get("TOKEN") or VAR_TOKEN
MY_VERSION = 0.1
DEVS = os.environ.get("DEVS") or VAR_DEVS
# _______________________________________
if not API_ID or not API_HASH or not TOKEN or not MONGO_DB_URI:
raise ValueError("Bro thought he can run anything lol, i mean you forgot some vars put on variables.py")
exit()
# _-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_-_+_|
if len(TOKEN) > 50:
bot = Client("OneApi", session_string=TOKEN, api_id=API_ID, api_hash=API_HASH, plugins=dict(root="OneApi/pyro"))
else:
bot = Client("OneApi", bot_token=TOKEN, api_id=API_ID, api_hash=API_HASH, plugins=dict(root="OneApi/pyro"), workdir="/tmp")
# Bot command handlers
@bot.on_message(filters.command("start"))
async def start_command(client, message):
user_id = message.from_user.id
user_name = message.from_user.first_name
welcome_text = f"""
πŸ€– **Welcome to OneApi Cloud Hosting!**
Hi {user_name}! I'm your cloud hosting assistant.
**Available Commands:**
β€’ `/start` - Show this help message
β€’ `/info` - Get your account info
β€’ `/projects` - List your projects
β€’ `/create <name> <plan>` - Create new project
β€’ `/connect` - Connect GitHub repository
**Plans Available:**
β€’ Free - 0 coins
β€’ Basic - 99 coins
β€’ Advance - 199 coins
β€’ Pro - 269 coins
**Your User ID:** `{user_id}`
Start by creating a project with `/create myapp free`
"""
await message.reply_text(welcome_text)
@bot.on_message(filters.command("info"))
async def info_command(client, message):
user_id = message.from_user.id
from .database import user
user_db = user()
user_info = await user_db.find(user_id)
if user_info:
info_text = f"""
πŸ“Š **Account Information**
**Name:** {user_info.get('name', 'Not set')}
**Coins:** {user_info.get('coins', 0)}
**Projects:** {len(user_info.get('projects', []))}
**GitHub:** {user_info.get('git', 'Not connected')}
**User ID:** {user_id}
"""
else:
info_text = f"""
❌ **User Not Found**
You need to register first. Use the API to create your account:
```bash
curl -X POST https://your-api-url/create_user/ \\
-H "Content-Type: application/json" \\
-d '{{"user_id": {user_id}, "name": "Your Name"}}'
```
**Your User ID:** `{user_id}`
"""
await message.reply_text(info_text)
@bot.on_message(filters.command("projects"))
async def projects_command(client, message):
user_id = message.from_user.id
from .database import user
user_db = user()
projects = await user_db.get_projects(user_id)
if projects == "not exists":
await message.reply_text("❌ User not found. Please register first.")
return
elif projects == "projects not found":
await message.reply_text("πŸ“ No projects found. Create one with `/create myapp free`")
return
projects_text = "πŸ“ **Your Projects:**\n\n"
for i, project in enumerate(projects, 1):
projects_text += f"{i}. **{project.get('name')}** (ID: {project.get('project_id')})\n"
await message.reply_text(projects_text)
@bot.on_message(filters.command("create"))
async def create_project_command(client, message):
user_id = message.from_user.id
from .database import user
user_db = user()
try:
args = message.text.split()[1:]
if len(args) < 2:
await message.reply_text("❌ Usage: `/create <name> <plan>`\n\nExample: `/create myapp free`")
return
name = args[0]
plan = args[1]
result = await user_db.create_project(name, user_id, plan)
if result == "ok":
await message.reply_text(f"βœ… Project '{name}' created successfully with {plan} plan!")
elif result == "not exists":
await message.reply_text("❌ User not found. Please register first.")
elif result == "Name too short":
await message.reply_text("❌ Project name must be at least 4 characters long.")
elif result == "Name already used":
await message.reply_text("❌ A project with this name already exists.")
elif result == "Plan not found":
await message.reply_text("❌ Invalid plan. Available: free, basic, advance, pro")
elif result == "insufficient coins":
await message.reply_text("❌ Insufficient coins for this plan.")
else:
await message.reply_text(f"❌ Error: {result}")
except Exception as e:
await message.reply_text(f"❌ Error creating project: {str(e)}")
# Start the bot
async def start_bot():
try:
await bot.start()
logging.info("Telegram bot started successfully!")
except Exception as e:
logging.error(f"Failed to start bot: {e}")
# β€”β€”β€”β€” R U N β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”β€”
async def run(command):
try:
process = await asyncio.create_subprocess_shell(
command,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
start_new_session=True
)
stdout, stderr = await process.communicate()
if stdout:
return stdout.decode().strip()
if stderr:
return stderr.decode().strip()
except Exception as e:
logging.error(f"Failed to run command '{command}': {e}")
return False
# _______________________________________________________________
DATABASE = AsyncIOMotorClient(MONGO_DB_URI)["OneApi"]
app = Quart(__name__)