# NOTE: removed stray "Spaces: / Paused / Paused" page-scrape artifact that preceded the shebang.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2020-2023 (c) Randy W @xtdevs, @xtsea
#
# from : https://github.com/TeamKillerX
# Channel : @RendyProjects
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
import base64
import json
import logging
import os
import random
import re
from base64 import b64decode
from base64 import b64decode as kc
from datetime import datetime as dt
from io import BytesIO
from typing import *
from typing import Union

import g4f
import requests
from bardapi import Bard
from bs4 import BeautifulSoup
from dotenv import load_dotenv
from fastapi import Depends, FastAPI, Header, HTTPException, Query, Request
from fastapi.responses import JSONResponse, StreamingResponse
from g4f.client import Client as BingClient
from g4f.cookies import set_cookies
from g4f.Provider import BingCreateImages, Gemini, OpenaiChat
from gpytranslate import SyncTranslator
from httpx import AsyncClient
from pymongo import MongoClient
from serpapi import GoogleSearch

from models import *
from RyuzakiLib.hackertools.chatgpt import RendyDevChat
from RyuzakiLib.hackertools.gemini import GeminiLatest
from RyuzakiLib.mental import BadWordsList
# BUG FIX: read .env into os.environ BEFORE any os.environ[...] lookups. In the
# original order MONGO_URL was read one line before load_dotenv(), so it could
# only come from the real process environment and a .env-only deployment crashed.
load_dotenv()

# basicConfig() only takes effect once per process; the original called it twice
# (ERROR then INFO) and the second call was a silent no-op. Keep the single call
# that actually took effect (ERROR) to preserve runtime behavior.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)

# --- data stores / upstream endpoints (all required unless noted) ---
MONGO_URL = os.environ["MONGO_URL"]
SOURCE_UNSPLASH_URL = os.environ["SOURCE_UNSPLASH_URL"]
SOURCE_OCR_URL = os.environ["SOURCE_OCR_URL"]
SOURCE_ALPHA_URL = os.environ["SOURCE_ALPHA_URL"]
SOURCE_DALLE3XL_URL = os.environ["SOURCE_DALLE3XL_URL"]
SOURCE_PROTEUSV0_2_URL = os.environ["SOURCE_PROTEUSV0_2_URL"]
SOURCE_WAIFU_URL = "https://api.waifu.pics"
SOURCE_TIKTOK_WTF_URL = os.environ["SOURCE_TIKTOK_WTF_URL"]
SOURCE_TIKTOK_TECH_URL = os.environ["SOURCE_TIKTOK_TECH_URL"]
SOURCE_ASSISTANT_GOOGLE_AI = os.environ["SOURCE_ASSISTANT_GOOGLE_AI"]
SOURCE_OPENDALLE_URL = os.environ["SOURCE_OPENDALLE_URL"]
SOURCE_OPENAI_ACCESS_URL = os.environ["SOURCE_OPENAI_ACCESS_URL"]
DEVELOPER_ID = os.environ["DEVELOPER_ID"]

# --- api keys ---
REVERSE_IMAGE_API = os.environ["REVERSE_IMAGE_API"]
OCR_API_KEY = os.environ["OCR_API_KEY"]
ONLY_DEVELOPER_API_KEYS = os.environ["ONLY_DEVELOPER_API_KEYS"]
# One Hugging Face token is picked at random per process to spread quota usage.
# os.getenv returns None for unset names — TODO confirm all three are configured.
HUGGING_TOKEN = random.choice(
    [os.getenv("HUGGINGTOKEN_1"), os.getenv("HUGGINGTOKEN_2"), os.getenv("HUGGINGTOKEN_3")]
)
ASSISTANT_GOOGLE_API_KEYS = os.environ["ASSISTANT_GOOGLE_API_KEYS"]
COOKIE_BARD_TOKEN = os.environ["COOKIE_BARD_TOKEN"]

# --- unlocks ---
ORACLE_TOKEN = os.environ["ORACLE_TOKEN"]
TruAI = os.environ["TruAI"]

# Mongo handles shared by every persistence helper below.
client_mongo = MongoClient(MONGO_URL)
db = client_mongo["tiktokbot"]
collection = db["users"]

description = """
~ Developed written and powered by
- Ryuzaki Library: [Library Here](https://github.com/TeamKillerX/RyuzakiLib)
"""
app = FastAPI(
    title="UFoP-API",
    description=description,
    version="0.1.0",
    terms_of_service="Use It Only For Personal Project Else I Need To Delete The Api",
    contact={
        "name": "πΚΚβΦπ",
        "url": "https://t.me/UFoPInfo",
    },
    docs_url="/",
)
trans = SyncTranslator()
BingImages = BingClient()
timeout = 100  # seconds; shared by the outbound HTTP helpers below

