Spaces:
Paused
Paused
import random | |
import time | |
import json | |
import re | |
import aiohttp | |
import requests | |
import asyncio | |
from flask import Flask, request, Response | |
from flask_cors import CORS | |
import uuid | |
app = Flask(__name__) | |
CORS(app) | |
MAGAI_TOKEN = { | |
"cookie": "_fbp=fb.1.1722349051350.28463402121267809;soquick-mobile_u1main=1722349236461x685414888067651300;intercom-id-jnjoad6e=cbbd8fc9-a010-4e8c-8e7e-9cffccd3abea;soquick-mobile_live_u2main.sig=HuQePfrEHGidu4eRyfiZkcL1_2E;__stripe_mid=7767e1a3-e87f-4456-b073-6c8b7ae9e82119b00d;__stripe_sid=99c612a5-a12a-426f-baa5-e61471a013f140c482;_ga=GA1.1.242967908.1722349051;_ga_GFQ25YSHT2=GS1.1.1726123356.1.0.1726123393.0.0.0;_ga_N5J29RVHDJ=GS1.1.1726123395.4.1.1726124637.0.0.0;intercom-device-id-jnjoad6e=35ee824e-f7f6-415d-8698-bd822cb46d3a;intercom-session-jnjoad6e=SXlhUDdqa0E5YTlmUVA2QXk3K3hBSXFNSnRzL2x0cEtvVDF3U1k3UlRCQmhPRVdUcktDYkpwVXpiZFA1SzhUSi0tTFlCazFUamcxbExFRU1LTVlWSitVQT09--40f0dca176cfb28add2903465c54bcee4f33de2b;soquick-mobile_live_u2main=bus|1722349236461x685414888067651300|1726123417637x655253536227564700", | |
"app_last_change": "21388518093", | |
"current_page_item": "1348695171700984260__LOOKUP__1726124636560x692535552825360400", | |
"current_user": "1348695171700984260__LOOKUP__1722349236461x685414888067651300", | |
} | |
MAGAI_MAPPING = { | |
"gpt-4o": "openai/gpt-4o", | |
"claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta", | |
"claude-3-opus": "anthropic/claude-3-opus:beta", | |
"gemini-1.5-pro": "google/gemini-pro-1.5" | |
} | |
UUID_LENGTH = 1e18 | |
MODULO = 1e18 | |
def generate_uuid(): | |
return f"{int(time.time() * 1000)}x{str(round(random.random() * UUID_LENGTH)).zfill(18)}" | |
def create_luid(separator="x"): | |
timestamp = int(time.time() * 1000) | |
return f"{timestamp}{separator}1" | |
def format_model_name(model_name): | |
return re.sub(r"_+", "_", re.sub(r"[/:-]", "_", model_name)) | |
def find_token_in_object(obj): | |
if isinstance(obj, dict): | |
for key, value in obj.items(): | |
if key == "token" and isinstance(value, str): | |
return value | |
token = find_token_in_object(value) | |
if token: | |
return token | |
return None | |
def get_last_user_content(messages): | |
for message in reversed(messages): | |
if message["role"] == "user": | |
return message["content"] | |
return None | |
async def get_token(model, message): | |
server_call_id = generate_uuid() | |
created_id = MAGAI_TOKEN["current_page_item"].split("__")[0] | |
user_id = MAGAI_TOKEN["current_user"].split("__")[2] | |
model_id = "0060f9accd1dbade552f65ac646fb3da" | |
item_id = "bUNih7" | |
element_id = "bUNib7" | |
body = { | |
"app_last_change": MAGAI_TOKEN["app_last_change"], | |
"calls": [ | |
{ | |
"client_state": { | |
"element_instances": { | |
"bUNib7": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUNib7", | |
"parent_element_id": "bUMiq3", | |
}, | |
"bTezP": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezP", | |
"parent_element_id": "bTezJ", | |
}, | |
"bTezE": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezE", | |
"parent_element_id": "bTeqc", | |
}, | |
"bTezJ": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezJ", | |
"parent_element_id": "bUKFL2", | |
}, | |
"bTezQ": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezQ", | |
"parent_element_id": "bUKFL2", | |
}, | |
"bUiru0": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUiru0", | |
"parent_element_id": "bUjNK", | |
}, | |
"bUDVj0": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUDVj0", | |
"parent_element_id": "bUMiq3", | |
}, | |
"bUXzm2": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUXzm2", | |
"parent_element_id": "bUMhk3", | |
}, | |
"bUifI1": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUifI1", | |
"parent_element_id": "bTeqg", | |
}, | |
"bUMiq3": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUMiq3", | |
"parent_element_id": "bTezE", | |
}, | |
"bTekm": { | |
"dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTekm", | |
"parent_element_id": None, | |
}, | |
}, | |
"element_state": { | |
f"{created_id}__LOOKUP__ElementInstance::bTezP": { | |
"is_visible": True, | |
"value_that_is_valid": message, | |
"value": message, | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bTezE": { | |
"custom.images_": None, | |
"custom.file_": None, | |
"custom.file_content_": None, | |
"custom.file_name_": None, | |
"custom.file_type_": None, | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bTezJ": { | |
"custom.isrecording_": None, | |
"custom.prompt_": None, | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bUiru0": { | |
"AAE": message | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bUDVj0": { | |
"AAE": message | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bUifI1": { | |
"custom.is_visible_": None, | |
"group_data": None, | |
}, | |
f"{created_id}__LOOKUP__ElementInstance::bUMiq3": { | |
"group_data": None | |
}, | |
}, | |
"other_data": { | |
"Current Page Scroll Position": 0, | |
"Current Page Width": 661, | |
}, | |
"cache": { | |
f"{model_id}": format_model_name(model), | |
"true": True, | |
"CurrentPageItem": MAGAI_TOKEN["current_page_item"], | |
"CurrentUser": MAGAI_TOKEN["current_user"], | |
}, | |
"exists": { | |
f"{model_id}": True, | |
"true": True, | |
"CurrentPageItem": True, | |
"CurrentUser": True, | |
}, | |
}, | |
"run_id": generate_uuid(), | |
"server_call_id": server_call_id, | |
"item_id": item_id, | |
"element_id": element_id, | |
"page_id": "bTekm", | |
"uid_generator": { | |
"timestamp": int(time.time() * 1000), | |
"seed": round(random.random() * UUID_LENGTH) % MODULO, | |
}, | |
"random_seed": random.random(), | |
"current_date_time": int(time.time() * 1000), | |
"current_wf_params": {}, | |
} | |
], | |
"client_breaking_revision": 5, | |
"timezone_offset": -480, | |
"timezone_string": "Asia/Shanghai", | |
"user_id": user_id, | |
"wait_for": [], | |
} | |
url = "https://app.magai.co/workflow/start" | |
async with aiohttp.ClientSession() as session: | |
async with session.post( | |
url, | |
headers={ | |
"x-bubble-fiber-id": generate_uuid(), | |
"x-bubble-pl": create_luid(), | |
"accept": "application/json, text/javascript, */*; q=0.01", | |
"cookie": MAGAI_TOKEN["cookie"], | |
}, | |
json=body, | |
) as response: | |
response_data = await response.json() | |
if "error_class" in response_data: | |
raise Exception(response_data) | |
server_call_data = response_data.get(server_call_id) | |
if not server_call_data or "step_results" not in server_call_data: | |
return None | |
for step_result in server_call_data["step_results"].values(): | |
if isinstance(step_result.get("return_value"), dict): | |
token = find_token_in_object(step_result["return_value"]) | |
if token: | |
return token | |
async def get_request_data(model, messages): | |
if model not in MAGAI_MAPPING: | |
return Response( | |
json.dumps( | |
{ | |
"error": { | |
"message": "This model is currently unavailable. Please try again later or choose another model.", | |
"code": "model_not_exists", | |
} | |
} | |
), | |
status=400, | |
mimetype="application/json", | |
) | |
last_user_message = get_last_user_content(messages) | |
token = await get_token(MAGAI_MAPPING[model], last_user_message) | |
headers = { | |
"Content-Type": "application/json", | |
"HTTP-Referer": "https://magai.co", | |
"Origin": "https://app.magai.co", | |
"Pragma": "no-cache", | |
"Referer": "https://app.magai.co/", | |
"Token": token, | |
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00", | |
} | |
json_data = { | |
"model": MAGAI_MAPPING[model], | |
"messages": [{"role": "system", "content": "You are a helpful assistant."}] | |
+ messages, | |
"tools": [ | |
{ | |
"type": "function", | |
"function": { | |
"name": "get_actual_time_info", | |
"description": "Returns actual information from web about prompt theme.", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"description": "The query string based on users prompt to search information about.", | |
} | |
}, | |
"required": ["query"], | |
}, | |
}, | |
}, | |
{ | |
"type": "function", | |
"function": { | |
"name": "generate_image", | |
"description": "Returns generated image URL.", | |
"parameters": { | |
"type": "object", | |
"properties": { | |
"query": { | |
"type": "string", | |
"description": "Prompt to image generation AI model, that describes what image to generate.", | |
} | |
}, | |
"required": ["query"], | |
}, | |
}, | |
}, | |
], | |
"provider": {"data_collection": "deny"}, | |
"tool_choice": "auto", | |
"stream": True, | |
} | |
response = requests.post( | |
"https://live.proxy.magai.co:4430/opr/api/v1/chat/completions", | |
headers=headers, | |
json=json_data, | |
) | |
return response | |
def format_response(response): | |
content = "" | |
for line in response.iter_lines(): | |
if line: | |
decoded_line = line.decode("utf-8") | |
if decoded_line.startswith("data:"): | |
try: | |
data = json.loads(decoded_line[5:].strip()) | |
if "choices" in data and len(data["choices"]) > 0: | |
delta = data["choices"][0].get("delta", {}) | |
if "content" in delta: | |
content += delta["content"] | |
except json.JSONDecodeError: | |
pass | |
return content | |
def chat_completions(): | |
data = request.json | |
messages = data.get("messages", []) | |
model = data.get("model", "claude-3.5-sonnet") | |
async def process_request(): | |
response = await get_request_data(model, messages) | |
return format_response(response) | |
loop = asyncio.new_event_loop() | |
asyncio.set_event_loop(loop) | |
result = loop.run_until_complete(process_request()) | |
event_stream_response = "" | |
for part in result: | |
part = part.replace("\n", "\\n") | |
event_stream_response += f'data:{{"id":"{uuid.uuid4()}","object":"chat.completion.chunk","created":{int(time.time())},"model":"{model}","system_fingerprint":"fp_45ah8ld5a7","choices":[{{"index":0,"delta":{{"content":"{part}"}},"logprobs":null,"finish_reason":null}}]}}\n\n' | |
event_stream_response += "data:[DONE]\n" | |
return Response(event_stream_response, mimetype="text/event-stream") | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860) |