"""Eve: a WhatsApp group bot driven by a Green API webhook (FastAPI app).

Incoming messages are either slash commands handled directly, or free text
that is routed through an LLM which picks one of the tool functions below.
Slow work (image generation, voice replies) is pushed onto ``task_queue``
and executed by background worker threads.
"""
import hmac
import json
import logging
import os
import queue
import threading
from collections import defaultdict, deque
from concurrent.futures import ThreadPoolExecutor
from typing import List, Literal, Optional, Union

import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse
from pydantic import BaseModel, Field, ValidationError

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from polLLM import generate_llm

# --- Thread context --------------------------------------------------------

# Thread-local to carry context through helpers (and into log records).
_thread_ctx = threading.local()


def set_thread_context(chat_id, sender, message_id):
    """Remember the chat, sender and message being handled on this thread."""
    _thread_ctx.chat_id = chat_id
    _thread_ctx.sender = sender
    _thread_ctx.message_id = message_id


def get_thread_context():
    """Return ``(chat_id, sender, message_id)`` for this thread.

    Each element is ``None`` when no context has been set on the thread.
    """
    return (
        getattr(_thread_ctx, "chat_id", None),
        getattr(_thread_ctx, "sender", None),
        getattr(_thread_ctx, "message_id", None),
    )


# --- Logging Setup ---------------------------------------------------------

LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logger = logging.getLogger("eve_bot")
logger.setLevel(LOG_LEVEL)

handler = logging.StreamHandler()
formatter = logging.Formatter(
    "%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s"
)
handler.setFormatter(formatter)


class ContextFilter(logging.Filter):
    """Guarantee that every record has ``message_id`` and ``sender`` fields.

    Values passed explicitly (``extra=...``) win; otherwise the current
    thread context is used, and ``"-"`` is the final fallback. Previously the
    thread context was never consulted, so the fields were always ``"-"``.
    """

    def filter(self, record):
        _, ctx_sender, ctx_message_id = get_thread_context()
        if not hasattr(record, "message_id"):
            record.message_id = ctx_message_id or "-"
        if not hasattr(record, "sender"):
            record.sender = ctx_sender or "-"
        return True


handler.addFilter(ContextFilter())
logger.handlers = [handler]

# --- Conversation History -------------------------------------------------

# Per (chat_id, sender) rolling window of the last 20 conversation lines.
history = defaultdict(lambda: deque(maxlen=20))


def record_user_message(chat_id, sender, message):
    """Append a user line to the (chat_id, sender) history."""
    history[(chat_id, sender)].append(f"User: {message}")


def record_bot_message(chat_id, sender, message):
    """Append an assistant line to the (chat_id, sender) history."""
    history[(chat_id, sender)].append(f"Assistant: {message}")


def get_history_text(chat_id, sender):
    """Return the (chat_id, sender) history as newline-joined text."""
    return "\n".join(history[(chat_id, sender)])


# --- Bot Config & Client --------------------------------------------------


class BotConfig:
    """Static configuration, mostly read from environment variables."""

    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    BOT_GROUP_CHAT = "120363312903494448@g.us"
    BOT_JID = os.getenv("BOT_JID")
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        """Raise ``ValueError`` naming every required setting that is unset."""
        missing = [n for n in (
            "GREEN_API_URL", "GREEN_API_TOKEN",
            "GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN", "BOT_JID"
        ) if not getattr(cls, n)]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")


