Spaces:
Running
Running
Chandima Prabhath
Enhance configuration to support function calling for image generation and text replies; improve help text for user commands.
3ad83d3
| import os | |
| import threading | |
| import requests | |
| import logging | |
| import queue | |
| import json | |
| import time | |
| import random | |
| from concurrent.futures import ThreadPoolExecutor | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import PlainTextResponse | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from polLLM import generate_llm | |
| # --- Configuration and Client Classes --- | |
class BotConfig:
    """Static bot configuration, read from environment variables at import time.

    All settings are class attributes; the class is used directly and never
    instantiated by the code in this file.
    """

    GREEN_API_URL = os.getenv("GREEN_API_URL")
    GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
    GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
    GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
    WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
    # NOTE(review): this literal looks like an e-mail-obfuscation placeholder
    # rather than a real chat id -- confirm the intended group JID.
    BOT_GROUP_CHAT = "[email protected]"
    BOT_JID = os.getenv("BOT_JID")  # your bot's own WhatsApp JID
    IMAGE_DIR = "/tmp/images"
    AUDIO_DIR = "/tmp/audio"
    DEFAULT_IMAGE_COUNT = 4

    @classmethod
    def validate(cls):
        """Raise ValueError naming every required setting that is unset or empty.

        Must be a classmethod: it is invoked as ``BotConfig.validate()`` on the
        class itself, which would fail with a TypeError for a plain function.
        """
        required = (
            "GREEN_API_URL",
            "GREEN_API_TOKEN",
            "GREEN_API_ID_INSTANCE",
            "WEBHOOK_AUTH_TOKEN",
            "BOT_JID",
        )
        missing = [name for name in required if not getattr(cls, name)]
        if missing:
            raise ValueError(f"Missing env vars: {', '.join(missing)}")
