File size: 7,788 Bytes
63d40fb
6fd8a52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
a248251
 
 
 
 
 
 
 
 
 
 
6fd8a52
 
 
 
63d40fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d859f7d
 
 
 
 
 
 
 
 
 
63d40fb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d859f7d
 
 
 
 
 
 
 
 
 
6fd8a52
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
aa3e29b
6fd8a52
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os, requests
import asyncio
import redis.asyncio as redis
from dotenv import load_dotenv
from redis.exceptions import ConnectionError
import json
from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions

# Load variables from a .env file before reading the Redis settings below.
load_dotenv()

# Connection settings come from the environment; only the port has a fallback.
redis_host = os.environ.get('REDIS_HOST')
redis_password = os.environ.get('REDIS_PASSWORD')
redis_port = os.environ.get('REDIS_PORT', 6379)  # str when set in the env, int 6379 otherwise

# Module-wide asynchronous Redis client (redis.asyncio); SSL is always requested.
r = redis.Redis(host=redis_host, password=redis_password, port=int(redis_port), ssl=True)

async def connect_redis():
    """
    Ping the module-level Redis client and run a SET/GET smoke test.

    A ConnectionError from the ping is reported and re-raised so the caller
    knows the connection is unusable. Any failure of the follow-up SET/GET
    round trip is only printed; it is not propagated.
    """
    try:
        await r.ping()
    except ConnectionError as exc:
        print(f"Could not connect to Redis: {exc}")
        # Without a connection nothing else can work, so let the caller decide.
        raise
    else:
        print("Connected to Redis!")

        # Smoke test: write a key and read it back (the value itself is not inspected).
        try:
            await r.set("mykey", "myvalue")
            await r.get("mykey")
        except Exception as exc:
            print(f"Redis SET/GET test failed: {exc}")

# Agent service endpoint and the fixed app/user/session this bridge talks to.
_AGENT_BASE_URL = "https://tanbushi-adk-teams.hf.space"
_AGENT_APP_NAME = "ai_quant"
_AGENT_USER_ID = "tanbushi"
_AGENT_SESSION_ID = "ai_quant_tanbushi_session"
# (connect, read) timeouts in seconds so a stalled agent cannot hang a worker forever.
_AGENT_HTTP_TIMEOUT = (10, 300)


def _extract_reply_text(events):
    """
    Return the text of the first event in *events* that carries any text.

    *events* is the decoded JSON body of the agent's ``/run`` endpoint, read
    here as a list of dicts shaped like ``{"content": {"parts": [...]}}``.
    Function-call parts are logged and otherwise skipped. Returns ``None``
    when no event contains text (or the payload has an unexpected shape).
    """
    if not isinstance(events, list):
        return None

    for event in events:
        if not isinstance(event, dict):
            continue
        parts = (event.get("content") or {}).get("parts") or []
        parts = [part for part in parts if isinstance(part, dict)]

        for part in parts:
            function_call = part.get("function_call")
            if function_call:
                print(f"函数调用: {function_call.get('name')}")

        # Non-text parts (e.g. function calls) simply contribute nothing.
        text = "".join(part.get("text") or "" for part in parts)
        if text:
            return text

    return None


def _ask_agent(user_text):
    """
    Send *user_text* to the agent service and return its text reply, or ``None``.

    Blocking (uses ``requests``); call it from a worker thread, never directly
    from the event loop. Makes sure the fixed session exists (GET, then POST to
    create it if the GET is not 200) and then posts the message to ``/run``
    using that same user and session. Network and response-decoding errors
    propagate to the caller.
    """
    session_url = (
        f"{_AGENT_BASE_URL}/apps/{_AGENT_APP_NAME}"
        f"/users/{_AGENT_USER_ID}/sessions/{_AGENT_SESSION_ID}"
    )

    response = requests.get(session_url, timeout=_AGENT_HTTP_TIMEOUT)
    if response.status_code == 200:
        print("Successfully get session")
    else:
        print("Failed to get session")
        response = requests.post(session_url, timeout=_AGENT_HTTP_TIMEOUT)
        if response.status_code != 200:
            # Session could neither be found nor created; nothing to run against.
            return None

    payload = {
        "app_name": _AGENT_APP_NAME,
        # Must match the session checked/created above, otherwise the run
        # targets a session that was never set up.
        "user_id": _AGENT_USER_ID,
        "session_id": _AGENT_SESSION_ID,
        "new_message": {
            "parts": [
                {
                    "text": user_text
                }
            ],
            "role": "user"
        },
        "streaming": False
    }
    run_response = requests.post(
        f"{_AGENT_BASE_URL}/run", json=payload, timeout=_AGENT_HTTP_TIMEOUT
    )
    events = run_response.json()
    print('\n\n\n\n\n\n\n\nresponse1_json: ', events)
    return _extract_reply_text(events)


