Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
""" | |
Apex Biotical Veterinary WhatsApp Assistant - Premium Edition
The most effective and accurate veterinary assistant on the market.
""" | |
import os | |
import pandas as pd | |
import requests | |
import json | |
from fastapi import FastAPI, Request, Response, Form, HTTPException, File, UploadFile | |
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse | |
import time | |
import re | |
from typing import List, Dict, Any, Optional, Tuple | |
import openai | |
from dotenv import load_dotenv | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
import uvicorn | |
from datetime import datetime, timedelta | |
from rapidfuzz import process, fuzz | |
from deep_translator import GoogleTranslator | |
import numpy as np | |
import logging | |
import base64 | |
import tempfile | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter, A4 | |
from reportlab.lib.units import inch | |
from reportlab.lib import colors | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY | |
import io | |
import pathlib | |
from collections import defaultdict, Counter | |
import hashlib | |
import aiofiles | |
import asyncio | |
from difflib import SequenceMatcher | |
import httpx | |
import langdetect | |
from langdetect import detect | |
import threading | |
import shutil | |
# Configure advanced logging: every record at INFO or above is written both to
# a UTF-8 log file ('veterinary_bot.log') and to the console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('veterinary_bot.log', encoding='utf-8'),
        logging.StreamHandler()
    ]
)
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# Load environment variables (python-dotenv; presumably from a local .env file — confirm deployment setup)
load_dotenv()
# Initialize FastAPI app
app = FastAPI(title="Apex Biotical Veterinary Assistant", version="2.0.0")
# Ensure static and uploads directories exist before mounting
os.makedirs('static', exist_ok=True)
os.makedirs('uploads', exist_ok=True)
# Mount static files and templates:
#   /static  -> ./static
#   /uploads -> ./uploads
# NOTE(review): the 'templates' directory is not created here — confirm it ships with the app.
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
templates = Jinja2Templates(directory="templates")
# Global variables with enhanced data structures
CSV_FILE = "Veterinary.csv"  # file name of the product catalogue CSV
# Product table. Starts as None; get_veterinary_product_matches() calls
# load_products_data() when it is still None, and the menu helpers treat it as
# a DataFrame (they use `.empty`, `len()` and `.to_dict('records')`).
products_df = None
# NOTE(review): the module-level containers below share their names with the
# attributes of VeterinaryContextManager; they are not referenced in the part
# of the file visible here — confirm whether they are still used.
user_contexts = {}
last_products = {}
conversation_history = defaultdict(list)
product_analytics = defaultdict(int)
session_data = {}
# Environment variables (each is None when unset, except SERVER_URL which has a placeholder default)
WHATSJET_API_URL = os.getenv("WHATSJET_API_URL")
WHATSJET_VENDOR_UID = os.getenv("WHATSJET_VENDOR_UID")
WHATSJET_API_TOKEN = os.getenv("WHATSJET_API_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERVER_URL = os.getenv("SERVER_URL", "https://your-huggingface-space-url.hf.space")
# Initialize OpenAI client: the key is set globally on the `openai` module.
# Without a key only a warning is logged; nothing else is disabled here.
if OPENAI_API_KEY:
    openai.api_key = OPENAI_API_KEY
    logger.info("✅ OpenAI client initialized successfully")
else:
    logger.warning("⚠️ OpenAI API key not found - voice transcription will be disabled")
# Veterinary domain-specific constants
# Short search keyword -> list of full category labels. Used by
# get_veterinary_product_matches() to expand a query that contains the keyword.
# (Presumably the labels mirror the CSV 'Category' column — verify against the data.)
VETERINARY_CATEGORIES = {
    'antibiotic': ['Antibiotic / Quinolone', 'Antibiotic / Respiratory Infections', 'Veterinary Injectable Solution (Antibiotic)'],
    'respiratory': ['Respiratory Support', 'Respiratory / Mucolytic', 'Respiratory Support and Hygiene Enhancer'],
    'liver': ['Liver & Kidney Support', 'Liver Tonic and Hepatoprotective Supplement'],
    'vitamin': ['Multivitamin Supplement', 'Multivitamin Supplement for veterinary use', 'Vitamin and Amino Acid Supplement (Injectable Solution)'],
    'supplement': ['Nutritional Supplement / Mycotoxins', 'Immunity Enhancer and Antioxidant Supplement'],
    'mycotoxin': ['Mycotoxin Binder'],
    'heat_stress': ['Heat Stress Support'],
    'anticoccidial': ['Anticoccidial / Sulfonamide'],
    'phytogenic': ['Phytogenic / Antibiotic Alternative']
}
# Symptom group -> lowercase terms. When a query contains any term of a group,
# get_veterinary_product_matches() adds the whole group to the search terms.
VETERINARY_SYMPTOMS = {
    'respiratory': ['cough', 'breathing', 'respiratory', 'bronchitis', 'pneumonia', 'crd', 'coryza', 'flu'],
    'liver': ['liver', 'hepatitis', 'jaundice', 'ascites', 'fatty liver'],
    'diarrhea': ['diarrhea', 'diarrhoea', 'loose stool', 'gastroenteritis'],
    'stress': ['stress', 'heat stress', 'transport', 'vaccination'],
    'infection': ['infection', 'bacterial', 'viral', 'fungal', 'septicemia'],
    'deficiency': ['vitamin deficiency', 'mineral deficiency', 'anemia'],
    'mycotoxin': ['mycotoxin', 'mold', 'fungal toxin', 'aflatoxin']
}
# Species group -> lowercase animal names, expanded the same way as symptoms.
VETERINARY_SPECIES = {
    'poultry': ['chicken', 'broiler', 'layer', 'turkey', 'duck', 'quail', 'poultry'],
    'livestock': ['cattle', 'cow', 'buffalo', 'sheep', 'goat', 'livestock'],
    'pet': ['dog', 'cat', 'pet', 'companion animal']
}
# Menu Configuration - Define each menu with its valid options.
# Keys are the conversation-state names checked by validate_menu_selection()
# and get_menu_info(). Each entry has:
#   'name'                - human-readable menu name (used in error messages)
#   'valid_options'       - accepted input strings; left empty for the dynamic
#                           menus, whose options are computed per request
#   'option_descriptions' - option string -> label
MENU_CONFIG = {
    'main_menu': {
        'name': 'Main Menu',
        'valid_options': ['1', '2', '3', '4'],
        'option_descriptions': {
            '1': 'Search Products',
            '2': 'Browse Categories',
            '3': 'Download Catalog',
            '4': 'Chat with Veterinary AI Assistant'
        }
    },
    'category_selection_menu': {
        'name': 'Category Selection Menu',
        'valid_options': [],  # Will be populated dynamically based on available categories
        'option_descriptions': {}
    },
    'category_products_menu': {
        'name': 'Category Products Menu',
        'valid_options': [],  # Will be populated dynamically based on available products
        'option_descriptions': {}
    },
    'all_products_menu': {
        'name': 'All Products Menu',
        'valid_options': [],  # Will be populated dynamically based on all products
        'option_descriptions': {}
    },
    'product_inquiry': {
        'name': 'Product Inquiry Menu',
        'valid_options': ['1', '2', '3'],
        'option_descriptions': {
            '1': 'Talk to Veterinary Consultant',
            '2': 'Inquire about Product Availability',
            '3': 'Back to Main Menu'
        }
    },
    'ai_chat': {
        'name': 'AI Chat Mode',
        # The only recognised input in this mode is the keyword 'main'.
        'valid_options': ['main'],
        'option_descriptions': {
            'main': 'Return to Main Menu'
        }
    }
}
def validate_menu_selection(selection: str, current_state: str, user_context: dict) -> tuple[bool, str]:
    """
    Decide whether ``selection`` is an accepted option of the menu ``current_state``.

    Static menus take their options from MENU_CONFIG. The dynamic menus are
    numbered "1".."N", where N is the number of categories / products stored in
    ``user_context`` (or the number of rows of ``products_df`` for the
    all-products menu).

    Returns (is_valid, error_message); the message is "" when the selection is valid.
    """
    menu = MENU_CONFIG.get(current_state)
    if menu is None:
        return False, f"❌ Unknown menu state: {current_state}"

    def numbered(items) -> list:
        # Options of a dynamic menu are the 1-based positions, as strings.
        return [str(position) for position in range(1, len(items) + 1)]

    if current_state == 'category_selection_menu':
        allowed = numbered(user_context.get('available_categories', []))
    elif current_state == 'category_products_menu':
        allowed = numbered(user_context.get('available_products', []))
    elif current_state == 'all_products_menu' and products_df is not None and not products_df.empty:
        allowed = numbered(products_df)
    else:
        # Static menu, or all-products menu with no catalogue loaded.
        allowed = menu['valid_options']

    if selection in allowed:
        return True, ""

    menu_name = menu['name']
    if not allowed:
        return False, f"❌ Invalid selection for {menu_name}. No options available."
    return False, f"❌ Invalid selection for {menu_name}. Valid options: {', '.join(allowed)}"
def get_menu_info(current_state: str, user_context: dict) -> dict:
    """
    Get information about the current menu including valid options.

    Args:
        current_state: Menu/state key; expected to be a key of MENU_CONFIG.
        user_context: Per-user context. 'available_categories' (list of labels)
            and 'available_products' (list of product dicts) are read for the
            corresponding dynamic menus.

    Returns:
        A dict with 'name', 'valid_options' and 'option_descriptions'. For an
        unknown state a placeholder "Unknown Menu" entry is returned. The
        returned containers are fresh copies, so callers may modify them
        without altering the shared MENU_CONFIG.
    """
    if current_state not in MENU_CONFIG:
        return {"name": "Unknown Menu", "valid_options": [], "option_descriptions": {}}

    base = MENU_CONFIG[current_state]
    # A shallow .copy() would still hand out the global list/dict of the static
    # menus; copy the nested containers as well.
    menu_config = {
        **base,
        'valid_options': list(base['valid_options']),
        'option_descriptions': dict(base['option_descriptions']),
    }

    # For dynamic menus, derive the option labels from context / catalogue.
    if current_state == 'category_selection_menu':
        labels = list(user_context.get('available_categories', []))
    elif current_state == 'category_products_menu':
        labels = [
            product.get('Product Name', f'Product {i}')
            for i, product in enumerate(user_context.get('available_products', []), 1)
        ]
    elif current_state == 'all_products_menu' and products_df is not None and not products_df.empty:
        labels = [
            product.get('Product Name', f'Product {i}')
            for i, product in enumerate(products_df.to_dict('records'), 1)
        ]
    else:
        # Static menu (or no catalogue loaded): the configured options apply.
        return menu_config

    menu_config['valid_options'] = [str(i) for i in range(1, len(labels) + 1)]
    menu_config['option_descriptions'] = {str(i): label for i, label in enumerate(labels, 1)}
    return menu_config
# Voice processing functions | |
async def download_voice_file(media_url: str, filename: str) -> Optional[str]:
    """Download a voice file from WhatsApp into the local 'temp_voice' directory.

    Args:
        media_url: URL the audio is fetched from with an HTTP GET.
        filename: Name to store the file under. Only its final path component
            is used, so a value such as '../../app.py' cannot write outside
            'temp_voice'.

    Returns:
        The path of the saved file, or None when the name is unusable, the
        request fails, or the file cannot be written (the error is logged).
    """
    try:
        # Create temp_voice directory if it doesn't exist
        os.makedirs('temp_voice', exist_ok=True)
        # The name may originate from an incoming webhook: drop any directory part.
        safe_name = os.path.basename(str(filename).replace('\\', '/'))
        if safe_name in ('', '.', '..'):
            raise ValueError(f"invalid voice filename: {filename!r}")
        # Download the file
        async with httpx.AsyncClient() as client:
            response = await client.get(media_url)
            response.raise_for_status()
            file_path = os.path.join('temp_voice', safe_name)
            with open(file_path, 'wb') as f:
                f.write(response.content)
        logger.info(f"Voice file downloaded: {file_path}")
        return file_path
    except Exception as e:
        # Best-effort by design: callers receive None instead of an exception.
        logger.error(f"Error downloading voice file: {e}")
        return None
async def transcribe_voice_with_openai(file_path: str) -> Optional[str]:
    """Transcribe a voice file using OpenAI Whisper with veterinary domain prompts.

    Up to three attempts are made, each only when the previous one produced
    fewer than two characters: English with the full prompt, Urdu with an
    Urdu prompt, then automatic language detection with a mixed prompt.

    The OpenAI call is synchronous, so it is run in a worker thread to keep the
    event loop free while the request is in flight.

    Args:
        file_path: Path of the audio file to transcribe.

    Returns:
        The transcribed text; "" when every attempt produced fewer than two
        characters; None when the file is missing or empty, or when any error
        occurs (the error is logged).
    """
    # NOTE(review): `openai.Audio.transcribe` is the pre-1.0 SDK interface —
    # confirm the pinned openai version. Whisper's `prompt` is a short
    # vocabulary/style hint rather than an instruction channel, so these long
    # "system prompts" are probably mostly ignored — verify against the
    # OpenAI speech-to-text guide before relying on the rules they state.

    def _transcribe_once(prompt: str, language: Optional[str]) -> str:
        # One blocking Whisper request; `language` is omitted for auto-detection.
        options = {"model": "whisper-1", "prompt": prompt}
        if language is not None:
            options["language"] = language
        with open(file_path, 'rb') as audio_file:
            transcript = openai.Audio.transcribe(file=audio_file, **options)
        return transcript.text.strip()

    def _too_short(text: str) -> bool:
        return not text or len(text.strip()) < 2

    try:
        # Check if file exists and has content
        if not os.path.exists(file_path):
            logger.error(f"[Transcribe] File not found: {file_path}")
            return None
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            logger.error(f"[Transcribe] Empty file: {file_path}")
            return None
        logger.info(f"[Transcribe] Transcribing file: {file_path} (size: {file_size} bytes)")
        # Comprehensive system prompt for veterinary WhatsApp assistant
        system_prompt = """
You are transcribing voice messages for Apex Biotical Veterinary WhatsApp Assistant. This is a professional veterinary products chatbot.
CONTEXT: Users can speak product names, menu selections, numbers, and general queries in English or Urdu.
PRODUCT NAMES (Veterinary Products):
- Hydropex (electrolyte supplement)
- Respira Aid Plus (respiratory support)
- Heposel (liver tonic)
- Bromacid (respiratory/mucolytic)
- Hexatox (liver & kidney support)
- APMA Fort (mycotoxin binder)
- Para C.E (heat stress support)
- Tribiotic (antibiotic)
- PHYTO-SAL (phytogenic supplement)
- Mycopex Super (mycotoxin binder)
- Eflin KT-20 (antibiotic)
- Salcozine ST-30 (anticoccidial)
- Oftilex UA-10 (antibiotic)
- Biscomin 10 (injectable antibiotic)
- Apvita Plus (vitamin supplement)
- B-G Aspro-C (aspirin + vitamin C)
- EC-Immune (immune booster)
- Liverpex (liver tonic)
- Symodex (multivitamin)
- Respira Aid (respiratory support)
- Adek Gold (multivitamin)
- Immuno DX (immune enhancer)
MENU SELECTIONS:
- Main menu options: 1, 2, 3, 4
- Product numbers: 1-23
- Category numbers: 1-10
- Navigation: main, menu, back, home, start
NUMBERS (English & Urdu):
English: one, two, three, four, five, six, seven, eight, nine, ten, eleven, twelve, thirteen, fourteen, fifteen, sixteen, seventeen, eighteen, nineteen, twenty, twenty-one, twenty-two, twenty-three
Urdu (Roman): aik, ek, do, teen, char, panch, che, sat, ath, nau, das, gyara, bara, tera, choda, pandra, sola, satara, athara, unnees, bees, ikkees, baees, tees
Urdu (Script): ایک, دو, تین, چار, پانچ, چھ, سات, آٹھ, نو, دس, گیارہ, بارہ, تیرہ, چودہ, پندرہ, سولہ, سترہ, اٹھارہ, انیس, بیس, اکیس, بائیس, تئیس
COMMON GREETINGS:
English: hi, hello, hey, good morning, good afternoon, good evening, how are you
Urdu: salam, assalamu alaikum, adaab, namaste, khuda hafiz
MENU COMMANDS:
English: search, browse, download, catalog, contact, availability, main menu, option, number, choice
Urdu: تلاش, براؤز, ڈاؤن لوڈ, کیٹلاگ, رابطہ, دستیابی, مین مینو, آپشن, نمبر, اختیار
TRANSCRIPTION RULES:
1. Transcribe product names exactly as listed above
2. Convert spoken numbers to digits (1, 2, 3, etc.)
3. Handle both English and Urdu speech
4. Preserve exact spelling for product names
5. Convert menu selections to numbers
6. Handle common transcription errors (opium->option, numara->number)
7. Maintain context for veterinary domain
EXAMPLES:
- "hydropex" -> "hydropex"
- "respira aid plus" -> "respira aid plus"
- "option number one" -> "1"
- "aik" -> "1"
- "do" -> "2"
- "main menu" -> "main"
- "salam" -> "salam"
- "search products" -> "search products"
"""
        urdu_system_prompt = """
You are transcribing Urdu voice messages for Apex Biotical Veterinary WhatsApp Assistant.
PRODUCT NAMES (Urdu/English):
- ہائیڈروپیکس (Hydropex)
- ریسپیرا ایڈ پلس (Respira Aid Plus)
- ہیپوسیل (Heposel)
- بروماسڈ (Bromacid)
- ہیکساٹوکس (Hexatox)
- اے پی ایم اے فورٹ (APMA Fort)
- پیرا سی ای (Para C.E)
- ٹرائی بیوٹک (Tribiotic)
- فائٹو سال (PHYTO-SAL)
- مائیکوپیکس سپر (Mycopex Super)
URDU NUMBERS:
- ایک (1), دو (2), تین (3), چار (4), پانچ (5)
- چھ (6), سات (7), آٹھ (8), نو (9), دس (10)
- گیارہ (11), بارہ (12), تیرہ (13), چودہ (14), پندرہ (15)
- سولہ (16), سترہ (17), اٹھارہ (18), انیس (19), بیس (20)
- اکیس (21), بائیس (22), تئیس (23)
URDU GREETINGS:
- سلام (salam), السلام علیکم (assalamu alaikum)
- آداب (adaab), نمستے (namaste), خدا حافظ (khuda hafiz)
URDU MENU COMMANDS:
- مین مینو (main menu), آپشن (option), نمبر (number)
- تلاش (search), براؤز (browse), ڈاؤن لوڈ (download)
- کیٹلاگ (catalog), رابطہ (contact), دستیابی (availability)
TRANSCRIPTION RULES:
1. Transcribe Urdu words in Urdu script
2. Convert Urdu numbers to digits
3. Handle mixed Urdu-English speech
4. Preserve product names exactly
5. Convert menu selections to numbers
"""
        mixed_system_prompt = """
You are transcribing voice messages for a veterinary products WhatsApp assistant. The user may speak in English, Urdu, or a mix of both languages.
PRODUCT NAMES (exact spelling required):
Hydropex, Respira Aid Plus, Heposel, Bromacid, Hexatox, APMA Fort, Para C.E, Tribiotic, PHYTO-SAL, Mycopex Super, Eflin KT-20, Salcozine ST-30, Oftilex UA-10, Biscomin 10, Apvita Plus, B-G Aspro-C, EC-Immune, Liverpex, Symodex, Respira Aid, Adek Gold, Immuno DX
NUMBERS (convert to digits):
English: one->1, two->2, three->3, etc.
Urdu: aik->1, ek->1, do->2, teen->3, etc.
MENU COMMANDS:
main, menu, back, home, start, option, number, search, browse, download, catalog, contact, availability
GREETINGS:
hi, hello, salam, assalamu alaikum, adaab, namaste
TRANSCRIPTION RULES:
1. Transcribe exactly what you hear
2. Convert numbers to digits
3. Preserve product names exactly
4. Handle both languages
5. Convert menu selections to numbers
"""
        # First attempt with comprehensive system prompt (start with English)
        transcribed_text = await asyncio.to_thread(_transcribe_once, system_prompt, "en")
        logger.info(f"[Transcribe] Voice transcribed (English): '{transcribed_text}'")
        # If first attempt failed or seems unclear, try with Urdu-specific prompt
        if _too_short(transcribed_text):
            logger.warning(f"[Transcribe] First attempt failed, trying with Urdu-specific prompt")
            transcribed_text = await asyncio.to_thread(_transcribe_once, urdu_system_prompt, "ur")
            logger.info(f"[Transcribe] Second attempt transcribed (Urdu): '{transcribed_text}'")
        # Third attempt with mixed language prompt if still failing (language auto-detected)
        if _too_short(transcribed_text):
            logger.warning(f"[Transcribe] Second attempt failed, trying with mixed language prompt")
            transcribed_text = await asyncio.to_thread(_transcribe_once, mixed_system_prompt, None)
            logger.info(f"[Transcribe] Third attempt (mixed) transcribed: '{transcribed_text}'")
        # Final check for empty transcription
        if _too_short(transcribed_text):
            logger.warning(f"[Transcribe] Very short or empty transcription: '{transcribed_text}'")
            return ""
        return transcribed_text
    except Exception as e:
        # Best-effort by design: callers receive None instead of an exception.
        logger.error(f"[Transcribe] Error transcribing voice: {e}")
        logger.error(f"[Transcribe] File path: {file_path}")
        return None
# Corrections that are safe anywhere in an utterance (matched as whole words).
# Entries that mapped a phrase to itself were no-ops and are not listed.
_VOICE_PHRASE_FIXES = {
    # Common menu selection errors
    'opium': 'option',
    'numara': 'number',
    'numbara': 'number',
    'numbra': 'number',
    # Veterinary product name corrections
    'hydro pex': 'hydropex',
    'respira aid': 'respira aid plus',
    'hepo sel': 'heposel',
    'brom acid': 'bromacid',
    'hexa tox': 'hexatox',
    'para c': 'para c.e',
    'para ce': 'para c.e',
    'phyto sal': 'phyto-sal',
    'mycopex': 'mycopex super',
    'eflin': 'eflin kt-20',
    'salcozine': 'salcozine st-30',
    'oftilex': 'oftilex ua-10',
    'biscomin': 'biscomin 10',
    'apvita': 'apvita plus',
    'bg aspro': 'b-g aspro-c',
    'ec immune': 'ec-immune',
    'adek': 'adek gold',
    'immuno': 'immuno dx',
}

# Number words. These are also everyday words ("do", "for", "free"), so they
# are converted only when they are the whole utterance or directly follow
# "option"/"number". 'che' is 6 in the Urdu sequence aik, do, teen, char, panch, che.
_VOICE_NUMBER_FIXES = {
    'aik': '1', 'ek': '1', 'do': '2', 'teen': '3', 'char': '4', 'panch': '5',
    'che': '6', 'tree': '3', 'free': '3', 'for': '4', 'fiv': '5', 'sik': '6',
    'sat': '7', 'ath': '8', 'nau': '9', 'das': '10',
}

# Utterances that, on their own, mean "go to the main menu".
_VOICE_NAVIGATION_WORDS = frozenset({
    'man', 'men', 'mean', 'mein', 'maine', 'menu', 'home', 'back', 'return', 'main menu',
})


def _voice_alternation(words) -> str:
    """Regex alternation of ``words``, longest first so longer phrases win."""
    return '|'.join(re.escape(word) for word in sorted(words, key=len, reverse=True))


_VOICE_PHRASE_PATTERN = re.compile(
    rf'(?<!\w)(?:{_voice_alternation(_VOICE_PHRASE_FIXES)})(?!\w)'
)
_VOICE_PREFIXED_NUMBER_PATTERN = re.compile(
    rf'(?<!\w)(option|number)(\s+)({_voice_alternation(_VOICE_NUMBER_FIXES)})(?!\w)'
)


def process_voice_input(text: str) -> str:
    """Clean transcribed voice text and correct known veterinary-domain mis-transcriptions.

    Whitespace is collapsed and stray spaces before ',' and '.' are removed.
    Corrections match whole words only and are applied in a single pass, so
    ordinary words are not damaged ("download" stays "download") and a
    correction is never applied to the output of another one ("respira aid
    plus" is not expanded again). Because of the whole-word matching, greetings
    such as "assalamu alaikum" need no special handling.

    Args:
        text: Raw transcription; may be empty or None.

    Returns:
        "" for empty input. Otherwise the cleaned text, lowercased when at least
        one correction was applied and in its original case when none was.
    """
    if not text:
        return ""
    # Clean the text: trim, collapse whitespace, basic punctuation cleanup
    processed_text = re.sub(r'\s+', ' ', text.strip())
    processed_text = processed_text.replace(' ,', ',').replace(' .', '.')

    lowered = processed_text.lower()
    applied = []  # (wrong, correct) pairs, for logging

    # Whisper often returns single words with punctuation, e.g. "Do."
    bare = lowered.strip(' .,!?')
    if bare in _VOICE_NAVIGATION_WORDS:
        applied.append((bare, 'main'))
        fixed = 'main'
    elif bare in _VOICE_NUMBER_FIXES:
        applied.append((bare, _VOICE_NUMBER_FIXES[bare]))
        fixed = _VOICE_NUMBER_FIXES[bare]
    else:
        def fix_phrase(match):
            wrong = match.group(0)
            correct = _VOICE_PHRASE_FIXES[wrong]
            # "adek" -> "adek gold", but leave an already complete "adek gold" alone.
            if correct.startswith(wrong) and match.string.startswith(correct[len(wrong):], match.end()):
                return wrong
            applied.append((wrong, correct))
            return correct

        def fix_number(match):
            wrong = match.group(3)
            correct = _VOICE_NUMBER_FIXES[wrong]
            applied.append((wrong, correct))
            return f"{match.group(1)}{match.group(2)}{correct}"

        fixed = _VOICE_PHRASE_PATTERN.sub(fix_phrase, lowered)
        fixed = _VOICE_PREFIXED_NUMBER_PATTERN.sub(fix_number, fixed)

    if applied:
        processed_text = fixed
        for wrong, correct in applied:
            logger.info(f"Fixed transcription error: '{wrong}' -> '{correct}' in '{text}'")
    logger.info(f"Voice input processed: '{text}' -> '{processed_text}'")
    return processed_text
# Note: Voice messages are now processed exactly like text messages | |
# The transcribed voice text is passed directly to process_incoming_message | |
# This ensures consistent behavior between voice and text inputs | |
# Enhanced product search with veterinary domain expertise | |
def get_veterinary_product_matches(query: str) -> List[Dict[str, Any]]:
    """
    Advanced veterinary product matching with domain-specific intelligence.

    The query is normalized, expanded with related symptom, species, category
    and product-variation terms, and every product row is scored per field:
    a substring match of any search term scores ``100 * weight``; otherwise,
    for queries longer than 3 characters, the fuzzy partial ratio of the query
    against the field is used (``ratio * weight``, only when above 70).

    Args:
        query: Free-text search string.

    Returns:
        Product rows as dicts with the extra keys '_score', '_match_type'
        ('exact' or 'fuzzy') and '_match_details', best score first and unique
        by 'Product Name'. Empty for an empty or punctuation-only query, a
        1-2 digit query (treated as a menu selection) or when no product data
        is available.
    """
    if not query:
        return []
    if products_df is None:
        load_products_data()
    # Loading can fail and leave the table unset; do not crash the search.
    if products_df is None or products_df.empty:
        logger.warning("[Veterinary Search] Product data is not available")
        return []
    normalized_query = normalize(query).lower().strip()
    # A punctuation-only query normalizes to "", which is a substring of every
    # field and would return the whole catalogue as exact matches.
    if not normalized_query:
        return []
    logger.info(f"[Veterinary Search] Searching for: '{normalized_query}'")
    # Skip very short queries that are likely menu selections
    if len(normalized_query) <= 2 and normalized_query.isdigit():
        logger.info(f"[Veterinary Search] Skipping menu selection: '{normalized_query}'")
        return []

    # Veterinary-specific query expansion
    expanded_queries = [normalized_query]
    # Expand by symptoms
    for symptoms in VETERINARY_SYMPTOMS.values():
        if any(symptom in normalized_query for symptom in symptoms):
            expanded_queries.extend(symptoms)
    # Expand by species
    for species in VETERINARY_SPECIES.values():
        if any(sp in normalized_query for sp in species):
            expanded_queries.extend(species)
    # Expand by category (keys such as 'heat_stress' are typed with a space)
    for category_key, categories in VETERINARY_CATEGORIES.items():
        if category_key in normalized_query or category_key.replace('_', ' ') in normalized_query:
            expanded_queries.extend(categories)
    # Common veterinary product variations
    veterinary_variations = {
        'hydropex': ['hydropex', 'hydro pex', 'electrolyte', 'dehydration', 'heat stress'],
        'heposel': ['heposel', 'hepo sel', 'liver tonic', 'hepatoprotective'],
        'bromacid': ['bromacid', 'brom acid', 'respiratory', 'mucolytic'],
        'respira aid': ['respira aid', 'respira aid plus', 'respiratory support'],
        'hexatox': ['hexatox', 'hexa tox', 'liver support', 'kidney support'],
        'apma fort': ['apma fort', 'mycotoxin', 'liver support'],
        'para c': ['para c', 'para c.e', 'heat stress', 'paracetamol'],
        'tribiotic': ['tribiotic', 'antibiotic', 'respiratory infection'],
        'phyto-sal': ['phyto-sal', 'phytogenic', 'vitamin supplement'],
        'mycopex': ['mycopex', 'mycotoxin binder', 'mold'],
        'oftilex': ['oftilex', 'ofloxacin', 'antibiotic'],
        'biscomin': ['biscomin', 'oxytetracycline', 'injectable'],
        'apvita': ['apvita', 'vitamin b', 'amino acid'],
        'bg aspro': ['bg aspro', 'aspirin', 'vitamin c'],
        'ec-immune': ['ec-immune', 'immune', 'immunity'],
        'liverpex': ['liverpex', 'liver', 'metabolic'],
        'symodex': ['symodex', 'multivitamin', 'vitamin'],
        'adek gold': ['adek gold', 'vitamin', 'multivitamin'],
        'immuno dx': ['immuno dx', 'immune', 'antioxidant']
    }
    # Add veterinary variations. normalize() strips punctuation from the query,
    # so a key like 'phyto-sal' is also tried as 'phytosal' and 'phyto sal'.
    for key, variations in veterinary_variations.items():
        key_forms = {key, normalize(key).strip(), key.replace('-', ' ')}
        if any(form and form in normalized_query for form in key_forms):
            expanded_queries.extend(variations)
    # Fields are compared lowercased, so the terms must be lowercase as well
    # (category labels are mixed-case); drop duplicates but keep the order.
    expanded_queries = list(dict.fromkeys(term.lower() for term in expanded_queries if term))

    # Only fuzzy match longer queries; the score depends on the user's query only.
    use_fuzzy = len(normalized_query) > 3
    scored_matches = []
    for _, row in products_df.iterrows():
        best_score = 0
        best_match_type = ""
        match_details = {}
        # Search across all relevant fields with veterinary weighting.
        # NOTE(review): a product is kept only when its best score exceeds 70,
        # so fields weighted 0.7 or lower (Target Species, Type, Composition)
        # can never produce a match on their own — confirm this is intended.
        search_fields = [
            ('Product Name', row.get('Product Name', ''), 1.0),
            ('Category', row.get('Category', ''), 0.8),
            ('Indications', row.get('Indications', ''), 0.9),
            ('Target Species', row.get('Target Species', ''), 0.7),
            ('Type', row.get('Type', ''), 0.6),
            ('Composition', row.get('Composition', ''), 0.5)
        ]
        for field_name, field_value, weight in search_fields:
            if pd.isna(field_value) or not field_value:
                continue
            field_str = str(field_value).lower()
            # Exact (substring) matches have the highest priority
            exact_term = next(
                (term for term in expanded_queries if term in field_str or field_str in term),
                None,
            )
            if exact_term is not None:
                score, match_type, matched_term = 100 * weight, "exact", exact_term
            elif use_fuzzy:
                # Fuzzy matching for close matches (typos); cannot exceed an exact hit.
                score = fuzz.partial_ratio(normalized_query, field_str) * weight
                if score <= 70:
                    continue
                match_type, matched_term = "fuzzy", normalized_query
            else:
                continue
            if score > best_score:
                best_score = score
                best_match_type = match_type
                match_details = {"field": field_name, "query": matched_term}
        if best_score > 70:
            product_dict = row.to_dict()
            product_dict['_score'] = best_score
            product_dict['_match_type'] = best_match_type
            product_dict['_match_details'] = match_details
            scored_matches.append(product_dict)
    scored_matches.sort(key=lambda x: x['_score'], reverse=True)

    # Remove duplicates based on product name, keeping the best-scored entry
    seen_names = set()
    unique_matches = []
    for match in scored_matches:
        name = match.get('Product Name')
        if name not in seen_names:
            seen_names.add(name)
            unique_matches.append(match)
    return unique_matches
def normalize(text: str) -> str:
    """Normalize text for search.

    Lowercases the text, removes every character that is neither a word
    character nor whitespace, collapses whitespace runs to a single space and
    trims the ends.

    Args:
        text: Text to normalize; may be empty or None.

    Returns:
        The normalized text, or "" for empty/None input.
    """
    if not text:
        return ""
    # Remove special characters but keep spaces
    without_punctuation = re.sub(r'[^\w\s]', '', text.lower())
    # Trim last: removing punctuation can expose new leading/trailing spaces
    # (e.g. "hello !" must become "hello", not "hello ").
    return re.sub(r'\s+', ' ', without_punctuation).strip()
# Enhanced context management with veterinary domain awareness | |
class VeterinaryContextManager:
    """In-memory per-user state, keyed by phone number.

    Holds the conversation context of each user, a rolling history of the last
    20 exchanges and a counter of how often each product was selected.
    """

    def __init__(self):
        self.user_contexts = {}
        self.conversation_history = defaultdict(list)
        self.product_analytics = defaultdict(int)
        self.session_data = {}

    def get_context(self, phone_number: str) -> Dict[str, Any]:
        """Return the context of ``phone_number``, creating a fresh main-menu context on first use."""
        if phone_number in self.user_contexts:
            return self.user_contexts[phone_number]
        fresh_context = {
            "current_state": "main_menu",
            "current_menu": "main_menu",
            "current_menu_options": ["Search Veterinary Products", "Browse Categories", "Download Catalog"],
            "current_product": None,
            "current_category": None,
            "search_history": [],
            "product_interests": [],
            "species_preference": None,
            "symptom_context": None,
            "last_interaction": datetime.now(),
            "session_start": datetime.now(),
            "interaction_count": 0,
            "last_message": "",
            "available_categories": [],
            "available_products": []
        }
        self.user_contexts[phone_number] = fresh_context
        return fresh_context

    def update_context(self, phone_number: str, **kwargs):
        """Merge ``kwargs`` into the user's context and record the interaction.

        When a truthy ``current_product`` is passed, its 'Product Name' is
        appended to the user's interests and counted in ``product_analytics``.
        """
        user_state = self.get_context(phone_number)
        user_state.update(kwargs)
        user_state["last_interaction"] = datetime.now()
        user_state["interaction_count"] += 1
        # Track product interests for recommendations
        selected = kwargs.get("current_product")
        if not selected:
            return
        selected_name = selected.get("Product Name", "")
        if selected_name:
            user_state["product_interests"].append(selected_name)
            self.product_analytics[selected_name] += 1

    def add_to_history(self, phone_number: str, message: str, response: str):
        """Append one user/bot exchange to the history, keeping only the last 20."""
        exchange = {
            "timestamp": datetime.now(),
            "user_message": message,
            "bot_response": response
        }
        log = self.conversation_history[phone_number]
        log.append(exchange)
        if len(log) > 20:
            self.conversation_history[phone_number] = log[-20:]

    def get_recommendations(self, phone_number: str) -> List[Dict[str, Any]]:
        """Suggest up to 5 products from the categories of the user's last 3 product interests."""
        interests = self.get_context(phone_number)["product_interests"]
        candidates = []
        for liked_name in interests[-3:]:
            found = get_veterinary_product_matches(liked_name)
            if not found:
                continue
            # Related products come from the category of the best match
            category = found[0].get("Category", "")
            if not category:
                continue
            candidates.extend(
                sibling
                for sibling in get_products_by_category(category)[:3]
                if sibling.get("Product Name") != liked_name
            )
        # Keep the first occurrence of each named product (dicts preserve insertion order)
        by_name = {}
        for candidate in candidates:
            candidate_name = candidate.get("Product Name", "")
            if candidate_name and candidate_name not in by_name:
                by_name[candidate_name] = candidate
        return list(by_name.values())[:5]


# Initialize context manager
context_manager = VeterinaryContextManager()
# Enhanced product response with veterinary domain expertise | |
def generate_veterinary_product_response(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> str:
    """Build the WhatsApp message describing one product.

    The message lists name, type, category and indications, adds the brochure
    link when one is known, and ends with the product-inquiry actions.

    Args:
        product_info: Product record. Reads 'Product Name', 'Type', 'Category',
            'Indications' and 'Brochure (PDF)'.
        user_context: The user's context (currently not used by this function).

    Returns:
        The formatted message text.
    """
    def clean_text(text):
        if text is None or pd.isna(text):
            return "Not specified"
        return str(text).strip()

    # Extract product details
    product_name = clean_text(product_info.get('Product Name', ''))
    product_type = clean_text(product_info.get('Type', ''))
    category = clean_text(product_info.get('Category', ''))
    indications = clean_text(product_info.get('Indications', ''))

    # Brochure link: use the value carried by the product record. Only when the
    # record has no such field at all is the CSV read from disk, which avoids
    # re-parsing the file for every message.
    missing = object()
    brochure_link = product_info.get('Brochure (PDF)', missing)
    if brochure_link is missing:
        brochure_link = ""
        try:
            csv_data = pd.read_csv(CSV_FILE)
            product_row = csv_data[csv_data['Product Name'] == product_name]
            if not product_row.empty:
                brochure_link = product_row.iloc[0].get('Brochure (PDF)', '')
        except Exception as e:
            # The link is optional: the response is still sent without it.
            logger.warning(f"Error checking PDF link for {product_name}: {e}")
    # Empty cells arrive as NaN (a float), so only non-blank strings count.
    pdf_link = brochure_link.strip() if isinstance(brochure_link, str) else ""

    # Build the response
    response = f"""🧪 *Name:* {product_name}
📦 *Type:* {product_type}
🏥 *Category:* {category}
💊 *Used For:* {indications}"""
    # Add PDF link if available, in the requested format
    if pdf_link:
        response += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{pdf_link}"
    # Add menu options
    response += f"""
💬 *Available Actions:*
1️⃣ Talk to Veterinary Consultant
2️⃣ Inquire About Availability
3️⃣ Back to Main Menu
💬 Select an option or ask about related products"""
    return response
def clean_text_for_pdf(text: str) -> str:
    """Clean text for PDF generation.

    Typographic dashes, quotes and ellipses (and their mojibake forms, i.e.
    UTF-8 bytes mis-decoded as Windows-1252) are converted to ASCII, '&' is
    spelled out as 'and', and any remaining character outside a conservative
    whitelist is removed.

    Args:
        text: Value to clean; None and NaN are accepted.

    Returns:
        The cleaned, trimmed text, or "N/A" for None/NaN.
    """
    if text is None or pd.isna(text):
        return "N/A"
    cleaned = str(text)
    replacements = (
        # Mojibake first: these sequences contain characters that the plain
        # punctuation rules below would otherwise rewrite.
        ('\u00e2\u20ac\u201c', '-'),    # en dash
        ('\u00e2\u20ac\u201d', '-'),    # em dash
        ('\u00e2\u20ac"', '-'),         # dash whose last character was flattened to a straight quote
        ('\u00e2\u20ac\u2122', "'"),    # right single quote
        ('\u00e2\u20ac\u02dc', "'"),    # left single quote
        ('\u00e2\u20ac\u0153', '"'),    # left double quote
        ('\u00e2\u20ac\u00a6', '...'),  # ellipsis
        ('\u00e2\u20ac\x9d', '"'),      # right double quote
        ('\u00e2\u20ac', '"'),          # right double quote with its last byte lost
        # Real typographic punctuation. Without this, dashes are deleted by the
        # whitelist below and a range such as "5–10 ml" turns into "510 ml".
        ('\u2013', '-'),
        ('\u2014', '-'),
        ('\u2018', "'"),
        ('\u2019', "'"),
        ('\u201c', '"'),
        ('\u201d', '"'),
        ('\u2026', '...'),
        # Keeps names like "Liver & Kidney Support" readable.
        # NOTE(review): assumed safer than a raw '&' in paragraph markup — confirm.
        ('&', 'and'),
    )
    for fragment, plain in replacements:
        cleaned = cleaned.replace(fragment, plain)
    # Whitelist: word characters, whitespace and common punctuation. Quotes,
    # '/' and '+' are allowed so the normalized quotes and labels such as
    # "Antibiotic / Quinolone" survive.
    cleaned = re.sub(r'[^\w\s\-.,()%:;\'"/+]', '', cleaned)
    return cleaned.strip()
# Enhanced PDF generation with veterinary domain expertise | |
def generate_veterinary_pdf(product: Dict[str, Any]) -> bytes:
    """
    Render a single-product information sheet and return it as PDF bytes.

    The document is built in memory (A4) and contains, in order: a title, the
    product name, a four-row summary table, one section per populated detail
    field, and a fixed disclaimer. Every product value is passed through
    clean_text_for_pdf before it is placed in the document.
    """
    sample_styles = getSampleStyleSheet()

    # Paragraph styles used throughout the sheet
    title_style = ParagraphStyle(
        'VeterinaryTitle',
        parent=sample_styles['Heading1'],
        fontSize=18,
        spaceAfter=25,
        alignment=TA_CENTER,
        textColor=colors.darkblue,
        fontName='Helvetica-Bold',
    )
    heading_style = ParagraphStyle(
        'VeterinaryHeading',
        parent=sample_styles['Heading2'],
        fontSize=14,
        spaceAfter=12,
        textColor=colors.darkgreen,
        fontName='Helvetica-Bold',
    )
    normal_style = ParagraphStyle(
        'VeterinaryNormal',
        parent=sample_styles['Normal'],
        fontSize=11,
        spaceAfter=8,
        alignment=TA_JUSTIFY,
        fontName='Helvetica',
    )

    # Header and product name
    product_name = clean_text_for_pdf(product.get('Product Name', 'Unknown Product'))
    story = [
        Paragraph("🏥 APEX BIOTICAL VETERINARY PRODUCTS", title_style),
        Spacer(1, 20),
        Paragraph(f"<b>Product: {product_name}</b>", heading_style),
        Spacer(1, 15),
    ]

    # Summary table: (row label, key in the product dict)
    summary_fields = (
        ('Product Name', 'Product Name'),
        ('Category', 'Category'),
        ('Target Species', 'Target Species'),
        ('Product Type', 'Type'),
    )
    table_rows = [['Field', 'Information']]
    table_rows.extend(
        [label, clean_text_for_pdf(product.get(key, 'N/A'))]
        for label, key in summary_fields
    )
    clinical_table = Table(table_rows, colWidths=[2*inch, 4*inch])
    clinical_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.darkblue),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 12),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('BACKGROUND', (0, 1), (-1, -1), colors.lightblue),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))
    story.append(Paragraph("Clinical Information", heading_style))
    story.append(clinical_table)
    story.append(Spacer(1, 20))

    # Detail sections: (key in the product dict, section heading).
    # A section is emitted only when the stored value is truthy.
    detail_sections = (
        ('Indications', "Clinical Indications"),
        ('Composition', "Composition"),
        ('Dosage & Administration', "Dosage & Administration"),
        ('Precautions', "Precautions"),
        ('Storage', "Storage"),
    )
    for key, section_title in detail_sections:
        if not product.get(key):
            continue
        story.append(Paragraph(section_title, heading_style))
        story.append(Paragraph(clean_text_for_pdf(product.get(key)), normal_style))
        story.append(Spacer(1, 15))

    # Fixed disclaimer closing the sheet
    story.append(Paragraph("Veterinary Disclaimer", heading_style))
    disclaimer_text = (
        "This product should be used under veterinary supervision. "
        "Always consult with a qualified veterinarian before administration. "
        "Follow dosage instructions precisely and monitor animal response. "
        "Store according to manufacturer guidelines and keep out of reach of children."
    )
    story.append(Paragraph(disclaimer_text, normal_style))

    # Render into the in-memory buffer and hand back the raw bytes
    buffer = io.BytesIO()
    SimpleDocTemplate(buffer, pagesize=A4).build(story)
    return buffer.getvalue()
