File size: 4,309 Bytes
6fd8a52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
a248251
 
 
 
 
 
 
 
 
 
 
6fd8a52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import os
import asyncio
import redis.asyncio as redis
from dotenv import load_dotenv
from redis.exceptions import ConnectionError
import json
from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions

# Let python-dotenv populate the process environment before settings are read.
load_dotenv()

# Redis connection settings come from the environment; the port falls back
# to 6379 when REDIS_PORT is not set.
redis_password = os.environ.get('REDIS_PASSWORD')
redis_host = os.environ.get('REDIS_HOST')
redis_port = os.environ.get('REDIS_PORT', 6379)

# Module-wide async Redis client, constructed with ssl=True.
r = redis.Redis(host=redis_host, port=int(redis_port), password=redis_password, ssl=True)

async def connect_redis():
    """
    Pings the module-level Redis client, then runs a best-effort SET/GET round trip.

    Raises:
        ConnectionError: re-raised after being reported when the ping fails,
            so the caller knows Redis is unreachable.
    """
    try:
        await r.ping()
    except ConnectionError as exc:
        print(f"Could not connect to Redis: {exc}")
        # Without a connection nothing else in this module can work.
        raise

    print("Connected to Redis!")

    # Smoke test only: a failure here is reported but never propagated.
    try:
        await r.set("mykey", "myvalue")
        await r.get("mykey")
    except Exception as exc:
        print(f"Redis SET/GET test failed: {exc}")

async def _handle_pubsub_message(message):
    """
    Processes a single pub/sub message: parses its JSON payload and sends the Feishu reply.

    Every failure is reported and swallowed here so that one malformed message
    can never terminate the subscriber loop that calls this helper.

    Args:
        message: The dict returned by ``pubsub.get_message``; only its
            ``'data'`` entry is read.
    """
    try:
        raw = message['data']
        # Payloads arrive as bytes by default, but are already str when the
        # client was created with decode_responses=True; accept both.
        data_str = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw

        # Convert the received JSON string back to a Python dictionary
        data_dict = json.loads(data_str)
        if not isinstance(data_dict, dict):
            print("Received JSON payload is not an object, cannot process message.")
            return

        # Extract app_id from platform_specific (tolerating a missing, null
        # or non-dict platform_specific entry)
        platform_specific = data_dict.get('platform_specific')
        app_id = platform_specific.get('app_id') if isinstance(platform_specific, dict) else None

        # Get valid tenant access token using the Feishu service function, passing only app_id
        tenant_access_token = None
        if app_id:
            tenant_access_token = await get_valid_tenant_access_token(app_id)
        else:
            print("Missing app_id in received data, cannot get tenant access token.")

        if not tenant_access_token:
            print("Failed to get tenant access token, cannot process message.")
            return

        # Extract necessary info from data_dict for sending reply
        msg_id = data_dict.get('msg_id')
        content = data_dict.get('content')  # This should be the dictionary like {"text": "..."}
        # The reply type is carried in the original_msg_type field
        msg_type = data_dict.get('original_msg_type')

        if not (msg_id and content and msg_type):
            print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
            return

        # Send the reply using the Feishu service function
        await send_feishu_reply(msg_id, tenant_access_token, content, msg_type)

    except UnicodeDecodeError as e:
        print(f"Could not decode message data as text: {e}")
    except json.JSONDecodeError as e:
        # The message is not valid JSON
        print(f"Could not parse message data as JSON: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while processing message: {e}")


async def redis_subscriber(redis_client, channel_name):
    """
    Asynchronous function to subscribe to a Redis channel and process messages.

    Runs until the surrounding task is cancelled. Each received message is
    handled in isolation, so a bad payload is logged and skipped rather than
    stopping the subscription.

    Args:
        redis_client: Async Redis client used to create the pub/sub object.
        channel_name: Name of the channel to subscribe to.
    """
    pubsub = redis_client.pubsub()
    await asyncio.sleep(1)  # Add a small delay before subscribing
    await pubsub.subscribe(channel_name)

    # Listen for messages
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            await _handle_pubsub_message(message)

        await asyncio.sleep(0.01)  # Prevent blocking the event loop

async def publish_message(channel: str, message: str):
    """
    Publishes a message to a Redis channel via the module-level client.

    Args:
        channel: Name of the channel to publish to.
        message: Payload to publish.

    Returns:
        A dict with a ``"status"`` of ``"success"`` or ``"error"`` and a
        human-readable ``"message"``; failures are reported, never raised.
    """
    try:
        await r.publish(channel, message)
        outcome = {"status": "success", "message": f"Published message to channel '{channel}'"}
    except Exception as exc:
        print(f"Error publishing message: {exc}")
        outcome = {"status": "error", "message": str(exc)}
    return outcome