"""Chatwoot <-> LLM <-> Slack bridge.

Exposes three HTTP endpoints:

* ``GET /``                          - health check.
* ``POST /ask``                      - Chatwoot webhook; answers incoming
  messages from a JSON knowledge base via an OpenAI-compatible API.
* ``POST {BASE_PATH}/slack/command`` - Slack slash command that posts a
  Chatwoot conversation transcript back to Slack.
"""

import asyncio
import hmac
import json
import os
import traceback
from datetime import datetime
from typing import Optional, Union

import httpx
from fastapi import FastAPI, Request, Form, HTTPException, APIRouter
from fastapi.concurrency import run_in_threadpool
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from openai import OpenAI

# Create the FastAPI app
app = FastAPI()

# Add CORS middleware to allow requests from any origin.
# NOTE(review): a wildcard origin combined with allow_credentials=True is a
# very permissive policy - confirm this is intended before exposing publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Base path for Hugging Face Spaces
BASE_PATH = "/api" if os.getenv('HF_SPACE') else ""


# Root endpoint for health checks
@app.get("/")
async def root():
    """Return a static payload so callers can verify the server is up."""
    return {"status": "ok", "message": "Server is running"}


# Slack configuration
SLACK_VERIFICATION_TOKEN = os.getenv("SLACK_VERIFICATION_TOKEN")
ALLOWED_SLACK_USER_ID = os.getenv("YOUR_SLACK_USER_ID")

# Slack Block Kit limits: blocks per message and characters per section text.
_SLACK_MAX_BLOCKS = 50
_SLACK_SECTION_TEXT_LIMIT = 3000

# Exact sentence the model is told to emit when the KB has no answer; `ask`
# compares against it to hand the conversation over to a human.
FALLBACK_ANSWER = "I'm not sure about that. Let me connect you with a human agent."

# Load KB (explicit encoding so the result does not depend on the locale)
with open("kb.json", encoding="utf-8") as f:
    kb = json.load(f)

# Build system prompt
system_prompt = (
    "You are a helpful assistant. Only answer questions based on the following knowledge base:\n\n"
    + "".join(f"Q: {q}\nA: {a}\n\n" for q, a in kb.items())
    + f"If the question is not in the knowledge base, respond with: '{FALLBACK_ANSWER}'"
)

# OpenAI setup (any OpenAI-compatible endpoint can be selected via env)
client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY"),
    base_url=os.getenv("OPENAI_API_BASE", "https://api.openai.com/v1"),
)

# Chatwoot config
CHATWOOT_BASE_URL = os.getenv("CHATWOOT_BASE_URL")
CHATWOOT_API_KEY = os.getenv("CHATWOOT_API_KEY")
_account_id_raw = os.getenv("CHATWOOT_ACCOUNT_ID")
if not _account_id_raw:
    # Fail with a readable message instead of int(None)'s opaque TypeError.
    raise RuntimeError("CHATWOOT_ACCOUNT_ID environment variable must be set")
CHATWOOT_ACCOUNT_ID = int(_account_id_raw)  # e.g., 123911

# Track conversations where AI should stop replying (in-memory only, so the
# set is lost on restart and is not shared between worker processes).
stop_reply_conversations = set()

# Strong references to fire-and-forget tasks; the event loop itself only
# keeps weak references, so an unreferenced task may be garbage-collected.
_background_tasks = set()


def _is_incoming(message_type) -> bool:
    """Return True for an incoming message type in either spelling used here.

    `get_chatwoot_conversation` treats the integer 0 as incoming, while `ask`
    compares against the string "incoming"; both are accepted.
    """
    return message_type in (0, "incoming")


