# feishu / redis_service.py
# geqintan's picture
# update
# aa3e29b
# raw
# history blame
# 4.31 kB
import os
import asyncio
import redis.asyncio as redis
from dotenv import load_dotenv
from redis.exceptions import ConnectionError
import json
from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions
# Run python-dotenv's loader before the environment variables are read below.
load_dotenv()

# Connection settings are taken from the environment; only the port has a fallback.
redis_host = os.environ.get('REDIS_HOST')
redis_password = os.environ.get('REDIS_PASSWORD')
redis_port = os.environ.get('REDIS_PORT', 6379)

# Shared asynchronous Redis client used by the functions in this module.
# int() is applied because a port read from the environment is a string.
r = redis.Redis(host=redis_host, port=int(redis_port), password=redis_password, ssl=True)
async def connect_redis():
    """
    Check that the module-level Redis client ``r`` can reach the server.

    Sends a PING and, when that succeeds, runs a best-effort SET/GET round
    trip on the key "mykey". A failing or mismatching round trip is only
    reported on stdout; it never raises.

    Raises:
        ConnectionError: re-raised (after being reported) when the PING
            fails with a connection error.
    """
    try:
        await r.ping()
        print("Connected to Redis!")
        # Perform SET and GET test after successful connection
        try:
            await r.set("mykey", "myvalue")
            value = await r.get("mykey")
            # The read-back is bytes unless the client decodes responses; accept both
            # so the check does not depend on how the client was configured.
            if value not in (b"myvalue", "myvalue"):
                print(f"Redis SET/GET test failed: unexpected value {value!r}")
        except Exception as e:
            # Deliberately best-effort: a failed self-test must not abort start-up.
            print(f"Redis SET/GET test failed: {e}")
    except ConnectionError as e:
        print(f"Could not connect to Redis: {e}")
        # Re-raise so the caller knows the connection is unusable.
        raise
async def _handle_feishu_message(raw_data):
    """
    Process one pub/sub payload: parse it and, if complete, send a Feishu reply.

    Args:
        raw_data: The ``data`` field of a pub/sub message, as bytes or str.
            It is expected to be a JSON object with ``platform_specific.app_id``,
            ``msg_id``, ``content`` and ``original_msg_type``.

    Every failure (bad encoding, bad JSON, missing fields, errors from the
    Feishu helpers) is reported on stdout and swallowed so that a single bad
    message cannot stop the subscriber loop.
    """
    try:
        # Decode inside the try block: an undecodable payload must be reported,
        # not propagate and terminate the subscriber.
        if isinstance(raw_data, (bytes, bytearray)):
            data_str = raw_data.decode()
        else:
            data_str = raw_data
        # Convert the received JSON string back to a Python dictionary
        data_dict = json.loads(data_str)

        # `or {}` also covers an explicit null for platform_specific
        app_id = (data_dict.get('platform_specific') or {}).get('app_id')

        tenant_access_token = None
        if app_id:
            tenant_access_token = await get_valid_tenant_access_token(app_id)
        else:
            print("Missing app_id in received data, cannot get tenant access token.")

        if not tenant_access_token:
            print("Failed to get tenant access token, cannot process message.")
            return

        msg_id = data_dict.get('msg_id')
        content = data_dict.get('content')  # presumably a dict like {"text": "..."}
        msg_type = data_dict.get('original_msg_type')
        if not (msg_id and content and msg_type):
            print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
            return

        await send_feishu_reply(msg_id, tenant_access_token, content, msg_type)
    except json.JSONDecodeError as e:
        print(f"Could not parse message data as JSON: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while processing message: {e}")


async def redis_subscriber(redis_client, channel_name):
    """
    Subscribe to a Redis channel and process its messages until cancelled.

    Args:
        redis_client: Asynchronous Redis client providing ``pubsub()``.
        channel_name: Name of the channel to subscribe to.

    The function loops forever; it ends only when the task is cancelled or
    the pub/sub connection raises. The pub/sub object is used as an async
    context manager so it is released in either case.
    """
    async with redis_client.pubsub() as pubsub:
        await asyncio.sleep(1)  # Small delay before subscribing
        await pubsub.subscribe(channel_name)

        # Poll for messages
        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True)
            if message:
                await _handle_feishu_message(message['data'])
            await asyncio.sleep(0.01)  # Yield to the event loop between polls
async def publish_message(channel: str, message: str):
    """
    Publish ``message`` on the Redis channel ``channel`` via the module client.

    Returns:
        A dict with the keys "status" and "message". "status" is "success"
        when the publish call completed, otherwise "error" with the
        exception text as "message". Exceptions are reported on stdout and
        never propagate to the caller.
    """
    try:
        await r.publish(channel, message)
        outcome = {
            "status": "success",
            "message": f"Published message to channel '{channel}'",
        }
    except Exception as exc:
        print(f"Error publishing message: {exc}")
        outcome = {"status": "error", "message": str(exc)}
    return outcome