|
from datetime import datetime |
|
import json |
|
import uuid |
|
import asyncio |
|
import random |
|
import string |
|
import time |
|
import re |
|
from typing import Any, Dict, Optional |
|
|
|
import os |
|
import requests |
|
import httpx |
|
from fastapi import HTTPException |
|
from api.models import ChatRequest |
|
from api.logger import setup_logger |
|
|
|
# Module-wide logger, created through the project's logging helper.
logger = setup_logger(__name__)

from dotenv import load_dotenv

# Must run before the os.getenv() lookup below so that values supplied via a
# .env file are visible in the process environment.
load_dotenv()

# Root URL of the upstream service; also used as the Referer header and as the
# base for the chat endpoint and the scraped JS bundle.
BASE_URL = "https://www.blackbox.ai"

# Optional secret read from the environment (None when unset). It is not
# referenced in the visible part of this module.
APP_SECRET = os.getenv("APP_SECRET")
|
|
|
|
|
# Catalogue of models exposed by this service, as {"id", "name"} records.
# "id" is the client-facing identifier; "name" is the associated model name
# (several ids share one name). Order is preserved from the original listing.
ALLOWED_MODELS = [
    {"id": model_id, "name": model_name}
    for model_id, model_name in (
        ("blackboxai", "blackboxai"),
        ("blackboxai-pro", "blackboxai-pro"),
        ("flux", "flux"),
        ("llama-3.1-8b", "llama-3.1-8b"),
        ("llama-3.1-70b", "llama-3.1-70b"),
        ("llama-3.1-405b", "llama-3.1-405"),
        ("gpt-4o", "gpt-4o"),
        ("gemini-pro", "gemini-pro"),
        ("gemini-1.5-flash", "gemini-1.5-flash"),
        ("claude-sonnet-3.5", "claude-sonnet-3.5"),
        ("PythonAgent", "PythonAgent"),
        ("JavaAgent", "JavaAgent"),
        ("JavaScriptAgent", "JavaScriptAgent"),
        ("HTMLAgent", "HTMLAgent"),
        ("GoogleCloudAgent", "GoogleCloudAgent"),
        ("AndroidDeveloper", "AndroidDeveloper"),
        ("SwiftDeveloper", "SwiftDeveloper"),
        ("Next.jsAgent", "Next.jsAgent"),
        ("MongoDBAgent", "MongoDBAgent"),
        ("PyTorchAgent", "PyTorchAgent"),
        ("ReactAgent", "ReactAgent"),
        ("XcodeAgent", "XcodeAgent"),
        ("AngularJSAgent", "AngularJSAgent"),
        ("HerokuAgent", "HerokuAgent"),
        ("GodotAgent", "GodotAgent"),
        ("GoAgent", "GoAgent"),
        ("GitlabAgent", "GitlabAgent"),
        ("GitAgent", "GitAgent"),
        ("RepoMap", "RepoMap"),
        ("gemini-1.5-pro-latest", "gemini-pro"),
        ("gemini-1.5-pro", "gemini-1.5-pro"),
        ("claude-3-5-sonnet-20240620", "claude-sonnet-3.5"),
        ("claude-3-5-sonnet", "claude-sonnet-3.5"),
        ("Niansuh", "Niansuh"),
        ("o1-preview", "o1-preview"),
        ("claude-3-5-sonnet-20241022", "claude-3-5-sonnet-20241022"),
        ("claude-3-5-sonnet-x", "claude-3-5-sonnet-x"),
        ("FlaskAgent", "FlaskAgent"),
        ("FirebaseAgent", "FirebaseAgent"),
        ("FastAPIAgent", "FastAPIAgent"),
        ("ErlangAgent", "ErlangAgent"),
        ("ElectronAgent", "ElectronAgent"),
        ("DockerAgent", "DockerAgent"),
        ("DigitalOceanAgent", "DigitalOceanAgent"),
        ("BitbucketAgent", "BitbucketAgent"),
        ("AzureAgent", "AzureAgent"),
        ("FlutterAgent", "FlutterAgent"),
        ("YoutubeAgent", "YoutubeAgent"),
        ("builderAgent", "builderAgent"),
    )
]
|
|
|
|
|
# Maps a client-supplied model id to the value sent upstream as
# "userSelectedModel". Ids missing from this table are passed through as-is
# by the request builders.
MODEL_MAPPING = {
    # Base models (including the "ImageGeneration" alias for flux).
    "blackboxai": "blackboxai",
    "blackboxai-pro": "blackboxai-pro",
    "flux": "flux",
    "ImageGeneration": "flux",
    "llama-3.1-8b": "llama-3.1-8b",
    "llama-3.1-70b": "llama-3.1-70b",
    "llama-3.1-405b": "llama-3.1-405",
    "gpt-4o": "gpt-4o",
    "gemini-pro": "gemini-pro",
    "gemini-1.5-flash": "gemini-1.5-flash",
    "claude-sonnet-3.5": "claude-sonnet-3.5",
    # Agents.
    "PythonAgent": "PythonAgent",
    "JavaAgent": "JavaAgent",
    "JavaScriptAgent": "JavaScriptAgent",
    "HTMLAgent": "HTMLAgent",
    "GoogleCloudAgent": "GoogleCloudAgent",
    "AndroidDeveloper": "AndroidDeveloper",
    "SwiftDeveloper": "SwiftDeveloper",
    "Next.jsAgent": "Next.jsAgent",
    "MongoDBAgent": "MongoDBAgent",
    "PyTorchAgent": "PyTorchAgent",
    "ReactAgent": "ReactAgent",
    "XcodeAgent": "XcodeAgent",
    "AngularJSAgent": "AngularJSAgent",
    "HerokuAgent": "HerokuAgent",
    "GodotAgent": "GodotAgent",
    "GoAgent": "GoAgent",
    "GitlabAgent": "GitlabAgent",
    "GitAgent": "GitAgent",
    "RepoMap": "RepoMap",
    # Alternative spellings and dated ids, plus additional models.
    "gemini-flash": "gemini-1.5-flash",
    "claude-3.5-sonnet": "claude-sonnet-3.5",
    "gemini-1.5-pro-latest": "gemini-pro",
    "gemini-1.5-pro": "gemini-1.5-pro",
    "claude-3-5-sonnet-20240620": "claude-sonnet-3.5",
    "claude-3-5-sonnet": "claude-sonnet-3.5",
    "Niansuh": "Niansuh",
    "o1-preview": "o1-preview",
    "claude-3-5-sonnet-20241022": "claude-3-5-sonnet-20241022",
    "claude-3-5-sonnet-x": "claude-3-5-sonnet-x",
    # Further agents.
    "FlaskAgent": "FlaskAgent",
    "FirebaseAgent": "FirebaseAgent",
    "FastAPIAgent": "FastAPIAgent",
    "ErlangAgent": "ErlangAgent",
    "ElectronAgent": "ElectronAgent",
    "DockerAgent": "DockerAgent",
    "DigitalOceanAgent": "DigitalOceanAgent",
    "BitbucketAgent": "BitbucketAgent",
    "AzureAgent": "AzureAgent",
    "FlutterAgent": "FlutterAgent",
    "YoutubeAgent": "YoutubeAgent",
    "builderAgent": "builderAgent",
}
|
|
|
|
|
# Per-model settings sent upstream as the "agentMode" field of the chat
# payload. Models without an entry send an empty dict.
AGENT_MODE = {
    "flux": {"mode": True, "id": "ImageGenerationLV45LJp", "name": "flux"},
    "Niansuh": {"mode": True, "id": "NiansuhAIk1HgESy", "name": "Niansuh"},
    "o1-preview": {"mode": True, "id": "o1Dst8La8", "name": "o1-preview"},
    "claude-3-5-sonnet-20241022": {
        "mode": True,
        "id": "Claude-Sonnet-3.5zO2HZSF",
        "name": "claude-3-5-sonnet-20241022",
    },
    "claude-3-5-sonnet-x": {
        "mode": True,
        "id": "Claude-Sonnet-3.52022JE0UdQ3",
        "name": "claude-3-5-sonnet-x",
    },
}
|
|
|
|
|
# Per-model settings sent upstream as the "trendingAgentMode" field of the
# chat payload. Every entry except "blackboxai" has the shape
# {"mode": True, "id": <upstream agent id>}.
TRENDING_AGENT_MODE = {
    "blackboxai": {},
    **{
        model: {"mode": True, "id": agent_id}
        for model, agent_id in (
            ("gemini-1.5-flash", "Gemini"),
            ("llama-3.1-8b", "llama-3.1-8b"),
            ("llama-3.1-70b", "llama-3.1-70b"),
            ("llama-3.1-405b", "llama-3.1-405"),
            ("blackboxai-pro", "BLACKBOXAI-PRO"),
            ("PythonAgent", "Python Agent"),
            ("JavaAgent", "Java Agent"),
            ("JavaScriptAgent", "JavaScript Agent"),
            ("HTMLAgent", "HTML Agent"),
            ("GoogleCloudAgent", "Google Cloud Agent"),
            ("AndroidDeveloper", "Android Developer"),
            ("SwiftDeveloper", "Swift Developer"),
            ("Next.jsAgent", "Next.js Agent"),
            ("MongoDBAgent", "MongoDB Agent"),
            ("PyTorchAgent", "PyTorch Agent"),
            ("ReactAgent", "React Agent"),
            ("XcodeAgent", "Xcode Agent"),
            ("AngularJSAgent", "AngularJS Agent"),
            ("HerokuAgent", "HerokuAgent"),
            ("GodotAgent", "GodotAgent"),
            ("GoAgent", "GoAgent"),
            ("GitlabAgent", "GitlabAgent"),
            ("GitAgent", "GitAgent"),
            ("RepoMap", "repomap"),
            ("FlaskAgent", "FlaskAgent"),
            ("FirebaseAgent", "FirebaseAgent"),
            ("FastAPIAgent", "FastAPIAgent"),
            ("ErlangAgent", "ErlangAgent"),
            ("ElectronAgent", "ElectronAgent"),
            ("DockerAgent", "DockerAgent"),
            ("DigitalOceanAgent", "DigitalOceanAgent"),
            ("BitbucketAgent", "BitbucketAgent"),
            ("AzureAgent", "AzureAgent"),
            ("FlutterAgent", "FlutterAgent"),
            ("YoutubeAgent", "YoutubeAgent"),
            ("builderAgent", "builderAgent"),
        )
    },
}
|
|
|
|
|
# Marker text prepended to every outgoing message for the given model and
# stripped again from the start of the upstream reply. Models without an
# entry use no prefix.
MODEL_PREFIXES = {
    "gpt-4o": "@GPT-4o",
    "gemini-pro": "@Gemini-PRO",
    "PythonAgent": "@Python Agent",
    "JavaAgent": "@Java Agent",
    "JavaScriptAgent": "@JavaScript Agent",
    "HTMLAgent": "@HTML Agent",
    "GoogleCloudAgent": "@Google Cloud Agent",
    "AndroidDeveloper": "@Android Developer",
    "SwiftDeveloper": "@Swift Developer",
    "Next.jsAgent": "@Next.js Agent",
    "MongoDBAgent": "@MongoDB Agent",
    "PyTorchAgent": "@PyTorch Agent",
    "ReactAgent": "@React Agent",
    "XcodeAgent": "@Xcode Agent",
    "AngularJSAgent": "@AngularJS Agent",
    "HerokuAgent": "@Heroku Agent",
    "GodotAgent": "@Godot Agent",
    "GoAgent": "@Go Agent",
    "GitlabAgent": "@Gitlab Agent",
    # Previously "@Gitlab Agent", duplicating the GitlabAgent entry; every
    # other key maps to its own label, so that was a copy-paste slip.
    # NOTE(review): confirm "@Git Agent" is the label the upstream expects.
    "GitAgent": "@Git Agent",
    "blackboxai-pro": "@BLACKBOXAI-PRO",
    "flux": "@Image Generation",
    "FlaskAgent": "@Flask Agent",
    "FirebaseAgent": "@Firebase Agent",
    "FastAPIAgent": "@FastAPI Agent",
    "ErlangAgent": "@Erlang Agent",
    "ElectronAgent": "@Electron Agent",
    "DockerAgent": "@Docker Agent",
    "DigitalOceanAgent": "@DigitalOcean Agent",
    "BitbucketAgent": "@Bitbucket Agent",
    "AzureAgent": "@Azure Agent",
    "FlutterAgent": "@Flutter Agent",
    "YoutubeAgent": "@Youtube Agent",
    "builderAgent": "@builder Agent",
}
|
|
|
|
|
# Cache for the value returned by getHid(): the last scraped token and the
# time.time() reading at which it was stored.
cached_hid = None
cache_time = 0