# Canned user-facing notices returned on upstream failures.
contact_support = """
We are aware that this service is currently offline. This seems to be caused by the API
We are investigating and doing our best to get things back online as soon as possible.
Thank you for your patience
~ Contact Support @SoulOfSukuna
"""
internal_error = """
There has been an Internal error. We are aware of this error and notice that it can be
caused by your search terms being to explict, too confusing, or it can be caused by the API.
Please modify your search terms and/or try again later thank you for your understanding.
~ πΚΚβΦπ Team
"""
def get_all_api_keys():
    """Collect every non-empty ``ryuzaki_api_key`` stored in the users collection."""
    return [
        doc["ryuzaki_api_key"]
        for doc in collection.find({})
        if doc.get("ryuzaki_api_key")
    ]
def validate_api_key(api_key: str = Header(...)):
    """FastAPI dependency: 401 when the ``api-key`` header is not a stored user key."""
    known_keys = get_all_api_keys()
    if api_key not in known_keys:
        raise HTTPException(status_code=401, detail="Invalid API key")
def validate_api_key_only_devs(api_key: str = Header(...)):
    # FastAPI dependency restricting an endpoint to developer keys.
    # NOTE(review): ONLY_DEVELOPER_API_KEYS is a single env string, so `in` here
    # is a *substring* test — any fragment of a valid key would pass. Confirm
    # whether the env var should be split into a list of whole keys first.
    if api_key not in ONLY_DEVELOPER_API_KEYS:
        raise HTTPException(status_code=401, detail="Invalid API key")
# Pool of "criminal status" labels; sibyl_system_ban() picks one at random for
# each newly banned user.
# NOTE(review): "RAMDOM" is a typo for "RANDOM", kept because this name is
# referenced elsewhere in the module.
RAMDOM_STATUS = [
    "Spammer",
    "Wanted",
    "Scammer",
    "Rogue_Agent",
    "PornBot_Prolly",
    "Fugitive",
    "SIMP",
]
def remove_sibyl_system_banned(user_id):
    """Unset every Sibyl-ban field on *user_id*'s document; returns the update result."""
    fields_to_clear = {
        field: None
        for field in (
            "sibyl_ban",
            "reason_sibyl",
            "is_banned_sibly",
            "date_joined_sib",
            "sibyl_userid",
        )
    }
    return collection.update_one(
        {"user_id": user_id}, {"$unset": fields_to_clear}, upsert=True
    )
def new_sibyl_system_banned(user_id, name, reason, date_joined):
    """Record a Sibyl ban for *user_id* (upserts the user document)."""
    ban_fields = dict(
        sibyl_ban=name,
        reason_sibyl=reason,
        is_banned_sibly=True,
        date_joined_sib=date_joined,
        sibyl_userid=user_id,
    )
    return collection.update_one(
        {"user_id": user_id}, {"$set": ban_fields}, upsert=True
    )
def cybersecuritydb(user_id, mongo_url):
    """Persist the caller-supplied Mongo URL on the user's document (upsert)."""
    return collection.update_one(
        {"user_id": user_id},
        {"$set": {"mongodb": mongo_url}},
        upsert=True,
    )
def get_sibyl_system_banned(user_id):
    """Return ``(sibyl_name, reason, is_banned, date_joined, sibyl_user_id)``.

    Unknown users yield ``(None, None, False, None, None)``.
    """
    doc = collection.find_one({"user_id": user_id})
    if not doc:
        return None, None, False, None, None
    return (
        doc.get("sibyl_ban"),
        doc.get("reason_sibyl"),
        doc.get("is_banned_sibly"),
        doc.get("date_joined_sib"),
        doc.get("sibyl_userid"),
    )
def get_all_banned():
    """List every stored ban as ``{"user_id": ..., "reason": ...}`` dicts.

    Iterates ALL user documents; users without ban fields appear with None
    values — preserved from the original behavior.
    """
    # The original rebound its loop variable `user_id` from "whole document" to
    # "banned id" mid-loop (a shadowing bug waiting to happen); use distinct names.
    return [
        {"user_id": doc.get("sibyl_userid"), "reason": doc.get("reason_sibyl")}
        for doc in collection.find({})
    ]
def get_translate(
    item: TranslateCustom,
):
    """Translate ``item.text`` into ``item.setlang``, auto-detecting the source.

    Returns a SuccessResponse with the translation, or a status="False"
    response carrying the support notice when translation fails.
    """
    try:
        source = trans.detect(item.text)
        translation = trans(item.text, sourcelang=source, targetlang=item.setlang)
        return SuccessResponse(
            status="True",
            randydev={
                "translation": translation.text,
                "translation_original": item.text,
            },
        )
    except Exception:
        # BUG FIX: was a bare `except:`, which also swallowed SystemExit and
        # KeyboardInterrupt; narrowed so process-control signals propagate.
        return SuccessResponse(
            status="False",
            randydev={"message": contact_support})
def blacklist_words():
    """Return Google's banned-words list via RyuzakiLib's BadWordsList helper."""
    try:
        bad_words = BadWordsList()
        banned = bad_words.banned_by_google(
            file_txt="banned_by_google.txt", storage=True
        )
    except Exception as e:
        return {"status": "false", "message": f"Internal server error: {str(e)}"}
    return {"status": "true", "results": banned}
