import asyncio
import re
from typing import Any, Dict, List, Tuple

import gradio as gr
import requests

# The api_usage module is assumed to exist in the user's environment.
from api_usage import (
    get_subscription,
    check_key_availability,
    get_orgs_me,
    check_key_ant_availability,
    check_ant_rate_limit,
    check_key_gemini_availability,  # assumed to return a (bool, str) tuple
    check_key_azure_availability,
    get_azure_status,
    get_azure_deploy,
    check_key_mistral_availability,
    check_mistral_quota,
    check_key_replicate_availability,
    check_key_aws_availability,
    check_key_or_availability,
    check_key_or_limits,
    check_gcp_anthropic,
    check_groq_status,
    check_nai_status,
    check_elevenlabs_status,
    check_xai_status,
    check_stability_status,
    check_deepseek_status,
)

# ─────────────────────────────────────────
# Key-specific helper functions
# ─────────────────────────────────────────


def get_key_oai_info(key: str) -> Dict[str, Any]:
    """Check an OpenAI key and return availability, org/subscription details,
    and whether the key appears to have sufficient quota.

    Returns a dict whose ``key_availability`` and ``has_sufficient_quota``
    fields drive the "working keys" filtering in sort_keys().
    """
    session = requests.Session()
    # raw_status_code is an HTTP status (or similar indicator);
    # org_data_or_error is org data on success, error info on failure.
    raw_status_code, org_data_or_error = check_key_availability(session, key)

    info_dict = {
        "key_type": "OpenAI",
        "key_availability": False,
        "has_sufficient_quota": True,  # assume True; flipped to False when a problem is found
        "gpt4_availability": "",
        "gpt4_32k_availability": "",
        "default_org": "",
        "org_description": "",
        "organization": "",
        "models": "",
        "requests_per_minute": "",
        "tokens_per_minute": "",
        "quota": "",
        "all_models": "",
        "status_message": "",
    }

    org_data_for_subscription = None

    if raw_status_code == 200:
        info_dict["key_availability"] = True
        org_data_for_subscription = org_data_or_error
    elif raw_status_code == 401:  # Unauthorized
        info_dict["status_message"] = "Unauthorized: Invalid API key."
        info_dict["has_sufficient_quota"] = False
        return info_dict
    elif raw_status_code == 403:  # Forbidden — the key may still work via /orgs/me
        status_me, orgs_me_data = get_orgs_me(session, key)
        if status_me == 200:
            info_dict["key_availability"] = True
            org_data_for_subscription = orgs_me_data
        else:
            info_dict["status_message"] = f"Forbidden, and get_orgs_me failed (status: {status_me}). Key might be inactive or lack permissions."
            info_dict["has_sufficient_quota"] = False
            return info_dict
    elif raw_status_code == 429:  # Too Many Requests (rate limit or quota)
        info_dict["key_availability"] = True  # the key itself may still be valid
        info_dict["has_sufficient_quota"] = False  # treated as out of quota
        info_dict["status_message"] = "Rate limit or quota likely exceeded (initial check)."
        if isinstance(org_data_or_error, dict) and "error" in org_data_or_error:
            error_details = org_data_or_error["error"]
            current_quota_message = error_details.get("message", "Quota details unavailable from initial check")
            info_dict["quota"] = current_quota_message
            if "insufficient_quota" not in current_quota_message.lower():
                # Surface the detail only when it is not the generic quota error.
                info_dict["status_message"] += f" Error: {current_quota_message}"
        # Falling through skips an early return; has_sufficient_quota is already False.
        # return info_dict  # uncomment for a faster exit if desired
    else:  # any other error
        info_dict["status_message"] = f"Key check failed (status: {raw_status_code})."
        if isinstance(org_data_or_error, dict) and "error" in org_data_or_error:
            info_dict["status_message"] += f" Error: {org_data_or_error['error'].get('message', str(org_data_or_error))}"
        elif isinstance(org_data_or_error, str):
            info_dict["status_message"] += f" Details: {org_data_or_error}"
        info_dict["has_sufficient_quota"] = False
        return info_dict

    if not info_dict["key_availability"]:
        return info_dict  # key is not usable; stop here

    # Org data may still be missing (e.g. initial 200 with unusable payload).
    if not org_data_for_subscription:
        status_me, orgs_me_data = get_orgs_me(session, key)
        if status_me == 200:
            org_data_for_subscription = orgs_me_data
        else:
            info_dict["status_message"] = (info_dict["status_message"] + " Could not identify organization for subscription.").strip()
            info_dict["key_availability"] = False
            info_dict["has_sufficient_quota"] = False
            return info_dict

    subscription_info = get_subscription(key, session, org_data_for_subscription)
    if subscription_info:
        info_dict.update(
            {
                "gpt4_availability": subscription_info.get("has_gpt4", ""),
                "gpt4_32k_availability": subscription_info.get("has_gpt4_32k", ""),
                "default_org": subscription_info.get("default_org", ""),
                "org_description": subscription_info.get("org_description", ""),
                "organization": subscription_info.get("organization", ""),
                "models": subscription_info.get("models", ""),
                "requests_per_minute": subscription_info.get("rpm", ""),
                "tokens_per_minute": subscription_info.get("tpm", ""),
                "quota": subscription_info.get("quota", info_dict["quota"]),
                "all_models": subscription_info.get("all_models", ""),
            }
        )

        error_info = subscription_info.get("error")
        if error_info and isinstance(error_info, dict):
            # `or ""` guards against explicit null values in the error payload,
            # which would otherwise crash on .lower().
            err_type = (error_info.get("type") or "").lower()
            err_code = (error_info.get("code") or "").lower()
            err_msg = (error_info.get("message") or "").lower()
            if "insufficient_quota" in err_type or \
               "insufficient_quota" in err_code or \
               "you exceeded your current quota" in err_msg or \
               "payment required" in err_msg or \
               ("billing" in err_msg and "issue" in err_msg):
                info_dict["has_sufficient_quota"] = False
                new_quota_message = f"Insufficient: {error_info.get('message', err_type)}"
                info_dict["quota"] = new_quota_message  # explicit message in the quota field
                info_dict["status_message"] = (info_dict["status_message"] + f" Quota/Billing issue: {error_info.get('message', err_type)}").strip()

        # Detect deactivated accounts / failed payments anywhere in the payload.
        if "account_deactivated" in str(subscription_info).lower() or \
           "payment_failed" in str(subscription_info).lower():
            info_dict["has_sufficient_quota"] = False
            if "Account issue" not in info_dict["status_message"]:  # avoid duplicate message
                info_dict["status_message"] = (info_dict["status_message"] + " Account issue (e.g., deactivated, payment failed).").strip()
    else:
        info_dict["status_message"] = (info_dict["status_message"] + " Failed to retrieve full subscription details.").strip()
        info_dict["has_sufficient_quota"] = False
        # key_availability is intentionally left True: the key itself may still be valid.

    return info_dict


