|
import os, requests |
|
import asyncio |
|
import redis.asyncio as redis |
|
from dotenv import load_dotenv |
|
from redis.exceptions import ConnectionError |
|
import json |
|
from feishu_service import get_valid_tenant_access_token, send_feishu_reply |
|
|
|
# Run python-dotenv first so the environment lookups below see its values.
load_dotenv()

# Redis connection settings are read from the environment.
# Only the port has a fallback (6379); host and password are None when unset.
redis_host = os.environ.get('REDIS_HOST')
redis_password = os.environ.get('REDIS_PASSWORD')
redis_port = os.environ.get('REDIS_PORT', 6379)

# Module-level async Redis client used by connect_redis() and publish_message().
# The port is converted with int() because an environment value arrives as a
# string; the connection is always created with ssl=True.
r = redis.Redis(host=redis_host, port=int(redis_port), password=redis_password, ssl=True)
|
|
|
async def connect_redis():
    """
    Check the module-level Redis client ``r`` with a PING and a SET/GET probe.

    A failed PING that raises ``ConnectionError`` is printed and re-raised to
    the caller. Any failure of the follow-up SET/GET probe on ``"mykey"`` is
    only printed; it never propagates.
    """
    try:
        await r.ping()
    except ConnectionError as conn_err:
        print(f"Could not connect to Redis: {conn_err}")
        raise
    else:
        print("Connected to Redis!")

        # Best-effort write/read round trip; the value read back is not inspected.
        try:
            await r.set("mykey", "myvalue")
            await r.get("mykey")
        except Exception as probe_err:
            print(f"Redis SET/GET test failed: {probe_err}")
|
|
|
# Agent HTTP service that answers incoming messages, and the single session
# this module talks to.
_AGENT_SESSION_URL = "https://tanbushi-adk-teams.hf.space/apps/ai_quant/users/tanbushi/sessions/ai_quant_tanbushi_session"
_AGENT_RUN_URL = "https://tanbushi-adk-teams.hf.space/run"
_AGENT_APP_NAME = "ai_quant"
_AGENT_USER_ID = "tanbushi"
_AGENT_SESSION_ID = "ai_quant_tanbushi_session"
# (connect, read) timeout in seconds, so a stalled HTTP call cannot hang the
# subscriber forever. The read limit is generous because the agent may be slow.
_AGENT_HTTP_TIMEOUT = (10, 120)


def _decode_payload(raw):
    """Return a pub/sub field as ``str`` whether it arrived as bytes or text."""
    if isinstance(raw, (bytes, bytearray)):
        return raw.decode()
    return raw


def _extract_reply_text(events):
    """
    Return the joined text of the first event's parts, or ``None``.

    ``None`` is returned when the response is not a non-empty list or when the
    first part is a function call (only its name is printed in that case).
    """
    if not isinstance(events, list) or not events:
        print("Agent response contains no events, cannot build a reply.")
        return None

    parts = events[0]["content"]["parts"]
    function_call = parts[0].get("function_call")
    if function_call:
        print(f"函数调用: {function_call['name']}")
        return None

    text_content = "".join([part.get("text") or "" for part in parts])
    print(text_content)
    return text_content


def _ask_agent(user_text):
    """
    Send ``user_text`` to the agent service and return its text reply.

    This function is blocking (it uses ``requests``) and is meant to be run in
    a worker thread. It first checks that the session exists and creates it
    when the lookup does not return HTTP 200, then posts the message to the
    ``/run`` endpoint. Returns ``None`` when the session cannot be created,
    the run request fails, or the response carries no text.
    """
    response = requests.get(_AGENT_SESSION_URL, timeout=_AGENT_HTTP_TIMEOUT)
    if response.status_code != 200:
        print("Failed to get session")
        response = requests.post(_AGENT_SESSION_URL, timeout=_AGENT_HTTP_TIMEOUT)
        if response.status_code != 200:
            print(f"Failed to create session: HTTP {response.status_code}")
            return None
    else:
        print("Successfully get session")

    # NOTE(review): the run request always targets the session checked/created
    # above. The previous code used a different hard-coded user/session after
    # creating the session, which looked like a copy-paste slip -- confirm.
    payload = {
        "app_name": _AGENT_APP_NAME,
        "user_id": _AGENT_USER_ID,
        "session_id": _AGENT_SESSION_ID,
        "new_message": {
            "parts": [
                {
                    "text": user_text
                }
            ],
            "role": "user"
        },
        "streaming": False
    }
    run_response = requests.post(_AGENT_RUN_URL, json=payload, timeout=_AGENT_HTTP_TIMEOUT)
    if run_response.status_code != 200:
        print(f"Agent run request failed: HTTP {run_response.status_code}")
        return None

    events = run_response.json()
    print('\n\n\n\n\n\n\n\nresponse1_json: ', events)
    return _extract_reply_text(events)