# Maximum age of a cached token, in seconds (ten hours).
CACHE_DURATION = 10 * 60 * 60

# Base HTTP headers used for every upstream request; the chat calls copy
# this dict and add a Referer.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/130.0.0.0 Safari/537.36"
    ),
    "Content-Type": "application/json",
}
|
|
|
def getHid(force_refresh=False):
    """Return the ``h`` token scraped from the site's layout JS bundle.

    The token is cached in the module globals ``cached_hid`` / ``cache_time``
    and reused for ``CACHE_DURATION`` seconds unless ``force_refresh`` is set.

    Args:
        force_refresh: When true, ignore the cache and scrape again.

    Returns:
        The token string, or ``None`` when either HTTP request fails or the
        expected pattern is missing from the page or bundle. A failed refresh
        leaves any previously cached value untouched.
    """
    global cached_hid, cache_time

    # requests waits indefinitely unless a timeout is passed; bound both
    # fetches so an unresponsive upstream cannot hang the caller. Timeouts
    # are RequestException subclasses, so the handler below covers them.
    request_timeout = 30

    current_time = time.time()
    cache_is_fresh = (current_time - cache_time) < CACHE_DURATION
    if not force_refresh and cached_hid and cache_is_fresh:
        logger.info(f"Using cached_hid: {cached_hid}")
        return cached_hid

    try:
        response = requests.get(BASE_URL, headers=headers, timeout=request_timeout)
        response.raise_for_status()

        # The landing page references the layout bundle by a hashed file name.
        match = re.search(r"static/chunks/app/layout-[a-zA-Z0-9]+\.js", response.text)
        if not match:
            logger.error("Specified JS file path not found in HTML content")
            return None

        full_url = f"{BASE_URL}/_next/{match.group()}"
        js_response = requests.get(full_url, headers=headers, timeout=request_timeout)
        js_response.raise_for_status()

        # The token appears in the bundle as h="<hex digits and dashes>".
        h_match = re.search(r'h="([0-9a-f-]+)"', js_response.text)
        if not h_match:
            logger.error("h-value not found in JS content")
            return None
    except requests.exceptions.RequestException as e:
        logger.error(f"An error occurred: {e}")
        return None

    h_value = h_match.group(1)
    logger.info(f"Found h-value: {h_value}")

    # Only a successful scrape updates the cache.
    cached_hid = h_value
    cache_time = current_time
    return h_value