@app.post("/ask")
async def ask(request: Request):
    """Handle a Chatwoot webhook event and, when appropriate, reply via the LLM.

    Flow:
      1. Non-incoming events are ignored; if the first conversation message
         carries a Slack external source id, the AI is disabled for that
         conversation.
      2. Messages whose sender id equals the account id are ignored.
      3. An agent sending ``#botresume`` re-enables the AI.
      4. Otherwise the message is answered from the knowledge-base prompt,
         token usage is reported to Slack, and the answer is posted to
         Chatwoot. The fallback answer disables the AI for the conversation.

    Returns:
        A small status dict describing what was done.
    """
    print("๐Ÿ”” /ask endpoint was HIT")  # <-- Log immediately on hit
    try:
        payload = await request.json()
        print("๐Ÿ“ฅ Incoming payload:", json.dumps(payload, indent=2))
    except Exception as e:
        print("โŒ Failed to parse JSON payload:", e)
        return {"status": "error", "detail": "Invalid JSON"}

    # Valid JSON that is not an object (list, string, ...) has no fields.
    if not isinstance(payload, dict):
        print("โŒ Missing content or conversation ID")
        return {"status": "invalid payload"}

    # `or {}` / `or ""` guard against keys that are present but null.
    account_id = (payload.get("account") or {}).get("id")
    conversation = payload.get("conversation") or {}
    raw_conversation_id = conversation.get("id")
    # Keep a missing id falsy; str(None) would yield the truthy "None".
    conversation_id = "" if raw_conversation_id is None else str(raw_conversation_id)
    sender = payload.get("sender") or {}
    sender_id = sender.get("id")
    sender_role = (sender.get("role") or "").lower()
    message_type = str(payload.get("message_type") or "").lower()
    raw_content = payload.get("content")
    message_content = raw_content.strip() if isinstance(raw_content, str) else ""

    print(f"๐Ÿงพ sender_id: {sender_id}, sender_role: {sender_role}, account_id: {account_id}")

    # Step 1: Detect agent message via Slack and disable AI for that conversation
    if message_type != "incoming":
        history = conversation.get("messages") or []
        if history:
            external_ids = history[0].get("external_source_ids") or {}
            if "slack" in external_ids:
                if conversation_id:
                    stop_reply_conversations.add(conversation_id)
                print(f"๐Ÿ›‘ Human intervened via Slack in conversation {conversation_id}. Disabling AI.")
                return {"status": "AI disabled due to Slack intervention"}
        print("โš ๏ธ Ignoring non-incoming message")
        return {"status": "ignored"}

    # Bot must not reply to itself
    if sender_id == account_id:
        print("โš ๏ธ Ignoring bot's own message")
        return {"status": "ignored"}

    # Handle special bot resume command
    if sender_role == "agent" and message_content.lower() == "#botresume":
        stop_reply_conversations.discard(conversation_id)
        print(f"โ„น๏ธ Bot resumed for conversation {conversation_id}")
        await send_chatwoot_message(conversation_id, "Bot resumed and will reply to users now.")
        return {"status": "bot resumed"}

    # Check if AI is blacklisted for this conversation
    if conversation_id in stop_reply_conversations:
        print(f"๐Ÿšซ AI is disabled for conversation {conversation_id}")
        return {"status": "ignored: human takeover"}

    # Ensure all data is present
    if not message_content or not conversation_id:
        print("โŒ Missing content or conversation ID")
        return {"status": "invalid payload"}

    # Build messages for GPT
    chat_messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message_content},
    ]

    response = None
    try:
        # The OpenAI client is synchronous; run it in a worker thread so the
        # event loop keeps serving other requests while the model responds.
        response = await run_in_threadpool(
            client.chat.completions.create,
            model="deepseek-ai/DeepSeek-V3",
            messages=chat_messages,
            temperature=0,
            max_tokens=200,
        )
        answer = response.choices[0].message.content.strip()
        print("โœ… GPT Answer:", answer)
    except Exception as e:
        print("โŒ OpenAI Error:", e)
        response = None
        answer = "Sorry, I'm having trouble answering right now."

    if response is not None:
        # Reporting is best-effort and kept outside the model-call `try` so a
        # reporting failure can never replace a valid answer.
        try:
            await _report_usage_to_slack(
                payload, conversation_id, message_content, answer, response
            )
        except Exception as e:
            print("โŒ Slack Send Error:", e)

    if answer == FALLBACK_ANSWER:
        stop_reply_conversations.add(conversation_id)
        print(f"๐Ÿšซ Fallback answer, disabling AI for conversation {conversation_id}")

    await send_chatwoot_message(conversation_id, answer)
    return {"status": "ok"}


