from fastapi import FastAPI, Request import time import asyncio import json from dotenv import load_dotenv # Import functions from the new service modules from redis_service import connect_redis, redis_subscriber, r as redis_client, publish_message as publish_to_redis from feishu_service import get_valid_tenant_access_token, send_feishu_reply load_dotenv() # Replace 'my_feishu_channel' with the actual channel name you want to subscribe to REDIS_CHANNEL_NAME = 'my_feishu_channel' app = FastAPI() # Startup event to connect to Redis and start the subscriber task @app.on_event("startup") async def startup_event(): # Connect to Redis await connect_redis() # Start the Redis subscriber as a background task asyncio.create_task(redis_subscriber(redis_client, REDIS_CHANNEL_NAME)) print("Redis subscriber task started.") @app.get("/") def greet_json(): return {"Hello": "World!"} @app.post("/webhook/feishu") async def handle_webhook(request: Request): # Read the incoming JSON body data = await request.json() # Check if 'type' exists in data and is equal to "url_verification" if data.get('type') == "url_verification": # If it's a URL verification request, return the challenge print("Received URL verification request.") return {"challenge": data.get('challenge')} # Process other event types # The full event data is under data['event'] event = data.get('event', {}) message = event.get('message', {}) chat_type = message.get('chat_type') message_type = 'unknown' if chat_type == 'p2p': message_type = 'private' elif chat_type == 'group': if message.get('mentions'): message_type = 'group_mention' else: message_type = 'group_all' # Extract content safely content_str = message.get('content') content_dict = {} if content_str: try: content_dict = json.loads(content_str) except json.JSONDecodeError: print(f"Warning: Could not parse message content as JSON: {content_str}") # Handle cases where content is not valid JSON, maybe store as plain text content_dict = {"text": content_str} # Store raw content as text ret_data = { 'bot_id': data.get('header', {}).get('app_id'), 'message_type': message_type, # This is chat type (private/group) 'original_msg_type': message.get('message_type'), # Add original message type (text/image etc.) 'msg_id': message.get('message_id'), "content": content_dict, # Use the parsed content dictionary "sender": { "id": event.get('sender', {}).get('sender_id', {}).get('open_id'), "sender_type": event.get('sender', {}).get('sender_type'), "name": "", # Name is not directly available in this payload structure "is_human": None # Not directly available }, "timestamp": time.time(), "platform_specific": { "app_id": data.get('header', {}).get('app_id') } } # Publish the processed message data to Redis await publish_to_redis(REDIS_CHANNEL_NAME, json.dumps(ret_data)) return {'status': 'OK'} # The /publish endpoint is now handled by the publish_message function in redis_service.py # We can expose it here if needed, or remove it if it's only for internal use. # For now, let's keep it as an example of calling the service function from a route. @app.post("/publish") async def publish_message_route(request: Request): """ Publishes a message to a Redis channel via the redis_service. Expects JSON body with 'channel' and 'message'. """ try: data = await request.json() channel = data.get("channel") message = data.get("message") if not channel or not message: return {"status": "error", "message": "Missing channel or message"} # Call the publish function from redis_service result = await publish_to_redis(channel, message) return result except Exception as e: return {"status": "error", "message": str(e)}