async def _handle_message(message):
    """
    Process one pub/sub message: query the agent and send the Feishu reply.

    The message data is parsed as JSON and the fields
    ``platform_specific.app_id``, ``msg_id``, ``content["text"]`` and
    ``original_msg_type`` are read from it. Exceptions are left to the caller.
    """
    data_dict = json.loads(_decode_payload(message['data']))

    app_id = (data_dict.get('platform_specific') or {}).get('app_id')
    tenant_access_token = None
    if app_id:
        tenant_access_token = await get_valid_tenant_access_token(app_id)
    else:
        print("Missing app_id in received data, cannot get tenant access token.")

    if not tenant_access_token:
        print("Failed to get tenant access token, cannot process message.")
        return

    msg_id = data_dict.get('msg_id')
    content = data_dict.get('content')
    msg_type = data_dict.get('original_msg_type')

    user_text = content.get("text") if isinstance(content, dict) else None
    if user_text is None:
        print("Missing content text in received data, cannot query agent.")
        return
    print('content["text"]', user_text)

    # The HTTP round trips are blocking; run them in the default executor so
    # the event loop keeps serving other tasks meanwhile.
    loop = asyncio.get_running_loop()
    reply_text = await loop.run_in_executor(None, _ask_agent, user_text)
    if reply_text is None:
        print("Agent returned no text reply, nothing to send.")
        return

    content = {"text": reply_text}
    if msg_id and msg_type:
        await send_feishu_reply(msg_id, tenant_access_token, content, msg_type)
    else:
        print("Missing msg_id, content, or msg_type in received data, cannot send reply.")


async def redis_subscriber(redis_client, channel_name):
    """
    Subscribe to a Redis channel and answer every message through the agent.

    Runs forever: polls the pub/sub object for messages, and for each one asks
    the agent service for a reply and sends it back with ``send_feishu_reply``.
    A failure while handling one message is printed and does not stop the loop.

    Args:
        redis_client: Async Redis client exposing ``pubsub()``.
        channel_name: Name of the channel to subscribe to.
    """
    pubsub = redis_client.pubsub()
    await asyncio.sleep(1)
    await pubsub.subscribe(channel_name)

    while True:
        message = await pubsub.get_message(ignore_subscribe_messages=True)
        if message:
            try:
                await _handle_message(message)
            except json.JSONDecodeError as e:
                print(f"Could not parse message data as JSON: {e}")
            except Exception as e:
                # Boundary of the per-message work: report and keep listening.
                print(f"An unexpected error occurred while processing message: {e}")

        # Short pause between polls so the loop does not spin.
        await asyncio.sleep(0.01)
|
|
|
async def publish_message(channel: str, message: str):
    """
    Publish ``message`` on ``channel`` through the module-level Redis client.

    Never raises for ordinary errors: returns a dict whose ``"status"`` is
    ``"success"`` or ``"error"`` and whose ``"message"`` carries either a
    confirmation or the text of the exception (which is also printed).
    """
    try:
        await r.publish(channel, message)
        outcome = {
            "status": "success",
            "message": f"Published message to channel '{channel}'",
        }
    except Exception as exc:
        print(f"Error publishing message: {exc}")
        outcome = {"status": "error", "message": str(exc)}
    return outcome
|
|