"""Helpers for Feishu tenant access tokens and message replies.

App secrets are read from Supabase, tenant access tokens are cached in the
shared ``token_store`` imported from ``store``, and replies are sent through
the Feishu Open API over HTTP.
"""

import os
import json
import time  # Used to check cached-token expiry

import httpx
from dotenv import load_dotenv
from supabase import create_client, Client

# Load .env before anything below reads the environment.
load_dotenv()

# Shared token cache; used here through ``.get(app_id)`` and item assignment.
from store import token_store

# SUPABASE_URL and SUPABASE_KEY are expected in the environment / .env file.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_KEY")

# The client stays None when configuration is missing; callers must check it.
supabase: Client | None = None
if not SUPABASE_URL or not SUPABASE_KEY:
    print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
else:
    supabase = create_client(SUPABASE_URL, SUPABASE_KEY)
    print("Supabase client initialized.")

# Safety margin (seconds) so a token that is about to expire is not reused.
_TOKEN_EXPIRY_BUFFER_SECONDS = 60


async def get_app_secret_from_db(app_id: str) -> str | None:
    """Read the app secret for *app_id* from Supabase.

    Queries the ``feishu_bot_config`` table for the row whose ``bot_id``
    equals *app_id* and returns the ``app_secret`` entry of its
    ``platform_specific`` column.

    Args:
        app_id: The Feishu App ID (matched against the ``bot_id`` column).

    Returns:
        The app secret, or None when the Supabase client is not initialized,
        the row or secret is missing, or the query fails. Failures are
        printed, never raised.
    """
    if not supabase:
        print("Supabase client not initialized, cannot fetch app_secret.")
        return None

    try:
        # NOTE(review): this is a synchronous client call inside a coroutine,
        # so it blocks the event loop for the duration of the query.
        # ``.single()`` requires exactly one matching row; any other outcome
        # surfaces as an exception handled below.
        response = (
            supabase.table('feishu_bot_config')
            .select('platform_specific')
            .eq('bot_id', app_id)
            .single()
            .execute()
        )

        row = response.data or {}
        platform_specific = row.get('platform_specific')
        # Guard against a null / non-object column value instead of letting
        # the membership test raise a TypeError.
        if isinstance(platform_specific, dict):
            app_secret = platform_specific.get('app_secret')
            if app_secret:
                return app_secret

        print(f"No app_secret found in Supabase for app_id: {app_id}.")
        return None
    except Exception as e:
        print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
        return None


def _get_cached_token(app_id: str) -> str | None:
    """Return the cached token for *app_id* if it is still valid, else None."""
    entry = token_store.get(app_id)
    if not entry:
        return None

    token = entry.get('token')
    created_time = entry.get('created_time')
    expires_in = entry.get('expires_in')
    if expires_in is None:
        # Entries written by earlier versions of this module used 'expire'.
        expires_in = entry.get('expire')

    if not token or created_time is None or expires_in is None:
        print(f"Incomplete token info found in store for app_id: {app_id}.")
        return None

    if time.time() < created_time + expires_in - _TOKEN_EXPIRY_BUFFER_SECONDS:
        print(f"Using cached tenant access token for app_id: {app_id}")
        return token

    print(f"Cached tenant access token for app_id: {app_id} has expired.")
    return None


async def get_valid_tenant_access_token(app_id: str) -> str | None:
    """Retrieve a valid tenant access token for *app_id*.

    A cached token from ``token_store`` is returned when it has not expired
    (with a 60-second safety buffer). Otherwise a new token is requested from
    the Feishu API using the app secret stored in Supabase, cached, and
    returned.

    Args:
        app_id: The Feishu App ID.

    Returns:
        A valid tenant access token, or None if one could not be obtained.
        Errors are printed, never raised.
    """
    cached_token = _get_cached_token(app_id)
    if cached_token:
        return cached_token

    # No usable cached token: request a new one.
    print(f"\n\nGenerating new tenant access token for app_id: {app_id}")

    app_secret = await get_app_secret_from_db(app_id)
    if not app_secret:
        print(f"Could not retrieve valid app_secret for app_id: {app_id}")
        return None

    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    headers = {"Content-Type": "application/json"}
    payload = {"app_id": app_id, "app_secret": app_secret}

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, json=payload)
            response.raise_for_status()  # Raise for 4xx/5xx status codes
            data = response.json()

            if data.get("code") != 0:
                print(f"Error getting tenant access token from API: {data.get('msg')}")
                return None

            new_token = data.get("tenant_access_token")
            # The token lifetime is read from the response's 'expire' field.
            expire = data.get("expire")
            if not new_token or expire is None:
                print("Error generating new tenant access token: Missing token or expiry info in response.")
                return None

            # Store under 'expires_in', the key the cache lookup reads, so
            # the cached token is actually reused on later calls.
            token_store[app_id] = {
                'token': new_token,
                'created_time': time.time(),
                'expires_in': expire,
            }
            return new_token
        except httpx.HTTPStatusError as e:
            print(f"HTTP error occurred while generating token: {e}")
            return None
        except httpx.RequestError as e:
            print(f"An error occurred while requesting {e.request.url!r} to generate token: {e}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred while generating token: {e}")
            return None


async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
    """Send a reply to an existing Feishu message.

    Args:
        msg_id: The ID of the message to reply to.
        tenant_access_token: The tenant access token used as Bearer credential.
        content: The reply content; it is JSON-encoded into the ``content``
            field of the request body.
        msg_type: The type of the message (e.g., 'text').

    Returns:
        None. HTTP and transport errors are printed, never raised.
    """
    url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"
    headers = {
        "Authorization": f"Bearer {tenant_access_token}",
        "Content-Type": "application/json",
    }
    # The API expects 'content' to be a JSON string, not a nested object.
    body = {
        "content": json.dumps(content),
        "msg_type": msg_type,
    }

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(url, headers=headers, json=body)
            response.raise_for_status()  # Raise for 4xx/5xx status codes
        except httpx.HTTPStatusError as e:
            print(f"HTTP error sending reply: {e}")
        except httpx.RequestError as e:
            print(f"An error occurred while requesting {e.request.url!r} to send reply: {e}")
        except Exception as e:
            print(f"An unexpected error occurred while sending reply: {e}")