import asyncio
import functools
import json
import os

import redis.asyncio as redis
import requests
from dotenv import load_dotenv
from redis.exceptions import ConnectionError

from feishu_service import get_valid_tenant_access_token, send_feishu_reply  # Import Feishu service functions

load_dotenv()

redis_host = os.getenv('REDIS_HOST')
redis_password = os.getenv('REDIS_PASSWORD')
redis_port = os.getenv('REDIS_PORT', 6379)

# Use async Redis client
r = redis.Redis(
    host=redis_host,
    port=int(redis_port),
    password=redis_password,
    ssl=True
)

# Agent (ADK) endpoint configuration. The session that is checked/created and
# the session the agent is run against must be the same one, so both URLs and
# the run payload are derived from these values.
_ADK_BASE_URL = "https://tanbushi-adk-teams.hf.space"
_ADK_APP_NAME = "ai_quant"
_ADK_USER_ID = "tanbushi"
_ADK_SESSION_ID = "ai_quant_tanbushi_session"
_ADK_SESSION_URL = (
    f"{_ADK_BASE_URL}/apps/{_ADK_APP_NAME}/users/{_ADK_USER_ID}"
    f"/sessions/{_ADK_SESSION_ID}"
)
_ADK_RUN_URL = f"{_ADK_BASE_URL}/run"

# Seconds to wait on the agent service before giving up, so that a hung
# endpoint cannot stall message processing forever.
_ADK_SESSION_TIMEOUT = 10
_ADK_RUN_TIMEOUT = 120


async def connect_redis():
    """
    Connects to Redis and performs a ping test.

    After a successful ping, a SET/GET round trip on the key "mykey" is
    attempted; a failure there is only printed.

    Raises:
        ConnectionError: re-raised when the ping fails, to signal that the
            connection could not be established.
    """
    try:
        await r.ping()
    except ConnectionError as e:
        print(f"Could not connect to Redis: {e}")
        # If connection fails, we might not be able to proceed further
        raise  # Re-raise the exception to indicate connection failure

    print("Connected to Redis!")

    # Perform SET and GET test after successful connection
    try:
        await r.set("mykey", "myvalue")
        await r.get("mykey")
    except Exception as e:
        print(f"Redis SET/GET test failed: {e}")