async def get_key_ant_info(key: str, rate_limit: bool, claude_model: str) -> Dict[str, Any]:
    """Check an Anthropic key; optionally probe the concurrent rate limit.

    check_key_ant_availability() is assumed to return a positional tuple:
    [0] available, [1] status, [2] filter response, [3]/[4] rpm/remaining,
    [5]/[6] tpm/remaining, [7] tier, [8]/[9] input tpm/remaining,
    [10]/[11] output tpm/remaining, [12] models.
    """
    key_avai = await check_key_ant_availability(key, claude_model)
    info_dict = {
        "key_type": "Anthropic Claude",
        "key_availability": key_avai[0],
        "status": "",
        "filter_response": "",
        "requests_per_minute": "",
        "tokens_per_minute": "",
        "tokens_input_per_minute": "",
        "tokens_output_per_minute": "",
        "tier": "",
        "concurrent_rate_limit": "",
        "models": "",
    }
    info_dict["status"] = key_avai[1]
    info_dict["filter_response"] = key_avai[2]
    # Append "(N left)" only when the corresponding limit string is non-empty.
    info_dict["requests_per_minute"] = key_avai[3] + ("" if key_avai[3] == "" else f" ({key_avai[4]} left)")
    info_dict["tokens_per_minute"] = key_avai[5] + ("" if key_avai[5] == "" else f" ({key_avai[6]} left)")
    info_dict["tokens_input_per_minute"] = key_avai[8] + ("" if key_avai[8] == "" else f" ({key_avai[9]} left)")
    info_dict["tokens_output_per_minute"] = key_avai[10] + ("" if key_avai[10] == "" else f" ({key_avai[11]} left)")
    info_dict["tier"] = key_avai[7]
    info_dict["models"] = key_avai[12]
    if rate_limit:
        rate = await check_ant_rate_limit(key, claude_model)
        info_dict["concurrent_rate_limit"] = rate
    return info_dict


