# Hugging Face Spaces status residue (non-code): "Spaces: Paused" — kept as a comment.
import asyncio
import json
import random
import re
import time
import uuid

import aiohttp
import requests
from flask import Flask, request, Response
from flask_cors import CORS
app = Flask(__name__)
CORS(app)

# Session material for the magai.co Bubble backend.  The non-cookie values are
# placeholders (the Chinese text "填写…" means "fill in …") and must be replaced
# with values captured from a logged-in browser session before the proxy works.
# NOTE(review): the cookie below is a hard-coded credential committed to source —
# rotate it and load it from configuration/environment instead.
MAGAI_TOKEN = {
    "cookie": "_fbp=fb.1.1722349051350.28463402121267809;soquick-mobile_u1main=1722349236461x685414888067651300;intercom-id-jnjoad6e=cbbd8fc9-a010-4e8c-8e7e-9cffccd3abea;soquick-mobile_live_u2main.sig=HuQePfrEHGidu4eRyfiZkcL1_2E;__stripe_mid=7767e1a3-e87f-4456-b073-6c8b7ae9e82119b00d;__stripe_sid=99c612a5-a12a-426f-baa5-e61471a013f140c482;_ga=GA1.1.242967908.1722349051;_ga_GFQ25YSHT2=GS1.1.1726123356.1.0.1726123393.0.0.0;_ga_N5J29RVHDJ=GS1.1.1726123395.4.1.1726123416.0.0.0;intercom-device-id-jnjoad6e=35ee824e-f7f6-415d-8698-bd822cb46d3a;intercom-session-jnjoad6e=TS9MVXB3RVNxOWJBTlVxN3MzcFRhZGJRL05TN2FpUlI2MVpnc3JDaG9TZi81ZERUNXkweVdzVUxicCt5VmJKVy0tTmo4d1loRSs0dWlqRUJmMm1NK2tHUT09--2f27d9afeb23d0d2ba40d57ca6ec33bb6ddb20d1;soquick-mobile_live_u2main=bus|1722349236461x685414888067651300|1726123417637x655253536227564700",
    "app_last_change": "填写app_last_change",  # placeholder — fill in app_last_change
    "current_page_item": "填写current_page_item",  # placeholder — "<id>__…" composite; id split off in get_token
    "current_user": "填写current_user",  # placeholder — "__"-separated composite; user id is segment [2]
}

# Public model aliases exposed by this proxy -> upstream OpenRouter-style ids.
MAGAI_MAPPING = {
    "gpt-4o": "openai/gpt-4o",
    "claude-3.5-sonnet": "anthropic/claude-3.5-sonnet:beta",
    "claude-3-opus": "anthropic/claude-3-opus:beta",
    "gemini-1.5-pro": "google/gemini-pro-1.5"
}

# Scale for the 18-digit random component of generated Bubble-style ids, and
# the modulus applied to the workflow "seed" field (both 1e18, kept as floats).
UUID_LENGTH = 1e18
MODULO = 1e18
def generate_uuid():
    """Produce a Bubble-style unique id: "<ms-epoch>x<18-digit zero-padded random>"."""
    millis = int(time.time() * 1000)
    tail = str(round(random.random() * UUID_LENGTH)).zfill(18)
    return f"{millis}x{tail}"
def create_luid(separator="x"):
    """Return "<ms-epoch><separator>1" — the page-load marker sent as x-bubble-pl."""
    now_ms = int(time.time() * 1000)
    return str(now_ms) + separator + "1"
def format_model_name(model_name):
    """Normalise an upstream model id into an underscore-only key.

    Slashes, colons and hyphens become underscores, and any resulting run of
    underscores collapses to a single one (e.g. "a/b:c-d" -> "a_b_c_d").
    """
    return re.sub(r"[/:_-]+", "_", model_name)