async def _report_usage_to_slack(payload, conversation_id, message_content, answer, response):
    """Post token usage plus a short conversation excerpt to Slack.

    Args:
        payload: The webhook payload dict received by `ask`.
        conversation_id: Conversation id shown in the Slack message.
        message_content: The user message that was just answered.
        answer: The model's answer to that message.
        response: The chat-completion response; its `usage` and `model`
            attributes are read.
    """
    usage = response.usage
    if not usage:
        print("โš ๏ธ No token usage info returned from API")
        return

    # Last six messages carried in the payload, i.e. about three exchanges.
    recent = ((payload.get("conversation") or {}).get("messages") or [])[-6:]
    conversation_history = []
    for msg in recent:
        role = "๐Ÿ‘ค User" if _is_incoming(msg.get('message_type')) else "๐Ÿค– AI"
        content = (msg.get('content') or '').strip()
        if content:
            conversation_history.append(f"{role}: {content}")

    # Add current exchange
    conversation_history.append(f"๐Ÿ‘ค User: {message_content}")
    conversation_history.append(f"๐Ÿค– AI: {answer}")

    # Format conversation history with code blocks for better readability
    formatted_history = "\n".join(conversation_history)
    slack_msg = (
        f"๐Ÿ’ฌ *Conversation Update* - `{conversation_id}`\n\n"
        f"*๐Ÿ” Conversation History:*\n```\n{formatted_history}\n```\n\n"
        f"*โšก Token Usage:*\n```\n"
        f"Prompt: {usage.prompt_tokens} tokens\n"
        f"Completion: {usage.completion_tokens} tokens\n"
        f"Total: {usage.total_tokens} tokens\n"
        f"Model: {response.model or 'N/A'}\n"
        f"```"
    )
    await send_to_slack(slack_msg)


async def send_to_slack(message: str):
    """Post `message` to the webhook in SLACK_WEBHOOK_URL; failures are only logged."""
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not webhook_url:
        print("โŒ SLACK_WEBHOOK_URL not set")
        return

    try:
        async with httpx.AsyncClient() as http:
            resp = await http.post(webhook_url, json={"text": message})
            print("๐Ÿ“จ Slack response:", resp.status_code, resp.text)
    except Exception as e:
        print("โŒ Slack Send Error:", e)