def sibyl_get_all_banlist():
    """Expose the full Sibyl ban list in the API's standard envelope."""
    return {"status": "True", "sukuna": {"results": get_all_banned()}}
def sibyl_system_delete(
    item: SibylSystemDel, api_key: None = Depends(validate_api_key_only_devs)
):
    """Developer-only: remove *item.user_id* from the Sibyl ban list."""
    try:
        stored_user_id = get_sibyl_system_banned(item.user_id)[4]
        if not stored_user_id:
            return SuccessResponse(
                status="False", randydev={"message": "Not Found UserID"}
            )
        remove_sibyl_system_banned(item.user_id)
        return SuccessResponse(
            status="True",
            randydev={
                "message": f"Successfully removed {item.user_id} from the Sibyl ban list"
            },
        )
    except Exception as e:
        return ErrorStatus(status="false", message=f"Internal server error: {str(e)}")
def sibyl_system_ban(
    item: SibylSystemBan, api_key: None = Depends(validate_api_key_only_devs)
):
    """Developer-only: ban *item.user_id* with a random status label.

    Returns the stored ban record, an "already banned" notice, or an error.
    """
    # BUG FIX: DEVELOPER_ID comes from os.environ and is a str, while user ids
    # arrive as ints elsewhere in this API, so `==` could never match; compare
    # both sides as strings so the developer-protection guard can fire.
    if str(item.user_id) == str(DEVELOPER_ID):
        return {"status": "false", "message": "Only Developer"}
    try:
        date_joined = str(dt.now())
        sibyl_ban = random.choice(RAMDOM_STATUS)
        _, _, is_banned, _, sibyl_user_id = get_sibyl_system_banned(item.user_id)
        if sibyl_user_id is not None and is_banned:
            return SuccessResponse(
                status="False", randydev={"message": "User is already banned"}
            )
        new_sibyl_system_banned(item.user_id, sibyl_ban, item.reason, date_joined)
        return SuccessResponse(
            status="True",
            randydev={
                "user_id": item.user_id,
                "sibyl_name": sibyl_ban,
                "reason": item.reason,
                "date_joined": date_joined,
                "message": f"Successfully banned {item.user_id} from the Sibyl ban list.",
            },
        )
    except Exception as e:
        return ErrorStatus(status="false", message=f"Internal server error: {str(e)}")
def sibyl_system(
    user_id: int = Query(..., description="User ID in query parameter"),
    api_key: None = Depends(validate_api_key),
):
    """Public lookup of a user's Sibyl ban record."""
    record = get_sibyl_system_banned(user_id)
    # get_sibyl_system_banned always returns a 5-tuple, so this branch is
    # defensive only — preserved from the original.
    if record is None:
        return {"status": "false", "message": "Not Found User"}
    sibyl_name, reason, is_banned, date_joined, sibyl_user_id = record
    return {
        "status": "true",
        "sukuna": {
            "sibyl_name": sibyl_name,
            "reason": reason,
            "is_banned": is_banned,
            "date_joined": date_joined,
            "sibyl_user_id": sibyl_user_id,
        },
    }
async def gemini_oracle(item: GeminiOracle):
    """Answer *item.query* with the Gemini "oracle" persona, or fall back to Bard.

    Multi-chat mode keeps per-user history in Mongo; ``oracle_base == "Delete"``
    clears that history instead of answering.
    """
    if item.is_multi_chat:
        selected_api_key = ASSISTANT_GOOGLE_API_KEYS or item.gemini_api_key
        oracle_base = ORACLE_TOKEN or item.oracle_base
        try:
            geni = GeminiLatest(
                api_key=selected_api_key,
                mongo_url=item.mongo_url,
                version=item.version,
                user_id=item.user_id,
                oracle_base=oracle_base,
            )
            cybersecuritydb(item.user_id, item.mongo_url)
            if item.oracle_base == "Delete":
                clearedhistory = await geni._clear_oracle_history_in_db()
                return SuccessResponse(
                    status="True",
                    randydev={"message": f"Oracle Status: {clearedhistory}"},
                )
            answer, oracle_chat = await geni._GeminiLatest__get_response_oracle(
                item.query
            )
            return SuccessResponse(
                status="True", randydev={"message": answer, "chat_history": oracle_chat}
            )
        except Exception:
            return SuccessResponse(status="False", randydev={"message": internal_error})
    token = item.bard_api_key if item.is_login else COOKIE_BARD_TOKEN
    # BUG FIX: the original referenced an undefined `owner_base` below, raising
    # NameError (masked by `except BaseException`) on every Bard-branch request.
    # gemini_pro() primes Bard with TruAI the same way, so mirror that here.
    owner_base = TruAI
    try:
        session = requests.Session()
        session.headers = {
            "Host": "bard.google.com",
            "X-Same-Domain": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
            "Origin": "https://bard.google.com",
            "Referer": "https://bard.google.com/",
        }
        session.cookies.set("__Secure-1PSID", token)
        bard = Bard(token=token, session=session, timeout=30)
        # Prime the conversation with the owner persona before the real query.
        bard.get_answer(owner_base)["content"]
        message = bard.get_answer(item.query)["content"]
        return SuccessResponse(status="True", randydev={"message": message})
    except BaseException:
        return SuccessResponse(
            status="False", randydev={"message": contact_support}
        )