def find_token_in_object(obj):
    """Depth-first search for the first string stored under a "token" key.

    Recurses through nested dicts and — new — lists (API step results often
    wrap dicts in arrays; the original version silently missed those).

    Args:
        obj: arbitrary decoded-JSON value.

    Returns:
        The token string, or None when no "token" key with a string value exists.
    """
    if isinstance(obj, dict):
        for key, value in obj.items():
            if key == "token" and isinstance(value, str):
                return value
            token = find_token_in_object(value)
            if token:
                return token
    elif isinstance(obj, list):
        for item in obj:
            token = find_token_in_object(item)
            if token:
                return token
    return None
def get_last_user_content(messages):
    """Return the content of the most recent message with role "user", or None."""
    return next(
        (msg["content"] for msg in reversed(messages) if msg["role"] == "user"),
        None,
    )
async def get_token(model, message):
    """Obtain a proxy auth token from the magai.co Bubble backend.

    Replays the browser's "send message" workflow: POSTs a Bubble
    ``workflow/start`` payload (a snapshot of the chat page's element/run
    state, with the user message injected) using the session cookie from
    MAGAI_TOKEN, then digs through the returned ``step_results`` for the
    first string stored under a "token" key.

    Args:
        model: upstream model identifier (already mapped via MAGAI_MAPPING).
        message: latest user message text, embedded into the page state.

    Returns:
        The token string, or None when the response carries no usable
        step_results (or no token is found — implicit None fall-through).

    Raises:
        Exception: when the backend reports an error ("error_class" present),
            with the whole response payload as the exception argument.
    """
    # The response body is keyed by this same fresh id.
    server_call_id = generate_uuid()
    # MAGAI_TOKEN fields are "__"-separated composites captured from the live
    # app; split out the raw ids the payload needs.
    created_id = MAGAI_TOKEN["current_page_item"].split("__")[0]
    user_id = MAGAI_TOKEN["current_user"].split("__")[2]
    # Opaque Bubble app identifiers (cache key for the model choice, workflow
    # item and triggering element) captured from the real web client.
    model_id = "0060f9accd1dbade552f65ac646fb3da"
    item_id = "bUNih7"
    element_id = "bUNib7"
    body = {
        "app_last_change": MAGAI_TOKEN["app_last_change"],
        "calls": [
            {
                "client_state": {
                    # Snapshot of the page's element tree: element id ->
                    # dehydrated lookup handle + parent element id.  All ids
                    # are hard-coded from the captured session.
                    "element_instances": {
                        "bUNib7": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUNib7",
                            "parent_element_id": "bUMiq3",
                        },
                        "bTezP": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezP",
                            "parent_element_id": "bTezJ",
                        },
                        "bTezE": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezE",
                            "parent_element_id": "bTeqc",
                        },
                        "bTezJ": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezJ",
                            "parent_element_id": "bUKFL2",
                        },
                        "bTezQ": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTezQ",
                            "parent_element_id": "bUKFL2",
                        },
                        "bUiru0": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUiru0",
                            "parent_element_id": "bUjNK",
                        },
                        "bUDVj0": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUDVj0",
                            "parent_element_id": "bUMiq3",
                        },
                        "bUXzm2": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUXzm2",
                            "parent_element_id": "bUMhk3",
                        },
                        "bUifI1": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUifI1",
                            "parent_element_id": "bTeqg",
                        },
                        "bUMiq3": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bUMiq3",
                            "parent_element_id": "bTezE",
                        },
                        "bTekm": {
                            "dehydrated": f"{created_id}__LOOKUP__ElementInstance::bTekm",
                            "parent_element_id": None,
                        },
                    },
                    # Per-element state: the user message is injected as the
                    # chat input's value and mirrored into two other elements.
                    "element_state": {
                        f"{created_id}__LOOKUP__ElementInstance::bTezP": {
                            "is_visible": True,
                            "value_that_is_valid": message,
                            "value": message,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bTezE": {
                            "custom.images_": None,
                            "custom.file_": None,
                            "custom.file_content_": None,
                            "custom.file_name_": None,
                            "custom.file_type_": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bTezJ": {
                            "custom.isrecording_": None,
                            "custom.prompt_": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUiru0": {
                            "AAE": message
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUDVj0": {
                            "AAE": message
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUifI1": {
                            "custom.is_visible_": None,
                            "group_data": None,
                        },
                        f"{created_id}__LOOKUP__ElementInstance::bUMiq3": {
                            "group_data": None
                        },
                    },
                    "other_data": {
                        "Current Page Scroll Position": 0,
                        "Current Page Width": 661,
                    },
                    # Client-side cache hints; the chosen model rides in under
                    # the opaque model_id key.
                    "cache": {
                        f"{model_id}": format_model_name(model),
                        "true": True,
                        "CurrentPageItem": MAGAI_TOKEN["current_page_item"],
                        "CurrentUser": MAGAI_TOKEN["current_user"],
                    },
                    "exists": {
                        f"{model_id}": True,
                        "true": True,
                        "CurrentPageItem": True,
                        "CurrentUser": True,
                    },
                },
                "run_id": generate_uuid(),
                "server_call_id": server_call_id,
                "item_id": item_id,
                "element_id": element_id,
                "uid_generator": {
                    "timestamp": int(time.time() * 1000),
                    "seed": round(random.random() * UUID_LENGTH) % MODULO,
                },
                "random_seed": random.random(),
                "current_date_time": int(time.time() * 1000),
                "current_wf_params": {},
            }
        ],
        "client_breaking_revision": 5,
        # Fixed to UTC+8 to match the captured session.
        "timezone_offset": -480,
        "timezone_string": "Asia/Shanghai",
        "user_id": user_id,
        "wait_for": [],
    }
    url = "https://app.magai.co/workflow/start"
    async with aiohttp.ClientSession() as session:
        async with session.post(
            url,
            headers={
                # Bubble request-tracing headers plus the session cookie.
                "x-bubble-fiber-id": generate_uuid(),
                "x-bubble-pl": create_luid(),
                "accept": "application/json, text/javascript, */*; q=0.01",
                "cookie": MAGAI_TOKEN["cookie"],
            },
            json=body,
        ) as response:
            response_data = await response.json()
            if "error_class" in response_data:
                raise Exception(response_data)
            # Results for our call are keyed by the server_call_id we sent.
            server_call_data = response_data.get(server_call_id)
            if not server_call_data or "step_results" not in server_call_data:
                return None
            # Any workflow step may carry the token in its return_value.
            for step_result in server_call_data["step_results"].values():
                if isinstance(step_result.get("return_value"), dict):
                    token = find_token_in_object(step_result["return_value"])
                    if token:
                        return token
