geqintan commited on
Commit
7373a20
·
1 Parent(s): 388725d
Files changed (4) hide show
  1. .gitignore +2 -0
  2. __pycache__/app.cpython-312.pyc +0 -0
  3. app.py +271 -1
  4. requirements.txt +3 -0
.gitignore ADDED
@@ -0,0 +1,2 @@
 
 
 
1
+ __pycache__/
2
+ .env
__pycache__/app.cpython-312.pyc DELETED
Binary file (416 Bytes)
 
app.py CHANGED
@@ -1,7 +1,277 @@
1
- from fastapi import FastAPI
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
2
 
3
  app = FastAPI()
4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5
  @app.get("/")
6
  def greet_json():
7
  return {"Hello": "World!"}
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from fastapi import FastAPI, Request
2
+ import os, time
3
+ import asyncio
4
+ import redis.asyncio as redis # Import async Redis client
5
+ from dotenv import load_dotenv
6
+ from redis.exceptions import ConnectionError # Import ConnectionError
7
+ import json # Import the json module
8
+ import httpx # Import httpx for async HTTP requests
9
+
10
+ load_dotenv()
11
+
12
+ redis_host = os.getenv('REDIS_HOST')
13
+ redis_password = os.getenv('REDIS_PASSWORD')
14
+ redis_port = os.getenv('REDIS_PORT', 6379) # Default to 6379 if not set
15
+ feishu_app_id = os.getenv('FEISHU_APP_ID', 'YOUR_FEISHU_APP_ID') # Placeholder
16
+ feishu_app_secret = os.getenv('FEISHU_APP_SECRET', 'YOUR_FEISHU_APP_SECRET') # Placeholder
17
+
18
+ # Use async Redis client
19
+ r = redis.Redis(
20
+ host=redis_host,
21
+ port=int(redis_port), # Ensure port is an integer
22
+ password=redis_password,
23
+ ssl=True
24
+ )
25
+
26
+ reply_template = {
27
+ "url":"https://open.feishu.cn/open-apis/im/v1/messages/{{msg_id}}/reply",
28
+ "Authorization": f"Bearer {{tenant_access_token}}",
29
+ "body": {
30
+ "content": "{{body}}",
31
+ "msg_type": "{{ type }}"
32
+ }
33
+ }
34
+
35
+ def generate_reply_string(msg_id: str, tenant_access_token: str, body: str, type: str) -> str:
36
+ """
37
+ Generates a reply string based on the reply_template.
38
+
39
+ Args:
40
+ msg_id: The message ID.
41
+ tenant_access_token: The tenant access token.
42
+ body: The content body of the reply (should be a dictionary).
43
+ type: The message type.
44
+
45
+ Returns:
46
+ A formatted string based on the reply_template.
47
+ """
48
+ # Use json.dumps to convert the body dictionary to a JSON string
49
+ # and then escape the quotes for inclusion in the f-string
50
+ body_json_string = json.dumps(body)
51
+
52
+ # Construct the final string using an f-string
53
+ # Note: The original template had {{body}} directly, which implies the body
54
+ # should be inserted as a JSON object, not a string.
55
+ # The f-string needs to correctly format the JSON structure.
56
+ # The url and Authorization header use f-strings within the template itself,
57
+ # so we need to evaluate those first, then insert the variables.
58
+ # Let's reconstruct the template string directly with f-strings.
59
+ reply_string = f"""{{
60
+ "url":"https://open.feishu.cn/open-apis/im/v1/messages/{msg_id}/reply",
61
+ "Authorization": "Bearer {tenant_access_token}",
62
+ "body": {{
63
+ "content": {{
64
+ "text": "{body}"
65
+ }},
66
+ "msg_type": "{type}"
67
+ }}
68
+ }}"""
69
+ return reply_string
70
+
71
+ async def get_valid_tenant_access_token() -> str:
72
+ """
73
+ Retrieves a valid tenant access token from Feishu API.
74
+ """
75
+ url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
76
+ headers = {
77
+ "Content-Type": "application/json"
78
+ }
79
+ payload = {
80
+ "app_id": feishu_app_id,
81
+ "app_secret": feishu_app_secret
82
+ }
83
+
84
+ async with httpx.AsyncClient() as client:
85
+ try:
86
+ response = await client.post(url, headers=headers, json=payload)
87
+ response.raise_for_status() # Raise an exception for bad status codes
88
+ data = response.json()
89
+ if data.get("code") == 0:
90
+ return data.get("tenant_access_token")
91
+ else:
92
+ print(f"Error getting tenant access token: {data.get('msg')}")
93
+ return None
94
+ except httpx.HTTPStatusError as e:
95
+ print(f"HTTP error occurred: {e}")
96
+ return None
97
+ except httpx.RequestError as e:
98
+ print(f"An error occurred while requesting {e.request.url!r}: {e}")
99
+ return None
100
+ except Exception as e:
101
+ print(f"An unexpected error occurred: {e}")
102
+ return None
103
+
104
+
105
+ # def get_tenant_access_token():
106
+ # return "tenant_access_token"
107
 