class BotClient:
    """Thin HTTP client for the Green API instance described by ``cfg``."""

    def __init__(self, cfg: BotConfig):
        self.cfg = cfg
        self.session = requests.Session()

    @staticmethod
    def _rewind(files):
        """Seek every file object in a requests ``files`` spec back to 0.

        A failed attempt may already have consumed the stream; without the
        rewind a retry would upload an empty body.
        """
        specs = files.values() if isinstance(files, dict) else (spec for _, spec in files)
        for spec in specs:
            fobj = spec[1] if isinstance(spec, (tuple, list)) else spec
            if hasattr(fobj, "seek"):
                fobj.seek(0)

    def send(self, endpoint, payload, files=None, retries=3, timeout=60):
        """POST ``payload`` to ``endpoint`` and return the decoded JSON reply.

        The payload is sent as JSON, or as form data when ``files`` is given.
        Up to ``retries`` attempts are made; ``timeout`` (seconds) bounds each
        attempt so a stalled connection cannot hang the caller forever.
        Returns ``{"error": "failed"}`` when every attempt fails.
        """
        url = (
            f"{self.cfg.GREEN_API_URL}/waInstance"
            f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
            f"{self.cfg.GREEN_API_TOKEN}"
        )
        for attempt in range(1, retries + 1):
            if files is not None:
                self._rewind(files)
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp.json()
            except (requests.RequestException, ValueError) as e:
                # ValueError: body was not JSON (older requests versions).
                logger.warning(
                    "%s attempt %d/%d failed: %s", endpoint, attempt, retries, e
                )
        return {"error": "failed"}

    def send_message(self, message_id, chat_id, text):
        """Send ``text`` to ``chat_id`` as a reply quoting ``message_id``."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id
        })

    def send_message_to(self, chat_id, text):
        """Send ``text`` to ``chat_id`` without quoting any message."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text
        })

    def send_media(self, message_id, chat_id, file_path, caption, media_type):
        """Upload ``file_path`` to ``chat_id`` as a reply to ``message_id``.

        ``media_type`` of ``"image"`` is sent as JPEG; anything else as MPEG
        audio. Raises ``OSError`` if the file cannot be opened.
        """
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id
        }
        with open(file_path, "rb") as f:
            mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)


BotConfig.validate()
client = BotClient(BotConfig)

# --- Threading & Queues ---------------------------------------------------

task_queue = queue.Queue()
polls = {}
executor = ThreadPoolExecutor(max_workers=4)


def worker():
    """Run queued ``image`` / ``audio`` tasks forever; never let one kill the thread."""
    while True:
        task = task_queue.get()
        try:
            if task["type"] == "image":
                _fn_generate_images(**task)
            elif task["type"] == "audio":
                _fn_voice_reply(**task)
        except Exception:
            # Top-level boundary of the worker thread: log with traceback.
            logger.exception("Worker error %s", task)
        finally:
            task_queue.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()

# --- Basic Tool Functions -------------------------------------------------


def _remove_quietly(path):
    """Delete a temporary file, ignoring the case where it cannot be removed."""
    try:
        os.remove(path)
    except OSError as e:
        logger.debug("Could not remove %s: %s", path, e)


def _fn_send_text(mid, cid, message):
    """Reply with ``message``, record it in history, and queue a voice version."""
    client.send_message(mid, cid, message)
    chat_id, sender, _ = get_thread_context()
    if chat_id and sender:
        record_bot_message(chat_id, sender, message)
    task_queue.put({
        "type": "audio",
        "message_id": mid,
        "chat_id": cid,
        "prompt": message
    })


def _fn_send_accept(mid, cid, message):
    """Reply with ``message`` and record it in history (no voice follow-up)."""
    client.send_message(mid, cid, message)
    chat_id, sender, _ = get_thread_context()
    if chat_id and sender:
        record_bot_message(chat_id, sender, message)


def _fn_summarize(mid, cid, text):
    """Reply with an LLM summary of ``text``."""
    summary = generate_llm(f"Summarize:\n\n{text}")
    _fn_send_text(mid, cid, summary)


def _fn_translate(mid, cid, lang, text):
    """Reply with an LLM translation of ``text`` into ``lang``."""
    resp = generate_llm(f"Translate to {lang}:\n\n{text}")
    _fn_send_text(mid, cid, resp)


def _fn_joke(mid, cid):
    """Reply with a joke from the public joke API, or from the LLM on failure."""
    try:
        j = requests.get(
            "https://official-joke-api.appspot.com/random_joke",
            timeout=5
        ).json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Network error, non-JSON body, or unexpected JSON shape.
        logger.debug("Joke API unavailable, using LLM: %s", e)
        joke = generate_llm("Tell me a short joke.")
    _fn_send_text(mid, cid, joke)


def _fn_weather(mid, cid, loc):
    """Reply with an LLM-phrased weather report for ``loc`` based on wttr.in."""
    try:
        raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text
    except requests.RequestException as e:
        # Previously this propagated and turned the webhook into a 500.
        logger.warning("Weather lookup for %r failed: %s", loc, e)
        _fn_send_text(mid, cid, "😢 Couldn't fetch the weather right now.")
        return
    report = generate_llm(f"Give a weather report in °C:\n\n{raw}")
    _fn_send_text(mid, cid, report)


