import json import os import time import uuid import hashlib import base64 from datetime import datetime from concurrent.futures import ThreadPoolExecutor import requests from flask import Flask, request, jsonify, Response, stream_with_context from flask_cors import CORS from dotenv import load_dotenv # 加载环境变量 load_dotenv() # ==================== 配置管理类 ==================== class Config: """全局配置管理""" # 服务器配置 PORT = int(os.getenv('PORT', 7680)) MAX_WORKERS = int(os.getenv('MAX_WORKERS', 20)) # 认证配置 API_KEY = os.getenv('API_KEY', 'sk-123456') RAYCAST_TOKEN = os.getenv('RAYCAST_TOKEN', None) # Raycast API 配置 RAYCAST_BASE_URL = 'https://backend.raycast.com/api/v1' RAYCAST_CHAT_URL = f'{RAYCAST_BASE_URL}/ai/chat_completions' RAYCAST_FILES_URL = f'{RAYCAST_BASE_URL}/ai/files/' # Raycast 请求头配置 RAYCAST_HEADERS = { 'Content-Type': 'application/json', 'accept-language': 'zh-CN,zh-Hans;q=0.9', 'x-raycast-deviceid': 'c86ec3d4b2c9a66de6d1a19fc5bada76fc15af8f117dc1b69ba993391f0ad531', 'accept-encoding': 'gzip, deflate, br', 'user-agent': 'Raycast/1.0.4/747 (iOS Version 18.2.1 (Build 22C161))', 'Cookie': '__raycast_session=4eb4e28abc9196e140b1980c79b75cdc' } # 系统偏好设置 DEFAULT_SYSTEM_INSTRUCTIONS = f""" The user has the following system preferences: - Locale: en-CN - Timezone: Asia/Shanghai - Current Date: {datetime.now().strftime('%Y-%m-%d')} - Unit Currency: ¥ - Unit Temperature: °C - Unit Length: m - Unit Mass: kg - Decimal Separator: . - Grouping Separator: , Use the system preferences to format your answers accordingly. """ @classmethod def get_raycast_headers(cls, include_auth=True): """获取Raycast请求头""" headers = cls.RAYCAST_HEADERS.copy() if include_auth: headers['authorization'] = f'Bearer {cls.RAYCAST_TOKEN}' return headers # 配置Flask应用 app = Flask(__name__) CORS(app) # 创建线程池 executor = ThreadPoolExecutor(max_workers=Config.MAX_WORKERS) # ==================== 认证装饰器 ==================== def require_auth(f): """认证装饰器""" def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header: return jsonify({ 'error': { 'message': '缺少认证头', 'type': 'authentication_error', 'code': 'missing_authorization' } }), 401 # 检查Bearer token格式 if not auth_header.startswith('Bearer '): return jsonify({ 'error': { 'message': '认证格式错误', 'type': 'authentication_error', 'code': 'invalid_authorization_format' } }), 401 token = auth_header[7:] if token != Config.API_KEY: return jsonify({ 'error': { 'message': '认证失败', 'type': 'authentication_error', 'code': 'invalid_api_key' } }), 401 return f(*args, **kwargs) decorated_function.__name__ = f.__name__ return decorated_function # ==================== 工具类 ==================== class UtilsHelper: @staticmethod def generate_uuid(): return str(uuid.uuid4()) @staticmethod def get_current_timestamp(): return int(time.time()) @staticmethod def generate_md5(data): if isinstance(data, str): data = data.encode('utf-8') return base64.b64encode(hashlib.md5(data).digest()).decode('utf-8') @staticmethod def is_search_model(model): return model.endswith('-search') @staticmethod def get_base_model(model): return model[:-7] if model.endswith('-search') else model # ==================== 模型映射类 ==================== class ModelMapper: BASE_MODELS = { "ray1": "raycast", "ray1-mini": "raycast", "gpt-4.1": "openai", "gpt-4.1-mini": "openai", "gpt-4.1-nano": "openai", "gpt-4": "openai", "gpt-4-turbo": "openai", "gpt-4o": "openai", "gpt-4o-mini": "openai", "o3": "openai_o1", "o4-mini": "openai_o1", "o1-mini": "openai_o1", "o1-2024-12-17": "openai_o1", "o3-mini": "openai_o1", "claude-3-5-haiku-latest": "anthropic", "claude-3-5-sonnet-latest": "anthropic", "claude-3-7-sonnet-latest": "anthropic", "claude-3-7-sonnet-latest-reasoning": "anthropic", "claude-3-opus-20240229": "anthropic", "claude-sonnet-4-20250514": "anthropic", "claude-opus-4-20250514": "anthropic", "claude-sonnet-4-20250514-reasoning": "anthropic", "claude-opus-4-20250514-reasoning": "anthropic", "sonar": "perplexity", "sonar-pro": "perplexity", "sonar-reasoning": "perplexity", "sonar-reasoning-pro": "perplexity", "meta-llama/llama-4-scout-17b-16e-instruct": "groq", "llama-3.3-70b-versatile": "groq", "llama-3.1-8b-instant": "groq", "llama3-70b-8192": "groq", "meta-llama/Meta-Llama-3.1-405B-Instruct-Turbo": "together", "open-mistral-nemo": "mistral", "mistral-large-latest": "mistral", "mistral-medium-latest": "mistral", "mistral-small-latest": "mistral", "codestral-latest": "mistral", "deepseek-r1-distill-llama-70b": "groq", "gemini-2.5-pro-preview-06-05": "google", "gemini-1.5-flash": "google", "gemini-2.5-flash-preview-04-17": "google", "gemini-2.0-flash": "google", "gemini-2.0-flash-thinking-exp-01-21": "google", "deepseek-ai/DeepSeek-R1": "together", "deepseek-ai/DeepSeek-V3": "together", "grok-3-fast-beta": "xai", "grok-3-mini-fast-beta": "xai", "grok-2-latest": "xai" } # 生成完整的模型映射表(包含搜索版本) @classmethod def get_model_map(cls): model_map = cls.BASE_MODELS.copy() # 为每个基础模型添加带搜索功能的版本 for model in cls.BASE_MODELS.keys(): model_map[f"{model}-search"] = cls.BASE_MODELS[model] return model_map @classmethod def get_provider(cls, model): base_model = UtilsHelper.get_base_model(model) return cls.get_model_map().get(base_model, 'google') @classmethod def get_actual_model(cls, model): base_model = UtilsHelper.get_base_model(model) provider = cls.get_provider(model) if provider == 'raycast': return 'gemini-2.5-flash-preview-04-17' else: return base_model @classmethod def get_all_models(cls): return list(cls.get_model_map().keys()) # ==================== 工具功能类 ==================== class ToolsManager: @staticmethod def get_tools(use_search=False): if not use_search: return [] return [ { "name": "search_images", "type": "remote_tool" }, { "name": "web_search", "type": "remote_tool" } ] # ==================== 文件上传类 ==================== class FileUploader: @classmethod def upload_file(cls, file_data): try: filename = file_data['filename'] content = file_data['content'] content_type = file_data['contentType'] buffer = base64.b64decode(content) byte_size = len(buffer) checksum = UtilsHelper.generate_md5(buffer) # 创建文件元数据 create_file_payload = { 'blob': { 'byte_size': byte_size, 'checksum': checksum, 'content_type': content_type, 'filename': filename } } headers = Config.get_raycast_headers() headers['x-raycast-timestamp'] = str(UtilsHelper.get_current_timestamp()) headers['x-request-id'] = UtilsHelper.generate_uuid().upper() create_response = requests.post( Config.RAYCAST_FILES_URL, headers=headers, json=create_file_payload, timeout=30 ) if not create_response.ok: raise Exception(f'文件元数据创建失败: {create_response.status_code}') create_result = create_response.json() upload_url = create_result['direct_upload']['url'] file_id = create_result['id'] # 上传文件 upload_headers = { 'Content-Type': content_type, 'Content-MD5': checksum, 'Content-Length': str(byte_size), 'Content-Disposition': f'inline; filename="{filename}"; filename*=UTF-8\'\'{filename}', 'Upload-Complete': '?1' } upload_response = requests.put( upload_url, headers=upload_headers, data=buffer, timeout=60 ) if not upload_response.ok: raise Exception(f'文件上传失败: {upload_response.status_code}') return { 'id': file_id, 'type': 'file' } except Exception as error: print(f'文件上传错误: {error}') raise error @classmethod def extract_files_from_openai(cls, messages): files = [] for message in messages: if message.get('role') == 'user' and isinstance(message.get('content'), list): for content in message['content']: if content.get('type') == 'image_url' and content.get('image_url'): url = content['image_url']['url'] if url.startswith('data:'): # 处理base64图片 header, data = url.split(',', 1) mime_match = header.split(':')[1].split(';')[0] if ':' in header else 'image/jpeg' content_type = mime_match files.append({ 'filename': f'image_{UtilsHelper.generate_uuid()}.{content_type.split("/")[1]}', 'content': data, 'contentType': content_type }) return files # ==================== 转换类 ==================== class MessageConverter: @classmethod def merge_consecutive_messages(cls, messages): """合并连续相同角色的消息""" if not messages: return messages merged_messages = [] current_message = None for message in messages: role = message.get('role') content = message.get('content', '') # 处理content为list的情况 if isinstance(content, list): content = ''.join([ c.get('text', '') for c in content if c.get('type') == 'text' ]) if current_message is None: # 第一条消息 current_message = { 'role': role, 'content': content } elif current_message['role'] == role: # 相同角色,合并内容 current_message['content'] += '\n' + content else: # 不同角色,保存当前消息并开始新消息 merged_messages.append(current_message) current_message = { 'role': role, 'content': content } # 添加最后一条消息 if current_message: merged_messages.append(current_message) return merged_messages @classmethod def process_system_messages(cls, messages): # 先合并连续相同角色的消息 merged_messages = cls.merge_consecutive_messages(messages) processed_messages = [] additional_system_instructions = '' system_collection_stopped = False for message in merged_messages: if message.get('role') == 'system': if not system_collection_stopped: # 连续的 system 消息收集到 additional_system_instructions if additional_system_instructions: additional_system_instructions += '\n' + message['content'] else: additional_system_instructions = message['content'] else: # 后续的 system 消息转换为 user 消息 processed_messages.append({ 'author': 'user', 'content': { 'references': [], 'text': message['content'] } }) else: # 遇到非 system 消息,停止收集 system 消息 system_collection_stopped = True processed_message = { 'author': 'user' if message.get('role') == 'user' else 'assistant', 'content': { 'references': [], 'text': message['content'] } } processed_messages.append(processed_message) return processed_messages, additional_system_instructions @classmethod def convert_to_raycast_format(cls, openai_request): processed_messages, additional_system_instructions = cls.process_system_messages( openai_request['messages'] ) # 处理文件上传 files = FileUploader.extract_files_from_openai(openai_request['messages']) attachments = [] for file in files: try: uploaded_file = FileUploader.upload_file(file) attachments.append(uploaded_file) except Exception as error: print(f'文件上传失败: {error}') # 如果有附件,添加到最后一个用户消息中 if attachments and processed_messages: last_message = processed_messages[-1] if last_message['author'] == 'user': last_message['content']['attachments'] = attachments actual_model = ModelMapper.get_actual_model(openai_request['model']) provider = ModelMapper.get_provider(openai_request['model']) use_search = UtilsHelper.is_search_model(openai_request['model']) raycast_request = { 'additional_system_instructions': additional_system_instructions or Config.DEFAULT_SYSTEM_INSTRUCTIONS, 'debug': False, 'locale': 'en_CN', 'message_id': UtilsHelper.generate_uuid(), 'messages': processed_messages, 'model': actual_model, 'provider': 'google' if provider == 'raycast' else provider, 'source': 'ai_chat', 'tools': ToolsManager.get_tools(use_search) } return raycast_request # ==================== 响应处理类 ==================== class ResponseProcessor: def __init__(self): self.is_thinking = False self.thinking_content = '' def process_raycast_chunk(self, chunk): content = '' # 处理思考内容 if chunk.get('reasoning'): if not self.is_thinking: # 开始思考 self.is_thinking = True content += '' content += chunk['reasoning'] self.thinking_content += chunk['reasoning'] # 处理普通文本内容 if chunk.get('text'): if self.is_thinking: # 结束思考 content += '' self.is_thinking = False self.thinking_content = '' content += chunk['text'] return content def convert_to_openai_format(self, raycast_chunk, model, is_stream=False): content = self.process_raycast_chunk(raycast_chunk) if is_stream: return { 'id': 'chatcmpl-' + UtilsHelper.generate_uuid(), 'object': 'chat.completion.chunk', 'created': UtilsHelper.get_current_timestamp(), 'model': model, 'choices': [{ 'index': 0, 'delta': { 'content': content }, 'finish_reason': None }] } else: return { 'id': 'chatcmpl-' + UtilsHelper.generate_uuid(), 'object': 'chat.completion', 'created': UtilsHelper.get_current_timestamp(), 'model': model, 'choices': [{ 'index': 0, 'message': { 'role': 'assistant', 'content': content }, 'finish_reason': 'stop' }], 'usage': { 'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0 } } def finish_thinking(self): if self.is_thinking: self.is_thinking = False return '' return '' # ==================== API服务类 ==================== class RaycastAPIService: @classmethod def send_request(cls, raycast_request): headers = Config.get_raycast_headers() headers['x-raycast-timestamp'] = str(UtilsHelper.get_current_timestamp()) response = requests.post( Config.RAYCAST_CHAT_URL, headers=headers, json=raycast_request, stream=True, timeout=120 ) if not response.ok: error_text = response.text print(f'Raycast API 错误响应: {error_text}') raise Exception(f'Raycast API 响应错误: {response.status_code} {response.reason}') return response # ==================== 处理函数 ==================== def handle_chat_completion(request_data): try: # 转换请求格式 raycast_request = MessageConverter.convert_to_raycast_format(request_data) # 发送请求到 Raycast response = RaycastAPIService.send_request(raycast_request) return response, request_data except Exception as error: print(f'代理错误: {error}') raise error def process_stream_response(response, request_data): processor = ResponseProcessor() def generate(): try: buffer = '' for chunk in response.iter_lines(): chunk = chunk.decode("utf-8").strip() if chunk: buffer += chunk + '\n' lines = buffer.split('\n') buffer = lines.pop() if lines else '' for line in lines: if line.strip(): try: if line.startswith('data: '): data = line[6:] if data == '[DONE]': # 检查是否需要关闭thinking标签 finish_content = processor.finish_thinking() if finish_content: finish_response = processor.convert_to_openai_format( {'text': finish_content}, request_data['model'], True ) yield f"data: {json.dumps(finish_response)}\n\n" yield 'data: [DONE]\n\n' return parsed = json.loads(data) openai_response = processor.convert_to_openai_format( parsed, request_data['model'], True ) yield f"data: {json.dumps(openai_response)}\n\n" except Exception as err: print(f'解析流式响应错误: {err}, 原始行: {line}') yield 'data: [DONE]\n\n' except Exception as err: print(f'流式响应错误: {err}') yield f'data: {json.dumps({"error": "流式响应处理错误"})}\n\n' finally: response.close() return generate() def process_non_stream_response(response, request_data): processor = ResponseProcessor() full_content = '' try: buffer = '' for chunk in response.iter_lines(): chunk = chunk.decode("utf-8").strip() if chunk: buffer += chunk + '\n' lines = buffer.split('\n') buffer = lines.pop() if lines else '' for line in lines: if line.strip(): try: if line.startswith('data: '): data = line[6:] if data == '[DONE]': break # 结束处理 parsed = json.loads(data) content = processor.process_raycast_chunk(parsed) full_content += content except Exception as err: print(f'解析非流式响应错误: {err}, 原始行: {line}') # 确保thinking标签正确关闭 finish_content = processor.finish_thinking() full_content += finish_content return { 'id': 'chatcmpl-' + UtilsHelper.generate_uuid(), 'object': 'chat.completion', 'created': UtilsHelper.get_current_timestamp(), 'model': request_data['model'], 'choices': [{ 'index': 0, 'message': { 'role': 'assistant', 'content': full_content }, 'finish_reason': 'stop' }], 'usage': { 'prompt_tokens': 0, 'completion_tokens': 0, 'total_tokens': 0 } } except Exception as err: print(f'非流式响应错误: {err}') raise err finally: response.close() # ==================== 路由处理 ==================== @app.route('/v1/chat/completions', methods=['POST']) @require_auth def chat_completions(): try: request_data = request.get_json() if not request_data: return jsonify({ 'error': { 'message': '请求数据为空', 'type': 'invalid_request', 'code': 'invalid_request' } }), 400 is_stream = request_data.get('stream', False) # 在线程池中处理请求 future = executor.submit(handle_chat_completion, request_data) response, req_data = future.result() if is_stream: return Response( stream_with_context(process_stream_response(response, req_data)), content_type='text/event-stream', headers={ 'Cache-Control': 'no-cache', 'Connection': 'keep-alive', 'Access-Control-Allow-Origin': '*' } ) else: future = executor.submit(process_non_stream_response, response, req_data) result = future.result() return jsonify(result) except Exception as error: return jsonify({ 'error': { 'message': str(error) or '内部服务器错误', 'type': 'internal_error', 'code': 'internal_error' } }), 500 @app.route('/v1/models', methods=['GET']) def list_models(): models = [ { 'id': model, 'object': 'model', 'created': UtilsHelper.get_current_timestamp(), 'owned_by': 'raycast-proxy' } for model in ModelMapper.get_all_models() ] return jsonify({ 'object': 'list', 'data': models }) @app.route('/health', methods=['GET']) def health_check(): return jsonify({ 'status': 'ok', 'timestamp': datetime.now().isoformat(), 'models_count': len(ModelMapper.get_all_models()), 'config': { 'port': Config.PORT, 'max_workers': Config.MAX_WORKERS, 'auth_required': bool(Config.API_KEY) } }) @app.route('/', methods=['OPTIONS']) @app.route('/v1/chat/completions', methods=['OPTIONS']) @app.route('/v1/models', methods=['OPTIONS']) def handle_options(): return '', 200 if __name__ == '__main__': print(f'🚀 Raycast 代理服务器运行在端口 {Config.PORT}') print(f'🔗 OpenAI 兼容端点: http://localhost:{Config.PORT}/v1/chat/completions') print(f'📜 模型列表: http://localhost:{Config.PORT}/v1/models') print(f'⚡ 最大工作线程数: {Config.MAX_WORKERS}') # 使用支持多线程的WSGI服务器 app.run( host='0.0.0.0', port=Config.PORT, debug=False, threaded=True, processes=1 )