|
import os |
|
import json |
|
import httpx |
|
import time |
|
from dotenv import load_dotenv |
|
from supabase import create_client, Client |
|
|
|
load_dotenv() |
|
|
|
|
|
from store import token_store |
|
|
|
|
|
|
|
# Supabase connection settings, read from the process environment.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Module-level Supabase client. It is annotated once as optional so that both
# outcomes below share a single declared type; it stays None when either
# setting is missing, so users of `supabase` must check it before use.
supabase: Client | None = None

if not SUPABASE_URL or not SUPABASE_KEY:
    print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
else:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Supabase client initialized.")
|
|
|
async def get_app_secret_from_db(app_id: str) -> str | None:
    """
    Look up the Feishu app secret for ``app_id`` in Supabase.

    Queries the ``feishu_bot_config`` table for the single row whose ``bot_id``
    column equals ``app_id`` and reads ``app_secret`` out of that row's
    ``platform_specific`` column.

    Args:
        app_id: The Feishu App ID; matched against the ``bot_id`` column.

    Returns:
        The stored app secret, or None when the Supabase client is not
        initialized, no secret is present in the row, or the query fails.
        Failures are printed rather than raised.
    """
    if not supabase:
        print("Supabase client not initialized, cannot fetch app_secret.")
        return None

    try:
        response = (
            supabase.table('feishu_bot_config')
            .select('platform_specific')
            .eq('bot_id', app_id)
            .single()
            .execute()
        )

        row = response.data
        platform_specific = row.get('platform_specific') if isinstance(row, dict) else None

        # Only a mapping can carry the secret. A NULL / non-dict column value is
        # reported as "not found" instead of surfacing as a TypeError below.
        if isinstance(platform_specific, dict) and 'app_secret' in platform_specific:
            return platform_specific['app_secret']

        print(f"No app_secret found in Supabase for app_id: {app_id}.")
        return None
    except Exception as e:
        # Best-effort lookup: any client/query error is logged and mapped to None.
        print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
        return None
|
|
|
async def get_valid_tenant_access_token(app_id: str) -> str | None:
    """
    Retrieves a valid tenant access token for the given app_id.

    A token cached in ``token_store`` is returned while it is still fresh;
    otherwise the app secret is read via ``get_app_secret_from_db`` and a new
    token is requested from the Feishu ``tenant_access_token/internal``
    endpoint, cached under ``app_id``, and returned.

    Args:
        app_id: The Feishu App ID.

    Returns:
        A valid tenant access token, or None if the app secret cannot be
        retrieved, the request fails, or the response reports an error or
        lacks the token / expiry fields. Failures are printed, not raised.
    """
    stored_token_info = token_store.get(app_id)
    if stored_token_info:
        token = stored_token_info.get('token')
        created_time = stored_token_info.get('created_time')
        # The lifetime is cached under 'expires_in'. Entries that only carry the
        # previously written 'expire' key are still honoured.
        expires_in = stored_token_info.get('expires_in')
        if expires_in is None:
            expires_in = stored_token_info.get('expire')

        if token and created_time is not None and expires_in is not None:
            # Stop using the cached token 60 seconds before its recorded expiry
            # so a token about to lapse is not handed out.
            if time.time() < created_time + expires_in - 60:
                print(f"Using cached tenant access token for app_id: {app_id}")
                return token
            print(f"Cached tenant access token for app_id: {app_id} has expired.")
        else:
            print(f"Incomplete token info found in store for app_id: {app_id}.")

    print(f"\n\nGenerating new tenant access token for app_id: {app_id}")

    app_secret = await get_app_secret_from_db(app_id)
    if not app_secret:
        print(f"Could not retrieve valid app_secret for app_id: {app_id}")
        return None

    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    headers = {
        "Content-Type": "application/json",
    }
    payload = {
        "app_id": app_id,
        "app_secret": app_secret,
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()
            data = response.json()

            # The API signals success with code == 0 in the JSON body.
            if data.get("code") != 0:
                print(f"Error getting tenant access token from API: {data.get('msg')}")
                return None

            new_token = data.get("tenant_access_token")
            expire = data.get("expire")
            if not new_token or expire is None:
                print("Error generating new tenant access token: Missing token or expiry info in response.")
                return None

            # Cache under the same key the lookup above reads ('expires_in');
            # writing only 'expire' made every cached entry look incomplete.
            # 'expire' is kept alongside for any other reader of the store
            # that relied on it — presumably none; TODO confirm and drop.
            token_store[app_id] = {
                'token': new_token,
                'created_time': time.time(),
                'expires_in': expire,
                'expire': expire,
            }
            return new_token
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred while generating token: {e}")
            return None
        except httpx.RequestError as e:
            print(f"An error occurred while requesting {e.request.url!r} to generate token: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred while generating token: {e}")
            return None
|
|
|
async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
    """
    Post a reply to an existing Feishu message.

    The reply is sent as a JSON body to the ``/im/v1/messages/{msg_id}/reply``
    endpoint, authorised with the given token as a Bearer credential. Errors
    raised while posting are caught and printed; nothing is returned.

    Args:
        msg_id: ID of the message being replied to.
        tenant_access_token: Token placed in the Authorization header.
        content: Reply content; serialised to a JSON string before sending.
        msg_type: Message type forwarded as-is (e.g., 'text').
    """
    # The request body carries `content` as a JSON-encoded string, not a nested object.
    reply_payload = {"content": json.dumps(content), "msg_type": msg_type}
    request_headers = {
        "Authorization": f"Bearer {tenant_access_token}",
        "Content-Type": "application/json",
    }
    endpoint = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"

    async with httpx.AsyncClient() as http:
        try:
            result = await http.post(endpoint, headers=request_headers, json=reply_payload)
            result.raise_for_status()
        except httpx.HTTPStatusError as exc:
            print(f"HTTP error sending reply: {exc}")
        except httpx.RequestError as exc:
            print(f"An error occurred while requesting {exc.request.url!r} to send reply: {exc}")
        except Exception as exc:
            print(f"An unexpected error occurred while sending reply: {exc}")
|
|