|
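"""OpenAI-compatible HTTP adapter for Google Gemini models on Vertex AI.

Translates OpenAI-style /v1/models and /v1/chat/completions requests into
google-genai calls, rotating between multiple service-account key files.
"""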
from fastapi import FastAPI, HTTPException, Depends, Header, Request |
|
from fastapi.responses import JSONResponse, StreamingResponse |
|
from fastapi.security import APIKeyHeader |
|
from pydantic import BaseModel, ConfigDict, Field |
|
from typing import List, Dict, Any, Optional, Union, Literal |
|
import base64 |
|
import re |
|
import json |
|
import time |
|
import os |
|
import glob |
|
import random |
|
from google.oauth2 import service_account |
|
import config |
|
|
|
from google.genai import types |
|
|
|
from google import genai |
|
|
|
# Global google-genai client, initialized at startup by init_vertex_ai().
client = None
|
|
|
app = FastAPI(title="OpenAI to Gemini Adapter") |
|
|
|
|
|
# Security scheme declaration; actual validation happens in get_api_key below.
api_key_header = APIKeyHeader(name="Authorization", auto_error=False)
|
|
|
|
|
async def get_api_key(authorization: Optional[str] = Header(None)): |
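    """Validate the 'Authorization: Bearer <key>' header and return the raw key."""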
|
if authorization is None: |
|
raise HTTPException( |
|
status_code=401, |
|
detail="Missing API key. Please include 'Authorization: Bearer YOUR_API_KEY' header." |
|
) |
|
|
|
|
|
if not authorization.startswith("Bearer "): |
|
raise HTTPException( |
|
status_code=401, |
|
detail="Invalid API key format. Use 'Authorization: Bearer YOUR_API_KEY'" |
|
) |
|
|
|
|
|
    # Strip only the leading prefix; str.replace would also mangle keys that
    # happen to contain the substring "Bearer " elsewhere.
    api_key = authorization[len("Bearer "):]
|
|
|
|
|
if not config.validate_api_key(api_key): |
|
raise HTTPException( |
|
status_code=401, |
|
detail="Invalid API key" |
|
) |
|
|
|
return api_key |
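
# Example call (hypothetical key and port, for illustration only):
#   curl -H "Authorization: Bearer sk-your-key" http://localhost:8000/v1/models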
|
|
|
|
|
class CredentialManager: |
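    """Discovers service-account JSON key files and rotates between them."""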
|
def __init__(self, default_credentials_dir="/app/credentials"): |
|
|
|
self.credentials_dir = os.environ.get("CREDENTIALS_DIR", default_credentials_dir) |
|
self.credentials_files = [] |
|
self.current_index = 0 |
|
self.credentials = None |
|
self.project_id = None |
|
self.load_credentials_list() |
|
|
|
def load_credentials_list(self): |
|
"""Load the list of available credential files""" |
|
|
|
pattern = os.path.join(self.credentials_dir, "*.json") |
|
self.credentials_files = glob.glob(pattern) |
|
|
|
if not self.credentials_files: |
|
print(f"No credential files found in {self.credentials_dir}") |
|
return False |
|
|
|
print(f"Found {len(self.credentials_files)} credential files: {[os.path.basename(f) for f in self.credentials_files]}") |
|
return True |
|
|
|
def refresh_credentials_list(self): |
|
"""Refresh the list of credential files (useful if files are added/removed)""" |
|
old_count = len(self.credentials_files) |
|
self.load_credentials_list() |
|
new_count = len(self.credentials_files) |
|
|
|
if old_count != new_count: |
|
print(f"Credential files updated: {old_count} -> {new_count}") |
|
|
|
return len(self.credentials_files) > 0 |
|
|
|
    def _load_credentials_from_file(self, file_path):
        """Load service-account credentials from a single JSON key file."""
        try:
            credentials = service_account.Credentials.from_service_account_file(
                file_path,
                scopes=['https://www.googleapis.com/auth/cloud-platform']
            )
            project_id = credentials.project_id
            print(f"Loaded credentials from {file_path} for project: {project_id}")
            self.credentials = credentials
            self.project_id = project_id
            return credentials, project_id
        except Exception as e:
            print(f"Error loading credentials from {file_path}: {e}")
            return None, None

    def get_next_credentials(self):
        """Rotate to the next credential file and load it.

        Each file is tried at most once per call, so a directory full of
        invalid keys cannot trigger unbounded recursion.
        """
        if not self.credentials_files:
            return None, None

        for _ in range(len(self.credentials_files)):
            file_path = self.credentials_files[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.credentials_files)
            credentials, project_id = self._load_credentials_from_file(file_path)
            if credentials:
                return credentials, project_id
            print("Trying next credential file...")
        return None, None

    def get_random_credentials(self):
        """Load a randomly chosen credential file, falling back to the others."""
        if not self.credentials_files:
            return None, None

        # Shuffle a copy so every file is tried at most once per call.
        candidates = self.credentials_files[:]
        random.shuffle(candidates)
        for file_path in candidates:
            credentials, project_id = self._load_credentials_from_file(file_path)
            if credentials:
                return credentials, project_id
            print("Trying another credential file...")
        return None, None
|
|
|
|
|
credential_manager = CredentialManager() |
|
|
|
|
|
class ImageUrl(BaseModel): |
|
url: str |
|
|
|
class ContentPartImage(BaseModel): |
|
type: Literal["image_url"] |
|
image_url: ImageUrl |
|
|
|
class ContentPartText(BaseModel): |
|
type: Literal["text"] |
|
text: str |
|
|
|
class OpenAIMessage(BaseModel): |
|
role: str |
|
content: Union[str, List[Union[ContentPartText, ContentPartImage, Dict[str, Any]]]] |
|
|
|
class OpenAIRequest(BaseModel): |
|
model: str |
|
messages: List[OpenAIMessage] |
|
temperature: Optional[float] = 1.0 |
|
max_tokens: Optional[int] = None |
|
top_p: Optional[float] = 1.0 |
|
top_k: Optional[int] = None |
|
stream: Optional[bool] = False |
|
stop: Optional[List[str]] = None |
|
presence_penalty: Optional[float] = None |
|
frequency_penalty: Optional[float] = None |
|
seed: Optional[int] = None |
|
logprobs: Optional[int] = None |
|
response_logprobs: Optional[bool] = None |
|
n: Optional[int] = None |
|
|
|
|
|
model_config = ConfigDict(extra='allow') |
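
# Example request body (a sketch mirroring the OpenAI chat.completions schema;
# unknown fields are tolerated via extra='allow'):
#
#   {
#     "model": "gemini-2.0-flash",
#     "messages": [
#       {"role": "system", "content": "You are concise."},
#       {"role": "user", "content": "Hello!"}
#     ],
#     "temperature": 0.7,
#     "stream": true
#   }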
|
|
|
|
|
def init_vertex_ai(): |
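    """Initialize the global genai client.

    Credential sources are tried in order: the GOOGLE_CREDENTIALS_JSON env
    var, key files managed by the CredentialManager, and finally the
    GOOGLE_APPLICATION_CREDENTIALS file path. Returns True on success.
    """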
|
global client |
|
try: |
|
|
|
credentials_json_str = os.environ.get("GOOGLE_CREDENTIALS_JSON") |
|
if credentials_json_str: |
|
try: |
|
|
|
try: |
|
credentials_info = json.loads(credentials_json_str) |
|
|
|
if not isinstance(credentials_info, dict): |
|
|
|
raise ValueError("Credentials JSON must be a dictionary") |
|
|
|
required_fields = ["type", "project_id", "private_key_id", "private_key", "client_email"] |
|
missing_fields = [field for field in required_fields if field not in credentials_info] |
|
if missing_fields: |
|
|
|
raise ValueError(f"Credentials JSON missing required fields: {missing_fields}") |
|
except json.JSONDecodeError as json_err: |
|
print(f"ERROR: Failed to parse GOOGLE_CREDENTIALS_JSON as JSON: {json_err}") |
|
raise |
|
|
|
|
|
try: |
|
|
|
credentials = service_account.Credentials.from_service_account_info( |
|
credentials_info, |
|
scopes=['https://www.googleapis.com/auth/cloud-platform'] |
|
) |
|
project_id = credentials.project_id |
|
print(f"Successfully created credentials object for project: {project_id}") |
|
except Exception as cred_err: |
|
print(f"ERROR: Failed to create credentials from service account info: {cred_err}") |
|
raise |
|
|
|
|
|
try: |
|
client = genai.Client(vertexai=True, credentials=credentials, project=project_id, location="us-central1") |
|
print(f"Initialized Vertex AI using GOOGLE_CREDENTIALS_JSON env var for project: {project_id}") |
|
except Exception as client_err: |
|
print(f"ERROR: Failed to initialize genai.Client: {client_err}") |
|
raise |
|
return True |
|
except Exception as e: |
|
print(f"Error loading credentials from GOOGLE_CREDENTIALS_JSON: {e}") |
|
|
|
|
|
|
|
print(f"Trying credential manager (directory: {credential_manager.credentials_dir})") |
|
credentials, project_id = credential_manager.get_next_credentials() |
|
|
|
if credentials and project_id: |
|
try: |
|
client = genai.Client(vertexai=True, credentials=credentials, project=project_id, location="us-central1") |
|
print(f"Initialized Vertex AI using Credential Manager for project: {project_id}") |
|
return True |
|
except Exception as e: |
|
print(f"ERROR: Failed to initialize client with credentials from Credential Manager: {e}") |
|
|
|
|
|
file_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") |
|
if file_path: |
|
print(f"Checking GOOGLE_APPLICATION_CREDENTIALS file path: {file_path}") |
|
if os.path.exists(file_path): |
|
try: |
|
print(f"File exists, attempting to load credentials") |
|
credentials = service_account.Credentials.from_service_account_file( |
|
file_path, |
|
scopes=['https://www.googleapis.com/auth/cloud-platform'] |
|
) |
|
project_id = credentials.project_id |
|
print(f"Successfully loaded credentials from file for project: {project_id}") |
|
|
|
try: |
|
client = genai.Client(vertexai=True, credentials=credentials, project=project_id, location="us-central1") |
|
print(f"Initialized Vertex AI using GOOGLE_APPLICATION_CREDENTIALS file path for project: {project_id}") |
|
return True |
|
except Exception as client_err: |
|
print(f"ERROR: Failed to initialize client with credentials from file: {client_err}") |
|
except Exception as e: |
|
print(f"ERROR: Failed to load credentials from GOOGLE_APPLICATION_CREDENTIALS path {file_path}: {e}") |
|
else: |
|
print(f"ERROR: GOOGLE_APPLICATION_CREDENTIALS file does not exist at path: {file_path}") |
|
|
|
|
|
print(f"ERROR: No valid credentials found. Tried GOOGLE_CREDENTIALS_JSON, Credential Manager ({credential_manager.credentials_dir}), and GOOGLE_APPLICATION_CREDENTIALS.") |
|
return False |
|
except Exception as e: |
|
print(f"Error initializing authentication: {e}") |
|
return False |
|
|
|
|
|
# NOTE: on_event hooks are deprecated in recent FastAPI releases in favor of
# lifespan handlers; kept here for compatibility with older versions.
@app.on_event("startup")
async def startup_event():
    if not init_vertex_ai():
        print("WARNING: Failed to initialize Vertex AI authentication")
|
|
|
|
|
def create_gemini_prompt(messages: List[OpenAIMessage]) -> Union[str, List[Any]]: |
|
""" |
|
Convert OpenAI messages to Gemini format. |
|
Returns either a string prompt or a list of content parts if images are present. |
|
""" |
|
|
|
has_images = False |
|
for message in messages: |
|
if isinstance(message.content, list): |
|
for part in message.content: |
|
if isinstance(part, dict) and part.get('type') == 'image_url': |
|
has_images = True |
|
break |
|
elif isinstance(part, ContentPartImage): |
|
has_images = True |
|
break |
|
if has_images: |
|
break |
|
|
|
|
|
if not has_images: |
|
prompt = "" |
|
|
|
|
|
        system_message = None
        for message in messages:
            if message.role == "system":
                if isinstance(message.content, str):
                    system_message = message.content
                elif isinstance(message.content, list):
                    # Concatenate every text part, not just the first one.
                    system_message = "".join(
                        part.get('text', '') if isinstance(part, dict) else getattr(part, 'text', '')
                        for part in message.content
                    )
                else:
                    system_message = str(message.content)
                break
|
|
|
|
|
if system_message: |
|
prompt += f"System: {system_message}\n\n" |
|
|
|
|
|
for message in messages: |
|
if message.role == "system": |
|
continue |
|
|
|
|
|
content_text = "" |
|
if isinstance(message.content, str): |
|
content_text = message.content |
|
elif isinstance(message.content, list) and message.content and isinstance(message.content[0], dict) and 'text' in message.content[0]: |
|
content_text = message.content[0]['text'] |
|
else: |
|
|
|
content_text = str(message.content) |
|
|
|
if message.role == "user": |
|
prompt += f"Human: {content_text}\n" |
|
elif message.role == "assistant": |
|
prompt += f"AI: {content_text}\n" |
|
|
|
|
|
if messages[-1].role == "user": |
|
prompt += "AI: " |
|
|
|
return prompt |
|
|
|
|
|
gemini_contents = [] |
|
|
|
|
|
for message in messages: |
|
if message.role == "system": |
|
if isinstance(message.content, str): |
|
gemini_contents.append(f"System: {message.content}") |
|
elif isinstance(message.content, list): |
|
|
|
system_text = "" |
|
for part in message.content: |
|
if isinstance(part, dict) and part.get('type') == 'text': |
|
system_text += part.get('text', '') |
|
elif isinstance(part, ContentPartText): |
|
system_text += part.text |
|
if system_text: |
|
gemini_contents.append(f"System: {system_text}") |
|
break |
|
|
|
|
|
for message in messages: |
|
if message.role == "system": |
|
continue |
|
|
|
|
|
if isinstance(message.content, str): |
|
prefix = "Human: " if message.role == "user" else "AI: " |
|
gemini_contents.append(f"{prefix}{message.content}") |
|
|
|
|
|
elif isinstance(message.content, list): |
|
|
|
text_content = "" |
|
|
|
for part in message.content: |
|
|
|
if isinstance(part, dict) and part.get('type') == 'text': |
|
text_content += part.get('text', '') |
|
elif isinstance(part, ContentPartText): |
|
text_content += part.text |
|
|
|
|
|
if text_content: |
|
prefix = "Human: " if message.role == "user" else "AI: " |
|
gemini_contents.append(f"{prefix}{text_content}") |
|
|
|
|
|
            for part in message.content:
                # Pull the image URL from either a raw dict part or a parsed ContentPartImage.
                if isinstance(part, dict) and part.get('type') == 'image_url':
                    image_url = part.get('image_url', {}).get('url', '')
                elif isinstance(part, ContentPartImage):
                    image_url = part.image_url.url
                else:
                    continue

                # Only base64 data URLs are decoded; other URL forms are ignored here.
                if image_url.startswith('data:'):
                    mime_match = re.match(r'data:([^;]+);base64,(.+)', image_url)
                    if mime_match:
                        mime_type, b64_data = mime_match.groups()
                        image_bytes = base64.b64decode(b64_data)
                        gemini_contents.append(types.Part.from_bytes(data=image_bytes, mime_type=mime_type))
|
|
|
return gemini_contents |
|
|
|
def create_generation_config(request: OpenAIRequest) -> Dict[str, Any]:
    """Map OpenAI request parameters onto Gemini generation-config keys."""
    # Local dict of Gemini generation parameters (named to avoid shadowing the
    # imported config module).
    gen_config = {}

    if request.temperature is not None:
        gen_config["temperature"] = request.temperature

    if request.max_tokens is not None:
        gen_config["max_output_tokens"] = request.max_tokens

    if request.top_p is not None:
        gen_config["top_p"] = request.top_p

    if request.top_k is not None:
        gen_config["top_k"] = request.top_k

    if request.stop is not None:
        gen_config["stop_sequences"] = request.stop

    if request.presence_penalty is not None:
        gen_config["presence_penalty"] = request.presence_penalty

    if request.frequency_penalty is not None:
        gen_config["frequency_penalty"] = request.frequency_penalty

    if request.seed is not None:
        gen_config["seed"] = request.seed

    if request.logprobs is not None:
        gen_config["logprobs"] = request.logprobs

    if request.response_logprobs is not None:
        gen_config["response_logprobs"] = request.response_logprobs

    if request.n is not None:
        gen_config["candidate_count"] = request.n

    return gen_config
|
|
|
|
|
def convert_to_openai_format(gemini_response, model: str) -> Dict[str, Any]: |
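    """Translate a Gemini response into an OpenAI chat.completion payload."""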
|
|
|
    if hasattr(gemini_response, 'candidates') and len(gemini_response.candidates) > 1:
        choices = []
        for i, candidate in enumerate(gemini_response.candidates):
            # Join the candidate's text parts; unlike the response object,
            # candidates may not expose a .text shortcut.
            parts = getattr(getattr(candidate, 'content', None), 'parts', None) or []
            candidate_text = "".join(getattr(part, 'text', '') or '' for part in parts)
            choices.append({
                "index": i,
                "message": {
                    "role": "assistant",
                    "content": candidate_text
                },
                "finish_reason": "stop"
            })
|
else: |
|
|
|
choices = [ |
|
{ |
|
"index": 0, |
|
"message": { |
|
"role": "assistant", |
|
"content": gemini_response.text |
|
}, |
|
"finish_reason": "stop" |
|
} |
|
] |
|
|
|
|
|
for i, choice in enumerate(choices): |
|
if hasattr(gemini_response, 'candidates') and i < len(gemini_response.candidates): |
|
candidate = gemini_response.candidates[i] |
|
if hasattr(candidate, 'logprobs'): |
|
choice["logprobs"] = candidate.logprobs |
|
|
|
    # Populate token usage from Gemini's usage_metadata when available;
    # fall back to zeros otherwise.
    usage = {"prompt_tokens": 0, "completion_tokens": 0, "total_tokens": 0}
    usage_metadata = getattr(gemini_response, 'usage_metadata', None)
    if usage_metadata is not None:
        usage = {
            "prompt_tokens": getattr(usage_metadata, 'prompt_token_count', 0) or 0,
            "completion_tokens": getattr(usage_metadata, 'candidates_token_count', 0) or 0,
            "total_tokens": getattr(usage_metadata, 'total_token_count', 0) or 0,
        }

    return {
        "id": f"chatcmpl-{int(time.time())}",
        "object": "chat.completion",
        "created": int(time.time()),
        "model": model,
        "choices": choices,
        "usage": usage
    }
|
|
|
def convert_chunk_to_openai(chunk, model: str, response_id: str, candidate_index: int = 0) -> str: |
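    """Format a single Gemini streaming chunk as an OpenAI SSE data line."""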
|
    # Guard against chunks that carry no text payload.
    chunk_content = getattr(chunk, 'text', None) or ""
|
|
|
chunk_data = { |
|
"id": response_id, |
|
"object": "chat.completion.chunk", |
|
"created": int(time.time()), |
|
"model": model, |
|
"choices": [ |
|
{ |
|
"index": candidate_index, |
|
"delta": { |
|
"content": chunk_content |
|
}, |
|
"finish_reason": None |
|
} |
|
] |
|
} |
|
|
|
|
|
if hasattr(chunk, 'logprobs'): |
|
chunk_data["choices"][0]["logprobs"] = chunk.logprobs |
|
|
|
return f"data: {json.dumps(chunk_data)}\n\n" |
|
|
|
def create_final_chunk(model: str, response_id: str, candidate_count: int = 1) -> str: |
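    """Emit the terminal SSE chunk carrying finish_reason for every candidate."""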
|
choices = [] |
|
for i in range(candidate_count): |
|
choices.append({ |
|
"index": i, |
|
"delta": {}, |
|
"finish_reason": "stop" |
|
}) |
|
|
|
final_chunk = { |
|
"id": response_id, |
|
"object": "chat.completion.chunk", |
|
"created": int(time.time()), |
|
"model": model, |
|
"choices": choices |
|
} |
|
|
|
return f"data: {json.dumps(final_chunk)}\n\n" |
|
|
|
|
|
@app.get("/v1/models") |
|
async def list_models(api_key: str = Depends(get_api_key)): |
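    """List the models this adapter exposes, in OpenAI list format.

    The "-search" variants run the same underlying model with Google Search
    grounding enabled (see chat_completions).
    """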
|
|
|
    created = int(time.time())
    model_specs = [
        ("gemini-2.5-pro-exp-03-25", "gemini-2.5-pro-exp-03-25"),
        ("gemini-2.5-pro-exp-03-25-search", "gemini-2.5-pro-exp-03-25"),
        ("gemini-2.0-flash", "gemini-2.0-flash"),
        ("gemini-2.0-flash-search", "gemini-2.0-flash"),
        ("gemini-2.0-flash-lite", "gemini-2.0-flash-lite"),
        ("gemini-2.0-flash-lite-search", "gemini-2.0-flash-lite"),
        ("gemini-2.0-pro-exp-02-05", "gemini-2.0-pro-exp-02-05"),
        ("gemini-1.5-flash", "gemini-1.5-flash"),
        ("gemini-1.5-flash-8b", "gemini-1.5-flash-8b"),
        ("gemini-1.5-pro", "gemini-1.5-pro"),
        ("gemini-1.0-pro-002", "gemini-1.0-pro-002"),
        ("gemini-1.0-pro-vision-001", "gemini-1.0-pro-vision-001"),
        ("gemini-embedding-exp", "gemini-embedding-exp"),
    ]
    models = [
        {
            "id": model_id,
            "object": "model",
            "created": created,
            "owned_by": "google",
            "permission": [],
            "root": root,
            "parent": None,
        }
        for model_id, root in model_specs
    ]
|
|
|
return {"object": "list", "data": models} |
|
|
|
|
|
|
|
def create_openai_error_response(status_code: int, message: str, error_type: str) -> Dict[str, Any]: |
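    """Build an error body matching the OpenAI error response schema."""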
|
return { |
|
"error": { |
|
"message": message, |
|
"type": error_type, |
|
"code": status_code, |
|
"param": None, |
|
} |
|
} |
|
|
|
@app.post("/v1/chat/completions") |
|
async def chat_completions(request: OpenAIRequest, api_key: str = Depends(get_api_key)): |
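    """OpenAI-compatible chat completions endpoint backed by Gemini on Vertex AI."""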
|
try: |
|
|
|
        # Validate the requested model against the adapter's published list.
        models_response = await list_models(api_key)
|
if not request.model or not any(model["id"] == request.model for model in models_response.get("data", [])): |
|
error_response = create_openai_error_response( |
|
400, f"Model '{request.model}' not found", "invalid_request_error" |
|
) |
|
return JSONResponse(status_code=400, content=error_response) |
|
|
|
|
|
is_grounded_search = request.model.endswith("-search") |
|
|
|
|
|
gemini_model = request.model.replace("-search", "") if is_grounded_search else request.model |
|
|
|
|
|
generation_config = create_generation_config(request) |
|
|
|
|
|
global client |
|
if client is None: |
|
|
|
error_response = create_openai_error_response( |
|
500, "Vertex AI client not initialized", "server_error" |
|
) |
|
return JSONResponse(status_code=500, content=error_response) |
|
print(f"Using globally initialized client.") |
|
|
|
|
|
        # Disable Gemini's default safety filters so completions are not
        # silently blocked; grounded search is attached only for "-search" models.
        safety_settings = [
            types.SafetySetting(category="HARM_CATEGORY_HATE_SPEECH", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_SEXUALLY_EXPLICIT", threshold="OFF"),
            types.SafetySetting(category="HARM_CATEGORY_HARASSMENT", threshold="OFF"),
        ]

        generation_config["safety_settings"] = safety_settings
        if is_grounded_search:
            generation_config["tools"] = [types.Tool(google_search=types.GoogleSearch())]
|
|
|
|
|
prompt = create_gemini_prompt(request.messages) |
|
|
|
if request.stream: |
|
|
|
async def stream_generator(): |
|
response_id = f"chatcmpl-{int(time.time())}" |
|
candidate_count = request.n or 1 |
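
                # Note: with n > 1 the adapter issues one sequential streaming
                # request per candidate rather than a single multi-candidate stream.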
|
|
|
try: |
|
|
|
|
|
for candidate_index in range(candidate_count): |
|
|
|
|
|
responses = client.models.generate_content_stream( |
|
model=gemini_model, |
|
contents=prompt, |
|
config=generation_config, |
|
) |
|
|
|
|
|
for response in responses: |
|
yield convert_chunk_to_openai(response, request.model, response_id, candidate_index) |
|
|
|
|
|
yield create_final_chunk(request.model, response_id, candidate_count) |
|
yield "data: [DONE]\n\n" |
|
|
|
except Exception as stream_error: |
|
|
|
error_msg = f"Error during streaming: {str(stream_error)}" |
|
print(error_msg) |
|
error_response = create_openai_error_response(500, error_msg, "server_error") |
|
yield f"data: {json.dumps(error_response)}\n\n" |
|
yield "data: [DONE]\n\n" |
|
|
|
return StreamingResponse( |
|
stream_generator(), |
|
media_type="text/event-stream" |
|
) |
|
else: |
|
|
|
try: |
|
|
|
if request.n and request.n > 1: |
|
|
|
if "candidate_count" not in generation_config: |
|
generation_config["candidate_count"] = request.n |
|
|
|
response = client.models.generate_content( |
|
model=gemini_model, |
|
contents=prompt, |
|
config=generation_config, |
|
) |
|
|
|
|
|
openai_response = convert_to_openai_format(response, request.model) |
|
return JSONResponse(content=openai_response) |
|
except Exception as generate_error: |
|
error_msg = f"Error generating content: {str(generate_error)}" |
|
print(error_msg) |
|
error_response = create_openai_error_response(500, error_msg, "server_error") |
|
return JSONResponse(status_code=500, content=error_response) |
|
|
|
except Exception as e: |
|
error_msg = f"Error processing request: {str(e)}" |
|
print(error_msg) |
|
error_response = create_openai_error_response(500, error_msg, "server_error") |
|
return JSONResponse(status_code=500, content=error_response) |
|
|
|
|
|
@app.get("/health") |
|
def health_check(api_key: str = Depends(get_api_key)): |
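    """Report adapter liveness and the current credential rotation state."""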
|
|
|
credential_manager.refresh_credentials_list() |
|
|
|
return { |
|
"status": "ok", |
|
"credentials": { |
|
"available": len(credential_manager.credentials_files), |
|
"files": [os.path.basename(f) for f in credential_manager.credentials_files], |
|
"current_index": credential_manager.current_index |
|
} |
|
} |
|
|
|
|
|
@app.get("/debug/credentials") |
|
def debug_credentials(api_key: str = Depends(get_api_key)): |
|
""" |
|
Diagnostic endpoint to check credential configuration without actually authenticating. |
|
This helps troubleshoot issues with credential setup, especially on Hugging Face. |
|
""" |
|
|
|
creds_json = os.environ.get("GOOGLE_CREDENTIALS_JSON") |
|
creds_json_status = { |
|
"present": creds_json is not None, |
|
"length": len(creds_json) if creds_json else 0, |
|
"parse_status": "not_attempted" |
|
} |
|
|
|
|
|
if creds_json: |
|
try: |
|
creds_info = json.loads(creds_json) |
|
|
|
required_fields = ["type", "project_id", "private_key_id", "private_key", "client_email"] |
|
missing_fields = [field for field in required_fields if field not in creds_info] |
|
|
|
creds_json_status.update({ |
|
"parse_status": "success", |
|
"is_dict": isinstance(creds_info, dict), |
|
"missing_required_fields": missing_fields, |
|
"project_id": creds_info.get("project_id", "not_found"), |
|
|
|
"private_key_sample": creds_info.get("private_key", "not_found")[:10] + "..." if "private_key" in creds_info else "not_found" |
|
}) |
|
except json.JSONDecodeError as e: |
|
creds_json_status.update({ |
|
"parse_status": "error", |
|
"error": str(e), |
|
"sample": creds_json[:20] + "..." if len(creds_json) > 20 else creds_json |
|
}) |
|
|
|
|
|
credential_manager.refresh_credentials_list() |
|
|
|
|
|
app_creds_path = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") |
|
app_creds_status = { |
|
"present": app_creds_path is not None, |
|
"path": app_creds_path, |
|
"exists": os.path.exists(app_creds_path) if app_creds_path else False |
|
} |
|
|
|
return { |
|
"environment": { |
|
"GOOGLE_CREDENTIALS_JSON": creds_json_status, |
|
"CREDENTIALS_DIR": { |
|
"path": credential_manager.credentials_dir, |
|
"exists": os.path.exists(credential_manager.credentials_dir), |
|
"files_found": len(credential_manager.credentials_files), |
|
"files": [os.path.basename(f) for f in credential_manager.credentials_files] |
|
}, |
|
"GOOGLE_APPLICATION_CREDENTIALS": app_creds_status |
|
}, |
|
"recommendations": [ |
|
"Ensure GOOGLE_CREDENTIALS_JSON contains the full, properly formatted JSON content of your service account key", |
|
"Check for any special characters or line breaks that might need proper escaping", |
|
"Verify that the service account has the necessary permissions for Vertex AI" |
|
] |
|
} |
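

# Local development entry point, a minimal sketch: the port is an assumption,
# adjust to your deployment (equivalently: `uvicorn app:app --port 8050`).
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8050)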
|
|