async def get_request_data(model, messages):
    """Resolve a proxy token and fire the upstream chat-completions request.

    Args:
        model: public model alias; must be a key of MAGAI_MAPPING.
        messages: OpenAI-style list of {"role", "content"} dicts.

    Returns:
        A flask Response carrying a 400 error payload when the model alias is
        unknown; otherwise the streaming requests.Response from upstream.
    """
    if model not in MAGAI_MAPPING:
        return Response(
            json.dumps(
                {
                    "error": {
                        "message": "This model is currently unavailable. Please try again later or choose another model.",
                        "code": "model_not_exists",
                    }
                }
            ),
            status=400,
            mimetype="application/json",
        )
    # The workflow token is minted against the latest user message.
    last_user_message = get_last_user_content(messages)
    token = await get_token(MAGAI_MAPPING[model], last_user_message)
    headers = {
        "Content-Type": "application/json",
        "HTTP-Referer": "https://magai.co",
        "Origin": "https://app.magai.co",
        "Pragma": "no-cache",
        "Referer": "https://app.magai.co/",
        "Token": token,
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/500.00 (KHTML, like Gecko) Chrome/100.0.0.0 Safari/500.00",
    }
    json_data = {
        "model": MAGAI_MAPPING[model],
        # A fixed system prompt is always prepended to the caller's history.
        "messages": [{"role": "system", "content": "You are a helpful assistant."}]
        + messages,
        # Tool definitions mirror the web client's built-in search / image tools.
        "tools": [
            {
                "type": "function",
                "function": {
                    "name": "get_actual_time_info",
                    "description": "Returns actual information from web about prompt theme.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "The query string based on users prompt to search information about.",
                            }
                        },
                        "required": ["query"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "generate_image",
                    "description": "Returns generated image URL.",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "query": {
                                "type": "string",
                                "description": "Prompt to image generation AI model, that describes what image to generate.",
                            }
                        },
                        "required": ["query"],
                    },
                },
            },
        ],
        "provider": {"data_collection": "deny"},
        "tool_choice": "auto",
        "stream": True,
    }
    # NOTE(review): requests is synchronous, so this blocks the running event
    # loop while the upstream call is in flight.  stream=True matches the
    # "stream": True payload (the old call buffered the whole body before
    # iter_lines saw it), and the timeout stops a dead upstream from hanging
    # the worker forever (10s connect, 300s between read chunks).
    response = requests.post(
        "https://live.proxy.magai.co:4430/opr/api/v1/chat/completions",
        headers=headers,
        json=json_data,
        stream=True,
        timeout=(10, 300),
    )
    return response
