Magai2API / app.py
tianlong12's picture
Update app.py
520b9c6 verified
raw
history blame
14.1 kB
import os
import time
import json
import uuid
import asyncio
import aiohttp
import random
import re
from flask import Flask, request, Response
from flask_cors import CORS
# Flask application object; the route defined further down is registered on it.
app = Flask(__name__)
# Enable CORS for every path ("/*") and allow requests from any origin ("*").
CORS(app, resources={r"/*": {"origins": "*"}})
# Load environment variables.
# Each credential may be supplied through the environment (MAGAI_COOKIE,
# MAGAI_APP_LAST_CHANGE, MAGAI_CURRENT_PAGE_ITEM, MAGAI_CURRENT_USER). The
# literals below are only fallbacks used when a variable is unset or empty, so
# existing deployments keep working unchanged.
# NOTE(review): the fallback cookie looks like a real session credential that
# is committed to source -- consider rotating it and removing the literal once
# deployments set the environment variables.
MAGAI_TOKEN = {
    "cookie": os.environ.get("MAGAI_COOKIE")
    or "_fbp=fb.1.1722349051350.28463402121267809;soquick-mobile_u1main=1722349236461x685414888067651300;intercom-id-jnjoad6e=cbbd8fc9-a010-4e8c-8e7e-9cffccd3abea;soquick-mobile_live_u2main.sig=HuQePfrEHGidu4eRyfiZkcL1_2E;__stripe_mid=7767e1a3-e87f-4456-b073-6c8b7ae9e82119b00d;__stripe_sid=99c612a5-a12a-426f-baa5-e61471a013f140c482;_ga=GA1.1.242967908.1722349051;_ga_GFQ25YSHT2=GS1.1.1726123356.1.0.1726123393.0.0.0;_ga_N5J29RVHDJ=GS1.1.1726123395.4.1.1726124637.0.0.0;intercom-device-id-jnjoad6e=35ee824e-f7f6-415d-8698-bd822cb46d3a;intercom-session-jnjoad6e=SXlhUDdqa0E5YTlmUVA2QXk3K3hBSXFNSnRzL2x0cEtvVDF3U1k3UlRCQmhPRVdUcktDYkpwVXpiZFA1SzhUSi0tTFlCazFUamcxbExFRU1LTVlWSitVQT09--40f0dca176cfb28add2903465c54bcee4f33de2b;soquick-mobile_live_u2main=bus|1722349236461x685414888067651300|1726123417637x655253536227564700",
    "app_last_change": os.environ.get("MAGAI_APP_LAST_CHANGE") or "21388518093",
    "current_page_item": os.environ.get("MAGAI_CURRENT_PAGE_ITEM")
    or "1348695171700984260__LOOKUP__1726124636560x692535552825360400",
    "current_user": os.environ.get("MAGAI_CURRENT_USER")
    or "1348695171700984260__LOOKUP__1722349236461x685414888067651300",
}

# Convenience aliases for the individual credential fields.
MAGAI_COOKIE = MAGAI_TOKEN["cookie"]
MAGAI_APP_LAST_CHANGE = MAGAI_TOKEN["app_last_change"]
MAGAI_CURRENT_PAGE_ITEM = MAGAI_TOKEN["current_page_item"]
MAGAI_CURRENT_USER = MAGAI_TOKEN["current_user"]

# Public model names accepted by this proxy -> upstream model identifiers.
MAGAI_MAPPING = {
    "gpt-4o": "openai/gpt-4o",
    "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
    "claude-3-opus": "anthropic/claude-3-opus:beta",
    "gemini-1.5-pro": "google/gemini-pro-1.5",
}

# Scale applied to random.random() when building the random part of an id.
UUID_LENGTH = 1e18
# Modulus applied to the uid generator seed.
MODULO = 1e18
def generate_uuid():
    """Return an id of the form ``<epoch-milliseconds>x<random digits>``.

    The random part is ``random.random()`` scaled by ``UUID_LENGTH``, rounded
    to an integer and left-padded with zeros to at least 18 characters.
    """
    millis = int(time.time() * 1000)
    random_part = round(random.random() * UUID_LENGTH)
    padded = str(random_part).zfill(18)
    return f"{millis}x{padded}"
def create_luid(separator="x"):
    """Return ``<epoch-milliseconds><separator>1``.

    Args:
        separator: Text placed between the millisecond timestamp and the
            trailing ``1``; defaults to ``"x"``.
    """
    return f"{int(time.time() * 1000)}{separator}1"
