# Spaces: Running
import hmac
import os
import threading

import requests
from flask import Flask, request, jsonify

from llm import generate_llm
from sd import generate_sd
# Runtime configuration, read from the process environment at import time.
# Only GREEN_API_MEDIA_URL has a fallback value; the other four settings
# are mandatory and validated below.
GREEN_API_URL = os.environ.get("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.environ.get("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.environ.get("GREEN_API_ID_INSTANCE")
WEBHOOK_AUTH_TOKEN = os.environ.get("WEBHOOK_AUTH_TOKEN")

# Port the Flask server listens on when this file is run as a script.
PORT = 7860

# Fail at import time when any mandatory setting is missing or empty.
if not (GREEN_API_URL and GREEN_API_TOKEN and GREEN_API_ID_INSTANCE and WEBHOOK_AUTH_TOKEN):
    raise ValueError("Environment variables are not set properly")

app = Flask(__name__)
def send_message(message_id, to_number, message, retries=3, timeout=30):
    """
    Send a text message using Green API with retry logic.

    Args:
        message_id: ID of the message being replied to; sent to the API as
            ``quotedMessageId``.
        to_number: Chat ID of the recipient. It is forwarded to the API
            unchanged, for group chats (``...@g.us``) and all others alike.
        message: Text to send.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so the function never returns without having tried.
        timeout: Seconds to wait for the API on each attempt before that
            attempt is counted as failed.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": "<details>"}``. NOTE(review): the error text comes from
        the underlying exception and may contain the request URL, which
        embeds the API token - avoid logging or forwarding it verbatim.
    """
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {
        "chatId": to_number,
        "message": message,
        "quotedMessageId": message_id,
    }

    last_error = None
    for _ in range(max(1, retries)):
        try:
            # Without a timeout a stalled connection would block the calling
            # thread forever and the retry logic would never run.
            response = requests.post(url, json=payload, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException as exc:
            last_error = exc
            continue

        try:
            return response.json()
        except ValueError as exc:
            # The API already answered with a success status, so the message
            # was accepted; retrying here would deliver it twice.
            return {"error": str(exc)}

    return {"error": str(last_error)}
def send_image(message_id, to_number, image_path, retries=3, timeout=60):
    """
    Send an image using Green API with retry logic.

    Args:
        message_id: ID of the message being replied to; sent to the API as
            ``quotedMessageId``.
        to_number: Chat ID of the recipient. It is forwarded to the API
            unchanged, for group chats (``...@g.us``) and all others alike.
        image_path: Path of the image file to upload. It is always uploaded
            under the name ``image.jpg`` with content type ``image/jpeg``.
        retries: Maximum number of attempts. Values below 1 are treated as 1
            so the function never returns without having tried.
        timeout: Seconds to wait for the API on each attempt before that
            attempt is counted as failed.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": "<details>"}``. NOTE(review): the error text comes from
        the underlying exception and may contain the request URL, which
        embeds the API token - avoid logging or forwarding it verbatim.

    Raises:
        OSError: If ``image_path`` cannot be opened or read.
    """
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': to_number, 'caption': 'Here you go!', 'quotedMessageId': message_id}

    # Read the image once into memory. This closes the file handle promptly
    # and guarantees every retry uploads the full content; a shared open
    # handle would be at end-of-file after the first attempt.
    with open(image_path, 'rb') as image_file:
        image_bytes = image_file.read()

    last_error = None
    for _ in range(max(1, retries)):
        files = [('file', ('image.jpg', image_bytes, 'image/jpeg'))]
        try:
            # Without a timeout a stalled upload would block the calling
            # thread forever and the retry logic would never run.
            response = requests.post(url, data=payload, files=files, timeout=timeout)
            response.raise_for_status()
        except requests.RequestException as exc:
            last_error = exc
            continue

        try:
            return response.json()
        except ValueError as exc:
            # The API already answered with a success status, so the upload
            # was accepted; retrying here would deliver the image twice.
            return {"error": str(exc)}

    return {"error": str(last_error)}
def response_text(message_id, chat_id, prompt):
    """
    Generate a response using the LLM and send it to the user.

    Args:
        message_id: ID of the incoming message; the reply quotes it.
        chat_id: Chat the reply is sent to.
        prompt: Text passed to ``generate_llm``.

    Any exception raised while generating or sending the reply is logged
    with its traceback and a generic error message is sent to the chat
    instead. The function itself returns nothing.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        # This is the top of a worker thread: catch broadly so the user gets
        # a reply, but record the traceback so the failure is diagnosable.
        app.logger.exception("Failed to produce a text response")
        send_message(message_id, chat_id, "There was an error processing your request.")
def handle_image_generation(message_id, chat_id, prompt):
    """
    Generate an image from the provided prompt and send it to the user.

    Args:
        message_id: ID of the incoming message; the reply quotes it.
        chat_id: Chat the image (or error text) is sent to.
        prompt: Text passed to ``generate_sd``.

    ``generate_sd`` is expected to return an ``(image_data, image_path)``
    pair. When ``image_data`` is falsy a "failed to generate" message is
    sent instead of an image. Any exception is logged with its traceback
    and a generic error message is sent to the chat. Returns nothing.
    """
    try:
        image_data, image_path = generate_sd(prompt)
        if image_data:
            send_image(message_id, chat_id, image_path)
        else:
            send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
    except Exception:
        # This is the top of a worker thread: catch broadly so the user gets
        # a reply, but record the traceback so the failure is diagnosable.
        app.logger.exception("Failed to generate or send an image")
        send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
# Register the view with the app; without a route Flask never serves it.
@app.route('/', methods=['GET'])
def index():
    """
    Basic endpoint to check if the script is running.

    Returns:
        A fixed plain-text status string.
    """
    return "Server is running!"
# NOTE(review): the view must be registered to receive requests. The path
# '/whatsapp' is an assumption - align it with the webhook URL configured
# in the Green API console.
@app.route('/whatsapp', methods=['POST'])
def whatsapp_webhook():
    """
    Handle incoming WhatsApp messages.

    The request must carry ``Authorization: Bearer <WEBHOOK_AUTH_TOKEN>``.
    Only ``incomingMessageReceived`` webhooks with a plain or extended text
    message are processed; every other event is acknowledged and ignored.
    A message starting with ``/imagine`` (case-insensitive) triggers image
    generation with the rest of the text as the prompt; any other text is
    answered by the LLM. Generation runs in a background thread.

    Returns:
        A JSON response: 403 when the token is wrong, 400 when the body is
        not a JSON object, otherwise 200 (including malformed message
        payloads, which are reported in the body with status 200).
    """
    # Authenticate before looking at the body so unauthenticated callers
    # cannot reach the JSON parser. compare_digest avoids leaking how many
    # leading characters of the token matched; bytes are used because the
    # str form rejects non-ASCII input.
    auth_header = request.headers.get('Authorization', '').strip()
    expected_header = f"Bearer {WEBHOOK_AUTH_TOKEN}"
    if not hmac.compare_digest(auth_header.encode('utf-8'), expected_header.encode('utf-8')):
        return jsonify({"error": "Unauthorized"}), 403

    # silent=True yields None instead of raising on a missing or invalid
    # JSON body; a JSON value that is not an object is rejected as well.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON payload"}), 400

    if data.get('typeWebhook') != 'incomingMessageReceived':
        return jsonify(success=True)

    try:
        chat_id = data['senderData']['chatId']
        message_id = data['idMessage']
        message_data = data.get('messageData', {})
        if 'textMessageData' in message_data:
            body = message_data['textMessageData']['textMessage'].strip()
        elif 'extendedTextMessageData' in message_data:
            body = message_data['extendedTextMessageData']['text'].strip()
        else:
            # Not a text message (media, location, ...): nothing to do.
            return jsonify(success=True)
    except KeyError as e:
        return jsonify({"error": f"Missing key in data: {e}"}), 200
    except (TypeError, AttributeError):
        # A field had an unexpected type (e.g. null instead of an object or
        # string). Answer 200 like the missing-key case above.
        return jsonify({"error": "Malformed message data"}), 200

    command = '/imagine'
    if body[:len(command)].lower() == command:
        # Slice off the command itself rather than using str.replace, which
        # is case-sensitive and would also delete later occurrences of the
        # command inside the prompt.
        prompt = body[len(command):].strip()
        if not prompt:
            send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
        else:
            send_message(message_id, chat_id, "Generating...")
            threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start()
    else:
        threading.Thread(target=response_text, args=(message_id, chat_id, body)).start()

    return jsonify(success=True)
if __name__ == '__main__':
    # The server listens on every interface, so the interactive debugger
    # (which allows arbitrary code execution from the browser) must not be
    # enabled by default. Set FLASK_DEBUG=1 to opt in during development.
    debug_enabled = os.getenv("FLASK_DEBUG", "").strip().lower() in ("1", "true", "yes", "on")
    app.run(debug=debug_enabled, port=PORT, host="0.0.0.0")