|
|
|
|
|
def generate_chat_id(length: int = 7) -> str:
    """Return a random identifier of ``length`` ASCII letters and digits.

    Characters are drawn with replacement via ``random.choices``.
    """
    alphabet = string.ascii_letters + string.digits
    picked = random.choices(alphabet, k=length)
    return "".join(picked)
|
|
|
|
|
def create_chat_completion_data(
    content: str, model: str, timestamp: int, finish_reason: Optional[str] = None
) -> Dict[str, Any]:
    """Build one ``chat.completion.chunk`` payload.

    Args:
        content: Text placed in the chunk's delta.
        model: Model name reported in the chunk.
        timestamp: Value stored under ``"created"``.
        finish_reason: Stored as the choice's ``finish_reason`` (default None).

    Returns:
        A dict with a fresh ``chatcmpl-<uuid4>`` id, a single choice at
        index 0 whose delta has role ``"assistant"``, and ``usage`` set to None.
    """
    delta = {"content": content, "role": "assistant"}
    choice = {
        "index": 0,
        "delta": delta,
        "finish_reason": finish_reason,
    }
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    return {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": timestamp,
        "model": model,
        "choices": [choice],
        "usage": None,
    }
|
|
|
|
|
def message_to_dict(message, model_prefix: Optional[str] = None):
    """Convert a chat message into the dict shape sent to the upstream API.

    Args:
        message: Object exposing ``role`` and ``content``. ``content`` is
            either a string or a list of part dicts such as
            ``{"type": "text", "text": ...}`` and
            ``{"type": "image_url", "image_url": {"url": ...}}``. Any other
            value is passed through unchanged.
        model_prefix: Optional marker prepended to the text, followed by a
            space.

    Returns:
        ``{"role": ..., "content": ...}``. When the content list contains an
        image part, a ``"data"`` entry with the image URL is added.
    """
    raw_content = message.content
    image_url = None

    if isinstance(raw_content, list):
        # Scan the parts instead of assuming [text, image] at fixed indexes:
        # an empty list, an image-first list or a list with extra parts used
        # to raise (IndexError/KeyError) or silently drop the image.
        parts = [part for part in raw_content if isinstance(part, dict)]
        content = next((part["text"] for part in parts if "text" in part), "")
        image_part = next((part for part in parts if "image_url" in part), None)
        if image_part is not None:
            image_url = image_part["image_url"]["url"]
    else:
        content = raw_content

    if model_prefix:
        content = f"{model_prefix} {content}"

    result = {"role": message.role, "content": content}
    if image_url is not None:
        result["data"] = {
            "imageBase64": image_url,
            "fileText": "",
            "title": "snapshot",
        }
    return result