class BotClient:
    """Small HTTP client for the Green-API endpoints this bot uses."""

    def __init__(self, cfg: BotConfig):
        """Keep *cfg*, open a reusable HTTP session and configure root logging."""
        self.cfg = cfg
        self.session = requests.Session()
        logging.basicConfig(level=logging.DEBUG,
                            format="%(asctime)s [%(levelname)s] %(message)s")

    @staticmethod
    def _stream_offsets(files):
        """Return ``(stream, start_offset)`` pairs for each seekable stream in *files*."""
        if not files:
            return []
        entries = files.items() if isinstance(files, dict) else files
        offsets = []
        for _, spec in entries:
            # A spec is either the stream itself or a (name, stream, ...) tuple.
            stream = spec[1] if isinstance(spec, (tuple, list)) else spec
            if hasattr(stream, "seek") and hasattr(stream, "tell"):
                offsets.append((stream, stream.tell()))
        return offsets

    def send(self, endpoint: str, payload: dict, files=None, retries=3, timeout=60):
        """POST *payload* to a Green-API *endpoint*, retrying on request errors.

        Args:
            endpoint: API method name placed in the URL path (e.g. "sendMessage").
            payload: Sent as JSON, or as form fields when *files* is given.
            files: Optional multipart file spec forwarded to ``requests``.
            retries: Maximum number of attempts.
            timeout: Seconds to wait per attempt; without it a stalled
                connection would block the calling thread indefinitely.

        Returns:
            The decoded JSON response, or ``{"error": "<message>"}`` once the
            final attempt has failed.
        """
        url = (f"{self.cfg.GREEN_API_URL}/waInstance"
               f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/"
               f"{self.cfg.GREEN_API_TOKEN}")
        streams = self._stream_offsets(files)
        for attempt in range(1, retries + 1):
            if attempt > 1:
                # A failed attempt may have consumed the upload streams;
                # rewind them so the retry does not send an empty file.
                for stream, offset in streams:
                    stream.seek(offset)
            try:
                resp = self.session.post(
                    url,
                    json=payload if files is None else None,
                    data=None if files is None else payload,
                    files=files,
                    timeout=timeout,
                )
                resp.raise_for_status()
                return resp.json()
            except requests.RequestException as e:
                logging.warning(f"Attempt {attempt}/{retries} failed for {endpoint}: {e}")
                if attempt == retries:
                    logging.error(f"{endpoint} ultimately failed: {e}")
                    return {"error": str(e)}

    def send_message(self, message_id: str, chat_id: str, text: str):
        """Send *text* to *chat_id* as a reply quoting *message_id*."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
            "quotedMessageId": message_id,
        })

    def send_message_to(self, chat_id: str, text: str):
        """Send *text* to *chat_id* without quoting any message."""
        return self.send("sendMessage", {
            "chatId": chat_id,
            "message": text,
        })

    def send_media(self, message_id: str, chat_id: str, file_path: str,
                   caption: str, media_type: str):
        """Upload the file at *file_path* to *chat_id*, quoting *message_id*.

        *media_type* "image" is sent as ``image/jpeg``; any other value is
        sent as ``audio/mpeg``.
        """
        endpoint = "sendFileByUpload"
        payload = {
            "chatId": chat_id,
            "caption": caption,
            "quotedMessageId": message_id,
        }
        mime = "image/jpeg" if media_type == "image" else "audio/mpeg"
        with open(file_path, "rb") as f:
            files = [("file", (os.path.basename(file_path), f, mime))]
            return self.send(endpoint, payload, files=files)
# Validate env -- abort at import time when a required variable is missing.
BotConfig.validate()
client = BotClient(BotConfig)

# --- Threading, Queues, Stores ---
task_queue = queue.Queue()        # jobs consumed by the background workers
trivia_store = dict()             # chat_id -> pending trivia question/answer
polls = dict()                    # chat_id -> active poll state
last_message_time = time.time()   # updated by the webhook, read by the monitor
def inactivity_monitor():
    """Once a minute, nudge the group chat if it has been idle for 5+ minutes.

    Runs forever; after sending a reminder the idle clock is restarted so the
    reminder is not repeated every minute.
    """
    global last_message_time
    while True:
        time.sleep(60)
        idle_seconds = time.time() - last_message_time
        if idle_seconds < 300:
            continue
        client.send_message_to(
            BotConfig.BOT_GROUP_CHAT,
            "⏰ I haven't heard from you in a while! I'm still here if you need anything.",
        )
        last_message_time = time.time()


# Daemon thread: it must not keep the process alive on shutdown.
threading.Thread(daemon=True, target=inactivity_monitor).start()
executor = ThreadPoolExecutor(max_workers=4)


def worker():
    """Consume jobs from ``task_queue`` forever and run the matching handler.

    A job is a dict whose "type" is "image" or "audio". Handler errors are
    logged with their traceback and never kill the worker thread; jobs with an
    unrecognised type are logged instead of being dropped silently.
    """
    while True:
        task = task_queue.get()
        try:
            kind = task["type"]
            if kind == "image":
                _fn_generate_images(task["message_id"],
                                    task["chat_id"],
                                    task["prompt"],
                                    task.get("num_images", 1))
            elif kind == "audio":
                _fn_voice_reply(task["message_id"],
                                task["chat_id"],
                                task["prompt"])
            else:
                logging.warning(f"Ignoring task with unknown type {kind!r}: {task}")
        except Exception as e:
            # logging.exception keeps the traceback, which the plain error
            # message alone did not.
            logging.exception(f"Worker error {task}: {e}")
        finally:
            task_queue.task_done()


for _ in range(4):
    threading.Thread(target=worker, daemon=True).start()
| # --- Primitive “tool” functions --- | |
def _fn_summarize(message_id, chat_id, text):
    """Ask the LLM for a short one-paragraph summary of *text* and reply with it."""
    llm_prompt = f"Summarize this text in one short paragraph:\n\n{text}"
    client.send_message(message_id, chat_id, generate_llm(llm_prompt))
def _fn_translate(message_id, chat_id, lang, text):
    """Ask the LLM to translate *text* into *lang* and reply with the result."""
    llm_prompt = f"Translate the following into {lang}:\n\n{text}"
    translated = generate_llm(llm_prompt)
    client.send_message(message_id, chat_id, translated)
def _fn_joke(message_id, chat_id):
    """Reply with a joke from the public joke API, falling back to the LLM.

    The fallback is used when the request fails, returns an HTTP error, or
    yields a payload without the expected "setup"/"punchline" fields.
    """
    try:
        resp = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5)
        resp.raise_for_status()
        j = resp.json()
        joke = f"{j['setup']}\n\n{j['punchline']}"
    except (requests.RequestException, ValueError, KeyError, TypeError) as e:
        # Named exceptions only: a bare ``except`` would also swallow
        # KeyboardInterrupt and SystemExit.
        logging.warning(f"Joke API unavailable, using LLM fallback: {e}")
        joke = generate_llm("Tell me a short, funny joke.")
    client.send_message(message_id, chat_id, joke)
def _fetch_weather_line(loc):
    """Return wttr.in's one-line report for *loc*, or None if it cannot be fetched."""
    from urllib.parse import quote

    # *loc* comes from chat text: escape it so characters such as "/", "?" or
    # "#" cannot alter the request URL. "+" stays literal because callers use
    # it in place of spaces.
    url = f"http://sl.wttr.in/{quote(loc, safe='+,')}?format=4"
    try:
        resp = requests.get(url, timeout=5)
        resp.raise_for_status()
    except requests.RequestException as e:
        logging.warning(f"Weather lookup failed for {loc!r}: {e}")
        return None
    return resp.text


