File size: 13,307 Bytes
10d17d3 7205096 10d17d3 6b86152 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 ea4e8b4 10d17d3 7205096 10d17d3 ea4e8b4 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 6b86152 7062b9f 6b86152 7062b9f 7205096 10d17d3 ea4e8b4 7062b9f 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 7205096 10d17d3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 |
import os
import re
import asyncio
import sys
import importlib.util
from pyrogram import Client, filters
from pyrogram.enums import ParseMode
import google.generativeai as genai
API_ID = os.environ.get("API_ID")
API_HASH = os.environ.get("API_HASH")
BOT_TOKEN = os.environ.get("BOT_TOKEN")
GEMINI_API_KEY = os.environ.get("GEMINI_API_KEY")
if not BOT_TOKEN:
raise RuntimeError("BOT_TOKEN environment variable not set!")
if not GEMINI_API_KEY:
raise RuntimeError("GEMINI_API_KEY environment variable not set!")
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel("gemini-2.5-flash")
bot = Client("JarvisBot", api_id=API_ID, api_hash=API_HASH, bot_token=BOT_TOKEN)
os.makedirs("modules", exist_ok=True)
def load_modules(folder="modules"):
for file in os.listdir(folder):
if file.endswith(".py"):
path = os.path.join(folder, file)
name = file[:-3]
spec = importlib.util.spec_from_file_location(name, path)
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
load_modules()
def extract_module_name(code: str) -> str:
match = re.search(r"def\\s+([a-zA-Z_][a-zA-Z0-9_]*)", code)
return match.group(1).lower() if match else f"mod_{os.urandom(2).hex()}"
def extract_commands(code: str) -> list:
return re.findall(r'filters\.command\(["\'](\w+)["\']', code)
def determine_intent(text: str) -> str:
lowered = text.lower()
if any(x in lowered for x in ["create", "make", "build"]):
return "CREATE"
elif any(x in lowered for x in ["edit", "modify"]):
return "EDIT"
elif "recode" in lowered:
return "RECODE"
else:
return "UNSURE"
def clean_code_blocks(code: str) -> str:
if code.startswith("```python"):
code = code[9:]
if code.startswith("```"):
code = code[3:]
if code.endswith("```"):
code = code[:-3]
return code.strip()
def extract_module_name_from_description(description: str) -> str:
import re
stopwords = {
'jarvis', 'make', 'create', 'build', 'generate', 'a', 'an', 'the', 'please', 'module',
'for', 'me', 'to', 'with', 'add', 'new', 'of', 'system', 'bot', 'command', 'that', 'and',
'in', 'on', 'by', 'as', 'is', 'it', 'my', 'this', 'do', 'can', 'you', 'i', 'want', 'need',
'from', 'like', 'using', 'feature', 'function', 'implement', 'write', 'code', 'file',
'python', 'telegram', 'pyrogram', 'handler', 'example', 'mod', 'mod_', 'particular', 'user'
}
desc = re.sub(r'[^a-zA-Z0-9_\s]', ' ', description.lower())
words = desc.split()
filtered_words = []
for word in words:
if word not in stopwords and len(word) > 2:
filtered_words.append(word)
if filtered_words:
name_words = filtered_words[:3]
name = '_'.join(name_words)
else:
meaningful_words = [w for w in words if len(w) > 3 and w not in stopwords]
if meaningful_words:
name = meaningful_words[0]
else:
clean_desc = re.sub(r'[^a-zA-Z0-9]', '', description.lower())
name = clean_desc[:10] if clean_desc else f"mod_{os.urandom(2).hex()}"
name = re.sub(r'_+', '_', name).strip('_')
name = name.lower()
if not name:
name = f"mod_{os.urandom(2).hex()}"
return name
async def restart_bot(chat_id):
await bot.send_message(chat_id, "Restarting to apply new module...")
await bot.stop()
os.execl(sys.executable, sys.executable, *sys.argv)
os._exit(0)
@bot.on_message(filters.private & filters.text)
async def jarvis_trigger(client, message):
if message.from_user.id != 7361622601:
return await message.reply("Access denied. Only the owner can use this bot.")
text = message.text.strip()
if not text.lower().startswith("jarvis"):
return
description = text[6:].strip()
if not description:
return await message.reply("Hello, what would you like me to do?")
intent = determine_intent(description)
progress_msg = await message.reply(f"Acknowledged.\nDetermining intent...\nIntent: {intent}\nTask: `{description}`", parse_mode=ParseMode.MARKDOWN)
if intent == "EDIT":
match = re.match(r"edit\s+([\w/\\.]+)\s+(.*)", description, re.IGNORECASE)
if match:
file_path = match.group(1)
edit_instruction = match.group(2)
if not os.path.exists(file_path):
await progress_msg.edit(f"File `{file_path}` not found.")
return
with open(file_path, "r", encoding="utf-8") as f:
current_content = f.read()
edit_prompt = (
f"You are an expert Python developer. Here is the current content of `{file_path}`:\n"
f"{current_content}\n"
f"Please edit this file to: {edit_instruction}.\n"
f"Output ONLY the new Python code, no explanations or markdown."
)
try:
response = model.generate_content(edit_prompt)
new_code = clean_code_blocks(response.text.strip())
if not new_code or "def " not in new_code:
await progress_msg.edit(f"Edit failed: No valid code returned.")
return
with open(file_path, "w", encoding="utf-8") as f:
f.write(new_code)
if file_path.startswith("modules/") and file_path.endswith(".py"):
mod_name = os.path.basename(file_path)[:-3]
try:
spec = importlib.util.spec_from_file_location(mod_name, file_path)
mod = importlib.util.module_from_spec(spec)
mod.bot = bot
spec.loader.exec_module(mod)
except Exception as test_error:
await progress_msg.edit(f"Edit applied, but module reload failed: {test_error}")
return
await progress_msg.edit(f"Edit applied to `{file_path}`. Restarting bot...")
await asyncio.sleep(2)
asyncio.create_task(restart_bot(message.chat.id))
return
except Exception as e:
await progress_msg.edit(f"Edit failed: `{str(e)[:100]}...`")
return
else:
await progress_msg.edit("Could not parse edit command. Use: 'edit <file> <instruction>'")
return
success = False
code = ""
last_error = None
for attempt in range(1, 6):
await progress_msg.edit(f"Attempt {attempt}/5: Thinking and generating code...\n- Executing prerequisite shell commands...")
try:
prompt = (
f"Write a full Pyrogram Telegram bot module that implements:\n"
f"{description}.\n\n"
f"IMPORTANT RULES:\n"
f"1. Use 'bot' variable (not 'Client') - it will be injected\n"
f"2. Include commands using @bot.on_message(filters.command(...))\n"
f"3. Import only what you need from pyrogram\n and use 'from ..t1 import bot'\n 'try: from ..t1 import bot except (ImportError, ValueError): \n from __main__ import bot' \n"
f"4. Don't create a new Client instance\n"
f"5. Make functions async when using bot methods\n\n"
f"Output ONLY Python code, no explanations or markdown."
)
if attempt > 1 and last_error is not None:
prompt += f"\n\nNote: Previous attempt failed with error: {str(last_error)}. Fix that issue this time."
response = model.generate_content(prompt)
code = clean_code_blocks(response.text.strip())
if "def " not in code or "@bot" not in code:
await progress_msg.edit(f"Attempt {attempt}: Invalid function or handler.")
continue
import re
file_path_match = re.search(r"(modules/[\w\-]+\.py)", description)
if file_path_match:
mod_path = file_path_match.group(1)
mod_name = os.path.basename(mod_path)[:-3]
else:
mod_name = extract_module_name_from_description(description)
mod_path = f"modules/{mod_name}.py"
if os.path.exists(mod_path):
with open(mod_path, "r", encoding="utf-8") as f:
current_content = f.read()
edit_prompt = (
f"You are an expert Python developer. Here is the current content of `{mod_path}`:\n"
f"{current_content}\n"
f"Please update this file to: {description}.\n"
f"Output ONLY the new Python code, no explanations or markdown."
)
response = model.generate_content(edit_prompt)
code = clean_code_blocks(response.text.strip())
with open(mod_path, "w", encoding="utf-8") as f:
f.write(code)
await progress_msg.edit(f"Attempt {attempt}/5: Thinking and generating code...\n- Executing prerequisite shell commands...\n- Testing generated code...")
try:
spec = importlib.util.spec_from_file_location(mod_name, mod_path)
mod = importlib.util.module_from_spec(spec)
mod.bot = bot
spec.loader.exec_module(mod)
except Exception as test_error:
raise RuntimeError(f"Testing error: {test_error}")
commands = extract_commands(code)
commands_str = ', '.join(f'/{cmd}' for cmd in commands) if commands else 'No commands found.'
try:
explain_prompt = (
f"Explain in 2-3 sentences what this Pyrogram Telegram bot module does, given the following code and the user's request.\n"
f"User request: {description}\n"
f"Module code:\n{code}\n"
f"Be concise and clear."
)
explain_response = model.generate_content(explain_prompt)
explanation = explain_response.text.strip()
except Exception as exp_explain:
explanation = f"Could not generate explanation: {exp_explain}"
await progress_msg.edit(
f"Module created successfully!\n\n"
f"Commands: {commands_str}\n\n"
f"What it does:\n{explanation}\n\n"
f"Restarting bot to load the new module...",
parse_mode=ParseMode.MARKDOWN
)
await asyncio.sleep(2)
asyncio.create_task(restart_bot(message.chat.id))
success = True
break
except Exception as e:
last_error = e
try:
await progress_msg.edit(f"Attempt {attempt} Error: {str(e)[:100]}...")
except:
await message.reply(f"Attempt {attempt} failed. Check logs.")
if not success:
await progress_msg.edit("All 5 attempts failed. Please try again with a simpler instruction.")
@bot.on_message(filters.command("modules") & filters.private)
async def list_modules(client, message):
if message.from_user.id != 7361622601:
return await message.reply("Access denied. Only the owner can use this bot.")
modules = [f for f in os.listdir("modules") if f.endswith(".py")]
if modules:
module_list = "\n".join([f"• {m[:-3]}" for m in modules])
await message.reply(f"Loaded Modules:\n{module_list}", parse_mode=ParseMode.MARKDOWN)
else:
await message.reply("No modules found.", parse_mode=ParseMode.MARKDOWN)
@bot.on_message(filters.command("delete") & filters.private)
async def delete_module(client, message):
if message.from_user.id != 7361622601:
return await message.reply("Access denied. Only the owner can use this bot.")
if len(message.command) < 2:
return await message.reply("Specify module name.\n\nExample: /delete calculator", parse_mode=ParseMode.MARKDOWN)
mod_name = message.command[1].lower()
mod_path = f"modules/{mod_name}.py"
if os.path.exists(mod_path):
os.remove(mod_path)
await message.reply(f"Deleted {mod_name}.py\nRestart required to take effect.", parse_mode=ParseMode.MARKDOWN)
else:
await message.reply(f"Module {mod_name}.py not found.", parse_mode=ParseMode.MARKDOWN)
@bot.on_message(filters.regex(r"(?i)what( can)? i do\??") & filters.private)
async def what_to_do(_, message):
if message.from_user.id != 7361622601:
return await message.reply("Access denied. Only the owner can use this bot.")
await message.reply(
"Jarvis Assistant\n\n"
"I can generate custom bot modules for you!\n\n"
"Commands:\n"
"jarvis make a calculator - Natural language trigger\n"
"/modules - List all modules\n"
"/delete <name> - Delete a module\n\n"
"Examples:\n"
"jarvis build a reminder system\n"
"jarvis create a dice game\n"
"jarvis weather checker",
parse_mode=ParseMode.MARKDOWN
)
print("Starting Jarvis Bot...")
load_modules()
print("Bot is ready!")
bot.run()
|