Spaces:
Sleeping
Sleeping
import os | |
import json | |
import openai | |
from fastapi import FastAPI, Request | |
app = FastAPI() | |
# Load knowledge base | |
with open("kb.json") as f: | |
kb = json.load(f) | |
system_prompt = "You are a helpful assistant. Only answer questions based on the following knowledge base:\n\n" | |
for question, answer in kb.items(): | |
system_prompt += f"Q: {question}\nA: {answer}\n\n" | |
system_prompt += ( | |
"If the question is not in the knowledge base, respond with: " | |
"'I'm not sure about that. Let me connect you with a human agent.'" | |
) | |
openai.api_key = os.getenv("OPENAI_API_KEY") | |
openai.api_base = os.getenv("OPENAI_API_BASE", "https://fast.typegpt.net/v1") | |
async def ask(request: Request): | |
data = await request.json() | |
user_question = data.get("content") or data.get("message") or "" | |
if not user_question: | |
return {"reply": "Please ask a question."} | |
messages = [ | |
{"role": "system", "content": system_prompt}, | |
{"role": "user", "content": user_question}, | |
] | |
try: | |
response = openai.ChatCompletion.create( | |
model="gpt-4o-mini", | |
messages=messages, | |
temperature=0.0, | |
max_tokens=200, | |
) | |
answer = response.choices[0].message.content.strip() | |
except Exception as e: | |
print(f"Error calling OpenAI: {e}") | |
answer = "Sorry, I'm having trouble answering right now." | |
return {"reply": answer} |