def get_key_gemini_info(key: str) -> Dict[str, Any]:
    """Fetch Gemini key info (original user version)."""
    key_avai = check_key_gemini_availability(key)  # assumed (bool, str) tuple
    info_dict = {
        "key": key,  # the original carried the key itself in this dict
        "key_availability": key_avai[0],
        "status": key_avai[1],
    }
    return info_dict


def get_key_azure_info(endpoint: str, api_key: str) -> Dict[str, Any]:
    """Check an Azure OpenAI endpoint/key pair and list deployments."""
    key_avai = check_key_azure_availability(endpoint, api_key)
    info_dict = {
        "key_type": "Microsoft Azure OpenAI",
        "key_availability": key_avai[0],
        "gpt35_availability": "",
        "gpt4_availability": "",
        "gpt4_32k_availability": "",
        "dall_e_3_availability": "",
        "moderation_status": "",
        "models": "",
        "deployments": "",
    }
    if key_avai[0]:
        azure_deploy = get_azure_deploy(endpoint, api_key)
        if azure_deploy:
            status = get_azure_status(endpoint, api_key, azure_deploy)
            if status:
                # status tuple: [0] moderation, [1] gpt35, [2] gpt4, [3] gpt4-32k, [4] dall-e-3
                info_dict["gpt35_availability"] = status[1]
                info_dict["gpt4_availability"] = status[2]
                info_dict["gpt4_32k_availability"] = status[3]
                info_dict["dall_e_3_availability"] = status[4]
                info_dict["moderation_status"] = status[0]
        info_dict["models"] = key_avai[1]
        info_dict["deployments"] = azure_deploy if azure_deploy else "N/A"
    return info_dict


def get_key_mistral_info(key: str) -> Dict[str, Any]:
    """Check a Mistral key, its quota, and available models."""
    key_avai = check_key_mistral_availability(key)
    info_dict = {
        "key_type": "Mistral AI",
        "key_availability": bool(key_avai),
        "has_quota": "",
        "limits": "",
        "models": "",
    }
    if key_avai:
        quota_info = check_mistral_quota(key)
        if quota_info:
            info_dict["has_quota"] = quota_info[0]
            if quota_info[1]:
                info_dict["limits"] = quota_info[1]
        info_dict["models"] = key_avai
    return info_dict


def get_key_replicate_info(key: str) -> Dict[str, Any]:
    """Check a Replicate key: account, quota, and available hardware."""
    key_avai = check_key_replicate_availability(key)
    info_dict = {
        "key_type": "Replicate",
        "key_availability": key_avai[0],
        "account_name": "",
        "type": "",
        "has_quota": "",
        "hardware_available": "",
    }
    if key_avai[0] and isinstance(key_avai[1], dict):
        info_dict["account_name"] = key_avai[1].get("username", "")
        info_dict["type"] = key_avai[1].get("type", "")
        info_dict["has_quota"] = key_avai[2]
        info_dict["hardware_available"] = key_avai[3]
    return info_dict


async def get_key_aws_info(key: str) -> Dict[str, Any]:
    """Check an AWS access-key pair for Bedrock/Claude usage and IAM posture."""
    key_avai = await check_key_aws_availability(key)
    info_dict = {
        "key_type": "Amazon AWS Claude",
        "key_availability": key_avai[0],
        "username": "",
        "root": "",
        "admin": "",
        "quarantine": "",
        "iam_full_access": "",
        "iam_user_change_password": "",
        "aws_bedrock_full_access": "",
        "enabled_region": "",
        "models_usage": "",
        # On failure key_avai[1] carries the error detail.
        "cost_and_usage": key_avai[1] if not key_avai[0] else "",
    }
    if key_avai[0]:
        info_dict["username"] = key_avai[1]
        info_dict["root"] = key_avai[2]
        info_dict["admin"] = key_avai[3]
        info_dict["quarantine"] = key_avai[4]
        info_dict["iam_full_access"] = key_avai[5]
        info_dict["iam_user_change_password"] = key_avai[6]
        info_dict["aws_bedrock_full_access"] = key_avai[7]
        info_dict["enabled_region"] = key_avai[8]
        info_dict["models_usage"] = key_avai[9]
        info_dict["cost_and_usage"] = key_avai[10]
    return info_dict


