Spaces:
Running
Running
Chandima Prabhath
Update FUNCTION_SCHEMA to include width and height parameters for generate_image; enhance user guidance in route_intent function.
869bca1
import os | |
import threading | |
import requests | |
import logging | |
import queue | |
import json | |
import random | |
from collections import defaultdict, deque | |
from concurrent.futures import ThreadPoolExecutor | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import PlainTextResponse | |
from FLUX import generate_image | |
from VoiceReply import generate_voice_reply | |
from polLLM import generate_llm | |
# --- Logging Setup --------------------------------------------------------- | |
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() | |
logger = logging.getLogger("eve_bot") | |
logger.setLevel(LOG_LEVEL) | |
handler = logging.StreamHandler() | |
formatter = logging.Formatter( | |
"%(asctime)s [%(levelname)s] [%(message_id)s/%(sender)s] %(message)s" | |
) | |
handler.setFormatter(formatter) | |
class ContextFilter(logging.Filter): | |
def filter(self, record): | |
record.message_id = getattr(record, "message_id", "-") | |
record.sender = getattr(record, "sender", "-") | |
return True | |
handler.addFilter(ContextFilter()) | |
logger.handlers = [handler] | |
# Thread‐local to carry context through helpers | |
_thread_ctx = threading.local() | |
def set_thread_context(chat_id, sender, message_id): | |
_thread_ctx.chat_id = chat_id | |
_thread_ctx.sender = sender | |
_thread_ctx.message_id = message_id | |
def get_thread_context(): | |
return ( | |
getattr(_thread_ctx, "chat_id", None), | |
getattr(_thread_ctx, "sender", None), | |
getattr(_thread_ctx, "message_id", None), | |
) | |
# --- Conversation History ------------------------------------------------- | |
# keep last 20 messages per (chat_id, sender) | |
history = defaultdict(lambda: deque(maxlen=20)) | |
def record_user_message(chat_id, sender, message): | |
history[(chat_id, sender)].append(f"User: {message}") | |
def record_bot_message(chat_id, sender, message): | |
history[(chat_id, sender)].append(f"Assistant: {message}") | |
def get_history_text(chat_id, sender): | |
return "\n".join(history[(chat_id, sender)]) | |
# --- Bot Config & Client -------------------------------------------------- | |
class BotConfig: | |
GREEN_API_URL = os.getenv("GREEN_API_URL") | |
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com") | |
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
BOT_GROUP_CHAT = "[email protected]" | |
BOT_JID = os.getenv("BOT_JID") | |
IMAGE_DIR = "/tmp/images" | |
AUDIO_DIR = "/tmp/audio" | |
DEFAULT_IMAGE_COUNT = 4 | |
def validate(cls): | |
missing = [n for n in ( | |
"GREEN_API_URL","GREEN_API_TOKEN", | |
"GREEN_API_ID_INSTANCE","WEBHOOK_AUTH_TOKEN","BOT_JID" | |
) if not getattr(cls, n)] | |
if missing: | |
raise ValueError(f"Missing env vars: {', '.join(missing)}") | |
class BotClient: | |
def __init__(self, cfg: BotConfig): | |
self.cfg = cfg | |
self.session = requests.Session() | |
def send(self, endpoint, payload, files=None, retries=3): | |
url = ( | |
f"{self.cfg.GREEN_API_URL}/waInstance" | |
f"{self.cfg.GREEN_API_ID_INSTANCE}/{endpoint}/" | |
f"{self.cfg.GREEN_API_TOKEN}" | |
) | |
for i in range(1, retries+1): | |
try: | |
resp = self.session.post( | |
url, | |
json=payload if files is None else None, | |
data=None if files is None else payload, | |
files=files | |
) | |
resp.raise_for_status() | |
return resp.json() | |
except requests.RequestException as e: | |
logger.warning(f"{endpoint} attempt {i}/{retries} failed: {e}") | |
return {"error":"failed"} | |
def send_message(self, message_id, chat_id, text): | |
return self.send("sendMessage", { | |
"chatId": chat_id, | |
"message": text, | |
"quotedMessageId": message_id | |
}) | |
def send_message_to(self, chat_id, text): | |
return self.send("sendMessage", { | |
"chatId": chat_id, | |
"message": text | |
}) | |
def send_media(self, message_id, chat_id, file_path, caption, media_type): | |
endpoint = "sendFileByUpload" | |
payload = { | |
"chatId": chat_id, | |
"caption": caption, | |
"quotedMessageId": message_id | |
} | |
with open(file_path,"rb") as f: | |
mime = "image/jpeg" if media_type=="image" else "audio/mpeg" | |
files = [("file",(os.path.basename(file_path),f,mime))] | |
return self.send(endpoint, payload, files=files) | |
# Validate env & init client | |
BotConfig.validate() | |
client = BotClient(BotConfig) | |
# --- Threading & Queues --------------------------------------------------- | |
task_queue = queue.Queue() | |
polls = {} | |
executor = ThreadPoolExecutor(max_workers=4) | |
def worker(): | |
while True: | |
task = task_queue.get() | |
try: | |
if task["type"] == "image": | |
_fn_generate_images( | |
task["message_id"], | |
task["chat_id"], | |
task["prompt"], | |
task.get("num_images", 1), | |
task.get("width"), | |
task.get("height") | |
) | |
elif task["type"] == "audio": | |
_fn_voice_reply( | |
task["message_id"], | |
task["chat_id"], | |
task["prompt"] | |
) | |
except Exception as e: | |
logger.error(f"Worker error {task}: {e}") | |
finally: | |
task_queue.task_done() | |
for _ in range(4): | |
threading.Thread(target=worker, daemon=True).start() | |
# --- Basic Tool Functions ------------------------------------------------- | |
def _fn_send_text(mid, cid, message): | |
"""Send text + record + queue voice.""" | |
client.send_message(mid, cid, message) | |
chat_id, sender, _ = get_thread_context() | |
if chat_id and sender: | |
record_bot_message(chat_id, sender, message) | |
task_queue.put({ | |
"type": "audio", | |
"message_id": mid, | |
"chat_id": cid, | |
"prompt": message | |
}) | |
def _fn_send_accept(mid, cid, message): | |
"""Send text + record, but no voice.""" | |
client.send_message(mid, cid, message) | |
chat_id, sender, _ = get_thread_context() | |
if chat_id and sender: | |
record_bot_message(chat_id, sender, message) | |
def _fn_summarize(mid, cid, text): | |
summary = generate_llm(f"Summarize:\n\n{text}") | |
_fn_send_text(mid, cid, summary) | |
def _fn_translate(mid, cid, lang, text): | |
resp = generate_llm(f"Translate to {lang}:\n\n{text}") | |
_fn_send_text(mid, cid, resp) | |
def _fn_joke(mid, cid): | |
try: | |
j = requests.get( | |
"https://official-joke-api.appspot.com/random_joke", | |
timeout=5 | |
).json() | |
joke = f"{j['setup']}\n\n{j['punchline']}" | |
except: | |
joke = generate_llm("Tell me a short joke.") | |
_fn_send_text(mid, cid, joke) | |
def _fn_weather(mid, cid, loc): | |
raw = requests.get(f"http://sl.wttr.in/{loc}?format=4", timeout=5).text | |
report = generate_llm(f"Give a weather report in °C:\n\n{raw}") | |
_fn_send_text(mid, cid, report) | |
def _fn_inspire(mid, cid): | |
quote = generate_llm("Give me a unique, random short inspirational quote.") | |
_fn_send_text(mid, cid, f"✨ {quote}") | |
def _fn_meme(mid, cid, txt): | |
_fn_send_accept(mid, cid, "🎨 Generating meme…") | |
task_queue.put({ | |
"type": "image", | |
"message_id": mid, | |
"chat_id": cid, | |
"prompt": f"meme: {txt}" | |
}) | |
def _fn_poll_create(mid, cid, question, options): | |
votes = {i+1:0 for i in range(len(options))} | |
polls[cid] = {"question": question, "options": options, "votes": votes, "voters": {}} | |
text = f"📊 *Poll:* {question}\n" + "\n".join(f"{i+1}. {o}" for i,o in enumerate(options)) | |
_fn_send_text(mid, cid, text) | |
def _fn_poll_vote(mid, cid, voter, choice): | |
poll = polls.get(cid) | |
if not poll or choice<1 or choice>len(poll["options"]): | |
return | |
prev = poll["voters"].get(voter) | |
if prev: | |
poll["votes"][prev] -= 1 | |
poll["votes"][choice] += 1 | |
poll["voters"][voter] = choice | |
_fn_send_text(mid, cid, f"✅ Voted for {poll['options'][choice-1]}") | |
def _fn_poll_results(mid, cid): | |
poll = polls.get(cid) | |
if not poll: | |
_fn_send_text(mid, cid, "No active poll.") | |
return | |
txt = f"📊 *Results:* {poll['question']}\n" + "\n".join( | |
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1) | |
) | |
_fn_send_text(mid, cid, txt) | |
def _fn_poll_end(mid, cid): | |
poll = polls.pop(cid, None) | |
if not poll: | |
_fn_send_text(mid, cid, "No active poll.") | |
return | |
txt = f"📊 *Final Results:* {poll['question']}\n" + "\n".join( | |
f"{i}. {o}: {poll['votes'][i]}" for i,o in enumerate(poll["options"],1) | |
) | |
_fn_send_text(mid, cid, txt) | |
def _fn_generate_images(mid, cid, prompt, count=1, width=None, height=None): | |
for i in range(1, count+1): | |
try: | |
img, path, ret_p, url = generate_image( | |
prompt, mid, mid, BotConfig.IMAGE_DIR, | |
width=width, height=height | |
) | |
formatted = "\n\n".join(f"_{p.strip()}_" for p in ret_p.split("\n\n") if p.strip()) | |
cap = f"✨ Image {i}/{count}: {url}\n>{chr(8203)} {formatted}" | |
client.send_media(mid, cid, path, cap, media_type="image") | |
os.remove(path) | |
except Exception as e: | |
logger.warning(f"Img {i}/{count} failed: {e}") | |
_fn_send_text(mid, cid, f"😢 Failed to generate image {i}/{count}.") | |
def _fn_voice_reply(mid, cid, prompt): | |
proc = ( | |
f"Just say this exactly as written in a flirty, friendly, playful, " | |
f"happy and helpful but a little bit clumsy-cute way: {prompt}" | |
) | |
res = generate_voice_reply(proc, model="openai-audio", voice="coral", audio_dir=BotConfig.AUDIO_DIR) | |
if res and res[0]: | |
path, _ = res | |
client.send_media(mid, cid, path, "", media_type="audio") | |
os.remove(path) | |
else: | |
_fn_send_text(mid, cid, prompt) | |
# --- Intent Dispatcher ---------------------------------------------------- | |
FUNCTION_SCHEMA = { | |
"summarize": {"description":"Summarize text", "params":["text"]}, | |
"translate": {"description":"Translate text", "params":["lang","text"]}, | |
"joke": {"description":"Tell a joke", "params":[]}, | |
"weather": {"description":"Weather report", "params":["location"]}, | |
"inspire": {"description":"Inspirational quote","params":[]}, | |
"meme": {"description":"Generate meme", "params":["text"]}, | |
"poll_create": {"description":"Create poll", "params":["question","options"]}, | |
"poll_vote": {"description":"Vote poll", "params":["choice"]}, | |
"poll_results": {"description":"Show poll results", "params":[]}, | |
"poll_end": {"description":"End poll", "params":[]}, | |
"generate_image": { | |
"description":"Generate images", | |
"params":["prompt","count","width","height"] | |
}, | |
"send_text": {"description":"Send plain text", "params":["message"]}} | |
class IntentDispatcher: | |
def __init__(self): | |
self.handlers = {} | |
def register(self, action): | |
def decorator(fn): | |
self.handlers[action] = fn | |
return fn | |
return decorator | |
def dispatch(self, action, mid, cid, intent): | |
fn = self.handlers.get(action) | |
if not fn: | |
return False | |
fn(mid, cid, intent) | |
return True | |
dispatcher = IntentDispatcher() | |
def validate_intent(action, intent): | |
schema = FUNCTION_SCHEMA.get(action) | |
if not schema: | |
return False | |
for p in schema["params"]: | |
if p not in intent: | |
logger.warning(f"Missing param '{p}' for action '{action}'") | |
return False | |
return True | |
def _h_summarize(mid, cid, intent): | |
_fn_summarize(mid, cid, intent["text"]) | |
def _h_translate(mid, cid, intent): | |
_fn_translate(mid, cid, intent["lang"], intent["text"]) | |
def _h_joke(mid, cid, intent): | |
_fn_joke(mid, cid) | |
def _h_weather(mid, cid, intent): | |
_fn_weather(mid, cid, intent["location"]) | |
def _h_inspire(mid, cid, intent): | |
_fn_inspire(mid, cid) | |
def _h_meme(mid, cid, intent): | |
_fn_meme(mid, cid, intent["text"]) | |
def _h_poll_create(mid, cid, intent): | |
_fn_poll_create(mid, cid, intent["question"], intent["options"]) | |
def _h_poll_vote(mid, cid, intent): | |
_fn_poll_vote(mid, cid, intent["voter"], intent["choice"]) | |
def _h_poll_results(mid, cid, intent): | |
_fn_poll_results(mid, cid) | |
def _h_poll_end(mid, cid, intent): | |
_fn_poll_end(mid, cid) | |
def _h_generate_image(mid, cid, intent): | |
prompt = intent["prompt"] | |
count = intent.get("count", 1) | |
width = intent.get("width") | |
height = intent.get("height") | |
_fn_send_accept(mid, cid, f"✨ Generating {count} image(s)…") | |
task_queue.put({ | |
"type": "image", | |
"message_id": mid, | |
"chat_id": cid, | |
"prompt": prompt, | |
"num_images": count, | |
"width": width, | |
"height": height | |
}) | |
def _h_send_text(mid, cid, intent): | |
_fn_send_text(mid, cid, intent["message"]) | |
# --- Intent Routing -------------------------------------------------------- | |
def route_intent(user_input, chat_id, sender): | |
history_text = get_history_text(chat_id, sender) | |
sys_prompt = ( | |
"You are Eve. You can either chat or call one of these functions:\n" | |
+ "\n".join(f"- {n}: {f['description']}" for n,f in FUNCTION_SCHEMA.items()) | |
+ "\n\nTo call a function, return JSON with \"action\":\"<name>\", plus its parameters.\n" | |
+ "Here’s an example for generating images:\n" | |
+ " {\"action\":\"generate_image\",\"prompt\":\"a red fox\",\"count\":3,\"width\":512,\"height\":512}\n" | |
+ "Otherwise return JSON with \"action\":\"send_text\",\"message\":\"...\".\n" | |
"Return only raw JSON." | |
) | |
prompt = ( | |
f"{sys_prompt}\n\n" | |
f"Conversation so far:\n{history_text}\n\n" | |
f"User: {user_input}" | |
) | |
raw = generate_llm(prompt) | |
try: | |
return json.loads(raw) | |
except: | |
return {"action":"send_text","message":raw} | |
# --- FastAPI & Webhook ---------------------------------------------------- | |
app = FastAPI() | |
help_text = ( | |
"🤖 *Eve* commands:\n" | |
"• /help\n" | |
"• /summarize <text>\n" | |
"• /translate <lang>|<text>\n" | |
"• /joke\n" | |
"• /weather <loc>\n" | |
"• /inspire\n" | |
"• /meme <text>\n" | |
"• /poll <Q>|… / /results / /endpoll\n" | |
"• /gen <prompt>|<count>|<width>|<height>\n" | |
"Otherwise chat or reply to my message to invoke tools." | |
) | |
async def whatsapp_webhook(request: Request): | |
data = await request.json() | |
if request.headers.get("Authorization") != f"Bearer {BotConfig.WEBHOOK_AUTH_TOKEN}": | |
raise HTTPException(403, "Unauthorized") | |
chat_id = data["senderData"]["chatId"] | |
sender = data["senderData"]["sender"] | |
mid = data["idMessage"] | |
set_thread_context(chat_id, sender, mid) | |
logger.debug("Received webhook") | |
if chat_id != BotConfig.BOT_GROUP_CHAT or data["typeWebhook"] != "incomingMessageReceived": | |
return {"success": True} | |
md = data["messageData"] | |
tmd = md.get("textMessageData") or md.get("extendedTextMessageData") | |
if not tmd: | |
return {"success": True} | |
body = (tmd.get("textMessage") or tmd.get("text","")).strip() | |
ctx = tmd.get("contextInfo", {}) | |
# record user message | |
record_user_message(chat_id, sender, body) | |
# Slash commands | |
low = body.lower() | |
if low == "/help": | |
_fn_send_text(mid, chat_id, help_text) | |
return {"success": True} | |
if low.startswith("/summarize "): | |
_fn_summarize(mid, chat_id, body[11:].strip()) | |
return {"success": True} | |
if low.startswith("/translate "): | |
lang, txt = body[11:].split("|", 1) | |
_fn_translate(mid, chat_id, lang.strip(), txt.strip()) | |
return {"success": True} | |
if low == "/joke": | |
_fn_joke(mid, chat_id) | |
return {"success": True} | |
if low.startswith("/weather "): | |
_fn_weather(mid, chat_id, body[9:].strip().replace(" ","+")) | |
return {"success": True} | |
if low == "/inspire": | |
_fn_inspire(mid, chat_id) | |
return {"success": True} | |
if low.startswith("/meme "): | |
_fn_meme(mid, chat_id, body[6:].strip()) | |
return {"success": True} | |
if low.startswith("/poll "): | |
parts = [p.strip() for p in body[6:].split("|")] | |
_fn_poll_create(mid, chat_id, parts[0], parts[1:]) | |
return {"success": True} | |
if chat_id in polls and low.isdigit(): | |
_fn_poll_vote(mid, chat_id, sender, int(low)) | |
return {"success": True} | |
if low == "/results": | |
_fn_poll_results(mid, chat_id) | |
return {"success": True} | |
if low == "/endpoll": | |
_fn_poll_end(mid, chat_id) | |
return {"success": True} | |
if low.startswith("/gen"): | |
parts = body[4:].split("|") | |
pr = parts[0].strip() | |
ct = int(parts[1]) if len(parts) > 1 and parts[1].isdigit() else BotConfig.DEFAULT_IMAGE_COUNT | |
width = int(parts[2]) if len(parts) > 2 and parts[2].isdigit() else None | |
height = int(parts[3]) if len(parts) > 3 and parts[3].isdigit() else None | |
_fn_send_accept(mid, chat_id, f"✨ Generating {ct} image(s)…") | |
task_queue.put({ | |
"type": "image", | |
"message_id": mid, | |
"chat_id": chat_id, | |
"prompt": pr, | |
"num_images": ct, | |
"width": width, | |
"height": height | |
}) | |
return {"success": True} | |
# Skip mentions | |
if ctx.get("mentionedJidList"): | |
return {"success": True} | |
# Build effective text (handle quoted replies to the bot) | |
if md.get("typeMessage") == "quotedMessage": | |
ext = md["extendedTextMessageData"] | |
quoted = md["quotedMessage"] | |
if ext.get("participant") == BotConfig.BOT_JID: | |
effective = ( | |
f"Quoted: {quoted.get('textMessage','')}\n" | |
f"User: {ext.get('text','')}" | |
) | |
else: | |
effective = body | |
else: | |
effective = body | |
# Route intent & dispatch | |
intent = route_intent(effective, chat_id, sender) | |
action = intent.get("action") | |
if action in FUNCTION_SCHEMA: | |
if not validate_intent(action, intent): | |
_fn_send_text(mid, chat_id, f"❗ Missing parameter(s) for `{action}`.") | |
else: | |
dispatched = dispatcher.dispatch(action, mid, chat_id, intent) | |
if not dispatched: | |
_fn_send_text(mid, chat_id, intent.get("message","Sorry, I couldn't handle that.")) | |
else: | |
# fallback chat | |
_fn_send_text(mid, chat_id, intent.get("message","Sorry, I didn't get that.")) | |
return {"success": True} | |
def index(): | |
return "Server is running!" | |
if __name__ == "__main__": | |
client.send_message_to( | |
BotConfig.BOT_GROUP_CHAT, | |
"🌟 Eve is online! Type /help to see commands." | |
) | |
import uvicorn | |
uvicorn.run(app, host="0.0.0.0", port=7860) | |