async def _run_blocking(func, *args, **kwargs):
    """Run a blocking callable in the default executor and return its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, functools.partial(func, *args, **kwargs)
    )


def _extract_reply_text(events):
    """
    Pull the reply text out of the agent's /run response.

    Only the first element of the response is inspected. Returns the
    concatenated "text" of its parts, or None when the first part is a
    function call or the response does not have the expected shape.
    """
    try:
        parts = events[0]["content"]["parts"]
        first_part = parts[0]
    except (IndexError, KeyError, TypeError):
        print("Unexpected response shape from agent, cannot extract reply.")
        return None

    function_call = first_part.get("function_call")
    if function_call:
        # A function call carries no text to send back to the user.
        print(f"函数调用: {function_call.get('name')}")
        return None

    text_content = "".join(part.get("text") or "" for part in parts)
    print(text_content)
    return text_content


async def _ensure_adk_session():
    """
    Make sure the agent session exists, creating it when the lookup fails.

    Returns True when the session lookup or the creation request answered
    with HTTP 200, False otherwise.
    """
    response = await _run_blocking(
        requests.get, _ADK_SESSION_URL, timeout=_ADK_SESSION_TIMEOUT
    )
    if response.status_code == 200:
        print("Successfully get session")
        return True

    print("Failed to get session")
    response = await _run_blocking(
        requests.post, _ADK_SESSION_URL, timeout=_ADK_SESSION_TIMEOUT
    )
    return response.status_code == 200


async def _query_adk_agent(text):
    """Send `text` to the agent as a user message and return its reply text (or None)."""
    payload = {
        "app_name": _ADK_APP_NAME,
        "user_id": _ADK_USER_ID,
        "session_id": _ADK_SESSION_ID,
        "new_message": {
            "parts": [
                {"text": text}
            ],
            "role": "user"
        },
        "streaming": False
    }
    response = await _run_blocking(
        requests.post, _ADK_RUN_URL, json=payload, timeout=_ADK_RUN_TIMEOUT
    )
    response_json = response.json()
    print('\n\n\n\n\n\n\n\nresponse1_json: ', response_json)
    return _extract_reply_text(response_json)


async def _handle_message(message):
    """
    Process one pub/sub message: parse it, ask the agent, reply through Feishu.

    The message data is expected to be a JSON object; the fields read from it
    are 'platform_specific.app_id', 'msg_id', 'content' (a dict with a "text"
    entry) and 'original_msg_type'.
    """
    data_str = message['data'].decode()

    # Convert the received JSON string back to a Python dictionary. Only this
    # parse is guarded by JSONDecodeError so that a malformed agent response
    # is not misreported as a malformed incoming message.
    try:
        data_dict = json.loads(data_str)
    except json.JSONDecodeError as e:
        print(f"Could not parse message data as JSON: {e}")
        return

    # Extract app_id from platform_specific
    app_id = data_dict.get('platform_specific', {}).get('app_id')

    # Get valid tenant access token using the Feishu service function, passing only app_id
    tenant_access_token = None
    if app_id:
        tenant_access_token = await get_valid_tenant_access_token(app_id)
    else:
        print("Missing app_id in received data, cannot get tenant access token.")

    if not tenant_access_token:
        print("Failed to get tenant access token, cannot process message.")
        return

    # Extract necessary info from data_dict for sending reply
    msg_id = data_dict.get('msg_id')
    content = data_dict.get('content')  # This should be the dictionary like {"text": "..."}
    # Correctly extract msg_type from the original_msg_type field
    msg_type = data_dict.get('original_msg_type')
    text = content.get("text") if isinstance(content, dict) else None

    # Validate before calling the agent: without these fields no reply can be
    # sent, so there is nothing useful to do with the agent's answer.
    if not (msg_id and text and msg_type):
        print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
        return

    print('content["text"]', text)

    if not await _ensure_adk_session():
        print("Failed to create session, cannot query agent.")
        return

    reply_text = await _query_adk_agent(text)
    if not reply_text:
        print("Agent returned no text content, no reply sent.")
        return

    # Send the reply using the Feishu service function
    await send_feishu_reply(
        msg_id, tenant_access_token, {"text": reply_text}, msg_type
    )


async def redis_subscriber(redis_client, channel_name):
    """
    Asynchronous function to subscribe to a Redis channel and process messages.

    Runs until cancelled. Each received message is forwarded to the agent
    service and the agent's text answer is sent back through Feishu. Errors
    raised while handling a single message are printed and do not stop the
    loop.

    Args:
        redis_client: async Redis client used to create the pub/sub object.
        channel_name: name of the channel to subscribe to.
    """
    pubsub = redis_client.pubsub()
    await asyncio.sleep(1)  # Add a small delay before subscribing
    await pubsub.subscribe(channel_name)  # Use await for async subscribe

    # Listen for messages
    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            try:
                await _handle_message(message)
            except Exception as e:
                # Boundary handler: one bad message must not kill the subscriber.
                print(f"An unexpected error occurred while processing message: {e}")
        await asyncio.sleep(0.01)  # Prevent blocking the event loop


async def publish_message(channel: str, message: str) -> dict:
    """
    Publishes a message to a Redis channel.

    Args:
        channel: name of the channel to publish to.
        message: payload to publish.

    Returns:
        A dict with "status" ("success" or "error") and a "message" string;
        on failure the message is the text of the exception.
    """
    try:
        # Publish the message to the specified channel (publish is asynchronous)
        await r.publish(channel, message)
        return {"status": "success", "message": f"Published message to channel '{channel}'"}
    except Exception as e:
        print(f"Error publishing message: {e}")
        return {"status": "error", "message": str(e)}