|
|
|
|
|
def strip_model_prefix(content: str, model_prefix: Optional[str] = None) -> str:
    """Return ``content`` without a leading ``model_prefix``.

    When ``model_prefix`` is empty/None or ``content`` does not start with
    it, ``content`` is returned unchanged. Otherwise the prefix is cut off
    and surrounding whitespace is stripped from what remains.
    """
    if not model_prefix or not content.startswith(model_prefix):
        return content

    logger.debug(f"Stripping prefix '{model_prefix}' from content.")
    remainder = content[len(model_prefix):]
    return remainder.strip()
|
|
|
|
|
async def process_streaming_response(request: ChatRequest):
    """Proxy a chat request upstream and yield the reply as SSE chunks.

    Each non-empty upstream line becomes one ``data: {...}`` event built by
    ``create_chat_completion_data``. The stream always ends with a ``stop``
    chunk followed by ``data: [DONE]``. If a line contains the upstream site
    URL, the cached hid is force-refreshed, a notice is emitted in place of
    the line, and no further upstream lines are forwarded.

    Args:
        request: Chat request; ``model``, ``messages``, ``max_tokens``,
            ``temperature`` and ``top_p`` are read.

    Yields:
        Server-sent-event strings.

    Raises:
        HTTPException: With the upstream status code when the upstream
            answers with an error status, or 500 on a transport error.
    """
    version_marker = "$@$v=undefined-rv1$@$"

    chat_id = generate_chat_id()
    referer_url = BASE_URL
    logger.info(f"Generated Chat ID: {chat_id} - Model: {request.model} - URL: {referer_url}")

    agent_mode = AGENT_MODE.get(request.model, {})
    trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {})
    model_prefix = MODEL_PREFIXES.get(request.model, "")

    headers_api_chat = {**headers, "Referer": referer_url}

    if request.model == 'o1-preview':
        # NOTE(review): the purpose of this random delay is not visible here;
        # the non-streaming path uses a different range (20-60 s).
        delay_seconds = random.randint(1, 60)
        logger.info(f"Introducing a delay of {delay_seconds} seconds for model 'o1-preview' (Chat ID: {chat_id})")
        await asyncio.sleep(delay_seconds)

    # getHid() performs blocking HTTP requests when its cache is stale; run it
    # in the default executor so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()
    validated = await loop.run_in_executor(None, getHid)

    json_data = {
        "agentMode": agent_mode,
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "codeModelMode": True,
        "githubToken": None,
        "id": chat_id,
        "isChromeExt": False,
        "isMicMode": False,
        "maxTokens": request.max_tokens,
        "messages": [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages],
        "mobileClient": False,
        "playgroundTemperature": request.temperature,
        "playgroundTopP": request.top_p,
        "previewToken": None,
        "trendingAgentMode": trending_agent_mode,
        "userId": None,
        "userSelectedModel": MODEL_MAPPING.get(request.model, request.model),
        "userSystemPrompt": None,
        "validated": validated,
        "visitFromDelta": False,
    }

    # Initialised up front: the closing "stop" chunk needs a timestamp even
    # when the upstream body contains no lines at all (previously an
    # UnboundLocalError).
    timestamp = int(datetime.now().timestamp())

    async with httpx.AsyncClient() as client:
        try:
            async with client.stream(
                "POST",
                f"{BASE_URL}/api/chat",
                headers=headers_api_chat,
                json=json_data,
                timeout=100,
            ) as response:
                response.raise_for_status()

                # NOTE(review): whether aiter_lines() keeps line endings
                # depends on the httpx version - verify that newlines in the
                # reply survive; blank lines are skipped either way.
                async for line in response.aiter_lines():
                    timestamp = int(datetime.now().timestamp())
                    if not line:
                        continue

                    content = line
                    if "https://www.blackbox.ai" in content:
                        # The upstream answered with a message pointing at its
                        # own site: refresh the hid and ask the user to retry.
                        await loop.run_in_executor(None, getHid, True)
                        content = "hid已刷新,重新对话即可\n"
                        yield f"data: {json.dumps(create_chat_completion_data(content, request.model, timestamp))}\n\n"
                        break

                    if content.startswith(version_marker):
                        content = content[len(version_marker):]
                    cleaned_content = strip_model_prefix(content, model_prefix)
                    yield f"data: {json.dumps(create_chat_completion_data(cleaned_content, request.model, timestamp))}\n\n"

                yield f"data: {json.dumps(create_chat_completion_data('', request.model, timestamp, 'stop'))}\n\n"
                yield "data: [DONE]\n\n"
        # NOTE(review): if this generator feeds a streaming HTTP response, the
        # status line may already have been sent when these are raised -
        # confirm how the caller surfaces them.
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred for Chat ID {chat_id}: {e}")
            raise HTTPException(status_code=e.response.status_code, detail=str(e)) from e
        except httpx.RequestError as e:
            logger.error(f"Error occurred during request for Chat ID {chat_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e
|
|
|
|
|
async def process_non_streaming_response(request: ChatRequest):
    """Proxy a chat request upstream and return one complete completion.

    The upstream body is read in full, the version marker and model prefix
    are removed from its start, and the text is wrapped in a
    ``chat.completion`` dict. If the body contains the upstream site URL, the
    cached hid is force-refreshed and a notice is returned as the content.

    Args:
        request: Chat request; ``model``, ``messages``, ``max_tokens``,
            ``temperature`` and ``top_p`` are read.

    Returns:
        A dict with a fresh ``chatcmpl-<uuid4>`` id, one assistant message
        with ``finish_reason`` ``"stop"``, and ``usage`` set to None.

    Raises:
        HTTPException: With the upstream status code when the upstream
            answers with an error status, or 500 on a transport error.
    """
    version_marker = "$@$v=undefined-rv1$@$"

    chat_id = generate_chat_id()
    referer_url = BASE_URL
    logger.info(f"Generated Chat ID: {chat_id} - Model: {request.model} - URL: {referer_url}")

    agent_mode = AGENT_MODE.get(request.model, {})
    trending_agent_mode = TRENDING_AGENT_MODE.get(request.model, {})
    model_prefix = MODEL_PREFIXES.get(request.model, "")

    headers_api_chat = {**headers, "Referer": referer_url}

    if request.model == 'o1-preview':
        # NOTE(review): the purpose of this random delay is not visible here;
        # the streaming path uses a different range (1-60 s).
        delay_seconds = random.randint(20, 60)
        logger.info(f"Introducing a delay of {delay_seconds} seconds for model 'o1-preview' (Chat ID: {chat_id})")
        await asyncio.sleep(delay_seconds)

    # getHid() performs blocking HTTP requests when its cache is stale; run it
    # in the default executor so the event loop keeps serving other requests.
    loop = asyncio.get_running_loop()
    validated = await loop.run_in_executor(None, getHid)

    json_data = {
        "agentMode": agent_mode,
        "clickedAnswer2": False,
        "clickedAnswer3": False,
        "clickedForceWebSearch": False,
        "codeModelMode": True,
        "githubToken": None,
        "id": chat_id,
        "isChromeExt": False,
        "isMicMode": False,
        "maxTokens": request.max_tokens,
        "messages": [message_to_dict(msg, model_prefix=model_prefix) for msg in request.messages],
        "mobileClient": False,
        "playgroundTemperature": request.temperature,
        "playgroundTopP": request.top_p,
        "previewToken": None,
        "trendingAgentMode": trending_agent_mode,
        "userId": None,
        "userSelectedModel": MODEL_MAPPING.get(request.model, request.model),
        "userSystemPrompt": None,
        "validated": validated,
        "visitFromDelta": False,
    }

    chunks = []
    async with httpx.AsyncClient() as client:
        try:
            async with client.stream(
                method="POST",
                url=f"{BASE_URL}/api/chat",
                headers=headers_api_chat,
                json=json_data,
                # Same limit as the streaming path; without it httpx applies
                # its short default timeout, which slow replies can exceed.
                timeout=100,
            ) as response:
                response.raise_for_status()
                async for chunk in response.aiter_text():
                    chunks.append(chunk)
        except httpx.HTTPStatusError as e:
            logger.error(f"HTTP error occurred for Chat ID {chat_id}: {e}")
            raise HTTPException(status_code=e.response.status_code, detail=str(e)) from e
        except httpx.RequestError as e:
            logger.error(f"Error occurred during request for Chat ID {chat_id}: {e}")
            raise HTTPException(status_code=500, detail=str(e)) from e

    full_response = "".join(chunks)

    if "https://www.blackbox.ai" in full_response:
        # The upstream answered with a message pointing at its own site:
        # refresh the hid and ask the user to retry.
        await loop.run_in_executor(None, getHid, True)
        full_response = "hid已刷新,重新对话即可"

    if full_response.startswith(version_marker):
        full_response = full_response[len(version_marker):]

    cleaned_full_response = strip_model_prefix(full_response, model_prefix)

    return {
        "id": f"chatcmpl-{uuid.uuid4()}",
        "object": "chat.completion",
        "created": int(datetime.now().timestamp()),
        "model": request.model,
        "choices": [
            {
                "index": 0,
                "message": {"role": "assistant", "content": cleaned_full_response},
                "finish_reason": "stop",
            }
        ],
        "usage": None,
    }
|
|