geqintan commited on
Commit
aa3e29b
·
1 Parent(s): a248251
Files changed (3) hide show
  1. feishu_service.py +0 -26
  2. main.py +0 -2
  3. redis_service.py +3 -11
feishu_service.py CHANGED
@@ -14,26 +14,20 @@ from store import token_store
14
  # Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file
15
  SUPABASE_URL = os.getenv("SUPABASE_URL")
16
  SUPABASE_KEY = os.getenv("SUPABASE_KEY")
17
- print('\n\n\n\nSUPABASE_URL', SUPABASE_URL)
18
- print('SUPABASE_KEY', SUPABASE_KEY)
19
-
20
  if not SUPABASE_URL or not SUPABASE_KEY:
21
  print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
22
  # Depending on your application's needs, you might want to exit or handle this differently
23
  # For now, we'll just print an error and the client will be None
24
  supabase: Client | None = None
25
  else:
26
- print("\n\n\n\n\n\nConnecting to Supabase...")
27
  supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
28
  print("Supabase client initialized.")
29
 
30
-
31
  async def get_app_secret_from_db(app_id: str) -> str | None:
32
  """
33
  Reads the app_secret from the Supabase database based on app_id.
34
  Assumes a table named 'feishu_apps' with columns 'app_id' and 'app_secret'.
35
  """
36
- print('\n\n\n\n************************\nget_app_secret_from_db() app_id:', app_id)
37
  if not supabase:
38
  print("Supabase client not initialized, cannot fetch app_secret.")
39
  return None # Correctly return None if supabase client is not initialized
@@ -42,14 +36,10 @@ async def get_app_secret_from_db(app_id: str) -> str | None:
42
  # Query the 'feishu_bot_config' table for the platform_specific column matching the bot_id
43
  # Use maybe_single() as we expect at most one row, and execute() to get the result
44
  response = supabase.table('feishu_bot_config').select('platform_specific').eq('bot_id', app_id).single().execute()
45
- # print(f"\n\n\n\nResponse from Supabase (maybe_single().execute()): ",response)
46
- print('\nresponse.data', response.data)
47
-
48
  # Check if a row was returned and extract the app_secret from platform_specific
49
  # The result of maybe_single().execute() will have a 'data' attribute
50
  if response.data and 'platform_specific' in response.data and 'app_secret' in response.data['platform_specific']:
51
  app_secret = response.data['platform_specific']['app_secret']
52
- print(f"Successfully retrieved app_secret for app_id: {app_id} from Supabase. app_secret: ", app_secret)
53
  return app_secret
54
  else:
55
  print(f"No app_secret found in Supabase for app_id: {app_id}.")
@@ -58,7 +48,6 @@ async def get_app_secret_from_db(app_id: str) -> str | None:
58
  print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
59
  return None
60
 
61
-
62
  async def get_valid_tenant_access_token(app_id: str) -> str | None:
63
  """
64
  Retrieves a valid tenant access token for the given app_id.
@@ -71,13 +60,8 @@ async def get_valid_tenant_access_token(app_id: str) -> str | None:
71
  Returns:
72
  A valid tenant access token or None if unable to obtain one.
73
  """
74
- print('\n\n\n\n\n\n\n*******************************\n')
75
- print('get_valid_tenant_access_token, appid:', app_id)
76
  # 1. Prioritize fetching from the global store
77
  stored_token_info = token_store.get(app_id)
78
- print('\n\n\n\n\n\n\n\n$$$$$$$$$$$$$$$$$$$$$$$$$$$$$\nstored_token_info',stored_token_info)
79
- print('\ntoken_store:', token_store)
80
-
81
  if stored_token_info:
82
  token = stored_token_info.get('token')
83
  created_time = stored_token_info.get('created_time')
@@ -119,24 +103,16 @@ async def get_valid_tenant_access_token(app_id: str) -> str | None:
119
  response = await client.post(url, headers=headers, json=payload)
120
  response.raise_for_status() # Raise an exception for bad status codes
121
  data = response.json()
122
- print('\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n^^^^^data',data)
123
  if data.get("code") == 0:
124
  new_token = data.get("tenant_access_token")
125
  expire = data.get("expire") # Note: Feishu API returns 'expire_in'
126
- print('\nnew_token',new_token)
127
- print('\nexpires',expire)
128
-
129
-
130
  if new_token and expire is not None:
131
- print("\n\n%%%%%%%%%%%%%%%%%%%%%%%%%%%%%New token received")
132
  # 4. Store the new token and its expiry information
133
  token_store[app_id] = {
134
  'token': new_token,
135
  'created_time': time.time(),
136
  'expire': expire
137
  }
138
- print(f"Successfully generated and stored new tenant access token for app_id: {app_id}")
139
- print('\ntoken_store',token_store)
140
  return new_token
141
  else:
