|
import os |
|
from typing import Literal, Optional, Dict, Any, Callable, Awaitable |
|
import requests |
|
|
|
from fastrtc import ( |
|
get_hf_turn_credentials, |
|
get_twilio_turn_credentials, |
|
get_cloudflare_turn_credentials, |
|
get_cloudflare_turn_credentials_async |
|
) |
|
|
|
|
|
def get_rtc_credentials(
    provider: Literal["hf", "twilio", "cloudflare", "hf-cloudflare"] = "hf-cloudflare",
    **kwargs
) -> Dict[str, Any]:
    """
    Get RTC configuration for different TURN server providers.

    Each credential is taken from ``kwargs`` when a truthy value is supplied
    and otherwise from the matching environment variable, so passing an
    explicit ``None`` behaves the same as omitting the argument.

    Args:
        provider: The TURN server provider to use ('hf', 'twilio',
            'cloudflare', or 'hf-cloudflare').
        **kwargs: Provider-specific options. Recognised keys:
            'hf': ``token`` (env ``HF_TOKEN``).
            'twilio': ``account_sid`` (env ``TWILIO_ACCOUNT_SID``) and
            ``auth_token`` (env ``TWILIO_AUTH_TOKEN``).
            'cloudflare': ``key_id`` (env ``TURN_KEY_ID``), ``api_token``
            (env ``TURN_KEY_API_TOKEN``) and ``ttl`` (default 86400).
            'hf-cloudflare': ``hf_token`` (env ``HF_TOKEN``) and ``ttl``
            (default 86400).
            Any other keys are ignored.

    Returns:
        Dictionary containing the RTC configuration, as returned by the
        underlying fastrtc helper.

    Raises:
        Exception: On any failure (missing credentials, unknown provider, or
            an error raised by the provider helper). The message is prefixed
            with the provider name and the original error is attached as
            ``__cause__``.
    """
    # NOTE(review): the keyword names passed to the fastrtc helpers below are
    # kept exactly as they were; confirm they match the signatures of the
    # installed fastrtc version (a mismatch would surface as a wrapped
    # TypeError from this function).
    try:
        if provider == "hf":
            token = kwargs.pop("token", None) or os.environ.get("HF_TOKEN")
            if not token:
                raise ValueError("HF_TOKEN environment variable not set")
            return get_hf_turn_credentials(token=token)

        if provider == "twilio":
            account_sid = kwargs.pop("account_sid", None) or os.environ.get("TWILIO_ACCOUNT_SID")
            auth_token = kwargs.pop("auth_token", None) or os.environ.get("TWILIO_AUTH_TOKEN")
            if not account_sid or not auth_token:
                raise ValueError(
                    "Twilio credentials not found. Set TWILIO_ACCOUNT_SID and TWILIO_AUTH_TOKEN env vars"
                )
            return get_twilio_turn_credentials(account_sid=account_sid, auth_token=auth_token)

        if provider == "cloudflare":
            key_id = kwargs.pop("key_id", None) or os.environ.get("TURN_KEY_ID")
            api_token = kwargs.pop("api_token", None) or os.environ.get("TURN_KEY_API_TOKEN")
            ttl = kwargs.pop("ttl", 86400)
            if not key_id or not api_token:
                raise ValueError(
                    "Cloudflare credentials not found. Set TURN_KEY_ID and TURN_KEY_API_TOKEN env vars"
                )
            return get_cloudflare_turn_credentials(key_id=key_id, api_token=api_token, ttl=ttl)

        if provider == "hf-cloudflare":
            hf_token = kwargs.pop("hf_token", None) or os.environ.get("HF_TOKEN")
            ttl = kwargs.pop("ttl", 86400)
            if not hf_token:
                raise ValueError("HF_TOKEN environment variable not set")
            return get_cloudflare_turn_credentials(hf_token=hf_token, ttl=ttl)

        raise ValueError(f"Unknown provider: {provider}")
    except Exception as e:
        # Keep the generic exception type callers already catch, but chain the
        # original error so its type and traceback stay available as __cause__.
        raise Exception(f"Failed to get RTC credentials ({provider}): {str(e)}") from e
|
|
|
|
|
async def get_rtc_credentials_async(
    provider: Literal["hf-cloudflare"] = "hf-cloudflare",
    **kwargs
) -> Dict[str, Any]:
    """
    Get RTC configuration asynchronously for different TURN server providers.

    Args:
        provider: Currently only supports 'hf-cloudflare'.
        **kwargs: Optional ``hf_token`` (falls back to the ``HF_TOKEN``
            environment variable when missing or falsy) and ``ttl``
            (default 600). Any other keys are ignored.

    Returns:
        Dictionary containing the RTC configuration, as returned by
        ``get_cloudflare_turn_credentials_async``.

    Raises:
        NotImplementedError: If ``provider`` is not 'hf-cloudflare'.
        Exception: If no token is available or the provider helper fails.
            The original error is attached as ``__cause__``.
    """
    # Unsupported providers are rejected before the try block so that
    # NotImplementedError reaches the caller unwrapped.
    if provider != "hf-cloudflare":
        raise NotImplementedError(f"Async credentials for {provider} not implemented")

    try:
        # An explicit hf_token=None falls through to the environment variable.
        hf_token = kwargs.pop("hf_token", None) or os.environ.get("HF_TOKEN")
        ttl = kwargs.pop("ttl", 600)
        if not hf_token:
            raise ValueError("HF_TOKEN environment variable not set")
        return await get_cloudflare_turn_credentials_async(hf_token=hf_token, ttl=ttl)
    except Exception as e:
        # Same generic exception type as before, now chained to its cause.
        raise Exception(f"Failed to get async RTC credentials: {str(e)}") from e
|
|
|
|
|
def get_credential_function(provider: str, is_async: bool = False, **kwargs) -> Callable:
    """
    Get the appropriate credential function based on provider and whether async is needed.

    Args:
        provider: The TURN server provider.
        is_async: Whether to return an async function. Only honoured for
            'hf-cloudflare'; for every other provider a synchronous function
            is returned even when this is True.
        **kwargs: Optional provider-specific options (for example a token or
            ``ttl``) forwarded to the underlying credential getter on every
            call. When omitted, the getter is called with the provider only.

    Returns:
        Zero-argument function that returns credentials (async or sync).
    """
    if is_async and provider == "hf-cloudflare":
        async def _get_creds_async():
            return await get_rtc_credentials_async(provider=provider, **kwargs)

        return _get_creds_async

    # Sync fallback: also taken when is_async is requested for a provider
    # that has no async implementation.
    def _get_creds_sync():
        return get_rtc_credentials(provider=provider, **kwargs)

    return _get_creds_sync