def get_key_openrouter_info(key: str) -> Dict[str, Any]:
    """Check an OpenRouter key: tier, usage, balance, and per-model limits."""
    key_avai = check_key_or_availability(key)
    info_dict = {
        "key_type": "OpenRouter",
        "key_availability": key_avai[0],
        "is_free_tier": "",
        "usage": "",
        "balance": "",
        "limit": "",
        "limit_remaining": "",
        "rate_limit_per_minite": "",
        "4_turbo_per_request_tokens_limit": "",
        "sonnet_per_request_tokens_limit": "",
        "opus_per_request_tokens_limit": "",
    }
    if key_avai[0] and isinstance(key_avai[1], dict):
        models_info = check_key_or_limits(key)
        info_dict["is_free_tier"] = key_avai[1].get("is_free_tier", "")
        info_dict["limit"] = key_avai[1].get("limit", "")
        info_dict["limit_remaining"] = key_avai[1].get("limit_remaining", "")
        usage_val = key_avai[1].get("usage")
        info_dict["usage"] = f"${format(usage_val, '.4f')}" if isinstance(usage_val, (int, float)) else ""

        balance_val = models_info[0] if models_info else None
        rate_limit_val = key_avai[2] if len(key_avai) > 2 else None

        if balance_val is not None:
            info_dict["balance"] = f"${format(balance_val, '.4f')}"
        elif rate_limit_val is not None:
            # Fall back to estimating balance from the per-minute rate limit.
            try:
                estimated_balance = float(rate_limit_val) / 60
                info_dict["balance"] = f"${format(estimated_balance, '.4f')} (estimated)"
            except (ValueError, TypeError):
                info_dict["balance"] = "$N/A (estimation failed)"
        else:
            info_dict["balance"] = "$N/A"

        info_dict["rate_limit_per_minite"] = rate_limit_val if rate_limit_val is not None else ""

        if models_info and isinstance(models_info[1], dict):
            model_limits = models_info[1]
            info_dict["4_turbo_per_request_tokens_limit"] = model_limits.get("openai/gpt-4o", "")
            info_dict["sonnet_per_request_tokens_limit"] = model_limits.get("anthropic/claude-3.5-sonnet:beta", "")
            info_dict["opus_per_request_tokens_limit"] = model_limits.get("anthropic/claude-3-opus:beta", "")
    elif not key_avai[0] and len(key_avai) > 1:
        # On failure the second element carries the error detail.
        info_dict["usage"] = key_avai[1]
    return info_dict


async def get_key_gcp_info(key: str, gcp_type: int) -> Dict[str, Any]:
    """Check a GCP credential for Vertex AI Anthropic access.

    gcp_type: 0 = refresh-token credential, 1 = service-account credential.
    (Parameter renamed from ``type`` to avoid shadowing the builtin; all
    in-file calls are positional.)
    """
    key_avai = await check_gcp_anthropic(key, gcp_type)
    info_dict = {
        "key_type": "Vertex AI (GCP)",
        "key_availability": key_avai[0],
        "status": "",
        "enabled_region": "",
    }
    if key_avai[0]:
        info_dict["enabled_region"] = key_avai[2]
    elif len(key_avai) > 1:
        info_dict["status"] = key_avai[1]
    return info_dict


def get_key_groq_info(key: str) -> Dict[str, Any]:
    """Check a Groq key and list its models."""
    key_avai = check_groq_status(key)
    info_dict = {
        "key_type": "Groq",
        "key_availability": bool(key_avai),
        "models": key_avai if key_avai else "",
    }
    return info_dict


def get_key_nai_info(key: str) -> Dict[str, Any]:
    """Check a NovelAI key and return account info."""
    key_avai = check_nai_status(key)
    info_dict = {
        "key_type": "NovelAI",
        "key_availability": bool(key_avai[0]),
        "user_info": key_avai[1] if key_avai[0] else "",
    }
    return info_dict


