eve / app.py
Chandima Prabhath
ignore user's interactions
571de7b
raw
history blame
8.16 kB
import hmac
import logging
import os
import threading

import requests
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import PlainTextResponse, JSONResponse

from FLUX import generate_image
from VoiceReply import generate_voice_reply
from llm import generate_llm
# Configure logging for debugging
logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(message)s")

# Green API connection settings, read from the environment at import time.
GREEN_API_URL = os.getenv("GREEN_API_URL")
GREEN_API_MEDIA_URL = os.getenv("GREEN_API_MEDIA_URL", "https://api.green-api.com")
GREEN_API_TOKEN = os.getenv("GREEN_API_TOKEN")
GREEN_API_ID_INSTANCE = os.getenv("GREEN_API_ID_INSTANCE")
# Shared secret the webhook expects in its "Authorization: Bearer ..." header.
WEBHOOK_AUTH_TOKEN = os.getenv("WEBHOOK_AUTH_TOKEN")
PORT = 7860
# Directories handed to the image / voice generators for their output files.
image_dir = "/tmp/images"
audio_dir = "/tmp/audio"

# Fail fast at import time, and say exactly which required variables are
# unset or empty so a misconfigured deployment can be fixed without guessing.
_missing_env = [
    name
    for name, value in (
        ("GREEN_API_URL", GREEN_API_URL),
        ("GREEN_API_TOKEN", GREEN_API_TOKEN),
        ("GREEN_API_ID_INSTANCE", GREEN_API_ID_INSTANCE),
        ("WEBHOOK_AUTH_TOKEN", WEBHOOK_AUTH_TOKEN),
    )
    if not value
]
if _missing_env:
    raise ValueError(
        "Environment variables are not set properly: missing "
        + ", ".join(_missing_env)
    )

app = FastAPI()
def send_message(message_id, to_number, message, retries=3, timeout=30):
    """Send a text message through the Green API, quoting ``message_id``.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Destination chat ID; sent unchanged as ``chatId``.
        message: Text of the message to send.
        retries: Maximum number of POST attempts.
        timeout: Seconds to wait for each HTTP attempt before giving up.

    Returns:
        The decoded JSON response on success, otherwise ``{"error": <text>}``.
    """
    # The previous conditional on the '@g.us' suffix produced ``to_number`` in
    # both branches, so the chat ID is simply passed through.
    chat_id = to_number
    url = f"{GREEN_API_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendMessage/{GREEN_API_TOKEN}"
    payload = {
        "chatId": chat_id,
        "message": message,
        "quotedMessageId": message_id,
    }
    last_error = None
    for attempt in range(retries):
        try:
            # Without a timeout a stalled connection would block this call forever.
            response = requests.post(url, json=payload, timeout=timeout)
            response.raise_for_status()
            return response.json()
        except requests.RequestException as e:
            last_error = e
            # Log only the exception type: requests error messages may embed
            # the request URL, and the URL carries the API token.
            logging.warning(
                "send_message attempt %d/%d failed: %s",
                attempt + 1, retries, type(e).__name__,
            )
    if last_error is None:
        # retries < 1: the loop never ran; still give callers a dict.
        return {"error": "no send attempts were made (retries < 1)"}
    return {"error": str(last_error)}
def send_image(message_id, to_number, image_path, retries=3, timeout=60):
    """Upload an image file to a chat through the Green API.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Destination chat ID; sent unchanged as ``chatId``.
        image_path: Path of the image file to upload.
        retries: Maximum number of upload attempts.
        timeout: Seconds to wait for each HTTP attempt before giving up.

    Returns:
        The decoded JSON response on success, otherwise ``{"error": <text>}``.

    Raises:
        OSError: If ``image_path`` cannot be opened (as before, this is left
            to the caller to handle).
    """
    # The previous conditional on the '@g.us' suffix produced ``to_number`` in
    # both branches, so the chat ID is simply passed through.
    chat_id = to_number
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': chat_id, 'caption': 'Here you go!', 'quotedMessageId': message_id}
    last_error = None
    # The context manager closes the handle on every exit path; it was
    # previously opened inline and never closed.
    with open(image_path, 'rb') as image_file:
        files = [('file', ('image.jpg', image_file, 'image/jpeg'))]
        for attempt in range(retries):
            # A previous attempt may have consumed the stream; rewind so a
            # retry uploads the whole file instead of an empty body.
            image_file.seek(0)
            try:
                response = requests.post(url, data=payload, files=files, timeout=timeout)
                response.raise_for_status()
                return response.json()
            except requests.RequestException as e:
                last_error = e
                # Log only the exception type: requests error messages may
                # embed the request URL, and the URL carries the API token.
                logging.warning(
                    "send_image attempt %d/%d failed: %s",
                    attempt + 1, retries, type(e).__name__,
                )
    if last_error is None:
        # retries < 1: the loop never ran; still give callers a dict.
        return {"error": "no send attempts were made (retries < 1)"}
    return {"error": str(last_error)}