def v1beta3_google_ai(item: ChatgptCustom, api_key: None = Depends(validate_api_key)):
    """Proxy *item.query* to Google's text-bison-001 generateText endpoint."""
    api_url = f"{SOURCE_ASSISTANT_GOOGLE_AI}/models/text-bison-001:generateText?key={ASSISTANT_GOOGLE_API_KEYS}"
    try:
        response = requests.post(
            api_url,
            headers={"Content-Type": "application/json"},
            json={"prompt": {"text": item.query}},
        )
        # Keep the LAST candidate's "output", mirroring the original loop; an
        # empty candidate list raises (unbound `message`) into the error path.
        for candidate in response.json()["candidates"]:
            message = candidate.get("output")
        return SuccessResponse(status="True", randydev={"message": message})
    except BaseException:
        return SuccessResponse(status="False", randydev={"message": internal_error})
def gemini_pro(item: GeminiPro):
    """Answer *item.query* with Gemini (multi-chat) or a primed Bard session."""
    owner_base = TruAI
    if item.is_multi_chat:
        selected_api_key = ASSISTANT_GOOGLE_API_KEYS or item.gemini_api_key
        try:
            geni = GeminiLatest(
                api_key=selected_api_key,
                mongo_url=item.mongo_url,
                version=item.version,
                user_id=item.user_id,
            )
            cybersecuritydb(item.user_id, item.mongo_url)
            answer, gemini_chat = geni._GeminiLatest__get_response_gemini(item.query)
            return SuccessResponse(
                status="True", randydev={"message": answer, "chat_history": gemini_chat}
            )
        except Exception:
            return SuccessResponse(
                status="False", randydev={"message": contact_support}
            )
    token = item.bard_api_key if item.is_login else COOKIE_BARD_TOKEN
    try:
        session = requests.Session()
        session.headers = {
            "Host": "bard.google.com",
            "X-Same-Domain": "1",
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
            "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
            "Origin": "https://bard.google.com",
            "Referer": "https://bard.google.com/",
        }
        session.cookies.set("__Secure-1PSID", token)
        bard = Bard(token=token, session=session, timeout=30)
        # Prime Bard with the owner persona, then ask the real question.
        bard.get_answer(owner_base)["content"]
        message = bard.get_answer(item.query)["content"]
        return SuccessResponse(status="True", randydev={"message": message})
    except BaseException:
        return SuccessResponse(
            status="False", randydev={"message": contact_support}
        )
async def bing_dalle(item: BingDalle):
    """Generate an image through Bing's DALL-E (via g4f) using the caller's cookie."""
    try:
        set_cookies(
            ".bing.com",
            {
                "_U": item.cookie
            },
        )
    except requests.exceptions.RequestException:
        raise HTTPException(
            status_code=500,
            detail="Invalid cookie string, check your cookie string and try again",
        )
    try:
        response = BingImages.images.generate(
            prompt=item.prompt,
            model=item.model,
        )
        return {"status": "true", "sukuna": {"message": response.data[0].url}}
    except BaseException as e:
        # BUG FIX: the original read `except BaseException e:` — a SyntaxError
        # (missing `as`) that prevented this module from importing at all.
        return {"status": "false", "message": f"Something went wrong: {e}"}