async def send_catalog_pdf(phone_number: str):
    """Send *phone_number* a text message linking to the full product catalog PDF.

    Any exception raised while sending is logged and answered with a short
    error message to the same number.
    """
    # Google Drive link in its uc?export=download form
    catalog_url = "https://drive.google.com/uc?export=download&id=1mxpkFf3DY-n3QHzZBe_CdksR-gHu2f_0"
    try:
        catalog_text = "".join([
            "📋 *Apex Biotical Veterinary Products Catalog*\n\n",
            "📄 Here's your complete product catalog with all our veterinary products:\n",
            f"📎 [Apex Biotical Veterinary Products Catalog.pdf]({catalog_url})\n\n",
            "💬 For detailed information about any specific product, type its name or contact our sales team.\n\n",
            "Type main at any time to return to the main menu.",
        ])
        send_whatsjet_message(phone_number, catalog_text)
    except Exception as exc:
        logger.error(f"Error sending catalog: {exc}")
        send_whatsjet_message(
            phone_number,
            "❌ Error sending catalog. Please try again or contact our sales team for assistance.",
        )
async def send_individual_product_pdf(phone_number: str, product: Dict[str, Any]):
    """Generate a PDF for *product*, send it as WhatsApp media and follow up with a link.

    Steps:
      1. Render the PDF with generate_veterinary_pdf.
      2. Save it under ``../uploads`` as ``<sanitised name>_<timestamp>.pdf``.
      3. Send the file through send_whatsjet_message as an ``application/pdf``
         attachment.
      4. Send a text message containing the download URL, built from the
         ``PUBLIC_BASE_URL`` environment variable (default
         ``http://localhost:8000``). The wording depends on whether step 3
         succeeded.

    Any exception is logged and answered with a generic error message.

    Fix over the previous version: the link lines of the follow-up messages
    were plain string literals rather than f-strings, so users received the
    literal text ``({download_url})`` instead of the URL.
    """
    try:
        # Generate PDF for the product
        pdf_content = generate_veterinary_pdf(product)

        # Build a filesystem-safe, timestamped filename
        product_name = product.get('Product Name', 'Unknown_Product')
        safe_name = re.sub(r'[^\w\s-]', '', product_name).replace(' ', '_')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_name}_{timestamp}.pdf"

        # Save PDF to uploads directory
        uploads_dir = "../uploads"
        os.makedirs(uploads_dir, exist_ok=True)
        pdf_path = os.path.join(uploads_dir, filename)
        with open(pdf_path, 'wb') as f:
            f.write(pdf_content)

        # Generate download URL
        base_url = os.getenv("PUBLIC_BASE_URL", "http://localhost:8000")
        download_url = f"{base_url}/uploads/{filename}"

        # Send PDF via WhatsApp media
        success = send_whatsjet_message(
            phone_number,
            f"📄 *{product_name} - Product Information*\n\nHere's the detailed product information in PDF format.",
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )

        if success:
            # Also send direct download link as backup
            message = (
                f"📄 *{product_name} - Product Information*\n\n"
                f"📎 [Direct Download Link]({download_url})\n\n"
                "💬 *If the PDF didn't download, use the link above*\n"
                "Type 'main' to return to main menu."
            )
        else:
            # If media send failed, send only the link
            message = (
                f"📄 *{product_name} - Product Information*\n\n"
                f"📎 [Download Product PDF]({download_url})\n\n"
                "💬 *Click the link above to download the product information*\n"
                "Type 'main' to return to main menu."
            )
        send_whatsjet_message(phone_number, message)
    except Exception as e:
        logger.error(f"Error sending individual product PDF: {e}")
        send_whatsjet_message(
            phone_number,
            "❌ Error generating product PDF. Please try again or contact our sales team for assistance."
        )