108
  app = FastAPI()
109
 
110
+ # Optional: Test connection (async ping)
111
+ @app.on_event("startup")
112
+ async def connect_redis():
113
+ try:
114
+ await r.ping()
115
+ print("Connected to Redis!")
116
+
117
+ # Perform SET and GET test after successful connection
118
+ try:
119
+ await r.set("mykey", "myvalue")
120
+ print("Redis SET test successful: mykey = myvalue")
121
+ value = await r.get("mykey")
122
+ print(f"Redis GET test successful: mykey = {value.decode()}")
123
+ except Exception as e:
124
+ print(f"Redis SET/GET test failed: {e}")
125
+
126
+ except ConnectionError as e:
127
+ print(f"Could not connect to Redis: {e}")
128
+ # If connection fails, we might not be able to proceed further
129
+ return # Exit the startup event if connection fails
130
+
131
+
132
+ # Define the Redis subscriber function
133
+ async def redis_subscriber(redis_client, channel_name):
134
+ """
135
+ Asynchronous function to subscribe to a Redis channel and process messages.
136
+ """
137
+ print(f"Subscribing to channel: {channel_name}")
138
+ pubsub = redis_client.pubsub()
139
+ await asyncio.sleep(1) # Add a small delay before subscribing
140
+ await pubsub.subscribe(channel_name) # Use await for async subscribe
141
+
142
+ # Listen for messages
143
+ while True:
144
+ # print("Waiting for message...")
145
+ message = await pubsub.get_message(ignore_subscribe_messages=True)
146
+ # print(f"Message received: {message}")
147
+ if message:
148
+ print("message in...")
149
+ channel = message['channel'].decode()
150
+ data_str = message['data'].decode()
151
+ print(f"\n\n\nReceived message on channel '{channel}': {data_str}")
152
+
153
+ # Convert the received JSON string back to a Python dictionary
154
+ try:
155
+ data_dict = json.loads(data_str)
156
+ # Get valid tenant access token
157
+ tenant_access_token = await get_valid_tenant_access_token()
158
+ if tenant_access_token:
159
+ print(f"Parsed message data as dictionary: {data_dict}")
160
+ print(f"msg_id: {data_dict['msg_id']}")
161
+ print(f"tenant_access_token: {tenant_access_token}")
162
+ print(f"body: {data_dict['content']['body']}")
163
+ print(f"type: {data_dict['content']['type']}")
164
+ reply_string = generate_reply_string(data_dict['msg_id'], tenant_access_token, data_dict['content']['body'], data_dict['content']['type'])
165
+ print(reply_string)
166
+
167
+ # Send the reply string as an HTTP POST request
168
+ try:
169
+ reply_data = json.loads(reply_string)
170
+ reply_url = reply_data.get("url")
171
+ reply_headers = {"Authorization": reply_data.get("Authorization")}
172
+ reply_body = reply_data.get("body")
173
+
174
+ async with httpx.AsyncClient() as client:
175
+ response = await client.post(reply_url, headers=reply_headers, json=reply_body)
176
+ response.raise_for_status() # Raise an exception for bad status codes
177
+ print(f"Successfully sent reply: Status Code {response.status_code}")
178
+ print(f"Reply Response: {response.text}")
179
+
180
+ except json.JSONDecodeError as e:
181
+ print(f"Could not parse reply_string as JSON: {e}")
182
+ except httpx.HTTPStatusError as e:
183
+ print(f"HTTP error sending reply: {e}")
184
+ except httpx.RequestError as e:
185
+ print(f"An error occurred while requesting {e.request.url!r} to send reply: {e}")
186
+ except Exception as e:
187
+ print(f"An unexpected error occurred while sending reply: {e}")
188
+
189
+ else:
190
+ print("Failed to get tenant access token, cannot process message.")
191
+
192
+
193
+ except json.JSONDecodeError as e:
194
+ print(f"Could not parse message data as JSON: {e}")
195
+ # Handle cases where the message is not valid JSON
196
+
197
+ await asyncio.sleep(0.01) # Prevent blocking the event loop
198
+
199
+ # Run the subscriber as a background task on startup
200
+ @app.on_event("startup")
201
+ async def startup_event():
202
+ # Replace 'my_feishu_channel' with the actual channel name you want to subscribe to
203
+ asyncio.create_task(redis_subscriber(r, 'my_feishu_channel'))
204
+ print("Redis subscriber task started.")
205
+
206
+
207
  @app.get("/")