async def get_chatwoot_conversation(conversation_id: Union[int, str]) -> Optional[dict]:
    """Fetch conversation details and messages from Chatwoot.

    Args:
        conversation_id: Chatwoot conversation id (int or numeric string).

    Returns:
        ``{"meta": ..., "payload": [...]}`` where ``payload`` is a list of
        simplified messages sorted by timestamp, or ``None`` when the request
        fails or the response has an unexpected shape.
    """
    print(f"\n๐Ÿ” Starting to fetch conversation {conversation_id}")
    print(f"Using base URL: {CHATWOOT_BASE_URL}")
    print(f"Account ID: {CHATWOOT_ACCOUNT_ID}")

    try:
        # The real token goes on the wire; only the masked copy is ever
        # logged. Sending the masked value would fail authentication.
        headers = {
            "api_access_token": CHATWOOT_API_KEY or "",
            "Content-Type": "application/json",
            "Accept": "application/json"
        }
        log_headers = dict(
            headers,
            api_access_token=CHATWOOT_API_KEY[:5] + "..." if CHATWOOT_API_KEY else "None",
        )
        print(f"Request headers: {log_headers}")

        async with httpx.AsyncClient(timeout=30.0) as http:
            # Get conversation messages directly since that's what we need
            msgs_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages"
            print(f"\n๐ŸŒ Making request to: {msgs_url}")
            print("Method: GET")
            print(f"Full URL: {msgs_url}")
            print(f"Headers: {log_headers}")

            msgs_resp = await http.get(msgs_url, headers=headers)
            print("\nโœ… Response received")
            print(f"Status code: {msgs_resp.status_code}")
            print(f"Response headers: {dict(msgs_resp.headers)}")

            # Log response body (truncated if too long)
            response_text = msgs_resp.text
            print(f"Response length: {len(response_text)} characters")
            print(f"First 500 chars of response: {response_text[:500]}...")

            msgs_resp.raise_for_status()

            # Parse the JSON response
            response_data = msgs_resp.json()
            print(f"๐Ÿ“Š Response data type: {type(response_data).__name__}")

            # Handle different response formats
            if isinstance(response_data, dict):
                # If the response has a 'payload' key, use that
                messages = response_data.get('payload', [])
                meta = response_data.get('meta', {})
            elif isinstance(response_data, list):
                # If the response is directly a list of messages
                messages = response_data
                meta = {}
            else:
                print(f"โš ๏ธ Unexpected response format: {type(response_data)}")
                return None

            if not isinstance(messages, list):
                print(f"โš ๏ธ Messages is not a list: {type(messages)}")
                return None

            print(f"\n๐Ÿ“ฉ Successfully parsed {len(messages)} messages")
            if messages:
                first_msg = messages[0]
                print(f"First message type: {type(first_msg).__name__}")
                print(f"First message keys: {list(first_msg.keys()) if hasattr(first_msg, 'keys') else 'N/A'}")

            # Process and filter messages
            filtered_messages = []
            for msg in messages:
                # Skip entries that are not objects or carry no text content
                if not isinstance(msg, dict):
                    continue
                content = msg.get('content')
                if not content or not isinstance(content, str):
                    continue

                # Get sender info, handle different formats
                sender = msg.get('sender', {})
                sender_type = None
                sender_name = 'Unknown'
                if isinstance(sender, dict):
                    sender_type = sender.get('type')
                    sender_name = sender.get('name') or sender.get('available_name', 'Unknown')

                # Only include relevant message types
                if sender_type not in ('contact', 'user', 'agent'):
                    continue

                filtered_messages.append({
                    'id': msg.get('id'),
                    # Kept so the Slack formatter can show the conversation number
                    'conversation_id': msg.get('conversation_id'),
                    'content': content,
                    'created_at': msg.get('created_at'),
                    'sender': {
                        'type': sender_type,
                        'name': sender_name
                    },
                    'message_type': 'incoming' if msg.get('message_type') == 0 else 'outgoing',
                    'timestamp': msg.get('created_at')
                })

            print(f"โœ… Filtered to {len(filtered_messages)} relevant messages")

            # Sort by timestamp; `or 0` keeps a null timestamp comparable
            filtered_messages.sort(key=lambda m: m.get('timestamp') or 0)

            return {
                'meta': meta,
                'payload': filtered_messages
            }

    except httpx.HTTPStatusError as e:
        print(f"\nโŒ HTTP Error: {e}")
        print(f"Response: {e.response.text}")
        return None
    except Exception as e:
        print(f"\nโŒ Unexpected error: {str(e)}")
        print("Stack trace:")
        print(traceback.format_exc())
        return None