def _fn_weather(message_id, chat_id, loc):
    """Reply with an LLM-written weather report for *loc*, then queue a voice version.

    Sends an apology instead when the weather service cannot be reached.
    """
    raw = _fetch_weather_line(loc)
    if raw is None:
        client.send_message(message_id, chat_id,
                            "😢 Sorry, I couldn't fetch the weather for that location.")
        return
    report = generate_llm(
        f"Convert this weather report into Celsius and craft a short, creative report:\n\n{raw}"
    )
    client.send_message(message_id, chat_id, report)
    task_queue.put({
        "type": "audio", "message_id": message_id, "chat_id": chat_id,
        "prompt": f"Speak only this weather report: {report}"
    })


def _fn_weather_poem(message_id, chat_id, loc):
    """Reply with an LLM-written weather poem for *loc*, then queue a voice version.

    Sends an apology instead when the weather service cannot be reached.
    """
    raw = _fetch_weather_line(loc)
    if raw is None:
        client.send_message(message_id, chat_id,
                            "😢 Sorry, I couldn't fetch the weather for that location.")
        return
    poem = generate_llm(
        f"Write a short, poetic weather summary in Celsius based on:\n\n{raw}"
    )
    client.send_message(message_id, chat_id, poem)
    task_queue.put({
        "type": "audio", "message_id": message_id, "chat_id": chat_id,
        "prompt": f"Speak only this poetic weather summary: {poem}"
    })
def _fn_inspire(message_id, chat_id):
    """Reply with an LLM-generated inspirational quote, prefixed with a sparkle."""
    inspiration = generate_llm("Give me a short inspirational unique quote.")
    reply = f"✨ {inspiration}"
    client.send_message(message_id, chat_id, reply)
def _fn_trivia(message_id, chat_id):
    """Generate a trivia question via the LLM, store it for *chat_id* and post it.

    The question is stored only after the LLM output has been parsed and
    checked for both "question" and "answer", so a malformed reply can never
    leave a broken entry behind for ``/answer`` to trip over.
    """
    raw = generate_llm(
        'Generate a unique trivia Q&A in JSON: {"question":"...","answer":"..."}'
    )
    try:
        # Parse the outermost {...} span. This tolerates Markdown code fences
        # or stray text around the object, unlike str.strip(), which removes
        # a set of characters rather than a prefix or suffix.
        obj = json.loads(raw[raw.index("{"):raw.rindex("}") + 1])
        question = obj["question"]
        obj["answer"]  # presence check only; the value is read by /answer
    except (AttributeError, TypeError, ValueError, KeyError) as e:
        logging.warning(f"Could not parse trivia from LLM output: {e}")
        client.send_message(message_id, chat_id, "Failed to generate trivia.")
        return
    trivia_store[chat_id] = obj
    client.send_message(
        message_id, chat_id,
        f"❓ {question}\nReply `/answer` or `/answer your guess`."
    )