# --- WhatsJet Message Sending --- | |
def split_message_for_whatsapp(message: str, max_length: int = 1000) -> list:
    """Split a long message into chunks for WhatsApp (max 1000 chars per message).

    Each chunk is at most *max_length* characters, and concatenating the
    chunks reproduces *message* exactly. When a cut is needed, it is made just
    after the last space or newline inside the window, so words and
    multi-code-point emoji (e.g. keycap digits) are not torn apart. A hard cut
    at *max_length* is used only when the window has no usable whitespace.

    Args:
        message: Text to split. An empty string yields an empty list.
        max_length: Maximum chunk size in characters; must be positive.

    Returns:
        List of chunks in order.

    Raises:
        ValueError: If *max_length* is not positive.
    """
    if max_length <= 0:
        raise ValueError("max_length must be a positive integer")

    chunks = []
    remaining = message
    while len(remaining) > max_length:
        window = remaining[:max_length]
        # Last whitespace of either kind inside the window; index 0 is ignored
        # because cutting there would not shorten the chunk meaningfully.
        boundary = max(window.rfind('\n'), window.rfind(' '))
        # Keep the separator at the end of the chunk so nothing is lost.
        cut = boundary + 1 if boundary > 0 else max_length
        chunks.append(remaining[:cut])
        remaining = remaining[cut:]
    if remaining:
        chunks.append(remaining)
    return chunks
def send_whatsjet_message(phone_number: str, message: str, media_type: str = None, media_path: str = None, filename: str = None) -> bool:
    """Send a text or media message through the WhatsJet send-message endpoint.

    The API token is passed as a ``token`` query parameter on the URL.

    * Media mode (both *media_type* and *media_path* given): the file is read,
      base64-encoded and posted together with *message* in one request.
    * Text mode: *message* is split with split_message_for_whatsapp and each
      chunk is posted in order; the first failing chunk aborts the send.

    Returns:
        True on success (or when *message* is blank in text mode), False when
        configuration is missing or any request/file operation fails.
    """
    if not all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN]):
        logger.error("[WhatsJet] Missing environment variables.")
        return False

    url = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-message?token={WHATSJET_API_TOKEN}"

    # --- Media message -----------------------------------------------------
    if media_type and media_path:
        # Stage 1: read and encode the attachment
        try:
            with open(media_path, 'rb') as media_file:
                encoded_media = base64.b64encode(media_file.read()).decode('utf-8')
            media_payload = {
                "phone_number": phone_number,
                "message_body": message,
                'media_type': media_type,
                'media_content': encoded_media,
                'media_filename': filename or os.path.basename(media_path)
            }
        except Exception as e:
            logger.error(f"[WhatsJet] Exception preparing media message: {str(e)}")
            return False

        # Stage 2: post it (15 second timeout)
        try:
            reply = httpx.post(url, json=media_payload, timeout=15)
            reply.raise_for_status()
            logger.info(f"[WhatsJet] Media message sent successfully to {phone_number}")
            return True
        except Exception as e:
            logger.error(f"[WhatsJet] Exception sending media message: {e}")
            return False

    # --- Text message ------------------------------------------------------
    if not message.strip():
        return True  # Don't send empty messages

    for chunk in split_message_for_whatsapp(message):
        try:
            reply = httpx.post(
                url,
                json={"phone_number": phone_number, "message_body": chunk},
                timeout=15,
            )
            reply.raise_for_status()
            logger.info(f"[WhatsJet] Text chunk sent successfully to {phone_number}")
        except Exception as e:
            logger.error(f"[WhatsJet] Exception sending text chunk: {e}")
            return False

    logger.info(f"[WhatsJet] Successfully sent complete text message to {phone_number}")
    return True
def send_whatsjet_media_image_only(phone_number: str, image_url: str, filename: str = None) -> bool:
    """Post an image (by URL, no caption) to WhatsJet's /contact/send-media-message endpoint.

    The API token is sent as a Bearer ``Authorization`` header. The request,
    the response status and the first 500 characters of the response body are
    logged. Returns True when the request completes without an HTTP error,
    False when configuration is missing or the request fails.
    """
    configured = all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN])
    if not configured:
        logger.error("[WhatsJet] Missing environment variables for media message.")
        return False

    endpoint = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-media-message"
    request_headers = {
        "Authorization": f"Bearer {WHATSJET_API_TOKEN}",
        "Content-Type": "application/json"
    }
    body = {
        "phone_number": phone_number,
        "media_type": "image",
        "media_url": image_url
    }
    # file_name is only included when the caller supplied one
    if filename:
        body["file_name"] = filename

    try:
        logger.info(f"[WhatsJet] Sending image only: {body}")
        reply = httpx.post(endpoint, json=body, headers=request_headers, timeout=30)
        logger.info(f"[WhatsJet] Image only response status: {reply.status_code}")
        logger.info(f"[WhatsJet] Image only response body: {reply.text[:500]}...")
        reply.raise_for_status()
    except Exception as e:
        logger.error(f"[WhatsJet] Exception sending image only: {e}")
        return False

    logger.info(f"[WhatsJet] Image only sent successfully to {phone_number}")
    return True
# --- Health Check Endpoint --- | |
async def health_check():
    """Report service health: timestamp, loaded product count and configuration flags."""
    product_count = 0 if products_df is None else len(products_df)
    whatsjet_ready = bool(all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN]))
    return {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "products_loaded": product_count,
        "openai_available": bool(OPENAI_API_KEY),
        "whatsjet_configured": whatsjet_ready,
    }
async def test_voice():
    """Return a static self-check of the voice-detection conditions plus feature flags.

    The three ``voice_detection`` entries evaluate fixed expressions mirroring
    the checks used for incoming messages; they do not inspect any request.
    """
    voice_types = ['audio', 'voice']
    sample_media = {'type': 'audio'}
    detection = {
        "audio_type": "audio" in voice_types,
        "voice_type": "voice" in voice_types,
        "media_audio": sample_media.get('type') == 'audio',
    }
    return {
        "voice_detection": detection,
        "openai_available": bool(OPENAI_API_KEY),
        "langdetect_available": True,
        "deep_translator_available": True,
    }
async def get_catalog():
    """Serve the complete product catalog PDF.

    Returns:
        A FileResponse for ``static/Hydropex.pdf``, offered for download as
        ``Apex_Biotical_Veterinary_Catalog.pdf``.

    Raises:
        HTTPException: 404 when the file does not exist; 500 for any other
            failure while preparing the response.

    Fix over the previous version: the 404 was raised inside the ``try`` and
    then caught by the generic ``except Exception`` handler, so a missing
    catalog was reported to clients as a 500.
    """
    catalog_path = "static/Hydropex.pdf"
    try:
        if not os.path.exists(catalog_path):
            raise HTTPException(status_code=404, detail="Catalog PDF not found")
        return FileResponse(
            catalog_path,
            media_type="application/pdf",
            filename="Apex_Biotical_Veterinary_Catalog.pdf"
        )
    except HTTPException:
        # Deliberate HTTP errors (the 404 above) must reach the client unchanged.
        raise
    except Exception as e:
        logger.error(f"Error serving catalog: {e}")
        raise HTTPException(status_code=500, detail="Error serving catalog") from e
async def root():
    """Return the HTML landing page listing the service's public endpoints.

    Fix over the previous version: the heading and intro paragraph appeared
    twice ("Assistant" and "Bot" variants) and the first ``<ul>`` was never
    closed, leaving an ``<h2>`` nested inside a list. The page now has one
    heading, one paragraph and one well-formed list.
    """
    return """
    <h2>Apex Biotical Veterinary WhatsApp Assistant</h2>
    <p>The Assistant is running! Use the API endpoints for WhatsApp integration.</p>
    <ul>
    <li><b>POST /webhook</b> – WhatsApp webhook endpoint</li>
    <li><b>GET /health</b> – Health check</li>
    <li><b>GET /catalog</b> – Download product catalog PDF</li>
    </ul>
    """
# --- Webhook Endpoint for WhatsApp/WhatsJet --- | |
async def webhook(request: Request):
    """Receive a webhook POST and forward any user message to process_incoming_message.

    Two payload shapes are recognised:

    * WhatsJet/custom: a dict with ``contact`` and ``message`` keys. Voice/audio
      messages are forwarded even without a body; other messages are forwarded
      only when they have a body and are not delivery-status updates.
    * WhatsApp Cloud API: a dict with an ``entry`` list, walked down through
      ``changes`` -> ``value`` -> ``messages``; malformed levels are logged and
      skipped.

    Returns 200 for handled (or deliberately ignored) payloads, 400 for an
    unrecognised shape and 500 when an exception occurs.
    """
    try:
        data = await request.json()
        logger.info(f"[Webhook] Incoming data: {data}")
        payload_is_dict = isinstance(data, dict)

        # ---- WhatsJet/Custom format ----
        if payload_is_dict and 'contact' in data and 'message' in data:
            from_number = str(data['contact'].get('phone_number', '')).replace('+', '').replace(' ', '')
            msg = data['message']

            # A non-dict message carries nothing to process.
            if not isinstance(msg, dict):
                return Response(status_code=200)

            # media may be a dict, a list or None; only a dict carries a type
            media = msg.get('media', {})
            media_type = media.get('type') if isinstance(media, dict) else None

            # Voice/audio is checked first because such messages may have no body
            is_voice = msg.get('type') in ['audio', 'voice'] or media_type == 'audio'
            if is_voice:
                logger.info(f"[Webhook] Processing voice message from {from_number}")
            elif msg.get('body') is None or msg.get('status') in ['delivered', 'sent', 'read', 'failed']:
                # Body-less messages and status updates are acknowledged and ignored
                return Response(status_code=200)

            await process_incoming_message(from_number, msg)
            return Response(status_code=200)

        # ---- WhatsApp Cloud API format ----
        if payload_is_dict and 'entry' in data and isinstance(data['entry'], list):
            for entry in data['entry']:
                if not isinstance(entry, dict):
                    logger.error(f"[Webhook] entry is not a dict: {type(entry)}")
                    continue
                changes = entry.get('changes', [])
                if not isinstance(changes, list):
                    logger.error(f"[Webhook] changes is not a list: {type(changes)}")
                    continue
                for change in changes:
                    if not isinstance(change, dict):
                        logger.error(f"[Webhook] change is not a dict: {type(change)}")
                        continue
                    value = change.get('value', {})
                    if not isinstance(value, dict):
                        logger.error(f"[Webhook] value is not a dict: {type(value)}")
                        continue
                    incoming = value.get('messages', [])
                    if not isinstance(incoming, list):
                        logger.error(f"[Webhook] messages is not a list: {type(incoming)}")
                        continue
                    for item in incoming:
                        if not isinstance(item, dict):
                            logger.error(f"[Webhook] message is not a dict: {type(item)}")
                            continue
                        sender = item.get('from', '')
                        # Ignore status updates
                        if item.get('type') == 'status':
                            continue
                        # Convert the Cloud API message into the internal shape
                        converted = {
                            'body': item.get('text', {}).get('body', ''),
                            'type': item.get('type', 'text'),
                            'media': item.get('audio') or item.get('voice') or item.get('image') or item.get('document')
                        }
                        await process_incoming_message(sender, converted)
            return Response(status_code=200)

        logger.warning(f"[Webhook] Unrecognized or malformed payload format: {type(data)}")
        return Response(status_code=400)
    except Exception as e:
        logger.error(f"[Webhook] Error: {e}")
        import traceback
        logger.error(f"[Webhook] Traceback: {traceback.format_exc()}")
        return Response(status_code=500)