def send_audio(message_id, to_number, audio_path, retries=3, timeout=60):
    """Upload an audio file to a chat through the Green API.

    Mirrors ``send_image`` but never raises: every failure is reported as an
    ``{"error": <text>}`` dictionary.

    Args:
        message_id: ID of the message to quote in the reply.
        to_number: Destination chat ID; sent unchanged as ``chatId``.
        audio_path: Path of the audio file to upload.
        retries: Maximum number of upload attempts.
        timeout: Seconds to wait for each HTTP attempt before giving up.

    Returns:
        The decoded JSON response on success, otherwise ``{"error": <text>}``.
    """
    logging.debug("Entering send_audio")
    # The previous conditional on the '@g.us' suffix produced ``to_number`` in
    # both branches, so the chat ID is simply passed through.
    chat_id = to_number
    if not os.path.exists(audio_path):
        # Only logged here; the open() below fails and reports the error.
        logging.debug(f"Audio file does not exist: {audio_path}")
    url = f"{GREEN_API_MEDIA_URL}/waInstance{GREEN_API_ID_INSTANCE}/sendFileByUpload/{GREEN_API_TOKEN}"
    payload = {'chatId': chat_id, 'caption': 'Here is your voice reply!', 'quotedMessageId': message_id}
    last_error = None
    try:
        with open(audio_path, 'rb') as audio_file:
            files = [('file', ('audio.mp3', audio_file, 'audio/mpeg'))]
            for attempt in range(retries):
                # A previous attempt may have consumed the stream; rewind so
                # a retry uploads the whole file instead of an empty body.
                audio_file.seek(0)
                try:
                    logging.debug(f"Attempt {attempt + 1} to send audio")
                    response = requests.post(url, data=payload, files=files, timeout=timeout)
                    logging.debug(f"Response from send_audio: {response.status_code} {response.text}")
                    response.raise_for_status()
                    return response.json()
                except requests.RequestException as e:
                    last_error = e
                    logging.debug(f"Exception on attempt {attempt + 1} in send_audio: {e}")
    except OSError as e:
        logging.debug(f"Failed to open audio file: {e}")
        return {"error": str(e)}
    except Exception as e:
        # Anything else (e.g. an undecodable response body) is not a file
        # problem, so it gets its own log message.
        logging.debug(f"Unexpected error in send_audio: {e}")
        return {"error": str(e)}
    if last_error is None:
        # retries < 1: the loop never ran; still give callers a dict.
        return {"error": "no send attempts were made (retries < 1)"}
    return {"error": str(last_error)}
def response_text(message_id, chat_id, prompt):
    """Generate an LLM reply for ``prompt`` and send it as a quoted text message.

    Args:
        message_id: ID of the incoming message to quote.
        chat_id: Chat to reply in.
        prompt: Text passed to ``generate_llm``.

    On any failure a generic error message is sent to the chat instead.
    """
    try:
        msg = generate_llm(prompt)
        send_message(message_id, chat_id, msg)
    except Exception:
        # Record the traceback; previously the failure was swallowed silently,
        # leaving nothing to diagnose from.
        logging.exception("response_text failed")
        send_message(message_id, chat_id, "There was an error processing your request.")
def response_audio(message_id, chat_id, prompt):
    """Reply to ``prompt`` with a generated voice message.

    Falls back to ``response_text`` when ``generate_voice_reply`` yields no
    audio file path, and sends a generic error message if anything raises.
    The generated audio file is removed afterwards on every path.

    Args:
        message_id: ID of the incoming message to quote.
        chat_id: Chat to reply in.
        prompt: Text passed to ``generate_voice_reply``.
    """
    logging.debug("Entering response_audio with prompt: %s", prompt)
    audio_file_path = None
    try:
        result = generate_voice_reply(prompt, model="openai-audio", voice="coral", audio_dir=audio_dir)
        logging.debug("Result from generate_voice_reply: %s", result)
        # Check result and also ensure the audio_file_path is not None or empty
        if result and result[0]:
            audio_file_path, _audio_data = result
            logging.debug("Audio file path generated: %s", audio_file_path)
            send_result = send_audio(message_id, chat_id, audio_file_path)
            logging.debug("Result from send_audio: %s", send_result)
        else:
            logging.debug("generate_voice_reply returned None or empty audio file path, falling back to response_text")
            response_text(message_id, chat_id, prompt)
    except Exception as e:
        logging.debug("Exception in response_audio: %s", e)
        send_message(message_id, chat_id, "There was an error generating the audio. Please try again later.")
    finally:
        # Clean up in ``finally`` so the temp file is removed even when sending
        # raised, and so a failed removal cannot trigger the "error generating
        # the audio" reply after the audio was already delivered.
        if audio_file_path and os.path.exists(audio_file_path):
            try:
                os.remove(audio_file_path)  # Clean up the file after sending
                logging.debug("Removed audio file: %s", audio_file_path)
            except OSError as e:
                logging.debug("Could not remove audio file %s: %s", audio_file_path, e)
