eve / app.py
Chandima Prabhath
flux & audio reply
b9724da
raw
history blame
6.4 kB
import hmac
import logging
import os
import threading

import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, JSONResponse

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from llm import generate_llm
# Green API connection settings, all read from the process environment.
GREEN_API_URL = os.environ.get("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.environ.get("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.environ.get("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.environ.get("GREEN_API_ID_INSTANCE")
# Secret the webhook expects in its "Authorization: Bearer ..." header.
WEBHOOK_AUTH_TOKEN = os.environ.get("WEBHOOK_AUTH_TOKEN")

PORT = 7860
# Directories passed to the image and voice generators.
image_dir = "/tmp/images"
audio_dir = "/tmp/audio"

# Fail fast at import time if a mandatory setting is missing or empty.
# GREEN_API_MEDIA_URL is not checked because it has a default.
if not (GREEN_API_URL and GREEN_API_TOKEN and GREEN_API_ID_INSTANCE and WEBHOOK_AUTH_TOKEN):
    raise ValueError("Environment variables are not set properly")

app = FastAPI()
def send_message(message_id, to_number, message, retries=3, timeout=30):
    """Send a text message through the Green API, quoting ``message_id``.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID of the recipient; used as-is for both group
            (``...@g.us``) and individual chats.
        message: Text to send.
        retries: Maximum number of attempts; at least one is always made.
        timeout: Seconds to wait for the API on each attempt.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": <description of the last failure>}``.
    """
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {
        "chatId": to_number,
        "message": message,
        "quotedMessageId": message_id,
    }
    last_error = None
    for _ in range(max(retries, 1)):
        try:
            # An explicit timeout keeps a stalled connection from blocking
            # the calling thread indefinitely.
            response = requests.post(url, json=payload, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            last_error = exc
    return {"error": str(last_error)}
def send_image(message_id, to_number, image_path, retries=3, timeout=60):
    """Upload an image file to a chat through the Green API.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID of the recipient; used as-is for both group
            and individual chats.
        image_path: Path of the image file to upload.
        retries: Maximum number of attempts; at least one is always made.
        timeout: Seconds to wait for the API on each attempt.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": <description of the last failure>}``.

    Raises:
        OSError: If ``image_path`` cannot be opened or read.
    """
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': to_number, 'caption': 'Here you go!', 'quotedMessageId': message_id}

    # Read the file once and close it immediately. Passing bytes (rather than
    # an open file object) means every retry uploads the full content instead
    # of reading from an already-exhausted stream.
    with open(image_path, 'rb') as image_file:
        image_bytes = image_file.read()
    files = [('file', ('image.jpg', image_bytes, 'image/jpeg'))]

    last_error = None
    for _ in range(max(retries, 1)):
        try:
            response = requests.post(url, data=payload, files=files, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            last_error = exc
    return {"error": str(last_error)}
def send_audio(message_id, to_number, audio_path, retries=3, timeout=60):
    """Upload an audio file to a chat through the Green API.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Chat ID of the recipient; used as-is for both group
            and individual chats.
        audio_path: Path of the audio file to upload.
        retries: Maximum number of attempts; at least one is always made.
        timeout: Seconds to wait for the API on each attempt.

    Returns:
        The decoded JSON response on success, otherwise
        ``{"error": <description of the last failure>}``.

    Raises:
        OSError: If ``audio_path`` cannot be opened or read.
    """
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': to_number, 'caption': 'Here is your voice reply!', 'quotedMessageId': message_id}

    # Read the file once and close it immediately. Passing bytes (rather than
    # an open file object) means every retry uploads the full content instead
    # of reading from an already-exhausted stream.
    with open(audio_path, 'rb') as audio_file:
        audio_bytes = audio_file.read()
    files = [('file', ('audio.mp3', audio_bytes, 'audio/mpeg'))]

    last_error = None
    for _ in range(max(retries, 1)):
        try:
            response = requests.post(url, data=payload, files=files, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as exc:
            last_error = exc
    return {"error": str(last_error)}
def response_text(message_id, chat_id, prompt):
    """Generate an LLM reply to ``prompt`` and send it as a quoted message.

    Args:
        message_id: ID of the incoming message to quote.
        chat_id: Chat to reply to.
        prompt: Text passed to ``generate_llm``.

    Any failure is logged with its traceback and reported to the chat with a
    generic error message; nothing is raised to the caller.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        # Record the real cause; the chat only ever sees the generic message.
        logging.getLogger(__name__).exception(
            "Failed to produce a text reply for message %s", message_id
        )
        send_message(message_id, chat_id, "There was an error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Generate a voice reply to ``prompt`` and send it as an audio file.

    Falls back to a text reply when no audio file path is returned by
    ``generate_voice_reply``. The generated file is removed after the send
    attempt, whether or not the send succeeded.

    Args:
        message_id: ID of the incoming message to quote.
        chat_id: Chat to reply to.
        prompt: Text passed to ``generate_voice_reply``.

    Any failure is logged with its traceback and reported to the chat with a
    generic error message; nothing is raised to the caller.
    """
    log = logging.getLogger(__name__)
    try:
        audio_file_path, _audio_data = generate_voice_reply(
            prompt, model="openai-audio", voice="coral", audio_dir=audio_dir
        )
        if not audio_file_path:
            response_text(message_id, chat_id, prompt=prompt)
            return
        try:
            send_audio(message_id, chat_id, audio_file_path)
        finally:
            # Always clean up the temp file. A cleanup failure must not be
            # reported to the user as a failed audio generation.
            try:
                os.remove(audio_file_path)
            except OSError:
                log.warning("Could not remove audio file %s", audio_file_path, exc_info=True)
    except Exception:
        log.exception("Failed to produce a voice reply for message %s", message_id)
        send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.")
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for ``prompt`` and send it to the chat.

    On success the image is uploaded, followed by a text message containing
    the image URL and the prompt returned by the generator. If the generator
    yields no image, a failure notice is sent instead.

    Args:
        message_id: ID of the incoming message to quote.
        chat_id: Chat to reply to.
        prompt: Text passed to ``generate_image``.

    Any exception is logged with its traceback and reported to the chat with
    a generic error message; nothing is raised to the caller.
    """
    try:
        # NOTE(review): message_id is passed twice here; confirm against the
        # signature of generate_image that this is intended.
        image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir)
        if image:
            send_image(message_id, chat_id, image_path)
            send_message(message_id, chat_id, f"Image generated successfully! You can view it here: {image_url}. \nPrompt: > _{returned_prompt}_")
        else:
            send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
    except Exception:
        # Record the real cause; the chat only ever sees the generic message.
        logging.getLogger(__name__).exception(
            "Image generation failed for message %s", message_id
        )
        send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
@app.get("/", response_class=PlainTextResponse)
def index():
    """Liveness endpoint: reply with a fixed plain-text status line."""
    status_line = "Server is running!"
    return status_line
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle webhook notifications for incoming WhatsApp messages.

    The request must carry ``Authorization: Bearer <WEBHOOK_AUTH_TOKEN>``.
    Only ``incomingMessageReceived`` notifications with a text body are acted
    on; everything else is acknowledged with ``{"success": True}``.

    A body starting with ``/imagine`` (case-insensitive) starts image
    generation in a background thread; any other text is answered by the LLM
    in a background thread.

    Raises:
        HTTPException: 403 when the Authorization header does not match.
    """
    auth_header = request.headers.get('Authorization', '').strip()
    expected_header = f"Bearer {WEBHOOK_AUTH_TOKEN}"
    # Constant-time comparison so response timing does not leak the token.
    # Compared as bytes because compare_digest rejects non-ASCII str values.
    if not hmac.compare_digest(auth_header.encode('utf-8'), expected_header.encode('utf-8')):
        raise HTTPException(status_code=403, detail="Unauthorized")

    try:
        data = await request.json()
    except Exception:
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)
    if not isinstance(data, dict):
        # Well-formed JSON that is not an object (e.g. a list or a number)
        # cannot be a notification; reject it instead of failing on .get().
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)

    if data.get('typeWebhook') != 'incomingMessageReceived':
        return {"success": True}

    try:
        chat_id = data['senderData']['chatId']
        message_id = data['idMessage']
        message_data = data.get('messageData', {})
        if 'textMessageData' in message_data:
            body = message_data['textMessageData']['textMessage'].strip()
        elif 'extendedTextMessageData' in message_data:
            body = message_data['extendedTextMessageData']['text'].strip()
        else:
            # Non-text message (media, location, ...): nothing to do.
            return {"success": True}
    except KeyError as e:
        # Reported with HTTP 200, matching the other acknowledgement paths.
        return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200)
    except (TypeError, AttributeError) as e:
        # A field had an unexpected type (e.g. null where a dict or string
        # was expected).
        return JSONResponse(content={"error": f"Malformed data: {e}"}, status_code=200)

    command = '/imagine'
    if body[:len(command)].lower() == command:
        # Strip only the leading command, whatever its letter case, and leave
        # any later occurrence of "/imagine" inside the prompt untouched.
        prompt = body[len(command):].strip()
        # NOTE: send_message performs blocking HTTP calls inside this async
        # handler; the calls below are kept inline so "Generating..." is
        # sent before the generation thread starts.
        if not prompt:
            send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
        else:
            send_message(message_id, chat_id, "Generating...")
            threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start()
    else:
        threading.Thread(target=response_text, args=(message_id, chat_id, body)).start()

    return {"success": True}
if __name__ == '__main__':
    import uvicorn

    # The former `debug=True` argument is dropped: recent uvicorn releases no
    # longer accept a `debug` option, so passing it makes startup fail.
    uvicorn.run(app, host="0.0.0.0", port=PORT)