def _fn_inspire(mid, cid):
    """Reply with an LLM-generated inspirational quote."""
    quote = generate_llm("Give me a unique, random short inspirational quote.")
    _fn_send_text(mid, cid, f"✨ {quote}")


def _fn_meme(mid, cid, txt):
    """Acknowledge the request and queue generation of a meme image for ``txt``."""
    _fn_send_accept(mid, cid, "🎨 Generating meme…")
    task_queue.put({
        "type": "image",
        "message_id": mid,
        "chat_id": cid,
        "prompt": f"meme: {txt}"
    })


def _format_poll_tally(title, poll):
    """Render ``title``, the poll question and one ``n. option: votes`` line each."""
    return f"📊 *{title}:* {poll['question']}\n" + "\n".join(
        f"{i}. {o}: {poll['votes'][i]}"
        for i, o in enumerate(poll["options"], 1)
    )


def _fn_poll_create(mid, cid, question, options):
    """Start a poll in chat ``cid`` (replacing any existing one) and announce it."""
    votes = {i: 0 for i in range(1, len(options) + 1)}
    polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}}
    text = f"📊 *Poll:* {question}\n" + "\n".join(
        f"{i}. {o}" for i, o in enumerate(options, 1)
    )
    _fn_send_text(mid, cid, text)


def _fn_poll_vote(mid, cid, voter, choice):
    """Record ``voter``'s 1-based ``choice``, moving any earlier vote of theirs.

    Silently does nothing when there is no poll or the choice is out of range.
    """
    poll = polls.get(cid)
    if not poll or choice < 1 or choice > len(poll["options"]):
        return
    prev = poll["voters"].get(voter)
    if prev:
        poll["votes"][prev] -= 1
    poll["votes"][choice] += 1
    poll["voters"][voter] = choice
    _fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}")


def _fn_poll_results(mid, cid):
    """Reply with the current tally of the chat's poll, if any."""
    poll = polls.get(cid)
    if not poll:
        _fn_send_text(mid, cid, "No active poll.")
        return
    _fn_send_text(mid, cid, _format_poll_tally("Results", poll))


def _fn_poll_end(mid, cid):
    """Close the chat's poll, if any, and reply with the final tally."""
    poll = polls.pop(cid, None)
    if not poll:
        _fn_send_text(mid, cid, "No active poll.")
        return
    _fn_send_text(mid, cid, _format_poll_tally("Final Results", poll))


def _fn_generate_images(message_id: str, chat_id: str, prompt: str,
                        count: int = 1, width: Optional[int] = None,
                        height: Optional[int] = None, **_):
    """Generate ``count`` images for ``prompt`` and send each as a reply.

    A failure on one image is reported to the chat and does not stop the
    remaining ones. Extra keyword arguments (e.g. the queue task's ``type``)
    are ignored.
    """
    for i in range(1, count + 1):
        path = None
        try:
            _img, path, ret_p, url = generate_image(
                prompt, message_id, message_id, BotConfig.IMAGE_DIR,
                width=width, height=height
            )
            formatted = "\n\n".join(
                f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip()
            )
            cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(message_id, chat_id, path, cap, media_type="image")
        except Exception as e:
            logger.warning("Img %d/%d failed: %s", i, count, e)
            _fn_send_text(message_id, chat_id, f"😢 Failed to generate image {i}/{count}.")
        finally:
            # Clean up even when sending raised, so temp files do not pile up.
            if path:
                _remove_quietly(path)


def _fn_voice_reply(message_id: str, chat_id: str, prompt: str, **_):
    """Synthesize ``prompt`` as speech and send it as an audio reply.

    Audio tasks are only ever queued by ``_fn_send_text``, which has already
    delivered the text. On synthesis failure we therefore just log: the old
    fallback called ``_fn_send_text`` again, which duplicated the message and
    re-queued the audio task, looping forever while synthesis kept failing.
    """
    proc = (
        f"Just say this exactly as written in a flirty, friendly, playful, "
        f"happy and helpful but a little bit clumsy-cute way: {prompt}"
    )
    res = generate_voice_reply(proc, model="openai-audio", voice="coral",
                               audio_dir=BotConfig.AUDIO_DIR)
    if not (res and res[0]):
        logger.warning("Voice reply generation failed; text was already sent")
        return
    path, _ = res
    try:
        client.send_media(message_id, chat_id, path, "", media_type="audio")
    finally:
        _remove_quietly(path)