def map_spoken_number_to_digit(text: str) -> str:
    """
    Translate a spoken or transcribed number phrase into its digit string.

    Resolution order:
      1. exact lookup of the lower-cased, stripped text in the combined
         English + Urdu table (Urdu entries win on key clashes),
      2. a list of regex patterns that pull digits out of phrases such as
         "option 3." or "opium numara 2",
      3. fuzzy matching (ratio > 80) against table keys longer than two
         characters, first hit in table order,
      4. the first run of digits found after punctuation is removed.

    Returns "" for empty input and the original text when nothing matches.
    """
    if not text:
        return ""

    text_lower = text.lower().strip()

    english_numbers = {
        # Basic numbers
        'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5',
        'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10',
        'eleven': '11', 'twelve': '12', 'thirteen': '13', 'fourteen': '14', 'fifteen': '15',
        'sixteen': '16', 'seventeen': '17', 'eighteen': '18', 'nineteen': '19', 'twenty': '20',
        'twenty one': '21', 'twenty two': '22', 'twenty three': '23',
        # Common transcription errors
        'won': '1', 'to': '2', 'too': '2', 'tree': '3', 'free': '3', 'for': '4', 'fiv': '5',
        'sik': '6', 'sat': '7', 'ath': '8', 'nau': '9', 'das': '10',
        'che': '3', 'fir': '4',
        # Ordinal numbers
        'first': '1', 'second': '2', 'third': '3', 'fourth': '4', 'fifth': '5',
        'sixth': '6', 'seventh': '7', 'eighth': '8', 'ninth': '9', 'tenth': '10',
        # Menu variations
        'option one': '1', 'option two': '2', 'option three': '3', 'option four': '4', 'option five': '5',
        'number one': '1', 'number two': '2', 'number three': '3', 'number four': '4', 'number five': '5',
        'menu one': '1', 'menu two': '2', 'menu three': '3', 'menu four': '4', 'menu five': '5',
        'choice one': '1', 'choice two': '2', 'choice three': '3', 'choice four': '4', 'choice five': '5',
        # Common transcription errors for menu selections
        'opium one': '1', 'opium two': '2', 'opium three': '3', 'opium four': '4', 'opium five': '5',
        'opium numara one': '1', 'opium numara two': '2', 'opium numara three': '3',
        'opium number one': '1', 'opium number two': '2', 'opium number three': '3',
        'opium number 1': '1', 'opium number 2': '2', 'opium number 3': '3',
        # Direct digits
        '1': '1', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9', '10': '10',
        '11': '11', '12': '12', '13': '13', '14': '14', '15': '15', '16': '16', '17': '17', '18': '18', '19': '19', '20': '20',
        '21': '21', '22': '22', '23': '23',
    }

    urdu_numbers = {
        # Roman Urdu numbers
        'aik': '1', 'ek': '1', 'do': '2', 'teen': '3', 'char': '4', 'panch': '5',
        'che': '6', 'sat': '7', 'ath': '8', 'nau': '9', 'das': '10',
        'gyara': '11', 'bara': '12', 'tera': '13', 'choda': '14', 'pandra': '15',
        'sola': '16', 'satara': '17', 'athara': '18', 'unnees': '19', 'bees': '20',
        'ikkees': '21', 'baees': '22', 'tees': '23',
        # Urdu script numbers
        'ایک': '1', 'دو': '2', 'تین': '3', 'چار': '4', 'پانچ': '5',
        'چھ': '6', 'سات': '7', 'آٹھ': '8', 'نو': '9', 'دس': '10',
        'گیارہ': '11', 'بارہ': '12', 'تیرہ': '13', 'چودہ': '14', 'پندرہ': '15',
        'سولہ': '16', 'سترہ': '17', 'اٹھارہ': '18', 'انیس': '19', 'بیس': '20',
        'اکیس': '21', 'بائیس': '22', 'تئیس': '23',
        # Menu variations in Urdu
        'نمبر ایک': '1', 'نمبر دو': '2', 'نمبر تین': '3', 'نمبر چار': '4', 'نمبر پانچ': '5',
        'آپشن ایک': '1', 'آپشن دو': '2', 'آپشن تین': '3', 'آپشن چار': '4', 'آپشن پانچ': '5',
        'اختیار ایک': '1', 'اختیار دو': '2', 'اختیار تین': '3', 'اختیار چار': '4', 'اختیار پانچ': '5',
        # Common transcription errors in Urdu
        'numara': 'number', 'numbara': 'number', 'numbra': 'number',
        'numbra one': '1', 'numbra two': '2', 'numbra three': '3', 'numbra 1': '1', 'numbra 2': '2', 'numbra 3': '3',
    }

    # Urdu entries are merged last, so they override English ones on a clash
    # (e.g. 'che' resolves to '6').
    lookup = {**english_numbers, **urdu_numbers}

    # Step 1: exact match
    exact = lookup.get(text_lower)
    if exact is not None:
        return exact

    # Step 2: digit-bearing phrases; the first pattern that matches wins
    digit_patterns = (
        r'opium\s+numara?\s*(\d+)',      # "opium numara 1" -> "1"
        r'opium\s+number?\s*(\d+)',      # "opium number 1" -> "1"
        r'opium\s+(\d+)',                # "opium 1" -> "1"
        r'numara?\s*(\d+)',              # "numara 1" -> "1"
        r'number?\s*(\d+)\s*[.!]?',      # "number 1" or "number 1." -> "1"
        r'option\s*(\d+)\s*[.!]?',       # "option 1" or "option 1." -> "1"
        r'choice\s*(\d+)\s*[.!]?',       # "choice 1" or "choice 1." -> "1"
        r'menu\s*(\d+)\s*[.!]?',         # "menu 1" or "menu 1." -> "1"
        r'(\d+)\s*[.!]?\s*$',            # "22." -> "22"
        r'^(\d+)\s*[.!]?\s*',            # "22." -> "22"
    )
    for pattern in digit_patterns:
        found = re.search(pattern, text_lower)
        if found:
            return found.group(1)

    # Step 3: fuzzy match, skipping keys of one or two characters
    for candidate, digit in lookup.items():
        if len(candidate) <= 2:
            continue
        if fuzz.ratio(text_lower, candidate) > 80:
            logger.info(f"Fuzzy matched '{text_lower}' to '{candidate}' -> '{digit}'")
            return digit

    # Step 4: any digits left once punctuation is stripped
    text_clean = re.sub(r'[^\w\s]', '', text_lower)
    embedded = re.search(r'(\d+)', text_clean)
    if embedded:
        return embedded.group(1)

    # Nothing matched: hand the original text back
    logger.warning(f"No number mapping found for: '{text}'")
    return text
# Phrases that send the user back to the main menu when spoken.
_VOICE_NAVIGATION_COMMANDS = (
    'main', 'menu', 'start', 'home', 'back', 'return', 'go back', 'main menu',
    'مین', 'مینو', 'شروع', 'گھر', 'واپس', 'ریٹرن', 'مین مینو',
    'main menu please', 'go to main', 'back to main',
)

# (log label, pattern) pairs tried in order; group 1 is the selected number.
_VOICE_DIGIT_PATTERNS = (
    ("Number pattern 1", re.compile(r'number\s*(\d+)\s*[.!]?')),
    ("Option pattern", re.compile(r'option\s*(\d+)\s*[.!]?')),
    ("Product pattern", re.compile(r'product\s*(\d+)\s*[.!]?')),
    ("Category pattern", re.compile(r'category\s*(\d+)\s*[.!]?')),
    ("Number pattern 2", re.compile(r'(\d+)\s*[.!]?\s*$')),
    ("Number pattern 3", re.compile(r'^(\d+)\s*[.!]?\s*')),
)
_VOICE_ANY_NUMBER_RE = re.compile(r'(\d+)')

# Spoken cardinals/ordinals (English and Urdu) -> digit string. Multi-word
# keys are written with single spaces; hyphenated speech is matched too.
# Forms such as "1st" are not listed: any text containing a digit has already
# been resolved by the digit patterns before this table is consulted.
_VOICE_SPOKEN_NUMBERS = {
    # English spoken numbers
    'one': '1', 'first': '1',
    'two': '2', 'second': '2',
    'three': '3', 'third': '3',
    'four': '4', 'fourth': '4',
    'five': '5', 'fifth': '5',
    'six': '6', 'sixth': '6',
    'seven': '7', 'seventh': '7',
    'eight': '8', 'eighth': '8',
    'nine': '9', 'ninth': '9',
    'ten': '10', 'tenth': '10',
    'eleven': '11', 'eleventh': '11',
    'twelve': '12', 'twelfth': '12',
    'thirteen': '13', 'thirteenth': '13',
    'fourteen': '14', 'fourteenth': '14',
    'fifteen': '15', 'fifteenth': '15',
    'sixteen': '16', 'sixteenth': '16',
    'seventeen': '17', 'seventeenth': '17',
    'eighteen': '18', 'eighteenth': '18',
    'nineteen': '19', 'nineteenth': '19',
    'twenty': '20', 'twentieth': '20',
    'twenty one': '21', 'twenty first': '21',
    'twenty two': '22', 'twenty second': '22',
    'twenty three': '23', 'twenty third': '23',
    # Urdu spoken numbers
    'ایک': '1', 'پہلا': '1', 'پہلی': '1',
    'دو': '2', 'دوسرا': '2', 'دوسری': '2',
    'تین': '3', 'تیسرا': '3', 'تیسری': '3',
    'چار': '4', 'چوتھا': '4', 'چوتھی': '4',
    'پانچ': '5', 'پانچواں': '5', 'پانچویں': '5',
    'چھ': '6', 'چھٹا': '6', 'چھٹی': '6',
    'سات': '7', 'ساتواں': '7', 'ساتویں': '7',
    'آٹھ': '8', 'آٹھواں': '8', 'آٹھویں': '8',
    'نو': '9', 'نواں': '9', 'نویں': '9',
    'دس': '10', 'دسواں': '10', 'دسویں': '10',
    'گیارہ': '11', 'گیارہواں': '11', 'گیارہویں': '11',
    'بارہ': '12', 'بارہواں': '12', 'بارہویں': '12',
    'تیرہ': '13', 'تیرہواں': '13', 'تیرہویں': '13',
    'چودہ': '14', 'چودہواں': '14', 'چودہویں': '14',
    'پندرہ': '15', 'پندرہواں': '15', 'پندرہویں': '15',
    'سولہ': '16', 'سولہواں': '16', 'سولہویں': '16',
    'سترہ': '17', 'سترہواں': '17', 'سترہویں': '17',
    'اٹھارہ': '18', 'اٹھارہواں': '18', 'اٹھارہویں': '18',
    'انیس': '19', 'انیسواں': '19', 'انیسویں': '19',
    'بیس': '20', 'بیسواں': '20', 'بیسویں': '20',
    'اکیس': '21', 'اکیسواں': '21', 'اکیسویں': '21',
    'بائیس': '22', 'بائیسواں': '22', 'بائیسویں': '22',
    'تئیس': '23', 'تئیسواں': '23', 'تئیسویں': '23',
}

# Homophones of number words that are also very common English words. They
# count as a number only when they are the whole utterance, optionally after
# one of the menu trigger words.
_VOICE_HOMOPHONE_NUMBERS = {'to': '2', 'too': '2', 'tree': '3', 'for': '4'}
_VOICE_HOMOPHONE_RE = re.compile(
    r'^\W*(?:(?:number|option|product|category)\W+)?(to|too|tree|for)\W*$'
)


def _voice_phrase_pattern(phrase: str) -> str:
    """Return regex source matching *phrase* as whole words (gaps may be spaces or hyphens)."""
    inner = r'[\s\-]+'.join(re.escape(word) for word in phrase.split())
    return r'(?<!\w)' + inner + r'(?!\w)'


# One alternation, longest phrase first, so "twenty one" wins over "twenty".
_VOICE_SPOKEN_NUMBER_RE = re.compile(
    '|'.join(
        _voice_phrase_pattern(phrase)
        for phrase in sorted(_VOICE_SPOKEN_NUMBERS, key=len, reverse=True)
    )
)

# 'bye' is treated as a greeting: it is a common mis-transcription of "hi".
_VOICE_GREETING_RE = re.compile(r'(?<!\w)(?:bye|hi|hello|hey)(?!\w)')
# Navigation words next to punctuation (e.g. "menu!") that the
# whitespace-token check in the navigation step does not catch.
_VOICE_NAV_WORD_RE = re.compile(r'(?<!\w)(?:main|menu|start|home|back|return)(?!\w)')


