import os import asyncio import redis.asyncio as redis from dotenv import load_dotenv from redis.exceptions import ConnectionError import json from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions load_dotenv() redis_host = os.getenv('REDIS_HOST') redis_password = os.getenv('REDIS_PASSWORD') redis_port = os.getenv('REDIS_PORT', 6379) # Use async Redis client r = redis.Redis( host=redis_host, port=int(redis_port), password=redis_password, ssl=True ) async def connect_redis(): """ Connects to Redis and performs a ping test. """ try: await r.ping() print("Connected to Redis!") # Perform SET and GET test after successful connection try: await r.set("mykey", "myvalue") value = await r.get("mykey") except Exception as e: print(f"Redis SET/GET test failed: {e}") except ConnectionError as e: print(f"Could not connect to Redis: {e}") # If connection fails, we might not be able to proceed further raise # Re-raise the exception to indicate connection failure async def redis_subscriber(redis_client, channel_name): """ Asynchronous function to subscribe to a Redis channel and process messages. """ # print(f"Subscribing to channel: {channel_name}") pubsub = redis_client.pubsub() await asyncio.sleep(1) # Add a small delay before subscribing await pubsub.subscribe(channel_name) # Use await for async subscribe # Listen for messages while True: message = await pubsub.get_message(ignore_subscribe_messages=True) if message: channel = message['channel'].decode() data_str = message['data'].decode() # print(f"\n\n\nReceived message on channel '{channel}': {data_str}") # Convert the received JSON string back to a Python dictionary try: data_dict = json.loads(data_str) # Extract app_id from platform_specific app_id = data_dict.get('platform_specific', {}).get('app_id') # Get valid tenant access token using the Feishu service function, passing only app_id tenant_access_token = None if app_id: tenant_access_token = await get_valid_tenant_access_token(app_id) else: print("Missing app_id in received data, cannot get tenant access token.") if tenant_access_token: # Extract necessary info from data_dict for sending reply msg_id = data_dict.get('msg_id') content = data_dict.get('content') # This should be the dictionary like {"text": "..."} # Correctly extract msg_type from the original_msg_type field msg_type = data_dict.get('original_msg_type') if msg_id and content and msg_type: # Send the reply using the Feishu service function await send_feishu_reply(msg_id, tenant_access_token, content, msg_type) else: print("Missing msg_id, content, or msg_type in received data, cannot send reply.") else: print("Failed to get tenant access token, cannot process message.") except json.JSONDecodeError as e: print(f"Could not parse message data as JSON: {e}") # Handle cases where the message is not valid JSON except Exception as e: print(f"An unexpected error occurred while processing message: {e}") await asyncio.sleep(0.01) # Prevent blocking the event loop async def publish_message(channel: str, message: str): """ Publishes a message to a Redis channel. """ try: # Publish the message to the specified channel (publish is asynchronous) await r.publish(channel, message) # print(f"Published message to channel '{channel}'") return {"status": "success", "message": f"Published message to channel '{channel}'"} except Exception as e: print(f"Error publishing message: {e}") return {"status": "error", "message": str(e)}