# --- Pydantic Models for Function Calling --------------------------------


class BaseIntent(BaseModel):
    """Common base: every intent is tagged by its ``action`` name."""
    action: str


class SummarizeIntent(BaseIntent):
    action: Literal["summarize"]
    text: str


class TranslateIntent(BaseIntent):
    action: Literal["translate"]
    lang: str
    text: str


class JokeIntent(BaseIntent):
    action: Literal["joke"]


class WeatherIntent(BaseIntent):
    action: Literal["weather"]
    location: str


class InspireIntent(BaseIntent):
    action: Literal["inspire"]


class MemeIntent(BaseIntent):
    action: Literal["meme"]
    text: str


class PollCreateIntent(BaseIntent):
    action: Literal["poll_create"]
    question: str
    options: List[str]


class PollVoteIntent(BaseIntent):
    action: Literal["poll_vote"]
    voter: str
    choice: int


class PollResultsIntent(BaseIntent):
    action: Literal["poll_results"]


class PollEndIntent(BaseIntent):
    action: Literal["poll_end"]


class GenerateImageIntent(BaseIntent):
    action: Literal["generate_image"]
    prompt: str
    count: int = Field(default=1, ge=1)
    # Explicit defaults: a bare Optional[int] is a *required* field on
    # pydantic v2, which would reject requests that omit the size.
    width: Optional[int] = None
    height: Optional[int] = None


class SendTextIntent(BaseIntent):
    action: Literal["send_text"]
    message: str


# Action name -> intent model; the single source for INTENT_MODELS below.
INTENT_BY_ACTION = {
    "summarize": SummarizeIntent,
    "translate": TranslateIntent,
    "joke": JokeIntent,
    "weather": WeatherIntent,
    "inspire": InspireIntent,
    "meme": MemeIntent,
    "poll_create": PollCreateIntent,
    "poll_vote": PollVoteIntent,
    "poll_results": PollResultsIntent,
    "poll_end": PollEndIntent,
    "generate_image": GenerateImageIntent,
    "send_text": SendTextIntent,
}

# list of all intent models
INTENT_MODELS = list(INTENT_BY_ACTION.values())

ACTION_HANDLERS = {
    "summarize": lambda mid, cid, **i: _fn_summarize(mid, cid, i["text"]),
    "translate": lambda mid, cid, **i: _fn_translate(mid, cid, i["lang"], i["text"]),
    "joke": lambda mid, cid, **i: _fn_joke(mid, cid),
    "weather": lambda mid, cid, **i: _fn_weather(mid, cid, i["location"]),
    "inspire": lambda mid, cid, **i: _fn_inspire(mid, cid),
    "meme": lambda mid, cid, **i: _fn_meme(mid, cid, i["text"]),
    "poll_create": lambda mid, cid, **i: _fn_poll_create(mid, cid, i["question"], i["options"]),
    "poll_vote": lambda mid, cid, **i: _fn_poll_vote(mid, cid, i["voter"], i["choice"]),
    "poll_results": lambda mid, cid, **i: _fn_poll_results(mid, cid),
    "poll_end": lambda mid, cid, **i: _fn_poll_end(mid, cid),
    "generate_image": _fn_generate_images,
    "send_text": lambda mid, cid, **i: _fn_send_text(mid, cid, i["message"]),
}

# --- Intent Routing with Fallback ------------------------------------------


def _lenient_intent_kwargs(action, data, sender):
    """Build intent fields from loosely-shaped LLM JSON, filling in defaults.

    Raises ``ValueError`` / ``TypeError`` when a numeric field cannot be
    converted; the caller treats that as "not a usable tool call".
    """
    if action == "generate_image":
        return {
            "prompt": data.get("prompt", ""),
            "count": int(data.get("count", BotConfig.DEFAULT_IMAGE_COUNT)),
            "width": data.get("width"),
            "height": data.get("height"),
        }
    if action == "send_text":
        return {"message": data.get("message", "")}
    if action == "translate":
        return {"lang": data.get("lang", ""), "text": data.get("text", "")}
    if action in ("summarize", "meme"):
        return {"text": data.get("text", "")}
    if action == "weather":
        return {"location": data.get("location", "")}
    if action == "poll_create":
        return {
            "question": data.get("question", ""),
            "options": data.get("options", []),
        }
    if action == "poll_vote":
        # The voter is always the real sender, never what the LLM claims.
        return {"voter": sender, "choice": int(data.get("choice", 0))}
    return {}


