Spaces:
Running
Running
| import os | |
| import threading | |
| import requests | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import PlainTextResponse, JSONResponse | |
| from FLUX import generate_image | |
| from VoiceReply import generate_voice_reply | |
| from llm import generate_llm | |
| GREEN_API_URL = os.getenv("GREEN_API_URL") | |
| GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com") | |
| GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN") | |
| GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE") | |
| WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN") | |
| PORT = 7860 | |
| image_dir = "/tmp/images" | |
| audio_dir = "/tmp/audio" | |
| if not all([GREEN_API_URL, GREEN_API_TOKEN, GREEN_API_ID_INSTANCE, WEBHOOK_AUTH_TOKEN]): | |
| raise ValueError("Environment variables are not set properly") | |
| app = FastAPI() | |
| def send_message(message_id, to_number, message, retries=3): | |
| if to_number.endswith('@g.us'): | |
| chat_id = to_number | |
| else: | |
| chat_id = to_number | |
| url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}" | |
| payload = { | |
| "chatId": chat_id, | |
| "message": message, | |
| "quotedMessageId": message_id, | |
| } | |
| for attempt in range(retries): | |
| try: | |
| response = requests.post(url, json=payload) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.RequestException as e: | |
| if attempt < retries - 1: | |
| continue | |
| return {"error": str(e)} | |
| def send_image(message_id, to_number, image_path, retries=3): | |
| if to_number.endswith('@g.us'): | |
| chat_id = to_number | |
| else: | |
| chat_id = to_number | |
| url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
| payload = {'chatId': chat_id, 'caption': 'Here you go!', 'quotedMessageId': message_id} | |
| files = [('file', ('image.jpg', open(image_path, 'rb'), 'image/jpeg'))] | |
| for attempt in range(retries): | |
| try: | |
| response = requests.post(url, data=payload, files=files) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.RequestException as e: | |
| if attempt < retries - 1: | |
| continue | |
| return {"error": str(e)} | |
| def send_audio(message_id, to_number, audio_path, retries=3): | |
| """ | |
| Send an audio file using the Green API similar to send_image. | |
| """ | |
| if to_number.endswith('@g.us'): | |
| chat_id = to_number | |
| else: | |
| chat_id = to_number | |
| url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}" | |
| payload = {'chatId': chat_id, 'caption': 'Here is your voice reply!', 'quotedMessageId': message_id} | |
| files = [('file', ('audio.mp3', open(audio_path, 'rb'), 'audio/mpeg'))] | |
| for attempt in range(retries): | |
| try: | |
| response = requests.post(url, data=payload, files=files) | |
| response.raise_for_status() | |
| return response.json() | |
| except requests.RequestException as e: | |
| if attempt < retries - 1: | |
| continue | |
| return {"error": str(e)} | |
| def response_text(message_id, chat_id, prompt): | |
| try: | |
| msg = generate_llm(prompt) | |
| send_message(message_id, chat_id, msg) | |
| except Exception as e: | |
| send_message(message_id, chat_id, "There was an error processing your request.") | |
| def response_audio(message_id, chat_id, prompt): | |
| try: | |
| audio_file_path, audio_data = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir) | |
| if audio_file_path: | |
| send_audio(message_id, chat_id, audio_file_path) | |
| os.remove(audio_file_path) # Clean up the file after sending | |
| else: | |
| response_text(message_id, chat_id, prompt=prompt) | |
| except Exception as e: | |
| send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.") | |
| def handle_image_generation(message_id, chat_id, prompt): | |
| try: | |
| image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir) | |
| if image: | |
| send_image(message_id, chat_id, image_path) | |
| send_message(message_id, chat_id, f"Image generated successfully! You can view it here: {image_url}. \nPrompt: > _{returned_prompt}_") | |
| else: | |
| send_message(message_id, chat_id, "Failed to generate image. Please try again later.") | |
| except Exception as e: | |
| send_message(message_id, chat_id, "There was an error generating the image. Please try again later.") | |
| def index(): | |
| return "Server is running!" | |
| async def whatsapp_webhook(request: Request): | |
| auth_header = request.headers.get('Authorization', '').strip() | |
| if auth_header != f"Bearer {WEBHOOK_AUTH_TOKEN}": | |
| raise HTTPException(status_code=403, detail="Unauthorized") | |
| try: | |
| data = await request.json() | |
| except Exception: | |
| return JSONResponse(content={"error": "Invalid JSON"}, status_code=400) | |
| if data.get('typeWebhook') != 'incomingMessageReceived': | |
| return {"success": True} | |
| try: | |
| chat_id = data['senderData']['chatId'] | |
| message_id = data['idMessage'] | |
| message_data = data.get('messageData', {}) | |
| if 'textMessageData' in message_data: | |
| body = message_data['textMessageData']['textMessage'].strip() | |
| elif 'extendedTextMessageData' in message_data: | |
| body = message_data['extendedTextMessageData']['text'].strip() | |
| else: | |
| return {"success": True} | |
| except KeyError as e: | |
| return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200) | |
| if body.lower().startswith('/imagine'): | |
| prompt = body.replace('/imagine', '').strip() | |
| if not prompt: | |
| send_message(message_id, chat_id, "Please provide a prompt after /imagine.") | |
| else: | |
| send_message(message_id, chat_id, "Generating...") | |
| threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start() | |
| else: | |
| threading.Thread(target=response_text, args=(message_id, chat_id, body)).start() | |
| return {"success": True} | |
| if __name__ == '__main__': | |
| import uvicorn | |
| uvicorn.run(app, host="0.0.0.0", port=PORT, debug=True) | |