def get_key_elevenlabs_info(key: str) -> Dict[str, Any]:
    """Check an ElevenLabs key and return user and voice info."""
    key_avai = check_elevenlabs_status(key)
    info_dict = {
        "key_type": "ElevenLabs",
        "key_availability": key_avai[0],
        "user_info": key_avai[1] if len(key_avai) > 1 else "",
        "voices_info": key_avai[2] if len(key_avai) > 2 else "",
    }
    return info_dict


def get_key_xai_info(key: str) -> Dict[str, Any]:
    """Check an xAI Grok key and list its models."""
    key_avai = check_xai_status(key)
    info_dict = {
        "key_type": "xAI Grok",
        "key_availability": key_avai[0],
        "key_status": "",
        "models": "",
    }
    if key_avai[0]:
        info_dict["key_status"] = key_avai[1] if len(key_avai) > 1 else ""
        info_dict["models"] = key_avai[2] if len(key_avai) > 2 else ""
    return info_dict


def get_key_stability_info(key: str) -> Dict[str, Any]:
    """Check a Stability AI key: account, credits, and models."""
    key_avai = check_stability_status(key)
    info_dict = {
        "key_type": "Stability AI",
        "key_availability": key_avai[0],
        "account_info": "",
        "credits": "",
        "models": "",
    }
    if key_avai[0]:
        info_dict["account_info"] = key_avai[1] if len(key_avai) > 1 else ""
        info_dict["credits"] = key_avai[2] if len(key_avai) > 2 else ""
        info_dict["models"] = key_avai[3] if len(key_avai) > 3 else ""
    return info_dict


def get_key_deepseek_info(key: str) -> Dict[str, Any]:
    """Check a DeepSeek key: balance and models."""
    key_avai = check_deepseek_status(key)
    info_dict = {
        "key_type": "DeepSeek",
        "key_availability": key_avai[0],
        "balance": "",
        "models": "",
    }
    if key_avai[0]:
        info_dict["models"] = key_avai[1] if len(key_avai) > 1 else ""
        info_dict["balance"] = key_avai[2] if len(key_avai) > 2 else ""
    return info_dict


def not_supported(key: str) -> Dict[str, Any]:
    """Fallback result for keys that match no known provider format."""
    return {
        "key_type": "Not supported",
        "key_availability": False,
        "status": "Unknown key format",
    }