def route_intent(user_input: str, chat_id: str, sender: str):
    """Ask the LLM what to do with ``user_input`` and return a parsed intent.

    The LLM reply is parsed strictly against every intent model first, then
    leniently (missing fields defaulted). Anything that is not a usable tool
    call is returned as a ``SendTextIntent`` carrying the raw LLM reply.
    """
    history_text = get_history_text(chat_id, sender)
    sys_prompt = (
        "You are Eve. You can either chat or call one of these functions:\n"
        "- summarize(text)\n"
        "- translate(lang, text)\n"
        "- joke()\n"
        "- weather(location)\n"
        "- inspire()\n"
        "- meme(text)\n"
        "- poll_create(question, options)\n"
        "- poll_vote(voter, choice)\n"
        "- poll_results()\n"
        "- poll_end()\n"
        "- generate_image(prompt, count, width, height)\n"
        "- send_text(message)\n\n"
        "Return only raw JSON matching one of these shapes. For example:\n"
        "  {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n"
        "Otherwise, use send_text to reply with plain chat.\n"
    )
    prompt = f"{sys_prompt}\nConversation so far:\n{history_text}\n\nUser: {user_input}"
    raw = generate_llm(prompt)

    def as_plain_chat():
        return SendTextIntent(action="send_text", message=raw)

    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError:
        return as_plain_chat()
    # Valid JSON that is not an object (a bare string, number, list...) is
    # just chat; calling .get() on it used to raise AttributeError.
    if not isinstance(parsed, dict):
        return as_plain_chat()

    # 1) Strict: try each Pydantic model
    for model in INTENT_MODELS:
        try:
            return model.parse_obj(parsed)
        except ValidationError:
            continue

    logger.warning("Strict parse failed for all models, falling back to lenient")

    # 2) Lenient: look the model up by action name and default missing fields.
    # (The old lookup compared a Literal field's ``default`` to the action,
    # which never matched, so this path always fell through to plain chat.)
    action = parsed.get("action")
    model = INTENT_BY_ACTION.get(action) if isinstance(action, str) else None
    if model is None:
        return as_plain_chat()
    try:
        kwargs = _lenient_intent_kwargs(action, parsed, sender)
        return model.parse_obj({"action": action, **kwargs})
    except (ValidationError, ValueError, TypeError):
        return as_plain_chat()


# --- FastAPI & Webhook ----------------------------------------------------

app = FastAPI()

# Argument placeholders restored from the command parsers below; the previous
# text had lost them (e.g. "/translate |", "/gen |||").
help_text = (
    "🤖 *Eve* commands:\n"
    "• /help\n"
    "• /summarize <text>\n"
    "• /translate <lang>|<text>\n"
    "• /joke\n"
    "• /weather <location>\n"
    "• /inspire\n"
    "• /meme <text>\n"
    "• /poll <question>|<option>|… / /results / /endpoll\n"
    "• /gen <prompt>|<count>|<width>|<height>\n"
    "Otherwise chat or reply to my message to invoke tools."
)


def _is_authorized(header_value):
    """Check the ``Authorization`` header against the configured bearer token.

    Compared in constant time on bytes, so neither token length/content
    timing nor a non-ASCII header can be used against the endpoint.
    """
    expected = f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}"
    return hmac.compare_digest(
        (header_value or "").encode("utf-8"), expected.encode("utf-8")
    )


def _int_field(parts, index, default):
    """Return ``parts[index]`` as a positive int, or ``default`` if absent/invalid."""
    if index < len(parts) and parts[index].isdecimal() and int(parts[index]) > 0:
        return int(parts[index])
    return default


