|
import os |
|
import asyncio |
|
import redis.asyncio as redis |
|
from dotenv import load_dotenv |
|
from redis.exceptions import ConnectionError |
|
import json |
|
from feishu_service import get_valid_tenant_access_token, send_feishu_reply |
|
|
|
load_dotenv() |
|
|
|
redis_host = os.getenv('REDIS_HOST') |
|
redis_password = os.getenv('REDIS_PASSWORD') |
|
redis_port = os.getenv('REDIS_PORT', 6379) |
|
|
|
|
|
r = redis.Redis( |
|
host=redis_host, |
|
port=int(redis_port), |
|
password=redis_password, |
|
ssl=True |
|
) |
|
|
|
async def connect_redis(): |
|
""" |
|
Connects to Redis and performs a ping test. |
|
""" |
|
try: |
|
await r.ping() |
|
print("Connected to Redis!") |
|
|
|
|
|
try: |
|
await r.set("mykey", "myvalue") |
|
value = await r.get("mykey") |
|
except Exception as e: |
|
print(f"Redis SET/GET test failed: {e}") |
|
|
|
except ConnectionError as e: |
|
print(f"Could not connect to Redis: {e}") |
|
|
|
raise |
|
|
|
async def redis_subscriber(redis_client, channel_name): |
|
""" |
|
Asynchronous function to subscribe to a Redis channel and process messages. |
|
""" |
|
|
|
pubsub = redis_client.pubsub() |
|
await asyncio.sleep(1) |
|
await pubsub.subscribe(channel_name) |
|
|
|
|
|
while True: |
|
message = await pubsub.get_message(ignore_subscribe_messages=True) |
|
if message: |
|
channel = message['channel'].decode() |
|
data_str = message['data'].decode() |
|
|
|
|
|
|
|
try: |
|
data_dict = json.loads(data_str) |
|
|
|
|
|
app_id = data_dict.get('platform_specific', {}).get('app_id') |
|
|
|
|
|
tenant_access_token = None |
|
if app_id: |
|
tenant_access_token = await get_valid_tenant_access_token(app_id) |
|
else: |
|
print("Missing app_id in received data, cannot get tenant access token.") |
|
|
|
if tenant_access_token: |
|
|
|
msg_id = data_dict.get('msg_id') |
|
content = data_dict.get('content') |
|
|
|
msg_type = data_dict.get('original_msg_type') |
|
|
|
if msg_id and content and msg_type: |
|
|
|
await send_feishu_reply(msg_id, tenant_access_token, content, msg_type) |
|
else: |
|
print("Missing msg_id, content, or msg_type in received data, cannot send reply.") |
|
|
|
else: |
|
print("Failed to get tenant access token, cannot process message.") |
|
|
|
except json.JSONDecodeError as e: |
|
print(f"Could not parse message data as JSON: {e}") |
|
|
|
except Exception as e: |
|
print(f"An unexpected error occurred while processing message: {e}") |
|
|
|
await asyncio.sleep(0.01) |
|
|
|
async def publish_message(channel: str, message: str): |
|
""" |
|
Publishes a message to a Redis channel. |
|
""" |
|
try: |
|
|
|
await r.publish(channel, message) |
|
|
|
return {"status": "success", "message": f"Published message to channel '{channel}'"} |
|
|
|
except Exception as e: |
|
print(f"Error publishing message: {e}") |
|
return {"status": "error", "message": str(e)} |
|
|