142
  print(f"Error generating new tenant access token: Missing token or expiry info in response.")
@@ -179,8 +155,6 @@ async def send_feishu_reply(msg_id: str, tenant_access_token: str, content: dict
179
  try:
180
  response = await client.post(url, headers=headers, json=body)
181
  response.raise_for_status() # Raise an exception for bad status codes
182
- print(f"Successfully sent reply: Status Code {response.status_code}")
183
- print(f"Reply Response: {response.text}")
184
  except httpx.HTTPStatusError as e:
185
  print(f"HTTP error sending reply: {e}")
186
  except httpx.RequestError as e:
 
14
  # Ensure SUPABASE_URL and SUPABASE_KEY are set in your .env file
15
  SUPABASE_URL = os.getenv("SUPABASE_URL")
16
  SUPABASE_KEY = os.getenv("SUPABASE_KEY")
 
 
 
17
  if not SUPABASE_URL or not SUPABASE_KEY:
18
  print("Error: SUPABASE_URL or SUPABASE_KEY not found in environment variables.")
19
  # Depending on your application's needs, you might want to exit or handle this differently
20
  # For now, we'll just print an error and the client will be None
21
  supabase: Client | None = None
22
  else:
 
23
  supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
24
  print("Supabase client initialized.")
25
 
 
26
  async def get_app_secret_from_db(app_id: str) -> str | None:
27
  """
28
  Reads the app_secret from the Supabase database based on app_id.
29
  Assumes a table named 'feishu_apps' with columns 'app_id' and 'app_secret'.
30
  """
 
31
  if not supabase:
32
  print("Supabase client not initialized, cannot fetch app_secret.")
33
  return None # Correctly return None if supabase client is not initialized
 
36
  # Query the 'feishu_bot_config' table for the platform_specific column matching the bot_id
37
  # Use maybe_single() as we expect at most one row, and execute() to get the result
38
  response = supabase.table('feishu_bot_config').select('platform_specific').eq('bot_id', app_id).single().execute()
 
 
 
39
  # Check if a row was returned and extract the app_secret from platform_specific
40
  # The result of maybe_single().execute() will have a 'data' attribute
41
  if response.data and 'platform_specific' in response.data and 'app_secret' in response.data['platform_specific']:
42
  app_secret = response.data['platform_specific']['app_secret']
 
43
  return app_secret
44
  else:
45
  print(f"No app_secret found in Supabase for app_id: {app_id}.")
 
48
  print(f"Error fetching app_secret from Supabase for app_id {app_id}: {e}")
49
  return None
50
 
 
51
  async def get_valid_tenant_access_token(app_id: str) -> str | None:
52
  """
53
  Retrieves a valid tenant access token for the given app_id.
 
60
  Returns:
61
  A valid tenant access token or None if unable to obtain one.
62
  """
 
 
63
  # 1. Prioritize fetching from the global store
64
  stored_token_info = token_store.get(app_id)
 
 
 
65
  if stored_token_info:
66
  token = stored_token_info.get('token')
67
  created_time = stored_token_info.get('created_time')
 
103
  response = await client.post(url, headers=headers, json=payload)
104
  response.raise_for_status() # Raise an exception for bad status codes
105
  data = response.json()
 
106
  if data.get("code") == 0:
107
  new_token = data.get("tenant_access_token")
108
  expire = data.get("expire") # Note: Feishu API returns 'expire_in'
 
 
 
 
109
  if new_token and expire is not None:
 
110
  # 4. Store the new token and its expiry information
111
  token_store[app_id] = {
112
  'token': new_token,
113
  'created_time': time.time(),
114
  'expire': expire
115
  }
 
 
116
  return new_token
117
  else:
118
  print(f"Error generating new tenant access token: Missing token or expiry info in response.")
 
155
  try:
156
  response = await client.post(url, headers=headers, json=body)
157
  response.raise_for_status() # Raise an exception for bad status codes
 
 
158
  except httpx.HTTPStatusError as e:
159
  print(f"HTTP error sending reply: {e}")
160
  except httpx.RequestError as e:
main.py CHANGED
@@ -85,8 +85,6 @@ async def handle_webhook(request: Request):
85
 
86
  # Publish the processed message data to Redis
87
  await publish_to_redis(REDIS_CHANNEL_NAME, json.dumps(ret_data))
88
-
89
- print('\n\n\nret_data: ', json.dumps(ret_data, indent=2)) # Use indent for readability
90
  return {'status': 'OK'}
91
 
92
  # The /publish endpoint is now handled by the publish_message function in redis_service.py
 
85
 
86
  # Publish the processed message data to Redis
87
  await publish_to_redis(REDIS_CHANNEL_NAME, json.dumps(ret_data))
 
 
88
  return {'status': 'OK'}
89
 
90
  # The /publish endpoint is now handled by the publish_message function in redis_service.py
redis_service.py CHANGED
@@ -31,9 +31,7 @@ async def connect_redis():
31
  # Perform SET and GET test after successful connection
32
  try:
33
  await r.set("mykey", "myvalue")
34
- print("Redis SET test successful: mykey = myvalue")
35
  value = await r.get("mykey")
36
- print(f"Redis GET test successful: mykey = {value.decode()}")
37
  except Exception as e:
38
  print(f"Redis SET/GET test failed: {e}")
39
 
@@ -46,7 +44,7 @@ async def redis_subscriber(redis_client, channel_name):
46
  """
47
  Asynchronous function to subscribe to a Redis channel and process messages.
48
  """
49
- print(f"Subscribing to channel: {channel_name}")
50
  pubsub = redis_client.pubsub()
51
  await asyncio.sleep(1) # Add a small delay before subscribing
52
  await pubsub.subscribe(channel_name) # Use await for async subscribe
@@ -55,15 +53,13 @@ async def redis_subscriber(redis_client, channel_name):
55
  while True:
56
  message = await pubsub.get_message(ignore_subscribe_messages=True)
57
  if message:
58
- print("\n\n\n**************************\n\nmessage in...")
59
  channel = message['channel'].decode()
60
  data_str = message['data'].decode()
61
- print(f"\n\n\nReceived message on channel '{channel}': {data_str}")
62
 
63
  # Convert the received JSON string back to a Python dictionary
64
  try:
65
  data_dict = json.loads(data_str)
66
- print(f"\nParsed message data as dictionary: {data_dict}")
67
 
68
  # Extract app_id from platform_specific
69
  app_id = data_dict.get('platform_specific', {}).get('app_id')
@@ -75,9 +71,6 @@ async def redis_subscriber(redis_client, channel_name):
75
  else:
76
  print("Missing app_id in received data, cannot get tenant access token.")
77
 
78
- print(f"\n\n\n\n&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&\ntenant_access_token: {tenant_access_token}")
79
-
80
-
81
  if tenant_access_token:
82
  # Extract necessary info from data_dict for sending reply
83
  msg_id = data_dict.get('msg_id')
@@ -100,7 +93,6 @@ async def redis_subscriber(redis_client, channel_name):
100
  except Exception as e:
101
  print(f"An unexpected error occurred while processing message: {e}")
102
 
103
-
104
  await asyncio.sleep(0.01) # Prevent blocking the event loop
105
 
106
  async def publish_message(channel: str, message: str):
@@ -110,7 +102,7 @@ async def publish_message(channel: str, message: str):
110
  try:
111
  # Publish the message to the specified channel (publish is asynchronous)
112
  await r.publish(channel, message)
113
- print(f"Published message to channel '{channel}'")
114
  return {"status": "success", "message": f"Published message to channel '{channel}'"}
115
 
116
  except Exception as e:
 
31
  # Perform SET and GET test after successful connection
32
  try:
33
  await r.set("mykey", "myvalue")
 
34
  value = await r.get("mykey")
 
35
  except Exception as e:
36
  print(f"Redis SET/GET test failed: {e}")
37
 
 
44
  """
45
  Asynchronous function to subscribe to a Redis channel and process messages.
46
  """
47
+ # print(f"Subscribing to channel: {channel_name}")
48
  pubsub = redis_client.pubsub()
49
  await asyncio.sleep(1) # Add a small delay before subscribing
50
  await pubsub.subscribe(channel_name) # Use await for async subscribe
 
53
  while True:
54
  message = await pubsub.get_message(ignore_subscribe_messages=True)
55
  if message:
 
56
  channel = message['channel'].decode()
57
  data_str = message['data'].decode()
58
+ # print(f"\n\n\nReceived message on channel '{channel}': {data_str}")
59
 
60
  # Convert the received JSON string back to a Python dictionary
61
  try:
62
  data_dict = json.loads(data_str)
 
63
 
64
  # Extract app_id from platform_specific
65
  app_id = data_dict.get('platform_specific', {}).get('app_id')
 
71
  else:
72
  print("Missing app_id in received data, cannot get tenant access token.")
73
 
 
 
 
74
  if tenant_access_token:
75
  # Extract necessary info from data_dict for sending reply
76
  msg_id = data_dict.get('msg_id')
 
93
  except Exception as e:
94
  print(f"An unexpected error occurred while processing message: {e}")
95
 
 
96
  await asyncio.sleep(0.01) # Prevent blocking the event loop
97
 
98
  async def publish_message(channel: str, message: str):
 
102
  try:
103
  # Publish the message to the specified channel (publish is asynchronous)
104
  await r.publish(channel, message)
105
+ # print(f"Published message to channel '{channel}'")
106
  return {"status": "success", "message": f"Published message to channel '{channel}'"}
107
 
108
  except Exception as e: