File size: 7,788 Bytes
63d40fb 6fd8a52 aa3e29b 6fd8a52 aa3e29b 6fd8a52 a248251 6fd8a52 63d40fb d859f7d 63d40fb d859f7d 6fd8a52 aa3e29b 6fd8a52 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
import os, requests
import asyncio
import redis.asyncio as redis
from dotenv import load_dotenv
from redis.exceptions import ConnectionError
import json
from feishu_service import get_valid_tenant_access_token, send_feishu_reply # Import Feishu service functions
# Run load_dotenv() first so the environment lookups below see any values it provides.
load_dotenv()

# Redis connection settings, taken from the process environment.
redis_host = os.environ.get('REDIS_HOST')
redis_password = os.environ.get('REDIS_PASSWORD')
# May be a string (from the environment) or the int fallback; it is
# normalised with int() when the client is built.
redis_port = os.environ.get('REDIS_PORT', 6379)

# Shared asyncio Redis client used by the coroutines in this module.
r = redis.Redis(
    host=redis_host,
    password=redis_password,
    port=int(redis_port),
    ssl=True,
)
async def connect_redis():
    """
    Check that the module-level Redis client ``r`` is reachable.

    Sends a PING. If that succeeds, performs a best-effort SET/GET round
    trip on the key ``"mykey"`` and reports (without raising) when the
    round trip errors out or returns something other than the value that
    was just written.

    Raises:
        ConnectionError: the ``redis.exceptions.ConnectionError`` raised by
            the PING is printed and re-raised so the caller knows the
            connection is unusable.
    """
    try:
        await r.ping()
    except ConnectionError as e:
        print(f"Could not connect to Redis: {e}")
        # Without a connection nothing else in this module can work.
        raise
    print("Connected to Redis!")

    # Best-effort self-test: failures are reported but never propagated.
    try:
        await r.set("mykey", "myvalue")
        value = await r.get("mykey")
    except Exception as e:
        print(f"Redis SET/GET test failed: {e}")
        return

    # The client returns bytes unless it was built with decode_responses=True,
    # so accept either representation of the stored value.
    if value not in (b"myvalue", "myvalue"):
        print(f"Redis SET/GET test failed: unexpected value {value!r}")
# Endpoint and identity used for every call to the agent HTTP service.
_AGENT_BASE_URL = "https://tanbushi-adk-teams.hf.space"
_AGENT_APP_NAME = "ai_quant"
_AGENT_USER_ID = "tanbushi"
_AGENT_SESSION_ID = "ai_quant_tanbushi_session"
# (connect, read) timeouts in seconds, so a stalled server cannot block a
# worker thread forever.
_AGENT_HTTP_TIMEOUT = (10, 120)


def _ask_agent(text):
    """
    Blocking helper: ensure the agent session exists, run one turn, return the reply.

    Args:
        text: The user's message text to forward to the agent.

    Returns:
        The concatenated ``text`` of the parts of the first returned event,
        or ``None`` when there is no text reply (the session could not be
        created, or the first part of the first event is a function call).

    Raises:
        Whatever ``requests`` raises on network errors/timeouts, plus
        lookup errors if the response does not have the expected shape.
        The caller is expected to handle these.
    """
    session_url = (
        f"{_AGENT_BASE_URL}/apps/{_AGENT_APP_NAME}"
        f"/users/{_AGENT_USER_ID}/sessions/{_AGENT_SESSION_ID}"
    )
    response = requests.get(session_url, timeout=_AGENT_HTTP_TIMEOUT)
    if response.status_code == 200:
        print("Successfully get session")
    else:
        print("Failed to get session")
        # The session is missing (or unreadable): try to create it.
        response = requests.post(session_url, timeout=_AGENT_HTTP_TIMEOUT)
        if response.status_code != 200:
            print("Failed to create session")
            return None

    # Always run against the session that was just fetched/created, so the
    # run request cannot reference a session that does not exist.
    payload = {
        "app_name": _AGENT_APP_NAME,
        "user_id": _AGENT_USER_ID,
        "session_id": _AGENT_SESSION_ID,
        "new_message": {
            "parts": [
                {
                    "text": text
                }
            ],
            "role": "user"
        },
        "streaming": False
    }
    run_response = requests.post(
        f"{_AGENT_BASE_URL}/run", json=payload, timeout=_AGENT_HTTP_TIMEOUT
    )
    response1_json = run_response.json()
    print('\n\n\n\n\n\n\n\nresponse1_json: ', response1_json)

    # Only the first event is inspected, as a list of "parts".
    parts = response1_json[0]["content"]["parts"]
    function_call = parts[0].get("function_call")
    if function_call:
        print(f"函数调用: {function_call['name']}")
        return None

    text_content = "".join(part.get("text") or "" for part in parts)
    print(text_content)
    return text_content


async def _handle_message(raw_data):
    """
    Process one pub/sub payload: fetch a token, ask the agent, send the reply.

    Args:
        raw_data: The ``data`` field of a pub/sub message (bytes or str)
            holding a JSON object. The keys read are
            ``platform_specific.app_id``, ``msg_id``, ``content`` (a dict
            with a ``text`` key) and ``original_msg_type``.

    Raises:
        json.JSONDecodeError: if the payload is not valid JSON.
        Exception: anything raised by the token lookup, the agent call or
            the reply call is propagated to the caller.
    """
    if isinstance(raw_data, (bytes, bytearray)):
        data_str = raw_data.decode()
    else:
        data_str = raw_data
    data_dict = json.loads(data_str)

    app_id = (data_dict.get('platform_specific') or {}).get('app_id')
    tenant_access_token = None
    if app_id:
        tenant_access_token = await get_valid_tenant_access_token(app_id)
    else:
        print("Missing app_id in received data, cannot get tenant access token.")
    if not tenant_access_token:
        print("Failed to get tenant access token, cannot process message.")
        return

    msg_id = data_dict.get('msg_id')
    msg_type = data_dict.get('original_msg_type')
    content = data_dict.get('content')  # expected shape: {"text": "..."}
    user_text = content.get("text") if isinstance(content, dict) else None
    if user_text is None:
        # Without text there is nothing to forward to the agent.
        print("Missing msg_id, content, or msg_type in received data, cannot send reply.")
        return
    print('content["text"]', user_text)

    # requests is synchronous; run it in the default executor so the event
    # loop keeps serving other tasks while the agent is working.
    loop = asyncio.get_running_loop()
    reply_text = await loop.run_in_executor(None, _ask_agent, user_text)
    if reply_text is None:
        # Never fall through with a missing (or previous message's) reply.
        print("No text reply from agent, skipping reply.")
        return

    if msg_id and msg_type:
        await send_feishu_reply(
            msg_id, tenant_access_token, {"text": reply_text}, msg_type
        )
    else:
        print("Missing msg_id, content, or msg_type in received data, cannot send reply.")


async def redis_subscriber(redis_client, channel_name, *, subscribe_delay=1.0, poll_interval=0.01):
    """
    Subscribe to a Redis channel and process every message received on it.

    Each message's ``data`` is parsed as JSON, a tenant access token is
    obtained for its ``app_id``, the message text is sent to the agent
    service, and the agent's text answer is sent back with
    ``send_feishu_reply``. Errors while handling a single message are
    printed and the loop continues with the next message.

    Args:
        redis_client: An asyncio Redis client exposing ``pubsub()``.
        channel_name: Name of the channel to subscribe to.
        subscribe_delay: Seconds to wait before subscribing.
        poll_interval: Seconds to sleep between polls so the loop yields
            to other tasks.

    This coroutine loops forever; it only ends if the pub/sub calls raise
    or the task is cancelled.
    """
    pubsub = redis_client.pubsub()
    await asyncio.sleep(subscribe_delay)  # small delay before subscribing
    await pubsub.subscribe(channel_name)

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            # Keep all per-message work inside the try so that one bad
            # payload cannot terminate the subscriber.
            try:
                await _handle_message(message['data'])
            except json.JSONDecodeError as e:
                print(f"Could not parse message data as JSON: {e}")
            except Exception as e:
                print(f"An unexpected error occurred while processing message: {e}")
        await asyncio.sleep(poll_interval)  # yield to the event loop between polls
async def publish_message(channel: str, message: str):
    """
    Send ``message`` to the Redis channel ``channel`` via the shared client ``r``.

    Returns a dict with a ``status`` of ``"success"`` or ``"error"`` and a
    human-readable ``message``; failures are printed and reported in the
    returned dict rather than raised.
    """
    try:
        await r.publish(channel, message)
        outcome = {"status": "success", "message": f"Published message to channel '{channel}'"}
    except Exception as exc:
        print(f"Error publishing message: {exc}")
        outcome = {"status": "error", "message": str(exc)}
    return outcome
|