def _handle_command(low, body, mid, chat_id, sender):
    """Execute ``body`` if it is a slash command or a poll vote.

    ``low`` is ``body`` lower-cased. Returns ``True`` when the message was
    consumed, ``False`` when it should go on to LLM intent routing.
    """
    if low == "/help":
        _fn_send_text(mid, chat_id, help_text)
        return True
    if low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
        return True
    if low.startswith("/translate "):
        # partition() instead of split-and-unpack: a missing "|" used to
        # raise ValueError and fail the whole webhook request.
        lang, sep, txt = body[11:].partition("|")
        if not sep or not lang.strip() or not txt.strip():
            _fn_send_accept(mid, chat_id, "Usage: /translate <lang>|<text>")
        else:
            _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        return True
    if low == "/joke":
        _fn_joke(mid, chat_id)
        return True
    if low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
        return True
    if low == "/inspire":
        _fn_inspire(mid, chat_id)
        return True
    if low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
        return True
    if low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        _fn_poll_create(mid, chat_id, parts[0], parts[1:])
        return True
    # isdecimal (not isdigit): "²" is a digit but int("²") raises.
    if chat_id in polls and low.isdecimal():
        _fn_poll_vote(mid, chat_id, sender, int(low))
        return True
    if low == "/results":
        _fn_poll_results(mid, chat_id)
        return True
    if low == "/endpoll":
        _fn_poll_end(mid, chat_id)
        return True
    if low.startswith("/gen"):
        # Strip each field so "cat | 2 | 512" parses; unstripped fields used
        # to fail the digit check and silently fall back to the defaults.
        parts = [p.strip() for p in body[4:].split("|")]
        pr = parts[0]
        ct = _int_field(parts, 1, BotConfig.DEFAULT_IMAGE_COUNT)
        width = _int_field(parts, 2, None)
        height = _int_field(parts, 3, None)
        _fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…")
        task_queue.put({
            "type": "image",
            "message_id": mid,
            "chat_id": chat_id,
            "prompt": pr,
            "count": ct,
            "width": width,
            "height": height
        })
        return True
    return False


@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle one Green API webhook call.

    Only ``incomingMessageReceived`` text messages from the configured group
    are acted on; everything else is acknowledged with ``{"success": True}``.
    Raises HTTP 403 on a bad bearer token and HTTP 400 on a non-JSON body.

    NOTE(review): the handlers called from here do blocking HTTP/LLM work
    inside an ``async def``, which stalls the event loop while they run.
    """
    # Authenticate before touching the body.
    if not _is_authorized(request.headers.get("Authorization")):
        raise HTTPException(403, "Unauthorized")
    try:
        data = await request.json()
    except ValueError:
        raise HTTPException(400, "Invalid JSON") from None
    if not isinstance(data, dict):
        return {"success": True}

    # Not every webhook type carries senderData / idMessage, so read them
    # defensively instead of indexing (which used to raise KeyError -> 500).
    sender_data = data.get("senderData") or {}
    chat_id = sender_data.get("chatId")
    sender = sender_data.get("sender")
    mid = data.get("idMessage")
    set_thread_context(chat_id, sender, mid)
    logger.debug("Received webhook")

    if (chat_id != BotConfig.BOT_GROUP_CHAT
            or data.get("typeWebhook") != "incomingMessageReceived"):
        return {"success": True}

    md = data.get("messageData") or {}
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return {"success": True}

    body = (tmd.get("textMessage") or tmd.get("text") or "").strip()
    record_user_message(chat_id, sender, body)

    if _handle_command(body.lower(), body, mid, chat_id, sender):
        return {"success": True}

    # Messages that mention someone are not routed to the LLM.
    if (tmd.get("contextInfo") or {}).get("mentionedJidList"):
        return {"success": True}

    intent = route_intent(body, chat_id, sender)
    action_handler = ACTION_HANDLERS.get(intent.action)
    if action_handler:
        action_handler(mid, chat_id, **intent.dict(exclude={"action"}))
    else:
        _fn_send_text(mid, chat_id, "Sorry, I didn't understand that.")
    return {"success": True}


@app.get("/", response_class=PlainTextResponse)
def index():
    """Liveness probe."""
    return "Server is running!"


if __name__ == "__main__":
    client.send_message_to(
        BotConfig.BOT_GROUP_CHAT,
        "🌟 Eve is online! Type /help to see commands."
    )
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=7860)