def process_intelligent_voice_command(message_body: str, current_state: str, user_context: dict) -> str:
    """
    Normalise a transcribed voice command into the token the text flow expects.

    Resolution order (first hit wins):
      1. navigation phrases ("main", "go back", Urdu equivalents)  -> 'main'
      2. digits in the text ("number 3", "option 2.", "12")        -> that number
      3. spoken number words, whole words only, longest phrase first
         ("twenty one" -> '21', "fourteenth" -> '14')
      4. number homophones ("to", "too", "tree", "for") when they are the
         whole utterance, optionally after number/option/product/category
      5. greetings ("hi", "hello", "hey", "bye") -> 'hi';
         navigation words adjacent to punctuation -> 'main'
      6. otherwise the original message is returned unchanged.

    Fix over the previous version: steps 3-5 used plain substring tests, so
    "twenty one" resolved to '1', "fourteen" to '4', any sentence containing
    "information" to '4' (via "for"), and any text containing "chicken" or
    "this" to 'hi'. Matching is now on whole words.

    Args:
        message_body: Transcribed text; falsy values are returned as-is.
        current_state: Current menu state; only logged.
        user_context: Per-user context; not consulted here.

    Returns:
        'main', 'hi', a digit string, or the original message.
    """
    if not message_body:
        return message_body

    cleaned_text = message_body.strip().lower()
    logger.info(f"[Voice Command] Processing: '{message_body}' in state: {current_state}")

    # 1. Navigation commands. Single words must appear as whitespace-delimited
    #    tokens; multi-word commands must be the whole text or sit at its
    #    start or end.
    words = cleaned_text.split()
    for cmd in _VOICE_NAVIGATION_COMMANDS:
        if cleaned_text == cmd:
            logger.info(f"[Voice Command] Exact navigation command detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.startswith(cmd + ' '):
            logger.info(f"[Voice Command] Navigation command at start detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.endswith(' ' + cmd):
            logger.info(f"[Voice Command] Navigation command at end detected: '{message_body}' -> 'main'")
            return 'main'
        if cmd in words:
            logger.info(f"[Voice Command] Navigation command as word detected: '{message_body}' -> 'main'")
            return 'main'

    # 2. Digits: keyword-prefixed first, then trailing, then leading
    for label, pattern in _VOICE_DIGIT_PATTERNS:
        found = pattern.search(cleaned_text)
        if found:
            number = found.group(1)
            logger.info(f"[Voice Command] {label} detected: '{message_body}' -> '{number}'")
            return number

    # str.isdigit accepts some characters that \d does not (e.g. superscripts)
    if cleaned_text.isdigit():
        logger.info(f"[Voice Command] Standalone number detected: '{message_body}' -> '{message_body}'")
        return message_body

    found = _VOICE_ANY_NUMBER_RE.search(cleaned_text)
    if found:
        number = found.group(1)
        logger.info(f"[Voice Command] Any number pattern detected: '{message_body}' -> '{number}'")
        return number

    # 3. Spoken numbers as whole words; the leftmost match wins
    spoken = _VOICE_SPOKEN_NUMBER_RE.search(cleaned_text)
    if spoken:
        # Normalise "twenty-one" / "twenty  one" back to the table key form
        key = ' '.join(re.split(r'[\s\-]+', spoken.group(0)))
        digit = _VOICE_SPOKEN_NUMBERS[key]
        logger.info(f"[Voice Command] Spoken number detected: '{message_body}' -> '{digit}'")
        return digit

    # 4. Homophones, only when they make up the whole utterance
    homophone = _VOICE_HOMOPHONE_RE.match(cleaned_text)
    if homophone:
        digit = _VOICE_HOMOPHONE_NUMBERS[homophone.group(1)]
        logger.info(f"[Voice Command] Spoken number detected: '{message_body}' -> '{digit}'")
        return digit

    # 5. Greetings, then navigation words that sit next to punctuation
    for pattern, correction in ((_VOICE_GREETING_RE, 'hi'), (_VOICE_NAV_WORD_RE, 'main')):
        if pattern.search(cleaned_text):
            logger.info(f"[Voice Command] Transcription fix applied: '{message_body}' -> '{correction}'")
            return correction

    # 6. Nothing matched: return the original message for further processing
    logger.info(f"[Voice Command] No specific pattern matched, returning original: '{message_body}'")
    return message_body
async def process_incoming_message(from_number: str, msg: dict) -> None:
    """Process incoming message and send appropriate response with full intelligence

    Routing order, as implemented below:
      1. Voice/audio messages are delegated to handle_voice_message_complete.
      2. Messages without a body are skipped.
      3. Greetings: handled by the AI query handler when the user is in
         'ai_chat_mode', otherwise the welcome menu is sent.
      4. Navigation commands ('main', 'menu', ...) reset the user to the main menu.
      5. Product lookup via get_veterinary_product_matches (exact name match
         first, then the case where exactly one product is returned).
      6. Dispatch on the conversation state read at the start of the call
         (menus, contact/availability follow-ups, AI chat, general query).

    Args:
        from_number: Sender identifier; used as the key for context_manager
            and as the recipient for send_whatsjet_message.
        msg: Incoming message payload. Keys read here: 'body', 'type',
            'reply_language' and 'media'.

    Any exception raised inside the routing is logged, the welcome message is
    sent and the stored state is reset to 'main_menu'.
    """
    try:
        # Safety check for message body
        message_body = msg.get('body') if isinstance(msg, dict) else None
        message_type = msg.get('type', 'text') if isinstance(msg, dict) else 'text'
        # NOTE(review): unlike the two lines above this access is not guarded by
        # isinstance(msg, dict); a non-dict msg raises here and ends up in the except below.
        reply_language = msg.get('reply_language', 'en')  # Default to English
        # Robust media type extraction
        media = msg.get('media', {}) if isinstance(msg, dict) else {}
        media_type = None
        if isinstance(media, dict):
            media_type = media.get('type')
        # If media is a list or None, media_type stays None
        # Handle voice messages FIRST - before checking message_body
        if message_type in ['audio', 'voice'] or media_type == 'audio':
            logger.info(f"[Process] Processing voice message from {from_number}")
            await handle_voice_message_complete(from_number, msg)
            return
        # For text messages, check if body exists
        if message_body is None:
            logger.info(f"[Process] Skipping message from {from_number} - no body content")
            return
        message_body = message_body.strip()
        logger.info(f"[Process] Processing {message_type} message from {from_number}: {message_body}")
        # Get user context
        # `current_state` is a snapshot taken here; later update_context calls in this
        # function do not change this local variable.
        user_context = context_manager.get_context(from_number)
        current_state = user_context.get('current_state', 'main_menu')
        # Update context with last message for intelligent responses
        context_manager.update_context(from_number, last_message=message_body)
        # Debug logging
        logger.info(f"[Process] Current state: {current_state}, Message: '{message_body}' from {from_number}")
        # Handle text messages
        # An empty body (after stripping) is ignored, but last_message was already stored above.
        if not message_body:
            return
        # Check for greetings with multilingual support
        if is_greeting(message_body):
            # Check if user is currently in AI chat mode - if so, don't trigger menu mode
            if current_state == 'ai_chat_mode':
                logger.info(f"[Process] Greeting detected in AI chat mode, treating as AI query: {message_body}")
                # Treat greeting as a general query in AI chat mode
                await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
                return
            else:
                # Only trigger menu mode if not in AI chat mode
                welcome_msg = generate_veterinary_welcome_message()
                send_whatsjet_message(from_number, welcome_msg)
                context_manager.update_context(
                    from_number,
                    current_state='main_menu',
                    current_menu='main_menu',
                    current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
                )
                return
        # Check for "main" command - now works for both text and voice
        # First check if it's a navigation command from voice (but not in AI chat mode)
        if current_state != 'main_menu' and current_state != 'ai_chat_mode':  # Only check for main if not already in main menu and not in AI chat mode
            mapped_navigation = process_intelligent_voice_command(message_body, current_state, user_context)
            if mapped_navigation == 'main':
                logger.info(f"[Process] Navigation command detected: '{message_body}' -> 'main'")
                welcome_msg = generate_veterinary_welcome_message()
                send_whatsjet_message(from_number, welcome_msg)
                context_manager.update_context(
                    from_number,
                    current_state='main_menu',
                    current_menu='main_menu',
                    current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
                )
                return
        # Also check for text-based main commands
        if message_body.lower() in ['main', 'menu', 'start', 'home', 'back']:
            welcome_msg = generate_veterinary_welcome_message()
            send_whatsjet_message(from_number, welcome_msg)
            context_manager.update_context(
                from_number,
                current_state='main_menu',
                current_menu='main_menu',
                current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
            )
            return
        # 🎯 PRIORITY: Check for product names FIRST - works from ANY menu state
        # This ensures users can say product names like "hydropex", "respira aid plus", etc. from any menu
        logger.info(f"[Process] Checking for product name in message: '{message_body}' from state: {current_state}")
        products = get_veterinary_product_matches(message_body)
        # --- NEW LOGIC: Check for exact match first ---
        # Compare the normalized input against each candidate's normalized 'Product Name'.
        normalized_input = normalize(message_body).lower().strip()
        exact_match = None
        for product in products:
            product_name = product.get('Product Name', '')
            normalized_product_name = normalize(product_name).lower().strip()
            if normalized_product_name == normalized_input:
                exact_match = product
                break
        if exact_match:
            logger.info(f"[Process] Exact product match found: {exact_match.get('Product Name', 'Unknown')}")
            context_manager.update_context(
                from_number,
                current_product=exact_match,
                current_state='product_inquiry',
                current_menu='product_inquiry',
                current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
            )
            # Only send one reply: image+caption if possible, else text
            await send_product_image_with_caption(from_number, exact_match, user_context)
            return
        # --- END NEW LOGIC ---
        if products:
            logger.info(f"[Process] Product name detected: '{message_body}' -> Found {len(products)} products")
            # If single product found, show it directly
            if len(products) == 1:
                selected_product = products[0]
                product_name = selected_product.get('Product Name', 'Unknown')
                logger.info(f"[Process] Single product found: {product_name}")
                context_manager.update_context(
                    from_number,
                    current_product=selected_product,
                    current_state='product_inquiry',
                    current_menu='product_inquiry',
                    current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
                )
                # Get updated context with last message
                context = context_manager.get_context(from_number)
                response = generate_veterinary_product_response(selected_product, context)
                # Translate response if needed
                if reply_language == 'ur':
                    try:
                        translated_response = GoogleTranslator(source='auto', target='ur').translate(response)
                        send_whatsjet_message(from_number, translated_response)
                    except Exception as e:
                        # Fall back to the untranslated response when translation (or the send) fails.
                        logger.error(f"[AI] Translation error: {e}")
                        send_whatsjet_message(from_number, response)
                else:
                    send_whatsjet_message(from_number, response)
                # Add to conversation history
                context_manager.add_to_history(from_number, message_body, response)
                # NOTE(review): there is no return here, so execution continues into the
                # state-based handling below using the `current_state` snapshot taken
                # before the context update above - confirm this fall-through is intended.
            else:
                # Enhanced "not found" response with veterinary suggestions
                # NOTE(review): as nested here this branch runs when more than one product
                # was returned; confirm the pairing of this else against the original file.
                message = (
                    "❌ *Product Not Found*\n\n"
                    f"🔍 *We couldn't find '{message_body}' in our veterinary database.*\n\n"
                    "💡 *Try these alternatives:*\n"
                    "• Check spelling (e.g., 'Hydropex' not 'Hydro pex')\n"
                    "• Search by symptoms (e.g., 'respiratory', 'liver support')\n"
                    "• Search by category (e.g., 'antibiotic', 'vitamin')\n"
                    "• Search by species (e.g., 'poultry', 'livestock')\n\n"
                    "🏥 *Popular Veterinary Products:*\n"
                    "• Hydropex (Electrolyte supplement)\n"
                    "• Heposel (Liver tonic)\n"
                    "• Bromacid (Respiratory support)\n"
                    "• Tribiotic (Antibiotic)\n"
                    "• Symodex (Multivitamin)\n\n"
                    "💬 *Type 'main' to return to main menu or try another search.*"
                )
                # Translate response if needed
                if reply_language == 'ur':
                    try:
                        translated_message = GoogleTranslator(source='auto', target='ur').translate(message)
                        send_whatsjet_message(from_number, translated_message)
                    except Exception as e:
                        logger.error(f"[AI] Translation error: {e}")
                        send_whatsjet_message(from_number, message)
                else:
                    send_whatsjet_message(from_number, message)
        # Handle menu state-based processing
        # Check if this is a menu selection
        if current_state in ['main_menu', 'category_selection_menu', 'category_products_menu', 'all_products_menu', 'product_inquiry']:
            # Validate menu selection
            is_valid, error_msg = validate_menu_selection(message_body, current_state, user_context)
            if is_valid:
                # Handle valid menu selection
                if current_state == 'main_menu':
                    if message_body == '1':
                        # Search Products
                        await display_all_products(from_number)
                    elif message_body == '2':
                        # Browse Categories
                        categories = get_all_categories()
                        if categories:
                            context_manager.update_context(
                                from_number,
                                current_state='category_selection_menu',
                                current_menu='category_selection_menu',
                                current_menu_options=categories,
                                available_categories=categories
                            )
                            # Build a numbered list, one category per line.
                            message = "📁 *Select a Category:*\n\n"
                            for i, category in enumerate(categories, 1):
                                message += f"{format_number_with_emoji(i)} {category}\n"
                            message += "\n💬 Type a category number or 'main' to return to main menu."
                            send_whatsjet_message(from_number, message)
                        else:
                            send_whatsjet_message(from_number, "❌ No categories available. Type 'main' to return to main menu.")
                    elif message_body == '3':
                        # Download Catalog
                        await send_catalog_pdf(from_number)
                    elif message_body == '4':
                        # AI Chat Mode
                        context_manager.update_context(
                            from_number,
                            current_state='ai_chat_mode',
                            current_menu='ai_chat_mode',
                            current_menu_options=['main']
                        )
                        message = (
                            "🤖 *Veterinary AI Assistant Activated*\n\n"
                            "I'm now in AI chat mode. You can ask me:\n"
                            "• Veterinary questions\n"
                            "• Product recommendations\n"
                            "• Treatment advice\n"
                            "• General inquiries\n\n"
                            "💬 *Type 'main' to return to main menu.*"
                        )
                        send_whatsjet_message(from_number, message)
                elif current_state == 'category_selection_menu':
                    await handle_category_selection(message_body, from_number)
                elif current_state == 'category_products_menu':
                    # Handle product selection from category
                    # The selection is a 1-based index into the list stored in the context.
                    available_products = user_context.get('available_products', [])
                    if message_body.isdigit() and 1 <= int(message_body) <= len(available_products):
                        selected_product = available_products[int(message_body) - 1]
                        context_manager.update_context(
                            from_number,
                            current_product=selected_product,
                            current_state='product_inquiry',
                            current_menu='product_inquiry',
                            current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
                        )
                        response = generate_veterinary_product_response(selected_product, user_context)
                        send_whatsjet_message(from_number, response)
                    else:
                        send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))
                elif current_state == 'all_products_menu':
                    # Handle product selection from all products
                    if products_df is not None and not products_df.empty:
                        # The selection is a 1-based index into the rows of products_df.
                        all_products = products_df.to_dict('records')
                        if message_body.isdigit() and 1 <= int(message_body) <= len(all_products):
                            selected_product = all_products[int(message_body) - 1]
                            context_manager.update_context(
                                from_number,
                                current_product=selected_product,
                                current_state='product_inquiry',
                                current_menu='product_inquiry',
                                current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
                            )
                            response = generate_veterinary_product_response(selected_product, user_context)
                            send_whatsjet_message(from_number, response)
                        else:
                            send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))
                    else:
                        send_whatsjet_message(from_number, "❌ No products available. Type 'main' to return to main menu.")
                elif current_state == 'product_inquiry':
                    await handle_veterinary_product_followup(message_body, from_number)
            else:
                # Invalid menu selection
                send_whatsjet_message(from_number, error_msg)
        elif current_state == 'contact_request':
            await handle_contact_request_response(from_number, message_body)
        elif current_state == 'availability_request':
            await handle_availability_request_response(from_number, message_body)
        elif current_state == 'ai_chat_mode':
            await handle_ai_chat_mode(from_number, message_body, reply_language)
        else:
            # Default: treat as general query
            await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
    except Exception as e:
        logger.error(f"Error in process_incoming_message: {e}")
        # Instead of sending a generic error, return to main menu
        welcome_msg = generate_veterinary_welcome_message()
        send_whatsjet_message(from_number, welcome_msg)
        context_manager.update_context(from_number, current_state='main_menu', current_menu='main_menu', current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()))
async def handle_general_query_with_ai(from_number: str, query: str, user_context: dict, reply_language: str = 'en'):
    """Answer a free-form (out-of-menu) query through the OpenAI chat API.

    The prompt embeds the user's query together with the conversation state
    and the currently selected product taken from ``user_context``. The
    answer is sent to ``from_number``; when ``reply_language`` is 'ur' it is
    translated to Urdu first, falling back to the untranslated answer if
    that attempt fails. The exchange is then added to the conversation
    history. On any other failure the user is returned to the main menu.
    """
    try:
        # Without an API key there is nothing to call: tell the user and stop.
        if not OPENAI_API_KEY:
            unavailable_notice = "❌ AI assistance is not available. Please try searching for a specific product or type 'main' for the menu."
            send_whatsjet_message(from_number, unavailable_notice)
            return

        # Conversation details interpolated into the prompt below.
        current_product = user_context.get('current_product')
        current_state = user_context.get('current_state', 'main_menu')

        # The prompt asks for professional, accurate answers: list every relevant
        # product for product questions, give a concise expert answer for general
        # ones, and point back to the main menu for out-of-menu queries.
        prompt = f"""
You are a professional veterinary product assistant for Apex Biotical, helping users on WhatsApp.
Always answer in a clear, accurate, and helpful manner.
User Query: "{query}"
Current State: {current_state}
Current Product: {current_product.get('Product Name', 'None') if current_product else 'None'}
If the user asks about products (e.g., 'poultry products', 'respiratory medicine'), list ALL relevant products from the database with a short description for each. If there are many, summarize or group them.
If the user asks a general veterinary question, provide a concise, expert answer.
If the query is outside the menu system, politely clarify and offer to return to the main menu (type 'main').
Always keep responses professional, concise, and user-friendly.
"""
        completion = openai.ChatCompletion.create(
            max_tokens=300,
            temperature=0.7,
            messages=[{"role": "user", "content": prompt}],
            model="gpt-4o",
        )
        answer = completion.choices[0].message.content.strip()

        if reply_language != 'ur':
            send_whatsjet_message(from_number, answer)
        else:
            # Urdu requested: translate, but never lose the answer if that fails.
            try:
                send_whatsjet_message(
                    from_number,
                    GoogleTranslator(source='auto', target='ur').translate(answer),
                )
            except Exception as exc:
                logger.error(f"[AI] Translation error: {exc}")
                send_whatsjet_message(from_number, answer)

        # History always stores the untranslated answer.
        context_manager.add_to_history(from_number, query, answer)
    except Exception as exc:
        logger.error(f"[AI] Error handling general query: {exc}")
        # Rather than a generic error, drop the user back at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_state = {
            'current_state': 'main_menu',
            'current_menu': 'main_menu',
            'current_menu_options': list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        }
        context_manager.update_context(from_number, **main_menu_state)
async def handle_contact_request(from_number: str):
    """Ask the user for contact details and move them to the 'contact_request' state.

    If sending the prompt or updating the context fails, the error is logged
    and the user is returned to the main menu instead.
    """
    try:
        prompt_parts = (
            "📞 *Contact Information*\n\n",
            "Please provide your details:\n",
            "• Name and location\n",
            "• Phone number\n",
            "• Specific inquiry\n\n",
            "💬 *Example:* Dr. Ali - Multan - Need consultation for liver disease\n\n",
            "💬 *Type 'main' to return to the main menu.*",
        )
        send_whatsjet_message(from_number, "".join(prompt_parts))

        # The next message from this user is routed by the 'contact_request' state.
        awaiting_details = {
            'current_state': 'contact_request',
            'current_menu': 'contact_request',
            'current_menu_options': ['Provide contact details'],
        }
        context_manager.update_context(from_number, **awaiting_details)
    except Exception as exc:
        logger.error(f"[Contact] Error handling contact request: {exc}")
        # Rather than a generic error, drop the user back at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_state = {
            'current_state': 'main_menu',
            'current_menu': 'main_menu',
            'current_menu_options': list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        }
        context_manager.update_context(from_number, **main_menu_state)
async def handle_contact_request_response(from_number: str, response: str):
    """Record the user's contact details and forward them to the admin number.

    The reply is appended as one JSON line to
    ``contacts/contact_inquiries.json``, forwarded to the receiving number,
    acknowledged to the user, and the user's state is reset to the main menu.
    The first line of the reply is treated as name/location and any further
    lines as the inquiry details. On any failure the error is logged and the
    user is returned to the main menu.
    """
    try:
        # Persist the raw inquiry (append-only, one JSON object per line).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/contact_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Number that receives forwarded inquiries (admin).
        admin_number = "923068222219"

        # First line -> name/location; everything after the first newline -> details.
        first_line, newline, remainder = response.strip().partition('\n')
        if newline:
            name_location = first_line.strip()
            details = remainder.strip()
        else:
            # Single-line reply: treat all of it as name/location.
            name_location = response.strip()
            details = "No specific details provided"

        forwarded = (
            f"📞 *Follow Up Inquiry*\n\n"
            f"Name and Location: {name_location}\n"
            f"Phone: {from_number}\n"
            f"Details: {details}"
        )
        send_whatsjet_message(admin_number, forwarded)

        # Acknowledge to the user and return them to the main menu.
        acknowledgement = (
            "✅ Thank you! Your inquiry has been received. Our team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        send_whatsjet_message(from_number, acknowledgement)
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
    except Exception as exc:
        logger.error(f"[Contact] Error handling contact response: {exc}")
        # Rather than a generic error, drop the user back at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
async def handle_availability_inquiry(from_number: str, user_context: dict):
    """Ask for order details about the product currently selected in ``user_context``.

    With a selected product, the user is prompted for name/location, quantity
    and delivery preferences and moved to the 'availability_request' state.
    Without one, the user is told to search for a product first. On any
    failure the error is logged and the user is returned to the main menu.
    """
    try:
        selected = user_context.get('current_product')

        # Nothing selected: there is no product to ask about.
        if not selected:
            send_whatsjet_message(
                from_number,
                "❌ No product selected. Please search for a product first.",
            )
            return

        product_name = selected.get('Product Name', 'N/A')
        prompt_text = (
            f"📦 *Availability Inquiry*\n\n"
            f"Product: {product_name}\n\n"
            "Please provide:\n"
            "• Your name and location\n"
            "• Required quantity\n"
            "• Delivery preferences\n\n"
            "💬 *Example:* Dr. Ali – Multan, 50 bottles\n\n"
            "💬 *Type 'main' to return to the main menu.*"
        )
        send_whatsjet_message(from_number, prompt_text)

        # The next message from this user is routed by the 'availability_request' state.
        awaiting_details = {
            'current_state': 'availability_request',
            'current_menu': 'availability_request',
            'current_menu_options': ['Provide availability details'],
        }
        context_manager.update_context(from_number, **awaiting_details)
    except Exception as exc:
        logger.error(f"[Availability] Error handling availability inquiry: {exc}")
        # Rather than a generic error, drop the user back at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_state = {
            'current_state': 'main_menu',
            'current_menu': 'main_menu',
            'current_menu_options': list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        }
        context_manager.update_context(from_number, **main_menu_state)
async def handle_availability_request_response(from_number: str, response: str):
    """Record an availability inquiry and forward it to the admin number.

    The reply is appended as one JSON line to
    ``contacts/availability_inquiries.json``. Its non-empty lines are read
    positionally as name/location, quantity and delivery preferences; missing
    positions get placeholder text. The summary is forwarded to the receiving
    number, the user gets a confirmation, and their state is reset to the main
    menu. On any failure the error is logged and the user is returned to the
    main menu.
    """
    try:
        # Persist the raw inquiry (append-only, one JSON object per line).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/availability_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Number that receives forwarded inquiries (admin).
        admin_number = "923068222219"

        selected = context_manager.get_context(from_number).get('current_product', {})
        product_name = selected.get('Product Name', 'N/A') if selected else 'N/A'

        # Non-empty lines, in order: name/location, quantity, delivery preferences.
        answered = [part.strip() for part in response.strip().split('\n') if part.strip()]
        placeholders = ["Not provided", "Not specified", "Not specified"]
        # Pad the answers with the placeholders for positions the user left out.
        name_location, quantity, delivery_preferences = (
            answered + placeholders[len(answered):]
        )[:3]

        forwarded = (
            f"📦 *Product Availability Inquiry*\n\n"
            f"Product: {product_name}\n"
            f"Name and Location: {name_location}\n"
            f"Quantity: {quantity}\n"
            f"Delivery Preferences: {delivery_preferences}\n"
            f"Phone: {from_number}"
        )
        send_whatsjet_message(admin_number, forwarded)

        # Acknowledge to the user and return them to the main menu.
        acknowledgement = (
            "✅ Thank you! Your availability inquiry has been received. Our sales team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        send_whatsjet_message(from_number, acknowledgement)
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
    except Exception as exc:
        logger.error(f"[Availability] Error handling availability response: {exc}")
        # Rather than a generic error, drop the user back at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
def send_helpful_guidance(from_number: str, current_state: str):
    """Send a short "what can I do here" hint that matches ``current_state``.

    Known states get their own hint and keep their state. Any other state
    gets the main-menu hint and is reset to 'main_menu'. On failure the error
    is logged, the welcome message is sent and the state is reset to
    'main_menu'.
    """
    try:
        # Ordered (state, hint) table; the first state equal to current_state wins.
        state_hints = (
            ('all_products_menu',
             "📋 *Products Menu*\n\n"
             "Select a product number (1-23) to view detailed information.\n"
             "Type 'main' to return to the main menu.\n"
             "You can also type a product name to search."),
            ('product_inquiry',
             "📦 *Product Details*\n\n"
             "Select an option:\n"
             "1️⃣ Contact Sales\n"
             "2️⃣ Check Availability\n"
             "3️⃣ Back to Main Menu\n"
             "Type 'main' to return to main menu."),
            ('category_selection_menu',
             "📁 *Category Selection*\n\n"
             "Select a category number to view products.\n"
             "Type 'main' to return to main menu."),
            ('category_products_menu',
             "📦 *Category Products*\n\n"
             "Select a product number to view details.\n"
             "Type 'main' to return to main menu."),
            ('contact_request',
             "📞 *Contact Request*\n\n"
             "Please provide your name, location, and quantity.\n"
             "Format: 'Name - Location, Quantity'\n"
             "Example: 'Dr. Ali - Multan, 50 bottles'"),
            ('availability_request',
             "📦 *Availability Inquiry*\n\n"
             "Please provide your location and quantity.\n"
             "Format: 'Location, Quantity'\n"
             "Example: 'Multan, 50 bottles'"),
        )
        hint = next(
            (text for state, text in state_hints if current_state == state),
            None,
        )
        if hint is not None:
            send_whatsjet_message(from_number, hint)
            return

        # Unknown state: show the main-menu hint and reset the stored state.
        main_menu_hint = (
            "💬 *Main Menu*\n\n"
            "Available options:\n"
            "1️⃣ Search Veterinary Products\n"
            "2️⃣ Browse Categories\n"
            "3️⃣ Download Catalog\n\n"
            "Select an option or ask about specific products."
        )
        send_whatsjet_message(from_number, main_menu_hint)
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
    except Exception as exc:
        logger.error(f"Error sending helpful guidance: {exc}")
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()),
        )