def dalle_3xl(item: Dalle3XL, api_key: None = Depends(validate_api_key)):
    """Render *item.query* with the DALLE-3XL Hugging Face endpoint.

    Returns the generated image as a base64 string inside a SuccessResponse.
    """
    request_headers = {
        "Authorization": f"Bearer {HUGGING_TOKEN}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(
            SOURCE_DALLE3XL_URL,
            headers=request_headers,
            json={"inputs": item.query},
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException:
        raise HTTPException(status_code=500, detail=internal_error)
    try:
        encoded_string = base64.b64encode(response.content).decode("utf-8")
    except Exception:
        raise HTTPException(status_code=500, detail=contact_support)
    if not encoded_string:
        return SuccessResponse(status="False", randydev={"data": contact_support})
    return SuccessResponse(status="True", randydev={"data": encoded_string})
def proteusv0_2(item: ProteusV02, api_key: None = Depends(validate_api_key)):
    """Render *item.query* with the Proteus v0.2 Hugging Face endpoint.

    Returns the generated image as a base64 string inside a SuccessResponse.
    """
    request_headers = {
        "Authorization": f"Bearer {HUGGING_TOKEN}",
        "Content-Type": "application/json",
    }
    try:
        response = requests.post(
            SOURCE_PROTEUSV0_2_URL,
            headers=request_headers,
            json={"inputs": item.query},
            timeout=timeout,
        )
        response.raise_for_status()
    except requests.exceptions.RequestException:
        raise HTTPException(status_code=500, detail=internal_error)
    try:
        encoded_string = base64.b64encode(response.content).decode("utf-8")
    except Exception:
        raise HTTPException(status_code=500, detail=contact_support)
    if not encoded_string:
        return SuccessResponse(status="False", randydev={"data": contact_support})
    return SuccessResponse(status="True", randydev={"data": encoded_string})
async def get_image_unsplash(query: str, size: str = "500x500"):
    """Stream a JPEG from the Unsplash-source proxy for *query* at *size*."""
    image_url = f"{SOURCE_UNSPLASH_URL}/?{query}/{size}"
    try:
        response = requests.get(image_url)
        response.raise_for_status()
    except requests.exceptions.RequestException as e:
        raise HTTPException(status_code=500, detail=f"Error fetching image: {e}")
    return StreamingResponse(BytesIO(response.content), media_type="image/jpeg")
def google_reverse(
    engine: str = "google_reverse_image",
    image_url: str = None,
    language: str = "en",
    google_lang: str = "us",
    api_key: None = Depends(validate_api_key),
):
    """Run a SerpApi reverse-image search and summarize its metadata."""
    params = {
        "api_key": REVERSE_IMAGE_API,
        "engine": engine,
        "image_url": image_url,
        "hl": language,
        "gl": google_lang,
    }
    try:
        results = GoogleSearch(params).get_dict()
        metadata = results["search_metadata"]
        return {
            "status": "true",
            "sukuna": {
                "link": metadata["google_reverse_image_url"],
                "total_time_taken": metadata["total_time_taken"],
                "create_at": metadata["created_at"],
                "processed_at": metadata["processed_at"],
            },
        }
    except Exception as e:
        return {"status": "false", "message": f"Error {e}"}
def ocr_space_url(
    url: str = Query(..., description="URL in query parameter"),
    overlay: bool = False,
    language: str = Query("eng", description="Language in query parameter"),
    api_key: None = Depends(validate_api_key),
):
    """OCR the image at *url* through the ocr.space API; returns the parsed text."""
    payload = {
        "url": url,
        "isOverlayRequired": overlay,
        "apikey": OCR_API_KEY,
        "language": language,
    }
    try:
        response = requests.post(SOURCE_OCR_URL, data=payload)
        response.raise_for_status()
        raw_body = response.content.decode()
    except requests.exceptions.RequestException as e:
        return f"Error: {str(e)}"
    try:
        parsed_response = json.loads(raw_body)
        parsed_results = parsed_response.get("ParsedResults")
        if parsed_results:
            return {
                "status": "true",
                "sukuna": {"text": parsed_results[0]["ParsedText"]},
            }
        return {"status": "false", "message": "Error response."}
    except (json.JSONDecodeError, KeyError):
        return "Error parsing the OCR response."
def chatgpt4_support(query: str = None, api_key: None = Depends(validate_api_key)):
    """Answer *query* using g4f's GPT-4 backend."""
    conversation = [{"role": "user", "content": query}]
    try:
        answer = g4f.ChatCompletion.create(
            model=g4f.models.gpt_4, messages=conversation
        )
    except BaseException:
        return {"status": "false", "message": "Error response."}
    return {"status": "true", "sukuna": {"message": answer}}
def chatgpt_model(query: str = None, model_id: int = 1, is_models: bool = True):
    """Answer *query* via RyuzakiLib's RendyDevChat model selector."""
    try:
        answer = RendyDevChat(query).get_response_model(
            model_id=model_id, is_models=is_models
        )
    except BaseException:
        return {"status": "false", "message": "Error response."}
    return {"status": "true", "sukuna": {"message": answer}}
async def get_data(username):
    """Fetch a GitHub user's profile and format it as Markdown.

    Returns ``[markdown_text, avatar_url]``; on a parse failure the avatar is a
    placeholder image and the text carries the error details.
    """
    base_msg = ""
    async with AsyncClient() as gpx:
        req = (await gpx.get(f"https://api.github.com/users/{username}")).json()
    try:
        avatar = req["avatar_url"]
        twitter = req["twitter_username"]
        base_msg += "**β Github Information β** \n\n"
        base_msg += f"**Profile Url:** {req['html_url']} \n"
        base_msg += f"**Name:** `{req['name']}` \n"
        base_msg += f"**Username:** `{req['login']}` \n"
        base_msg += f"**User ID:** `{req['id']}` \n"
        base_msg += f"**Location:** `{req['location']}` \n"
        base_msg += f"**Company:** `{req['company']}` \n"
        # BUG FIX: the Blog line copy-pasted req['name']; use the 'blog' field.
        base_msg += f"**Blog:** `{req['blog']}` \n"
        base_msg += f"**Twitter:** `{f'https://twitter.com/{twitter}' if twitter else 'None'}` \n"
        base_msg += f"**Bio:** `{req['bio']}` \n"
        base_msg += f"**Public Repos:** `{req['public_repos']}` \n"
        base_msg += f"**Public Gists:** `{req['public_gists']}` \n"
        base_msg += f"**Followers:** `{req['followers']}` \n"
        base_msg += f"**Following:** `{req['following']}` \n"
        base_msg += f"**Created At:** `{req['created_at']}` \n"
        base_msg += f"**Update At:** `{req['updated_at']}` \n"
        return [base_msg, avatar]
    except Exception as e:
        base_msg += f"**An error occurred while parsing the data!** \n\n**Traceback:** \n `{e}` \n\n`Make sure that you've sent the command with the correct username!`"
        return [base_msg, "https://telegra.ph//file/32f69c18190666ea96553.jpg"]
async def github(username: str = None):
    """Endpoint wrapper around :func:`get_data` for GitHub profile lookups."""
    try:
        markdown_text, avatar_url = await get_data(username)
    except BaseException:
        return {"status": "false", "message": "Error response."}
    return {
        "status": "true",
        "sukuna": {"avatar": avatar_url, "results": markdown_text},
    }
def webshot(
    url: str = None,
    quality: str = "1920x1080",
    type_mine: str = "JPEG",
    pixels: str = "1024",
    cast: str = "Z100",
):
    """Build a mini.s-shot.ru screenshot URL for *url* (no request is made here)."""
    try:
        screenshot_url = (
            f"https://mini.s-shot.ru/{quality}/{type_mine}/{pixels}/{cast}/?{url}"
        )
    except BaseException:
        return {"status": "false", "message": "Error response."}
    return {"status": "true", "sukuna": {"image_url": screenshot_url}}
def chatbot(
    query: str = None,
    user_id: int = None,
    bot_name: str = None,
    bot_username: str = None,
):
    """Forward a chat query to the (base64-obfuscated) upstream chatbot API."""
    api_url = b64decode("aHR0cHM6Ly9hcGkuc2Fmb25lLmRldi9jaGF0Ym90").decode("utf-8")
    response = requests.get(
        f"{api_url}",
        params={
            "query": query,
            "user_id": user_id,
            "bot_name": bot_name,
            "bot_master": bot_username,
        },
    )
    if response.status_code != 200:
        return "Error api request"
    try:
        answer = response.json()["response"]
    except BaseException:
        return {"status": "false", "message": "Error response."}
    return {"status": "true", "sukuna": {"message": answer}}
def waifu_pics(types: str = "sfw", category: str = "neko"):
    """Fetch a random waifu.pics image URL for the given type/category."""
    endpoint = f"{SOURCE_WAIFU_URL}/{types}/{category}"
    response = requests.get(endpoint)
    if response.status_code != 200:
        return (
            "Sorry, there was an error processing your request. Please try again later"
        )
    try:
        waifu_image_url = response.json()["url"]
    except Exception as e:
        return f"Error request {e}"
    if not waifu_image_url:
        return {"status": "false", "message": "Error response."}
    return {"status": "true", "sukuna": {"image_url": waifu_image_url}}
def make_rayso(
    code=None,
    title: str = "Ryuzaki Dev",
    theme: str = None,
    setlang: str = "en",
    auto_translate: bool = None,
    ryuzaki_dark: bool = None,
):
    """Render *code* as a rayso-style image via the (obfuscated) upstream API.

    Optionally translates the text to *setlang* first. Returns the decoded
    image bytes in the payload, or an error message string/dict.
    """
    api_url = b64decode("aHR0cHM6Ly9hcGkuc2Fmb25lLm1lL3JheXNv").decode("utf-8")
    if auto_translate:
        # Only needed on this path; the original built a translator (shadowing
        # the module-level `trans`) unconditionally.
        trans = SyncTranslator()
        source = trans.detect(code)
        code = trans(code, sourcelang=source, targetlang=setlang).text
    # The original duplicated the whole request/parse block for dark vs light
    # mode; fold it into a single call parameterized on darkMode.
    x = requests.post(
        f"{api_url}",
        json={
            "code": code,
            "title": title,
            "theme": theme,
            "darkMode": bool(ryuzaki_dark),
        },
    )
    if x.status_code != 200:
        return "Error api Gay"
    data = x.json()
    try:
        image_data = base64.b64decode(data["image"])
        return {"status": "true", "data": {"image": image_data}}
    except BaseException:
        return {"status": "false", "message": "Error response"}
def whois_ip_address(ip_address: str = None):
    """Geolocate *ip_address* via ip2location.io and return a flat summary dict."""
    apikey = kc("M0QwN0UyRUFBRjU1OTQwQUY0NDczNEMzRjJBQzdDMUE=").decode("utf-8")
    location_link = "https"
    location_api = "api.ip2location.io"
    location_key = f"key={apikey}"
    location_search = f"ip={ip_address}"
    location_param = (
        f"{location_link}://{location_api}/?{location_key}&{location_search}"
    )
    response = requests.get(location_param)
    if response.status_code != 200:
        return (
            "Sorry, there was an error processing your request. Please try again later"
        )
    data_location = response.json()
    try:
        location_ip = data_location["ip"]
        location_code = data_location["country_code"]
        location_name = data_location["country_name"]
        location_region = data_location["region_name"]
        location_city = data_location["city_name"]
        location_zip = data_location["zip_code"]
        location_zone = data_location["time_zone"]
        location_card = data_location["as"]
    except Exception as e:
        return f"error {e}"
    # NOTE: any falsy field (e.g. empty string) routes to the "Invalid ip
    # address" branch — preserved from the original.
    if (
        location_ip
        and location_code
        and location_name
        and location_region
        and location_city
        and location_zip
        and location_zone
        and location_card
    ):
        return {
            "ip_address": location_ip,
            "country_code": location_code,
            # BUG FIX: country_name was fetched and validated but never returned.
            "country_name": location_name,
            "region_name": location_region,
            "city_name": location_city,
            "zip_code": location_zip,
            "time_zone": location_zone,
            "as": location_card,
        }
    else:
        return {"status": "false", "message": "Invalid ip address"}
def tiktok_douyin(tiktok_url: str = None):
    """Resolve a TikTok/Douyin link to its video, audio and author metadata."""
    response = requests.get(f"{SOURCE_TIKTOK_WTF_URL}={tiktok_url}")
    if response.status_code != 200:
        return "Error request:"
    try:
        # Parse the body ONCE; the original re-called response.json() for every
        # field, re-deserializing the same payload five times.
        aweme = response.json()["aweme_list"][0]
        return {
            "status": "true",
            "sukuna": {
                "video_url": aweme["video"]["play_addr"]["url_list"][0],
                "music_url": aweme["music"]["play_url"]["url_list"][0],
                "description": aweme["desc"],
                "author": aweme["author"]["nickname"],
                "request": aweme["author"]["signature"],
            },
        }
    except BaseException:
        return {"status": "false", "message": "Error request"}
def tiktok_downloader(tiktok_url: Union[str, None] = None, only_video: bool = None):
    """Fetch a TikTok link's no-watermark video URL, or its music track."""
    api_url = f"{SOURCE_TIKTOK_TECH_URL}/tiktok?url={tiktok_url}"
    response = requests.get(api_url)
    if response.status_code != 200:
        return "Error: Unable to fetch data from the TikTok API"
    try:
        payload = response.json().get("result", {})
        caption = payload.get("desc", "")
        if only_video:
            video_url = payload.get("withoutWaterMarkVideo", "")
            if video_url:
                return {"download_url": video_url, "caption": caption}
        else:
            music_mp3 = payload.get("music", "")
            if music_mp3:
                return {"music_url": music_mp3, "caption": caption}
        return "Error: TikTok data not found or unsupported format"
    except BaseException:
        return {"status": "false", "message": "Invalid Link"}
def mediafire(link: Union[str, None] = None):
    """Scrape a MediaFire page for its direct-download URL and file metadata.

    Any scraping/parsing failure returns the original stringified error dict.
    """
    try:
        down_link = str(link)
        mid = down_link.split("/", 5)
        # "view" pages embed the same download button as "file" pages.
        if mid[3] == "view":
            mid[3] = "file"
            down_link = "/".join(mid)
        # BUG FIX: removed stray debug print(down_link) that spammed stdout on
        # every request.
        r = requests.get(down_link)
        soup = BeautifulSoup(r.content, "html.parser")
        a_href = soup.find("a", {"class": "input popsok"}).get("href")
        a = str(a_href)
        # Renamed from `id` to avoid shadowing the builtin.
        file_id = link.split("/", 5)[4]
        a_byte = soup.find("a", {"class": "input popsok"}).get_text()
        a_name = soup.find("div", {"class": "dl-btn-label"}).get_text()
        details = soup.find("ul", {"class": "details"})
        li_items = details.find_all("li")[1]
        some = li_items.find_all("span")[0].get_text().split()
        dat = list(some)
        down = a_byte.replace(" ", "").strip()
        time = dat[1]
        date = dat[0]
        # Button text looks like "Download (12.3MB)" — extract the size.
        byte = down.split("(", 1)[1].split(")", 1)[0]
        name = a_name.replace(" ", "").strip()
        return {
            "status": "true",
            "data": {
                "file": {
                    "url": {
                        "directDownload": a,
                        "original": link,
                    },
                    "metadata": {
                        "id": file_id,
                        "name": name,
                        "size": {"readable": byte},
                        "DateAndTime": {"time": time, "date": date},
                    },
                }
            },
        }
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def gdrive(link: Union[str, None] = None):
    """Turn a Google Drive share link into a direct-download descriptor."""
    try:
        file_id = link.split("/", 6)[5]
        direct_url = f"https://drive.google.com/uc?export=download&id={file_id}"
        session = requests.Session()
        headers = session.get(direct_url, stream=True).headers
        # Filename comes from the Content-Disposition header when present.
        filename = None
        content_disp = headers.get("content-disposition")
        if content_disp:
            match = re.search(r'filename="(.+)"', content_disp)
            if match:
                filename = match.group(1)
        content_length = headers.get("content-length")
        last_modified = headers.get("last-modified")
        content_type = headers.get("content-type")
        if content_length:
            readable_size = f"{round(int(content_length) / (1024 * 1024), 2)} MB"
        else:
            readable_size = "No content length provided by the server."
        return {
            "status": "true",
            "data": {
                "file": {
                    "url": {
                        "directDownload": direct_url,
                        "original": link,
                    },
                    "metadata": {
                        "id": file_id,
                        "name": filename or "No filename provided by the server.",
                        "size": {
                            "readable": readable_size,
                            "type": content_type
                            or "No content type provided by the server.",
                        },
                        "DateAndTime": last_modified
                        or "No last modified date provided by the server.",
                    },
                }
            },
        }
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def anonfiles(link: Union[str, None] = None):
    """Resolve an anonfiles link: scrape the direct URL, merge with the API info."""
    try:
        page = requests.get(link)
        soup = BeautifulSoup(page.content, "html.parser")
        direct_url = str(soup.find("a", {"id": "download-url"}).get("href"))
        file_id = link.split("/", 4)[3]
        info = requests.get(f"https://api.anonfiles.com/v2/file/{file_id}/info").json()
        # Replace the API's "full" page URL with the scraped direct-download URL.
        info["data"]["file"]["url"]["directDownload"] = direct_url
        del info["data"]["file"]["url"]["full"]
        return info
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def filechan(link: Union[str, None] = None):
    """Resolve a filechan link: scrape the direct URL, merge with the API info."""
    try:
        page = requests.get(link)
        soup = BeautifulSoup(page.content, "html.parser")
        direct_url = str(soup.find("a", {"id": "download-url"}).get("href"))
        file_id = link.split("/", 4)[3]
        info = requests.get(f"https://api.filechan.org/v2/file/{file_id}/info").json()
        # Replace the API's "full" page URL with the scraped direct-download URL.
        info["data"]["file"]["url"]["directDownload"] = direct_url
        del info["data"]["file"]["url"]["full"]
        return info
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def letsupload(link: Union[str, None] = None):
    """Resolve a letsupload link: scrape the direct URL, merge with the API info."""
    try:
        page = requests.get(link)
        soup = BeautifulSoup(page.content, "html.parser")
        direct_url = str(soup.find("a", {"id": "download-url"}).get("href"))
        file_id = link.split("/", 4)[3]
        info = requests.get(f"https://api.letsupload.cc/v2/file/{file_id}/info").json()
        # Replace the API's "full" page URL with the scraped direct-download URL.
        info["data"]["file"]["url"]["directDownload"] = direct_url
        del info["data"]["file"]["url"]["full"]
        return info
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def megaupload(link: Union[str, None] = None):
    """Resolve a megaupload link: scrape the direct URL, merge with the API info."""
    try:
        page = requests.get(link)
        soup = BeautifulSoup(page.content, "html.parser")
        direct_url = str(soup.find("a", {"id": "download-url"}).get("href"))
        file_id = link.split("/", 4)[3]
        info = requests.get(f"https://api.megaupload.nz/v2/file/{file_id}/info").json()
        # Replace the API's "full" page URL with the scraped direct-download URL.
        info["data"]["file"]["url"]["directDownload"] = direct_url
        del info["data"]["file"]["url"]["full"]
        return info
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def myfile(link: Union[str, None] = None):
    """Resolve a myfile link: scrape the direct URL, merge with the API info."""
    try:
        page = requests.get(link)
        soup = BeautifulSoup(page.content, "html.parser")
        direct_url = str(soup.find("a", {"id": "download-url"}).get("href"))
        file_id = link.split("/", 4)[3]
        info = requests.get(f"https://api.myfile.is/v2/file/{file_id}/info").json()
        # Replace the API's "full" page URL with the scraped direct-download URL.
        info["data"]["file"]["url"]["directDownload"] = direct_url
        del info["data"]["file"]["url"]["full"]
        return info
    except BaseException:
        return "{'status': 'false', 'message': 'Invalid Link'}"
def custom_exception_handler(request: Request, exc: HTTPException) -> JSONResponse:
    """Render raised HTTPExceptions in the API's CustomErrorResponseModel envelope."""
    custom_error_model = CustomErrorResponseModel(
        detail=[{"error": str(exc.detail)}]
    )
    return JSONResponse(
        status_code=exc.status_code,
        content=custom_error_model.dict(),
        headers=exc.headers,
    )
# Register the handler so every raised HTTPException is serialized through
# CustomErrorResponseModel instead of FastAPI's default {"detail": ...} shape.
app.add_exception_handler(HTTPException, custom_exception_handler)