async def _handle_message(message):
    """
    Process one pub/sub message: ask the agent and reply through Feishu.

    *message* is the dict returned by ``pubsub.get_message``; its ``data``
    field is expected to hold a JSON object with ``platform_specific.app_id``,
    ``msg_id``, ``content`` (a dict with a ``text`` key) and
    ``original_msg_type``. Every problem that prevents a reply is printed and
    the message is dropped; unexpected exceptions propagate to the caller.
    """
    raw = message['data']
    # Payloads are bytes unless the client was created with decode_responses=True.
    data_str = raw.decode() if isinstance(raw, (bytes, bytearray)) else raw

    # Only the message parse is guarded here, so a malformed HTTP response
    # later on is not misreported as a malformed Redis message.
    try:
        data_dict = json.loads(data_str)
    except json.JSONDecodeError as e:
        print(f"Could not parse message data as JSON: {e}")
        return
    if not isinstance(data_dict, dict):
        print("Could not parse message data as JSON: expected a JSON object")
        return

    app_id = (data_dict.get('platform_specific') or {}).get('app_id')
    tenant_access_token = None
    if app_id:
        tenant_access_token = await get_valid_tenant_access_token(app_id)
    else:
        print("Missing app_id in received data, cannot get tenant access token.")
    if not tenant_access_token:
        print("Failed to get tenant access token, cannot process message.")
        return

    msg_id = data_dict.get('msg_id')
    msg_type = data_dict.get('original_msg_type')
    content = data_dict.get('content')
    user_text = content.get("text") if isinstance(content, dict) else None

    # Validate before calling the agent: there is no point in running it
    # when the reply could not be delivered anyway.
    if not (msg_id and msg_type and user_text):
        print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
        return
    print('content["text"]', user_text)

    # requests is synchronous; run it in the default executor so the event
    # loop (and every other task on it) keeps running during the HTTP calls.
    loop = asyncio.get_running_loop()
    reply_text = await loop.run_in_executor(None, _ask_agent, user_text)
    if not reply_text:
        print("Agent returned no text reply, cannot send reply.")
        return
    print(reply_text)

    await send_feishu_reply(msg_id, tenant_access_token, {"text": reply_text}, msg_type)


async def redis_subscriber(redis_client, channel_name):
    """
    Subscribe to a Redis channel and process incoming messages forever.

    Args:
        redis_client: asynchronous Redis client providing ``pubsub()``.
        channel_name: name of the channel to subscribe to.

    Each message is handled by ``_handle_message``. Any exception raised
    while handling one message is printed and the loop carries on with the
    next one. This coroutine never returns on its own.
    """
    pubsub = redis_client.pubsub()
    await asyncio.sleep(1)  # Small delay before subscribing
    await pubsub.subscribe(channel_name)

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            try:
                await _handle_message(message)
            except Exception as e:
                print(f"An unexpected error occurred while processing message: {e}")

        await asyncio.sleep(0.01)  # Yield to the event loop between polls

async def publish_message(channel: str, message: str):
    """
    Publish *message* on the Redis channel *channel* via the module-level client.

    Returns a dict with a "status" of "success" or "error" and a
    human-readable "message". Exceptions are never propagated: any failure
    is printed and reported through the returned dict.
    """
    try:
        await r.publish(channel, message)
    except Exception as exc:
        print(f"Error publishing message: {exc}")
        outcome = {"status": "error", "message": str(exc)}
    else:
        outcome = {"status": "success", "message": f"Published message to channel '{channel}'"}
    return outcome