def _fn_answer(message_id, chat_id, guess):
    """Resolve the pending trivia question for *chat_id*.

    With a non-empty *guess* the LLM judges it against the stored answer;
    with an empty one the stored answer is revealed. Either way the pending
    question is consumed.
    """
    try:
        qa = trivia_store.pop(chat_id)
    except KeyError:
        client.send_message(message_id, chat_id, "No active trivia. `/trivia` to start.")
        return

    if not guess:
        client.send_message(message_id, chat_id, f"💡 Answer: {qa['answer']}")
        return

    judge_prompt = f"Q: {qa['question']}\nCorrect: {qa['answer']}\nUser: {guess}\nCorrect?"
    client.send_message(message_id, chat_id, generate_llm(judge_prompt))
def _fn_meme(message_id, chat_id, txt):
    """Acknowledge the request and queue an image job with a "meme: " prompt."""
    client.send_message(message_id, chat_id, "🎨 Generating your meme...")
    job = {
        "type": "image",
        "message_id": message_id,
        "chat_id": chat_id,
        "prompt": f"meme: {txt}",
    }
    task_queue.put(job)
def _fn_poll(message_id, chat_id, question, options):
    """Start a poll for *chat_id* (replacing any existing one) and announce it.

    Blank options are discarded. A poll needs a question and at least two
    options; otherwise a usage hint is sent and no poll is created.
    """
    options = [str(o).strip() for o in options if str(o).strip()]
    if not question or len(options) < 2:
        client.send_message(message_id, chat_id,
                            "Usage: /poll <question>|<option 1>|<option 2>|…")
        return
    # Votes are keyed by the 1-based option number users reply with.
    votes = {number: 0 for number in range(1, len(options) + 1)}
    polls[chat_id] = {"question": question, "options": options, "votes": votes, "voters": {}}
    text = f"📊 *Poll:* {question}\n" + "\n".join(
        f"{number}. {option}" for number, option in enumerate(options, 1)
    )
    client.send_message(message_id, chat_id, text)
def _fn_poll_vote(message_id, chat_id, voter, choice):
    """Record *voter*'s vote for the 1-based option *choice* and confirm it.

    Does nothing when there is no active poll or *choice* is out of range.
    A repeat vote from the same voter moves their earlier vote.
    """
    poll = polls.get(chat_id)
    if not poll:
        return
    if choice < 1 or choice > len(poll["options"]):
        return

    tally = poll["votes"]
    ballots = poll["voters"]
    earlier = ballots.get(voter)
    if earlier:
        tally[earlier] -= 1
    tally[choice] += 1
    ballots[voter] = choice

    picked = poll["options"][choice - 1]
    client.send_message(message_id, chat_id, f"✅ Voted for {picked}")
def _fn_poll_results(message_id, chat_id):
    """Reply with the current vote counts of the active poll, if there is one."""
    poll = polls.get(chat_id)
    if poll:
        tally = poll["votes"]
        rows = [
            f"{number}. {option}: {tally[number]}"
            for number, option in enumerate(poll["options"], start=1)
        ]
        reply = f"📊 *Results:* {poll['question']}\n" + "\n".join(rows)
    else:
        reply = "No active poll."
    client.send_message(message_id, chat_id, reply)
def _fn_poll_end(message_id, chat_id):
    """Close the active poll for *chat_id* and reply with its final counts."""
    closed = polls.pop(chat_id, None)
    if closed:
        tally = closed["votes"]
        rows = [
            f"{number}. {option}: {tally[number]}"
            for number, option in enumerate(closed["options"], start=1)
        ]
        reply = f"📊 *Final Results:* {closed['question']}\n" + "\n".join(rows)
    else:
        reply = "No active poll."
    client.send_message(message_id, chat_id, reply)