def format_response(response):
    """Collapse an OpenAI-style SSE stream into one reply string.

    Walks the response's lines, parses every "data:"-prefixed JSON payload,
    and concatenates the delta content fragments.  Blank lines, non-data
    lines and non-JSON data (e.g. the "[DONE]" sentinel) are skipped.
    """
    pieces = []
    for raw_line in response.iter_lines():
        if not raw_line:
            continue
        text = raw_line.decode("utf-8")
        if not text.startswith("data:"):
            continue
        try:
            payload = json.loads(text[5:].strip())
        except json.JSONDecodeError:
            continue
        if "choices" in payload and len(payload["choices"]) > 0:
            fragment = payload["choices"][0].get("delta", {})
            if "content" in fragment:
                pieces.append(fragment["content"])
    return "".join(pieces)
@app.route("/v1/chat/completions", methods=["POST"])  # FIX: the handler was never registered with Flask
def chat_completions():
    """OpenAI-compatible POST /v1/chat/completions endpoint (SSE streaming).

    Collects the upstream reply into one string, then re-emits it to the
    client as an OpenAI-style SSE stream: a single content chunk followed by
    the "[DONE]" sentinel.
    """
    data = request.json
    messages = data.get("messages", [])
    model = data.get("model", "claude-3.5-sonnet")

    async def process_request():
        response = await get_request_data(model, messages)
        # get_request_data returns a flask Response for validation errors
        # (unknown model); pass it through instead of crashing in
        # format_response, which expects a requests response.
        if isinstance(response, Response):
            return response
        return format_response(response)

    # Flask's sync workers have no running loop, so each request gets (and
    # now properly closes) its own private event loop.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        result = loop.run_until_complete(process_request())
    finally:
        loop.close()
    if isinstance(result, Response):
        return result

    # Serialize the chunk with json.dumps so quotes/backslashes/newlines in
    # the model output are escaped correctly — the old manual f-string only
    # handled "\n" and emitted invalid JSON for any quoted text (and did so
    # once per character of the reply).
    chunk = {
        "id": str(uuid.uuid4()),
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model,
        "system_fingerprint": "fp_45ah8ld5a7",
        "choices": [
            {
                "index": 0,
                "delta": {"content": result},
                "logprobs": None,
                "finish_reason": None,
            }
        ],
    }
    event_stream_response = (
        f"data: {json.dumps(chunk, ensure_ascii=False)}\n\n"
        "data: [DONE]\n\n"  # SSE events are terminated by a blank line
    )
    return Response(event_stream_response, mimetype="text/event-stream")
if __name__ == "__main__":
    # Listen on all interfaces; 7860 is presumably the hosting platform's
    # expected port (matches the Hugging Face Spaces default) — confirm.
    app.run(host="0.0.0.0", port=7860)