geqintan commited on
Commit
6fd8a52
·
1 Parent(s): 915340c
Files changed (5) hide show
  1. Dockerfile +1 -1
  2. app.py +0 -277
  3. feishu_service.py +76 -0
  4. main.py +114 -0
  5. redis_service.py +105 -0
Dockerfile CHANGED
@@ -13,4 +13,4 @@ COPY --chown=user ./requirements.txt requirements.txt
13
  RUN pip install --no-cache-dir --upgrade -r requirements.txt
14
 
15
  COPY --chown=user . /app
16
- CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "7860"]
 
13
  RUN pip install --no-cache-dir --upgrade -r requirements.txt
14
 
15
  COPY --chown=user . /app
16
+ CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "7860"]
app.py DELETED
@@ -1,277 +0,0 @@
1
- from fastapi import FastAPI, Request
2
- import os, time
3
- import asyncio
4
- import redis.asyncio as redis # Import async Redis client
5
- from dotenv import load_dotenv
6
- from redis.exceptions import ConnectionError # Import ConnectionError
7
- import json # Import the json module
8
- import httpx # Import httpx for async HTTP requests
9
-
10
- load_dotenv()
11
-
12
- redis_host = os.getenv('REDIS_HOST')
13
- redis_password = os.getenv('REDIS_PASSWORD')
14
- redis_port = os.getenv('REDIS_PORT', 6379) # Default to 6379 if not set
15
- feishu_app_id = os.getenv('FEISHU_APP_ID', 'YOUR_FEISHU_APP_ID') # Placeholder
16
- feishu_app_secret = os.getenv('FEISHU_APP_SECRET', 'YOUR_FEISHU_APP_SECRET') # Placeholder
17
-
18
- # Use async Redis client
19
- r = redis.Redis(
20
- host=redis_host,
21
- port=int(redis_port), # Ensure port is an integer
22
- password=redis_password,
23
- ssl=True
24
- )
25
-
26
- reply_template = {
27
- "url":"https://open.feishu.cn/open-apis/im/v1/messages/{{msg_id}}/reply",
28
- "Authorization": f"Bearer {{tenant_access_token}}",
29
- "body": {
30
- "content": "{{body}}",
31
- "msg_type": "{{ type }}"
32
- }
33
- }
34
-
35
- def generate_reply_string(msg_id: str, tenant_access_token: str, body: str, type: str) -> str:
36
- """
37
- Generates a reply string based on the reply_template.
38
-
39
- Args:
40
- msg_id: The message ID.
41
- tenant_access_token: The tenant access token.
42
- body: The content body of the reply (should be a dictionary).
43
- type: The message type.
44
-
45
- Returns:
46
- A formatted string based on the reply_template.
47
- """
48
- # Use json.dumps to convert the body dictionary to a JSON string
49
- # and then escape the quotes for inclusion in the f-string
50
- body_json_string = json.dumps(body)
51
-
52
- # Construct the final string using an f-string
53
- # Note: The original template had {{body}} directly, which implies the body
54
- # should be inserted as a JSON object, not a string.
55
- # The f-string needs to correctly format the JSON structure.
56
- # The url and Authorization header use f-strings within the template itself,
57
- # so we need to evaluate those first, then insert the variables.
58
- # Let's reconstruct the template string directly with f-strings.
59
- reply_string = f"""{{
60
- "url":"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply",
61
- "Authorization": "Bearer {tenant_access_token}",
62
- "body": {{
63
- "content": {{
64
- "text": "{body}"
65
- }},
66
- "msg_type": "{type}"
67
- }}
68
- }}"""
69
- return reply_string
70
-
71
- async def get_valid_tenant_access_token() -> str:
72
- # return "t-g1045baIWTQUCO2WMLFF4LZXMB4MFPGZ6JL4ZNN" # 测试,使用固定值
73
- """
74
- Retrieves a valid tenant access token from Feishu API.
75
- """
76
- url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
77
- headers = {
78
- "Content-Type": "application/json"
79
- }
80
- payload = {
81
- "app_id": feishu_app_id,
82
- "app_secret": feishu_app_secret
83
- }
84
-
85
- async with httpx.AsyncClient() as client:
86
- try:
87
- response = await client.post(url, headers=headers, json=payload)
88
- response.raise_for_status() # Raise an exception for bad status codes
89
- data = response.json()
90
- if data.get("code") == 0:
91
- return data.get("tenant_access_token")
92
- else:
93
- print(f"Error getting tenant access token: {data.get('msg')}")
94
- return None
95
- except httpx.HTTPStatusError as e:
96
- print(f"HTTP error occurred: {e}")
97
- return None
98
- except httpx.RequestError as e:
99
- print(f"An error occurred while requesting {e.request.url!r}: {e}")
100
- return None
101
- except Exception as e:
102
- print(f"An unexpected error occurred: {e}")
103
- return None
104
-
105
-
106
- # def get_tenant_access_token():
107
- # return "tenant_access_token"
108
-
109
- app = FastAPI()
110
-
111
- # Optional: Test connection (async ping)
112
- @app.on_event("startup")
113
- async def connect_redis():
114
- try:
115
- await r.ping()
116
- print("Connected to Redis!")
117
-
118
- # Perform SET and GET test after successful connection
119
- try:
120
- await r.set("mykey", "myvalue")
121
- print("Redis SET test successful: mykey = myvalue")
122
- value = await r.get("mykey")
123
- print(f"Redis GET test successful: mykey = {value.decode()}")
124
- except Exception as e:
125
- print(f"Redis SET/GET test failed: {e}")
126
-
127
- except ConnectionError as e:
128
- print(f"Could not connect to Redis: {e}")
129
- # If connection fails, we might not be able to proceed further
130
- return # Exit the startup event if connection fails
131
-
132
-
133
- # Define the Redis subscriber function
134
- async def redis_subscriber(redis_client, channel_name):
135
- """
136
- Asynchronous function to subscribe to a Redis channel and process messages.
137
- """
138
- print(f"Subscribing to channel: {channel_name}")
139
- pubsub = redis_client.pubsub()
140
- await asyncio.sleep(1) # Add a small delay before subscribing
141
- await pubsub.subscribe(channel_name) # Use await for async subscribe
142
-
143
- # Listen for messages
144
- while True:
145
- # print("Waiting for message...")
146
- message = await pubsub.get_message(ignore_subscribe_messages=True)
147
- # print(f"Message received: {message}")
148
- if message:
149
- print("\n\n\n**************************\n\nmessage in...")
150
- # 此处后续优化,先用str.decode()变成json字符串、再用json.loads()转换为json对象
151
- channel = message['channel'].decode()
152
- data_str = message['data'].decode()
153
- print(f"\n\n\nReceived message on channel '{channel}': {data_str}")
154
-
155
- # Convert the received JSON string back to a Python dictionary
156
- try:
157
- data_dict = json.loads(data_str)
158
- # Get valid tenant access token
159
- tenant_access_token = await get_valid_tenant_access_token()
160
- if tenant_access_token:
161
- reply_string = generate_reply_string(data_dict['msg_id'], tenant_access_token, data_dict['content']['body'], data_dict['content']['type'])
162
- # Send the reply string as an HTTP POST request
163
- try:
164
- reply_data = json.loads(reply_string)
165
- reply_url = reply_data.get("url")
166
- reply_headers = {"Authorization": reply_data.get("Authorization")}
167
- reply_body = reply_data.get("body")
168
- reply_content = reply_body.get("content")
169
- reply_content_string = json.dumps(reply_content)
170
- reply_body={
171
- "content":reply_content_string,
172
- "msg_type":reply_body.get("msg_type")
173
- }
174
- async with httpx.AsyncClient() as client:
175
- response = await client.post(reply_url, headers=reply_headers, json=reply_body)
176
- response.raise_for_status() # Raise an exception for bad status codes
177
- print(f"Successfully sent reply: Status Code {response.status_code}")
178
- print(f"Reply Response: {response.text}")
179
-
180
- except json.JSONDecodeError as e:
181
- print(f"Could not parse reply_string as JSON: {e}")
182
- except httpx.HTTPStatusError as e:
183
- print(f"HTTP error sending reply: {e}")
184
- except httpx.RequestError as e:
185
- print(f"An error occurred while requesting {e.request.url!r} to send reply: {e}")
186
- except Exception as e:
187
- print(f"An unexpected error occurred while sending reply: {e}")
188
-
189
- else:
190
- print("Failed to get tenant access token, cannot process message.")
191
-
192
-
193
- except json.JSONDecodeError as e:
194
- print(f"Could not parse message data as JSON: {e}")
195
- # Handle cases where the message is not valid JSON
196
-
197
- await asyncio.sleep(0.01) # Prevent blocking the event loop
198
-
199
- # Run the subscriber as a background task on startup
200
- @app.on_event("startup")
201
- async def startup_event():
202
- # Replace 'my_feishu_channel' with the actual channel name you want to subscribe to
203
- asyncio.create_task(redis_subscriber(r, 'my_feishu_channel'))
204
- print("Redis subscriber task started.")
205
-
206
-
207
- @app.get("/")
208
- def greet_json():
209
- return {"Hello": "World!"}
210
-
211
- @app.post("/webhook/feishu")
212
- async def handle_webhook(request: Request):
213
- # Read the incoming JSON body
214
- data = await request.json()
215
- # Check if 'type' exists in data and is equal to "url_verification"
216
- if data.get('type') == "url_verification":
217
- # If it's a URL verification request, return the challenge
218
- return {"challenge": data.get('challenge')}
219
-
220
- # Process other event types
221
- # The full event data is under data['body']['event'] based on previous examples
222
- event = data['event'] # Use .get() with default {} for safe access
223
- message = event['message']
224
- chat_type=message.get('chat_type')
225
- if chat_type=='p2p':
226
- message_type = 'private'
227
- elif chat_type=='group':
228
- if message.get('mentions'):
229
- message_type = 'group_mention'
230
- else:
231
- message_type = 'group_all'
232
-
233
- ret_data = {
234
- 'bot_id': data['header']['app_id'],
235
- 'message_type': message_type,
236
- 'msg_id': message['message_id'],
237
- "content":{
238
- "type": message['message_type'],
239
- "body": json.loads(message.get('content')).get('text')
240
- },
241
- "sender": {
242
- "id": event['sender']['sender_id']['open_id'],
243
- "sender_type": event['sender']['sender_type'],
244
- "name": "",
245
- "is_human": None
246
- },
247
- "timestamp":time.time(),
248
- "platform_specific": {
249
- "app_id": data['header']['app_id']
250
- }
251
- }
252
- await r.publish('my_feishu_channel', json.dumps(ret_data))
253
-
254
- print('\n\n\nret_data: ',json.dumps(ret_data))
255
- return {'status': 'OK'}
256
-
257
- @app.post("/publish")
258
- async def publish_message(request: Request):
259
- """
260
- Publishes a message to a Redis channel.
261
- Expects JSON body with 'channel' and 'message'.
262
- """
263
- try:
264
- data = await request.json()
265
- channel = data.get("channel")
266
- message = data.get("message")
267
-
268
- if not channel or not message:
269
- return {"status": "error", "message": "Missing channel or message"}
270
-
271
- # Publish the message to the specified channel (publish is asynchronous)
272
- await r.publish(channel, message)
273
-
274
- return {"status": "success", "message": f"Published message to channel '{channel}'"}
275
-
276
- except Exception as e:
277
- return {"status": "error", "message": str(e)}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
feishu_service.py ADDED
@@ -0,0 +1,76 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import json
3
+ import httpx
4
+ from dotenv import load_dotenv
5
+
6
+ load_dotenv()
7
+
8
+ feishu_app_id = os.getenv('FEISHU_APP_ID', 'YOUR_FEISHU_APP_ID') # Placeholder
9
+ feishu_app_secret = os.getenv('FEISHU_APP_SECRET', 'YOUR_FEISHU_APP_SECRET') # Placeholder
10
+
11
+ async def get_valid_tenant_access_token() -> str:
12
+ """
13
+ Retrieves a valid tenant access token from Feishu API.
14
+ """
15
+ url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
16
+ headers = {
17
+ "Content-Type": "application/json"
18
+ }
19
+ payload = {
20
+ "app_id": feishu_app_id,
21
+ "app_secret": feishu_app_secret
22
+ }
23
+
24
+ async with httpx.AsyncClient() as client:
25
+ try:
26
+ response = await client.post(url, headers=headers, json=payload)
27
+ response.raise_for_status() # Raise an exception for bad status codes
28
+ data = response.json()
29
+ if data.get("code") == 0:
30
+ return data.get("tenant_access_token")
31
+ else:
32
+ print(f"Error getting tenant access token: {data.get('msg')}")
33
+ return None
34
+ except httpx.HTTPStatusError as e:
35
+ print(f"HTTP error occurred: {e}")
36
+ return None
37
+ except httpx.RequestError as e:
38
+ print(f"An error occurred while requesting {e.request.url!r}: {e}")
39
+ return None
40
+ except Exception as e:
41
+ print(f"An unexpected error occurred: {e}")
42
+ return None
43
+
44
+ async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict, msg_type: str):
45
+ """
46
+ Sends a reply message to Feishu API.
47
+
48
+ Args:
49
+ msg_id: The message ID to reply to.
50
+ tenant_access_token: The tenant access token.
51
+ content: The content of the reply message (dictionary).
52
+ msg_type: The type of the message (e.g., 'text').
53
+ """
54
+ url = f"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply"
55
+ headers = {
56
+ "Authorization": f"Bearer {tenant_access_token}",
57
+ "Content-Type": "application/json"
58
+ }
59
+ # The content needs to be a JSON string within the 'content' field of the body
60
+ body = {
61
+ "content": json.dumps(content),
62
+ "msg_type": msg_type
63
+ }
64
+
65
+ async with httpx.AsyncClient() as client:
66
+ try:
67
+ response = await client.post(url, headers=headers, json=body)
68
+ response.raise_for_status() # Raise an exception for bad status codes
69
+ print(f"Successfully sent reply: Status Code {response.status_code}")
70
+ print(f"Reply Response: {response.text}")
71
+ except httpx.HTTPStatusError as e:
72
+ print(f"HTTP error sending reply: {e}")
73
+ except httpx.RequestError as e:
74
+ print(f"An error occurred while requesting {e.request.url!r} to send reply: {e}")
75
+ except Exception as e:
76
+ print(f"An unexpected error occurred while sending reply: {e}")
main.py ADDED
@@ -0,0 +1,114 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, Request
2
+ import time
3
+ import asyncio
4
+ import json
5
+ from dotenv import load_dotenv
6
+
7
+ # Import functions from the new service modules
8
+ from redis_service import connect_redis, redis_subscriber, r as redis_client, publish_message as publish_to_redis
9
+ from feishu_service import get_valid_tenant_access_token, send_feishu_reply
10
+
11
+ load_dotenv()
12
+
13
+ # Replace 'my_feishu_channel' with the actual channel name you want to subscribe to
14
+ REDIS_CHANNEL_NAME = 'my_feishu_channel'
15
+
16
+ app = FastAPI()
17
+
18
+ # Startup event to connect to Redis and start the subscriber task
19
+ @app.on_event("startup")
20
+ async def startup_event():
21
+ # Connect to Redis
22
+ await connect_redis()
23
+ # Start the Redis subscriber as a background task
24
+ asyncio.create_task(redis_subscriber(redis_client, REDIS_CHANNEL_NAME))
25
+ print("Redis subscriber task started.")
26
+
27
+ @app.get("/")
28
+ def greet_json():
29
+ return {"Hello": "World!"}
30
+
31
+ @app.post("/webhook/feishu")
32
+ async def handle_webhook(request: Request):
33
+ # Read the incoming JSON body
34
+ data = await request.json()
35
+
36
+ # Check if 'type' exists in data and is equal to "url_verification"
37
+ if data.get('type') == "url_verification":
38
+ # If it's a URL verification request, return the challenge
39
+ print("Received URL verification request.")
40
+ return {"challenge": data.get('challenge')}
41
+
42
+ # Process other event types
43
+ # The full event data is under data['event']
44
+ event = data.get('event', {})
45
+ message = event.get('message', {})
46
+ chat_type = message.get('chat_type')
47
+
48
+ message_type = 'unknown'
49
+ if chat_type == 'p2p':
50
+ message_type = 'private'
51
+ elif chat_type == 'group':
52
+ if message.get('mentions'):
53
+ message_type = 'group_mention'
54
+ else:
55
+ message_type = 'group_all'
56
+
57
+ # Extract content safely
58
+ content_str = message.get('content')
59
+ content_dict = {}
60
+ if content_str:
61
+ try:
62
+ content_dict = json.loads(content_str)
63
+ except json.JSONDecodeError:
64
+ print(f"Warning: Could not parse message content as JSON: {content_str}")
65
+ # Handle cases where content is not valid JSON, maybe store as plain text
66
+ content_dict = {"text": content_str} # Store raw content as text
67
+
68
+ ret_data = {
69
+ 'bot_id': data.get('header', {}).get('app_id'),
70
+ 'message_type': message_type, # This is chat type (private/group)
71
+ 'original_msg_type': message.get('message_type'), # Add original message type (text/image etc.)
72
+ 'msg_id': message.get('message_id'),
73
+ "content": content_dict, # Use the parsed content dictionary
74
+ "sender": {
75
+ "id": event.get('sender', {}).get('sender_id', {}).get('open_id'),
76
+ "sender_type": event.get('sender', {}).get('sender_type'),
77
+ "name": "", # Name is not directly available in this payload structure
78
+ "is_human": None # Not directly available
79
+ },
80
+ "timestamp": time.time(),
81
+ "platform_specific": {
82
+ "app_id": data.get('header', {}).get('app_id')
83
+ }
84
+ }
85
+
86
+ # Publish the processed message data to Redis
87
+ await publish_to_redis(REDIS_CHANNEL_NAME, json.dumps(ret_data))
88
+
89
+ print('\n\n\nret_data: ', json.dumps(ret_data, indent=2)) # Use indent for readability
90
+ return {'status': 'OK'}
91
+
92
+ # The /publish endpoint is now handled by the publish_message function in redis_service.py
93
+ # We can expose it here if needed, or remove it if it's only for internal use.
94
+ # For now, let's keep it as an example of calling the service function from a route.
95
+ @app.post("/publish")
96
+ async def publish_message_route(request: Request):
97
+ """
98
+ Publishes a message to a Redis channel via the redis_service.
99
+ Expects JSON body with 'channel' and 'message'.
100
+ """
101
+ try:
102
+ data = await request.json()
103
+ channel = data.get("channel")
104
+ message = data.get("message")
105
+
106
+ if not channel or not message:
107
+ return {"status": "error", "message": "Missing channel or message"}
108
+
109
+ # Call the publish function from redis_service
110
+ result = await publish_to_redis(channel, message)
111
+ return result
112
+
113
+ except Exception as e:
114
+ return {"status": "error", "message": str(e)}
redis_service.py ADDED
@@ -0,0 +1,105 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ import asyncio
3
+ import redis.asyncio as redis
4
+ from dotenv import load_dotenv
5
+ from redis.exceptions import ConnectionError
6
+ import json
7
+ from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions
8
+
9
+ load_dotenv()
10
+
11
+ redis_host = os.getenv('REDIS_HOST')
12
+ redis_password = os.getenv('REDIS_PASSWORD')
13
+ redis_port = os.getenv('REDIS_PORT', 6379)
14
+
15
+ # Use async Redis client
16
+ r = redis.Redis(
17
+ host=redis_host,
18
+ port=int(redis_port),
19
+ password=redis_password,
20
+ ssl=True
21
+ )
22
+
23
+ async def connect_redis():
24
+ """
25
+ Connects to Redis and performs a ping test.
26
+ """
27
+ try:
28
+ await r.ping()
29
+ print("Connected to Redis!")
30
+
31
+ # Perform SET and GET test after successful connection
32
+ try:
33
+ await r.set("mykey", "myvalue")
34
+ print("Redis SET test successful: mykey = myvalue")
35
+ value = await r.get("mykey")
36
+ print(f"Redis GET test successful: mykey = {value.decode()}")
37
+ except Exception as e:
38
+ print(f"Redis SET/GET test failed: {e}")
39
+
40
+ except ConnectionError as e:
41
+ print(f"Could not connect to Redis: {e}")
42
+ # If connection fails, we might not be able to proceed further
43
+ raise # Re-raise the exception to indicate connection failure
44
+
45
+ async def redis_subscriber(redis_client, channel_name):
46
+ """
47
+ Asynchronous function to subscribe to a Redis channel and process messages.
48
+ """
49
+ print(f"Subscribing to channel: {channel_name}")
50
+ pubsub = redis_client.pubsub()
51
+ await asyncio.sleep(1) # Add a small delay before subscribing
52
+ await pubsub.subscribe(channel_name) # Use await for async subscribe
53
+
54
+ # Listen for messages
55
+ while True:
56
+ message = await pubsub.get_message(ignore_subscribe_messages=True)
57
+ if message:
58
+ print("\n\n\n**************************\n\nmessage in...")
59
+ channel = message['channel'].decode()
60
+ data_str = message['data'].decode()
61
+ print(f"\n\n\nReceived message on channel '{channel}': {data_str}")
62
+
63
+ # Convert the received JSON string back to a Python dictionary
64
+ try:
65
+ data_dict = json.loads(data_str)
66
+ # Get valid tenant access token using the Feishu service function
67
+ tenant_access_token = await get_valid_tenant_access_token()
68
+ if tenant_access_token:
69
+ # Extract necessary info from data_dict for sending reply
70
+ msg_id = data_dict.get('msg_id')
71
+ content = data_dict.get('content') # This should be the dictionary like {"text": "..."}
72
+ # Correctly extract msg_type from the original_msg_type field
73
+ msg_type = data_dict.get('original_msg_type')
74
+
75
+ if msg_id and content and msg_type:
76
+ # Send the reply using the Feishu service function
77
+ await send_feishu_reply(msg_id, tenant_access_token, content, msg_type)
78
+ else:
79
+ print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
80
+
81
+ else:
82
+ print("Failed to get tenant access token, cannot process message.")
83
+
84
+ except json.JSONDecodeError as e:
85
+ print(f"Could not parse message data as JSON: {e}")
86
+ # Handle cases where the message is not valid JSON
87
+ except Exception as e:
88
+ print(f"An unexpected error occurred while processing message: {e}")
89
+
90
+
91
+ await asyncio.sleep(0.01) # Prevent blocking the event loop
92
+
93
+ async def publish_message(channel: str, message: str):
94
+ """
95
+ Publishes a message to a Redis channel.
96
+ """
97
+ try:
98
+ # Publish the message to the specified channel (publish is asynchronous)
99
+ await r.publish(channel, message)
100
+ print(f"Published message to channel '{channel}'")
101
+ return {"status": "success", "message": f"Published message to channel '{channel}'"}
102
+
103
+ except Exception as e:
104
+ print(f"Error publishing message: {e}")
105
+ return {"status": "error", "message": str(e)}