# Core greeting spellings, grouped by the greeting they stand for.
_GREETING_CORE = {
    'hello': ['hello', 'hallo', 'helo', 'hlo', 'hallo', 'heloo', 'helloo'],
    'hi': ['hi', 'hy', 'hii', 'hiii', 'hiiii', 'hie', 'hye', 'hai', 'hay'],
    'hey': ['hey', 'heyy', 'heyyy', 'heey', 'heeyy', 'hay', 'hae'],
    'good_morning': ['good morning', 'goodmorning', 'gm', 'gud morning', 'gudmorning'],
    'good_afternoon': ['good afternoon', 'goodafternoon', 'ga', 'gud afternoon', 'gudafternoon'],
    'good_evening': ['good evening', 'goodevening', 'ge', 'gud evening', 'gudevening'],
    'good_night': ['good night', 'goodnight', 'gn', 'gud night', 'gudnight'],
    'morning': ['morning', 'mornin', 'morn'],
    'afternoon': ['afternoon', 'aftrnoon', 'aftr'],
    'evening': ['evening', 'evnin', 'evn'],
    'night': ['night', 'nite', 'nyt'],
    'how_are_you': ['how are you', 'how r u', 'how are u', 'how r you', 'howru', 'howru'],
    'whats_up': ['whats up', 'whats up', 'what is up', 'wassup', 'wassup', 'sup', 'sup'],
    'assalamu_alaikum': ['assalamu alaikum', 'assalam alaikum', 'assalamu alaikom', 'assalam alaikom', 'asalamu alaikum', 'asalam alaikum'],
    'salam': ['salam', 'salaam', 'assalam', 'assalaam', 'salaam alaikum', 'salaam alaikom'],
    'adaab': ['adaab', 'adaab arz hai', 'adaab arz', 'adaab arz karta hun'],
    'namaste': ['namaste', 'namaskar', 'pranam', 'pranaam'],
    'khuda_hafiz': ['khuda hafiz', 'allah hafiz', 'fi amanillah'],
    'thank_you': ['thank you', 'thanks', 'shukriya', 'shukran', 'thnx', 'thx', 'tnx'],
}

# Flattened in declaration order for fuzzy matching, plus a set for O(1) exact lookups.
_GREETING_VARIATIONS = tuple(
    variation for variations in _GREETING_CORE.values() for variation in variations
)
_GREETING_EXACT = frozenset(_GREETING_VARIATIONS)

# Whole-message greeting patterns (tolerate repeated whitespace between words).
_GREETING_PATTERNS = tuple(re.compile(pattern) for pattern in (
    r'^(hi|hello|hey|hy|hii|hiii|hallo|helo|hlo|heyy|heyyy|heey|heeyy|hay|hae|hai|hye|hie)\s*$',
    r'^(good\s+(morning|afternoon|evening|night)|gm|ga|ge|gn|gud\s+(morning|afternoon|evening|night))\s*$',
    r'^(morning|afternoon|evening|night|mornin|morn|aftrnoon|aftr|evnin|evn|nite|nyt)\s*$',
    r'^(how\s+(are\s+)?(you|u|r\s+u)|howru|howru)\s*$',
    r'^(whats?\s+up|wassup|sup)\s*$',
    r'^(assalamu?\s+alaik(um|om)|asalamu?\s+alaik(um|om)|salaam\s+alaik(um|om))\s*$',
    r'^(salam|salaam|assalam|assalaam)\s*$',
    r'^(adaab(?:\s+arz(?:\s+(hai|karta\s+hun))?)?)\s*$',
    r'^(namaste|namaskar|pranam|pranaam)\s*$',
    r'^(khuda\s+hafiz|allah\s+hafiz|fi\s+amanillah)\s*$',
    r'^(thank\s+you|thanks|shukriya|shukran|thnx|thx|tnx)\s*$',
))

# Words that resemble greetings but must never be treated as one.
_NON_GREETING_WORDS = frozenset([
    'help', 'here', 'her', 'his', 'him', 'hot', 'how', 'history', 'high',
    'hint', 'hit', 'hill', 'hire', 'a', 'b', 'c', 'what', 'when', 'where', 'why',
])

# Substrings that signal a product/navigation request rather than a plain greeting.
_INQUIRY_KEYWORDS = (
    'need', 'want', 'looking', 'find', 'show', 'tell', 'give', 'products',
    'medicine', 'antibiotics', 'veterinary', 'animals', 'cattle', 'poultry',
    'catalog', 'price', 'availability', 'consultation', 'appointment',
    'main', 'menu', 'start', 'home', 'back', '1', '2', '3', '4', '5',
)

# Greeting-style questions, matched as substrings of the message.
_GREETING_QUESTIONS = (
    'how are you', 'how r u', 'how are u', 'how do you do', 'how\'s it going',
    'how is it going', 'how\'s everything', 'how is everything',
    'what\'s up', 'whats up', 'what is up', 'how\'s life', 'how is life',
    'آپ کیسے ہیں', 'آپ کیسے ہو', 'کیسے ہیں', 'کیسے ہو', 'کیا حال ہے', 'کیسا ہے',
)

# Words allowed to follow a greeting ("hi there", "hello everyone").
_GREETING_MODIFIERS = frozenset(['there', 'everyone', 'all', 'guys', 'folks', 'people'])

# Short greetings compared with a lower similarity threshold.
_SHORT_GREETINGS = ('hi', 'hy', 'hii', 'hey', 'heyy', 'hay', 'hae', 'hai', 'hye', 'hie')

_GREETING_FUZZY_THRESHOLD = 90  # High threshold to avoid false positives
_SHORT_GREETING_FUZZY_THRESHOLD = 85  # Lower threshold for short words


def is_greeting(text):
    """
    Enhanced greeting detection using fuzzy matching and universal patterns.
    Can detect variations like "Hy", "Hii", "Hallo", etc. without hardcoding.

    Checks run in this order on the lower-cased, stripped text:
      1. exact match against the known greeting spellings;
      2. whole-message regular expressions;
      3. exclusion guards: known non-greeting words and product/navigation
         keywords make the function return False;
      4. fuzzy match (rapidfuzz ``fuzz.ratio``) against the known spellings;
      5. greeting-style questions contained anywhere in the text;
      6. a fuzzy-matched greeting followed only by modifier words;
      7. very short (2-4 character) near-matches of short greetings.

    The exclusion guards used to sit after the heuristic checks, where they
    could never prevent a false positive (e.g. "hire" is listed as a
    non-greeting word yet is close enough to "hie" to pass the short-word
    check). They now run before every heuristic check, after the two
    precise checks.

    Args:
        text: Message text; falsy values (None, "") are not greetings.

    Returns:
        True if the text is recognized as a greeting, otherwise False.
    """
    if not text:
        return False
    text_lower = text.lower().strip()
    if not text_lower:
        # Whitespace-only input cannot match anything below.
        return False

    # 1. Exact match check (fastest)
    if text_lower in _GREETING_EXACT:
        return True

    # 2. Whole-message greeting patterns
    if any(pattern.match(text_lower) for pattern in _GREETING_PATTERNS):
        return True

    # 3. Exclusion guards - must precede the heuristic checks to have any effect.
    if text_lower in _NON_GREETING_WORDS:
        return False
    # If any inquiry keyword is present, it's likely not just a greeting
    if any(keyword in text_lower for keyword in _INQUIRY_KEYWORDS):
        return False

    # 4. Fuzzy matching for typos and variations (using rapidfuzz)
    for greeting in _GREETING_VARIATIONS:
        similarity = fuzz.ratio(text_lower, greeting)
        if similarity >= _GREETING_FUZZY_THRESHOLD:
            logger.info(f"Fuzzy greeting match: '{text_lower}' -> '{greeting}' (similarity: {similarity}%)")
            return True

    # 5. Check for greeting questions
    if any(question in text_lower for question in _GREETING_QUESTIONS):
        return True

    # 6. Greeting followed only by common modifiers ("hi there", "hello everyone")
    words = text_lower.split()
    if len(words) >= 2:
        first_word, remaining_words = words[0], words[1:]
        if all(word in _GREETING_MODIFIERS for word in remaining_words):
            for greeting in _GREETING_VARIATIONS:
                if fuzz.ratio(first_word, greeting) >= _GREETING_FUZZY_THRESHOLD:
                    return True

    # 7. Special case: very short messages that are likely greetings
    if 2 <= len(text_lower) <= 4:
        for short_greeting in _SHORT_GREETINGS:
            if fuzz.ratio(text_lower, short_greeting) >= _SHORT_GREETING_FUZZY_THRESHOLD:
                logger.info(f"Short greeting match: '{text_lower}' -> '{short_greeting}'")
                return True

    return False
# Commands that leave AI chat mode and bring the user back to the main menu.
_AI_CHAT_NAVIGATION_COMMANDS = ('main', 'menu', 'start', 'home', 'back')
# Upper bound on catalogue rows embedded in the prompt (keeps the prompt small).
_AI_CHAT_MAX_CONTEXT_PRODUCTS = 50
# Footer appended to every AI answer (the text is the same for every reply language).
_AI_CHAT_MENU_HINT = "\n\n💬 *Type 'main' to return to main menu*"
_AI_CHAT_PROMPT_UR = """
آپ Apex Biotical کے Veterinary AI Assistant ہیں۔ آپ کو veterinary products اور treatments کے بارے میں معلومات فراہم کرنی ہیں۔
یوزر کا سوال: {query}
دستیاب veterinary products:
{products_context}
براہ کرم:
1. یوزر کے سوال کا جواب دیں
2. اگر یہ veterinary products سے متعلق ہے تو relevant products کی معلومات دیں
3. اگر یہ general veterinary advice ہے تو professional guidance دیں
4. اردو میں جواب دیں
5. جواب professional اور helpful ہو
جواب:
"""
_AI_CHAT_PROMPT_EN = """
You are Apex Biotical's Veterinary AI Assistant. You provide information about veterinary products and treatments.
User Query: {query}
Available Veterinary Products:
{products_context}
Please:
1. Answer the user's question
2. If it's related to veterinary products, provide relevant product information
3. If it's general veterinary advice, provide professional guidance
4. Answer in English
5. Keep the response professional and helpful
Response:
"""


def _ai_chat_return_to_main_menu(from_number: str) -> None:
    """Send the welcome message and reset the user's context to the main menu."""
    send_whatsjet_message(from_number, generate_veterinary_welcome_message())
    context_manager.update_context(
        from_number,
        current_state='main_menu',
        current_menu='main_menu',
        current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
    )


def _ai_chat_products_context() -> str:
    """Render the first catalogue rows of ``products_df`` as plain text for the prompt."""
    if products_df is None or products_df.empty:
        return ""
    rows = products_df.head(_AI_CHAT_MAX_CONTEXT_PRODUCTS).to_dict('records')
    entries = [
        f"{position}. {row.get('Product Name', 'N/A')} - {row.get('Category', 'N/A')}\n"
        f" Composition: {row.get('Composition', 'N/A')}\n"
        f" Target Species: {row.get('Target Species', 'N/A')}\n\n"
        for position, row in enumerate(rows, 1)
    ]
    if not entries:
        return ""
    return "Available Veterinary Products:\n" + "".join(entries)


async def handle_ai_chat_mode(from_number: str, query: str, reply_language: str = 'en'):
    """
    Handle AI chat mode - completely separate from menu system.

    Navigation commands and greetings send the welcome message and reset the
    user's context to the main menu. Any other query is answered by OpenAI,
    using a prompt that embeds up to 50 rows of the product catalogue.

    Args:
        from_number: WhatsApp number the reply is sent to; also the context key.
        query: The user's message text.
        reply_language: 'ur' selects the Urdu prompt and error text; any other
            value selects English.

    Returns:
        None. Replies are delivered through send_whatsjet_message. Any failure
        is logged and answered with an error message instead of being raised.
    """
    try:
        logger.info(f"[AI Chat] Processing query: '{query}' for {from_number} in {reply_language}")
        # Check for navigation commands first
        if query.lower().strip() in _AI_CHAT_NAVIGATION_COMMANDS:
            logger.info(f"[AI Chat] Navigation command detected: '{query}' -> returning to main menu")
            _ai_chat_return_to_main_menu(from_number)
            return
        # Check for greetings - return to main menu
        if is_greeting(query):
            logger.info(f"[AI Chat] Greeting detected: '{query}' -> returning to main menu")
            _ai_chat_return_to_main_menu(from_number)
            return
        # Check if OpenAI is available
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number, "❌ AI Assistant requires OpenAI API. Please contact support.")
            return
        template = _AI_CHAT_PROMPT_UR if reply_language == 'ur' else _AI_CHAT_PROMPT_EN
        prompt = template.format(query=query, products_context=_ai_chat_products_context())

        def request_completion():
            return openai.ChatCompletion.create(
                model="gpt-4o",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=500
            )

        # The OpenAI client call is synchronous; run it in the default executor
        # so this coroutine does not block the event loop while waiting.
        response = await asyncio.get_running_loop().run_in_executor(None, request_completion)
        content = response.choices[0].message.content
        if not content or not content.strip():
            # Routed to the handler below so the user still gets an error reply.
            raise ValueError("OpenAI returned an empty response")
        ai_response = content.strip() + _AI_CHAT_MENU_HINT
        # Send response
        send_whatsjet_message(from_number, ai_response)
        # Update context to AI chat mode
        context_manager.update_context(
            from_number,
            current_state='ai_chat_mode',
            current_menu='ai_chat_mode',
            current_menu_options=['main'],
            last_ai_query=query,
            last_ai_response=ai_response
        )
        # Add to conversation history
        context_manager.add_to_history(from_number, query, ai_response)
        logger.info(f"[AI Chat] Response sent successfully to {from_number}")
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception(f"[AI Chat] Error processing query: {e}")
        if reply_language == 'ur':
            error_msg = "❌ AI Assistant میں error آ گیا ہے۔ براہ کرم دوبارہ کوشش کریں یا 'main' لکھ کر main menu پر واپس جائیں۔"
        else:
            error_msg = "❌ AI Assistant encountered an error. Please try again or type 'main' to return to main menu."
        send_whatsjet_message(from_number, error_msg)
# Load products on startup | |
def load_products_data():
    """
    Populate the module-level ``products_df`` from the CSV at ``CSV_FILE``.

    When the file is missing, or reading it raises, ``products_df`` is set to
    an empty DataFrame and the problem is logged; nothing is raised.
    """
    global products_df
    try:
        csv_missing = not os.path.exists(CSV_FILE)
        if csv_missing:
            logger.warning(f"⚠️ CSV file {CSV_FILE} not found")
            products_df = pd.DataFrame()
            return
        products_df = pd.read_csv(CSV_FILE)
        logger.info(f"✅ Loaded {len(products_df)} products from {CSV_FILE}")
    except Exception as load_error:
        logger.error(f"❌ Error loading products data: {load_error}")
        products_df = pd.DataFrame()


# Executed at import time so the catalogue is available before any request.
load_products_data()
# Product image and media helper functions
def get_product_image_path(product_name: str) -> Optional[str]:
    """
    Build the public cPanel image URL for a product from its name.

    URL format: https://amgocus.com/uploads/images/<normalized_name>.png
    Normalized: lowercase, with whitespace, underscores and dots removed;
    dashes are preserved.

    No HTTP request is made, so the returned URL is a candidate, not a
    verified image. The previous implementation looped over several
    extensions and a legacy "%20" form, but its "starts with http" check was
    always true, so it returned the first ``.png`` candidate every time. This
    version returns that same URL directly.

    Args:
        product_name: Product name as shown in the catalogue.

    Returns:
        The candidate image URL, or None when ``product_name`` is not a
        string or normalizes to an empty name.
    """
    if not isinstance(product_name, str):
        logger.warning(f"[Image] No public image found for product: {product_name}")
        return None
    normalized_name = re.sub(r'[\s_.]', '', product_name).lower()
    if not normalized_name:
        # An empty name would otherwise yield ".../images/.png".
        logger.warning(f"[Image] No public image found for product: {product_name}")
        return None
    image_url = f"https://amgocus.com/uploads/images/{normalized_name}.png"
    logger.info(f"[Image] Found public image URL: {image_url}")
    return image_url
def get_product_image_media_type(image_path: str) -> Optional[str]:
    """
    Determine the image media type from a file path or URL extension.

    Args:
        image_path: Local path or http(s) URL of the image.

    Returns:
        The MIME type for .jpg/.jpeg/.png/.webp/.gif, 'image/jpeg' for any
        other or missing extension, and None when ``image_path`` is empty.
    """
    if not image_path:
        return None
    path = image_path
    if path.lower().startswith(('http://', 'https://')):
        # A query string or fragment would otherwise become part of the
        # "extension" (e.g. ".png?v=2") and fall through to the JPEG default.
        path = path.split('#', 1)[0].split('?', 1)[0]
    ext = os.path.splitext(path)[1].lower()
    media_type_map = {
        '.jpg': 'image/jpeg',
        '.jpeg': 'image/jpeg',
        '.png': 'image/png',
        '.webp': 'image/webp',
        '.gif': 'image/gif',
    }
    return media_type_map.get(ext, 'image/jpeg')