def _fn_generate_images(message_id, chat_id, prompt, count=1):
    """Generate *count* images for *prompt* and upload each one to *chat_id*.

    *count* is coerced to an int (falling back to 1), because it can originate
    from LLM-produced JSON where it may arrive as a string. A failure on one
    image is reported to the chat and does not stop the remaining images.
    Each generated file is removed whether or not its upload succeeded.
    """
    try:
        count = int(count)
    except (TypeError, ValueError):
        logging.warning(f"Invalid image count {count!r}; generating 1 image")
        count = 1

    for i in range(1, count + 1):
        path = None
        try:
            _img, path, ret_prompt, url = generate_image(
                prompt, message_id, message_id, BotConfig.IMAGE_DIR
            )
            formatted = "\n\n".join(f"_{p.strip()}_"
                                    for p in ret_prompt.split("\n\n") if p.strip())
            # chr(8203) is a zero-width space placed after the ">" marker.
            caption = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}"
            client.send_media(message_id, chat_id, path, caption, media_type="image")
        except Exception as e:
            logging.warning(f"Image {i}/{count} failed: {e}")
            client.send_message(message_id, chat_id,
                                f"😢 Failed to generate image {i}/{count}.")
        finally:
            # Clean up in ``finally`` so a failed upload cannot leak the file,
            # and so a failed delete is not reported as a failed generation.
            if path:
                try:
                    os.remove(path)
                except OSError as e:
                    logging.warning(f"Could not remove {path}: {e}")
def _fn_voice_reply(message_id, chat_id, prompt):
    """Reply to *message_id* with synthesized speech for *prompt*.

    When voice generation yields no audio file, the LLM's text answer to
    *prompt* is sent instead. The audio file is removed even if the upload
    raises.
    """
    result = generate_voice_reply(prompt,
                                  model="openai-audio",
                                  voice="coral",
                                  audio_dir=BotConfig.AUDIO_DIR)
    if result and result[0]:
        audio_path = result[0]
        try:
            client.send_media(message_id, chat_id, audio_path, "", media_type="audio")
        finally:
            try:
                os.remove(audio_path)
            except OSError as e:
                logging.warning(f"Could not remove {audio_path}: {e}")
        return

    # fallback to text
    response = generate_llm(prompt)
    client.send_message(message_id, chat_id, response)
| # --- Intent router for fallback --- | |
# Functions the intent router advertises to the LLM: name -> description and
# the parameter names expected in the returned JSON.
FUNCTION_SCHEMA = dict(
    generate_image=dict(
        description="Generate one or more images",
        params=["prompt", "count"],
    ),
    send_text=dict(
        description="Send a plain text response",
        params=["message"],
    ),
)
def route_intent(user_input: str):
    """
    Ask the LLM whether to call a function or just chat.

    Expects a JSON response like:
    {"action":"generate_image","prompt":"a sunset","count":2}
    or
    {"action":"send_text","message":"Here's my reply..."}

    Always returns a dict with an "action" key. If the LLM output contains no
    JSON object carrying "action", the whole output is treated as a chat reply.
    """
    sys_prompt = (
        "You are Eve. You can either chat normally or call one of these functions:\n"
        + "\n".join(f"- {name}: {info['description']}"
                    for name, info in FUNCTION_SCHEMA.items())
        + "\n\nIf the user wants an image generated, return JSON with "
        "\"action\":\"generate_image\",\"prompt\":\"...\",\"count\":<int>.\n"
        "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n"
        "Do NOT wrap your response in any extra text—only raw JSON."
    )
    raw = generate_llm(f"{sys_prompt}\nUser: {user_input}")

    text = raw if isinstance(raw, str) else ""
    # Parse the outermost {...} span so a reply wrapped in Markdown fences or
    # surrounded by stray prose is still recognised.
    start, end = text.find("{"), text.rfind("}")
    if start != -1 and end > start:
        try:
            parsed = json.loads(text[start:end + 1])
        except ValueError:
            parsed = None
        # Callers use .get() on the result, so only a dict is acceptable.
        if isinstance(parsed, dict) and "action" in parsed:
            return parsed

    # fallback: treat entire raw as chat
    return {"action": "send_text", "message": raw}
