from flask import Flask, request, jsonify, make_response, Response import requests import time import uuid import warnings from waitress import serve import json import tiktoken import socket from mistral_common.tokens.tokenizers.mistral import MistralTokenizer from mistral_common.protocol.instruct.messages import UserMessage from mistral_common.protocol.instruct.request import ChatCompletionRequest from pymemcache.client.base import Client from flask_limiter import Limiter from flask_limiter.util import get_remote_address import os import logging from io import BytesIO import coloredlogs import printedcolors import base64 # Suppress warnings from flask_limiter warnings.filterwarnings("ignore", category=UserWarning, module="flask_limiter.extension") # Create a logger object logger = logging.getLogger("1min-relay") # Install coloredlogs with desired log level coloredlogs.install(level='DEBUG', logger=logger) def check_memcached_connection(host='memcached', port=11211): try: client = Client((host, port)) client.set('test_key', 'test_value') if client.get('test_key') == b'test_value': client.delete('test_key') # Clean up return True else: return False except: return False logger.info(''' _ __ __ _ ___ _ / | \/ (_)_ _ | _ \___| |__ _ _ _ | | |\/| | | ' \| / -_) / _` | || | |_|_| |_|_|_||_|_|_\___|_\__,_|\_, | |__/ ''') def calculate_token(sentence, model="DEFAULT"): """Calculate the number of tokens in a sentence based on the specified model.""" if model.startswith("mistral"): # Initialize the Mistral tokenizer tokenizer = MistralTokenizer.v3(is_tekken=True) model_name = "open-mistral-nemo" # Default to Mistral Nemo tokenizer = MistralTokenizer.from_model(model_name) tokenized = tokenizer.encode_chat_completion( ChatCompletionRequest( messages=[ UserMessage(content=sentence), ], model=model_name, ) ) tokens = tokenized.tokens return len(tokens) elif model in ["gpt-3.5-turbo", "gpt-4"]: # Use OpenAI's tiktoken for GPT models encoding = tiktoken.encoding_for_model(model) tokens = encoding.encode(sentence) return len(tokens) else: # Default to openai encoding = tiktoken.encoding_for_model("gpt-4") tokens = encoding.encode(sentence) return len(tokens) app = Flask(__name__) if check_memcached_connection(): limiter = Limiter( get_remote_address, app=app, storage_uri="memcached://memcached:11211", # Connect to Memcached created with docker ) else: # Used for ratelimiting without memcached limiter = Limiter( get_remote_address, app=app, ) logger.warning("Memcached is not available. Using in-memory storage for rate limiting. Not-Recommended") ONE_MIN_API_URL = "https://api.1min.ai/api/features" ONE_MIN_CONVERSATION_API_URL = "https://api.1min.ai/api/conversations" ONE_MIN_CONVERSATION_API_STREAMING_URL = "https://api.1min.ai/api/features?isStreaming=true" ONE_MIN_ASSET_URL = "https://api.1min.ai/api/assets" # Define the models that are available for use ALL_ONE_MIN_AVAILABLE_MODELS = [ "deepseek-chat", "deepseek-reasoner", "o1-preview", "o1-mini", "gpt-4o-mini", "gpt-4o", "gpt-4-turbo", "gpt-4", "gpt-3.5-turbo", "claude-instant-1.2", "claude-2.1", "claude-3-7-sonnet-20250219", "claude-3-5-sonnet-20240620", "claude-3-opus-20240229", "claude-3-sonnet-20240229", "claude-3-haiku-20240307", "gemini-1.0-pro", "gemini-1.5-pro", "gemini-1.5-flash", "mistral-large-latest", "mistral-small-latest", "mistral-nemo", "open-mistral-7b", # Replicate "meta/llama-2-70b-chat", "meta/meta-llama-3-70b-instruct", "meta/meta-llama-3.1-405b-instruct", "command" ] # Define the models that support vision inputs vision_supported_models = [ "gpt-4o", "gpt-4o-mini", "gpt-4-turbo" ] # Default values SUBSET_OF_ONE_MIN_PERMITTED_MODELS = ["mistral-nemo", "gpt-4o", "deepseek-chat"] PERMIT_MODELS_FROM_SUBSET_ONLY = False # Read environment variables one_min_models_env = os.getenv("SUBSET_OF_ONE_MIN_PERMITTED_MODELS") # e.g. "mistral-nemo,gpt-4o,deepseek-chat" permit_not_in_available_env = os.getenv("PERMIT_MODELS_FROM_SUBSET_ONLY") # e.g. "True" or "False" # Parse or fall back to defaults if one_min_models_env: SUBSET_OF_ONE_MIN_PERMITTED_MODELS = one_min_models_env.split(",") if permit_not_in_available_env and permit_not_in_available_env.lower() == "true": PERMIT_MODELS_FROM_SUBSET_ONLY = True # Combine into a single list AVAILABLE_MODELS = [] AVAILABLE_MODELS.extend(SUBSET_OF_ONE_MIN_PERMITTED_MODELS) @app.route('/', methods=['GET', 'POST']) def index(): if request.method == 'POST': return ERROR_HANDLER(1212) if request.method == 'GET': internal_ip = socket.gethostbyname(socket.gethostname()) return "Congratulations! Your API is working! You can now make requests to the API.\n\nEndpoint: " + internal_ip + ':5001/v1' @app.route('/v1/models') @limiter.limit("500 per minute") def models(): # Dynamically create the list of models with additional fields models_data = [] if not PERMIT_MODELS_FROM_SUBSET_ONLY: one_min_models_data = [ { "id": model_name, "object": "model", "owned_by": "1minai", "created": 1727389042 } for model_name in ALL_ONE_MIN_AVAILABLE_MODELS ] else: one_min_models_data = [ {"id": model_name, "object": "model", "owned_by": "1minai", "created": 1727389042} for model_name in SUBSET_OF_ONE_MIN_PERMITTED_MODELS ] models_data.extend(one_min_models_data) return jsonify({"data": models_data, "object": "list"}) def ERROR_HANDLER(code, model=None, key=None): # Handle errors in OpenAI-Structued Error error_codes = { # Internal Error Codes 1002: {"message": f"The model {model} does not exist.", "type": "invalid_request_error", "param": None, "code": "model_not_found", "http_code": 400}, 1020: {"message": f"Incorrect API key provided: {key}. You can find your API key at https://app.1min.ai/api.", "type": "authentication_error", "param": None, "code": "invalid_api_key", "http_code": 401}, 1021: {"message": "Invalid Authentication", "type": "invalid_request_error", "param": None, "code": None, "http_code": 401}, 1212: {"message": f"Incorrect Endpoint. Please use the /v1/chat/completions endpoint.", "type": "invalid_request_error", "param": None, "code": "model_not_supported", "http_code": 400}, 1044: {"message": f"This model does not support image inputs.", "type": "invalid_request_error", "param": None, "code": "model_not_supported", "http_code": 400}, 1412: {"message": f"No message provided.", "type": "invalid_request_error", "param": "messages", "code": "invalid_request_error", "http_code": 400}, 1423: {"message": f"No content in last message.", "type": "invalid_request_error", "param": "messages", "code": "invalid_request_error", "http_code": 400}, } error_data = {k: v for k, v in error_codes.get(code, {"message": "Unknown error", "type": "unknown_error", "param": None, "code": None}).items() if k != "http_code"} # Remove http_code from the error data logger.error(f"An error has occurred while processing the user's request. Error code: {code}") return jsonify({"error": error_data}), error_codes.get(code, {}).get("http_code", 400) # Return the error data without http_code inside the payload and get the http_code to return. def format_conversation_history(messages, new_input): """ Formats the conversation history into a structured string. Args: messages (list): List of message dictionaries from the request new_input (str): The new user input message Returns: str: Formatted conversation history """ formatted_history = ["Conversation History:\n"] for message in messages: role = message.get('role', '').capitalize() content = message.get('content', '') # Handle potential list content if isinstance(content, list): content = '\n'.join(item['text'] for item in content if 'text' in item) formatted_history.append(f"{role}: {content}") # Append additional messages only if there are existing messages if messages: # Save credits if it is the first message. formatted_history.append("Respond like normal. The conversation history will be automatically updated on the next MESSAGE. DO NOT ADD User: or Assistant: to your output. Just respond like normal.") formatted_history.append("User Message:\n") formatted_history.append(new_input) return '\n'.join(formatted_history) @app.route('/v1/chat/completions', methods=['POST', 'OPTIONS']) @limiter.limit("500 per minute") def conversation(): if request.method == 'OPTIONS': return handle_options_request() image = False auth_header = request.headers.get('Authorization') if not auth_header or not auth_header.startswith("Bearer "): logger.error("Invalid Authentication") return ERROR_HANDLER(1021) api_key = auth_header.split(" ")[1] headers = { 'API-KEY': api_key } request_data = request.json all_messages = format_conversation_history(request_data.get('messages', []), request_data.get('new_input', '')) messages = request_data.get('messages', []) if not messages: return ERROR_HANDLER(1412) user_input = messages[-1].get('content') if not user_input: return ERROR_HANDLER(1423) # Check if user_input is a list and combine text if necessary image = False if isinstance(user_input, list): image_paths = [] for item in user_input: if 'text' in item: combined_text = '\n'.join(item['text']) try: if 'image_url' in item: if request_data.get('model', 'mistral-nemo') not in vision_supported_models: return ERROR_HANDLER(1044, request_data.get('model', 'mistral-nemo')) if item['image_url']['url'].startswith("data:image/png;base64,"): base64_image = item['image_url']['url'].split(",")[1] binary_data = base64.b64decode(base64_image) else: binary_data = requests.get(item['image_url']['url']) binary_data.raise_for_status() # Raise an error for bad responses binary_data = BytesIO(binary_data.content) files = { 'asset': ("relay" + str(uuid.uuid4()), binary_data, 'image/png') } asset = requests.post(ONE_MIN_ASSET_URL, files=files, headers=headers) asset.raise_for_status() # Raise an error for bad responses image_path = asset.json()['fileContent']['path'] image_paths.append(image_path) image = True except Exception as e: print(f"An error occurred e:" + str(e)[:60]) # Optionally log the error or return an appropriate response user_input = str(combined_text) prompt_token = calculate_token(str(all_messages)) if PERMIT_MODELS_FROM_SUBSET_ONLY and request_data.get('model', 'mistral-nemo') not in AVAILABLE_MODELS: return ERROR_HANDLER(1002, request_data.get('model', 'mistral-nemo')) # Handle invalid model logger.debug(f"Proccessing {prompt_token} prompt tokens with model {request_data.get('model', 'mistral-nemo')}") if not image: payload = { "type": "CHAT_WITH_AI", "model": request_data.get('model', 'mistral-nemo'), "promptObject": { "prompt": all_messages, "isMixed": False, "webSearch": False } } else: payload = { "type": "CHAT_WITH_IMAGE", "model": request_data.get('model', 'mistral-nemo'), "promptObject": { "prompt": all_messages, "isMixed": False, "imageList": image_paths } } headers = {"API-KEY": d785d7c21da4971f180e19295f278e34b7928539d93ccb283d2e98c02561337a, 'Content-Type': 'application/json'} if not request_data.get('stream', False): # Non-Streaming Response logger.debug("Non-Streaming AI Response") response = requests.post(ONE_MIN_API_URL, json=payload, headers=headers) response.raise_for_status() one_min_response = response.json() transformed_response = transform_response(one_min_response, request_data, prompt_token) response = make_response(jsonify(transformed_response)) set_response_headers(response) return response, 200 else: # Streaming Response logger.debug("Streaming AI Response") response_stream = requests.post(ONE_MIN_CONVERSATION_API_STREAMING_URL, data=json.dumps(payload), headers=headers, stream=True) if response_stream.status_code != 200: if response_stream.status_code == 401: return ERROR_HANDLER(1020) logger.error(f"An unknown error occurred while processing the user's request. Error code: {response_stream.status_code}") return ERROR_HANDLER(response_stream.status_code) return Response(stream_response(response_stream, request_data, request_data.get('model', 'mistral-nemo'), int(prompt_token)), content_type='text/event-stream') def handle_options_request(): response = make_response() response.headers.add('Access-Control-Allow-Origin', '*') response.headers.add('Access-Control-Allow-Headers', 'Content-Type,Authorization') response.headers.add('Access-Control-Allow-Methods', 'POST, OPTIONS') return response, 204 def transform_response(one_min_response, request_data, prompt_token): completion_token = calculate_token(one_min_response['aiRecord']["aiRecordDetail"]["resultObject"][0]) logger.debug(f"Finished processing Non-Streaming response. Completion tokens: {str(completion_token)}") logger.debug(f"Total tokens: {str(completion_token + prompt_token)}") return { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion", "created": int(time.time()), "model": request_data.get('model', 'mistral-nemo'), "choices": [ { "index": 0, "message": { "role": "assistant", "content": one_min_response['aiRecord']["aiRecordDetail"]["resultObject"][0], }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": prompt_token, "completion_tokens": completion_token, "total_tokens": prompt_token + completion_token } } def set_response_headers(response): response.headers['Content-Type'] = 'application/json' response.headers['Access -Control-Allow-Origin'] = '*' response.headers['X-Request-ID'] = str (uuid.uuid4()) def stream_response(response, request_data, model, prompt_tokens): all_chunks = "" for chunk in response.iter_content(chunk_size=1024): finish_reason = None return_chunk = { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request_data.get('model', 'mistral-nemo'), "choices": [ { "index": 0, "delta": { "content": chunk.decode('utf-8') }, "finish_reason": finish_reason } ] } all_chunks += chunk.decode('utf-8') yield f"data: {json.dumps(return_chunk)}\n\n" tokens = calculate_token(all_chunks) logger.debug(f"Finished processing streaming response. Completion tokens: {str(tokens)}") logger.debug(f"Total tokens: {str(tokens + prompt_tokens)}") # Final chunk when iteration stops final_chunk = { "id": f"chatcmpl-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request_data.get('model', 'mistral-nemo'), "choices": [ { "index": 0, "delta": { "content": "" }, "finish_reason": "stop" } ], "usage": { "prompt_tokens": prompt_tokens, "completion_tokens": tokens, "total_tokens": tokens + prompt_tokens } } yield f"data: {json.dumps(final_chunk)}\n\n" yield "data: [DONE]\n\n" if __name__ == '__main__': internal_ip = socket.gethostbyname(socket.gethostname()) response = requests.get('https://api.ipify.org') public_ip = response.text logger.info(f"""{printedcolors.Color.fg.lightcyan} Server is ready to serve at: Internal IP: {internal_ip}:5001 Public IP: {public_ip} (only if you've setup port forwarding on your router.) Enter this url to OpenAI clients supporting custom endpoint: {internal_ip}:5001/v1 If does not work, try: {internal_ip}:5001/v1/chat/completions {printedcolors.Color.reset}""") serve(app, host='0.0.0.0', port=5001, threads=6) # Thread has a default of 4 if not specified. We use 6 to increase performance and allow multiple requests at once.