Magai2API / app.py
tianlong12's picture
Update app.py
c1d6aa2 verified
raw
history blame
13.7 kB
import random
import time
import json
import re
import aiohttp
import requests
import asyncio
from flask import Flask, request, Response
from flask_cors import CORS
import uuid
app = Flask(__name__)
CORS(app)
MAGAI_TOKEN = {
"cookie": "_fbp=fb.1.1722349051350.28463402121267809;soquick-mobile_u1main=1722349236461x685414888067651300;intercom-id-jnjoad6e=cbbd8fc9-a010-4e8c-8e7e-9cffccd3abea;soquick-mobile_live_u2main.sig=HuQePfrEHGidu4eRyfiZkcL1_2E;__stripe_mid=7767e1a3-e87f-4456-b073-6c8b7ae9e82119b00d;__stripe_sid=99c612a5-a12a-426f-baa5-e61471a013f140c482;_ga=GA1.1.242967908.1722349051;_ga_GFQ25YSHT2=GS1.1.1726123356.1.0.1726123393.0.0.0;_ga_N5J29RVHDJ=GS1.1.1726123395.4.1.1726124637.0.0.0;intercom-device-id-jnjoad6e=35ee824e-f7f6-415d-8698-bd822cb46d3a;intercom-session-jnjoad6e=SXlhUDdqa0E5YTlmUVA2QXk3K3hBSXFNSnRzL2x0cEtvVDF3U1k3UlRCQmhPRVdUcktDYkpwVXpiZFA1SzhUSi0tTFlCazFUamcxbExFRU1LTVlWSitVQT09--40f0dca176cfb28add2903465c54bcee4f33de2b;soquick-mobile_live_u2main=bus|1722349236461x685414888067651300|1726123417637x655253536227564700",
"app_last_change": "21388518093",
"current_page_item": "1348695171700984260__LOOKUP__1726124636560x692535552825360400",
"current_user": "1348695171700984260__LOOKUP__1722349236461x685414888067651300",
}
MAGAI_MAPPING = {
"gpt-4o": "openai/gpt-4o",
"claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
"claude-3-opus": "anthropic/claude-3-opus:beta",
"gemini-1.5-pro": "google/gemini-pro-1.5"
}
UUID_LENGTH = 1e18
MODULO = 1e18
def generate_uuid():
return f"{int(time.time() * 1000)}x{str(round(random.random() * UUID_LENGTH)).zfill(18)}"
def create_luid(separator="x"):
timestamp = int(time.time() * 1000)
return f"{timestamp}{separator}1"
def format_model_name(model_name):
return re.sub(r"_+", "_", re.sub(r"[/:-]", "_", model_name))
def find_token_in_object(obj):
if isinstance(obj, dict):
for key, value in obj.items():
if key == "token" and isinstance(value, str):
return value
token = find_token_in_object(value)
if token:
return token
return None
def get_last_user_content(messages):
for message in reversed(messages):
if message["role"] == "user":
return message["content"]
return None
async def get_token(model, message):
server_call_id = generate_uuid()
created_id = MAGAI_TOKEN["current_page_item"].split("__")[0]
user_id = MAGAI_TOKEN["current_user"].split("__")[2]
model_id = "0060f9accd1dbade552f65ac646fb3da"
item_id = "bUNih7"
element_id = "bUNib7"
body = {
"app_last_change": MAGAI_TOKEN["app_last_change"],
"calls": [
{
"client_state": {
"element_instances": {
"bUNib7": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUNib7",
"parent_element_id": "bUMiq3",
},
"bTezP": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezP",
"parent_element_id": "bTezJ",
},
"bTezE": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezE",
"parent_element_id": "bTeqc",
},
"bTezJ": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezJ",
"parent_element_id": "bUKFL2",
},
"bTezQ": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezQ",
"parent_element_id": "bUKFL2",
},
"bUiru0": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUiru0",
"parent_element_id": "bUjNK",
},
"bUDVj0": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUDVj0",
"parent_element_id": "bUMiq3",
},
"bUXzm2": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUXzm2",
"parent_element_id": "bUMhk3",
},
"bUifI1": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUifI1",
"parent_element_id": "bTeqg",
},
"bUMiq3": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUMiq3",
"parent_element_id": "bTezE",
},
"bTekm": {
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTekm",
"parent_element_id": None,
},
},
"element_state": {
f"{created_id}__LOOKUP__ElementInstance::bTezP": {
"is_visible": True,
"value_that_is_valid": message,
"value": message,
},
f"{created_id}__LOOKUP__ElementInstance::bTezE": {
"custom.images_": None,
"custom.file_": None,
"custom.file_content_": None,
"custom.file_name_": None,
"custom.file_type_": None,
},
f"{created_id}__LOOKUP__ElementInstance::bTezJ": {
"custom.isrecording_": None,
"custom.prompt_": None,
},
f"{created_id}__LOOKUP__ElementInstance::bUiru0": {
"AAE": message
},
f"{created_id}__LOOKUP__ElementInstance::bUDVj0": {
"AAE": message
},
f"{created_id}__LOOKUP__ElementInstance::bUifI1": {
"custom.is_visible_": None,
"group_data": None,
},
f"{created_id}__LOOKUP__ElementInstance::bUMiq3": {
"group_data": None
},
},
"other_data": {
"Current Page Scroll Position": 0,
"Current Page Width": 661,
},
"cache": {
f"{model_id}": format_model_name(model),
"true": True,
"CurrentPageItem": MAGAI_TOKEN["current_page_item"],
"CurrentUser": MAGAI_TOKEN["current_user"],
},
"exists": {
f"{model_id}": True,
"true": True,
"CurrentPageItem": True,
"CurrentUser": True,
},
},
"run_id": generate_uuid(),
"server_call_id": server_call_id,
"item_id": item_id,
"element_id": element_id,
"page_id": "bTekm",
"uid_generator": {
"timestamp": int(time.time() * 1000),
"seed": round(random.random() * UUID_LENGTH) % MODULO,
},
"random_seed": random.random(),
"current_date_time": int(time.time() * 1000),
"current_wf_params": {},
}
],
"client_breaking_revision": 5,
"timezone_offset": -480,
"timezone_string": "Asia/Shanghai",
"user_id": user_id,
"wait_for": [],
}
url = "https://app.magai.co/workflow/start"
async with aiohttp.ClientSession() as session:
async with session.post(
url,
headers={
"x-bubble-fiber-id": generate_uuid(),
"x-bubble-pl": create_luid(),
"accept": "application/json, text/javascript, */*; q=0.01",
"cookie": MAGAI_TOKEN["cookie"],
},
json=body,
) as response:
response_data = await response.json()
if "error_class" in response_data:
raise Exception(response_data)
server_call_data = response_data.get(server_call_id)
if not server_call_data or "step_results" not in server_call_data:
return None
for step_result in server_call_data["step_results"].values():
if isinstance(step_result.get("return_value"), dict):
token = find_token_in_object(step_result["return_value"])
if token:
return token
async def get_request_data(model, messages):
if model not in MAGAI_MAPPING:
return Response(
json.dumps(
{
"error": {
"message": "This model is currently unavailable. Please try again later or choose another model.",
"code": "model_not_exists",
}
}
),
status=400,
mimetype="application/json",
)
last_user_message = get_last_user_content(messages)
token = await get_token(MAGAI_MAPPING[model], last_user_message)
headers = {
"Content-Type": "application/json",
"HTTP-Referer": "https://magai.co",
"Origin": "https://app.magai.co",
"Pragma": "no-cache",
"Referer": "https://app.magai.co/",
"Token": token,
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
}
json_data = {
"model": MAGAI_MAPPING[model],
"messages": [{"role": "system", "content": "You are a helpful assistant."}]
+ messages,
"tools": [
{
"type": "function",
"function": {
"name": "get_actual_time_info",
"description": "Returns actual information from web about prompt theme.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The query string based on users prompt to search information about.",
}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "generate_image",
"description": "Returns generated image URL.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Prompt to image generation AI model, that describes what image to generate.",
}
},
"required": ["query"],
},
},
},
],
"provider": {"data_collection": "deny"},
"tool_choice": "auto",
"stream": True,
}
response = requests.post(
"https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
headers=headers,
json=json_data,
)
return response
def format_response(response):
content = ""
for line in response.iter_lines():
if line:
decoded_line = line.decode("utf-8")
if decoded_line.startswith("data:"):
try:
data = json.loads(decoded_line[5:].strip())
if "choices" in data and len(data["choices"]) > 0:
delta = data["choices"][0].get("delta", {})
if "content" in delta:
content += delta["content"]
except json.JSONDecodeError:
pass
return content
@app.route("/hf/v1/chat/completions", methods=["POST"])
def chat_completions():
data = request.json
messages = data.get("messages", [])
model = data.get("model", "claude-3.5-sonnet")
async def process_request():
response = await get_request_data(model, messages)
return format_response(response)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
result = loop.run_until_complete(process_request())
event_stream_response = ""
for part in result:
part = part.replace("\n", "\\n")
event_stream_response += f'data:{{"id":"{uuid.uuid4()}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{model}","system_fingerprint":"fp_45ah8ld5a7","choices":[{{"index":0,"delta":{{"content":"{part}"}},"logprobs":null,"finish_reason":null}}]}}\n\n'
event_stream_response += "data:[DONE]\n"
return Response(event_stream_response, mimetype="text/event-stream")
if __name__ == "__main__":
app.run(host="0.0.0.0", port=7860)