def format_slack_message(conversation_data):
    """Format conversation data into Slack blocks.

    Args:
        conversation_data: Dict with a ``payload`` list of messages, as
            returned by `get_chatwoot_conversation`.

    Returns:
        A Slack response dict: ``in_channel`` with ``blocks`` on success,
        or an ``ephemeral`` text message when there is nothing to show or
        formatting fails. Output is capped to Slack's per-message block
        count and per-section text length; when there are too many
        messages only the most recent ones are shown.
    """
    if not conversation_data:
        return {"response_type": "ephemeral", "text": "โŒ Error: No conversation data received"}

    try:
        # Get messages from the payload
        messages = conversation_data.get("payload", [])
        if not messages:
            return {"response_type": "ephemeral", "text": "โ„น๏ธ No messages found in this conversation"}

        # Create blocks for the message
        blocks = [
            {
                "type": "header",
                "text": {
                    "type": "plain_text",
                    "text": f"๐Ÿ’ฌ Conversation #{messages[0].get('conversation_id') or 'N/A'}",
                    "emoji": True
                }
            },
            {
                "type": "divider"
            }
        ]

        renderable = [
            msg
            for msg in sorted(messages, key=lambda m: m.get('created_at') or 0)
            if msg.get("content") and isinstance(msg.get("content"), str)
        ]

        # Each message costs two blocks (section + divider). If they do not
        # all fit, spend one block on a notice and keep the newest messages.
        if len(renderable) > (_SLACK_MAX_BLOCKS - len(blocks)) // 2:
            shown = (_SLACK_MAX_BLOCKS - len(blocks) - 1) // 2
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": f"_Showing the most recent {shown} of {len(renderable)} messages_"
                }
            })
            renderable = renderable[-shown:]

        # Add messages
        for msg in renderable:
            content = msg["content"]

            # Determine sender info
            sender_data = msg.get("sender") or {}
            if _is_incoming(msg.get("message_type")):
                sender_name = sender_data.get("name") or "User"
                sender_emoji = "๐Ÿ‘ค"
            else:
                sender_name = sender_data.get("available_name") or sender_data.get("name") or "Bot"
                sender_emoji = "๐Ÿค–"

            # Format timestamp if available (rendered in the server's local time)
            time_display = ""
            timestamp = msg.get("created_at")
            if timestamp and isinstance(timestamp, (int, float)):
                try:
                    time_str = datetime.fromtimestamp(timestamp).strftime("%b %d, %H:%M")
                    time_display = f"`{time_str}`"
                except (OverflowError, OSError, ValueError):
                    # Out-of-range timestamps are shown without a time
                    time_display = ""

            text = f"{sender_emoji} *{sender_name}* {time_display}\n{content}"
            if len(text) > _SLACK_SECTION_TEXT_LIMIT:
                text = text[:_SLACK_SECTION_TEXT_LIMIT - 3] + "..."

            # Add message block
            blocks.append({
                "type": "section",
                "text": {
                    "type": "mrkdwn",
                    "text": text
                }
            })
            blocks.append({"type": "divider"})

        return {
            "response_type": "in_channel",
            "blocks": blocks
        }

    except Exception as e:
        print(f"Error formatting Slack message: {e}")
        print(traceback.format_exc())
        return {
            "response_type": "ephemeral",
            "text": f"โŒ Error formatting conversation: {str(e)}"
        }


@app.post(f"{BASE_PATH}/slack/command")
async def slack_command(
    token: str = Form(...),
    command: str = Form(...),
    text: str = Form(""),
    response_url: str = Form(...),
    user_id: str = Form(...)
):
    """Handle Slack slash command to fetch Chatwoot conversation.

    `text` defaults to an empty string so a command sent without an argument
    reaches the usage hint below instead of failing form validation.

    Returns:
        An immediate ephemeral acknowledgement; the transcript is posted to
        `response_url` by a background task.

    Raises:
        HTTPException: 401 when the verification token does not match.
    """
    # Verify the Slack token in constant time; an unset token rejects everyone
    expected_token = SLACK_VERIFICATION_TOKEN or ""
    if not expected_token or not hmac.compare_digest(
        token.encode("utf-8"), expected_token.encode("utf-8")
    ):
        raise HTTPException(status_code=401, detail="Invalid token")

    # Check if the user is authorized
    if user_id != ALLOWED_SLACK_USER_ID:
        return {
            "response_type": "ephemeral",
            "text": "โ›” This command is restricted to authorized users only."
        }

    # Extract conversation ID from command text (ASCII digits only)
    conversation_id = text.strip()
    if not (conversation_id.isascii() and conversation_id.isdigit()):
        return {
            "response_type": "ephemeral",
            "text": "โŒ Please provide a valid conversation ID. Usage: `/conversation `"
        }

    # Fetch conversation data in the background; keep a reference so the
    # task is not garbage-collected before it finishes.
    task = asyncio.create_task(_process_slack_command(conversation_id, response_url))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    # Send immediate response (Slack requires response within 3 seconds)
    return {
        "response_type": "ephemeral",
        "text": f"Fetching conversation {conversation_id}..."
    }


