File size: 6,994 Bytes
6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 a248251 6fd8a52 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import os
import json
import httpx
import time # Import time for checking token expiry
from dotenv import load_dotenv
from supabase import create_client, Client # Import Supabase client
load_dotenv()
# Import the token_store from store.py
from store import token_store
# Module-wide Supabase client.
# SUPABASE_URL and SUPABASE_KEY are read from the environment (load_dotenv()
# has already pulled in any .env file). When either one is missing the client
# stays None and callers are expected to check for that before using it.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

supabase: Client | None = None
if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Supabase client initialized.")
else:
    # Deliberately non-fatal: the module still imports, only DB lookups are disabled.
    print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
async def get_app_secret_from_db(app_id: str) -> str | None:
"""
Reads the app_secret from the Supabase database based on app_id.
Assumes a table named 'feishu_apps' with columns 'app_id' and 'app_secret'.
"""
if not supabase:
print("Supabase client not initialized, cannot fetch app_secret.")
return None # Correctly return None if supabase client is not initialized
try:
# Query the 'feishu_bot_config' table for the platform_specific column matching the bot_id
# Use maybe_single() as we expect at most one row, and execute() to get the result
response = supabase.table('feishu_bot_config').select('platform_specific').eq('bot_id', app_id).single().execute()
# Check if a row was returned and extract the app_secret from platform_specific
# The result of maybe_single().execute() will have a 'data' attribute
if response.data and 'platform_specific' in response.data and 'app_secret' in response.data['platform_specific']:
app_secret = response.data['platform_specific']['app_secret']
return app_secret
else:
print(f"No app_secret found in Supabase for app_id: {app_id}.")
return None
except Exception as e:
print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
return None
async def get_valid_tenant_access_token(app_id: str) -> str | None:
"""
Retrieves a valid tenant access token for the given app_id.
Prioritizes fetching from the global store if not expired,
otherwise generates a new one and stores it.
Args:
app_id: The Feishu App ID.
Returns:
A valid tenant access token or None if unable to obtain one.
"""
# 1. Prioritize fetching from the global store
stored_token_info = token_store.get(app_id)
if stored_token_info:
token = stored_token_info.get('token')
created_time = stored_token_info.get('created_time')
expires_in = stored_token_info.get('expires_in')
# 2. Check if the stored token is still valid
if token and created_time is not None and expires_in is not None:
current_time = time.time()
# Add a small buffer (e.g., 60 seconds) to avoid using tokens that are about to expire
if current_time < created_time + expires_in - 60:
print(f"Using cached tenant access token for app_id: {app_id}")
return token
else:
print(f"Cached tenant access token for app_id: {app_id} has expired.")
else:
print(f"Incomplete token info found in store for app_id: {app_id}.")
# If no valid token in store, generate a new one
print(f"\n\nGenerating new tenant access token for app_id: {app_id}")
# Read app_secret from the simulated database
app_secret = await get_app_secret_from_db(app_id)
if not app_secret:
print(f"Could not retrieve valid app_secret for app_id: {app_id}")
return None
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {
"Content-Type": "application/json"
}
payload = {
"app_id": app_id,
"app_secret": app_secret
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status() # Raise an exception for bad status codes
data = response.json()
if data.get("code") == 0:
new_token = data.get("tenant_access_token")
expire = data.get("expire") # Note: Feishu API returns 'expire_in'
if new_token and expire is not None:
# 4. Store the new token and its expiry information
token_store[app_id] = {
'token': new_token,
'created_time': time.time(),
'expire': expire
}
return new_token
else:
print(f"Error generating new tenant access token: Missing token or expiry info in response.")
return None
else:
print(f"Error getting tenant access token from API: {data.get('msg')}")
return None
except httpx.HTTPStatusError as e:
print(f"HTTP error occurred while generating token: {e}")
return None
except httpx.RequestError as e:
print(f"An error occurred while requesting {e.request.url!r} to generate token: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred while generating token: {e}")
return None
async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
    """
    Posts a reply to an existing Feishu message via the IM reply endpoint.

    Errors raised by the HTTP request (bad status, transport failure, or any
    other Exception) are printed and not re-raised.

    Args:
        msg_id: The message ID to reply to.
        tenant_access_token: The tenant access token sent as a Bearer token.
        content: The content of the reply message (dictionary).
        msg_type: The type of the message (e.g., 'text').
    """
    reply_endpoint = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"
    request_headers = {
        "Authorization": f"Bearer {tenant_access_token}",
        "Content-Type": "application/json",
    }
    # 'content' travels as a JSON-encoded string nested inside the JSON body,
    # so it is serialized once here and again by httpx with the whole body.
    serialized_content = json.dumps(content)
    request_body = {"content": serialized_content, "msg_type": msg_type}

    async with httpx.AsyncClient() as http:
        try:
            reply = await http.post(reply_endpoint, headers=request_headers, json=request_body)
            reply.raise_for_status()  # non-2xx responses become HTTPStatusError
        except httpx.HTTPStatusError as status_err:
            print(f"HTTP error sending reply: {status_err}")
        except httpx.RequestError as request_err:
            print(f"An error occurred while requesting {request_err.request.url!r} to send reply: {request_err}")
        except Exception as unexpected_err:
            print(f"An unexpected error occurred while sending reply: {unexpected_err}")
|