| # --- FastAPI App & Webhook --- | |
# ASGI application object; passed to uvicorn when this module runs as a script.
app = FastAPI()
# Reply sent for /help: a greeting, one bullet per command, and a closing hint.
help_text = "\n".join([
    "🤖 *Hi, I'm Eve!* Commands:",
    *(
        f"• {usage}"
        for usage in (
            "/help",
            "/summarize <text>",
            "/translate <lang>|<text>",
            "/joke",
            "/weather <loc>",
            "/weatherpoem <loc>",
            "/inspire",
            "/trivia / /answer",
            "/meme <text>",
            "/poll <Q>|<opt1>|… / /results / /endpoll",
            "/gen <prompt>|<count>",
        )
    ),
    "Otherwise I’ll chat or generate images for you!",
])
def _handle_slash_command(mid, chat_id, body, sender):
    """Run the slash command (or numeric poll vote) contained in *body*.

    Returns True when *body* was recognised and handled, False when the caller
    should fall through to the LLM intent router. Malformed commands get a
    usage hint instead of raising.
    """
    low = body.lower()
    if low == "/help":
        client.send_message(mid, chat_id, help_text)
    elif low.startswith("/summarize "):
        _fn_summarize(mid, chat_id, body[11:].strip())
    elif low.startswith("/translate "):
        lang, sep, txt = body[11:].partition("|")
        if sep and lang.strip() and txt.strip():
            _fn_translate(mid, chat_id, lang.strip(), txt.strip())
        else:
            client.send_message(mid, chat_id, "Usage: /translate <lang>|<text>")
    elif low == "/joke":
        _fn_joke(mid, chat_id)
    elif low.startswith("/weather "):
        _fn_weather(mid, chat_id, body[9:].strip().replace(" ", "+"))
    elif low.startswith("/weatherpoem "):
        _fn_weather_poem(mid, chat_id, body[13:].strip().replace(" ", "+"))
    elif low == "/inspire":
        _fn_inspire(mid, chat_id)
    elif low == "/trivia":
        _fn_trivia(mid, chat_id)
    elif low.startswith("/answer"):
        _fn_answer(mid, chat_id, body[7:].strip())
    elif low.startswith("/meme "):
        _fn_meme(mid, chat_id, body[6:].strip())
    elif low.startswith("/poll "):
        parts = [p.strip() for p in body[6:].split("|")]
        _fn_poll(mid, chat_id, parts[0], parts[1:])
    elif chat_id in polls and low.isdecimal():
        # isdecimal(), not isdigit(): the latter accepts characters such as
        # "²" that int() rejects.
        _fn_poll_vote(mid, chat_id, sender, int(low))
    elif low == "/results":
        _fn_poll_results(mid, chat_id)
    elif low == "/endpoll":
        _fn_poll_end(mid, chat_id)
    elif low == "/gen" or low.startswith(("/gen ", "/gen|")):
        # Match "/gen" as a whole word so e.g. "/general ..." is not hijacked.
        prompt, _, raw_count = body[4:].partition("|")
        prompt = prompt.strip()
        raw_count = raw_count.strip()
        if not prompt:
            client.send_message(mid, chat_id, "Usage: /gen <prompt>|<count>")
            return True
        cnt = int(raw_count) if raw_count.isdecimal() else 0
        if cnt < 1:
            cnt = BotConfig.DEFAULT_IMAGE_COUNT
        client.send_message(mid, chat_id, f"✨ Generating {cnt} image(s)…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                        "prompt": prompt, "num_images": cnt})
    else:
        return False
    return True


# NOTE(review): the handler was not registered on ``app`` anywhere in this
# file; the "/whatsapp" path is an assumption -- confirm it against the
# webhook URL configured in Green-API.
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle one Green-API webhook notification.

    Only authenticated "incomingMessageReceived" text messages from the
    configured group chat are processed; everything else is acknowledged with
    ``{"success": True}`` and ignored. Processing order: quoted reply to the
    bot, skip messages with mentions, slash commands, then the LLM router.

    Raises:
        HTTPException: 403 when the Authorization header is not the expected
            bearer token.
    """
    global last_message_time
    import hmac

    ok = {"success": True}

    # Auth -- constant-time comparison so the token cannot be probed by timing.
    expected = f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}"
    supplied = request.headers.get("Authorization") or ""
    if not hmac.compare_digest(supplied.encode("utf-8"), expected.encode("utf-8")):
        raise HTTPException(403, "Unauthorized")

    # Only authenticated traffic counts as activity for the inactivity monitor.
    last_message_time = time.time()

    data = await request.json()
    if not isinstance(data, dict):
        return ok
    sender_data = data.get("senderData") or {}
    chat_id = sender_data.get("chatId")
    if chat_id != BotConfig.BOT_GROUP_CHAT or data.get("typeWebhook") != "incomingMessageReceived":
        return ok

    md = data.get("messageData") or {}
    mid = data.get("idMessage")
    tmd = md.get("textMessageData") or md.get("extendedTextMessageData")
    if not tmd:
        return ok
    body = (tmd.get("textMessage") or tmd.get("text") or "").strip()
    ctx = tmd.get("contextInfo") or {}

    # NOTE(review): the handlers below make blocking HTTP/LLM calls inside an
    # async function, which stalls the event loop while they run.

    # 1) Quoted-reply to bot
    if md.get("typeMessage") == "quotedMessage":
        ext = md.get("extendedTextMessageData") or {}
        quoted = md.get("quotedMessage") or {}
        if ext.get("participant") == BotConfig.BOT_JID:
            user_reply = ext.get("text", "")
            quoted_text = quoted.get("textMessage", "")
            prompt = (
                f"You asked: {quoted_text}\n"
                f"User replied: {user_reply}\n"
                "Provide a helpful follow‑up."
            )
            ans = generate_llm(prompt)
            client.send_message(mid, chat_id, ans)
            task_queue.put({
                "type": "audio", "message_id": mid,
                "chat_id": chat_id, "prompt": ans
            })
            return ok

    # 2) Mentions skip
    if ctx.get("mentionedJidList"):
        return ok

    # 3) Slash-commands
    if _handle_slash_command(mid, chat_id, body, sender_data.get("sender")):
        return ok

    # 4) Fallback -> function calling router
    intent = route_intent(body)
    if not isinstance(intent, dict):
        intent = {}
    if intent.get("action") == "generate_image":
        pr = intent.get("prompt", "")
        # The count comes from LLM-produced JSON and may not be an int.
        try:
            ct = max(1, int(intent.get("count", 1)))
        except (TypeError, ValueError):
            ct = 1
        client.send_message(mid, chat_id, f"👍 Generating {ct} images for “{pr}”…")
        task_queue.put({"type": "image", "message_id": mid, "chat_id": chat_id,
                        "prompt": pr, "num_images": ct})
    else:
        # send_text or any unknown
        msg = intent.get("message") or "Sorry, I didn't understand."
        client.send_message(mid, chat_id, msg)
        task_queue.put({"type": "audio", "message_id": mid, "chat_id": chat_id,
                        "prompt": msg})
    return ok
# NOTE(review): the handler was not registered on ``app`` anywhere in this
# file; the "/" path and plain-text response class are inferred from the
# otherwise unused PlainTextResponse import -- confirm.
@app.get("/", response_class=PlainTextResponse)
def index():
    """Return a fixed plain-text message showing the server is up."""
    return "Server is running!"
if __name__ == "__main__":
    # Script entry point: serve the app on every interface, port 7860.
    import uvicorn

    listen_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **listen_options)