async def send_product_with_image(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send product information with image if available.

    The image location comes from get_product_image_path. It is used when it
    is an http(s) URL or an existing local file. If the media send reports
    failure, or no image is usable, the text response is sent on its own.

    Args:
        from_number: Recipient WhatsApp number.
        product: Product record; 'Product Name' is read from it.
        user_context: Passed through to generate_veterinary_product_response.
    """
    response = None
    try:
        product_name = product.get('Product Name', 'Unknown Product')
        # Generate product response
        response = generate_veterinary_product_response(product, user_context)
        # Try to get product image
        image_path = get_product_image_path(product_name)
        # get_product_image_path yields public URLs, for which os.path.exists()
        # is always False; accept http(s) URLs as well as local files.
        is_remote = bool(image_path) and image_path.startswith(('http://', 'https://'))
        if image_path and (is_remote or os.path.exists(image_path)):
            media_type = get_product_image_media_type(image_path)
            # Keep the real extension so the filename agrees with media_type.
            extension = os.path.splitext(image_path)[1] or '.jpg'
            filename = f"{product_name.replace(' ', '_')}{extension}"
            success = send_whatsjet_message(
                from_number,
                response,
                media_type=media_type,
                media_path=image_path,
                filename=filename
            )
            if success:
                logger.info(f"[Product] Successfully sent product with image: {product_name}")
            else:
                # Fallback to text-only if image send fails
                logger.warning(f"[Product] Failed to send image, sending text only: {product_name}")
                send_whatsjet_message(from_number, response)
        else:
            # Send text-only response
            send_whatsjet_message(from_number, response)
            logger.info(f"[Product] Sent product info without image: {product_name}")
    except Exception as e:
        logger.error(f"[Product] Error sending product with image: {e}")
        # Fallback to text-only; reuse the text if it was already generated.
        if response is None:
            response = generate_veterinary_product_response(product, user_context)
        send_whatsjet_message(from_number, response)
async def send_enhanced_pdf(from_number: str, product: Dict[str, Any], pdf_content: Optional[bytes] = None):
    """
    Send a product PDF as a WhatsApp document, with a download link as fallback.

    The PDF is written to the local ``uploads`` directory under a timestamped
    file name and sent through send_whatsjet_message. If that send reports
    failure, a message with a download link built from the SERVER_URL
    environment variable is sent instead. If anything raises, the plain text
    product response is sent.

    Args:
        from_number: Recipient WhatsApp number.
        product: Product record; 'Product Name' is read from it.
        pdf_content: Ready-made PDF bytes. When None, the PDF is produced by
            generate_veterinary_pdf(product).
    """
    try:
        product_name = product.get('Product Name', 'Unknown_Product')
        # Drop punctuation, then collapse every whitespace run (spaces, tabs,
        # newlines) into one underscore so the name is safe in a path and URL.
        stripped_name = re.sub(r'[^\w\s-]', '', str(product_name)).strip()
        safe_name = re.sub(r'\s+', '_', stripped_name) or 'Product'
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_name}_Product_Info_{timestamp}.pdf"
        # Generate PDF if not provided
        if pdf_content is None:
            pdf_content = generate_veterinary_pdf(product)
        if not pdf_content:
            # Avoid leaving an empty file behind; handled by the fallback below.
            raise ValueError("PDF generation returned no content")
        # Save PDF to uploads directory
        uploads_dir = "uploads"
        os.makedirs(uploads_dir, exist_ok=True)
        pdf_path = os.path.join(uploads_dir, filename)
        with open(pdf_path, 'wb') as f:
            f.write(pdf_content)
        # Send PDF as document via WhatsApp
        success = send_whatsjet_message(
            from_number,
            f"📄 *{product_name} - Detailed Product Information*\n\n"
            f"📎 Here's the complete product information in PDF format.\n"
            f"📋 Includes: Composition, Dosage, Precautions, Storage\n\n"
            f"💬 Type 'main' to return to main menu.",
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )
        if success:
            logger.info(f"[PDF] Successfully sent PDF for product: {product_name}")
        else:
            # Fallback: Send download link
            server_url = os.getenv("SERVER_URL", "https://your-huggingface-space-url.hf.space")
            # rstrip avoids a double slash when SERVER_URL ends with "/".
            download_url = f"{server_url.rstrip('/')}/uploads/{filename}"
            message = (
                f"📄 *{product_name} - Product Information*\n\n"
                f"📎 [Download Product PDF]({download_url})\n\n"
                f"💬 *Click the link above to download the detailed product information*\n"
                f"Type 'main' to return to main menu."
            )
            send_whatsjet_message(from_number, message)
            logger.info(f"[PDF] Sent PDF download link for product: {product_name}")
    except Exception as e:
        logger.error(f"[PDF] Error sending enhanced PDF: {e}")
        # Fallback to basic text response
        response = generate_veterinary_product_response(product, {})
        send_whatsjet_message(from_number, response)
# Enhanced product response function with image support | |
def generate_veterinary_product_response_with_media(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Generate the product response text together with image information.

    Args:
        product_info: Product record. 'Product Name', 'Type', 'Category' and
            'Indications' are shown; 'Brochure (PDF)' is used when present.
        user_context: Accepted for interface compatibility; not read here.

    Returns:
        A dict with keys 'text' (message body), 'has_image' (True when
        'image_path' is an http(s) URL or an existing local file),
        'image_path' (value from get_product_image_path, may be None) and
        'product_name'.
    """
    def clean_text(text):
        if text is None or pd.isna(text):
            return "Not specified"
        return str(text).strip()

    def usable_link(value) -> str:
        # NaN and other non-string cells count as "no link".
        return value.strip() if isinstance(value, str) else ""

    product_name = clean_text(product_info.get('Product Name', ''))
    product_type = clean_text(product_info.get('Type', ''))
    category = clean_text(product_info.get('Category', ''))
    indications = clean_text(product_info.get('Indications', ''))
    # Prefer the link already on the record; only re-read the CSV from disk
    # when the record does not carry one.
    pdf_link = usable_link(product_info.get('Brochure (PDF)'))
    if not pdf_link:
        try:
            csv_data = pd.read_csv('Veterinary.csv')
            product_row = csv_data[csv_data['Product Name'] == product_name]
            if not product_row.empty:
                pdf_link = usable_link(product_row.iloc[0].get('Brochure (PDF)', ''))
        except Exception as e:
            logger.warning(f"Error checking PDF link for {product_name}: {e}")
    response_text = (
        f"🧪 *Name:* {product_name}\n"
        f"📦 *Type:* {product_type}\n"
        f"🏥 *Category:* {category}\n"
        f"💊 *Used For:* {indications}"
    )
    if pdf_link:
        response_text += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{pdf_link}"
    response_text += (
        "\n\n💬 *Available Actions:*\n"
        "1️⃣ Talk to Veterinary Consultant\n"
        "2️⃣ Inquire About Availability\n"
        "3️⃣ Back to Main Menu\n"
        "\n💬 Select an option or ask about related products"
    )
    image_path = get_product_image_path(product_name)
    # get_product_image_path yields public URLs; os.path.exists() alone would
    # report every one of them as missing.
    has_image = bool(image_path) and (
        image_path.startswith(('http://', 'https://')) or os.path.exists(image_path)
    )
    return {
        'text': response_text,
        'has_image': has_image,
        'image_path': image_path,
        'product_name': product_name
    }
def ensure_images_dir():
    """Create the ``static/images`` directory (and parents) if absent, then log its path."""
    target_dir = "static/images"
    # exist_ok makes repeated calls harmless.
    os.makedirs(name=target_dir, exist_ok=True)
    logger.info(f"[Image] Ensured images directory exists: {target_dir}")
# New feature: Send product image with caption (product details) | |
def _is_image_url_accessible(image_url: str) -> bool:
    """Return True when a GET on ``image_url`` with browser-like headers answers HTTP 200."""
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.9',
        'Accept-Encoding': 'gzip, deflate, br',
        'Connection': 'keep-alive',
        'Upgrade-Insecure-Requests': '1'
    }
    try:
        logger.info(f"[Product] Testing image URL accessibility: {image_url}")
        probe = requests.get(image_url, headers=headers, timeout=10, stream=True, allow_redirects=True)
        try:
            status_code = probe.status_code
        finally:
            # stream=True keeps the connection open until the body is read or
            # the response is closed; only the status is needed here.
            probe.close()
    except Exception as e:
        logger.warning(f"[Product] Failed to test image URL {image_url}: {e}")
        return False
    if status_code != 200:
        logger.warning(f"[Product] Image URL not accessible (status {status_code}): {image_url}")
        return False
    logger.info(f"[Product] Image URL is accessible")
    return True


