qabot / app.py
ethiotech4848's picture
Update app.py
fa89e18 verified
raw
history blame
6.46 kB
import os
import json
import httpx
from fastapi import FastAPI, Request
from openai import OpenAI
app = FastAPI()
# Load KB
with open("kb.json") as f:
kb = json.load(f)
# Build system prompt
system_prompt = "You are a helpful assistant. Only answer questions based on the following knowledge base:\n\n"
for q, a in kb.items():
system_prompt += f"Q: {q}\nA: {a}\n\n"
system_prompt += "If the question is not in the knowledge base, respond with: 'I'm not sure about that. Let me connect you with a human agent.'"
# OpenAI setup
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
client.base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1")
# Chatwoot config
CHATWOOT_BASE_URL = os.getenv("CHATWOOT_BASE_URL")
CHATWOOT_API_KEY = os.getenv("CHATWOOT_API_KEY")
CHATWOOT_ACCOUNT_ID = int(os.getenv("CHATWOOT_ACCOUNT_ID")) # e.g., 123911
# Track conversations where AI should stop replying
stop_reply_conversations = set()
@app.post("/ask")
async def ask(request: Request):
print("πŸ”” /ask endpoint was HIT") # <-- Log immediately on hit
try:
payload = await request.json()
print("πŸ“₯ Incoming payload:", json.dumps(payload, indent=2))
except Exception as e:
print("❌ Failed to parse JSON payload:", e)
return {"status": "error", "detail": "Invalid JSON"}
account_id = payload.get("account", {}).get("id")
conversation_id = str(payload.get("conversation", {}).get("id"))
sender = payload.get("sender") or {}
sender_id = sender.get("id")
sender_role = (sender.get("role") or "").lower()
message_type = payload.get("message_type", "").lower()
message_content = payload.get("content", "").strip()
print(f"🧾 sender_id: {sender_id}, sender_role: {sender_role}, account_id: {account_id}")
# Step 1: Detect agent message via Slack and disable AI for that conversation
if message_type != "incoming":
messages = payload.get("conversation", {}).get("messages", [])
if messages:
msg = messages[0]
external_ids = msg.get("external_source_ids", {})
if "slack" in external_ids:
stop_reply_conversations.add(conversation_id)
print(f"πŸ›‘ Human intervened via Slack in conversation {conversation_id}. Disabling AI.")
return {"status": "AI disabled due to Slack intervention"}
print("⚠️ Ignoring non-incoming message")
return {"status": "ignored"}
# Bot must not reply to itself
if sender_id == account_id:
print("⚠️ Ignoring bot's own message")
return {"status": "ignored"}
# Handle special bot resume command
if sender_role == "agent" and message_content.lower() == "#botresume":
stop_reply_conversations.discard(conversation_id)
print(f"ℹ️ Bot resumed for conversation {conversation_id}")
await send_chatwoot_message(conversation_id, "Bot resumed and will reply to users now.")
return {"status": "bot resumed"}
# Check if AI is blacklisted for this conversation
if conversation_id in stop_reply_conversations:
print(f"🚫 AI is disabled for conversation {conversation_id}")
return {"status": "ignored: human takeover"}
# Ensure all data is present
if not message_content or not conversation_id:
print("❌ Missing content or conversation ID")
return {"status": "invalid payload"}
# Build messages for GPT
messages = [
{"role": "system", "content": system_prompt},
{"role": "user", "content": message_content},
]
try:
response = client.chat.completions.create(
model="deepseek-ai/DeepSeek-V3",
messages=messages,
temperature=0,
max_tokens=200,
)
answer = response.choices[0].message.content.strip()
print("βœ… GPT Answer:", answer)
# βœ… Extract and send token usage to Slack
usage = response.usage
if usage:
prompt_tokens = usage.prompt_tokens
completion_tokens = usage.completion_tokens
total_tokens = usage.total_tokens
slack_msg = (
f"πŸ“Š Token usage for conversation `{conversation_id}`:\n"
f"> Prompt: {prompt_tokens} tokens\n"
f"> Completion: {completion_tokens} tokens\n"
f"> Total: {total_tokens} tokens"
)
await send_to_slack(slack_msg)
else:
print("⚠️ No token usage info returned from API")
except Exception as e:
print("❌ OpenAI Error:", e)
answer = "Sorry, I'm having trouble answering right now."
if answer == "I'm not sure about that. Let me connect you with a human agent.":
stop_reply_conversations.add(conversation_id)
print(f"🚫 Fallback answer, disabling AI for conversation {conversation_id}")
await send_chatwoot_message(conversation_id, answer)
return {"status": "ok"}
async def send_to_slack(message: str):
webhook_url = os.getenv("SLACK_WEBHOOK_URL")
if not webhook_url:
print("❌ SLACK_WEBHOOK_URL not set")
return
payload = {"text": message}
try:
async with httpx.AsyncClient() as http:
resp = await http.post(webhook_url, json=payload)
print("πŸ“¨ Slack response:", resp.status_code, resp.text)
except Exception as e:
print("❌ Slack Send Error:", e)
async def send_chatwoot_message(conversation_id: str, content: str):
message_payload = {
"content": content,
"message_type": "outgoing",
"private": False,
"content_type": "text",
"content_attributes": {}
}
try:
async with httpx.AsyncClient() as http:
url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages"
print("πŸ“€ Sending to Chatwoot:", url)
print("πŸ“¦ Payload:", json.dumps(message_payload, indent=2))
resp = await http.post(
url,
headers={
"Content-Type": "application/json",
"api_access_token": CHATWOOT_API_KEY,
},
json=message_payload,
)
print("πŸ“¬ Chatwoot Response:", resp.status_code, resp.text)
except Exception as e:
print("❌ Chatwoot Send Error:", e)