File size: 4,147 Bytes
6fd8a52 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 |
from fastapi import FastAPI, Request
import time
import asyncio
import json
from dotenv import load_dotenv
# Import functions from the new service modules
from redis_service import connect_redis, redis_subscriber, r as redis_client, publish_message as publish_to_redis
from feishu_service import get_valid_tenant_access_token, send_feishu_reply
load_dotenv()
# Replace 'my_feishu_channel' with the actual channel name you want to subscribe to
REDIS_CHANNEL_NAME = 'my_feishu_channel'
app = FastAPI()
# Startup event to connect to Redis and start the subscriber task
@app.on_event("startup")
async def startup_event():
# Connect to Redis
await connect_redis()
# Start the Redis subscriber as a background task
asyncio.create_task(redis_subscriber(redis_client, REDIS_CHANNEL_NAME))
print("Redis subscriber task started.")
@app.get("/")
def greet_json():
return {"Hello": "World!"}
@app.post("/webhook/feishu")
async def handle_webhook(request: Request):
# Read the incoming JSON body
data = await request.json()
# Check if 'type' exists in data and is equal to "url_verification"
if data.get('type') == "url_verification":
# If it's a URL verification request, return the challenge
print("Received URL verification request.")
return {"challenge": data.get('challenge')}
# Process other event types
# The full event data is under data['event']
event = data.get('event', {})
message = event.get('message', {})
chat_type = message.get('chat_type')
message_type = 'unknown'
if chat_type == 'p2p':
message_type = 'private'
elif chat_type == 'group':
if message.get('mentions'):
message_type = 'group_mention'
else:
message_type = 'group_all'
# Extract content safely
content_str = message.get('content')
content_dict = {}
if content_str:
try:
content_dict = json.loads(content_str)
except json.JSONDecodeError:
print(f"Warning: Could not parse message content as JSON: {content_str}")
# Handle cases where content is not valid JSON, maybe store as plain text
content_dict = {"text": content_str} # Store raw content as text
ret_data = {
'bot_id': data.get('header', {}).get('app_id'),
'message_type': message_type, # This is chat type (private/group)
'original_msg_type': message.get('message_type'), # Add original message type (text/image etc.)
'msg_id': message.get('message_id'),
"content": content_dict, # Use the parsed content dictionary
"sender": {
"id": event.get('sender', {}).get('sender_id', {}).get('open_id'),
"sender_type": event.get('sender', {}).get('sender_type'),
"name": "", # Name is not directly available in this payload structure
"is_human": None # Not directly available
},
"timestamp": time.time(),
"platform_specific": {
"app_id": data.get('header', {}).get('app_id')
}
}
# Publish the processed message data to Redis
await publish_to_redis(REDIS_CHANNEL_NAME, json.dumps(ret_data))
return {'status': 'OK'}
# The /publish endpoint is now handled by the publish_message function in redis_service.py
# We can expose it here if needed, or remove it if it's only for internal use.
# For now, let's keep it as an example of calling the service function from a route.
@app.post("/publish")
async def publish_message_route(request: Request):
"""
Publishes a message to a Redis channel via the redis_service.
Expects JSON body with 'channel' and 'message'.
"""
try:
data = await request.json()
channel = data.get("channel")
message = data.get("message")
if not channel or not message:
return {"status": "error", "message": "Missing channel or message"}
# Call the publish function from redis_service
result = await publish_to_redis(channel, message)
return result
except Exception as e:
return {"status": "error", "message": str(e)}
|