async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send product image (if available) with product details as caption.

    Order of attempts:
      1. The 'Images' value of the product record (Google Drive links are
         converted to direct-download URLs), after a reachability probe.
         Sent as one image+caption message.
      2. The cPanel URL from get_product_image_path, sent as an image
         followed by the details as a separate text message; or, if that
         value is an existing local file, one image+caption message.
      3. The product details as text only.

    The earlier fallback that sent an unrelated public stock photo with the
    product caption was removed, because customers must not receive an image
    that does not show the product.

    Args:
        from_number: Recipient WhatsApp number.
        product: Product record; 'Product Name' and 'Images' are read.
        user_context: Passed through to generate_veterinary_product_response.
    """
    logger.info(f"[Product] send_product_image_with_caption called for: {product.get('Product Name', '')}")
    ensure_images_dir()
    # str() guards against NaN / non-string cells coming from the CSV.
    product_name = str(product.get('Product Name', 'Unknown Product'))
    details = generate_veterinary_product_response(product, user_context)
    raw_image_url = product.get('Images', '')
    image_url = raw_image_url.strip() if isinstance(raw_image_url, str) else ''
    # Force image URL for Respira Aid Plus (use cPanel public URL)
    if product_name.lower().strip() == "respira aid plus":
        image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"
    logger.info(f"[Product] Processing image for product: {product_name}")
    logger.info(f"[Product] Image URL from CSV: {image_url}")
    filename = f"{product_name.replace(' ', '_')}.jpg"
    try:
        # First, check if we have an image URL from CSV
        if image_url:
            # Convert Google Drive link to direct download if needed
            if 'drive.google.com' in image_url:
                logger.info(f"[Product] Converting Google Drive link: {image_url}")
                if '/d/' in image_url:
                    file_id = image_url.split('/d/')[1].split('/')[0]
                elif 'id=' in image_url:
                    file_id = image_url.split('id=')[1].split('&')[0]
                else:
                    file_id = ''
                if file_id:
                    image_url = f"https://drive.google.com/uc?export=download&id={file_id}"
                    logger.info(f"[Product] Converted to direct download URL: {image_url}")
            if not _is_image_url_accessible(image_url):
                image_url = ''
        if image_url:
            logger.info(f"[Product] Attempting to send image from CSV URL for: {product_name}")
            success = send_whatsjet_message(
                from_number,
                details,
                media_type='image/jpeg',
                media_path=image_url,  # Use public URL directly
                filename=filename
            )
            if success:
                logger.info(f"[Product] Successfully sent image from CSV link with caption for product: {product_name}")
                return
            # A failed send used to end here and the user got nothing; fall
            # through so the details are still delivered.
            logger.warning(f"[Product] Failed to send image from CSV link, trying fallback: {product_name}")
        # Fallback: normalized cPanel public URL (or local file)
        logger.info(f"[Product] Trying local uploads directory for: {product_name}")
        image_path = get_product_image_path(product_name)
        if image_path:
            media_type = get_product_image_media_type(image_path)
            if image_path.startswith('http'):
                logger.info(f"[Product] Sending normalized cPanel public URL with caption: {image_path}")
                # Send image first using the correct method for URLs
                image_success = send_whatsjet_media_image_only(
                    from_number,
                    image_path,
                    filename=filename
                )
                if image_success:
                    logger.info(f"[Product] Image sent successfully, now sending product details")
                else:
                    logger.warning(f"[Product] Failed to send image, sending text only")
                # The details are sent as text whether or not the image went out.
                send_whatsjet_message(from_number, details)
                return
            if os.path.exists(image_path):
                logger.info(f"[Product] Sending local file with caption: {image_path}")
                send_whatsjet_message(
                    from_number,
                    details,
                    media_type=media_type,
                    media_path=image_path,
                    filename=filename
                )
                return
        # No image available, send text only
        logger.info(f"[Product] No image available, sending text only for: {product_name}")
        send_whatsjet_message(from_number, details)
    except Exception as e:
        logger.error(f"[Product] Error sending product image with caption: {e}")
        logger.info(f"[Product] Falling back to text-only message for: {product_name}")
        send_whatsjet_message(from_number, details)
# Test endpoint for product image with caption | |
async def test_product_image_with_caption(phone: str):
    """
    Send the first catalogue product (image + caption) to ``phone`` as a smoke test.

    Returns a dict: {"error": ...} when no products are loaded or something
    raises, otherwise {"success": True, "message": ..., "product": ...}.
    """
    try:
        catalogue_empty = products_df is None or products_df.empty
        if catalogue_empty:
            return {"error": "No products loaded"}
        # First row of the catalogue is the test subject; no user context.
        sample_product = products_df.iloc[0].to_dict()
        await send_product_image_with_caption(phone, sample_product, {})
        summary = {"success": True}
        summary["message"] = f"Test product image sent to {phone}"
        summary["product"] = sample_product.get('Product Name', 'Unknown')
        return summary
    except Exception as exc:
        logger.error(f"Error in test product image with caption: {exc}")
        return {"error": str(exc)}
# Test endpoint for image sending | |
async def test_image_sending(phone: str, image_url: str = "https://www.w3schools.com/w3images/lights.jpg"):
    """
    Send a test image with a fixed caption to ``phone``.

    Returns a dict with "success" (bool), "message" and "image_url", or
    {"error": ...} when sending raises.
    """
    try:
        delivered = send_whatsjet_message(
            phone,
            "🖼️ *Test Image*\n\nThis is a test image sent via WhatsApp API.",
            media_type="image/jpeg",
            media_path=image_url,
            filename="test_image.jpg"
        )
        if not delivered:
            return {
                "success": False,
                "message": f"Failed to send test image to {phone}",
                "image_url": image_url
            }
        return {
            "success": True,
            "message": f"Test image sent successfully to {phone}",
            "image_url": image_url
        }
    except Exception as exc:
        logger.error(f"Error in test image sending: {exc}")
        return {"error": str(exc)}
# Debug endpoint for WhatsJet | |
async def debug_whatsjet():
    """
    Report the WhatsJet / OpenAI configuration with secrets masked.

    The API token and OpenAI key are shown as "***" when set and None when
    not. Returns {"status": "success", "config": ..., "timestamp": ...}, or a
    {"status": "error", ...} dict when reading the settings raises.
    """
    def masked(secret):
        return "***" if secret else None

    try:
        settings = dict(
            api_url=WHATSJET_API_URL,
            vendor_uid=WHATSJET_VENDOR_UID,
            api_token=masked(WHATSJET_API_TOKEN),
            server_url=SERVER_URL,
            openai_key=masked(OPENAI_API_KEY),
        )
        return {"status": "success", "config": settings, "timestamp": datetime.now().isoformat()}
    except Exception as exc:
        return {"status": "error", "error": str(exc), "timestamp": datetime.now().isoformat()}
# Test endpoint for WhatsJet payloads | |
async def test_whatsjet_payloads(phone: str):
    """
    Send a fixed plain-text test message to ``phone`` through WhatsJet.

    Returns a dict with "status" ("success", "failed" or "error"), a
    "timestamp", and either "message"/"success" or "error".
    """
    try:
        # Test basic message sending
        delivered = send_whatsjet_message(
            phone,
            "🧪 *WhatsJet Test*\n\nThis is a test message to verify WhatsJet integration."
        )
        status = "success" if delivered else "failed"
        return {
            "status": status,
            "message": f"WhatsJet test message sent to {phone}",
            "success": delivered,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as exc:
        failure = {"status": "error", "error": str(exc)}
        failure["timestamp"] = datetime.now().isoformat()
        return failure
# Test endpoint for cPanel image access | |
async def test_cpanel_image_access():
    """
    Test endpoint to check whether a cPanel image URL is accessible with browser-like headers.

    Returns a dict with "image_url", "status_code", the response "headers",
    "accessible" (status code == 200) and "timestamp"; or a dict with
    "error", "image_url" and "timestamp" when the request raises.
    """
    image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"
    try:
        # Test with browser-like headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        logger.info(f"[Test] Testing cPanel image URL with browser headers: {image_url}")
        response = requests.get(image_url, headers=headers, timeout=10, stream=True, allow_redirects=True)
        try:
            status_code = response.status_code
            response_headers = dict(response.headers)
        finally:
            # With stream=True the body is never read here, so the connection
            # stays checked out until the response is closed explicitly.
            response.close()
        result = {
            "image_url": image_url,
            "status_code": status_code,
            "headers": response_headers,
            "accessible": status_code == 200,
            "timestamp": datetime.now().isoformat()
        }
        if status_code == 200:
            logger.info(f"[Test] ✅ cPanel image URL is now accessible!")
        else:
            logger.warning(f"[Test] ❌ cPanel image URL still not accessible (status {status_code})")
        return result
    except Exception as e:
        logger.error(f"[Test] Error testing cPanel image access: {e}")
        return {
            "error": str(e),
            "image_url": image_url,
            "timestamp": datetime.now().isoformat()
        }
def convert_drive_link(link: str) -> str:
    """
    Convert a Google Drive share link to a direct download link.

    Recognised forms are ``.../d/<file_id>...`` and ``...?id=<file_id>`` /
    ``...&id=<file_id>``. Anything else is returned unchanged: non-Drive
    links, Drive links without a recognisable file id, and non-string values.

    Args:
        link: URL to convert.

    Returns:
        ``https://drive.google.com/uc?export=download&id=<file_id>`` when a
        file id is found, otherwise ``link`` as given.
    """
    if not isinstance(link, str) or 'drive.google.com' not in link:
        return link
    # Positional splitting on "/" picked the wrong segment for links without
    # a trailing "/view" part and for "open?id=" links; match the id itself.
    match = re.search(r'/d/([A-Za-z0-9_-]+)', link) or re.search(r'[?&]id=([A-Za-z0-9_-]+)', link)
    if match is None:
        return link
    return f"https://drive.google.com/uc?export=download&id={match.group(1)}"
def format_number_with_emoji(number: int) -> str:
    """
    Format a list position as keycap emoji.

    Positive integers are rendered digit by digit (e.g. 12 -> "1️⃣2️⃣"), with
    the dedicated "🔟" glyph for exactly 10. This covers any list length,
    where the previous fixed table stopped at 23.

    Args:
        number: Position to format.

    Returns:
        The emoji string, or ``"<number>."`` for values that are not
        integers >= 1.
    """
    keycaps = {
        '0': "0️⃣", '1': "1️⃣", '2': "2️⃣", '3': "3️⃣", '4': "4️⃣",
        '5': "5️⃣", '6': "6️⃣", '7': "7️⃣", '8': "8️⃣", '9': "9️⃣",
    }
    if not isinstance(number, int) or number < 1:
        return f"{number}."
    if number == 10:
        return "🔟"
    # int() normalises bool (True -> 1) before splitting into digits.
    return "".join(keycaps[digit] for digit in str(int(number)))
async def display_all_products(from_number: str):
    """
    Display all products in multiple messages and update menu context.

    Does nothing when the user is already in the 'all_products_menu' state.
    Otherwise the state is switched to 'all_products_menu', the whole
    catalogue is stored in the context, and the products are sent five per
    message followed by a usage hint.

    Args:
        from_number: Recipient WhatsApp number; also the context key.
    """
    try:
        user_context = context_manager.get_context(from_number)
        current_state = user_context.get('current_state', 'main_menu')
        logger.info(f"[Display] display_all_products called for {from_number} in state: {current_state}")
        if current_state == 'all_products_menu':
            logger.warning(f"[Display] Already in all_products_menu state for {from_number}, skipping display")
            return
        if products_df is None or products_df.empty:
            send_whatsjet_message(from_number, "❌ No products available at the moment.")
            return
        # Set state to all_products_menu and store menu context
        products = products_df.to_dict('records')
        context_manager.update_context(
            from_number,
            current_state='all_products_menu',
            current_menu='all_products_menu',
            current_menu_options=[p.get('Product Name', 'Unknown') for p in products],
            available_products=products
        )
        logger.info(f"[Display] Set state to all_products_menu for {from_number}")
        # Send products in chunks
        chunk_size = 5
        total = len(products)
        for start in range(0, total, chunk_size):
            chunk = products[start:start + chunk_size]
            parts = [f"📋 *Products ({start + 1}-{min(start + chunk_size, total)} of {total})*\n\n"]
            for position, product in enumerate(chunk, start + 1):
                parts.append(f"{format_number_with_emoji(position)} {product.get('Product Name', 'Unknown')}\n")
                category = product.get('Category')
                # Empty CSV cells arrive as NaN, which is truthy; without the
                # notna check the user would see "Category: nan".
                if category and pd.notna(category):
                    parts.append(f" Category: {category}\n")
                parts.append("\n")
            send_whatsjet_message(from_number, "".join(parts))
        send_whatsjet_message(from_number,
            "💬 Type a product name to get detailed information, or type 'main' to return to main menu.")
    except Exception as e:
        logger.error(f"[Display] Error displaying products: {e}")
        send_whatsjet_message(from_number, "❌ Error displaying products. Please try again.")
def get_all_categories():
    """
    Return the unique categories of the products DataFrame, in order of first appearance.

    Rows without a category (NaN) are ignored so that no "nan" entry shows up
    in category menus. Returns an empty list when no products are loaded or
    the DataFrame has no 'Category' column.
    """
    if products_df is None or products_df.empty:
        return []
    if 'Category' not in products_df.columns:
        return []
    return products_df['Category'].dropna().unique().tolist()
def get_products_by_category(category: str):
    """
    Return the product records whose 'Category' equals ``category`` exactly.

    The result is a list of row dicts; it is empty when no products are
    loaded or nothing matches.
    """
    catalogue = products_df
    nothing_loaded = catalogue is None or catalogue.empty
    if nothing_loaded:
        return []
    in_category = catalogue['Category'] == category
    return catalogue.loc[in_category].to_dict('records')
async def handle_category_selection(selection: str, from_number: str):
    """
    Handle a numeric category choice from the category menu.

    ``selection`` is matched (1-based) against the 'available_categories'
    stored in the user's context. A valid choice switches the context to
    'category_products_menu' and sends that category's product list. An
    invalid choice or an empty category sends an explanatory message.

    Args:
        selection: The user's reply; surrounding whitespace is ignored.
        from_number: Recipient WhatsApp number; also the context key.
    """
    try:
        user_context = context_manager.get_context(from_number)
        available_categories = user_context.get('available_categories', [])
        choice = selection.strip()
        # isdecimal() only accepts characters int() can parse; isdigit() also
        # accepts e.g. "²", which made int() raise.
        if not (choice.isdecimal() and 1 <= int(choice) <= len(available_categories)):
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose a valid category number.")
            return
        selected_category = available_categories[int(choice) - 1]
        products = get_products_by_category(selected_category)
        if not products:
            send_whatsjet_message(from_number, f"❌ No products found in {selected_category} category.")
            return
        # Update context with category products
        context_manager.update_context(
            from_number,
            current_category=selected_category,
            current_state='category_products_menu',
            current_menu='category_products_menu',
            current_menu_options=[p.get('Product Name', 'Unknown') for p in products],
            available_products=products
        )
        # Send category products
        parts = [f"📦 *Products in {selected_category}*\n\n"]
        for position, product in enumerate(products, 1):
            parts.append(f"{format_number_with_emoji(position)} {product.get('Product Name', 'Unknown')}\n")
            target_species = product.get('Target Species')
            # Empty CSV cells arrive as NaN, which is truthy; skip them so
            # the user never sees "Target: nan".
            if target_species and pd.notna(target_species):
                parts.append(f" Target: {target_species}\n")
            parts.append("\n")
        parts.append("💬 Select a product number or type 'main' to return to main menu.")
        send_whatsjet_message(from_number, "".join(parts))
    except Exception as e:
        logger.error(f"[Category] Error handling category selection: {e}")
        send_helpful_guidance(from_number, 'category_selection_menu')
def get_menu_validation_message(current_state: str, user_context: dict) -> str:
    """
    Build the "invalid selection" message for the given menu state.

    The text lists the choices that are valid in ``current_state``. Numbered
    menus report their upper bound from ``products_df`` ('all_products_menu')
    or from the 'available_products' / 'available_categories' lists in
    ``user_context``. Unknown states get a generic message.
    """
    also_header = "💬 *You can also:*\n"
    product_hint = "• Type a product name (e.g., 'hydropex', 'respira aid plus')"
    standard_extras = f"{also_header}{product_hint}\n• Type 'main' to return to main menu"

    def out_of_range(kind: str, upper: int) -> str:
        # Shared wording for the three numbered list menus.
        return (
            f"❌ *Invalid {kind} Selection*\n\n"
            f"Please choose a {kind.lower()} number between 1 and {upper}.\n\n"
            f"{standard_extras}"
        )

    if current_state == 'main_menu':
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose from the main menu:\n"
            "1️⃣ Search Veterinary Products\n"
            "2️⃣ Browse Categories\n"
            "3️⃣ Download Catalog\n"
            "4️⃣ Chat with Veterinary AI Assistant\n\n"
            f"{also_header}{product_hint}\n"
            "• Type 'main' to refresh the menu"
        )

    if current_state == 'all_products_menu':
        if products_df is None or products_df.empty:
            return "❌ No products available. Type 'main' to return to main menu."
        return out_of_range("Product", len(products_df))

    if current_state == 'category_products_menu':
        listed_products = user_context.get('available_products', [])
        if not listed_products:
            return "❌ No products available in this category. Type 'main' to return to main menu."
        return out_of_range("Product", len(listed_products))

    if current_state == 'category_selection_menu':
        listed_categories = user_context.get('available_categories', [])
        if not listed_categories:
            return "❌ No categories available. Type 'main' to return to main menu."
        return out_of_range("Category", len(listed_categories))

    if current_state == 'product_inquiry':
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose an option:\n"
            "1️⃣ Talk to Veterinary Consultant\n"
            "2️⃣ Inquire About Availability\n"
            "3️⃣ Back to Main Menu\n\n"
            f"{standard_extras}"
        )

    return (
        "❌ *Invalid Selection*\n\n"
        "Please choose a valid option or type 'main' to return to main menu.\n\n"
        f"{also_header}{product_hint}"
    )
def is_valid_menu_selection(selection: str, current_state: str, user_context: dict) -> bool:
    """
    Report whether ``selection`` is acceptable in ``current_state``.

    Delegates to validate_menu_selection and returns only the first element
    of its two-element result; the second element is discarded.
    """
    verdict, _unused = validate_menu_selection(selection, current_state, user_context)
    return verdict
def generate_veterinary_welcome_message(phone_number=None, user_context=None):
    """Build the main-menu welcome text.

    Both parameters are accepted but not consulted: the returned text is the
    same for every call.
    """
    greeting = (
        "🏥 *Welcome to Apex Biotical Veterinary Assistant*\n\n"
        "I'm your intelligent veterinary assistant. How can I help you today?\n\n"
    )
    main_menu = (
        "📋 *Main Menu:*\n"
        "1️⃣ Search Veterinary Products\n"
        "2️⃣ Browse Categories\n"
        "3️⃣ Download Catalog\n"
        "4️⃣ Chat with Veterinary AI Assistant\n\n"
    )
    quick_actions = (
        "💬 *Quick Actions:*\n"
        "• Type a product name (e.g., 'hydropex', 'respira aid plus')\n"
        "• Ask about symptoms (e.g., 'respiratory problems', 'liver support')\n"
        "• Search by category (e.g., 'antibiotics', 'vitamins')\n\n"
    )
    voice_hint = (
        "🎤 *Voice messages are supported!*\n"
        "You can speak product names, menu numbers, or ask questions."
    )
    return greeting + main_menu + quick_actions + voice_hint
async def handle_veterinary_product_followup(selection: str, from_number: str) -> None:
    """Act on the follow-up option chosen after a product has been shown.

    ``'1'`` prompts for consultant contact details and moves the user to the
    ``contact_request`` state, ``'2'`` hands over to
    ``handle_availability_inquiry``, ``'3'`` returns to the main menu, and
    anything else gets an "invalid selection" reply. If no product is stored
    in the user's context the user is asked to search first. Any exception is
    logged and the user is sent back to the main menu.
    """

    def _show_main_menu(ctx) -> None:
        # Send the welcome text, then reset the stored menu state.
        send_whatsjet_message(
            from_number,
            generate_veterinary_welcome_message(from_number, ctx),
        )
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )

    try:
        user_context = context_manager.get_context(from_number)
        current_product = user_context.get('current_product')
        if not current_product:
            send_whatsjet_message(from_number, "❌ No product selected. Please search for a product first.")
            return

        if selection == '3':
            # Back to main menu
            _show_main_menu(user_context)
            return

        if selection == '2':
            # Inquire about product availability
            await handle_availability_inquiry(from_number, user_context)
            return

        if selection != '1':
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose 1, 2, or 3.")
            return

        # Option 1: talk to a veterinary consultant
        product_name = current_product.get('Product Name', 'the selected product')
        consultant_msg = "".join([
            f"📞 Contact Veterinary Consultant\n\n",
            f"Product: {product_name}\n\n",
            "Please provide your details:\n",
            "* Name and location\n",
            "* Specific inquiry\n\n",
            "💬 Example: Dr. Ali - Multan - Need consultation for respiratory problems\n\n",
            "Type main at any time to go to main menu.",
        ])
        send_whatsjet_message(from_number, consultant_msg)
        context_manager.update_context(
            from_number,
            current_state='contact_request',
            current_menu='contact_request',
            current_menu_options=['Provide contact details']
        )
    except Exception as e:
        logger.error(f"Error in product follow-up: {e}")
        # Re-read the context: the failure may have happened before it was loaded.
        _show_main_menu(context_manager.get_context(from_number))
# --- Voice message handling ---
async def handle_voice_message_complete(from_number: str, msg: dict):
    """Transcribe an incoming voice note and route it through the text pipeline.

    The handler locates the media URL in ``msg``, downloads the audio,
    transcribes it, applies ``process_voice_input`` corrections, decides
    whether the note is treated as Urdu or English, translates Urdu text to
    English for processing, and finally either answers a greeting directly or
    forwards the text to ``process_incoming_message``.

    Args:
        from_number: Sender identifier; used for replies, context lookups and
            the temporary audio filename.
        msg: Incoming message payload. The media URL is looked up under
            ``media.link``, ``media.url``, ``url`` and ``audio.url``, in that
            order; the first non-empty value wins.

    Returns:
        None. Every outcome, including failures, is reported to the sender
        via ``send_whatsjet_message``; exceptions are logged, not propagated.
    """
    try:
        logger.info(f"[Voice] Processing voice message from {from_number}")
        logger.info(f"[Voice] Message structure: {msg}")

        # Check if OpenAI is available
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number,
                "🎤 Voice messages require OpenAI API. Please send a text message or type 'main' to see the menu.")
            return

        # Extract media URL from different possible locations. Non-dict
        # 'media'/'audio' values are treated as absent instead of raising.
        media = msg.get('media')
        if not isinstance(media, dict):
            media = {}
        audio = msg.get('audio')
        if not isinstance(audio, dict):
            audio = {}
        url_candidates = (
            ('media.link', media.get('link')),
            ('media.url', media.get('url')),
            ('url', msg.get('url')),
            ('audio.url', audio.get('url')),
        )
        media_url = None
        logger.info("[Voice] Checking media URL locations...")
        for location, candidate in url_candidates:
            if candidate:
                media_url = candidate
                logger.info(f"[Voice] Found media URL in {location}: {media_url}")
                break
        else:
            logger.error("[Voice] No media URL found in message structure")
            logger.error(f"[Voice] Available fields: {list(msg.keys())}")
            if 'media' in msg:
                logger.error(f"[Voice] Media fields: {list(media.keys())}")
        logger.info(f"[Voice] Final extracted media URL: {media_url}")
        if not media_url:
            send_whatsjet_message(from_number, "❌ Could not process voice message. Please try again.")
            return

        # Generate unique filename
        filename = f"voice_{from_number}_{int(time.time())}.ogg"

        # Download voice file
        file_path = await download_voice_file(media_url, filename)
        if not file_path:
            send_whatsjet_message(from_number, "❌ Failed to download voice message. Please try again.")
            return

        # Transcribe with OpenAI. The downloaded file is removed in `finally`
        # so it is not left behind when transcription raises.
        try:
            transcribed_text = await transcribe_voice_with_openai(file_path)
        finally:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone; nothing left to clean up.
                pass
            except OSError as cleanup_error:
                logger.warning(f"[Voice] Could not remove voice file {file_path}: {cleanup_error}")

        # Handle empty or failed transcription
        if not transcribed_text or transcribed_text.strip() == "":
            logger.warning(f"[Voice] Empty transcription for {from_number}")
            send_whatsjet_message(from_number,
                "🎤 *Voice Message Issue*\n\n"
                "I couldn't hear anything in your voice message. This can happen due to:\n"
                "• Very short voice note\n"
                "• Background noise\n"
                "• Microphone too far away\n"
                "• Audio quality issues\n\n"
                "💡 *Tips for better voice notes:*\n"
                "• Speak clearly and slowly\n"
                "• Keep phone close to mouth\n"
                "• Record in quiet environment\n"
                "• Make voice note at least 2-3 seconds\n\n"
                "💬 *You can also:*\n"
                "• Send a text message\n"
                "• Type 'main' to see menu options\n"
                "• Try voice note again")
            return

        # Process transcribed text with full intelligence
        logger.info(f"[Voice] Transcribed: {transcribed_text}")

        # Apply transcription error corrections
        corrected_text = process_voice_input(transcribed_text)
        if corrected_text != transcribed_text:
            logger.info(f"[Voice] Applied corrections: '{transcribed_text}' -> '{corrected_text}'")
            transcribed_text = corrected_text

        # Detect language of transcribed text (falls back to English).
        detected_lang = 'en'
        try:
            detected_lang = detect(transcribed_text)
            logger.info(f"[Voice] Detected language: {detected_lang}")
        except Exception as e:
            logger.warning(f"[Voice] Language detection failed: {e}")

        # Arabic-script characters or a known greeting force Urdu regardless
        # of what the detector reported.
        urdu_arabic_pattern = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]')
        islamic_greetings = ['assalamu', 'assalam', 'salam', 'salaam', 'adaab', 'namaste', 'khuda', 'allah']
        lowered_text = transcribed_text.lower()
        has_urdu_chars = bool(urdu_arabic_pattern.search(transcribed_text))
        has_islamic_greeting = any(greeting in lowered_text for greeting in islamic_greetings)
        if has_urdu_chars or has_islamic_greeting:
            detected_lang = 'ur'
            logger.info("[Voice] Overriding language detection to Urdu due to Arabic/Urdu characters or Islamic greeting")

        # Replies are always English or Urdu: only text resolved to 'ur' is
        # answered in Urdu, every other detected code is answered in English.
        # NOTE(review): codes such as 'hi' or 'ar' are therefore answered in
        # English unless the override above fired - confirm this is intended.
        is_urdu = detected_lang == 'ur'
        reply_language = 'ur' if is_urdu else 'en'

        # For Urdu voice notes, translate to English for processing
        processing_text = transcribed_text
        if is_urdu:
            try:
                logger.info("[Voice] Translating Urdu voice note to English for processing")
                translated_text = GoogleTranslator(source='ur', target='en').translate(transcribed_text)
                if translated_text and translated_text.strip():
                    processing_text = translated_text
                    logger.info(f"[Voice] Translated to English: {translated_text}")
                else:
                    # Keep the original text rather than passing None/'' on.
                    logger.warning("[Voice] Translation returned no text; using original transcription")
            except Exception as e:
                # If translation fails, use original text
                logger.error(f"[Voice] Translation failed: {e}")
                processing_text = transcribed_text

        logger.info(f"[Voice] Processing text: {processing_text}")
        logger.info(f"[Voice] Reply language set to: {reply_language}")

        # Check if this is a greeting in voice note (check both original and translated)
        if is_greeting(transcribed_text) or is_greeting(processing_text):
            logger.info(f"[Voice] Greeting detected in voice note: {transcribed_text}")
            user_context = context_manager.get_context(from_number)
            current_state = user_context.get('current_state', 'main_menu')
            if current_state == 'ai_chat_mode':
                # In AI chat mode a greeting is answered by the AI instead of
                # resetting the user to the main menu.
                logger.info("[Voice] User is in AI chat mode, treating greeting as AI query instead of menu trigger")
                await handle_general_query_with_ai(from_number, processing_text, user_context, reply_language)
                return
            # Only trigger menu mode if not in AI chat mode
            welcome_msg = generate_veterinary_welcome_message(from_number, user_context)
            send_whatsjet_message(from_number, welcome_msg)
            context_manager.update_context(
                from_number,
                current_state='main_menu',
                current_menu='main_menu',
                current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
            )
            return

        # Process the translated text using the same state-based logic as text
        # messages so voice notes follow the same menu and state rules.
        await process_incoming_message(from_number, {
            'body': processing_text,  # Use translated text for processing
            'type': 'text',
            'reply_language': reply_language,
            'original_transcription': transcribed_text  # Keep original for context
        })
    except Exception as e:
        # logger.exception records the traceback together with the message.
        logger.exception(f"[Voice] Error processing voice message: {e}")
        send_whatsjet_message(from_number,
            "❌ Error processing voice message. Please try a text message.")
# --- Product image delivery ---
async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send product image (if available) with product details as caption in a single WhatsApp message.
    If image is not available, send only the product details as text.
    Supports an 'Images' column in the product record (Google Drive or direct links).

    Image sources are tried in this order:
      1. The URL in the product's 'Images' field (Google Drive share links are
         rewritten to direct-download links), after a reachability probe.
      2. The path or URL returned by ``get_product_image_path``.
      3. No image: the details are sent as plain text.

    Args:
        from_number: Recipient of the message.
        product: Product record; 'Product Name' and 'Images' are read here.
        user_context: Passed through to ``generate_veterinary_product_response``.

    Returns:
        None.
    """
    logger.info(f"[Product] send_product_image_with_caption called for: {product.get('Product Name', '')}")
    ensure_images_dir()
    product_name = product.get('Product Name', 'Unknown Product')
    details = generate_veterinary_product_response(product, user_context)

    # Only a string counts as an image URL; None or other non-string cell
    # values are treated as "no image" instead of raising on .strip().
    raw_image_url = product.get('Images')
    image_url = raw_image_url.strip() if isinstance(raw_image_url, str) else ''

    # Force image URL for Respira Aid Plus (use cPanel public URL)
    if product_name.lower().strip() == "respira aid plus":
        image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"

    logger.info(f"[Product] Processing image for product: {product_name}")
    logger.info(f"[Product] Image URL from CSV: {image_url}")
    try:
        # First, check if we have an image URL from CSV
        if image_url:
            # Convert Google Drive link to direct download if needed
            if 'drive.google.com' in image_url:
                logger.info(f"[Product] Converting Google Drive link: {image_url}")
                if '/d/' in image_url:
                    file_id = image_url.split('/d/')[1].split('/')[0]
                elif 'id=' in image_url:
                    file_id = image_url.split('id=')[1].split('&')[0]
                else:
                    file_id = ''
                if file_id:
                    image_url = f"https://drive.google.com/uc?export=download&id={file_id}"
                    logger.info(f"[Product] Converted to direct download URL: {image_url}")

            # Use the public URL directly for WhatsApp API
            media_type = 'image/jpeg'
            filename = f"{product_name.replace(' ', '_')}.jpg"

            # Test the image URL first
            reachable = False
            try:
                logger.info(f"[Product] Testing image URL accessibility: {image_url}")
                headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.9',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
                # stream=True avoids downloading the body; the context manager
                # releases the connection once the status has been read.
                with requests.get(image_url, headers=headers, timeout=10, stream=True, allow_redirects=True) as test_response:
                    status_code = test_response.status_code
                if status_code == 200:
                    reachable = True
                    logger.info("[Product] Image URL is accessible")
                else:
                    logger.warning(f"[Product] Image URL not accessible (status {status_code}): {image_url}")
            except requests.RequestException as e:
                logger.warning(f"[Product] Failed to test image URL {image_url}: {e}")

            # Send using public URL (not local file)
            if reachable:
                logger.info(f"[Product] Attempting to send image from CSV URL for: {product_name}")
                success = send_whatsjet_message(
                    from_number,
                    details,
                    media_type=media_type,
                    media_path=image_url,  # Use public URL directly
                    filename=filename
                )
                if success:
                    logger.info(f"[Product] Successfully sent image from CSV link with caption for product: {product_name}")
                    return
                logger.warning(f"[Product] Failed to send image from CSV link, trying fallback: {product_name}")

        # No placeholder/stock image is ever sent in place of a product
        # picture: a missing image falls back to the uploads directory and
        # then to text only.

        # Fallback: try local uploads directory (or its public URL)
        logger.info(f"[Product] Trying local uploads directory for: {product_name}")
        image_path = get_product_image_path(product_name)
        if image_path:
            media_type = get_product_image_media_type(image_path)
            filename = f"{product_name.replace(' ', '_')}.jpg"
            if image_path.startswith('http'):
                # Public URL: send the image on its own, then the details as
                # a separate text message.
                logger.info(f"[Product] Sending normalized cPanel public URL with caption: {image_path}")
                image_success = send_whatsjet_media_image_only(
                    from_number,
                    image_path,
                    filename=filename
                )
                if image_success:
                    logger.info("[Product] Image sent successfully, now sending product details")
                else:
                    logger.warning("[Product] Failed to send image, sending text only")
                send_whatsjet_message(from_number, details)
                return
            if os.path.exists(image_path):
                # Local file: send image and caption in one message.
                logger.info(f"[Product] Sending local file with caption: {image_path}")
                local_success = send_whatsjet_message(
                    from_number,
                    details,
                    media_type=media_type,
                    media_path=image_path,
                    filename=filename
                )
                if local_success:
                    return
                # Do not leave the user without an answer if the media send failed.
                logger.warning(f"[Product] Failed to send local image, sending text only for: {product_name}")

        # No image available, send text only
        logger.info(f"[Product] No image available, sending text only for: {product_name}")
        send_whatsjet_message(from_number, details)
    except Exception as e:
        logger.error(f"[Product] Error sending product image with caption: {e}")
        logger.info(f"[Product] Falling back to text-only message for: {product_name}")
        send_whatsjet_message(from_number, details)
if __name__ == "__main__":
    # Populate the product data before the server starts.
    load_products_data()
    # Start the FastAPI app; uvicorn is already imported at the top of the file.
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)