async def _process_slack_command(conversation_id: str, response_url: str):
    """Fetch and format a conversation, then post the result to `response_url`.

    Never raises: a fetch/format failure is reported to Slack as an ephemeral
    error, and a failure to reach Slack is only logged.
    """
    try:
        # Fetch conversation data
        conversation_data = await get_chatwoot_conversation(conversation_id)
        if not conversation_data:
            raise Exception("Failed to fetch conversation data")

        # Format the response
        reply = format_slack_message(conversation_data)
    except Exception as e:
        reply = {
            "response_type": "ephemeral",
            "text": f"โŒ Error fetching conversation: {str(e)}"
        }

    # Send the result (or the error) to Slack
    try:
        async with httpx.AsyncClient() as http:
            await http.post(
                response_url,
                json=reply,
                headers={"Content-Type": "application/json"}
            )
    except Exception as e:
        # Runs inside a background task, so nothing upstream can handle this
        print("โŒ Slack Send Error:", e)


async def send_chatwoot_message(conversation_id: str, content: str):
    """Send a message to a Chatwoot conversation.

    First issues a GET on the conversation to verify access, then POSTs the
    message.

    Returns:
        ``{"status": "success", "data": ...}`` on success, otherwise
        ``{"status": "error", "message": ...}``.
    """
    message_payload = {
        "content": content,
        "message_type": "outgoing",
        "private": False,
        "content_type": "text",
        "content_attributes": {}
    }

    try:
        url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}/messages"
        headers = {
            "Content-Type": "application/json",
            "api_access_token": CHATWOOT_API_KEY,
        }

        print(f"๐Ÿ“ค Sending to Chatwoot: {url}")
        print(f"๐Ÿ”‘ Using API key: {CHATWOOT_API_KEY[:5]}..." if CHATWOOT_API_KEY else "โŒ No API key provided!")
        print("๐Ÿ“ฆ Payload:", json.dumps(message_payload, indent=2))
        # Mask anything that looks like a credential before logging headers
        masked_headers = {
            k: '***' if 'token' in k.lower() or 'key' in k.lower() else v
            for k, v in headers.items()
        }
        print("๐Ÿ”‘ Headers:", json.dumps(masked_headers, indent=2))

        async with httpx.AsyncClient() as http:
            # First, try to get the conversation to verify access
            conv_url = f"{CHATWOOT_BASE_URL}/api/v1/accounts/{CHATWOOT_ACCOUNT_ID}/conversations/{conversation_id}"
            print(f"๐Ÿ” Checking conversation access: {conv_url}")

            # Test GET request to verify conversation access
            test_resp = await http.get(conv_url, headers=headers)
            print(f"๐Ÿ” Conversation check status: {test_resp.status_code}")
            print(f"๐Ÿ” Response: {test_resp.text[:500]}..." if test_resp.text else "No response body")

            if test_resp.status_code != 200:
                error_msg = f"Failed to access conversation: {test_resp.status_code} - {test_resp.text}"
                print(f"โŒ {error_msg}")
                return {"status": "error", "message": error_msg}

            # If we can access the conversation, try to send the message
            print("โœ… Conversation access verified. Sending message...")
            resp = await http.post(
                url,
                headers=headers,
                json=message_payload
            )

            print(f"๐Ÿ“ฌ Chatwoot Response Status: {resp.status_code}")
            print(f"๐Ÿ“ฌ Response Body: {resp.text}")

            if resp.status_code != 200:
                error_msg = f"Failed to send message: {resp.status_code} - {resp.text}"
                print(f"โŒ {error_msg}")
                return {"status": "error", "message": error_msg}

            return {"status": "success", "data": resp.json()}

    except httpx.HTTPStatusError as e:
        error_msg = f"HTTP error occurred: {str(e)}"
        print(f"โŒ {error_msg}")
        print(f"Response: {e.response.text if hasattr(e, 'response') else 'No response'}")
        return {"status": "error", "message": error_msg}
    except Exception as e:
        print("โŒ Chatwoot Send Error:", e)
        # Same shape as every other failure path instead of an implicit None
        return {"status": "error", "message": str(e)}