Spaces:
Sleeping
Sleeping
import os | |
import json | |
import httpx | |
from fastapi import FastAPI, Request, Form, HTTPException, APIRouter | |
from fastapi.middleware.cors import CORSMiddleware | |
from typing import Optional | |
from fastapi.responses import JSONResponse | |
from openai import OpenAI | |
# Create a router for our API endpoints | |
router = APIRouter() | |
# Create the FastAPI app | |
app = FastAPI() | |
# Add CORS middleware to allow requests from any origin | |
app.add_middleware( | |
CORSMiddleware, | |
allow_origins=["*"], | |
allow_credentials=True, | |
allow_methods=["*"], | |
allow_headers=["*"], | |
) | |
# Root endpoint for health checks | |
async def root(): | |
return {"status": "ok", "message": "Server is running"} | |
# Slack configuration | |
SLACK_VERIFICATION_TOKEN = os.getenv("SLACK_VERIFICATION_TOKEN") | |
# Add your Slack user ID (you can find this in your Slack profile) | |
ALLOWED_SLACK_USER_ID = os.getenv("YOUR_SLACK_USER_ID") | |
# Load KB | |
with open("kb.json") as f: | |
kb = json.load(f) | |
# Build system prompt | |
system_prompt = "You are a helpful assistant. Only answer questions based on the following knowledge base:\n\n" | |
for q, a in kb.items(): | |
system_prompt += f"Q: {q}\nA: {a}\n\n" | |
system_prompt += "If the question is not in the knowledge base, respond with: 'I'm not sure about that. Let me connect you with a human agent.'" | |
# OpenAI setup | |
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY")) | |
client.base_url = os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1") | |
# Chatwoot config | |
CHATWOOT_BASE_URL = os.getenv("CHATWOOT_BASE_URL") | |
CHATWOOT_API_KEY = os.getenv("CHATWOOT_API_KEY") | |
CHATWOOT_ACCOUNT_ID = int(os.getenv("CHATWOOT_ACCOUNT_ID")) # e.g., 123911 | |
# Track conversations where AI should stop replying | |
stop_reply_conversations = set() | |
async def ask(request: Request): | |
print("π /ask endpoint was HIT") # <-- Log immediately on hit | |
try: | |
payload = await request.json() | |
print("π₯ Incoming payload:", json.dumps(payload, indent=2)) | |
except Exception as e: | |
print("β Failed to parse JSON payload:", e) | |
return {"status": "error", "detail": "Invalid JSON"} | |
account_id = payload.get("account", {}).get("id") | |
conversation_id = str(payload.get("conversation", {}).get("id")) | |
sender = payload.get("sender") or {} | |
sender_id = sender.get("id") | |
sender_role = (sender.get("role") or "").lower() | |
message_type = payload.get("message_type", "").lower() | |
message_content = payload.get("content", "").strip() | |
print(f"π§Ύ sender_id: {sender_id}, sender_role: {sender_role}, account_id: {account_id}") | |
# Step 1: Detect agent message via Slack and disable AI for that conversation | |
if message_type != "incoming": | |
messages = payload.get("conversation", {}).get("messages", []) | |
if messages: | |
msg = messages[0] | |
external_ids = msg.get("external_source_ids", {}) | |
if "slack" in external_ids: | |
stop_reply_conversations.add(conversation_id) | |
print(f"π Human intervened via Slack in conversation {conversation_id}. Disabling AI.") | |
return {"status": "AI disabled due to Slack intervention"} | |
print("β οΈ Ignoring non-incoming message") | |
return {"status": "ignored"} | |
# Bot must not reply to itself | |
if sender_id == account_id: | |
print("β οΈ Ignoring bot's own message") | |
return {"status": "ignored"} | |
# Handle special bot resume command | |
if sender_role == "agent" and message_content.lower() == "#botresume": | |
stop_reply_conversations.discard(conversation_id) | |
print(f"βΉοΈ Bot resumed for conversation {conversation_id}") | |
await send_chatwoot_message(conversation_id, "Bot resumed and will reply to users now.") | |
return {"status": "bot resumed"} | |
# Check if AI is blacklisted for this conversation | |
if conversation_id in stop_reply_conversations: | |
print(f"π« AI is disabled for conversation {conversation_id}") | |
return {"status": "ignored: human takeover"} | |
# Ensure all data is present | |
if not message_content or not conversation_id: | |
print("β Missing content or conversation ID") | |
return {"status": "invalid payload"} | |
# Build messages for GPT | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": message_content}, | |
] | |
try: | |
response = client.chat.completions.create( | |
model="deepseek-ai/DeepSeek-V3", | |
messages=messages, | |
temperature=0, | |
max_tokens=200, | |
) | |
answer = response.choices[0].message.content.strip() | |
print("β GPT Answer:", answer) | |
# β Extract and send token usage to Slack with conversation history | |
usage = response.usage | |
if usage: | |
prompt_tokens = usage.prompt_tokens | |
completion_tokens = usage.completion_tokens | |
total_tokens = usage.total_tokens | |
# Get conversation history (last 3 messages) | |
messages = payload.get('conversation', {}).get('messages', [])[-6:] # Get last 3 exchanges (6 messages) | |
conversation_history = [] | |
for msg in messages: | |
role = "π€ User" if msg.get('message_type') == 'incoming' else "π€ AI" | |
content = msg.get('content', '').strip() | |
if content: | |
conversation_history.append(f"{role}: {content}") | |
# Add current exchange | |
conversation_history.append(f"π€ User: {message_content}") | |
conversation_history.append(f"π€ AI: {answer}") | |
# Format conversation history with code blocks for better readability | |
formatted_history = "\n".join(conversation_history) | |
slack_msg = ( | |
f"π¬ *Conversation Update* - `{conversation_id}`\n\n" | |
f"*π Conversation History:*\n```\n{formatted_history}\n```\n\n" | |
f"*β‘ Token Usage:*\n```\n" | |
f"Prompt: {prompt_tokens} tokens\n" | |
f"Completion: {completion_tokens} tokens\n" | |
f"Total: {total_tokens} tokens\n" | |
f"Model: {response.model or 'N/A'}\n" | |
f"```" | |
) | |
await send_to_slack(slack_msg) | |
else: | |
print("β οΈ No token usage info returned from API") | |
except Exception as e: | |
print("β OpenAI Error:", e) | |
answer = "Sorry, I'm having trouble answering right now." | |
if answer == "I'm not sure about that. Let me connect you with a human agent.": | |
stop_reply_conversations.add(conversation_id) | |
print(f"π« Fallback answer, disabling AI for conversation {conversation_id}") | |
await send_chatwoot_message(conversation_id, answer) | |
return {"status": "ok"} | |
async def send_to_slack(message: str): | |
webhook_url = os.getenv("SLACK_WEBHOOK_URL") | |
if not webhook_url: | |
print("β SLACK_WEBHOOK_URL not set") | |
return | |
payload = {"text": message} | |
try: | |
async with httpx.AsyncClient() as http: | |
resp = await http.post(webhook_url, json=payload) | |
print("π¨ Slack response:", resp.status_code, resp.text) | |
except Exception as e: | |
print("β Slack Send Error:", e) | |
async def get_chatwoot_conversation(conversation_id: str): | |
"""Fetch conversation details from Chatwoot""" | |
try: | |
async with httpx.AsyncClient() as http: | |
# Get conversation details | |
conv_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}" | |
conv_resp = await http.get( | |
conv_url, | |
headers={"api_access_token": CHATWOOT_API_KEY} | |
) | |
conv_resp.raise_for_status() | |
conversation = conv_resp.json() | |
# Get conversation messages | |
msgs_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages" | |
msgs_resp = await http.get( | |
msgs_url, | |
headers={"api_access_token": CHATWOOT_API_KEY} | |
) | |
msgs_resp.raise_for_status() | |
messages = msgs_resp.json() | |
return { | |
"conversation": conversation, | |
"messages": messages | |
} | |
except Exception as e: | |
print(f"β Error fetching Chatwoot conversation: {e}") | |
return None | |
def format_slack_message(conversation_data): | |
"""Format conversation data into Slack blocks""" | |
if not conversation_data: | |
return {"text": "Error fetching conversation"} | |
conv = conversation_data["conversation"] | |
messages = conversation_data["messages"] | |
# Create blocks for the message | |
blocks = [ | |
{ | |
"type": "header", | |
"text": { | |
"type": "plain_text", | |
"text": f"π¬ Conversation #{conv['id']}", | |
"emoji": True | |
} | |
}, | |
{ | |
"type": "section", | |
"fields": [ | |
{ | |
"type": "mrkdwn", | |
"text": f"*Status:* {conv['status']}" | |
}, | |
{ | |
"type": "mrkdwn", | |
"text": f"*Inbox:* {conv['inbox']['name']}" | |
}, | |
{ | |
"type": "mrkdwn", | |
"text": f"*Assignee:* {conv['meta']['assignee']['name'] if conv['meta']['assignee'] else 'Unassigned'}" | |
}, | |
{ | |
"type": "mrkdwn", | |
"text": f"*Created:* {conv['created_at']}" | |
} | |
] | |
}, | |
{ | |
"type": "divider" | |
} | |
] | |
# Add messages | |
for msg in sorted(messages, key=lambda x: x['created_at']): | |
if msg["message_type"] in ["incoming", "outgoing"] and msg["content"].strip(): | |
sender = "π€" if msg["message_type"] == "incoming" else "π€" | |
blocks.append({ | |
"type": "section", | |
"text": { | |
"type": "mrkdwn", | |
"text": f"{sender} *{msg['sender']['available_name']}*\n{msg['content']}" | |
} | |
}) | |
blocks.append({"type": "divider"}) | |
return { | |
"response_type": "in_channel", | |
"blocks": blocks | |
} | |
async def slack_command( | |
token: str = Form(...), | |
command: str = Form(...), | |
text: str = Form(...), | |
response_url: str = Form(...), | |
user_id: str = Form(...) | |
): | |
"""Handle Slack slash command to fetch Chatwoot conversation""" | |
# Verify the Slack token and user | |
if token != SLACK_VERIFICATION_TOKEN: | |
raise HTTPException(status_code=401, detail="Invalid token") | |
# Check if the user is authorized | |
if user_id != ALLOWED_SLACK_USER_ID: | |
return { | |
"response_type": "ephemeral", | |
"text": "β This command is restricted to authorized users only." | |
} | |
# Extract conversation ID from command text | |
conversation_id = text.strip() | |
if not conversation_id.isdigit(): | |
return { | |
"response_type": "ephemeral", | |
"text": "β Please provide a valid conversation ID. Usage: `/conversation <conversation_id>`" | |
} | |
# Send immediate response (Slack requires response within 3 seconds) | |
response = { | |
"response_type": "ephemeral", | |
"text": f"Fetching conversation {conversation_id}..." | |
} | |
# Fetch conversation data in the background | |
import asyncio | |
asyncio.create_task(_process_slack_command(conversation_id, response_url)) | |
return response | |
async def _process_slack_command(conversation_id: str, response_url: str): | |
"""Process the Slack command asynchronously""" | |
try: | |
# Fetch conversation data | |
conversation_data = await get_chatwoot_conversation(conversation_id) | |
if not conversation_data: | |
raise Exception("Failed to fetch conversation data") | |
# Format the response | |
formatted_response = format_slack_message(conversation_data) | |
# Send the formatted response to Slack | |
async with httpx.AsyncClient() as http: | |
await http.post( | |
response_url, | |
json=formatted_response, | |
headers={"Content-Type": "application/json"} | |
) | |
except Exception as e: | |
error_msg = { | |
"response_type": "ephemeral", | |
"text": f"β Error fetching conversation: {str(e)}" | |
} | |
async with httpx.AsyncClient() as http: | |
await http.post( | |
response_url, | |
json=error_msg, | |
headers={"Content-Type": "application/json"} | |
) | |
# Include the router with the base path | |
app.include_router(router, prefix="/api" if os.getenv('HF_SPACE') else "") | |
async def send_chatwoot_message(conversation_id: str, content: str): | |
message_payload = { | |
"content": content, | |
"message_type": "outgoing", | |
"private": False, | |
"content_type": "text", | |
"content_attributes": {} | |
} | |
try: | |
async with httpx.AsyncClient() as http: | |
url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages" | |
print("π€ Sending to Chatwoot:", url) | |
print("π¦ Payload:", json.dumps(message_payload, indent=2)) | |
resp = await http.post( | |
url, | |
headers={ | |
"Content-Type": "application/json", | |
"api_access_token": CHATWOOT_API_KEY, | |
}, | |
json=message_payload, | |
) | |
print("π¬ Chatwoot Response:", resp.status_code, resp.text) | |
except Exception as e: | |
print("β Chatwoot Send Error:", e) |