def format_model_name(model_name):
    """Return ``model_name`` with separators normalised to single underscores.

    Every ``/``, ``:`` and ``-`` becomes ``_``; any run of consecutive
    underscores is then collapsed into one.
    """
    underscored = re.sub(r"[/:-]", "_", model_name)
    collapsed = re.sub(r"_+", "_", underscored)
    return collapsed
def find_token_in_object(obj):
    """Depth-first search of a decoded JSON value for a string ``"token"``.

    Dictionaries are scanned in insertion order: an entry whose key is
    ``"token"`` and whose value is a string is returned immediately, otherwise
    the entry's value is searched recursively. Lists and tuples are searched
    element by element, so a token nested inside a JSON array is found too
    (previously such values were silently skipped).

    Args:
        obj: Any value; anything that is not a dict, list or tuple contains
            no token.

    Returns:
        The first token string found, or ``None`` when there is none.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "token" and isinstance(value, str):
                return value
            token = find_token_in_object(value)
            if token:
                return token
    elif isinstance(obj, (list, tuple)):
        for item in obj:
            token = find_token_in_object(item)
            if token:
                return token
    return None
def get_last_user_content(messages):
    """Return the ``content`` of the most recent message whose role is "user".

    The messages come straight from the client request, so malformed entries
    are tolerated instead of raising: entries that are not dicts or that lack
    a ``"role"`` key are skipped, and a user message without ``"content"``
    yields ``None``.

    Args:
        messages: Sequence of chat message dicts (may be empty or ``None``).

    Returns:
        The content of the last user message, or ``None`` if there is none.
    """
    for message in reversed(messages or []):
        if isinstance(message, dict) and message.get("role") == "user":
            return message.get("content")
    return None
async def get_token(model, message):
    """Start the Magai workflow for ``message`` and return the token it yields.

    Builds the workflow-start payload (client state, cache entries and random
    identifiers), POSTs it to ``https://app.magai.co/workflow/start`` with the
    configured cookie, and searches the step results recorded under this
    call's ``server_call_id`` for a token.

    Args:
        model: Upstream model identifier; stored in the payload cache after
            passing through ``format_model_name``.
        message: The user's message, placed into the element state.

    Returns:
        The first token found in a step's dict ``return_value``, or ``None``
        when the response has no usable step results or no token.

    Raises:
        Exception: If the response JSON contains an ``"error_class"`` key; the
            whole response payload is the exception argument.
    """
    server_call_id = generate_uuid()
    created_id = MAGAI_CURRENT_PAGE_ITEM.split("__")[0]
    user_id = MAGAI_CURRENT_USER.split("__")[2]
    model_id = "0060f9accd1dbade552f65ac646fb3da"
    item_id = "bUNih7"
    element_id = "bUNib7"

    def instance_ref(element):
        # Lookup key that identifies one element instance of the current page.
        return f"{created_id}__LOOKUP__ElementInstance::{element}"

    # Element id -> parent element id, in the order the payload lists them.
    parent_of = {
        "bUNib7": "bUMiq3",
        "bTezP": "bTezJ",
        "bTezE": "bTeqc",
        "bTezJ": "bUKFL2",
        "bTezQ": "bUKFL2",
        "bUiru0": "bUjNK",
        "bUDVj0": "bUMiq3",
        "bUXzm2": "bUMhk3",
        "bUifI1": "bTeqg",
        "bUMiq3": "bTezE",
        "bTekm": None,
    }
    element_instances = {
        element: {
            "dehydrated": instance_ref(element),
            "parent_element_id": parent,
        }
        for element, parent in parent_of.items()
    }

    element_state = {
        instance_ref("bTezP"): {
            "is_visible": True,
            "value_that_is_valid": message,
            "value": message,
        },
        instance_ref("bTezE"): {
            "custom.images_": None,
            "custom.file_": None,
            "custom.file_content_": None,
            "custom.file_name_": None,
            "custom.file_type_": None,
        },
        instance_ref("bTezJ"): {
            "custom.isrecording_": None,
            "custom.prompt_": None,
        },
        instance_ref("bUiru0"): {"AAE": message},
        instance_ref("bUDVj0"): {"AAE": message},
        instance_ref("bUifI1"): {
            "custom.is_visible_": None,
            "group_data": None,
        },
        instance_ref("bUMiq3"): {"group_data": None},
    }

    client_state = {
        "element_instances": element_instances,
        "element_state": element_state,
        "other_data": {
            "Current Page Scroll Position": 0,
            "Current Page Width": 661,
        },
        "cache": {
            model_id: format_model_name(model),
            "true": True,
            "CurrentPageItem": MAGAI_CURRENT_PAGE_ITEM,
            "CurrentUser": MAGAI_CURRENT_USER,
        },
        "exists": {
            model_id: True,
            "true": True,
            "CurrentPageItem": True,
            "CurrentUser": True,
        },
    }

    # The identifiers below are generated in the same order as the fields
    # appear, so the sequence of random/time calls is fixed.
    workflow_call = {
        "client_state": client_state,
        "run_id": generate_uuid(),
        "server_call_id": server_call_id,
        "item_id": item_id,
        "element_id": element_id,
        "uid_generator": {
            "timestamp": int(time.time() * 1000),
            "seed": round(random.random() * UUID_LENGTH) % MODULO,
        },
        "random_seed": random.random(),
        "current_date_time": int(time.time() * 1000),
        "current_wf_params": {},
    }
    body = {
        "app_last_change": MAGAI_APP_LAST_CHANGE,
        "calls": [workflow_call],
        "client_breaking_revision": 5,
        "timezone_offset": -480,
        "timezone_string": "Asia/Shanghai",
        "user_id": user_id,
        "wait_for": [],
    }

    url = "https://app.magai.co/workflow/start"
    async with aiohttp.ClientSession() as session:
        request_headers = {
            "x-bubble-fiber-id": generate_uuid(),
            "x-bubble-pl": create_luid(),
            "accept": "application/json, text/javascript, */*; q=0.01",
            "cookie": MAGAI_COOKIE,
        }
        async with session.post(url, headers=request_headers, json=body) as response:
            response_data = await response.json()

    if "error_class" in response_data:
        raise Exception(response_data)

    # Results are keyed by the server_call_id that was sent in the request.
    call_result = response_data.get(server_call_id)
    if not call_result or "step_results" not in call_result:
        return None

    for step in call_result["step_results"].values():
        payload = step.get("return_value")
        if not isinstance(payload, dict):
            continue
        token = find_token_in_object(payload)
        if token:
            return token
    return None
class _BufferedLines:
    """Async-iterable over body lines that were already read into memory."""

    def __init__(self, lines):
        self._lines = lines

    def __aiter__(self):
        return self._iterate()

    async def _iterate(self):
        for line in self._lines:
            yield line


class _BufferedResponse:
    """Minimal stand-in for a completed upstream response.

    Exposes ``status``, ``headers`` and ``content``; ``content`` supports
    ``async for line in response.content`` and yields the raw body lines
    (bytes, line endings included).
    """

    def __init__(self, status, headers, lines):
        self.status = status
        self.headers = headers
        self.content = _BufferedLines(lines)


async def get_request_data(model, messages):
    """Send the chat request upstream and return its body as a readable object.

    The upstream body is read completely before the HTTP session is closed.
    Returning the live aiohttp response from inside ``async with`` would hand
    the caller an object whose connection has already been released, so
    reading ``response.content`` afterwards fails.

    Args:
        model: Public model name; must be a key of ``MAGAI_MAPPING``.
        messages: Chat messages in OpenAI format; a system prompt is prepended
            before they are forwarded.

    Returns:
        A ``(response, error)`` tuple. On success ``response`` exposes
        ``status``, ``headers`` and an async-iterable ``content`` of body
        lines, and ``error`` is ``None``. On failure ``response`` is ``None``
        and ``error`` is a human-readable message.
    """
    if model not in MAGAI_MAPPING:
        return None, "Model not available"

    last_user_message = get_last_user_content(messages)
    if not last_user_message:
        return None, "No user message found"

    try:
        token = await get_token(MAGAI_MAPPING[model], last_user_message)
        if not token:
            return None, "Failed to obtain token"

        headers = {
            "Content-Type": "application/json",
            "HTTP-Referer": "https://magai.co",
            "Origin": "https://app.magai.co",
            "Pragma": "no-cache",
            "Referer": "https://app.magai.co/",
            "Token": token,
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
        }
        json_data = {
            "model": MAGAI_MAPPING[model],
            "messages": [{"role": "system", "content": "You are a helpful assistant."}] + messages,
            "tools": [
                {
                    "type": "function",
                    "function": {
                        "name": "get_actual_time_info",
                        "description": "Returns actual information from web about prompt theme.",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "query": {
                                    "type": "string",
                                    "description": "The query string based on users prompt to search information about.",
                                }
                            },
                            "required": ["query"],
                        },
                    },
                },
                {
                    "type": "function",
                    "function": {
                        "name": "generate_image",
                        "description": "Returns generated image URL.",
                        "parameters": {
                            "type": "object",
                            "properties": {
                                "query": {
                                    "type": "string",
                                    "description": "Prompt to image generation AI model, that describes what image to generate.",
                                }
                            },
                            "required": ["query"],
                        },
                    },
                },
            ],
            "provider": {"data_collection": "deny"},
            "tool_choice": "auto",
            "stream": True,
        }

        async with aiohttp.ClientSession() as session:
            async with session.post(
                "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
                headers=headers,
                json=json_data,
            ) as response:
                if response.status >= 400:
                    # Report upstream failures instead of passing on a body
                    # that contains no completion chunks.
                    detail = (await response.text())[:500]
                    return None, f"Upstream request failed with HTTP {response.status}: {detail}"
                # Read every line while the connection is still open.
                lines = [line async for line in response.content]
                return _BufferedResponse(response.status, response.headers, lines), None
    except Exception as e:
        # Boundary of the upstream call: report the failure as an error string
        # rather than raising.
        return None, str(e)
def _format_sse_chunk(raw_line, model):
    """Convert one upstream SSE line into an OpenAI-style chunk event.

    Args:
        raw_line: One line of the upstream body, as bytes.
        model: Model name to report in the chunk.

    Returns:
        The serialized ``data:...`` event terminated by a blank line, or
        ``None`` when the line carries no text content (not a ``data:`` line,
        not valid JSON such as the ``[DONE]`` marker, no choices, or a delta
        whose content is missing or not a string).
    """
    if not raw_line:
        return None
    text = raw_line.decode("utf-8")
    if not text.startswith("data:"):
        return None
    try:
        payload = json.loads(text[5:].strip())
    except json.JSONDecodeError:
        return None
    if not isinstance(payload, dict):
        return None
    choices = payload.get("choices")
    if not choices or not isinstance(choices[0], dict):
        return None
    delta = choices[0].get("delta") or {}
    content = delta.get("content") if isinstance(delta, dict) else None
    if not isinstance(content, str):
        return None

    chunk = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": "fp_45ah8ld5a7",
        "choices": [
            {
                "index": 0,
                "delta": {"content": content},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    # json.dumps escapes quotes, backslashes and control characters, so the
    # event is always valid JSON regardless of what the content contains.
    return "data:" + json.dumps(chunk, separators=(",", ":")) + "\n\n"


@app.route("/hf/v1/chat/completions", methods=["POST"])
async def chat_completions():
    """OpenAI-compatible chat completions endpoint backed by Magai.

    Reads ``messages`` and ``model`` (default ``"claude-3.5-sonnet"``) from
    the JSON request body, forwards the conversation upstream and answers
    with a ``text/event-stream`` body of ``chat.completion.chunk`` events
    followed by ``data:[DONE]``.

    The upstream lines are consumed inside this coroutine and the finished
    events are returned as a plain list: a WSGI response can only iterate a
    synchronous iterable, so an async generator cannot be used as the body.

    Returns:
        A 200 event-stream response on success, a 400 JSON error when the
        request is invalid or the upstream call reports an error, and a 500
        JSON error for unexpected failures.
    """
    try:
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return Response(
                json.dumps({"error": "Request body must be a JSON object"}),
                status=400,
                mimetype="application/json",
            )
        messages = data.get("messages", [])
        model = data.get("model", "claude-3.5-sonnet")

        response, error = await get_request_data(model, messages)
        if error:
            return Response(json.dumps({"error": error}), status=400, mimetype="application/json")

        events = []
        async for line in response.content:
            event = _format_sse_chunk(line, model)
            if event is not None:
                events.append(event)
        # Terminate the final event with a blank line like every other event.
        events.append("data:[DONE]\n\n")
        return Response(events, mimetype="text/event-stream")
    except Exception as e:
        return Response(json.dumps({"error": str(e)}), status=500, mimetype="application/json")
# When executed as a script, serve the app on all interfaces at port 7860.
if __name__ == "__main__":
    app.run(host="0.0.0.0", port=7860)