def handle_image_generation(message_id, chat_id, prompt):
    """Generate an image for ``prompt`` and deliver it to the chat.

    On success the image file is uploaded and followed by a text message with
    the image URL and the prompt returned by the generator. If generation
    yields no image, or anything raises, a failure message is sent instead.

    Args:
        message_id: ID of the incoming message to quote; also passed to
            ``generate_image`` as its second and third arguments.
        chat_id: Chat to reply in.
        prompt: Text prompt for the image generator.
    """
    try:
        image, image_path, returned_prompt, image_url = generate_image(prompt, message_id, message_id, image_dir)
        if image:
            send_image(message_id, chat_id, image_path)
            send_message(
                message_id,
                chat_id,
                f"Image generated successfully! You can view it here: {image_url}.\n>{chr(8203)} _{returned_prompt}_"
            )
        else:
            send_message(message_id, chat_id, "Failed to generate image. Please try again later.")
    except Exception:
        # Record the traceback; previously the failure was swallowed silently,
        # leaving nothing to diagnose from.
        logging.exception("handle_image_generation failed")
        send_message(message_id, chat_id, "There was an error generating the image. Please try again later.")
@app.get("/", response_class=PlainTextResponse)
def index():
    """Liveness endpoint: answer GET / with a fixed plain-text status line."""
    status_line = "Server is running!"
    return status_line
@app.post("/whatsapp")
async def whatsapp_webhook(request: Request):
    """Handle a Green API webhook call and dispatch a reply.

    Flow:
      1. Reject requests whose ``Authorization`` header is not
         ``Bearer <WEBHOOK_AUTH_TOKEN>`` (HTTP 403).
      2. Ignore every webhook type except ``incomingMessageReceived``.
      3. Ignore messages that quote another message (replies) and messages
         with no text payload.
      4. ``/imagine <prompt>`` starts image generation in a background thread;
         any other text is answered with a voice reply in a background thread.

    Returns:
        ``{"success": True}`` for handled or ignored webhooks, a 400 JSON error
        for an unparsable body, or a 200 JSON error when the payload lacks
        expected fields.

    Raises:
        HTTPException: 403 when the bearer token does not match.
    """
    auth_header = request.headers.get('Authorization', '').strip()
    expected_header = f"Bearer {WEBHOOK_AUTH_TOKEN}"
    # Constant-time comparison so response timing does not leak how much of
    # the token matched. Compare bytes: compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(auth_header.encode("utf-8"), expected_header.encode("utf-8")):
        raise HTTPException(status_code=403, detail="Unauthorized")

    try:
        data = await request.json()
    except Exception:
        return JSONResponse(content={"error": "Invalid JSON"}, status_code=400)
    if not isinstance(data, dict):
        # Valid JSON that is not an object (list, string, ...) has no fields
        # to read; reject it instead of failing on .get() below.
        return JSONResponse(content={"error": "Expected a JSON object"}, status_code=400)

    if data.get('typeWebhook') != 'incomingMessageReceived':
        return {"success": True}

    try:
        chat_id = data['senderData']['chatId']
        message_id = data['idMessage']
        message_data = data.get('messageData', {})
        # Ignore messages that are replies
        if 'extendedTextMessageData' in message_data:
            if message_data['extendedTextMessageData'].get('quotedMessageId'):
                return {"success": True}
        # If needed, add checks for other message types here
        if 'textMessageData' in message_data:
            body = message_data['textMessageData']['textMessage'].strip()
        elif 'extendedTextMessageData' in message_data:
            body = message_data['extendedTextMessageData']['text'].strip()
        else:
            return {"success": True}
    except KeyError as e:
        return JSONResponse(content={"error": f"Missing key in data: {e}"}, status_code=200)
    except (TypeError, AttributeError) as e:
        # A field had an unexpected type (e.g. null where an object or string
        # was expected). Answer like the missing-key case rather than with 500.
        return JSONResponse(content={"error": f"Malformed webhook payload: {e}"}, status_code=200)

    command = '/imagine'
    if body.lower().startswith(command):
        # Strip the command by position. The previous case-sensitive
        # replace() left "/Imagine ..." intact and also removed any later
        # "/imagine" occurring inside the prompt itself.
        prompt = body[len(command):].strip()
        # NOTE(review): send_message performs blocking HTTP via requests, so
        # these two direct calls run synchronously inside the async handler.
        if not prompt:
            send_message(message_id, chat_id, "Please provide a prompt after /imagine.")
        else:
            send_message(message_id, chat_id, "Generating...")
            threading.Thread(target=handle_image_generation, args=(message_id, chat_id, prompt)).start()
    else:
        threading.Thread(target=response_audio, args=(message_id, chat_id, body)).start()
    return {"success": True}
def main():
    """Serve ``app`` with uvicorn on all interfaces at the configured PORT."""
    # Imported lazily so uvicorn is only required when run as a script.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=PORT)


if __name__ == '__main__':
    main()