# feishu/feishu_service.py (repository page header: author avatar "geqintan's picture", commit "update", aa3e29b)
import os
import json
import httpx
import time # Import time for checking token expiry
from dotenv import load_dotenv
from supabase import create_client, Client # Import Supabase client
load_dotenv()
# Import the token_store from store.py
from store import token_store
# Initialize the Supabase client.
# SUPABASE_URL and SUPABASE_KEY must be set in the environment (e.g. via the
# .env file loaded above).
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# Declared exactly once with its full type; it stays None when the
# configuration is incomplete so callers can test `if not supabase`.
supabase: Client | None = None

if SUPABASE_URL and SUPABASE_KEY:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Supabase client initialized.")
else:
    # Deliberately not fatal at import time: the module still loads and the
    # functions that need the client report the problem when they are called.
    print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
async def get_app_secret_from_db(app_id: str) -> str | None:
"""
Reads the app_secret from the Supabase database based on app_id.
Assumes a table named 'feishu_apps' with columns 'app_id' and 'app_secret'.
"""
if not supabase:
print("Supabase client not initialized, cannot fetch app_secret.")
return None # Correctly return None if supabase client is not initialized
try:
# Query the 'feishu_bot_config' table for the platform_specific column matching the bot_id
# Use maybe_single() as we expect at most one row, and execute() to get the result
response = supabase.table('feishu_bot_config').select('platform_specific').eq('bot_id', app_id).single().execute()
# Check if a row was returned and extract the app_secret from platform_specific
# The result of maybe_single().execute() will have a 'data' attribute
if response.data and 'platform_specific' in response.data and 'app_secret' in response.data['platform_specific']:
app_secret = response.data['platform_specific']['app_secret']
return app_secret
else:
print(f"No app_secret found in Supabase for app_id: {app_id}.")
return None
except Exception as e:
print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
return None
async def get_valid_tenant_access_token(app_id: str) -> str | None:
"""
Retrieves a valid tenant access token for the given app_id.
Prioritizes fetching from the global store if not expired,
otherwise generates a new one and stores it.
Args:
app_id: The Feishu App ID.
Returns:
A valid tenant access token or None if unable to obtain one.
"""
# 1. Prioritize fetching from the global store
stored_token_info = token_store.get(app_id)
if stored_token_info:
token = stored_token_info.get('token')
created_time = stored_token_info.get('created_time')
expires_in = stored_token_info.get('expires_in')
# 2. Check if the stored token is still valid
if token and created_time is not None and expires_in is not None:
current_time = time.time()
# Add a small buffer (e.g., 60 seconds) to avoid using tokens that are about to expire
if current_time < created_time + expires_in - 60:
print(f"Using cached tenant access token for app_id: {app_id}")
return token
else:
print(f"Cached tenant access token for app_id: {app_id} has expired.")
else:
print(f"Incomplete token info found in store for app_id: {app_id}.")
# If no valid token in store, generate a new one
print(f"\n\nGenerating new tenant access token for app_id: {app_id}")
# Read app_secret from the simulated database
app_secret = await get_app_secret_from_db(app_id)
if not app_secret:
print(f"Could not retrieve valid app_secret for app_id: {app_id}")
return None
url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
headers = {
"Content-Type": "application/json"
}
payload = {
"app_id": app_id,
"app_secret": app_secret
}
async with httpx.AsyncClient() as client:
try:
response = await client.post(url, headers=headers, json=payload)
response.raise_for_status() # Raise an exception for bad status codes
data = response.json()
if data.get("code") == 0:
new_token = data.get("tenant_access_token")
expire = data.get("expire") # Note: Feishu API returns 'expire_in'
if new_token and expire is not None:
# 4. Store the new token and its expiry information
token_store[app_id] = {
'token': new_token,
'created_time': time.time(),
'expire': expire
}
return new_token
else:
print(f"Error generating new tenant access token: Missing token or expiry info in response.")
return None
else:
print(f"Error getting tenant access token from API: {data.get('msg')}")
return None
except httpx.HTTPStatusError as e:
print(f"HTTP error occurred while generating token: {e}")
return None
except httpx.RequestError as e:
print(f"An error occurred while requesting {e.request.url!r} to generate token: {e}")
return None
except Exception as e:
print(f"An unexpected error occurred while generating token: {e}")
return None
async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
    """
    Post a reply to an existing Feishu message.

    Nothing is returned. HTTP-status, transport and unexpected errors raised
    while sending are printed and swallowed.

    Args:
        msg_id: The message ID to reply to.
        tenant_access_token: The tenant access token used as the Bearer credential.
        content: The content of the reply message (dictionary).
        msg_type: The type of the message (e.g., 'text').
    """
    reply_url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"
    request_headers = {
        "Authorization": f"Bearer {tenant_access_token}",
        "Content-Type": "application/json",
    }
    # 'content' travels as a JSON-encoded string nested inside the JSON body,
    # hence the explicit json.dumps on top of httpx's own body encoding.
    request_body = dict(content=json.dumps(content), msg_type=msg_type)

    async with httpx.AsyncClient() as http:
        try:
            # Non-2xx answers are turned into HTTPStatusError by raise_for_status().
            (await http.post(reply_url, headers=request_headers, json=request_body)).raise_for_status()
        except httpx.HTTPStatusError as status_err:
            print(f"HTTP error sending reply: {status_err}")
        except httpx.RequestError as request_err:
            print(f"An error occurred while requesting {request_err.request.url!r} to send reply: {request_err}")
        except Exception as unexpected_err:
            print(f"An unexpected error occurred while sending reply: {unexpected_err}")