Spaces:
Running
Running
import asyncio | |
import gradio as gr | |
import requests | |
import re | |
from typing import List, Dict, Tuple, Any # νμ ννΈ μΆκ° | |
# api_usage λͺ¨λμ μ¬μ©μμ νκ²½μ λ§κ² μ‘΄μ¬νλ€κ³ κ°μ ν©λλ€. | |
# νμν ν¨μλ€μ μν¬νΈ (μ€μ μ¬μ© μ ν΄λΉ λͺ¨λ νμ) | |
from api_usage import ( | |
get_subscription, | |
check_key_availability, | |
get_orgs_me, | |
check_key_ant_availability, | |
check_ant_rate_limit, | |
check_key_gemini_availability, # μ΄ ν¨μλ (bool, str) ννμ λ°ννλ€κ³ κ°μ | |
check_key_azure_availability, | |
get_azure_status, | |
get_azure_deploy, | |
check_key_mistral_availability, | |
check_mistral_quota, | |
check_key_replicate_availability, | |
check_key_aws_availability, | |
check_key_or_availability, | |
check_key_or_limits, | |
check_gcp_anthropic, | |
check_groq_status, | |
check_nai_status, | |
check_elevenlabs_status, | |
check_xai_status, | |
check_stability_status, | |
check_deepseek_status, | |
) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
# Key-specific helper functions (OpenAI ν¨μ μμ λ¨) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
def get_key_oai_info(key: str) -> Dict[str, Any]: | |
session = requests.Session() | |
# raw_status_codeλ HTTP μλ΅ μ½λ λλ μ μ¬ν μν νμμ | |
# org_data_or_errorλ μ±κ³΅ μ λ°μ΄ν°, μ€ν¨ μ μλ¬ μ 보 λ± | |
raw_status_code, org_data_or_error = check_key_availability(session, key) | |
info_dict = { | |
"key_type": "OpenAI", | |
"key_availability": False, | |
"has_sufficient_quota": True, # κΈ°λ³Έμ μΌλ‘ Trueλ‘ κ°μ , λ¬Έμ μ Falseλ‘ λ³κ²½ | |
"gpt4_availability": "", | |
"gpt4_32k_availability": "", | |
"default_org": "", | |
"org_description": "", | |
"organization": "", | |
"models": "", | |
"requests_per_minute": "", | |
"tokens_per_minute": "", | |
"quota": "", | |
"all_models": "", | |
"status_message": "" | |
} | |
org_data_for_subscription = None | |
if raw_status_code == 200: | |
info_dict["key_availability"] = True | |
org_data_for_subscription = org_data_or_error | |
elif raw_status_code == 401: # Unauthorized | |
info_dict["status_message"] = "Unauthorized: Invalid API key." | |
info_dict["has_sufficient_quota"] = False | |
return info_dict | |
elif raw_status_code == 403: # Forbidden | |
status_me, orgs_me_data = get_orgs_me(session, key) | |
if status_me == 200: | |
info_dict["key_availability"] = True | |
org_data_for_subscription = orgs_me_data | |
else: | |
info_dict["status_message"] = f"Forbidden, and get_orgs_me failed (status: {status_me}). Key might be inactive or lack permissions." | |
info_dict["has_sufficient_quota"] = False | |
return info_dict | |
elif raw_status_code == 429: # Too Many Requests (Rate Limit or Quota) | |
info_dict["key_availability"] = True # ν€ μ체λ μ ν¨ν μ μμ | |
info_dict["has_sufficient_quota"] = False # μΏΌν° λΆμ‘±μΌλ‘ κ°μ£Ό | |
info_dict["status_message"] = "Rate limit or quota likely exceeded (initial check)." | |
if isinstance(org_data_or_error, dict) and "error" in org_data_or_error: | |
error_details = org_data_or_error["error"] | |
current_quota_message = error_details.get("message", "Quota details unavailable from initial check") | |
info_dict["quota"] = current_quota_message | |
if "insufficient_quota" not in current_quota_message.lower(): # μμΈ λ©μμ§μ insufficient_quotaκ° μλ€λ©΄ μν λ©μμ§μ μΆκ° | |
info_dict["status_message"] += f" Error: {current_quota_message}" | |
# μ¬κΈ°μ λ°ννλ©΄ μλ ꡬλ μ 보 νμΈμ 건λλ°μ§λ§, has_sufficient_quotaκ° μ΄λ―Έ Falseμ | |
# return info_dict # νμμ λ°λΌ μ£Όμ ν΄μ νμ¬ λ λΉ λ₯Έ λ°ν κ°λ₯ | |
else: # κΈ°ν μλ¬ | |
info_dict["status_message"] = f"Key check failed (status: {raw_status_code})." | |
if isinstance(org_data_or_error, dict) and "error" in org_data_or_error: | |
info_dict["status_message"] += f" Error: {org_data_or_error['error'].get('message', str(org_data_or_error))}" | |
elif isinstance(org_data_or_error, str): | |
info_dict["status_message"] += f" Details: {org_data_or_error}" | |
info_dict["has_sufficient_quota"] = False | |
return info_dict | |
if not info_dict["key_availability"]: | |
return info_dict # ν€κ° μ ν¨νμ§ μμΌλ©΄ λ μ΄μ μ§ν μ ν¨ | |
# org_data_for_subscriptionμ΄ μ€λΉλμ§ μμ κ²½μ° (μ: μ΄κΈ° 200μ΄μμ§λ§ org_dataκ° λΆμ μ ) | |
if not org_data_for_subscription: | |
status_me, orgs_me_data = get_orgs_me(session, key) | |
if status_me == 200: | |
org_data_for_subscription = orgs_me_data | |
else: | |
info_dict["status_message"] = (info_dict["status_message"] + " Could not identify organization for subscription.").strip() | |
info_dict["key_availability"] = False | |
info_dict["has_sufficient_quota"] = False | |
return info_dict | |
subscription_info = get_subscription(key, session, org_data_for_subscription) | |
if subscription_info: | |
info_dict.update( | |
{ | |
"gpt4_availability": subscription_info.get("has_gpt4", ""), | |
"gpt4_32k_availability": subscription_info.get("has_gpt4_32k", ""), | |
"default_org": subscription_info.get("default_org", ""), | |
"org_description": subscription_info.get("org_description", ""), | |
"organization": subscription_info.get("organization", ""), | |
"models": subscription_info.get("models", ""), | |
"requests_per_minute": subscription_info.get("rpm", ""), | |
"tokens_per_minute": subscription_info.get("tpm", ""), | |
"quota": subscription_info.get("quota", info_dict["quota"]), | |
"all_models": subscription_info.get("all_models", ""), | |
} | |
) | |
error_info = subscription_info.get("error") | |
if error_info and isinstance(error_info, dict): | |
err_type = error_info.get("type", "").lower() | |
err_code = error_info.get("code", "").lower() | |
err_msg = error_info.get("message", "").lower() | |
if "insufficient_quota" in err_type or \ | |
"insufficient_quota" in err_code or \ | |
"you exceeded your current quota" in err_msg or \ | |
"payment required" in err_msg or \ | |
("billing" in err_msg and "issue" in err_msg): | |
info_dict["has_sufficient_quota"] = False | |
new_quota_message = f"Insufficient: {error_info.get('message', err_type)}" | |
info_dict["quota"] = new_quota_message # μΏΌν° νλμ λͺ μμ λ©μμ§ | |
info_dict["status_message"] = (info_dict["status_message"] + f" Quota/Billing issue: {error_info.get('message', err_type)}").strip() | |
# κ³μ λΉνμ±ν λ± νμΈ | |
if "account_deactivated" in str(subscription_info).lower() or \ | |
"payment_failed" in str(subscription_info).lower(): | |
info_dict["has_sufficient_quota"] = False | |
if "Account issue" not in info_dict["status_message"]: # μ€λ³΅ λ©μμ§ λ°©μ§ | |
info_dict["status_message"] = (info_dict["status_message"] + " Account issue (e.g., deactivated, payment failed).").strip() | |
else: | |
info_dict["status_message"] = (info_dict["status_message"] + " Failed to retrieve full subscription details.").strip() | |
info_dict["has_sufficient_quota"] = False | |
# ꡬλ μ 보 μμΌλ©΄ key_availabilityλ Falseλ‘ κ°μ£Όν μ μμ. | |
# info_dict["key_availability"] = False # μ£Όμ μ²λ¦¬νμ¬ ν€ μ체λ μ ν¨ν μ μμμ λ¨κΉ | |
return info_dict | |
async def get_key_ant_info(key: str, rate_limit: bool, claude_model: str) -> Dict[str, Any]: | |
key_avai = await check_key_ant_availability(key, claude_model) | |
info_dict = { | |
"key_type": "Anthropic Claude", | |
"key_availability": key_avai[0], | |
"status": "", | |
"filter_response": "", | |
"requests_per_minute": "", | |
"tokens_per_minute": "", | |
"tokens_input_per_minute": "", | |
"tokens_output_per_minute": "", | |
"tier": "", | |
"concurrent_rate_limit": "", | |
"models": "", | |
} | |
info_dict["status"] = key_avai[1] | |
info_dict["filter_response"] = key_avai[2] | |
info_dict["requests_per_minute"] = key_avai[3] + ("" if key_avai[3] == "" else f" ({key_avai[4]} left)") | |
info_dict["tokens_per_minute"] = key_avai[5] + ("" if key_avai[5] == "" else f" ({key_avai[6]} left)") | |
info_dict["tokens_input_per_minute"] = key_avai[8] + ("" if key_avai[8] == "" else f" ({key_avai[9]} left)") | |
info_dict["tokens_output_per_minute"] = key_avai[10] + ("" if key_avai[10] == "" else f" ({key_avai[11]} left)") | |
info_dict["tier"] = key_avai[7] | |
info_dict["models"] = key_avai[12] | |
if rate_limit: | |
rate = await check_ant_rate_limit(key, claude_model) | |
info_dict["concurrent_rate_limit"] = rate | |
return info_dict | |
# μ¬μ©μμ μλ³Έ get_key_gemini_info ν¨μ μ μ§ | |
def get_key_gemini_info(key: str) -> Dict[str, Any]: | |
"""Gemini ν€ μ 보λ₯Ό κ°μ Έμ΅λλ€ (μ¬μ©μ μλ³Έ λ²μ ).""" | |
key_avai = check_key_gemini_availability(key) # (bool, str) νν λ°ν κ°μ | |
info_dict = { | |
"key": key, # μλ³Έμλ key νλκ° μμμ | |
"key_availability": key_avai[0], | |
"status": key_avai[1], | |
} | |
return info_dict | |
def get_key_azure_info(endpoint: str, api_key: str) -> Dict[str, Any]: | |
key_avai = check_key_azure_availability(endpoint, api_key) | |
info_dict = { | |
"key_type": "Microsoft Azure OpenAI", | |
"key_availability": key_avai[0], | |
"gpt35_availability": "", | |
"gpt4_availability": "", | |
"gpt4_32k_availability": "", | |
"dall_e_3_availability": "", | |
"moderation_status": "", | |
"models": "", | |
"deployments": "", | |
} | |
if key_avai[0]: | |
azure_deploy = get_azure_deploy(endpoint, api_key) | |
if azure_deploy: | |
status = get_azure_status(endpoint, api_key, azure_deploy) | |
if status: | |
info_dict["gpt35_availability"] = status[1] | |
info_dict["gpt4_availability"] = status[2] | |
info_dict["gpt4_32k_availability"] = status[3] | |
info_dict["dall_e_3_availability"] = status[4] | |
info_dict["moderation_status"] = status[0] | |
info_dict["models"] = key_avai[1] | |
info_dict["deployments"] = azure_deploy if azure_deploy else "N/A" | |
return info_dict | |
def get_key_mistral_info(key: str) -> Dict[str, Any]: | |
key_avai = check_key_mistral_availability(key) | |
info_dict = { | |
"key_type": "Mistral AI", | |
"key_availability": True if key_avai else False, | |
"has_quota": "", | |
"limits": "", | |
"models": "", | |
} | |
if key_avai: | |
quota_info = check_mistral_quota(key) | |
if quota_info: | |
info_dict["has_quota"] = quota_info[0] | |
if quota_info[1]: | |
info_dict["limits"] = quota_info[1] | |
info_dict["models"] = key_avai | |
return info_dict | |
def get_key_replicate_info(key: str) -> Dict[str, Any]: | |
key_avai = check_key_replicate_availability(key) | |
info_dict = { | |
"key_type": "Replicate", | |
"key_availability": key_avai[0], | |
"account_name": "", | |
"type": "", | |
"has_quota": "", | |
"hardware_available": "", | |
} | |
if key_avai[0] and isinstance(key_avai[1], dict): | |
info_dict["account_name"] = key_avai[1].get("username", "") | |
info_dict["type"] = key_avai[1].get("type", "") | |
info_dict["has_quota"] = key_avai[2] | |
info_dict["hardware_available"] = key_avai[3] | |
return info_dict | |
async def get_key_aws_info(key: str) -> Dict[str, Any]: | |
key_avai = await check_key_aws_availability(key) | |
info_dict = { | |
"key_type": "Amazon AWS Claude", | |
"key_availability": key_avai[0], | |
"username": "", | |
"root": "", | |
"admin": "", | |
"quarantine": "", | |
"iam_full_access": "", | |
"iam_user_change_password": "", | |
"aws_bedrock_full_access": "", | |
"enabled_region": "", | |
"models_usage": "", | |
"cost_and_usage": key_avai[1] if not key_avai[0] else "", | |
} | |
if key_avai[0]: | |
info_dict["username"] = key_avai[1] | |
info_dict["root"] = key_avai[2] | |
info_dict["admin"] = key_avai[3] | |
info_dict["quarantine"] = key_avai[4] | |
info_dict["iam_full_access"] = key_avai[5] | |
info_dict["iam_user_change_password"] = key_avai[6] | |
info_dict["aws_bedrock_full_access"] = key_avai[7] | |
info_dict["enabled_region"] = key_avai[8] | |
info_dict["models_usage"] = key_avai[9] | |
info_dict["cost_and_usage"] = key_avai[10] | |
return info_dict | |
def get_key_openrouter_info(key: str) -> Dict[str, Any]: | |
key_avai = check_key_or_availability(key) | |
info_dict = { | |
"key_type": "OpenRouter", | |
"key_availability": key_avai[0], | |
"is_free_tier": "", | |
"usage": "", | |
"balance": "", | |
"limit": "", | |
"limit_remaining": "", | |
"rate_limit_per_minite": "", | |
"4_turbo_per_request_tokens_limit": "", | |
"sonnet_per_request_tokens_limit": "", | |
"opus_per_request_tokens_limit": "", | |
} | |
if key_avai[0] and isinstance(key_avai[1], dict): | |
models_info = check_key_or_limits(key) | |
info_dict["is_free_tier"] = key_avai[1].get("is_free_tier", "") | |
info_dict["limit"] = key_avai[1].get("limit", "") | |
info_dict["limit_remaining"] = key_avai[1].get("limit_remaining", "") | |
usage_val = key_avai[1].get("usage") | |
info_dict["usage"] = f"${format(usage_val, '.4f')}" if isinstance(usage_val, (int, float)) else "" | |
balance_val = models_info[0] if models_info else None | |
rate_limit_val = key_avai[2] if len(key_avai) > 2 else None | |
if balance_val is not None: | |
info_dict["balance"] = f"${format(balance_val, '.4f')}" | |
elif rate_limit_val is not None: | |
try: | |
estimated_balance = float(rate_limit_val) / 60 | |
info_dict["balance"] = f"${format(estimated_balance, '.4f')} (estimated)" | |
except (ValueError, TypeError): | |
info_dict["balance"] = "$N/A (estimation failed)" | |
else: | |
info_dict["balance"] = "$N/A" | |
info_dict["rate_limit_per_minite"] = rate_limit_val if rate_limit_val is not None else "" | |
if models_info and isinstance(models_info[1], dict): | |
model_limits = models_info[1] | |
info_dict["4_turbo_per_request_tokens_limit"] = model_limits.get("openai/gpt-4o", "") | |
info_dict["sonnet_per_request_tokens_limit"] = model_limits.get("anthropic/claude-3.5-sonnet:beta", "") | |
info_dict["opus_per_request_tokens_limit"] = model_limits.get("anthropic/claude-3-opus:beta", "") | |
elif not key_avai[0] and len(key_avai) > 1: | |
info_dict["usage"] = key_avai[1] | |
return info_dict | |
async def get_key_gcp_info(key: str, type: int) -> Dict[str, Any]: | |
key_avai = await check_gcp_anthropic(key, type) | |
info_dict = { | |
"key_type": "Vertex AI (GCP)", | |
"key_availability": key_avai[0], | |
"status": "", | |
"enabled_region": "", | |
} | |
if key_avai[0]: | |
info_dict["enabled_region"] = key_avai[2] | |
elif len(key_avai) > 1: | |
info_dict["status"] = key_avai[1] | |
return info_dict | |
def get_key_groq_info(key: str) -> Dict[str, Any]: | |
key_avai = check_groq_status(key) | |
info_dict = { | |
"key_type": "Groq", | |
"key_availability": True if key_avai else False, | |
"models": key_avai if key_avai else "", | |
} | |
return info_dict | |
def get_key_nai_info(key: str) -> Dict[str, Any]: | |
key_avai = check_nai_status(key) | |
info_dict = { | |
"key_type": "NovelAI", | |
"key_availability": True if key_avai[0] else False, | |
"user_info": key_avai[1] if key_avai[0] else "", | |
} | |
return info_dict | |
def get_key_elevenlabs_info(key: str) -> Dict[str, Any]: | |
key_avai = check_elevenlabs_status(key) | |
info_dict = { | |
"key_type": "ElevenLabs", | |
"key_availability": key_avai[0], | |
"user_info": key_avai[1] if len(key_avai) > 1 else "", | |
"voices_info": key_avai[2] if len(key_avai) > 2 else "", | |
} | |
return info_dict | |
def get_key_xai_info(key: str) -> Dict[str, Any]: | |
key_avai = check_xai_status(key) | |
info_dict = { | |
"key_type": "xAI Grok", | |
"key_availability": key_avai[0], | |
"key_status": "", | |
"models": "", | |
} | |
if key_avai[0]: | |
info_dict["key_status"] = key_avai[1] if len(key_avai) > 1 else "" | |
info_dict["models"] = key_avai[2] if len(key_avai) > 2 else "" | |
return info_dict | |
def get_key_stability_info(key: str) -> Dict[str, Any]: | |
key_avai = check_stability_status(key) | |
info_dict = { | |
"key_type": "Stability AI", | |
"key_availability": key_avai[0], | |
"account_info": "", | |
"credits": "", | |
"models": "", | |
} | |
if key_avai[0]: | |
info_dict["account_info"] = key_avai[1] if len(key_avai) > 1 else "" | |
info_dict["credits"] = key_avai[2] if len(key_avai) > 2 else "" | |
info_dict["models"] = key_avai[3] if len(key_avai) > 3 else "" | |
return info_dict | |
def get_key_deepseek_info(key: str) -> Dict[str, Any]: | |
key_avai = check_deepseek_status(key) | |
info_dict = { | |
"key_type": "DeepSeek", | |
"key_availability": key_avai[0], | |
"balance": "", | |
"models": "", | |
} | |
if key_avai[0]: | |
info_dict["models"] = key_avai[1] if len(key_avai) > 1 else "" | |
info_dict["balance"] = key_avai[2] if len(key_avai) > 2 else "" | |
return info_dict | |
def not_supported(key: str) -> Dict[str, Any]: | |
return { | |
"key_type": "Not supported", | |
"key_availability": False, | |
"status": "Unknown key format", | |
} | |
# βββββββββββββββββββββββββββββββββββββββββ | |
# λ¨μΌ ν€ λΉλκΈ° μ²λ¦¬ (μ λ°μ΄νΈλ¨) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
async def process_single_key(key: str, rate_limit: bool, claude_model: str) -> Dict[str, Any]: | |
"""μ£Όμ΄μ§ key νλλ₯Ό λΆμν΄ μ 보 dict λ°ν. Gemini ν€μ μλ μ¬λΆ νλκ·Έ ν¬ν¨.""" | |
_key = key.strip() | |
if not _key: | |
return {"key": "", "key_type": "Empty", "key_availability": False} | |
# OpenRouter | |
if re.match(re.compile(r"sk-or-v1-[a-z0-9]{64}"), _key): | |
result = get_key_openrouter_info(_key) | |
return {"key": _key, **result} | |
# Anthropic Claude | |
if re.match(re.compile(r"sk-ant-api03-[a-zA-Z0-9\-_]{93}AA"), _key) or \ | |
(_key.startswith("sk-ant-") and len(_key) == 93) or \ | |
(len(_key) == 89 and re.match(re.compile(r"sk-[a-zA-Z0-9]{86}"), _key)): | |
result = await get_key_ant_info(_key, rate_limit, claude_model) | |
return {"key": _key, **result} | |
# Stability | |
if re.match(re.compile(r"sk-[a-zA-Z0-9]{48}"), _key) and len(_key) == 51 and "T3BlbkFJ" not in _key: | |
result = get_key_stability_info(_key) | |
return {"key": _key, **result} | |
# Deepseek | |
if re.match(re.compile(r"sk-[a-f0-9]{32}"), _key): | |
result = get_key_deepseek_info(_key) | |
return {"key": _key, **result} | |
# OpenAI (λ€λ₯Έ sk- ν¨ν΄λ³΄λ€ λ€μ μμΌ ν¨) | |
if _key.startswith("sk-"): | |
result = get_key_oai_info(_key) # μμ λ ν¨μ νΈμΆ | |
return {"key": _key, **result} | |
# Google Gemini μ²λ¦¬ | |
if _key.startswith("AIzaSy"): | |
gemini_info = get_key_gemini_info(_key) | |
is_working = gemini_info.get("key_availability") and gemini_info.get("status") == "Working" | |
result = { | |
"key_type": "Google Gemini", | |
**gemini_info, | |
"is_gemini_working": is_working | |
} | |
return result | |
# NovelAI | |
if _key.startswith("pst-"): | |
result = get_key_nai_info(_key) | |
return {"key": _key, **result} | |
# Replicate | |
if (_key.startswith("r8_") and len(_key) == 40) or (_key.islower() and len(_key) == 40): | |
result = get_key_replicate_info(_key) | |
return {"key": _key, **result} | |
# xAI | |
if _key.startswith("xai-"): | |
result = get_key_xai_info(_key) | |
return {"key": _key, **result} | |
# Azure endpoint: "name:key" | |
if len(_key.split(":")) == 2: | |
name, potential_key = _key.split(":", 1) | |
if re.fullmatch(r'[a-fA-F0-9]{32}', potential_key) and "openai.azure.com" not in name: | |
endpoint = f"https://{name}.openai.azure.com/" | |
api_key = potential_key | |
result = get_key_azure_info(endpoint, api_key) | |
return {"key": _key, **result} | |
# Azure endpoint: "https://xxx.openai.azure.com;key" | |
if ";" in _key and "openai.azure.com" in _key.split(";")[0]: | |
endpoint, api_key = _key.split(";", 1) | |
result = get_key_azure_info(endpoint, api_key) | |
return {"key": _key, **result} | |
# AWS | |
if _key.startswith("AKIA") and len(_key.split(":")[0]) == 20 and _key.split(":")[0].isalnum() and _key.split(":")[0].isupper() and len(_key.split(':')) == 2: | |
result = await get_key_aws_info(_key) | |
return {"key": _key, **result} | |
# ElevenLabs | |
if re.fullmatch(r"[a-f0-9]{32}", _key) or re.fullmatch(r"sk_[a-f0-9]{48}", _key): | |
result = get_key_elevenlabs_info(_key) | |
return {"key": _key, **result} | |
# Mistral | |
if re.fullmatch(r"[a-zA-Z0-9]{32}", _key) and not _key.startswith('sk-'): | |
result = get_key_mistral_info(_key) | |
return {"key": _key, **result} | |
# Groq | |
if re.match(re.compile(r"gsk_[a-zA-Z0-9]{20}WGdyb3FY[a-zA-Z0-9]{24}"), _key): | |
result = get_key_groq_info(_key) | |
return {"key": _key, **result} | |
# GCP - refresh token | |
if re.match(re.compile(r"[\w\-]+:[\w\-@\.]+:.+:.+"), _key): | |
parts = _key.split(':') | |
if len(parts) >= 4: | |
result = await get_key_gcp_info(_key, 0) | |
return {"key": _key, **result} | |
# GCP - service account | |
if re.match(re.compile(r"[\w\-]+:[\w\-@\.]+:.+\\n"), _key): | |
parts = _key.split(':') | |
if len(parts) >= 3: | |
result = await get_key_gcp_info(_key, 1) | |
return {"key": _key, **result} | |
# Not supported | |
result = not_supported(_key) | |
return {"key": _key, **result} | |
# βββββββββββββββββββββββββββββββββββββββββ | |
# μ¬λ¬ key λΉλκΈ° μ²λ¦¬ ν¨μ (μ λ°μ΄νΈλ¨) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
async def sort_keys(text: str, rate_limit: bool, claude_model: str) -> Tuple[List[Dict[str, Any]], str, str, str]: | |
""" | |
ν μ€νΈ λ°μ€μ μ λ ₯λ μ¬λ¬ ν€λ₯Ό μ€ λ¨μλ‘ λΆμνκ³ , | |
μ 체 κ²°κ³Όμ μλνλ OAI, Anthropic, Gemini ν€ λͺ©λ‘μ κ°κ° λ°νν©λλ€. | |
""" | |
keys = [k.strip() for k in text.splitlines() if k.strip()] | |
if not keys: | |
return [], "", "", "" | |
tasks = [process_single_key(k, rate_limit, claude_model) for k in keys] | |
results = await asyncio.gather(*tasks) | |
working_gemini_keys = [] | |
working_oai_keys = [] | |
working_anthropic_keys = [] | |
for result in results: | |
key_value = result.get("key") | |
if not key_value: | |
continue | |
key_type = result.get("key_type") | |
# Gemini ν€ νν°λ§ | |
if result.get("is_gemini_working"): | |
working_gemini_keys.append(key_value) | |
# OpenAI ν€ νν°λ§ (μμ λ¨: has_sufficient_quota νμΈ) | |
elif key_type == "OpenAI" and \ | |
result.get("key_availability") is True and \ | |
result.get("has_sufficient_quota") is True: # μ΄ νλκ° Trueμ¬μΌ ν¨ | |
working_oai_keys.append(key_value) | |
# Anthropic ν€ νν°λ§ | |
elif key_type == "Anthropic Claude" and result.get("key_availability") is True: | |
working_anthropic_keys.append(key_value) | |
return results, "\n".join(working_oai_keys), "\n".join(working_anthropic_keys), "\n".join(working_gemini_keys) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
# UI util (μ λ°μ΄νΈλ¨) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
def clear_inputs(): | |
return "", "", "", "", "" | |
# βββββββββββββββββββββββββββββββββββββββββ | |
# Gradio UI (μ λ°μ΄νΈλ¨) | |
# βββββββββββββββββββββββββββββββββββββββββ | |
with gr.Blocks() as demo: | |
gr.Markdown( | |
""" | |
# Multi-API Key Status Checker (OAI, Anthropic, Gemini Enhanced) | |
*(Based on shaocongma, CncAnon1, su, Drago, kingbased key checkers)* | |
Check the status and details of various API keys including OpenAI, Anthropic, Gemini, Azure, Mistral, Replicate, AWS Claude, OpenRouter, Vertex AI (GCP Anthropic), Groq, NovelAI, ElevenLabs, xAI, Stability AI, and DeepSeek. | |
This version highlights working OpenAI (with sufficient quota), Anthropic, and Gemini keys in separate text boxes. | |
**Key Formats:** | |
* **AWS:** `AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY` (root might not be accurate) | |
* **Azure:** `RESOURCE_NAME:API_KEY` **or** `https://RESOURCE_NAME.openai.azure.com;API_KEY` | |
* **GCP Service Account:** `PROJECT_ID:CLIENT_EMAIL:PRIVATE_KEY` (ensure `\\n` is included for newlines in the key) | |
* **GCP Refresh Token:** `PROJECT_ID:CLIENT_ID:CLIENT_SECRET:REFRESH_TOKEN` | |
* **Other keys:** Standard format provided by the vendor. | |
""" | |
) | |
claude_options = [ | |
"claude-3-haiku-20240307", | |
"claude-3-sonnet-20240229", | |
"claude-3-opus-20240229", | |
"claude-3-5-sonnet-20240620", | |
] | |
with gr.Row(): | |
with gr.Column(scale=1): | |
key_box = gr.Textbox( | |
lines=5, | |
max_lines=20, | |
label="API Key(s) - One per line", | |
placeholder="Enter one API key per line here.", | |
) | |
with gr.Row(): | |
claude_model = gr.Dropdown( | |
claude_options, | |
value="claude-3-haiku-20240307", | |
label="Claude Model (for filter/concurrent check)", | |
scale=3 | |
) | |
rate_limit = gr.Checkbox(label="Check Claude concurrent limit (exp.)", scale=1) | |
with gr.Row(): | |
clear_button = gr.Button("Clear All") | |
submit_button = gr.Button("Check Keys", variant="primary") | |
with gr.Column(scale=2): | |
info = gr.JSON(label=" API Key Information (All Results)", open=True) | |
oai_keys_output = gr.Textbox( | |
label="Working OpenAI Keys (Sufficient Quota)", | |
info="Lists OpenAI keys confirmed as working and having sufficient quota.", | |
lines=3, | |
max_lines=10, | |
interactive=False, | |
) | |
anthropic_keys_output = gr.Textbox( | |
label="Working Anthropic Keys", | |
info="Lists Anthropic keys confirmed as working (key_availability is True).", | |
lines=3, | |
max_lines=10, | |
interactive=False, | |
) | |
gemini_keys_output = gr.Textbox( | |
label="Working Gemini Keys", | |
info="Lists Gemini keys confirmed as 'Working'.", | |
lines=3, | |
max_lines=10, | |
interactive=False, | |
) | |
clear_button.click( | |
fn=clear_inputs, | |
inputs=None, | |
outputs=[key_box, info, oai_keys_output, anthropic_keys_output, gemini_keys_output] | |
) | |
submit_button.click( | |
fn=sort_keys, | |
inputs=[key_box, rate_limit, claude_model], | |
outputs=[info, oai_keys_output, anthropic_keys_output, gemini_keys_output], | |
api_name="sort_keys", | |
) | |
# demo.launch(share=True) | |
demo.launch() |