eve / app.py
Chandima Prabhath
Update FUNCTION_SCHEMA to include width and height parameters for generate_image; enhance user guidance in route_intent function.
869bca1
raw
history blame
19.9 kB
import os
import threading
import requests
import logging
import queue
import json
import random
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse
from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm
# --- Logging Setup ---------------------------------------------------------
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)
class ContextFilter(logging.Filter):
def filter(self, record):
record.message_id = getattr(record, "message_id", "-")
record.sender = getattr(record, "sender", "-")
return True
handler.addFilter(ContextFilter())
logger.handlers = [handler]
# Thread‐local to carry context through helpers
_thread_ctx = threading.local()
def set_thread_context(chat_id, sender, message_id):
_thread_ctx.chat_id = chat_id
_thread_ctx.sender = sender
_thread_ctx.message_id = message_id
def get_thread_context():
return (
getattr(_thread_ctx, "chat_id", None),
getattr(_thread_ctx, "sender", None),
getattr(_thread_ctx, "message_id", None),
)
# --- Conversation History -------------------------------------------------
# keep last 20 messages per (chat_id, sender)
history = defaultdict(lambda: deque(maxlen=20))
def record_user_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"User: {message}")
def record_bot_message(chat_id, sender, message):
history[(chat_id, sender)].append(f"Assistant: {message}")
def get_history_text(chat_id, sender):
return "\n".join(history[(chat_id, sender)])
# --- Bot Config & Client --------------------------------------------------
class BotConfig:
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
BOT_GROUP_CHAT = "[email protected]"
BOT_JID = os.getenv("BOT_JID")
IMAGE_DIR = "/tmp/images"
AUDIO_DIR = "/tmp/audio"
DEFAULT_IMAGE_COUNT = 4
@classmethod
def validate(cls):
missing = [n for n in (
"GREEN_API_URL","GREEN_API_TOKEN",
"GREEN_API_ID_INSTANCE","WEBHOOK_AUTH_TOKEN","BOT_JID"
) if not getattr(cls, n)]
if missing:
raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
def __init__(self, cfg: BotConfig):
self.cfg = cfg
self.session = requests.Session()
def send(self, endpoint, payload, files=None, retries=3):
url = (
f"{self.cfg.GREEN_API_URL}/waInstance"
f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
f"{self.cfg.GREEN_API_TOKEN}"
)
for i in range(1, retries+1):
try:
resp = self.session.post(
url,
json=payload if files is None else None,
data=None if files is None else payload,
files=files
)
resp.raise_for_status()
return resp.json()
except requests.RequestException as e:
logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}")
return {"error":"failed"}
def send_message(self, message_id, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text,
"quotedMessageId": message_id
})
def send_message_to(self, chat_id, text):
return self.send("sendMessage", {
"chatId": chat_id,
"message": text
})
def send_media(self, message_id, chat_id, file_path, caption, media_type):
endpoint = "sendFileByUpload"
payload = {
"chatId": chat_id,
"caption": caption,
"quotedMessageId": message_id
}
with open(file_path,"rb") as f:
mime = "image/jpeg" if media_type=="image" else "audio/mpeg"
files = [("file",(os.path.basename(file_path),f,mime))]
return self.send(endpoint, payload, files=files)
# Validate env & init client
BotConfig.validate()
client = BotClient(BotConfig)
# --- Threading & Queues ---------------------------------------------------
task_queue = queue.Queue()
polls = {}
executor = ThreadPoolExecutor(max_workers=4)
def worker():
while True:
task = task_queue.get()
try:
if task["type"] == "image":
_fn_generate_images(
task["message_id"],
task["chat_id"],
task["prompt"],
task.get("num_images", 1),
task.get("width"),
task.get("height")
)
elif task["type"] == "audio":
_fn_voice_reply(
task["message_id"],
task["chat_id"],
task["prompt"]
)
except Exception as e:
logger.error(f"Worker error {task}: {e}")
finally:
task_queue.task_done()
for _ in range(4):
threading.Thread(target=worker, daemon=True).start()
# --- Basic Tool Functions -------------------------------------------------
def _fn_send_text(mid, cid, message):
"""Send text + record + queue voice."""
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
task_queue.put({
"type": "audio",
"message_id": mid,
"chat_id": cid,
"prompt": message
})
def _fn_send_accept(mid, cid, message):
"""Send text + record, but no voice."""
client.send_message(mid, cid, message)
chat_id, sender, _ = get_thread_context()
if chat_id and sender:
record_bot_message(chat_id, sender, message)
def _fn_summarize(mid, cid, text):
summary = generate_llm(f"Summarize:\n\n{text}")
_fn_send_text(mid, cid, summary)
def _fn_translate(mid, cid, lang, text):
resp = generate_llm(f"Translate to {lang}:\n\n{text}")
_fn_send_text(mid, cid, resp)
def _fn_joke(mid, cid):
try:
j = requests.get(
"https://official-joke-api.appspot.com/random_joke",
timeout=5
).json()
joke = f"{j['setup']}\n\n{j['punchline']}"
except:
joke = generate_llm("Tell me a short joke.")
_fn_send_text(mid, cid, joke)
def _fn_weather(mid, cid, loc):
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
_fn_send_text(mid, cid, report)
def _fn_inspire(mid, cid):
quote = generate_llm("Give me a unique, random short inspirational quote.")
_fn_send_text(mid, cid, f"✨ {quote}")
def _fn_meme(mid, cid, txt):
_fn_send_accept(mid, cid, "🎨 Generating meme…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": cid,
"prompt": f"meme: {txt}"
})
def _fn_poll_create(mid, cid, question, options):
votes = {i+1:0 for i in range(len(options))}
polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}}
text = f"📊 *Poll:* {question}\n" + "\n".join(f"{i+1}. {o}" for i,o in enumerate(options))
_fn_send_text(mid, cid, text)
def _fn_poll_vote(mid, cid, voter, choice):
poll = polls.get(cid)
if not poll or choice<1 or choice>len(poll["options"]):
return
prev = poll["voters"].get(voter)
if prev:
poll["votes"][prev] -= 1
poll["votes"][choice] += 1
poll["voters"][voter] = choice
_fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}")
def _fn_poll_results(mid, cid):
poll = polls.get(cid)
if not poll:
_fn_send_text(mid, cid, "No active poll.")
return
txt = f"📊 *Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
_fn_send_text(mid, cid, txt)
def _fn_poll_end(mid, cid):
poll = polls.pop(cid, None)
if not poll:
_fn_send_text(mid, cid, "No active poll.")
return
txt = f"📊 *Final Results:* {poll['question']}\n" + "\n".join(
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1)
)
_fn_send_text(mid, cid, txt)
def _fn_generate_images(mid, cid, prompt, count=1, width=None, height=None):
for i in range(1, count+1):
try:
img, path, ret_p, url = generate_image(
prompt, mid, mid, BotConfig.IMAGE_DIR,
width=width, height=height
)
formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip())
cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
client.send_media(mid, cid, path, cap, media_type="image")
os.remove(path)
except Exception as e:
logger.warning(f"Img {i}/{count} failed: {e}")
_fn_send_text(mid, cid, f"😢 Failed to generate image {i}/{count}.")
def _fn_voice_reply(mid, cid, prompt):
proc = (
f"Just say this exactly as written in a flirty, friendly, playful, "
f"happy and helpful but a little bit clumsy-cute way: {prompt}"
)
res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR)
if res and res[0]:
path, _ = res
client.send_media(mid, cid, path, "", media_type="audio")
os.remove(path)
else:
_fn_send_text(mid, cid, prompt)
# --- Intent Dispatcher ----------------------------------------------------
FUNCTION_SCHEMA = {
"summarize": {"description":"Summarize text", "params":["text"]},
"translate": {"description":"Translate text", "params":["lang","text"]},
"joke": {"description":"Tell a joke", "params":[]},
"weather": {"description":"Weather report", "params":["location"]},
"inspire": {"description":"Inspirational quote","params":[]},
"meme": {"description":"Generate meme", "params":["text"]},
"poll_create": {"description":"Create poll", "params":["question","options"]},
"poll_vote": {"description":"Vote poll", "params":["choice"]},
"poll_results": {"description":"Show poll results", "params":[]},
"poll_end": {"description":"End poll", "params":[]},
"generate_image": {
"description":"Generate images",
"params":["prompt","count","width","height"]
},
"send_text": {"description":"Send plain text", "params":["message"]}}
class IntentDispatcher:
def __init__(self):
self.handlers = {}
def register(self, action):
def decorator(fn):
self.handlers[action] = fn
return fn
return decorator
def dispatch(self, action, mid, cid, intent):
fn = self.handlers.get(action)
if not fn:
return False
fn(mid, cid, intent)
return True
dispatcher = IntentDispatcher()
def validate_intent(action, intent):
schema = FUNCTION_SCHEMA.get(action)
if not schema:
return False
for p in schema["params"]:
if p not in intent:
logger.warning(f"Missing param '{p}' for action '{action}'")
return False
return True
@dispatcher.register("summarize")
def _h_summarize(mid, cid, intent):
_fn_summarize(mid, cid, intent["text"])
@dispatcher.register("translate")
def _h_translate(mid, cid, intent):
_fn_translate(mid, cid, intent["lang"], intent["text"])
@dispatcher.register("joke")
def _h_joke(mid, cid, intent):
_fn_joke(mid, cid)
@dispatcher.register("weather")
def _h_weather(mid, cid, intent):
_fn_weather(mid, cid, intent["location"])
@dispatcher.register("inspire")
def _h_inspire(mid, cid, intent):
_fn_inspire(mid, cid)
@dispatcher.register("meme")
def _h_meme(mid, cid, intent):
_fn_meme(mid, cid, intent["text"])
@dispatcher.register("poll_create")
def _h_poll_create(mid, cid, intent):
_fn_poll_create(mid, cid, intent["question"], intent["options"])
@dispatcher.register("poll_vote")
def _h_poll_vote(mid, cid, intent):
_fn_poll_vote(mid, cid, intent["voter"], intent["choice"])
@dispatcher.register("poll_results")
def _h_poll_results(mid, cid, intent):
_fn_poll_results(mid, cid)
@dispatcher.register("poll_end")
def _h_poll_end(mid, cid, intent):
_fn_poll_end(mid, cid)
@dispatcher.register("generate_image")
def _h_generate_image(mid, cid, intent):
prompt = intent["prompt"]
count = intent.get("count", 1)
width = intent.get("width")
height = intent.get("height")
_fn_send_accept(mid, cid, f"✨ Generating {count} image(s)…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": cid,
"prompt": prompt,
"num_images": count,
"width": width,
"height": height
})
@dispatcher.register("send_text")
def _h_send_text(mid, cid, intent):
_fn_send_text(mid, cid, intent["message"])
# --- Intent Routing --------------------------------------------------------
def route_intent(user_input, chat_id, sender):
history_text = get_history_text(chat_id, sender)
sys_prompt = (
"You are Eve. You can either chat or call one of these functions:\n"
+ "\n".join(f"- {n}: {f['description']}" for n,f in FUNCTION_SCHEMA.items())
+ "\n\nTo call a function, return JSON with \"action\":\"<name>\", plus its parameters.\n"
+ "Here’s an example for generating images:\n"
+ " {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
+ "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
"Return only raw JSON."
)
prompt = (
f"{sys_prompt}\n\n"
f"Conversation so far:\n{history_text}\n\n"
f"User: {user_input}"
)
raw = generate_llm(prompt)
try:
return json.loads(raw)
except:
return {"action":"send_text","message":raw}
# --- FastAPI & Webhook ----------------------------------------------------
app = FastAPI()
help_text = (
"🤖 *Eve* commands:\n"
"• /help\n"
"• /summarize <text>\n"
"• /translate <lang>|<text>\n"
"• /joke\n"
"• /weather <loc>\n"
"• /inspire\n"
"• /meme <text>\n"
"• /poll <Q>|… / /results / /endpoll\n"
"• /gen <prompt>|<count>|<width>|<height>\n"
"Otherwise chat or reply to my message to invoke tools."
)
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
data = await request.json()
if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}":
raise HTTPException(403, "Unauthorized")
chat_id = data["senderData"]["chatId"]
sender = data["senderData"]["sender"]
mid = data["idMessage"]
set_thread_context(chat_id, sender, mid)
logger.debug("Received webhook")
if chat_id != BotConfig.BOT_GROUP_CHAT or data["typeWebhook"] != "incomingMessageReceived":
return {"success": True}
md = data["messageData"]
tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
if not tmd:
return {"success": True}
body = (tmd.get("textMessage") or tmd.get("text","")).strip()
ctx = tmd.get("contextInfo", {})
# record user message
record_user_message(chat_id, sender, body)
# Slash commands
low = body.lower()
if low == "/help":
_fn_send_text(mid, chat_id, help_text)
return {"success": True}
if low.startswith("/summarize "):
_fn_summarize(mid, chat_id, body[11:].strip())
return {"success": True}
if low.startswith("/translate "):
lang, txt = body[11:].split("|", 1)
_fn_translate(mid, chat_id, lang.strip(), txt.strip())
return {"success": True}
if low == "/joke":
_fn_joke(mid, chat_id)
return {"success": True}
if low.startswith("/weather "):
_fn_weather(mid, chat_id, body[9:].strip().replace(" ","+"))
return {"success": True}
if low == "/inspire":
_fn_inspire(mid, chat_id)
return {"success": True}
if low.startswith("/meme "):
_fn_meme(mid, chat_id, body[6:].strip())
return {"success": True}
if low.startswith("/poll "):
parts = [p.strip() for p in body[6:].split("|")]
_fn_poll_create(mid, chat_id, parts[0], parts[1:])
return {"success": True}
if chat_id in polls and low.isdigit():
_fn_poll_vote(mid, chat_id, sender, int(low))
return {"success": True}
if low == "/results":
_fn_poll_results(mid, chat_id)
return {"success": True}
if low == "/endpoll":
_fn_poll_end(mid, chat_id)
return {"success": True}
if low.startswith("/gen"):
parts = body[4:].split("|")
pr = parts[0].strip()
ct = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT
width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None
height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None
_fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…")
task_queue.put({
"type": "image",
"message_id": mid,
"chat_id": chat_id,
"prompt": pr,
"num_images": ct,
"width": width,
"height": height
})
return {"success": True}
# Skip mentions
if ctx.get("mentionedJidList"):
return {"success": True}
# Build effective text (handle quoted replies to the bot)
if md.get("typeMessage") == "quotedMessage":
ext = md["extendedTextMessageData"]
quoted = md["quotedMessage"]
if ext.get("participant") == BotConfig.BOT_JID:
effective = (
f"Quoted: {quoted.get('textMessage','')}\n"
f"User: {ext.get('text','')}"
)
else:
effective = body
else:
effective = body
# Route intent & dispatch
intent = route_intent(effective, chat_id, sender)
action = intent.get("action")
if action in FUNCTION_SCHEMA:
if not validate_intent(action, intent):
_fn_send_text(mid, chat_id, f"❗ Missing parameter(s) for `{action}`.")
else:
dispatched = dispatcher.dispatch(action, mid, chat_id, intent)
if not dispatched:
_fn_send_text(mid, chat_id, intent.get("message","Sorry, I couldn't handle that."))
else:
# fallback chat
_fn_send_text(mid, chat_id, intent.get("message","Sorry, I didn't get that."))
return {"success": True}
@app.get("/", response_class=PlainTextResponse)
def index():
return "Server is running!"
if __name__ == "__main__":
client.send_message_to(
BotConfig.BOT_GROUP_CHAT,
"🌟 Eve is online! Type /help to see commands."
)
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)