# Spaces: Running
import hmac
import logging
import os
import threading

import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, JSONResponse

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from llm import generate_llm
# Configure logging for debugging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

# Green API connection settings, read from the environment.
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
# Secret compared against the webhook's "Authorization: Bearer ..." header.
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
PORT = 7860
image_dir = "/tmp/images"
audio_dir = "/tmp/audio"

# Fail fast at import time, naming the variables that are unset or empty.
_REQUIRED_ENV = {
    "GREEN_API_URL": GREEN_API_URL,
    "GREEN_API_TOKEN": GREEN_API_TOKEN,
    "GREEN_API_ID_INSTANCE": GREEN_API_ID_INSTANCE,
    "WEBHOOK_AUTH_TOKEN": WEBHOOK_AUTH_TOKEN,
}
_missing_env = [name for name, value in _REQUIRED_ENV.items() if not value]
if _missing_env:
    raise ValueError(
        "Environment variables are not set properly: missing " + ", ".join(_missing_env)
    )

app = FastAPI()
def send_message(message_id, to_number, message, retries=3):
    """Send a text message through the Green API ``sendMessage`` endpoint.

    Args:
        message_id: Id of the message to quote in the reply.
        to_number: Chat id to send to; used unchanged as ``chatId``.
        message: Text body to send.
        retries: Maximum number of POST attempts.

    Returns:
        The decoded JSON response on success, or ``{"error": <text>}`` when
        the final attempt fails with a ``requests.RequestException``.
    """
    # The original conditional returned ``to_number`` in both branches, so the
    # chat id is simply passed through.
    chat_id = to_number
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {
        "chatId": chat_id,
        "message": message,
        "quotedMessageId": message_id,
    }
    for attempt in range(retries):
        try:
            # Without a timeout requests can block indefinitely on a stalled connection.
            response = requests.post(url, json=payload, timeout=30)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            logging.debug("Attempt %d in send_message failed: %s", attempt + 1, e)
            if attempt < retries - 1:
                continue
            return {"error": str(e)}
def send_image(message_id, to_number, image_path, retries=3):
    """Upload an image through the Green API ``sendFileByUpload`` endpoint.

    Args:
        message_id: Id of the message to quote in the reply.
        to_number: Chat id to send to; used unchanged as ``chatId``.
        image_path: Path of the image file to upload.
        retries: Maximum number of POST attempts.

    Returns:
        The decoded JSON response on success, or ``{"error": <text>}`` when
        the final attempt fails with a ``requests.RequestException``.

    Raises:
        OSError: If ``image_path`` cannot be opened (propagated to the caller,
            as in the original implementation).
    """
    # The original conditional returned ``to_number`` in both branches.
    chat_id = to_number
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': chat_id, 'caption': 'Here you go!', 'quotedMessageId': message_id}
    # The context manager closes the handle on every exit path.
    with open(image_path, 'rb') as image_file:
        files = [('file', ('image.jpg', image_file, 'image/jpeg'))]
        for attempt in range(retries):
            # A previous attempt leaves the handle at EOF; rewind so a retry
            # uploads the full file instead of an empty body.
            image_file.seek(0)
            try:
                response = requests.post(url, data=payload, files=files, timeout=60)
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                if attempt < retries - 1:
                    continue
                return {"error": str(e)}
def send_audio(message_id, to_number, audio_path, retries=3):
    """
    Send an audio file using the Green API similar to send_image.

    Args:
        message_id: Id of the message to quote in the reply.
        to_number: Chat id to send to; used unchanged as ``chatId``.
        audio_path: Path of the audio file to upload.
        retries: Maximum number of POST attempts.

    Returns:
        The decoded JSON response on success, or ``{"error": <text>}`` when
        the file cannot be read or the final attempt fails.
    """
    logging.debug("Entering send_audio")
    # The original conditional returned ``to_number`` in both branches.
    chat_id = to_number
    if not os.path.exists(audio_path):
        # Only logged here; the open() below fails and reports the error.
        logging.debug(f"Audio file does not exist: {audio_path}")
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': chat_id, 'caption': 'Here is your voice reply!', 'quotedMessageId': message_id}
    try:
        with open(audio_path, 'rb') as audio_file:
            files = [('file', ('audio.mp3', audio_file, 'audio/mpeg'))]
            for attempt in range(retries):
                # A previous attempt leaves the handle at EOF; rewind so a
                # retry uploads the full file instead of an empty body.
                audio_file.seek(0)
                try:
                    logging.debug(f"Attempt {attempt + 1} to send audio")
                    response = requests.post(url, data=payload, files=files, timeout=60)
                    logging.debug(f"Response from send_audio: {response.status_code} {response.text}")
                    response.raise_for_status()
                    return response.json()
                except requests.RequestException as e:
                    logging.debug(f"Exception on attempt {attempt + 1} in send_audio: {e}")
                    if attempt < retries - 1:
                        continue
                    return {"error": str(e)}
    except OSError as e:
        logging.debug(f"Failed to open audio file: {e}")
        return {"error": str(e)}
    except Exception as e:
        # Anything else is still reported as an error dict, as before, but is
        # no longer mislabelled as a file-open failure in the log.
        logging.debug(f"Unexpected error in send_audio: {e}")
        return {"error": str(e)}