# ─────────────────────────────────────────
# Single-key async processing
# ─────────────────────────────────────────
async def process_single_key(key: str, rate_limit: bool, claude_model: str) -> Dict[str, Any]:
    """Analyze one key and return its info dict.

    Gemini results additionally carry an ``is_gemini_working`` flag.
    Provider detection is ordered: more specific ``sk-`` patterns must be
    tested before the generic OpenAI prefix.
    """
    _key = key.strip()
    if not _key:
        return {"key": "", "key_type": "Empty", "key_availability": False}

    # OpenRouter
    if re.match(r"sk-or-v1-[a-z0-9]{64}", _key):
        result = get_key_openrouter_info(_key)
        return {"key": _key, **result}

    # Anthropic Claude
    if re.match(r"sk-ant-api03-[a-zA-Z0-9\-_]{93}AA", _key) or \
       (_key.startswith("sk-ant-") and len(_key) == 93) or \
       (len(_key) == 89 and re.match(r"sk-[a-zA-Z0-9]{86}", _key)):
        result = await get_key_ant_info(_key, rate_limit, claude_model)
        return {"key": _key, **result}

    # Stability ("T3BlbkFJ" marks OpenAI keys, so exclude it here)
    if re.match(r"sk-[a-zA-Z0-9]{48}", _key) and len(_key) == 51 and "T3BlbkFJ" not in _key:
        result = get_key_stability_info(_key)
        return {"key": _key, **result}

    # DeepSeek
    if re.match(r"sk-[a-f0-9]{32}", _key):
        result = get_key_deepseek_info(_key)
        return {"key": _key, **result}

    # OpenAI (must come after the other sk- patterns)
    if _key.startswith("sk-"):
        result = get_key_oai_info(_key)
        return {"key": _key, **result}

    # Google Gemini
    if _key.startswith("AIzaSy"):
        gemini_info = get_key_gemini_info(_key)
        is_working = gemini_info.get("key_availability") and gemini_info.get("status") == "Working"
        result = {
            "key_type": "Google Gemini",
            **gemini_info,
            "is_gemini_working": is_working,
        }
        return result

    # NovelAI
    if _key.startswith("pst-"):
        result = get_key_nai_info(_key)
        return {"key": _key, **result}

    # Replicate
    if (_key.startswith("r8_") and len(_key) == 40) or (_key.islower() and len(_key) == 40):
        result = get_key_replicate_info(_key)
        return {"key": _key, **result}

    # xAI
    if _key.startswith("xai-"):
        result = get_key_xai_info(_key)
        return {"key": _key, **result}

    # Azure endpoint given as "name:key"
    if len(_key.split(":")) == 2:
        name, potential_key = _key.split(":", 1)
        if re.fullmatch(r'[a-fA-F0-9]{32}', potential_key) and "openai.azure.com" not in name:
            endpoint = f"https://{name}.openai.azure.com/"
            api_key = potential_key
            result = get_key_azure_info(endpoint, api_key)
            return {"key": _key, **result}

    # Azure endpoint given as "https://xxx.openai.azure.com;key"
    if ";" in _key and "openai.azure.com" in _key.split(";")[0]:
        endpoint, api_key = _key.split(";", 1)
        result = get_key_azure_info(endpoint, api_key)
        return {"key": _key, **result}

    # AWS "ACCESS_KEY_ID:SECRET" (access key id is 20 upper alnum chars)
    if _key.startswith("AKIA"):
        aws_parts = _key.split(":")
        if len(aws_parts) == 2 and len(aws_parts[0]) == 20 and aws_parts[0].isalnum() and aws_parts[0].isupper():
            result = await get_key_aws_info(_key)
            return {"key": _key, **result}

    # ElevenLabs (note: a 32-char lowercase-hex Mistral key would match here first)
    if re.fullmatch(r"[a-f0-9]{32}", _key) or re.fullmatch(r"sk_[a-f0-9]{48}", _key):
        result = get_key_elevenlabs_info(_key)
        return {"key": _key, **result}

    # Mistral
    if re.fullmatch(r"[a-zA-Z0-9]{32}", _key) and not _key.startswith('sk-'):
        result = get_key_mistral_info(_key)
        return {"key": _key, **result}

    # Groq
    if re.match(r"gsk_[a-zA-Z0-9]{20}WGdyb3FY[a-zA-Z0-9]{24}", _key):
        result = get_key_groq_info(_key)
        return {"key": _key, **result}

    # GCP - refresh token (PROJECT_ID:CLIENT_ID:CLIENT_SECRET:REFRESH_TOKEN)
    if re.match(r"[\w\-]+:[\w\-@\.]+:.+:.+", _key):
        parts = _key.split(':')
        if len(parts) >= 4:
            result = await get_key_gcp_info(_key, 0)
            return {"key": _key, **result}

    # GCP - service account (private key must contain literal "\n" sequences)
    if re.match(r"[\w\-]+:[\w\-@\.]+:.+\\n", _key):
        parts = _key.split(':')
        if len(parts) >= 3:
            result = await get_key_gcp_info(_key, 1)
            return {"key": _key, **result}

    # Not supported
    result = not_supported(_key)
    return {"key": _key, **result}


# ─────────────────────────────────────────
# Multi-key async processing
# ─────────────────────────────────────────
async def sort_keys(text: str, rate_limit: bool, claude_model: str) -> Tuple[List[Dict[str, Any]], str, str, str]:
    """Analyze each line of the textbox as a key.

    Returns all results plus newline-joined lists of working OpenAI,
    Anthropic, and Gemini keys.
    """
    keys = [k.strip() for k in text.splitlines() if k.strip()]
    if not keys:
        return [], "", "", ""

    tasks = [process_single_key(k, rate_limit, claude_model) for k in keys]
    results = await asyncio.gather(*tasks)

    working_gemini_keys = []
    working_oai_keys = []
    working_anthropic_keys = []

    for result in results:
        key_value = result.get("key")
        if not key_value:
            continue
        key_type = result.get("key_type")

        # Gemini keys
        if result.get("is_gemini_working"):
            working_gemini_keys.append(key_value)
        # OpenAI keys must also have sufficient quota
        elif key_type == "OpenAI" and \
                result.get("key_availability") is True and \
                result.get("has_sufficient_quota") is True:
            working_oai_keys.append(key_value)
        # Anthropic keys
        elif key_type == "Anthropic Claude" and result.get("key_availability") is True:
            working_anthropic_keys.append(key_value)

    return results, "\n".join(working_oai_keys), "\n".join(working_anthropic_keys), "\n".join(working_gemini_keys)


