Spaces:
Running
Running
""" | |
Professional WhatsApp Bot using Green-API | |
Author: Assistant | |
Description: A comprehensive WhatsApp bot with a professional, class-based structure. | |
Features include image generation, image editing, voice replies, | |
and various utility functions, all handled by an asynchronous task queue. | |
Includes request logging and chat ID filtering. | |
""" | |
import os | |
import threading | |
import requests | |
import logging | |
import queue | |
import json | |
import base64 | |
from typing import List, Optional, Union, Literal, Dict, Any, Tuple, Set | |
from collections import defaultdict, deque | |
from concurrent.futures import ThreadPoolExecutor | |
from fastapi import FastAPI, Request, HTTPException | |
from fastapi.responses import JSONResponse | |
from pydantic import BaseModel, Field, ValidationError | |
import uvicorn | |
# Assume these are your custom libraries for AI functionalities | |
from FLUX import generate_image | |
from VoiceReply import generate_voice_reply | |
from polLLM import generate_llm, LLMBadRequestError | |
import flux_kontext_lib | |
# --- Configuration --------------------------------------------------------- | |
class BotConfig: | |
"""Manages all bot configuration from environment variables.""" | |
GREEN_API_URL: str | |
GREEN_API_TOKEN: str | |
GREEN_API_ID_INSTANCE: str | |
WEBHOOK_AUTH_TOKEN: str | |
IMAGE_DIR: str = "/tmp/whatsapp_images" | |
AUDIO_DIR: str = "/tmp/whatsapp_audio" | |
TEMP_DIR: str = "/tmp/whatsapp_edit" | |
DEFAULT_IMAGE_COUNT: int = 4 | |
MAX_HISTORY_SIZE: int = 20 | |
WORKER_THREADS: int = 4 | |
# Set log level to DEBUG to capture all incoming requests | |
LOG_LEVEL: str = "DEBUG" | |
# Whitelisted chat IDs. The bot will only respond to these chats. | |
ALLOWED_CHATS: Set[str] = {"[email protected]", "[email protected]"} | |
def __init__(self): | |
"""Initializes configuration from environment variables.""" | |
self.GREEN_API_URL = os.getenv("GREEN_API_URL") | |
self.GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
self.GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
self.WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
# Allow overriding log level from environment | |
self.LOG_LEVEL = os.getenv("LOG_LEVEL", "DEBUG").upper() | |
self._validate() | |
def _validate(self): | |
"""Ensures all required environment variables are set.""" | |
missing = [ | |
var for var in ("GREEN_API_URL", "GREEN_API_TOKEN", | |
"GREEN_API_ID_INSTANCE", "WEBHOOK_AUTH_TOKEN") | |
if not getattr(self, var) | |
] | |
if missing: | |
raise ValueError(f"Missing required environment variables: {', '.join(missing)}") | |
# --- Logging Setup --------------------------------------------------------- | |
class LoggerSetup: | |
"""Sets up and manages structured logging for the application.""" | |
def setup(level: str) -> logging.Logger: | |
""" | |
Configures the root logger for the application. | |
Args: | |
level: The logging level (e.g., "DEBUG", "INFO"). | |
Returns: | |
A configured logger instance. | |
""" | |
logger = logging.getLogger("whatsapp_bot") | |
logger.setLevel(level) | |
# Avoid adding duplicate handlers | |
if logger.hasHandlers(): | |
logger.handlers.clear() | |
handler = logging.StreamHandler() | |
# Added a more detailed formatter | |
formatter = logging.Formatter( | |
"%(asctime)s - %(name)s - [%(levelname)s] - [%(chat_id)s] - %(funcName)s:%(lineno)d - %(message)s" | |
) | |
handler.setFormatter(formatter) | |
class ContextFilter(logging.Filter): | |
"""Injects contextual information into log records.""" | |
def filter(self, record): | |
record.chat_id = ThreadContext.get_context().get("chat_id", "NO_CONTEXT") | |
return True | |
handler.addFilter(ContextFilter()) | |
logger.addHandler(handler) | |
return logger | |
# --- Thread Context Management --------------------------------------------- | |
class ThreadContext: | |
"""Manages thread-local context for chat and message IDs.""" | |
_context = threading.local() | |
def set_context(cls, chat_id: str, message_id: str): | |
"""Sets the context for the current thread.""" | |
cls._context.chat_id = chat_id | |
cls._context.message_id = message_id | |
def get_context(cls) -> Dict[str, Optional[str]]: | |
"""Retrieves the context for the current thread.""" | |
return { | |
"chat_id": getattr(cls._context, "chat_id", None), | |
"message_id": getattr(cls._context, "message_id", None), | |
} | |
# --- Conversation History ------------------------------------------------- | |
class ConversationManager: | |
"""Manages conversation history for each chat.""" | |
def __init__(self, max_size: int): | |
self.history = defaultdict(lambda: deque(maxlen=max_size)) | |
def add_user_message(self, chat_id: str, message: str): | |
self.history[chat_id].append(f"User: {message}") | |
def add_bot_message(self, chat_id: str, message: str): | |
self.history[chat_id].append(f"Assistant: {message}") | |
def get_history_text(self, chat_id: str) -> str: | |
return "\n".join(self.history[chat_id]) | |
def clear_history(self, chat_id: str): | |
self.history[chat_id].clear() | |
# --- Green-API Client ----------------------------------------------------- | |
class GreenApiClient: | |
"""A client for interacting with the Green-API for WhatsApp.""" | |
def __init__(self, config: BotConfig, logger: logging.Logger): | |
self.config = config | |
self.logger = logger | |
self.session = requests.Session() | |
self.base_url = ( | |
f"{self.config.GREEN_API_URL}/waInstance" | |
f"{self.config.GREEN_API_ID_INSTANCE}" | |
) | |
def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]: | |
"""Makes a request to the Green-API with retries.""" | |
url = f"{self.base_url}/{endpoint}/{self.config.GREEN_API_TOKEN}" | |
for attempt in range(3): | |
try: | |
self.logger.debug(f"Sending API request to {url} with payload: {kwargs.get('json', kwargs.get('data'))}") | |
response = self.session.request(method, url, timeout=20, **kwargs) | |
response.raise_for_status() | |
return response.json() | |
except requests.RequestException as e: | |
self.logger.warning( | |
f"API request to {endpoint} failed (attempt {attempt + 1}): {e}" | |
) | |
self.logger.error(f"API request to {endpoint} failed after all retries.") | |
return None | |
def send_message(self, chat_id: str, text: str, quoted_message_id: str = None): | |
payload = {"chatId": chat_id, "message": text} | |
if quoted_message_id: | |
payload["quotedMessageId"] = quoted_message_id | |
return self._request("POST", "sendMessage", json=payload) | |
def send_file(self, chat_id: str, file_path: str, caption: str = "", quoted_message_id: str = None): | |
"""Uploads and sends a file (image or audio).""" | |
filename = os.path.basename(file_path) | |
payload = {"chatId": chat_id, "caption": caption} | |
if quoted_message_id: | |
payload["quotedMessageId"] = quoted_message_id | |
with open(file_path, "rb") as f: | |
files = {"file": (filename, f)} | |
return self._request("POST", "sendFileByUpload", data=payload, files=files) | |
def download_file(self, url: str) -> Optional[bytes]: | |
"""Downloads a file from a given URL.""" | |
try: | |
response = self.session.get(url, timeout=30) | |
response.raise_for_status() | |
return response.content | |
except requests.RequestException as e: | |
self.logger.error(f"Failed to download file from {url}: {e}") | |
return None | |
# --- Pydantic Models for Intent Recognition -------------------------------- | |
class BaseIntent(BaseModel): | |
action: str | |
class SummarizeIntent(BaseIntent): action: Literal["summarize"]; text: str | |
class TranslateIntent(BaseIntent): action: Literal["translate"]; lang: str; text: str | |
class JokeIntent(BaseIntent): action: Literal["joke"] | |
class WeatherIntent(BaseIntent): action: Literal["weather"]; location: str | |
class InspireIntent(BaseIntent): action: Literal["inspire"] | |
class MemeIntent(BaseIntent): action: Literal["meme"]; text: str | |
class EditImageIntent(BaseIntent): action: Literal["edit_image"]; prompt: str | |
class GenerateImageIntent(BaseModel): | |
action: Literal["generate_image"] | |
prompt: str | |
count: int = Field(default=1, ge=1, le=10) | |
width: Optional[int] = Field(default=None, ge=512, le=2048) | |
height: Optional[int] = Field(default=None, ge=512, le=2048) | |
class SendTextIntent(BaseIntent): | |
action: Literal["send_text"] | |
message: str | |
# --- Intent Router -------------------------------------------------------- | |
class IntentRouter: | |
"""Recognizes user intent using an LLM and routes to appropriate actions.""" | |
INTENT_MODELS = [ | |
SummarizeIntent, TranslateIntent, JokeIntent, WeatherIntent, | |
InspireIntent, MemeIntent, GenerateImageIntent, EditImageIntent, SendTextIntent | |
] | |
def __init__(self, conv_manager: ConversationManager, logger: logging.Logger): | |
self.conv_manager = conv_manager | |
self.logger = logger | |
def get_intent(self, user_input: str, chat_id: str) -> BaseIntent: | |
history_text = self.conv_manager.get_history_text(chat_id) | |
system_prompt = self._build_system_prompt(history_text, user_input) | |
try: | |
raw_response = generate_llm(system_prompt) | |
except LLMBadRequestError: | |
self.logger.warning(f"LLM request failed due to bad request for chat {chat_id}. Clearing history.") | |
self.conv_manager.clear_history(chat_id) | |
return SendTextIntent(action="send_text", message="Oops! Let's start fresh! π") | |
return self._parse_response(raw_response) | |
def _build_system_prompt(self, history: str, user_input: str) -> str: | |
return ( | |
"You are a function dispatcher. You only invoke functions by returning a single JSON object.\n" | |
"Available functions:\n" | |
"- summarize(text): Summarize given text\n" | |
"- translate(lang, text): Translate text to a language\n" | |
"- joke(): Tell a random joke\n" | |
"- weather(location): Get weather for a location\n" | |
"- inspire(): Get an inspirational quote\n" | |
"- meme(text): Generate a meme from text\n" | |
"- generate_image(prompt, count, width, height): Generate images\n" | |
"- edit_image(prompt): Edit an image (requires replying to an image)\n" | |
"- send_text(message): Send a plain text response\n\n" | |
"Return only raw JSON. Examples:\n" | |
'{"action":"generate_image","prompt":"a red fox","count":2}\n' | |
'{"action":"edit_image","prompt":"make the sky purple"}\n' | |
'{"action":"send_text","message":"Hello there!"}\n\n' | |
f"Conversation history:\n{history}\n\n" | |
f"Current message: User: {user_input}" | |
) | |
def _parse_response(self, raw_response: str) -> BaseIntent: | |
try: | |
# Clean the response to ensure it's valid JSON | |
cleaned_response = raw_response.strip().replace("`json", "").replace("`", "") | |
parsed = json.loads(cleaned_response) | |
for model in self.INTENT_MODELS: | |
try: | |
return model.model_validate(parsed) | |
except ValidationError: | |
continue | |
except json.JSONDecodeError: | |
self.logger.warning(f"Could not decode LLM response to JSON: {raw_response}") | |
# Fallback for non-JSON or unparsable responses | |
return SendTextIntent(action="send_text", message=raw_response) | |
# --- Main Application Class ------------------------------------------------ | |
class WhatsAppBot: | |
def __init__(self, config: BotConfig): | |
self.config = config | |
self.logger = LoggerSetup.setup(config.LOG_LEVEL) | |
self.api_client = GreenApiClient(config, self.logger) | |
self.conv_manager = ConversationManager(config.MAX_HISTORY_SIZE) | |
self.intent_router = IntentRouter(self.conv_manager, self.logger) | |
self.task_queue = queue.Queue() | |
# Cache to store recent image message IDs and their download URLs | |
self.image_cache = defaultdict(lambda: deque(maxlen=50)) | |
self.fastapi_app = FastAPI(title="WhatsApp Eve Bot", version="2.0.0") | |
self._setup_routes() | |
self._start_workers() | |
def _setup_routes(self): | |
async def webhook(request: Request): | |
if request.headers.get("Authorization") != f"Bearer {self.config.WEBHOOK_AUTH_TOKEN}": | |
self.logger.warning("Unauthorized webhook access attempt.") | |
raise HTTPException(403, "Unauthorized") | |
try: | |
payload = await request.json() | |
# Log the entire incoming request payload for debugging | |
self.logger.debug(f"Incoming webhook payload: {json.dumps(payload, indent=2)}") | |
# Process valid incoming messages in the background | |
if payload.get("typeWebhook") == "incomingMessageReceived": | |
executor.submit(self._process_incoming_message, payload) | |
else: | |
self.logger.info(f"Received non-message webhook type: {payload.get('typeWebhook')}") | |
except json.JSONDecodeError: | |
self.logger.error("Failed to decode JSON from webhook request.") | |
raise HTTPException(400, "Invalid JSON payload.") | |
return JSONResponse(content={"status": "received"}) | |
def health_check(): | |
return JSONResponse(content={"status": "healthy"}) | |
def _start_workers(self): | |
for i in range(self.config.WORKER_THREADS): | |
threading.Thread(target=self._worker, name=f"Worker-{i}", daemon=True).start() | |
self.logger.info(f"Started {self.config.WORKER_THREADS} worker threads.") | |
def _worker(self): | |
"""Worker thread to process tasks from the queue.""" | |
while True: | |
task = self.task_queue.get() | |
try: | |
# Set context for the worker thread | |
ThreadContext.set_context(task["chat_id"], task["message_id"]) | |
handler = getattr(self, f"_task_{task['type']}", None) | |
if handler: | |
self.logger.debug(f"Processing task: {task['type']} for chat {task['chat_id']}") | |
handler(task) | |
else: | |
self.logger.warning(f"Unknown task type received: {task['type']}") | |
except Exception as e: | |
self.logger.error(f"Error processing task {task.get('type', 'N/A')}: {e}", exc_info=True) | |
finally: | |
self.task_queue.task_done() | |
def _process_incoming_message(self, payload: Dict[str, Any]): | |
"""Main logic for handling an incoming message payload.""" | |
try: | |
chat_id = payload["senderData"]["chatId"] | |
message_id = payload["idMessage"] | |
# ** CHAT RESTRICTION LOGIC ** | |
if chat_id not in self.config.ALLOWED_CHATS: | |
self.logger.warning(f"Ignoring message from unauthorized chat ID: {chat_id}") | |
return | |
ThreadContext.set_context(chat_id, message_id) | |
message_data = payload.get("messageData", {}) | |
type_message = message_data.get("typeMessage") | |
# --- Improved message processing logic --- | |
# 1. Handle image caching | |
if type_message == "imageMessage": | |
download_url = message_data.get("fileMessageData", {}).get("downloadUrl") | |
if download_url: | |
self.logger.debug(f"Caching image message {message_id} from {chat_id} with URL: {download_url}") | |
self.image_cache[chat_id].append((message_id, download_url)) | |
# 2. Extract text from various message types | |
text = "" | |
if type_message == "textMessage": | |
text = message_data.get("textMessageData", {}).get("textMessage", "") | |
elif type_message == "extendedTextMessage": | |
text = message_data.get("extendedTextMessageData", {}).get("text", "") | |
elif type_message == "quotedMessage": | |
text = message_data.get("extendedTextMessageData", {}).get("text", "") | |
elif type_message == "imageMessage": | |
text = message_data.get("fileMessageData", {}).get("caption", "") | |
text = text.strip() | |
if not text: | |
self.logger.debug(f"Received message type '{type_message}' without actionable text from {chat_id}. No action taken.") | |
return | |
# --- End of improved logic --- | |
self.logger.info(f"Processing message from {chat_id}: '{text}'") | |
self.conv_manager.add_user_message(chat_id, text) | |
# Handle direct commands | |
if text.startswith('/'): | |
self._handle_command(chat_id, message_id, text, payload) | |
else: | |
# Handle natural language and replies | |
self._handle_natural_language(chat_id, message_id, text, payload) | |
except KeyError as e: | |
self.logger.error(f"Missing expected key in message payload: {e}. Payload: {payload}") | |
except Exception as e: | |
self.logger.error(f"Failed to process message payload: {e}", exc_info=True) | |
def _handle_command(self, chat_id, message_id, text, payload): | |
"""Processes direct slash commands.""" | |
parts = text.lower().split() | |
command = parts[0] | |
args = text.split(maxsplit=1)[1] if len(parts) > 1 else "" | |
if command == "/help": | |
help_text = ( | |
"*π€ Eve's Command Center:*\n\n" | |
"πΉ `/help` - Show this help message\n" | |
"πΉ `/gen <prompt>` - Generate an image\n" | |
"πΉ `/edit <prompt>` - Reply to an image to edit it\n" | |
"πΉ `/joke` - Get a random joke\n" | |
"πΉ `/inspire` - Receive an inspirational quote\n" | |
"πΉ `/weather <location>` - Check the weather\n\n" | |
"You can also just chat with me naturally!" | |
) | |
self.api_client.send_message(chat_id, help_text, message_id) | |
elif command == "/gen": | |
self.task_queue.put({"type": "generate_image", "chat_id": chat_id, "message_id": message_id, "prompt": args}) | |
elif command == "/edit": | |
self._dispatch_edit_image(chat_id, message_id, args, payload) | |
elif command == "/joke": | |
self.task_queue.put({"type": "joke", "chat_id": chat_id, "message_id": message_id}) | |
elif command == "/inspire": | |
self.task_queue.put({"type": "inspire", "chat_id": chat_id, "message_id": message_id}) | |
elif command == "/weather": | |
self.task_queue.put({"type": "weather", "chat_id": chat_id, "message_id": message_id, "location": args}) | |
else: | |
self.api_client.send_message(chat_id, "Unknown command. Type /help for options.", message_id) | |
def _handle_natural_language(self, chat_id, message_id, text, payload): | |
"""Processes natural language using the intent router.""" | |
intent = self.intent_router.get_intent(text, chat_id) | |
task_data = { | |
"chat_id": chat_id, | |
"message_id": message_id, | |
**intent.model_dump() | |
} | |
if intent.action == "edit_image": | |
# This action needs the original payload to find the replied-to image | |
self._dispatch_edit_image(chat_id, message_id, intent.prompt, payload) | |
elif hasattr(self, f"_task_{intent.action}"): | |
# Enqueue the task with its type and all necessary data | |
self.task_queue.put({"type": intent.action, **task_data}) | |
else: | |
self.logger.warning(f"No handler found for intent action: {intent.action}") | |
self.api_client.send_message(chat_id, "Sorry, I'm not sure how to handle that.", message_id) | |
def _dispatch_edit_image(self, chat_id, message_id, prompt, payload): | |
"""Checks for a replied-to image and dispatches the edit task.""" | |
message_data = payload.get("messageData", {}) | |
# Ensure we are dealing with a reply | |
if message_data.get("typeMessage") != "quotedMessage": | |
self.api_client.send_message(chat_id, "To edit an image, please reply to an image with your instructions.", message_id) | |
return | |
quoted_info = message_data.get("quotedMessage") | |
if not quoted_info or quoted_info.get("typeMessage") != "imageMessage": | |
self.api_client.send_message(chat_id, "You must reply to an image to edit it.", message_id) | |
return | |
# Get the ID of the message being replied to | |
quoted_message_id = quoted_info.get("stanzaId") | |
if not quoted_message_id: | |
self.logger.error(f"Could not find stanzaId in quoted message: {quoted_info}") | |
self.api_client.send_message(chat_id, "Sorry, I couldn't identify which image to edit.", message_id) | |
return | |
# Find the downloadUrl from our cache | |
download_url = None | |
if chat_id in self.image_cache: | |
# Search the cache for the matching message ID | |
for msg_id, url in self.image_cache[chat_id]: | |
if msg_id == quoted_message_id: | |
download_url = url | |
break | |
if not download_url: | |
self.logger.warning(f"Could not find download URL for message ID {quoted_message_id} in cache for chat {chat_id}.") | |
self.api_client.send_message(chat_id, "I couldn't find the original image. It might be too old. Please send it again before editing.", message_id) | |
return | |
self.logger.info(f"Found cached image URL for message {quoted_message_id}. Dispatching edit task.") | |
self.task_queue.put({ | |
"type": "edit_image", | |
"chat_id": chat_id, | |
"message_id": message_id, | |
"prompt": prompt, | |
"download_url": download_url | |
}) | |
# --- Task Handler Methods --- | |
def _task_send_text(self, task: Dict[str, Any]): | |
chat_id, message_id, message = task["chat_id"], task["message_id"], task["message"] | |
self.api_client.send_message(chat_id, message, message_id) | |
self.conv_manager.add_bot_message(chat_id, message) | |
# Asynchronously generate a voice reply for the sent text | |
self.task_queue.put({"type": "voice_reply", "chat_id": chat_id, "message_id": message_id, "text": message}) | |
def _task_generate_image(self, task: Dict[str, Any]): | |
chat_id, mid, prompt, count = task["chat_id"], task["message_id"], task["prompt"], task.get("count", 1) | |
self.api_client.send_message(chat_id, f"π¨ Generating {count} image(s) for: \"{prompt}\"...", mid) | |
for i in range(count): | |
try: | |
_, path, _, url = generate_image(prompt, mid, str(i), self.config.IMAGE_DIR, width=task.get("width"), height=task.get("height")) | |
caption = f"β¨ Image {i+1}/{count}: {prompt}" | |
self.api_client.send_file(chat_id, path, caption, mid) | |
os.remove(path) | |
except Exception as e: | |
self.logger.error(f"Image generation {i+1} failed: {e}", exc_info=True) | |
self.api_client.send_message(chat_id, f"π’ Failed to generate image {i+1}.", mid) | |
def _task_edit_image(self, task: Dict[str, Any]): | |
chat_id, mid, prompt, url = task["chat_id"], task["message_id"], task["prompt"], task["download_url"] | |
self.api_client.send_message(chat_id, f"π¨ Editing image with prompt: \"{prompt}\"...", mid) | |
input_path, output_path = None, None | |
try: | |
image_data = self.api_client.download_file(url) | |
if not image_data: | |
raise ValueError("Failed to download image.") | |
os.makedirs(self.config.TEMP_DIR, exist_ok=True) | |
input_path = os.path.join(self.config.TEMP_DIR, f"input_{mid}.jpg") | |
output_path = os.path.join(self.config.TEMP_DIR, f"output_{mid}.jpg") | |
with open(input_path, 'wb') as f: | |
f.write(image_data) | |
flux_kontext_lib.generate_image(prompt, input_path, download_path=output_path) | |
if os.path.exists(output_path): | |
caption = f"β¨ Edited: {prompt}" | |
self.api_client.send_file(chat_id, output_path, caption, mid) | |
else: | |
raise ValueError("Edited image file not found.") | |
except Exception as e: | |
self.logger.error(f"Image editing task failed: {e}", exc_info=True) | |
self.api_client.send_message(chat_id, "π’ Sorry, I failed to edit the image.", mid) | |
finally: | |
for path in [input_path, output_path]: | |
if path and os.path.exists(path): | |
os.remove(path) | |
def _task_voice_reply(self, task: Dict[str, Any]): | |
text = task["text"] | |
prompt = f"Say this in a friendly, playful, and slightly clumsy-cute way: {text}" | |
try: | |
result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=self.config.AUDIO_DIR) | |
if result and result[0]: | |
path, _ = result | |
self.api_client.send_file(task["chat_id"], path, quoted_message_id=task["message_id"]) | |
os.remove(path) | |
except Exception as e: | |
self.logger.warning(f"Voice reply generation failed: {e}", exc_info=True) | |
def _task_joke(self, task: Dict[str, Any]): | |
try: | |
j = requests.get("https://official-joke-api.appspot.com/random_joke", timeout=5).json() | |
joke = f"{j['setup']}\n\n{j['punchline']}" | |
except Exception: | |
self.logger.warning("Joke API failed, falling back to LLM.") | |
joke = generate_llm("Tell me a short, clean joke.") | |
self._task_send_text({"type": "send_text", **task, "message": f"π {joke}"}) | |
def _task_inspire(self, task: Dict[str, Any]): | |
quote = generate_llm("Give me a unique, short, uplifting inspirational quote with attribution.") | |
self._task_send_text({"type": "send_text", **task, "message": f"β¨ {quote}"}) | |
def _task_weather(self, task: Dict[str, Any]): | |
location = task.get("location") | |
if not location: | |
self.api_client.send_message(task["chat_id"], "Please provide a location for the weather.", task["message_id"]) | |
return | |
try: | |
raw = requests.get(f"http://wttr.in/{location.replace(' ', '+')}?format=4", timeout=10).text | |
report = generate_llm(f"Create a friendly weather report in Celsius from this data:\n\n{raw}") | |
self._task_send_text({"type": "send_text", **task, "message": f"π€οΈ Weather for {location}:\n{report}"}) | |
except Exception as e: | |
self.logger.error(f"Weather task failed: {e}", exc_info=True) | |
self.api_client.send_message(task["chat_id"], "Sorry, I couldn't get the weather.", task["message_id"]) | |
def run(self): | |
"""Starts the bot and FastAPI server.""" | |
self.logger.info("Starting Eve WhatsApp Bot...") | |
for d in [self.config.IMAGE_DIR, self.config.AUDIO_DIR, self.config.TEMP_DIR]: | |
os.makedirs(d, exist_ok=True) | |
self.logger.debug(f"Ensured directory exists: {d}") | |
# Send a startup message to one of the allowed chats to confirm it's online | |
self.api_client.send_message( | |
"[email protected]", | |
"π Eve is online and ready to help! Type /help to see commands." | |
) | |
uvicorn.run(self.fastapi_app, host="0.0.0.0", port=7860, log_config=None) | |
if __name__ == "__main__": | |
try: | |
bot_config = BotConfig() | |
executor = ThreadPoolExecutor(max_workers=bot_config.WORKER_THREADS * 2) | |
bot = WhatsAppBot(bot_config) | |
bot.run() | |
except ValueError as e: | |
# Catch config validation errors | |
logging.basicConfig() | |
logging.getLogger().critical(f"β CONFIGURATION ERROR: {e}") | |
except KeyboardInterrupt: | |
print("\nπ Bot stopped by user.") | |
except Exception as e: | |
logging.basicConfig() | |
logging.getLogger().critical(f"β A fatal error occurred: {e}", exc_info=True) | |