def response_text(message_id, chat_id, prompt):
    """Generate a text reply with ``generate_llm`` and send it to the chat.

    Args:
        message_id: Id of the message being answered (quoted in the reply).
        chat_id: Chat to reply to.
        prompt: User text passed to ``generate_llm``.

    Any exception is logged and answered with a generic error message.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        # Record the failure instead of swallowing it silently.
        logging.exception("Exception in response_text")
        send_message(message_id, chat_id, "There was an error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Reply to ``prompt`` with a generated voice message, falling back to text.

    Args:
        message_id: Id of the message being answered (quoted in the reply).
        chat_id: Chat to reply to.
        prompt: User text passed to ``generate_voice_reply``.

    Falls back to ``response_text`` when no audio file path is produced or
    when ``send_audio`` reports an error. Any exception is logged and answered
    with a generic error message.
    """
    logging.debug("Entering response_audio with prompt: %s", prompt)
    try:
        result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
        logging.debug("Result from generate_voice_reply: %s", result)
        # Check result and also ensure the audio file path is not None or empty
        if not (result and result[0]):
            logging.debug("generate_voice_reply returned None or empty audio file path, falling back to response_text")
            response_text(message_id, chat_id, prompt)
            return

        audio_file_path = result[0]
        logging.debug("Audio file path generated: %s", audio_file_path)
        try:
            send_result = send_audio(message_id, chat_id, audio_file_path)
            logging.debug("Result from send_audio: %s", send_result)
        finally:
            # Clean up the file even if sending raised.
            if os.path.exists(audio_file_path):
                os.remove(audio_file_path)
                logging.debug("Removed audio file: %s", audio_file_path)

        # send_audio signals failure with an {"error": ...} dict; answer with
        # text in that case so the user still gets a reply.
        if isinstance(send_result, dict) and "error" in send_result:
            logging.debug("send_audio failed, falling back to response_text")
            response_text(message_id, chat_id, prompt)
    except Exception as e:
        logging.debug("Exception in response_audio: %s", e)
        send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.")
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for ``prompt`` and send it, with a follow-up message, to the chat.

    Args:
        message_id: Id of the message being answered (quoted in the replies);
            also passed twice to ``generate_image``.
        chat_id: Chat to reply to.
        prompt: Image prompt passed to ``generate_image``.

    Any exception is logged and answered with a generic error message.
    """
    try:
        image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir)
        if image:
            send_image(message_id, chat_id, image_path)
            send_message(
                message_id,
                chat_id,
                f"Image generated successfully! You can view it here: {image_url}.\n>{chr(8203)} _{returned_prompt}_"
            )
        else:
            send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
    except Exception:
        # Record the failure instead of swallowing it silently.
        logging.exception("Exception in handle_image_generation")
        send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
def index():
    """Return a fixed status string indicating the server is up.

    NOTE(review): no route decorator is visible on this function in the
    source; confirm how (or whether) it is registered with ``app``.
    """
    return "Server is running!"
async def whatsapp_webhook(request: Request):
    """Handle an incoming Green API webhook call.

    Flow:
      1. Reject the call with 403 unless the ``Authorization`` header equals
         ``Bearer <WEBHOOK_AUTH_TOKEN>``.
      2. Ignore (answer ``{"success": True}``) anything that is not an
         ``incomingMessageReceived`` webhook, replies that quote another
         message, and message types other than plain/extended text.
      3. ``/imagine <prompt>`` starts image generation in a background thread;
         any other text starts a voice reply in a background thread.

    Returns:
        ``{"success": True}`` normally; a 400 JSON error for an unparsable or
        non-object body; a 200 JSON error when an expected key is missing.

    Raises:
        HTTPException: 403 when the bearer token does not match.

    NOTE(review): no route decorator is visible on this function in the
    source; confirm how it is registered with ``app``.
    """
    auth_header = request.headers.get('Authorization', '').strip()
    expected_header = f"Bearer {WEBHOOK_AUTH_TOKEN}"
    # Constant-time comparison so the secret cannot be probed via timing.
    # Compared as bytes because compare_digest rejects non-ASCII str input.
    if not hmac.compare_digest(auth_header.encode("utf-8"), expected_header.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")

    try:
        data = await request.json()
    except Exception:
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)
    # A JSON array/scalar parses fine but has no .get(); reject it up front
    # instead of failing with an unhandled AttributeError.
    if not isinstance(data, dict):
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)

    if data.get('typeWebhook') != 'incomingMessageReceived':
        return {"success": True}

    try:
        chat_id = data['senderData']['chatId']
        message_id = data['idMessage']
        message_data = data.get('messageData', {})
        # Ignore messages that are replies
        if 'extendedTextMessageData' in message_data:
            if message_data['extendedTextMessageData'].get('quotedMessageId'):
                return {"success": True}
        # If needed, add checks for other message types here
        if 'textMessageData' in message_data:
            body = message_data['textMessageData']['textMessage'].strip()
        elif 'extendedTextMessageData' in message_data:
            body = message_data['extendedTextMessageData']['text'].strip()
        else:
            return {"success": True}
    except KeyError as e:
        return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200)

    command = '/imagine'
    if body.lower().startswith(command):
        # Strip only the leading command. The previous case-sensitive
        # str.replace left "/Imagine" in the prompt and also removed any later
        # occurrences of "/imagine" inside the prompt text.
        prompt = body[len(command):].strip()
        if not prompt:
            send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
        else:
            send_message(message_id, chat_id, "Generating...")
            threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start()
    else:
        threading.Thread(target=response_audio, args=(message_id, chat_id, body)).start()
    return {"success": True}
def main():
    """Serve ``app`` with uvicorn on all interfaces at ``PORT``."""
    # Local import: uvicorn is only used when the server is started from here.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=PORT)
# Start the server only when this file is executed as a script.
if __name__ == '__main__':
    main()