# ─────────────────────────────────────────
# UI util
# ─────────────────────────────────────────
def clear_inputs():
    """Reset the key textbox, JSON panel, and the three output textboxes."""
    return "", "", "", "", ""


# ─────────────────────────────────────────
# Gradio UI
# ─────────────────────────────────────────
with gr.Blocks() as demo:
    gr.Markdown(
        """
# Multi-API Key Status Checker (OAI, Anthropic, Gemini Enhanced)
*(Based on shaocongma, CncAnon1, su, Drago, kingbased key checkers)*

Check the status and details of various API keys including OpenAI, Anthropic, Gemini, Azure, Mistral, Replicate, AWS Claude, OpenRouter, Vertex AI (GCP Anthropic), Groq, NovelAI, ElevenLabs, xAI, Stability AI, and DeepSeek.
This version highlights working OpenAI (with sufficient quota), Anthropic, and Gemini keys in separate text boxes.

**Key Formats:**
* **AWS:** `AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY` (root might not be accurate)
* **Azure:** `RESOURCE_NAME:API_KEY` **or** `https://RESOURCE_NAME.openai.azure.com;API_KEY`
* **GCP Service Account:** `PROJECT_ID:CLIENT_EMAIL:PRIVATE_KEY` (ensure `\\n` is included for newlines in the key)
* **GCP Refresh Token:** `PROJECT_ID:CLIENT_ID:CLIENT_SECRET:REFRESH_TOKEN`
* **Other keys:** Standard format provided by the vendor.
"""
    )

    claude_options = [
        "claude-3-haiku-20240307",
        "claude-3-sonnet-20240229",
        "claude-3-opus-20240229",
        "claude-3-5-sonnet-20240620",
    ]

    with gr.Row():
        with gr.Column(scale=1):
            key_box = gr.Textbox(
                lines=5,
                max_lines=20,
                label="API Key(s) - One per line",
                placeholder="Enter one API key per line here.",
            )
            with gr.Row():
                claude_model = gr.Dropdown(
                    claude_options,
                    value="claude-3-haiku-20240307",
                    label="Claude Model (for filter/concurrent check)",
                    scale=3,
                )
                rate_limit = gr.Checkbox(label="Check Claude concurrent limit (exp.)", scale=1)
            with gr.Row():
                clear_button = gr.Button("Clear All")
                submit_button = gr.Button("Check Keys", variant="primary")
        with gr.Column(scale=2):
            info = gr.JSON(label=" API Key Information (All Results)", open=True)
            oai_keys_output = gr.Textbox(
                label="Working OpenAI Keys (Sufficient Quota)",
                info="Lists OpenAI keys confirmed as working and having sufficient quota.",
                lines=3,
                max_lines=10,
                interactive=False,
            )
            anthropic_keys_output = gr.Textbox(
                label="Working Anthropic Keys",
                info="Lists Anthropic keys confirmed as working (key_availability is True).",
                lines=3,
                max_lines=10,
                interactive=False,
            )
            gemini_keys_output = gr.Textbox(
                label="Working Gemini Keys",
                info="Lists Gemini keys confirmed as 'Working'.",
                lines=3,
                max_lines=10,
                interactive=False,
            )

    clear_button.click(
        fn=clear_inputs,
        inputs=None,
        outputs=[key_box, info, oai_keys_output, anthropic_keys_output, gemini_keys_output],
    )
    submit_button.click(
        fn=sort_keys,
        inputs=[key_box, rate_limit, claude_model],
        outputs=[info, oai_keys_output, anthropic_keys_output, gemini_keys_output],
        api_name="sort_keys",
    )

# demo.launch(share=True)
demo.launch()