208
  def greet_json():
209
  return {"Hello": "World!"}
210
+
211
+ @app.post("/webhook/feishu")
212
+ async def handle_webhook(request: Request):
213
+ # Read the incoming JSON body
214
+ data = await request.json()
215
+ # Check if 'type' exists in data and is equal to "url_verification"
216
+ if data.get('type') == "url_verification":
217
+ # If it's a URL verification request, return the challenge
218
+ return {"challenge": data.get('challenge')}
219
+
220
+ # Process other event types
221
+ # The full event data is under data['body']['event'] based on previous examples
222
+ event = data['event'] # Use .get() with default {} for safe access
223
+ message = event['message']
224
+ chat_type=message.get('chat_type')
225
+ if chat_type=='p2p':
226
+ message_type = 'private'
227
+ elif chat_type=='group':
228
+ if message.get('mentions'):
229
+ message_type = 'group_mention'
230
+ else:
231
+ message_type = 'group_all'
232
+
233
+ ret_data = {
234
+ 'bot_id': data['header']['app_id'],
235
+ 'message_type': message_type,
236
+ 'msg_id': message['message_id'],
237
+ "content":{
238
+ "type": message['message_type'],
239
+ "body": json.loads(message.get('content')).get('text')
240
+ },
241
+ "sender": {
242
+ "id": event['sender']['sender_id']['open_id'],
243
+ "sender_type": event['sender']['sender_type'],
244
+ "name": "",
245
+ "is_human": None
246
+ },
247
+ "timestamp":time.time(),
248
+ "platform_specific": {
249
+ "app_id": data['header']['app_id']
250
+ }
251
+ }
252
+ await r.publish('my_feishu_channel', json.dumps(ret_data))
253
+
254
+ print('\n\n\nret_data: ',json.dumps(ret_data))
255
+ return {'status': 'OK'}
256
+
257
+ @app.post("/publish")
258
+ async def publish_message(request: Request):
259
+ """
260
+ Publishes a message to a Redis channel.
261
+ Expects JSON body with 'channel' and 'message'.
262
+ """
263
+ try:
264
+ data = await request.json()
265
+ channel = data.get("channel")
266
+ message = data.get("message")
267
+
268
+ if not channel or not message:
269
+ return {"status": "error", "message": "Missing channel or message"}
270
+
271
+ # Publish the message to the specified channel (publish is asynchronous)
272
+ await r.publish(channel, message)
273
+
274
+ return {"status": "success", "message": f"Published message to channel '{channel}'"}
275
+
276
+ except Exception as e:
277
+ return {"status": "error", "message": str(e)}
requirements.txt CHANGED
@@ -1,3 +1,6 @@
1
  # clear && uvicorn app:app --host 0.0.0.0 --port 7860 --reload
2
  fastapi
3
  uvicorn[standard]
 
 
 
 
1
  # clear && uvicorn app:app --host 0.0.0.0 --port 7860 --reload
2
  fastapi
3
  uvicorn[standard]
4
+ redis
5
+ python-dotenv
6
+ httpx