Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Apex Biotical Veterinary WhatsApp Assistant - Premium Edition | |
The most effective and accurate veterinary Assistant in the market | |
""" | |
import os | |
import pandas as pd | |
import requests | |
import json | |
from fastapi import FastAPI, Request, Response, Form, HTTPException, File, UploadFile | |
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse | |
import time | |
import re | |
from typing import List, Dict, Any, Optional, Tuple | |
import openai | |
from dotenv import load_dotenv | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
import uvicorn | |
from datetime import datetime, timedelta | |
from rapidfuzz import process, fuzz | |
from deep_translator import GoogleTranslator | |
import numpy as np | |
import logging | |
import base64 | |
import tempfile | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter, A4 | |
from reportlab.lib.units import inch | |
from reportlab.lib import colors | |
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak | |
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle | |
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY | |
import io | |
import pathlib | |
from collections import defaultdict, Counter | |
import hashlib | |
import aiofiles | |
import asyncio | |
from difflib import SequenceMatcher | |
import httpx | |
import langdetect | |
from langdetect import detect | |
import threading | |
import shutil | |
# Configure advanced logging: records go to a UTF-8 log file and to the console
_log_handlers = [
    logging.FileHandler('veterinary_bot.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Initialize FastAPI app
app = FastAPI(title="Apex Biotical Veterinary Assistant", version="2.0.0")

# Ensure static and uploads directories exist before mounting
for _served_dir in ('static', 'uploads'):
    os.makedirs(_served_dir, exist_ok=True)

# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
templates = Jinja2Templates(directory="templates")

# Global variables with enhanced data structures
CSV_FILE = "Veterinary.csv"
products_df = None  # product catalogue; stays None until it is loaded
user_contexts = {}
last_products = {}
conversation_history = defaultdict(list)
product_analytics = defaultdict(int)
session_data = {}

# Environment variables
WHATSJET_API_URL = os.getenv("WHATSJET_API_URL")
WHATSJET_VENDOR_UID = os.getenv("WHATSJET_VENDOR_UID")
WHATSJET_API_TOKEN = os.getenv("WHATSJET_API_TOKEN")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERVER_URL = os.getenv("SERVER_URL", "https://your-huggingface-space-url.hf.space")

# Initialize OpenAI client (only possible when an API key is configured)
if not OPENAI_API_KEY:
    logger.warning("⚠️ OpenAI API key not found - voice transcription will be disabled")
else:
    openai.api_key = OPENAI_API_KEY
    logger.info("✅ OpenAI client initialized successfully")
# Veterinary domain-specific constants

# Search keyword -> catalogue category labels associated with that keyword
VETERINARY_CATEGORIES = {
    'antibiotic': [
        'Antibiotic / Quinolone',
        'Antibiotic / Respiratory Infections',
        'Veterinary Injectable Solution (Antibiotic)',
    ],
    'respiratory': [
        'Respiratory Support',
        'Respiratory / Mucolytic',
        'Respiratory Support and Hygiene Enhancer',
    ],
    'liver': [
        'Liver & Kidney Support',
        'Liver Tonic and Hepatoprotective Supplement',
    ],
    'vitamin': [
        'Multivitamin Supplement',
        'Multivitamin Supplement for veterinary use',
        'Vitamin and Amino Acid Supplement (Injectable Solution)',
    ],
    'supplement': [
        'Nutritional Supplement / Mycotoxins',
        'Immunity Enhancer and Antioxidant Supplement',
    ],
    'mycotoxin': ['Mycotoxin Binder'],
    'heat_stress': ['Heat Stress Support'],
    'anticoccidial': ['Anticoccidial / Sulfonamide'],
    'phytogenic': ['Phytogenic / Antibiotic Alternative'],
}

# Symptom group -> terms that indicate that group in a user query
VETERINARY_SYMPTOMS = {
    'respiratory': [
        'cough', 'breathing', 'respiratory', 'bronchitis',
        'pneumonia', 'crd', 'coryza', 'flu',
    ],
    'liver': ['liver', 'hepatitis', 'jaundice', 'ascites', 'fatty liver'],
    'diarrhea': ['diarrhea', 'diarrhoea', 'loose stool', 'gastroenteritis'],
    'stress': ['stress', 'heat stress', 'transport', 'vaccination'],
    'infection': ['infection', 'bacterial', 'viral', 'fungal', 'septicemia'],
    'deficiency': ['vitamin deficiency', 'mineral deficiency', 'anemia'],
    'mycotoxin': ['mycotoxin', 'mold', 'fungal toxin', 'aflatoxin'],
}

# Species group -> animal names that belong to it
VETERINARY_SPECIES = {
    'poultry': ['chicken', 'broiler', 'layer', 'turkey', 'duck', 'quail', 'poultry'],
    'livestock': ['cattle', 'cow', 'buffalo', 'sheep', 'goat', 'livestock'],
    'pet': ['dog', 'cat', 'pet', 'companion animal'],
}
# Menu Configuration - Define each menu with its valid options
def _static_menu(name, option_descriptions):
    """Build a fixed menu whose valid options are the keys of *option_descriptions*."""
    return {
        'name': name,
        'valid_options': list(option_descriptions),
        'option_descriptions': option_descriptions,
    }


def _dynamic_menu(name):
    """Build a menu whose options are filled in at runtime (categories/products)."""
    return {'name': name, 'valid_options': [], 'option_descriptions': {}}


MENU_CONFIG = {
    'main_menu': _static_menu('Main Menu', {
        '1': 'Search Products',
        '2': 'Browse Categories',
        '3': 'Download Catalog',
        '4': 'Chat with Veterinary AI Assistant',
    }),
    # Populated dynamically based on available categories
    'category_selection_menu': _dynamic_menu('Category Selection Menu'),
    # Populated dynamically based on available products
    'category_products_menu': _dynamic_menu('Category Products Menu'),
    # Populated dynamically based on all products
    'all_products_menu': _dynamic_menu('All Products Menu'),
    # Populated dynamically based on available products
    'intelligent_products_menu': _dynamic_menu('Intelligent Products Menu'),
    'product_inquiry': _static_menu('Product Inquiry Menu', {
        '1': 'Talk to Veterinary Consultant',
        '2': 'Inquire about Product Availability',
        '3': 'Back to Main Menu',
    }),
    'ai_chat': _static_menu('AI Chat Mode', {
        'main': 'Return to Main Menu',
    }),
}
def validate_menu_selection(selection: str, current_state: str, user_context: dict) -> tuple[bool, str]:
    """
    Check whether *selection* is an allowed choice in the menu *current_state*.

    Static menus take their options from MENU_CONFIG. Dynamic menus are
    numbered 1..N, where N is the number of categories/products stored in
    *user_context* (or the number of rows in the loaded product table for
    'all_products_menu').

    Returns (is_valid, error_message); error_message is "" when valid.
    """
    menu = MENU_CONFIG.get(current_state)
    if menu is None:
        return False, f"❌ Unknown menu state: {current_state}"

    def numbered(items):
        # "1".."N" for a collection of N entries
        return [str(position) for position in range(1, len(items) + 1)]

    if current_state == 'category_selection_menu':
        options = numbered(user_context.get('available_categories', []))
    elif current_state in ('category_products_menu', 'intelligent_products_menu'):
        options = numbered(user_context.get('available_products', []))
    elif current_state == 'all_products_menu' and products_df is not None and not products_df.empty:
        options = numbered(products_df)
    else:
        # Static menu, or product table not loaded: use the configured options
        options = menu['valid_options']

    if selection in options:
        return True, ""

    # Build the error message, listing the valid options when there are any
    menu_name = menu['name']
    if not options:
        return False, f"❌ Invalid selection for {menu_name}. No options available."
    return False, f"❌ Invalid selection for {menu_name}. Valid options: {', '.join(options)}"
def get_menu_info(current_state: str, user_context: dict) -> dict:
    """
    Describe the menu *current_state*: its name, valid options and labels.

    Args:
        current_state: Key into MENU_CONFIG.
        user_context: Per-user context; 'available_categories' and
            'available_products' feed the dynamic menus.

    Returns:
        A new dict with 'name', 'valid_options' (list of str) and
        'option_descriptions' (option -> label). Unknown states yield an
        "Unknown Menu" placeholder with no options. The returned containers
        are copies, so callers may mutate them without touching MENU_CONFIG.
    """
    if current_state not in MENU_CONFIG:
        return {"name": "Unknown Menu", "valid_options": [], "option_descriptions": {}}

    configured = MENU_CONFIG[current_state]
    # A plain .copy() would still share the inner list/dict with the global
    # configuration; copy them too so static menus cannot be corrupted.
    menu_config = dict(configured)
    menu_config['valid_options'] = list(configured['valid_options'])
    menu_config['option_descriptions'] = dict(configured['option_descriptions'])

    def product_labels(products):
        # Label each product by name, falling back to its 1-based position
        return [
            product.get('Product Name', f'Product {position}')
            for position, product in enumerate(products, 1)
        ]

    # For dynamic menus, derive the option labels from context / product data
    if current_state == 'category_selection_menu':
        labels = list(user_context.get('available_categories') or [])
    elif current_state in ('category_products_menu', 'intelligent_products_menu'):
        labels = product_labels(user_context.get('available_products') or [])
    elif current_state == 'all_products_menu' and products_df is not None and not products_df.empty:
        labels = product_labels(products_df.to_dict('records'))
    else:
        # Static menu (or product table not loaded): configured values apply
        return menu_config

    menu_config['valid_options'] = [str(position) for position in range(1, len(labels) + 1)]
    menu_config['option_descriptions'] = {
        str(position): label for position, label in enumerate(labels, 1)
    }
    return menu_config
# Voice processing functions | |
async def download_voice_file(media_url: str, filename: str) -> Optional[str]:
    """
    Download a voice file from WhatsApp into the local 'temp_voice' directory.

    Args:
        media_url: URL of the media to fetch.
        filename: Name to store the file under. Only its final path component
            is used, so a name such as '../../x' cannot escape 'temp_voice'.

    Returns:
        Path of the saved file, or None when the name is unusable or the
        download/write fails (the error is logged, not raised).
    """
    # The filename comes from the incoming message; never let it carry
    # directory components into os.path.join.
    safe_name = os.path.basename(filename or "")
    if not safe_name or safe_name in ('.', '..'):
        logger.error(f"Error downloading voice file: invalid filename {filename!r}")
        return None

    try:
        # Create temp_voice directory if it doesn't exist
        os.makedirs('temp_voice', exist_ok=True)

        # Download the file
        async with httpx.AsyncClient() as client:
            response = await client.get(media_url)
            response.raise_for_status()

        file_path = os.path.join('temp_voice', safe_name)
        with open(file_path, 'wb') as f:
            f.write(response.content)

        logger.info(f"Voice file downloaded: {file_path}")
        return file_path
    except Exception as e:
        # Best-effort by design: callers treat None as "no voice file"
        logger.error(f"Error downloading voice file: {e}")
        return None
# Prompts passed to Whisper's `prompt` parameter, one per transcription attempt.
# NOTE(review): OpenAI's speech-to-text guide states that Whisper only considers
# the final 224 tokens of the prompt and uses it as style/vocabulary context,
# not as instructions - these prompts look longer than that; confirm and trim.
_WHISPER_PROMPT_PRIMARY = """
You are transcribing voice messages for Apex Biotical Veterinary WhatsApp Assistant. This is a professional veterinary products chatbot.
CRITICAL: TRANSCRIBE ONLY ENGLISH OR URDU SPEECH - NOTHING ELSE
IMPORTANT RULES:
1. ONLY transcribe English or Urdu speech
2. If you hear unclear audio, transcribe as English
3. If you hear mixed languages, transcribe as English
4. Never transcribe gibberish or random characters
5. If audio is unclear, transcribe as "unclear audio"
6. Keep transcriptions simple and clean
PRODUCT NAMES (exact spelling required):
- Hydropex, Respira Aid Plus, Heposel, Bromacid, Hexatox
- APMA Fort, Para C.E, Tribiotic, PHYTO-SAL, Mycopex Super
- Eflin KT-20, Salcozine ST-30, Oftilex UA-10, Biscomin 10
- Apvita Plus, B-G Aspro-C, EC-Immune, Liverpex, Symodex
- Respira Aid, Adek Gold, Immuno DX
MENU COMMANDS:
- Numbers: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
- Navigation: main, menu, back, home, start
- Options: option, number, choice, select
GREETINGS:
- English: hi, hello, hey, good morning, good afternoon, good evening
- Urdu: salam, assalamu alaikum, adaab, namaste, khuda hafiz
TRANSCRIPTION RULES:
1. Transcribe exactly what you hear in English or Urdu
2. Convert numbers to digits (one->1, two->2, etc.)
3. Preserve product names exactly
4. If unclear, transcribe as "unclear audio"
5. Keep it simple and clean
6. No random characters or mixed languages
EXAMPLES:
- "hydropex" -> "hydropex"
- "respira aid plus" -> "respira aid plus"
- "option one" -> "1"
- "main menu" -> "main"
- "salam" -> "salam"
- "search products" -> "search products"
- Unclear audio -> "unclear audio"
"""

_WHISPER_PROMPT_URDU = """
You are transcribing Urdu voice messages for Apex Biotical Veterinary WhatsApp Assistant.
PRODUCT NAMES (Urdu/English):
- ہائیڈروپیکس (Hydropex)
- ریسپیرا ایڈ پلس (Respira Aid Plus)
- ہیپوسیل (Heposel)
- بروماسڈ (Bromacid)
- ہیکساٹوکس (Hexatox)
- اے پی ایم اے فورٹ (APMA Fort)
- پیرا سی ای (Para C.E)
- ٹرائی بیوٹک (Tribiotic)
- فائٹو سال (PHYTO-SAL)
- مائیکوپیکس سپر (Mycopex Super)
URDU NUMBERS:
- ایک (1), دو (2), تین (3), چار (4), پانچ (5)
- چھ (6), سات (7), آٹھ (8), نو (9), دس (10)
- گیارہ (11), بارہ (12), تیرہ (13), چودہ (14), پندرہ (15)
- سولہ (16), سترہ (17), اٹھارہ (18), انیس (19), بیس (20)
- اکیس (21), بائیس (22), تئیس (23)
URDU GREETINGS:
- سلام (salam), السلام علیکم (assalamu alaikum)
- آداب (adaab), نمستے (namaste), خدا حافظ (khuda hafiz)
URDU MENU COMMANDS:
- مین مینو (main menu), آپشن (option), نمبر (number)
- تلاش (search), براؤز (browse), ڈاؤن لوڈ (download)
- کیٹلاگ (catalog), رابطہ (contact), دستیابی (availability)
TRANSCRIPTION RULES:
1. Transcribe Urdu words in Urdu script
2. Convert Urdu numbers to digits
3. Handle mixed Urdu-English speech
4. Preserve product names exactly
5. Convert menu selections to numbers
"""

_WHISPER_PROMPT_MIXED = """
You are transcribing voice messages for a veterinary products WhatsApp assistant. The user may speak in English, Urdu, or a mix of both languages.
PRODUCT NAMES (exact spelling required):
Hydropex, Respira Aid Plus, Heposel, Bromacid, Hexatox, APMA Fort, Para C.E, Tribiotic, PHYTO-SAL, Mycopex Super, Eflin KT-20, Salcozine ST-30, Oftilex UA-10, Biscomin 10, Apvita Plus, B-G Aspro-C, EC-Immune, Liverpex, Symodex, Respira Aid, Adek Gold, Immuno DX
NUMBERS (convert to digits):
English: one->1, two->2, three->3, etc.
Urdu: aik->1, ek->1, do->2, teen->3, etc.
MENU COMMANDS:
main, menu, back, home, start, option, number, search, browse, download, catalog, contact, availability
GREETINGS:
hi, hello, salam, assalamu alaikum, adaab, namaste
TRANSCRIPTION RULES:
1. Transcribe exactly what you hear
2. Convert numbers to digits
3. Preserve product names exactly
4. Handle both languages
5. Convert menu selections to numbers
"""


def _whisper_transcribe(file_path: str, prompt: str, language: Optional[str] = None) -> str:
    """Run one blocking Whisper request on *file_path* and return the stripped text."""
    params = {"model": "whisper-1", "prompt": prompt}
    if language is not None:
        # Omitting the language lets the model detect it (used by the mixed attempt)
        params["language"] = language
    with open(file_path, 'rb') as audio_file:
        transcript = openai.Audio.transcribe(file=audio_file, **params)
    return transcript.text.strip()


def _is_blank_transcript(text: Optional[str]) -> bool:
    """Return True when a transcript is empty or shorter than two characters."""
    return not text or len(text.strip()) < 2


async def transcribe_voice_with_openai(file_path: str) -> Optional[str]:
    """
    Transcribe a voice file using OpenAI Whisper with veterinary domain prompts.

    Up to three attempts are made, each only if the previous one produced an
    empty/one-character result: English, then Urdu, then auto-detected
    language with a mixed-language prompt.

    Args:
        file_path: Path of the downloaded audio file.

    Returns:
        The transcript; the literal string "unclear audio" when the result is
        empty, contains no Latin/Arabic-script letters, or is more than 30%
        punctuation; None when the file is missing/empty or any error occurs
        (errors are logged, not raised).
    """
    try:
        # Check if file exists and has content
        if not os.path.exists(file_path):
            logger.error(f"[Transcribe] File not found: {file_path}")
            return None

        file_size = os.path.getsize(file_path)
        if file_size == 0:
            logger.error(f"[Transcribe] Empty file: {file_path}")
            return None

        logger.info(f"[Transcribe] Transcribing file: {file_path} (size: {file_size} bytes)")

        # The OpenAI call is synchronous; run it in a worker thread so the
        # event loop keeps serving other requests while Whisper works.
        # First attempt: English with the comprehensive prompt
        transcribed_text = await asyncio.to_thread(
            _whisper_transcribe, file_path, _WHISPER_PROMPT_PRIMARY, "en"
        )
        logger.info(f"[Transcribe] Voice transcribed (English): '{transcribed_text}'")

        # Second attempt: force Urdu with the Urdu-specific prompt
        if _is_blank_transcript(transcribed_text):
            logger.warning("[Transcribe] First attempt failed, trying with Urdu-specific prompt")
            transcribed_text = await asyncio.to_thread(
                _whisper_transcribe, file_path, _WHISPER_PROMPT_URDU, "ur"
            )
            logger.info(f"[Transcribe] Second attempt transcribed (Urdu): '{transcribed_text}'")

            # Third attempt: no forced language, mixed-language prompt
            if _is_blank_transcript(transcribed_text):
                logger.warning("[Transcribe] Second attempt failed, trying with mixed language prompt")
                transcribed_text = await asyncio.to_thread(
                    _whisper_transcribe, file_path, _WHISPER_PROMPT_MIXED
                )
                logger.info(f"[Transcribe] Third attempt (mixed) transcribed: '{transcribed_text}'")

        # Final check for empty transcription or unclear audio
        if _is_blank_transcript(transcribed_text):
            logger.warning(f"[Transcribe] Very short or empty transcription: '{transcribed_text}'")
            return "unclear audio"

        # Longer text without a single Latin or Arabic-script letter is gibberish
        if len(transcribed_text) > 10 and not re.search(r'[a-zA-Z\u0600-\u06FF]', transcribed_text):
            logger.warning(f"[Transcribe] Gibberish detected: '{transcribed_text}'")
            return "unclear audio"

        # Check for too many special characters (text is non-empty here)
        special_char_ratio = len(re.findall(r'[^\w\s]', transcribed_text)) / len(transcribed_text)
        if special_char_ratio > 0.3:
            logger.warning(f"[Transcribe] Too many special characters: '{transcribed_text}'")
            return "unclear audio"

        return transcribed_text
    except Exception as e:
        # Best-effort by design: callers treat None as "could not transcribe"
        logger.error(f"[Transcribe] Error transcribing voice: {e}")
        logger.error(f"[Transcribe] File path: {file_path}")
        return None
# Mis-heard "option"/"number" keywords (fixed wherever they occur as whole words)
_VOICE_KEYWORD_FIXES = {
    'opium': 'option',
    'numara': 'number',
    'numbara': 'number',
    'numbra': 'number',
}

# Navigation commands: mis-hearings and synonyms all map to 'main'
_VOICE_NAVIGATION_FIXES = {
    'man': 'main',
    'men': 'main',
    'mean': 'main',
    'mein': 'main',
    'maine': 'main',
    'menu': 'main',
    'home': 'main',
    'back': 'main',
    'return': 'main',
}

# Veterinary product name corrections (identity entries mark a name as recognised)
_VOICE_PRODUCT_FIXES = {
    'hydro pex': 'hydropex',
    'respira aid': 'respira aid plus',
    'respira aid plus': 'respira aid plus',
    'hepo sel': 'heposel',
    'brom acid': 'bromacid',
    'hexa tox': 'hexatox',
    'apma fort': 'apma fort',
    'para c': 'para c.e',
    'para ce': 'para c.e',
    'tribiotic': 'tribiotic',
    'phyto sal': 'phyto-sal',
    'mycopex': 'mycopex super',
    'mycopex super': 'mycopex super',
    'eflin': 'eflin kt-20',
    'salcozine': 'salcozine st-30',
    'oftilex': 'oftilex ua-10',
    'biscomin': 'biscomin 10',
    'apvita': 'apvita plus',
    'bg aspro': 'b-g aspro-c',
    'ec immune': 'ec-immune',
    'liverpex': 'liverpex',
    'symodex': 'symodex',
    'adek': 'adek gold',
    'immuno': 'immuno dx',
}

# Number fixes - only applied when they appear as standalone numbers
_VOICE_NUMBER_FIXES = {
    'aik': '1',
    'ek': '1',
    'do': '2',
    'teen': '3',
    'char': '4',
    'panch': '5',
    # NOTE(review): Urdu "che" normally means 6; 3 is kept from the original
    # table (possibly a mis-heard "three") - confirm the intended value.
    'che': '3',
    'tree': '3',
    'free': '3',
    'for': '4',
    'fiv': '5',
    'sik': '6',
    'sat': '7',
    'ath': '8',
    'nau': '9',
    'das': '10',
}

_VOICE_PHRASE_FIXES = {
    **_VOICE_KEYWORD_FIXES,
    **_VOICE_NAVIGATION_FIXES,
    **_VOICE_PRODUCT_FIXES,
}


def _voice_alternation(phrases):
    """Return a regex alternation of *phrases*, longest first so longer phrases win."""
    ordered = sorted(phrases, key=len, reverse=True)
    return '|'.join(re.escape(phrase) for phrase in ordered)


# Whole-word matchers: (?<!\w)/(?!\w) stop 'men' matching inside 'menu', etc.
_VOICE_PHRASE_RE = re.compile(
    r'(?<!\w)(?:' + _voice_alternation(_VOICE_PHRASE_FIXES) + r')(?!\w)'
)
_VOICE_NUMBER_AFTER_KEYWORD_RE = re.compile(
    r'(?<!\w)(option|number) (' + _voice_alternation(_VOICE_NUMBER_FIXES) + r')(?!\w)'
)


def process_voice_input(text: str) -> str:
    """
    Clean transcribed voice text and correct known transcription errors.

    Whitespace is collapsed and stray spaces before ',' and '.' are removed.
    Keyword, navigation and product fixes are then applied to whole
    words/phrases only (never inside other words), in a single pass so a
    replacement is never re-replaced. Number words become digits only when
    they are the whole message or directly follow "option"/"number". In a
    message containing an Islamic greeting ('assalam...') the words
    'aik'/'ek' are never converted.

    Args:
        text: Raw transcript (may be empty or None).

    Returns:
        "" for empty input. The lower-cased corrected text when at least one
        known term was recognised; otherwise the cleaned text with its
        original letter case.
    """
    if not text:
        return ""

    # Clean the text: collapse whitespace, tidy basic punctuation
    cleaned = re.sub(r'\s+', ' ', text.strip())
    cleaned = cleaned.replace(' ,', ',').replace(' .', '.')
    lowered = cleaned.lower()

    # 'assalam' also covers 'assalamu alaikum'
    is_greeting = 'assalam' in lowered
    recognised = False

    def number_allowed(word):
        # Don't turn 'aik'/'ek' into digits inside greetings
        return not (is_greeting and word in ('aik', 'ek'))

    def fix_phrase(match):
        nonlocal recognised
        recognised = True
        wrong = match.group(0)
        correct = _VOICE_PHRASE_FIXES[wrong]
        if correct == wrong:
            return wrong
        # Already the full name (e.g. "eflin kt-20"): don't append the suffix again
        if correct.startswith(wrong) and lowered.startswith(correct, match.start()):
            return wrong
        logger.info(f"Fixed transcription error: '{wrong}' -> '{correct}' in '{text}'")
        return correct

    def fix_number(match):
        nonlocal recognised
        keyword, word = match.group(1), match.group(2)
        if not number_allowed(word):
            return match.group(0)
        recognised = True
        digit = _VOICE_NUMBER_FIXES[word]
        logger.info(f"Fixed transcription error: '{word}' -> '{digit}' in '{text}'")
        return f"{keyword} {digit}"

    fixed = _VOICE_PHRASE_RE.sub(fix_phrase, lowered)
    fixed = _VOICE_NUMBER_AFTER_KEYWORD_RE.sub(fix_number, fixed)

    # A message that consists solely of a number word is a menu selection
    bare = fixed.strip(' .,!?')
    if bare in _VOICE_NUMBER_FIXES and number_allowed(bare):
        recognised = True
        fixed = _VOICE_NUMBER_FIXES[bare]
        logger.info(f"Fixed transcription error: '{bare}' -> '{fixed}' in '{text}'")

    processed_text = fixed if recognised else cleaned
    logger.info(f"Voice input processed: '{text}' -> '{processed_text}'")
    return processed_text
# Note: Voice messages are now processed exactly like text messages | |
# The transcribed voice text is passed directly to process_incoming_message | |
# This ensures consistent behavior between voice and text inputs | |
# Enhanced product search with veterinary domain expertise | |
# Common veterinary product variations: product key -> related search terms
_PRODUCT_SEARCH_VARIATIONS = {
    'hydropex': ['hydropex', 'hydro pex', 'electrolyte', 'dehydration', 'heat stress'],
    'heposel': ['heposel', 'hepo sel', 'liver tonic', 'hepatoprotective'],
    'bromacid': ['bromacid', 'brom acid', 'respiratory', 'mucolytic'],
    'respira aid': ['respira aid', 'respira aid plus', 'respiratory support'],
    'hexatox': ['hexatox', 'hexa tox', 'liver support', 'kidney support'],
    'apma fort': ['apma fort', 'mycotoxin', 'liver support'],
    'para c': ['para c', 'para c.e', 'heat stress', 'paracetamol'],
    'tribiotic': ['tribiotic', 'antibiotic', 'respiratory infection'],
    'phyto-sal': ['phyto-sal', 'phytogenic', 'vitamin supplement'],
    'mycopex': ['mycopex', 'mycotoxin binder', 'mold'],
    'oftilex': ['oftilex', 'ofloxacin', 'antibiotic'],
    'biscomin': ['biscomin', 'oxytetracycline', 'injectable'],
    'apvita': ['apvita', 'vitamin b', 'amino acid'],
    'bg aspro': ['bg aspro', 'aspirin', 'vitamin c'],
    'ec-immune': ['ec-immune', 'immune', 'immunity'],
    'liverpex': ['liverpex', 'liver', 'metabolic'],
    'symodex': ['symodex', 'multivitamin', 'vitamin'],
    'adek gold': ['adek gold', 'vitamin', 'multivitamin'],
    'immuno dx': ['immuno dx', 'immune', 'antioxidant'],
}

# Searched columns with their relevance weight
_PRODUCT_SEARCH_FIELDS = (
    ('Product Name', 1.0),
    ('Category', 0.8),
    ('Indications', 0.9),
    ('Target Species', 0.7),
    ('Type', 0.6),
    ('Composition', 0.5),
)


def get_veterinary_product_matches(query: str) -> List[Dict[str, Any]]:
    """
    Advanced veterinary product matching with domain-specific intelligence.

    The query is normalised, expanded with related symptom / species /
    category / product terms, and compared against several product columns.
    A substring hit scores 100 x the column weight; otherwise the user's own
    query is fuzzy-matched (rapidfuzz partial_ratio x weight). Products whose
    best score exceeds 70 are returned.

    Args:
        query: Free-text search string.

    Returns:
        Product rows as dicts, best score first, de-duplicated by
        'Product Name', each extended with '_score', '_match_type'
        ("exact"/"fuzzy") and '_match_details'. Empty list for an empty or
        punctuation-only query, a 1-2 digit menu selection, or when no
        product data is available.
    """
    if not query:
        return []

    if products_df is None:
        load_products_data()
    # Loading can fail and leave the table unset; don't crash on None below
    if products_df is None or products_df.empty:
        logger.warning("[Veterinary Search] Product data unavailable - returning no matches")
        return []

    normalized_query = normalize(query).lower().strip()
    logger.info(f"[Veterinary Search] Searching for: '{normalized_query}'")

    # A punctuation-only query normalises to '' which is a substring of every
    # field and would "match" the whole catalogue.
    if not normalized_query:
        return []

    # Skip very short queries that are likely menu selections
    if len(normalized_query) <= 2 and normalized_query.isdigit():
        logger.info(f"[Veterinary Search] Skipping menu selection: '{normalized_query}'")
        return []

    # Veterinary-specific query expansion
    expanded = [normalized_query]

    # Expand by symptoms
    for symptoms in VETERINARY_SYMPTOMS.values():
        if any(symptom in normalized_query for symptom in symptoms):
            expanded.extend(symptoms)

    # Expand by species
    for species in VETERINARY_SPECIES.values():
        if any(animal in normalized_query for animal in species):
            expanded.extend(species)

    # Expand by category; keys use '_' (e.g. 'heat_stress') but users type spaces
    for category_key, categories in VETERINARY_CATEGORIES.items():
        if category_key in normalized_query or category_key.replace('_', ' ') in normalized_query:
            expanded.extend(categories)

    # Add product variations. normalize() strips punctuation from the query, so
    # hyphenated keys are also tried in their normalised and spaced forms.
    for key, variations in _PRODUCT_SEARCH_VARIATIONS.items():
        key_forms = {key, normalize(key), key.replace('-', ' ')}
        if any(form in normalized_query for form in key_forms):
            expanded.extend(variations)

    # Drop repeated terms while keeping first-seen order
    expanded_queries = list(dict.fromkeys(expanded))

    # Only fuzzy match longer queries
    fuzzy_enabled = len(normalized_query) > 3

    scored_matches = []
    for _, row in products_df.iterrows():
        best_score = 0
        best_match_type = ""
        match_details = {}

        for field_name, weight in _PRODUCT_SEARCH_FIELDS:
            field_value = row.get(field_name, '')
            if pd.isna(field_value) or not field_value:
                continue
            field_str = str(field_value).lower()

            # Exact (substring) matches have the highest priority
            exact_hit = next(
                (term for term in expanded_queries if term in field_str or field_str in term),
                None,
            )
            if exact_hit is not None:
                score = 100 * weight
                if score > best_score:
                    best_score = score
                    best_match_type = "exact"
                    match_details = {"field": field_name, "query": exact_hit}

            # Fuzzy matching scores the user's own query, once per field
            if fuzzy_enabled:
                score = fuzz.partial_ratio(normalized_query, field_str) * weight
                if score > best_score and score > 70:
                    best_score = score
                    best_match_type = "fuzzy"
                    match_details = {"field": field_name, "query": normalized_query}

        if best_score > 70:
            product_dict = row.to_dict()
            product_dict['_score'] = best_score
            product_dict['_match_type'] = best_match_type
            product_dict['_match_details'] = match_details
            scored_matches.append(product_dict)

    scored_matches.sort(key=lambda match: match['_score'], reverse=True)

    # Remove duplicates based on product name (best-scored entry wins)
    seen_names = set()
    unique_matches = []
    for match in scored_matches:
        name = match.get('Product Name')
        if name not in seen_names:
            seen_names.add(name)
            unique_matches.append(match)
    return unique_matches
def normalize(text: str) -> str:
    """
    Normalize text for search.

    Lower-cases the text, removes every character that is neither a word
    character nor whitespace, collapses whitespace runs to single spaces and
    trims both ends.

    Args:
        text: Raw text; falsy values (None, "") yield "".

    Returns:
        The normalized string, e.g. "  Hydro-Pex,  10%  " -> "hydropex 10".
    """
    if not text:
        return ""
    # Remove special characters but keep spaces
    without_punctuation = re.sub(r'[^\w\s]', '', str(text).lower())
    # split()/join collapses whitespace AND trims after punctuation removal,
    # so "hello !" no longer keeps a trailing space.
    return ' '.join(without_punctuation.split())
# Enhanced context management with veterinary domain awareness | |
class VeterinaryContextManager:
    """In-memory store of per-user state, chat history and product view counts."""

    def __init__(self):
        self.user_contexts = {}
        self.conversation_history = defaultdict(list)
        self.product_analytics = defaultdict(int)
        self.session_data = {}

    def get_context(self, phone_number: str) -> Dict[str, Any]:
        """Return the context dict for *phone_number*, creating a fresh one on first use."""
        try:
            return self.user_contexts[phone_number]
        except KeyError:
            pass

        fresh_context = {
            "current_state": "main_menu",
            "current_menu": "main_menu",
            "current_menu_options": ["Search Veterinary Products", "Browse Categories", "Download Catalog"],
            "current_product": None,
            "current_category": None,
            "search_history": [],
            "product_interests": [],
            "species_preference": None,
            "symptom_context": None,
            "last_interaction": datetime.now(),
            "session_start": datetime.now(),
            "interaction_count": 0,
            "last_message": "",
            "available_categories": [],
            "available_products": [],
        }
        self.user_contexts[phone_number] = fresh_context
        return fresh_context

    def update_context(self, phone_number: str, **kwargs):
        """Merge *kwargs* into the user's context and record the interaction."""
        context = self.get_context(phone_number)
        context.update(kwargs)
        context["last_interaction"] = datetime.now()
        context["interaction_count"] += 1

        # Track product interests for recommendations
        viewed_product = kwargs.get("current_product")
        if not viewed_product:
            return
        product_name = viewed_product.get("Product Name", "")
        if product_name:
            context["product_interests"].append(product_name)
            self.product_analytics[product_name] += 1

    def add_to_history(self, phone_number: str, message: str, response: str):
        """Append one user/bot exchange, keeping only the 20 most recent per user."""
        history = self.conversation_history[phone_number]
        history.append({
            "timestamp": datetime.now(),
            "user_message": message,
            "bot_response": response
        })
        # Keep only last 20 interactions
        if len(history) > 20:
            self.conversation_history[phone_number] = history[-20:]

    def get_recommendations(self, phone_number: str) -> List[Dict[str, Any]]:
        """Suggest up to 5 products from the categories of the user's last 3 viewed products."""
        interests = self.get_context(phone_number)["product_interests"]

        candidates = []
        for interest in interests[-3:]:  # Last 3 products
            matches = get_veterinary_product_matches(interest)
            if not matches:
                continue
            # Find related products in the same category as the best match
            category = matches[0].get("Category", "")
            if not category:
                continue
            candidates.extend(
                product
                for product in get_products_by_category(category)[:3]
                if product.get("Product Name") != interest
            )

        # Remove duplicates (first occurrence wins) and limit
        chosen = {}
        for candidate in candidates:
            name = candidate.get("Product Name", "")
            if name and name not in chosen:
                chosen[name] = candidate
        return list(chosen.values())[:5]


# Initialize context manager
context_manager = VeterinaryContextManager()
# Enhanced product response with veterinary domain expertise | |
def generate_veterinary_product_response(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> str:
    """
    Build the WhatsApp message describing one product.

    The message lists name, type, category and indications, optionally a
    brochure link, and the three follow-up actions.

    Args:
        product_info: Product row as a dict (CSV column name -> value).
        user_context: Per-user context; not used by this function.

    Returns:
        The formatted message text. Missing, NaN or blank fields are shown
        as "Not specified".
    """
    def clean_text(text):
        # None / NaN / blank all read as "Not specified"
        if text is None:
            return "Not specified"
        try:
            if pd.isna(text):
                return "Not specified"
        except (TypeError, ValueError):
            # Non-scalar values: pd.isna returns an array; just stringify below
            pass
        return str(text).strip() or "Not specified"

    # Extract product details
    product_name = clean_text(product_info.get('Product Name', ''))
    product_type = clean_text(product_info.get('Type', ''))
    category = clean_text(product_info.get('Category', ''))
    indications = clean_text(product_info.get('Indications', ''))

    # Check for PDF link: prefer the value already on the product row and only
    # read the CSV from disk when the row does not carry one.
    pdf_link = ""
    try:
        brochure_link = product_info.get('Brochure (PDF)')
        if not (isinstance(brochure_link, str) and brochure_link.strip()):
            csv_data = pd.read_csv(CSV_FILE)
            product_row = csv_data[csv_data['Product Name'] == product_name]
            if not product_row.empty:
                brochure_link = product_row.iloc[0].get('Brochure (PDF)', '')
        # Non-string cells (e.g. NaN) mean "no brochure"
        if isinstance(brochure_link, str) and brochure_link.strip():
            pdf_link = brochure_link.strip()
    except Exception as e:
        # The brochure is optional; never fail the whole response over it
        logger.warning(f"Error checking PDF link for {product_name}: {e}")

    # Build the response
    response = "\n".join([
        f"🧪 *Name:* {product_name}",
        f"📦 *Type:* {product_type}",
        f"🏥 *Category:* {category}",
        f"💊 *Used For:* {indications}",
    ])

    # Add PDF link if available, in the requested format
    if pdf_link:
        response += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{pdf_link}"

    # Add menu options
    response += "\n" + "\n".join([
        "💬 *Available Actions:*",
        "1️⃣ Talk to Veterinary Consultant",
        "2️⃣ Inquire About Availability",
        "3️⃣ Back to Main Menu",
        "💬 Select an option or ask about related products",
    ])
    return response
# Ordered replacements applied before the whitelist filter. Longer mojibake
# sequences come first because the bare 'â€' prefix would otherwise consume them.
_PDF_TEXT_REPLACEMENTS = (
    ('â€”', '-'), ('â€“', '-'), ('â€œ', '"'), ('â€™', "'"), ('â€¦', '...'),
    ('â€"', '-'),
    ('’', "'"), ('‘', "'"),
    ('“', '"'), ('”', '"'),
    ('â€', '"'),
    ('…', '...'),
    ('–', '-'), ('—', '-'),
)


def clean_text_for_pdf(text: str) -> str:
    """Normalise a field value so it can be placed in the generated PDF.

    Typographic quotes, dashes and ellipses (and their common mis-decoded
    forms) are mapped to ASCII, then every character outside a conservative
    whitelist is removed. The whitelist keeps '/', '+' and straight quotes so
    that dosage text such as "5 mg/kg" survives; markup characters such as
    '<', '>' and '&' are still dropped.

    Args:
        text: Raw field value; None and NaN are accepted.

    Returns:
        The cleaned, stripped text, or "N/A" for None/NaN input.
    """
    if text is None or pd.isna(text):
        return "N/A"
    cleaned = str(text)
    for needle, plain in _PDF_TEXT_REPLACEMENTS:
        cleaned = cleaned.replace(needle, plain)
    # Drop anything outside the whitelist (word chars are Unicode-aware).
    cleaned = re.sub(r"[^\w\s\-.,()%:;/+'\"]", '', cleaned)
    return cleaned.strip()
# Enhanced PDF generation with veterinary domain expertise | |
def generate_veterinary_pdf(product: Dict[str, Any]) -> bytes:
    """
    Render a one-product information sheet as PDF bytes.

    The document contains a branded title, the product name, a summary table
    (name, category, target species, type), one section per populated detail
    field, and a fixed veterinary disclaimer. All field values pass through
    clean_text_for_pdf before being placed in the document.
    """
    pdf_buffer = io.BytesIO()
    document = SimpleDocTemplate(pdf_buffer, pagesize=A4)
    base_styles = getSampleStyleSheet()

    # Veterinary-specific styles
    title_style = ParagraphStyle(
        'VeterinaryTitle',
        parent=base_styles['Heading1'],
        fontSize=18,
        spaceAfter=25,
        alignment=TA_CENTER,
        textColor=colors.darkblue,
        fontName='Helvetica-Bold',
    )
    heading_style = ParagraphStyle(
        'VeterinaryHeading',
        parent=base_styles['Heading2'],
        fontSize=14,
        spaceAfter=12,
        textColor=colors.darkgreen,
        fontName='Helvetica-Bold',
    )
    normal_style = ParagraphStyle(
        'VeterinaryNormal',
        parent=base_styles['Normal'],
        fontSize=11,
        spaceAfter=8,
        alignment=TA_JUSTIFY,
        fontName='Helvetica',
    )

    # Header with veterinary branding, followed by the product name
    flowables = [
        Paragraph("🏥 APEX BIOTICAL VETERINARY PRODUCTS", title_style),
        Spacer(1, 20),
    ]
    display_name = clean_text_for_pdf(product.get('Product Name', 'Unknown Product'))
    flowables.append(Paragraph(f"<b>Product: {display_name}</b>", heading_style))
    flowables.append(Spacer(1, 15))

    # Clinical information table: (row label, product key)
    summary_fields = (
        ('Product Name', 'Product Name'),
        ('Category', 'Category'),
        ('Target Species', 'Target Species'),
        ('Product Type', 'Type'),
    )
    table_rows = [['Field', 'Information']]
    for row_label, product_key in summary_fields:
        table_rows.append([row_label, clean_text_for_pdf(product.get(product_key, 'N/A'))])

    summary_table = Table(table_rows, colWidths=[2*inch, 4*inch])
    summary_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.darkblue),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 12),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('BACKGROUND', (0, 1), (-1, -1), colors.lightblue),
        ('GRID', (0, 0), (-1, -1), 1, colors.black),
    ]))
    flowables.append(Paragraph("Clinical Information", heading_style))
    flowables.append(summary_table)
    flowables.append(Spacer(1, 20))

    # Clinical details: (section heading, product key); a section is emitted
    # only when the product has a truthy value for its key.
    detail_sections = (
        ("Clinical Indications", 'Indications'),
        ("Composition", 'Composition'),
        ("Dosage & Administration", 'Dosage & Administration'),
        ("Precautions", 'Precautions'),
        ("Storage", 'Storage'),
    )
    for section_heading, product_key in detail_sections:
        if not product.get(product_key):
            continue
        flowables.append(Paragraph(section_heading, heading_style))
        flowables.append(Paragraph(clean_text_for_pdf(product.get(product_key)), normal_style))
        flowables.append(Spacer(1, 15))

    # Veterinary disclaimer
    flowables.append(Paragraph("Veterinary Disclaimer", heading_style))
    disclaimer_text = (
        "This product should be used under veterinary supervision. "
        "Always consult with a qualified veterinarian before administration. "
        "Follow dosage instructions precisely and monitor animal response. "
        "Store according to manufacturer guidelines and keep out of reach of children."
    )
    flowables.append(Paragraph(disclaimer_text, normal_style))

    # Build PDF and hand back the raw bytes
    document.build(flowables)
    pdf_buffer.seek(0)
    return pdf_buffer.getvalue()
async def send_catalog_pdf(phone_number: str):
    """Send the recipient a text message linking to the full catalog PDF.

    On any exception a short apology message is sent instead.
    """
    try:
        # Google Drive link in direct-download form
        catalog_url = "https://drive.google.com/uc?export=download&id=1mxpkFf3DY-n3QHzZBe_CdksR-gHu2f_0"
        message_parts = [
            "📋 *Apex Biotical Veterinary Products Catalog*\n\n",
            "📄 Here's your complete product catalog with all our veterinary products:\n",
            f"📎 [Apex Biotical Veterinary Products Catalog.pdf]({catalog_url})\n\n",
            "💬 For detailed information about any specific product, type its name or contact our sales team.\n\n",
            "Type main at any time to return to the main menu.",
        ]
        send_whatsjet_message(phone_number, "".join(message_parts))
    except Exception as e:
        logger.error(f"Error sending catalog: {e}")
        apology = "❌ Error sending catalog. Please try again or contact our sales team for assistance."
        send_whatsjet_message(phone_number, apology)
async def send_individual_product_pdf(phone_number: str, product: Dict[str, Any]):
    """Generate a product PDF, send it as WhatsApp media, and follow up with a link.

    The PDF is written to the uploads directory and a download URL is derived
    from the PUBLIC_BASE_URL environment variable. If the media send succeeds,
    a backup link message is sent as well; if it fails, only the link message
    is sent. Any exception results in a short apology message.

    Args:
        phone_number: Recipient number passed through to send_whatsjet_message.
        product: Product record; 'Product Name' is used for the title and filename.
    """
    try:
        # Generate PDF for the product
        pdf_content = generate_veterinary_pdf(product)

        # Create filename; coerce to str so a non-string name cannot break re.sub
        product_name = str(product.get('Product Name') or 'Unknown_Product')
        safe_name = re.sub(r'[^\w\s-]', '', product_name).replace(' ', '_')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_name}_{timestamp}.pdf"

        # Save PDF to uploads directory
        uploads_dir = "../uploads"
        os.makedirs(uploads_dir, exist_ok=True)
        pdf_path = os.path.join(uploads_dir, filename)
        with open(pdf_path, 'wb') as f:
            f.write(pdf_content)

        # Generate download URL
        base_url = os.getenv("PUBLIC_BASE_URL", "http://localhost:8000")
        download_url = f"{base_url}/uploads/{filename}"

        # Send PDF via WhatsApp media
        success = send_whatsjet_message(
            phone_number,
            f"📄 *{product_name} - Product Information*\n\nHere's the detailed product information in PDF format.",
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )

        if success:
            # Also send direct download link as backup.
            # The link line must be an f-string, otherwise the user receives
            # the literal text "{download_url}".
            message = (
                f"📄 *{product_name} - Product Information*\n\n"
                f"📎 [Direct Download Link]({download_url})\n\n"
                "💬 *If the PDF didn't download, use the link above*\n"
                "Type 'main' to return to main menu."
            )
        else:
            # If media send failed, send only the link
            message = (
                f"📄 *{product_name} - Product Information*\n\n"
                f"📎 [Download Product PDF]({download_url})\n\n"
                "💬 *Click the link above to download the product information*\n"
                "Type 'main' to return to main menu."
            )
        send_whatsjet_message(phone_number, message)
    except Exception as e:
        logger.error(f"Error sending individual product PDF: {e}")
        send_whatsjet_message(phone_number,
            "❌ Error generating product PDF. Please try again or contact our sales team for assistance.")
# --- WhatsJet Message Sending --- | |
def split_message_for_whatsapp(message: str, max_length: int = 1000) -> list:
    """Cut *message* into consecutive pieces of at most *max_length* characters.

    Pieces are taken at fixed offsets (no attempt to break on whitespace);
    an empty message yields an empty list.
    """
    chunks = []
    for start in range(0, len(message), max_length):
        chunks.append(message[start:start + max_length])
    return chunks
def send_whatsjet_message(phone_number: str, message: str, media_type: str = None, media_path: str = None, filename: str = None) -> bool:
    """Send a text or media message through the WhatsJet send-message endpoint.

    Three modes, chosen from the arguments:
      * media_type + media_path starting with "http": the URL is posted using
        several alternative payload layouts until one returns HTTP 200.
      * media_type + any other media_path: the local file is read,
        base64-encoded and posted inline.
      * otherwise: the text is split into chunks and each chunk is posted.

    Args:
        phone_number: Recipient number.
        message: Text body or media caption.
        media_type: Media type string for media messages.
        media_path: Public URL or local file path of the media.
        filename: Optional file name to present; defaults to the path basename.

    Returns:
        True when the message (all chunks) was accepted, or when there is no
        text to send; False on missing configuration or any failure.
    """
    if not all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN]):
        logger.error("[WhatsJet] Missing environment variables.")
        return False

    url = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-message?token={WHATSJET_API_TOKEN}"

    def redact(value) -> str:
        # The token travels in the URL query string and httpx status errors
        # include the request URL in their text; keep it out of the logs.
        return str(value).replace(str(WHATSJET_API_TOKEN), "***")

    # Handle media messages (local file or public URL)
    if media_type and media_path:
        # If media_path is a public URL, use media_url and send caption
        if isinstance(media_path, str) and media_path.startswith("http"):
            media_name = filename or os.path.basename(media_path)
            # Try different payload formats for WhatsJet API
            payload_formats = [
                # Format 1: Using caption field
                {
                    "phone_number": phone_number,
                    "caption": message,
                    "media_type": media_type,
                    "media_url": media_path,
                    "media_filename": media_name
                },
                # Format 2: Using message_body instead of caption
                {
                    "phone_number": phone_number,
                    "message_body": message,
                    "media_type": media_type,
                    "media_url": media_path,
                    "media_filename": media_name
                },
                # Format 3: Simplified format without media_filename
                {
                    "phone_number": phone_number,
                    "message_body": message,
                    "media_type": media_type,
                    "media_url": media_path
                },
                # Format 4: Using different field names
                {
                    "phone_number": phone_number,
                    "caption": message,
                    "type": media_type,
                    "url": media_path
                }
            ]
            for i, payload in enumerate(payload_formats, 1):
                try:
                    logger.info(f"[WhatsJet] Trying payload format {i}: {payload}")
                    response = httpx.post(url, json=payload, timeout=15)
                    if response.status_code == 200:
                        logger.info(f"[WhatsJet] Media URL message sent successfully with format {i} to {phone_number}")
                        return True
                    logger.warning(f"[WhatsJet] Format {i} failed with status {response.status_code}: {response.text[:200]}")
                except Exception as e:
                    logger.warning(f"[WhatsJet] Format {i} exception: {redact(e)}")
            # If all formats failed, log the error and return False
            logger.error(f"[WhatsJet] All media URL payload formats failed for {phone_number}")
            return False

        # Local file: read and encode first, then post
        try:
            with open(media_path, 'rb') as f:
                media_b64 = base64.b64encode(f.read()).decode('utf-8')
        except Exception as e:
            logger.error(f"[WhatsJet] Exception preparing media message: {str(e)}")
            return False
        payload = {
            "phone_number": phone_number,
            "message_body": message,
            'media_type': media_type,
            'media_content': media_b64,
            'media_filename': filename or os.path.basename(media_path)
        }
        try:
            response = httpx.post(url, json=payload, timeout=15)
            response.raise_for_status()
        except Exception as e:
            logger.error(f"[WhatsJet] Exception sending media message: {redact(e)}")
            return False
        logger.info(f"[WhatsJet] Media message sent successfully to {phone_number}")
        return True

    # Handle text messages
    if not message or not message.strip():
        return True  # Don't send empty messages

    for chunk in split_message_for_whatsapp(message):
        payload = {"phone_number": phone_number, "message_body": chunk}
        try:
            response = httpx.post(url, json=payload, timeout=15)
            response.raise_for_status()
        except Exception as e:
            # Stop at the first failed chunk; later chunks would arrive out of context.
            logger.error(f"[WhatsJet] Exception sending text chunk: {redact(e)}")
            return False
        logger.info(f"[WhatsJet] Text chunk sent successfully to {phone_number}")

    logger.info(f"[WhatsJet] Successfully sent complete text message to {phone_number}")
    return True
def send_whatsjet_media_image_only(phone_number: str, image_url: str, filename: str = None) -> bool:
    """Post an image (by URL) to WhatsJet's /contact/send-media-message endpoint.

    The token is sent as a Bearer header. Returns True only for an HTTP 200
    response; False on missing configuration, any other status, or an exception.
    """
    configured = all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN])
    if not configured:
        logger.error("[WhatsJet] Missing environment variables for media message.")
        return False

    endpoint = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-media-message"
    request_headers = {
        "Authorization": f"Bearer {WHATSJET_API_TOKEN}",
        "Content-Type": "application/json"
    }
    payload = {
        "phone_number": phone_number,
        "media_type": "image",
        "media_url": image_url
    }
    # file_name is only included when the caller supplied one
    if filename:
        payload["file_name"] = filename

    try:
        logger.info(f"[WhatsJet] Sending image with payload: {payload}")
        response = httpx.post(endpoint, json=payload, headers=request_headers, timeout=30)
        logger.info(f"[WhatsJet] Image response status: {response.status_code}")
        logger.info(f"[WhatsJet] Image response body: {response.text[:500]}...")
        if response.status_code != 200:
            logger.error(f"[WhatsJet] Failed to send image: {response.status_code} - {response.text}")
            return False
        logger.info(f"[WhatsJet] Image sent successfully to {phone_number}")
        return True
    except Exception as e:
        logger.error(f"[WhatsJet] Exception sending image: {e}")
        return False
# --- Health Check Endpoint --- | |
async def health_check():
    """Report service status: timestamp, loaded product count and configuration flags."""
    product_count = 0 if products_df is None else len(products_df)
    whatsjet_ready = all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN])
    report = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "products_loaded": product_count,
        "openai_available": bool(OPENAI_API_KEY),
        "whatsjet_configured": bool(whatsjet_ready),
    }
    return report
async def test_voice():
    """Return a fixed self-check of the voice-detection conditions plus availability flags."""
    voice_types = ['audio', 'voice']
    sample_media = {'type': 'audio'}
    detection = {
        "audio_type": "audio" in voice_types,
        "voice_type": "voice" in voice_types,
        "media_audio": sample_media.get('type') == 'audio',
    }
    return {
        "voice_detection": detection,
        "openai_available": bool(OPENAI_API_KEY),
        "langdetect_available": True,
        "deep_translator_available": True,
    }
async def get_catalog():
    """Serve the complete product catalog PDF.

    Returns:
        A FileResponse for static/Hydropex.pdf.

    Raises:
        HTTPException: 404 when the catalog file does not exist, 500 for any
            other failure while preparing the response.
    """
    catalog_path = "static/Hydropex.pdf"
    try:
        if not os.path.exists(catalog_path):
            raise HTTPException(status_code=404, detail="Catalog PDF not found")
        return FileResponse(
            catalog_path,
            media_type="application/pdf",
            filename="Apex_Biotical_Veterinary_Catalog.pdf"
        )
    except HTTPException:
        # HTTPException derives from Exception; without this clause the 404
        # above would be swallowed below and reported as a 500.
        raise
    except Exception as e:
        logger.error(f"Error serving catalog: {e}")
        raise HTTPException(status_code=500, detail="Error serving catalog")
async def root():
    """Return the HTML landing page listing the service's HTTP endpoints."""
    # The previous markup contained two copies of the heading/paragraph and an
    # unclosed <ul>; a single well-formed copy is returned instead.
    return """
    <h2>Apex Biotical Veterinary WhatsApp Assistant</h2>
    <p>The Assistant is running! Use the API endpoints for WhatsApp integration.</p>
    <ul>
    <li><b>POST /webhook</b> – WhatsApp webhook endpoint</li>
    <li><b>GET /health</b> – Health check</li>
    <li><b>GET /catalog</b> – Download product catalog PDF</li>
    </ul>
    """
# --- Webhook Endpoint for WhatsApp/WhatsJet --- | |
async def webhook(request: Request):
    """Receive a webhook POST and dispatch any contained messages.

    Two payload shapes are recognised:
      * WhatsJet/custom: a dict with 'contact' and 'message' keys.
      * WhatsApp Cloud API: a dict with an 'entry' list of changes/messages.

    Responds 200 after handling (or deliberately ignoring) a recognised
    payload, 400 for an unrecognised payload, and 500 when an exception occurs.
    """
    def has_type(obj, expected, label, kind) -> bool:
        # Log and reject a Cloud API node that does not have the expected container type.
        if isinstance(obj, expected):
            return True
        logger.error(f"[Webhook] {label} is not a {kind}: {type(obj)}")
        return False

    try:
        data = await request.json()
        logger.info(f"[Webhook] Incoming data: {data}")
        payload_is_dict = isinstance(data, dict)

        # WhatsJet/Custom format
        if payload_is_dict and 'contact' in data and 'message' in data:
            raw_number = str(data['contact'].get('phone_number', ''))
            from_number = raw_number.replace('+', '').replace(' ', '')
            msg = data['message']
            msg_is_dict = isinstance(msg, dict)

            # Robust media type extraction: anything but a dict leaves it None
            media = msg.get('media', {}) if msg_is_dict else {}
            media_type = media.get('type') if isinstance(media, dict) else None

            # Voice/audio messages are checked first (they might not have body)
            is_voice = msg_is_dict and (msg.get('type') in ['audio', 'voice'] or media_type == 'audio')
            if is_voice:
                logger.info(f"[Webhook] Processing voice message from {from_number}")
            elif not msg_is_dict or msg.get('body') is None:
                # Status updates and messages without body are ignored
                return Response(status_code=200)
            elif msg.get('status') in ['delivered', 'sent', 'read', 'failed']:
                # Specific status updates are ignored
                return Response(status_code=200)

            await process_incoming_message(from_number, msg)
            return Response(status_code=200)

        # WhatsApp Cloud API format
        if payload_is_dict and 'entry' in data and isinstance(data['entry'], list):
            for entry in data['entry']:
                if not has_type(entry, dict, "entry", "dict"):
                    continue
                changes = entry.get('changes', [])
                if not has_type(changes, list, "changes", "list"):
                    continue
                for change in changes:
                    if not has_type(change, dict, "change", "dict"):
                        continue
                    value = change.get('value', {})
                    if not has_type(value, dict, "value", "dict"):
                        continue
                    messages = value.get('messages', [])
                    if not has_type(messages, list, "messages", "list"):
                        continue
                    for message in messages:
                        if not has_type(message, dict, "message", "dict"):
                            continue
                        from_number = message.get('from', '')
                        # Ignore status updates
                        if message.get('type') == 'status':
                            continue
                        # Convert WhatsApp format to our format
                        attachment = (
                            message.get('audio')
                            or message.get('voice')
                            or message.get('image')
                            or message.get('document')
                        )
                        msg = {
                            'body': message.get('text', {}).get('body', ''),
                            'type': message.get('type', 'text'),
                            'media': attachment,
                        }
                        await process_incoming_message(from_number, msg)
            return Response(status_code=200)

        logger.warning(f"[Webhook] Unrecognized or malformed payload format: {type(data)}")
        return Response(status_code=400)
    except Exception as e:
        logger.error(f"[Webhook] Error: {e}")
        import traceback
        logger.error(f"[Webhook] Traceback: {traceback.format_exc()}")
        return Response(status_code=500)
# Spoken word/phrase -> digit string, English and Urdu (Roman and script).
# Built once at import time; every key appears exactly once. Entries that
# mapped to the non-digit value 'number' were removed because this table is
# used to produce menu digits.
_SPOKEN_NUMBER_MAP = {
    # English cardinals
    'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5',
    'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10',
    'eleven': '11', 'twelve': '12', 'thirteen': '13', 'fourteen': '14', 'fifteen': '15',
    'sixteen': '16', 'seventeen': '17', 'eighteen': '18', 'nineteen': '19', 'twenty': '20',
    'twenty one': '21', 'twenty two': '22', 'twenty three': '23',
    # Common transcription errors
    'won': '1', 'to': '2', 'too': '2', 'tree': '3', 'free': '3', 'for': '4', 'fiv': '5',
    'sik': '6', 'fir': '4',
    # Ordinal numbers
    'first': '1', 'second': '2', 'third': '3', 'fourth': '4', 'fifth': '5',
    'sixth': '6', 'seventh': '7', 'eighth': '8', 'ninth': '9', 'tenth': '10',
    # Menu variations
    'option one': '1', 'option two': '2', 'option three': '3', 'option four': '4', 'option five': '5',
    'number one': '1', 'number two': '2', 'number three': '3', 'number four': '4', 'number five': '5',
    'menu one': '1', 'menu two': '2', 'menu three': '3', 'menu four': '4', 'menu five': '5',
    'choice one': '1', 'choice two': '2', 'choice three': '3', 'choice four': '4', 'choice five': '5',
    # Common transcription errors for menu selections
    'opium one': '1', 'opium two': '2', 'opium three': '3', 'opium four': '4', 'opium five': '5',
    'opium numara one': '1', 'opium numara two': '2', 'opium numara three': '3',
    'opium number one': '1', 'opium number two': '2', 'opium number three': '3',
    'opium number 1': '1', 'opium number 2': '2', 'opium number 3': '3',
    # Direct digits
    '1': '1', '2': '2', '3': '3', '4': '4', '5': '5', '6': '6', '7': '7', '8': '8', '9': '9', '10': '10',
    '11': '11', '12': '12', '13': '13', '14': '14', '15': '15', '16': '16', '17': '17', '18': '18', '19': '19', '20': '20',
    '21': '21', '22': '22', '23': '23',
    # Roman Urdu numbers ('che' resolves to 6, the value that previously won)
    'aik': '1', 'ek': '1', 'do': '2', 'teen': '3', 'char': '4', 'panch': '5',
    'che': '6', 'sat': '7', 'ath': '8', 'nau': '9', 'das': '10',
    'gyara': '11', 'bara': '12', 'tera': '13', 'choda': '14', 'pandra': '15',
    'sola': '16', 'satara': '17', 'athara': '18', 'unnees': '19', 'bees': '20',
    'ikkees': '21', 'baees': '22', 'tees': '23',
    # Urdu script numbers
    'ایک': '1', 'دو': '2', 'تین': '3', 'چار': '4', 'پانچ': '5',
    'چھ': '6', 'سات': '7', 'آٹھ': '8', 'نو': '9', 'دس': '10',
    'گیارہ': '11', 'بارہ': '12', 'تیرہ': '13', 'چودہ': '14', 'پندرہ': '15',
    'سولہ': '16', 'سترہ': '17', 'اٹھارہ': '18', 'انیس': '19', 'بیس': '20',
    'اکیس': '21', 'بائیس': '22', 'تئیس': '23',
    # Menu variations in Urdu
    'نمبر ایک': '1', 'نمبر دو': '2', 'نمبر تین': '3', 'نمبر چار': '4', 'نمبر پانچ': '5',
    'آپشن ایک': '1', 'آپشن دو': '2', 'آپشن تین': '3', 'آپشن چار': '4', 'آپشن پانچ': '5',
    'اختیار ایک': '1', 'اختیار دو': '2', 'اختیار تین': '3', 'اختیار چار': '4', 'اختیار پانچ': '5',
    # Common transcription errors in Urdu
    'numbra one': '1', 'numbra two': '2', 'numbra three': '3',
    'numbra 1': '1', 'numbra 2': '2', 'numbra 3': '3',
}

# Keyword-anchored digit patterns, tried in order; each captures the digits.
_SPOKEN_DIGIT_PATTERNS = (
    r'opium\s+numara?\s*(\d+)',      # "opium numara 1" -> "1"
    r'opium\s+number?\s*(\d+)',      # "opium number 1" -> "1"
    r'opium\s+(\d+)',                # "opium 1" -> "1"
    r'numara?\s*(\d+)',              # "numara 1" -> "1"
    r'number?\s*(\d+)\s*[.!]?',      # "number 1." -> "1"
    r'option\s*(\d+)\s*[.!]?',       # "option 1." -> "1"
    r'choice\s*(\d+)\s*[.!]?',       # "choice 1." -> "1"
    r'menu\s*(\d+)\s*[.!]?',         # "menu 1." -> "1"
    r'(\d+)\s*[.!]?\s*$',            # trailing number: "22." -> "22"
    r'^(\d+)\s*[.!]?\s*',            # leading number
)


def map_spoken_number_to_digit(text: str) -> str:
    """
    Map a spoken/transcribed number (English or Urdu) to its digit string.

    Resolution order: exact table lookup (raw, then punctuation-stripped),
    keyword/position digit patterns, best fuzzy match against table keys
    longer than two characters (score must exceed 80), and finally the first
    run of digits anywhere in the text.

    Args:
        text: Transcribed utterance.

    Returns:
        The digit string when one is found; "" for empty input; otherwise the
        original text unchanged (a warning is logged).
    """
    if not text:
        return ""

    # Clean and normalize the text
    text_lower = text.lower().strip()
    text_clean = re.sub(r'[^\w\s]', '', text_lower)
    text_compact = ' '.join(text_clean.split())

    # Exact matches; the stripped form lets "one." or "option  two" resolve
    # without relying on the fuzzy stage.
    for candidate in (text_lower, text_clean, text_compact):
        if candidate in _SPOKEN_NUMBER_MAP:
            return _SPOKEN_NUMBER_MAP[candidate]

    # Pattern matching for common transcription errors
    for pattern in _SPOKEN_DIGIT_PATTERNS:
        match = re.search(pattern, text_lower)
        if match:
            return match.group(1)

    # Fuzzy matching: pick the best-scoring key rather than the first key
    # that happens to clear the threshold.
    best_word, best_score = None, 80
    for number_word in _SPOKEN_NUMBER_MAP:
        if len(number_word) > 2:  # Only fuzzy match longer words
            score = fuzz.ratio(text_lower, number_word)
            if score > best_score:
                best_word, best_score = number_word, score
    if best_word is not None:
        digit = _SPOKEN_NUMBER_MAP[best_word]
        logger.info(f"Fuzzy matched '{text_lower}' to '{best_word}' -> '{digit}'")
        return digit

    # Try extracting numbers from mixed text
    number_match = re.search(r'(\d+)', text_clean)
    if number_match:
        return number_match.group(1)

    # If no match found, return original text
    logger.warning(f"No number mapping found for: '{text}'")
    return text
# Spoken number words/ordinals -> digit string (English and Urdu).
_VOICE_SPOKEN_NUMBERS = {
    # English spoken numbers
    'one': '1', 'first': '1', '1st': '1',
    'two': '2', 'second': '2', '2nd': '2', 'to': '2', 'too': '2',
    'three': '3', 'third': '3', '3rd': '3', 'tree': '3',
    'four': '4', 'fourth': '4', '4th': '4', 'for': '4',
    'five': '5', 'fifth': '5', '5th': '5',
    'six': '6', 'sixth': '6', '6th': '6',
    'seven': '7', 'seventh': '7', '7th': '7',
    'eight': '8', 'eighth': '8', '8th': '8',
    'nine': '9', 'ninth': '9', '9th': '9',
    'ten': '10', 'tenth': '10', '10th': '10',
    'eleven': '11', 'eleventh': '11', '11th': '11',
    'twelve': '12', 'twelfth': '12', '12th': '12',
    'thirteen': '13', 'thirteenth': '13', '13th': '13',
    'fourteen': '14', 'fourteenth': '14', '14th': '14',
    'fifteen': '15', 'fifteenth': '15', '15th': '15',
    'sixteen': '16', 'sixteenth': '16', '16th': '16',
    'seventeen': '17', 'seventeenth': '17', '17th': '17',
    'eighteen': '18', 'eighteenth': '18', '18th': '18',
    'nineteen': '19', 'nineteenth': '19', '19th': '19',
    'twenty': '20', 'twentieth': '20', '20th': '20',
    'twenty one': '21', 'twenty-first': '21', '21st': '21',
    'twenty two': '22', 'twenty-second': '22', '22nd': '22',
    'twenty three': '23', 'twenty-third': '23', '23rd': '23',
    # Urdu spoken numbers
    'ایک': '1', 'پہلا': '1', 'پہلی': '1',
    'دو': '2', 'دوسرا': '2', 'دوسری': '2',
    'تین': '3', 'تیسرا': '3', 'تیسری': '3',
    'چار': '4', 'چوتھا': '4', 'چوتھی': '4',
    'پانچ': '5', 'پانچواں': '5', 'پانچویں': '5',
    'چھ': '6', 'چھٹا': '6', 'چھٹی': '6',
    'سات': '7', 'ساتواں': '7', 'ساتویں': '7',
    'آٹھ': '8', 'آٹھواں': '8', 'آٹھویں': '8',
    'نو': '9', 'نواں': '9', 'نویں': '9',
    'دس': '10', 'دسواں': '10', 'دسویں': '10',
    'گیارہ': '11', 'گیارہواں': '11', 'گیارہویں': '11',
    'بارہ': '12', 'بارہواں': '12', 'بارہویں': '12',
    'تیرہ': '13', 'تیرہواں': '13', 'تیرہویں': '13',
    'چودہ': '14', 'چودہواں': '14', 'چودہویں': '14',
    'پندرہ': '15', 'پندرہواں': '15', 'پندرہویں': '15',
    'سولہ': '16', 'سولہواں': '16', 'سولہویں': '16',
    'سترہ': '17', 'سترہواں': '17', 'سترہویں': '17',
    'اٹھارہ': '18', 'اٹھارہواں': '18', 'اٹھارہویں': '18',
    'انیس': '19', 'انیسواں': '19', 'انیسویں': '19',
    'بیس': '20', 'بیسواں': '20', 'بیسویں': '20',
    'اکیس': '21', 'اکیسواں': '21', 'اکیسویں': '21',
    'بائیس': '22', 'بائیسواں': '22', 'بائیسویں': '22',
    'تئیس': '23', 'تئیسواں': '23', 'تئیسویں': '23',
}

# Longest phrases first so "twenty one" is tried before "one" and
# "fourteen" before "four" (sorted() is stable, so ties keep table order).
_VOICE_SPOKEN_PHRASES = sorted(_VOICE_SPOKEN_NUMBERS, key=len, reverse=True)

# Greeting/navigation normalisation applied when nothing else matched.
_VOICE_TRANSCRIPTION_FIXES = {
    'bye': 'hi',  # Common transcription error for "hi"
    'hi': 'hi',
    'hello': 'hi',
    'hey': 'hi',
    'main': 'main',
    'menu': 'main',
    'start': 'main',
    'home': 'main',
    'back': 'main',
    'return': 'main',
    'go back': 'main',
    'main menu': 'main',
    'main menu please': 'main',
    'go to main': 'main',
    'back to main': 'main',
}


def _voice_contains_phrase(text: str, phrase: str) -> bool:
    """Return True when *phrase* occurs in *text* as a whole word/phrase."""
    # Look-arounds instead of \b so phrases adjacent to punctuation ("main.")
    # still match while "hi" inside "chicken" does not.
    return re.search(r'(?<!\w)' + re.escape(phrase) + r'(?!\w)', text) is not None


def process_intelligent_voice_command(message_body: str, current_state: str, user_context: dict) -> str:
    """
    Normalise a transcribed voice command into the token the text flow expects.

    Resolution order: navigation words -> 'main'; "number/option/product/
    category N"; trailing, leading, standalone or any digits; spoken number
    words (whole-word match, longest phrase first); greeting/navigation
    fixes (whole-word match). When nothing matches the input is returned
    unchanged.

    Args:
        message_body: Transcribed utterance.
        current_state: Current menu state; only used for logging here.
        user_context: User context dict; not read by this function.

    Returns:
        'main', 'hi', a digit string, or the original message_body.
    """
    if not message_body:
        return message_body

    # Clean and normalize the input
    cleaned_text = message_body.strip().lower()
    logger.info(f"[Voice Command] Processing: '{message_body}' in state: {current_state}")

    # First, check for navigation commands (main, menu, back, etc.)
    navigation_commands = [
        'main', 'menu', 'start', 'home', 'back', 'return', 'go back', 'main menu',
        'مین', 'مینو', 'شروع', 'گھر', 'واپس', 'ریٹرن', 'مین مینو',
        'main menu please', 'go to main', 'back to main'
    ]
    words = cleaned_text.split()
    for cmd in navigation_commands:
        if cleaned_text == cmd:
            logger.info(f"[Voice Command] Exact navigation command detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.startswith(cmd + ' '):
            logger.info(f"[Voice Command] Navigation command at start detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.endswith(' ' + cmd):
            logger.info(f"[Voice Command] Navigation command at end detected: '{message_body}' -> 'main'")
            return 'main'
        # Standalone token somewhere in the middle of the utterance
        if cmd in words:
            logger.info(f"[Voice Command] Navigation command as word detected: '{message_body}' -> 'main'")
            return 'main'

    # Keyword + number ("Number 3", "Option 2.", ...), checked in this order
    keyword_patterns = (
        ('number', 'Number pattern 1'),
        ('option', 'Option pattern'),
        ('product', 'Product pattern'),
        ('category', 'Category pattern'),
    )
    for keyword, label in keyword_patterns:
        keyword_match = re.search(keyword + r'\s*(\d+)\s*[.!]?', cleaned_text)
        if keyword_match:
            number = keyword_match.group(1)
            logger.info(f"[Voice Command] {label} detected: '{message_body}' -> '{number}'")
            return number

    # Number at the end of the sentence
    number_pattern2 = re.search(r'(\d+)\s*[.!]?\s*$', cleaned_text)
    if number_pattern2:
        number = number_pattern2.group(1)
        logger.info(f"[Voice Command] Number pattern 2 detected: '{message_body}' -> '{number}'")
        return number

    # Number at the beginning
    number_pattern3 = re.search(r'^(\d+)\s*[.!]?\s*', cleaned_text)
    if number_pattern3:
        number = number_pattern3.group(1)
        logger.info(f"[Voice Command] Number pattern 3 detected: '{message_body}' -> '{number}'")
        return number

    # Standalone number
    if cleaned_text.isdigit():
        logger.info(f"[Voice Command] Standalone number detected: '{message_body}' -> '{message_body}'")
        return message_body

    # Any number in the text (fallback)
    any_number_pattern = re.search(r'(\d+)', cleaned_text)
    if any_number_pattern:
        number = any_number_pattern.group(1)
        logger.info(f"[Voice Command] Any number pattern detected: '{message_body}' -> '{number}'")
        return number

    # Spoken numbers in English and Urdu: whole-word match, longest first.
    # NOTE(review): homophones 'to'/'too'/'for'/'tree' still map to digits
    # whenever they appear as a word in a longer sentence - confirm intended.
    for spoken in _VOICE_SPOKEN_PHRASES:
        if _voice_contains_phrase(cleaned_text, spoken):
            digit = _VOICE_SPOKEN_NUMBERS[spoken]
            logger.info(f"[Voice Command] Spoken number detected: '{message_body}' -> '{digit}'")
            return digit

    # Greeting / navigation fixes, whole-word match only
    for error, correction in _VOICE_TRANSCRIPTION_FIXES.items():
        if _voice_contains_phrase(cleaned_text, error):
            logger.info(f"[Voice Command] Transcription fix applied: '{message_body}' -> '{correction}'")
            return correction

    # If no pattern matches, return the original message for further processing
    logger.info(f"[Voice Command] No specific pattern matched, returning original: '{message_body}'")
    return message_body
def _reset_to_main_menu(from_number: str) -> None:
    """Send the welcome message and reset the user's context to the main menu."""
    send_whatsjet_message(from_number, generate_veterinary_welcome_message())
    context_manager.update_context(
        from_number,
        current_state='main_menu',
        current_menu='main_menu',
        current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
    )


async def _enter_product_inquiry(from_number: str, product: dict, user_context: dict) -> None:
    """Store ``product`` as the current product, switch to product_inquiry, then send its image/caption."""
    context_manager.update_context(
        from_number,
        current_product=product,
        current_state='product_inquiry',
        current_menu='product_inquiry',
        current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
    )
    # The context snapshot taken before the update is what gets passed along.
    await send_product_image_with_caption(from_number, product, user_context)


async def _select_numbered_product(from_number: str, message_body: str, candidates: list,
                                   current_state: str, user_context: dict) -> None:
    """Open the 1-based ``message_body`` entry of ``candidates``, or send the menu validation message."""
    if message_body.isdigit() and 1 <= int(message_body) <= len(candidates):
        await _enter_product_inquiry(from_number, candidates[int(message_body) - 1], user_context)
    else:
        send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))


async def _handle_main_menu_choice(from_number: str, choice: str) -> None:
    """Run the action behind a validated main-menu option ('1'-'4'); any other value does nothing."""
    if choice == '1':
        # Search Products
        await display_all_products(from_number)
    elif choice == '2':
        # Browse Categories
        categories = get_all_categories()
        if not categories:
            send_whatsjet_message(from_number, "❌ No categories available. Type 'main' to return to main menu.")
            return
        context_manager.update_context(
            from_number,
            current_state='category_selection_menu',
            current_menu='category_selection_menu',
            current_menu_options=categories,
            available_categories=categories
        )
        listing = "".join(
            f"{format_number_with_emoji(i)} {category}\n"
            for i, category in enumerate(categories, 1)
        )
        send_whatsjet_message(
            from_number,
            "📁 *Select a Category:*\n\n"
            + listing
            + "\n💬 Type a category number or 'main' to return to main menu."
        )
    elif choice == '3':
        # Download Catalog
        await send_catalog_pdf(from_number)
    elif choice == '4':
        # AI Chat Mode (replies are set to Urdu in the stored context)
        context_manager.update_context(
            from_number,
            current_state='ai_chat_mode',
            current_menu='ai_chat_mode',
            current_menu_options=['main'],
            reply_language='ur'
        )
        message = (
            "🤖 ویٹرنری اے آئی اسسٹنٹ\n\n"
            "آپ مجھ سے پوچھ سکتے ہیں:\n"
            "• ویٹرنری سوالات\n"
            "• پروڈکٹ کی سفارشات\n"
            "• علاج کے مشورے\n"
            "• عمومی معلومات\n\n"
            "💬 'main' لکھ کر مین مینو پر واپس جائیں۔"
        )
        send_whatsjet_message(from_number, message)


async def _handle_valid_menu_selection(from_number: str, message_body: str,
                                       current_state: str, user_context: dict) -> None:
    """Dispatch an already-validated menu selection according to the menu the user is in."""
    if current_state == 'main_menu':
        await _handle_main_menu_choice(from_number, message_body)
    elif current_state == 'category_selection_menu':
        await handle_category_selection(message_body, from_number)
    elif current_state in ('category_products_menu', 'intelligent_products_menu'):
        # Both menus pick from the product list stored in the user's context.
        await _select_numbered_product(
            from_number, message_body,
            user_context.get('available_products', []),
            current_state, user_context
        )
    elif current_state == 'all_products_menu':
        if products_df is not None and not products_df.empty:
            await _select_numbered_product(
                from_number, message_body,
                products_df.to_dict('records'),
                current_state, user_context
            )
        else:
            send_whatsjet_message(from_number, "❌ No products available. Type 'main' to return to main menu.")
    elif current_state == 'product_inquiry':
        await handle_veterinary_product_followup(message_body, from_number)


async def process_incoming_message(from_number: str, msg: dict):
    """Route one incoming WhatsApp message to the matching handler.

    Checks run in this order, and the first that applies wins:
      1. audio/voice messages are handed to ``handle_voice_message_complete``;
      2. messages without a body are skipped;
      3. greetings reset the conversation to the main menu;
      4. navigation commands ('main', 'menu', ...) reset to the main menu;
      5. contact_request / availability_request / ai_chat_mode states go to
         their dedicated handlers;
      6. valid menu selections for the current menu state are executed;
      7. company and product-question keywords go to the AI handler;
      8. product-name matches open the product or the intelligent inquiry;
      9. anything else gets a correction hint and the default inquiry.

    Args:
        from_number: Sender's WhatsApp number; also the context key.
        msg: Message payload. 'body', 'type', 'reply_language' and 'media'
            are read when it is a dict; any other type is treated as an
            empty text message.

    Any exception is logged and the user is sent back to the main menu.
    """
    try:
        is_dict_msg = isinstance(msg, dict)
        message_body = msg.get('body') if is_dict_msg else None
        message_type = msg.get('type', 'text') if is_dict_msg else 'text'
        # Guarded like the reads above, so a non-dict payload is skipped
        # instead of raising and triggering a spurious welcome message.
        reply_language = msg.get('reply_language', 'en') if is_dict_msg else 'en'

        # 'media' may be a dict, a list or None; only a dict carries a type.
        media = msg.get('media', {}) if is_dict_msg else {}
        media_type = media.get('type') if isinstance(media, dict) else None

        # Voice messages are handled first, before the body is inspected.
        if message_type in ['audio', 'voice'] or media_type == 'audio':
            logger.info(f"[Process] Processing voice message from {from_number}")
            await handle_voice_message_complete(from_number, msg)
            return

        if message_body is None:
            logger.info(f"[Process] Skipping message from {from_number} - no body content")
            return

        message_body = message_body.strip()
        logger.info(f"[Process] Processing {message_type} message from {from_number}: {message_body}")

        user_context = context_manager.get_context(from_number)
        current_state = user_context.get('current_state', 'main_menu')
        # The last message is recorded even when it turns out to be empty.
        context_manager.update_context(from_number, last_message=message_body)
        logger.info(f"[Process] Current state: {current_state}, Message: '{message_body}' from {from_number}")

        if not message_body:
            return

        # PRIORITY 1: greetings always show the welcome message, whatever the state.
        if is_greeting(message_body):
            logger.info(f"[Process] Greeting detected: '{message_body}' -> showing welcome message")
            _reset_to_main_menu(from_number)
            return

        # PRIORITY 2: navigation commands.
        # The voice-command mapper is consulted only outside the main menu and AI chat mode.
        if current_state not in ('main_menu', 'ai_chat_mode'):
            mapped_navigation = process_intelligent_voice_command(message_body, current_state, user_context)
            if mapped_navigation == 'main':
                logger.info(f"[Process] Navigation command detected: '{message_body}' -> 'main'")
                _reset_to_main_menu(from_number)
                return

        # Literal navigation words are honoured in every state.
        if message_body.lower() in ['main', 'menu', 'start', 'home', 'back']:
            _reset_to_main_menu(from_number)
            return

        # PRIORITY 3: states that consume the whole message as free text.
        if current_state == 'contact_request':
            await handle_contact_request_response(from_number, message_body)
            return
        if current_state == 'availability_request':
            await handle_availability_request_response(from_number, message_body)
            return
        if current_state == 'ai_chat_mode':
            await handle_ai_chat_mode(from_number, message_body, reply_language)
            return

        # PRIORITY 4: menu selections. An invalid selection is not rejected
        # here; it falls through so it can still match a query or product name.
        menu_states = [
            'main_menu', 'category_selection_menu', 'category_products_menu',
            'all_products_menu', 'product_inquiry', 'intelligent_products_menu'
        ]
        if current_state in menu_states:
            is_valid, _error_msg = validate_menu_selection(message_body, current_state, user_context)
            if is_valid:
                await _handle_valid_menu_selection(from_number, message_body, current_state, user_context)
                return

        # PRIORITY 5: company/about queries go to the AI handler.
        query_lower = message_body.lower().strip()
        if any(keyword in query_lower for keyword in ['apex', 'company', 'about', 'who', 'what is']):
            await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
            return

        # PRIORITY 6: product-specific questions (mode of action, dosage, ...).
        product_question_keywords = ['mode of action', 'dosage', 'administration', 'composition', 'indications', 'precautions', 'storage', 'how to use', 'side effects']
        if any(keyword in query_lower for keyword in product_question_keywords):
            await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
            return

        # PRIORITY 7: product names, from any menu state.
        logger.info(f"[Process] Checking for product name in message: '{message_body}' from state: {current_state}")
        products = get_veterinary_product_matches(message_body)

        normalized_input = normalize(message_body).lower().strip()
        exact_match = next(
            (
                product for product in products
                if normalize(product.get('Product Name', '')).lower().strip() == normalized_input
            ),
            None
        )
        if exact_match is not None:
            logger.info(f"[Process] Exact product match found: {exact_match.get('Product Name', 'Unknown')}")
            # Only one reply is sent: image+caption if possible, else text.
            await _enter_product_inquiry(from_number, exact_match, user_context)
            return

        if products:
            # No exact name matched above, so this is a category/symptom style
            # search: list every matching product.
            logger.info(f"[Process] Product name detected: '{message_body}' -> Found {len(products)} products")
            logger.info(f"[Process] Category/symptom search with {len(products)} products")
            await handle_intelligent_product_inquiry(from_number, message_body, user_context, reply_language)
            return

        # Nothing matched. The company and product-question keywords were
        # already handled under PRIORITY 5/6, so only this list is left.
        veterinary_keywords = ['weather', 'temperature', 'disease', 'symptoms', 'treatment', 'prevention', 'vaccination', 'nutrition', 'health']
        if any(keyword in query_lower for keyword in veterinary_keywords):
            send_whatsjet_message(from_number, "❌ Please ask the correct question related to Apex Biotical Solutions or type 'main' to go to main menu.")
            return

        send_whatsjet_message(from_number, "❌ Please correct your question or type 'main' to go to main menu.")
        # PRIORITY 8: default - treat as a general query.
        # NOTE(review): this runs right after the correction hint above, so the
        # user may receive two replies - confirm whether a return was intended.
        await handle_intelligent_product_inquiry(from_number, message_body, user_context, reply_language)
    except Exception as e:
        # logger.exception keeps the traceback for this catch-all boundary.
        logger.exception(f"Error in process_incoming_message: {e}")
        # Instead of sending a generic error, return to main menu.
        _reset_to_main_menu(from_number)
async def handle_general_query_with_ai(from_number: str, query: str, user_context: dict, reply_language: str = 'en'):
    """Answer a free-form query with the OpenAI chat API and send the reply over WhatsApp.

    Args:
        from_number: Recipient's WhatsApp number.
        query: The user's question, embedded verbatim in the prompt.
        user_context: Conversation context; 'current_state' and
            'current_product' are read from it.
        reply_language: Accepted for interface compatibility but overridden:
            the reply language is always forced to Urdu.

    The English answer is translated to Urdu, with product, category and
    company names restored to English. If translation fails, the English
    answer is sent instead. The English answer is what goes into the history.
    """
    reply_language = 'ur'
    logger.info("[AI General] Forcing reply_language to Urdu for Option 4.")
    try:
        if not openai.api_key:
            send_whatsjet_message(
                from_number,
                "❌ AI assistance is not available. Please try searching for a specific product or type 'main' for the menu."
            )
            return

        current_state = user_context.get('current_state', 'main_menu')
        current_product = user_context.get('current_product')
        product_label = current_product.get('Product Name', 'None') if current_product else 'None'
        # The whole product table is inlined into the prompt as a list of records.
        catalog_dump = products_df.to_dict('records') if products_df is not None else 'No products loaded'

        prompt = f"""
You are a professional veterinary product assistant for Apex Biotical Solutions, helping users on WhatsApp.
Always answer in a clear, accurate, and helpful manner with proper formatting and emojis.
User Query: "{query}"
Current State: {current_state}
Current Product: {product_label}
IMPORTANT INSTRUCTIONS:
1. If the user asks about "Apex" or "Apex Biotical" - provide a brief, professional overview of Apex Biotical Solutions as a veterinary pharmaceutical company, including their expertise, product range, and commitment to animal health. Keep it concise and welcoming.
2. If the user asks about specific product details (mode of action, dosage, administration, composition, etc.) - search through the veterinary products database and provide detailed, accurate information about the specific product mentioned. If the product exists in the database, provide comprehensive details. If not found, suggest similar products.
3. If the user asks about products (e.g., 'poultry products', 'respiratory medicine'), list ALL relevant products from the database with a short description for each.
4. If the user asks a general veterinary question, provide a concise, expert answer.
5. Always keep responses professional, concise, and user-friendly with proper formatting.
6. Use emojis and bullet points for better readability.
7. If you don't have specific information, say so clearly and suggest alternatives.
CRITICAL NAMING RULES:
- ALWAYS keep company name "Apex Biotical Solutions" in English, even in Urdu responses
- ALWAYS keep product names in English (e.g., "EC-Immune", "Hydropex", "Heposel")
- ALWAYS keep technical terms in English when possible
- Only translate descriptive text, not proper nouns or brand names
Available Products Database: {catalog_dump}
RESPONSE FORMAT:
- Keep responses concise and to the point
- Use emojis sparingly but effectively
- Avoid long titles or headers
- Focus on providing accurate, helpful information
- Preserve English company names and product names
"""
        completion = openai.ChatCompletion.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=300
        )
        ai_response = completion.choices[0].message.content.strip()

        if reply_language != 'ur':
            send_whatsjet_message(from_number, ai_response)
        else:
            try:
                # Collect the names that must stay in English after translation.
                product_names = []
                category_names = []
                if products_df is not None and not products_df.empty:
                    records = products_df.to_dict('records')
                    product_names = [
                        str(row.get('Product Name', ''))
                        for row in records if row.get('Product Name')
                    ]
                    category_names = list({
                        str(row.get('Category', ''))
                        for row in records if row.get('Category')
                    })
                company_names = ['Apex Biotical Solutions', 'Apex Biotical', 'Apex']

                urdu_text = GoogleTranslator(source='auto', target='ur').translate(ai_response)
                urdu_text = restore_english_terms(
                    urdu_text, ai_response, product_names + company_names, category_names
                )
                send_whatsjet_message(from_number, urdu_text)
            except Exception as e:
                # Fall back to the untranslated answer rather than sending nothing.
                logger.error(f"[AI] Translation error: {e}")
                send_whatsjet_message(from_number, ai_response)

        if context_manager:
            context_manager.add_to_history(from_number, query, ai_response)
    except Exception as e:
        logger.error(f"[AI] Error handling general query: {e}")
        send_whatsjet_message(from_number, "❌ AI Assistant encountered an error. Please try again or type 'main' to return to main menu.")
async def handle_contact_request(from_number: str):
    """Ask the user for contact details and move the conversation to 'contact_request'.

    On any error the user is sent the welcome message and returned to the
    main menu instead of receiving an error text.
    """
    try:
        prompt_parts = [
            "📞 *Contact Information*\n\n",
            "Please provide your details:\n",
            "• Name and location\n",
            "• Phone number\n",
            "• Specific inquiry\n\n",
            "💬 *Example:* Dr. Ali - Multan - Need consultation for liver disease\n\n",
            "💬 *Type 'main' to return to the main menu.*",
        ]
        send_whatsjet_message(from_number, "".join(prompt_parts))

        # The next message from this user is treated as the contact details.
        pending_state = {
            'current_state': 'contact_request',
            'current_menu': 'contact_request',
            'current_menu_options': ['Provide contact details'],
        }
        context_manager.update_context(from_number, **pending_state)
    except Exception as e:
        logger.error(f"[Contact] Error handling contact request: {e}")
        # Fall back to the main menu rather than a generic error message.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
async def handle_contact_request_response(from_number: str, response: str):
    """Record a user's contact inquiry, forward it to the admin number and confirm to the user.

    Args:
        from_number: The user's WhatsApp number.
        response: Free-text reply. Its first line is taken as name/location
            and any further lines as the inquiry details.

    The inquiry is appended as one JSON object per line to
    'contacts/contact_inquiries.json'. Afterwards the user's state returns to
    the main menu. On any error the welcome message is sent instead.
    """
    try:
        # Persist the raw inquiry first (JSON-lines append).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/contact_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Number that receives forwarded inquiries (admin).
        receiving_number = "923102288328"

        # First line -> name/location; remaining lines -> details.
        trimmed = response.strip()
        first_line, newline, remainder = trimmed.partition('\n')
        if newline:
            name_location = first_line.strip()
            details = remainder.strip()
        else:
            # A single line is assumed to be all name/location.
            name_location = trimmed
            details = "No specific details provided"

        inquiry_message = (
            f"📞 *Follow Up Inquiry*\n\n"
            f"Name and Location: {name_location}\n"
            f"Phone: {from_number}\n"
            f"Details: {details}"
        )
        send_whatsjet_message(receiving_number, inquiry_message)

        # Confirm receipt to the user.
        confirmation = (
            "✅ Thank you! Your inquiry has been received. Our team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        send_whatsjet_message(from_number, confirmation)

        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
    except Exception as e:
        logger.error(f"[Contact] Error handling contact response: {e}")
        # Fall back to the main menu rather than a generic error message.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        fallback_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=fallback_options
        )
async def handle_availability_inquiry(from_number: str, user_context: dict):
    """Start an availability inquiry for the product currently stored in the user's context.

    Args:
        from_number: The user's WhatsApp number.
        user_context: Conversation context; 'current_product' is read from it.

    With no current product the user is told to search for one first.
    Otherwise the user is prompted for name, quantity and delivery
    preferences and the state becomes 'availability_request'. On any error
    the welcome message is sent and the state returns to the main menu.
    """
    try:
        selected = user_context.get('current_product')
        if not selected:
            send_whatsjet_message(
                from_number,
                "❌ No product selected. Please search for a product first."
            )
            return

        product_name = selected.get('Product Name', 'N/A')
        prompt_text = (
            f"📦 *Availability Inquiry*\n\n"
            f"Product: {product_name}\n\n"
            "Please provide:\n"
            "• Your name and location\n"
            "• Required quantity\n"
            "• Delivery preferences\n\n"
            "💬 *Example:* Dr. Ali – Multan, 50 bottles\n\n"
            "💬 *Type 'main' to return to the main menu.*"
        )
        send_whatsjet_message(from_number, prompt_text)

        # The next message from this user is treated as the availability details.
        context_manager.update_context(
            from_number,
            current_state='availability_request',
            current_menu='availability_request',
            current_menu_options=['Provide availability details']
        )
    except Exception as e:
        logger.error(f"[Availability] Error handling availability inquiry: {e}")
        # Fall back to the main menu rather than a generic error message.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
async def handle_availability_request_response(from_number: str, response: str):
    """Record an availability inquiry, forward it to the admin number and confirm to the user.

    Args:
        from_number: The user's WhatsApp number.
        response: Free-text reply. Its non-empty lines are read in order as
            name/location, quantity and delivery preferences; missing lines
            keep their placeholder text.

    The inquiry is appended as one JSON object per line to
    'contacts/availability_inquiries.json'. Afterwards the user's state
    returns to the main menu. On any error the welcome message is sent instead.
    """
    try:
        # Persist the raw inquiry first (JSON-lines append).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/availability_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Number that receives forwarded inquiries (admin).
        receiving_number = "923102288328"
        current_product = context_manager.get_context(from_number).get('current_product', {})
        product_name = current_product.get('Product Name', 'N/A') if current_product else 'N/A'

        # Positional parsing: line 1 -> name/location, 2 -> quantity, 3 -> delivery.
        provided = [part.strip() for part in response.strip().split('\n') if part.strip()][:3]
        placeholders = ["Not provided", "Not specified", "Not specified"]
        name_location, quantity, delivery_preferences = provided + placeholders[len(provided):]

        inquiry_message = (
            f"📦 *Product Availability Inquiry*\n\n"
            f"Product: {product_name}\n"
            f"Name and Location: {name_location}\n"
            f"Quantity: {quantity}\n"
            f"Delivery Preferences: {delivery_preferences}\n"
            f"Phone: {from_number}"
        )
        send_whatsjet_message(receiving_number, inquiry_message)

        # Confirm receipt to the user.
        confirmation = (
            "✅ Thank you! Your availability inquiry has been received. Our sales team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        send_whatsjet_message(from_number, confirmation)

        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
    except Exception as e:
        logger.error(f"[Availability] Error handling availability response: {e}")
        # Fall back to the main menu rather than a generic error message.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        fallback_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=fallback_options
        )
def send_helpful_guidance(from_number: str, current_state: str):
    """Send a short hint describing what input the given conversation state expects.

    Args:
        from_number: Recipient's WhatsApp number.
        current_state: Name of the state to describe. Unrecognised states get
            the main-menu hint and the user's context is reset to the main menu.

    On any error the welcome message is sent and the state returns to the
    main menu.
    """
    try:
        # One hint per known state.
        hints = {
            'all_products_menu': (
                "📋 *Products Menu*\n\n"
                "Select a product number (1-23) to view detailed information.\n"
                "Type 'main' to return to the main menu.\n"
                "You can also type a product name to search."
            ),
            'product_inquiry': (
                "📦 *Product Details*\n\n"
                "Select an option:\n"
                "1️⃣ Contact Sales\n"
                "2️⃣ Check Availability\n"
                "3️⃣ Back to Main Menu\n"
                "Type 'main' to return to main menu."
            ),
            'category_selection_menu': (
                "📁 *Category Selection*\n\n"
                "Select a category number to view products.\n"
                "Type 'main' to return to main menu."
            ),
            'category_products_menu': (
                "📦 *Category Products*\n\n"
                "Select a product number to view details.\n"
                "Type 'main' to return to main menu."
            ),
            'contact_request': (
                "📞 *Contact Request*\n\n"
                "Please provide your name, location, and quantity.\n"
                "Format: 'Name - Location, Quantity'\n"
                "Example: 'Dr. Ali - Multan, 50 bottles'"
            ),
            'availability_request': (
                "📦 *Availability Inquiry*\n\n"
                "Please provide your location and quantity.\n"
                "Format: 'Location, Quantity'\n"
                "Example: 'Multan, 50 bottles'"
            ),
        }
        hint = hints.get(current_state)
        if hint is not None:
            send_whatsjet_message(from_number, hint)
            return

        send_whatsjet_message(
            from_number,
            "💬 *Main Menu*\n\n"
            "Available options:\n"
            "1️⃣ Search Veterinary Products\n"
            "2️⃣ Browse Categories\n"
            "3️⃣ Download Catalog\n\n"
            "Select an option or ask about specific products."
        )
        # NOTE(review): read as belonging to this fallback branch only - the
        # original indentation was lost, so confirm the reset should not also
        # run after the state-specific hints above.
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )
    except Exception as e:
        logger.error(f"Error sending helpful guidance: {e}")
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
# Canonical greeting -> accepted spellings / transliterations. Built once at
# import time instead of on every call.
_GREETING_GROUPS = {
    'hello': ['hello', 'hallo', 'helo', 'hlo', 'hallo', 'heloo', 'helloo'],
    'hi': ['hi', 'hy', 'hii', 'hiii', 'hiiii', 'hie', 'hye', 'hai', 'hay'],
    'hey': ['hey', 'heyy', 'heyyy', 'heey', 'heeyy', 'hay', 'hae'],
    'good_morning': ['good morning', 'goodmorning', 'gm', 'gud morning', 'gudmorning'],
    'good_afternoon': ['good afternoon', 'goodafternoon', 'ga', 'gud afternoon', 'gudafternoon'],
    'good_evening': ['good evening', 'goodevening', 'ge', 'gud evening', 'gudevening'],
    'good_night': ['good night', 'goodnight', 'gn', 'gud night', 'gudnight'],
    'morning': ['morning', 'mornin', 'morn'],
    'afternoon': ['afternoon', 'aftrnoon', 'aftr'],
    'evening': ['evening', 'evnin', 'evn'],
    'night': ['night', 'nite', 'nyt'],
    'how_are_you': ['how are you', 'how r u', 'how are u', 'how r you', 'howru', 'howru'],
    'whats_up': ['whats up', 'whats up', 'what is up', 'wassup', 'wassup', 'sup', 'sup'],
    'assalamu_alaikum': ['assalamu alaikum', 'assalam alaikum', 'assalamu alaikom', 'assalam alaikom', 'asalamu alaikum', 'asalam alaikum', 'as-salamu alaykum', 'as salam alaykum', 'assalamu alaykum', 'assalam alaykum'],
    'salam': ['salam', 'salaam', 'assalam', 'assalaam', 'salaam alaikum', 'salaam alaikom'],
    'adaab': ['adaab', 'adaab arz hai', 'adaab arz', 'adaab arz karta hun'],
    'namaste': ['namaste', 'namaskar', 'pranam', 'pranaam'],
    'khuda_hafiz': ['khuda hafiz', 'allah hafiz', 'fi amanillah'],
    'thank_you': ['thank you', 'thanks', 'shukriya', 'shukran', 'thnx', 'thx', 'tnx']
}

# Flattened, de-duplicated (order-preserving) list used for fuzzy matching,
# plus a set for O(1) exact lookups.
_GREETING_VARIATIONS = tuple(dict.fromkeys(
    variation
    for variations in _GREETING_GROUPS.values()
    for variation in variations
))
_GREETING_VARIATION_SET = frozenset(_GREETING_VARIATIONS)

# Whole-message patterns; these tolerate variable whitespace between words.
_GREETING_PATTERNS = tuple(re.compile(pattern) for pattern in (
    r'^(hi|hello|hey|hy|hii|hiii|hallo|helo|hlo|heyy|heyyy|heey|heeyy|hay|hae|hai|hye|hie)\s*$',
    r'^(good\s+(morning|afternoon|evening|night)|gm|ga|ge|gn|gud\s+(morning|afternoon|evening|night))\s*$',
    r'^(morning|afternoon|evening|night|mornin|morn|aftrnoon|aftr|evnin|evn|nite|nyt)\s*$',
    r'^(how\s+(are\s+)?(you|u|r\s+u)|howru|howru)\s*$',
    r'^(whats?\s+up|wassup|sup)\s*$',
    r'^(assalamu?\s+alaik(um|om)|asalamu?\s+alaik(um|om)|salaam\s+alaik(um|om)|as-salamu?\s+alaykum|as\s+salam\s+alaykum)\s*$',
    r'^(salam|salaam|assalam|assalaam)\s*$',
    r'^(adaab(?:\s+arz(?:\s+(hai|karta\s+hun))?)?)\s*$',
    r'^(namaste|namaskar|pranam|pranaam)\s*$',
    r'^(khuda\s+hafiz|allah\s+hafiz|fi\s+amanillah)\s*$',
    r'^(thank\s+you|thanks|shukriya|shukran|thnx|thx|tnx)\s*$'
))

# Phrases that count as a greeting when they appear anywhere in the message.
_GREETING_QUESTIONS = (
    'how are you', 'how r u', 'how are u', 'how do you do', 'how\'s it going',
    'how is it going', 'how\'s everything', 'how is everything',
    'what\'s up', 'whats up', 'what is up', 'how\'s life', 'how is life',
    'آپ کیسے ہیں', 'آپ کیسے ہو', 'کیسے ہیں', 'کیسے ہو', 'کیا حال ہے', 'کیسا ہے'
)

# Words allowed to follow a greeting ("hi there", "hello everyone").
_GREETING_MODIFIERS = frozenset(['there', 'everyone', 'all', 'guys', 'folks', 'people'])

_SHORT_GREETINGS = ('hi', 'hy', 'hii', 'hey', 'heyy', 'hay', 'hae', 'hai', 'hye', 'hie')

# Messages that are exactly one of these words are never greetings.
_NON_GREETING_WORDS = frozenset([
    'help', 'here', 'her', 'his', 'him', 'hot', 'how', 'history', 'high',
    'hint', 'hit', 'hill', 'hire', 'a', 'b', 'c', 'what', 'when', 'where', 'why'
])

# Substrings that signal a product inquiry or navigation command rather than
# a bare greeting.
_INQUIRY_KEYWORDS = (
    'need', 'want', 'looking', 'find', 'show', 'tell', 'give', 'products',
    'medicine', 'antibiotics', 'veterinary', 'animals', 'cattle', 'poultry',
    'catalog', 'price', 'availability', 'consultation', 'appointment',
    'main', 'menu', 'start', 'home', 'back', '1', '2', '3', '4', '5'
)

_GREETING_FUZZY_THRESHOLD = 90        # whole-message similarity (percent)
_SHORT_GREETING_FUZZY_THRESHOLD = 85  # looser threshold for 2-4 character messages


def is_greeting(text):
    """
    Decide whether a message is a greeting (or a thank-you / farewell).

    Checks run from cheapest and most precise to loosest:

    1. exact match against the known greeting spellings;
    2. whole-message regular expressions (tolerate extra whitespace);
    3. exclusion filters - a message that is exactly a known non-greeting
       word, or that contains an inquiry/navigation keyword, is rejected;
    4. fuzzy whole-message match against every known spelling;
    5. greeting question phrases found anywhere in the message;
    6. fuzzy greeting as first word followed only by modifier words;
    7. fuzzy match of very short (2-4 character) messages.

    The exclusion filters used to sit after every positive check, where they
    could never change the result. They now run before the heuristic checks
    so that a message such as "how are you, I need medicine" is treated as an
    inquiry instead of a greeting.

    Args:
        text: Raw user message. Falsy or non-string values yield False.

    Returns:
        True if the message is considered a greeting, otherwise False.
    """
    if not text or not isinstance(text, str):
        return False
    text_lower = text.lower().strip()

    # 1. Exact match (fastest).
    if text_lower in _GREETING_VARIATION_SET:
        return True

    # 2. Whole-message patterns.
    if any(pattern.match(text_lower) for pattern in _GREETING_PATTERNS):
        return True

    # 3. Exclusions must run before the heuristic checks to have any effect.
    if text_lower in _NON_GREETING_WORDS:
        return False
    if any(keyword in text_lower for keyword in _INQUIRY_KEYWORDS):
        return False

    # 4. Fuzzy matching for typos; the high threshold limits false positives.
    for greeting in _GREETING_VARIATIONS:
        similarity = fuzz.ratio(text_lower, greeting)
        if similarity >= _GREETING_FUZZY_THRESHOLD:
            logger.info(f"Fuzzy greeting match: '{text_lower}' -> '{greeting}' (similarity: {similarity}%)")
            return True

    # 5. Greeting questions anywhere in the message.
    if any(question in text_lower for question in _GREETING_QUESTIONS):
        return True

    # 6. Greeting followed only by modifiers, e.g. "hello everyone".
    words = text_lower.split()
    if len(words) >= 2:
        first_word, remaining_words = words[0], words[1:]
        if all(word in _GREETING_MODIFIERS for word in remaining_words):
            for greeting in _GREETING_VARIATIONS:
                if fuzz.ratio(first_word, greeting) >= _GREETING_FUZZY_THRESHOLD:
                    return True

    # 7. Very short messages get a slightly looser fuzzy comparison.
    if 2 <= len(text_lower) <= 4:
        for short_greeting in _SHORT_GREETINGS:
            if fuzz.ratio(text_lower, short_greeting) >= _SHORT_GREETING_FUZZY_THRESHOLD:
                logger.info(f"Short greeting match: '{text_lower}' -> '{short_greeting}'")
                return True

    return False
async def handle_ai_chat_mode(from_number: str, query: str, reply_language: str = 'en'):
    """
    Answer a free-form question in AI chat mode (separate from the menu flow).

    Navigation commands and greetings send the welcome message and reset the
    user to the main menu. Any other query is sent to OpenAI together with a
    summary of the first 50 catalogue products, and the answer is delivered
    over WhatsApp.

    Args:
        from_number: WhatsApp number of the user.
        query: The user's message.
        reply_language: Ignored in practice - the value is overwritten with
            ``'ur'`` on entry, so replies are always translated to Urdu.
    """
    # Option 4 always answers in Urdu, whatever the caller asked for.
    reply_language = 'ur'
    logger.info("[AI Chat] Forcing reply_language to Urdu for Option 4.")

    def back_to_main_menu():
        """Send the welcome message and reset the user's context to the main menu."""
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )

    try:
        logger.info(f"[AI Chat] Processing query: '{query}' for {from_number} in {reply_language}")

        # Navigation commands take priority over everything else.
        command = query.lower().strip()
        if command in ['main', 'menu', 'start', 'home', 'back']:
            logger.info(f"[AI Chat] Navigation command detected: '{query}' -> returning to main menu")
            back_to_main_menu()
            return

        # A greeting also drops the user back at the main menu.
        if is_greeting(query):
            logger.info(f"[AI Chat] Greeting detected: '{query}' -> returning to main menu")
            back_to_main_menu()
            return

        # Without an API key there is nothing to answer with.
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number, "❌ AI Assistant requires OpenAI API. Please contact support.")
            return

        # Catalogue rows used both as prompt context and for term restoration.
        have_products = products_df is not None and not products_df.empty
        all_products = products_df.to_dict('records') if have_products else []

        # Only the first 50 products are included to keep the prompt bounded.
        products_context = ""
        if all_products:
            context_parts = ["Available Veterinary Products:\n"]
            for position, item in enumerate(all_products[:50], 1):
                name = item.get('Product Name', 'N/A')
                category = item.get('Category', 'N/A')
                composition = item.get('Composition', 'N/A')
                species = item.get('Target Species', 'N/A')
                context_parts.append(f"{position}. {name} - {category}\n")
                context_parts.append(f"   Composition: {composition}\n")
                context_parts.append(f"   Target Species: {species}\n\n")
            products_context = "".join(context_parts)

        # Prompt text starts and ends with a newline, as in the original template.
        if reply_language == 'ur':
            prompt_lines = [
                "",
                "آپ Apex Biotical کے Veterinary AI Assistant ہیں۔ آپ کو veterinary products اور treatments کے بارے میں معلومات فراہم کرنی ہیں۔",
                f"یوزر کا سوال: {query}",
                "دستیاب veterinary products:",
                f"{products_context}",
                "براہ کرم:",
                "1. یوزر کے سوال کا جواب دیں",
                "2. اگر یہ veterinary products سے متعلق ہے تو relevant products کی معلومات دیں",
                "3. اگر یہ general veterinary advice ہے تو professional guidance دیں",
                "4. اردو میں جواب دیں",
                "5. جواب professional اور helpful ہو",
                "جواب:",
                "",
            ]
        else:
            prompt_lines = [
                "",
                "You are Apex Biotical's Veterinary AI Assistant. You provide information about veterinary products and treatments.",
                f"User Query: {query}",
                "Available Veterinary Products:",
                f"{products_context}",
                "Please:",
                "1. Answer the user's question",
                "2. If it's related to veterinary products, provide relevant product information",
                "3. If it's general veterinary advice, provide professional guidance",
                "4. Answer in English",
                "5. Keep the response professional and helpful",
                "Response:",
                "",
            ]
        prompt = "\n".join(prompt_lines)

        completion = openai.ChatCompletion.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=500
        )
        ai_response = completion.choices[0].message.content.strip()

        # The footer is the same for every reply language.
        ai_response += "\n\n💬 *Type 'main' to return to main menu*"

        if reply_language != 'ur':
            send_whatsjet_message(from_number, ai_response)
        else:
            try:
                # Names that must stay in English after machine translation.
                product_names = [
                    str(item.get('Product Name', ''))
                    for item in all_products
                    if item.get('Product Name')
                ]
                category_names = list({
                    str(item.get('Category', ''))
                    for item in all_products
                    if item.get('Category')
                })
                company_names = ['Apex Biotical Solutions', 'Apex Biotical', 'Apex']
                translated_response = GoogleTranslator(source='auto', target='ur').translate(ai_response)
                translated_response = restore_english_terms(
                    translated_response,
                    ai_response,
                    product_names + company_names,
                    category_names
                )
                send_whatsjet_message(from_number, translated_response)
            except Exception as e:
                # Translation is best effort; fall back to the untranslated answer.
                logger.error(f"[AI] Translation error: {e}")
                send_whatsjet_message(from_number, ai_response)

        # Remember that the user is in AI chat mode and what was last exchanged.
        context_manager.update_context(
            from_number,
            current_state='ai_chat_mode',
            current_menu='ai_chat_mode',
            current_menu_options=['main'],
            last_ai_query=query,
            last_ai_response=ai_response
        )
        context_manager.add_to_history(from_number, query, ai_response)
        logger.info(f"[AI Chat] Response sent successfully to {from_number}")
    except Exception as e:
        logger.error(f"[AI Chat] Error processing query: {e}")
        if reply_language == 'ur':
            error_msg = "❌ AI Assistant میں error آ گیا ہے۔ براہ کرم دوبارہ کوشش کریں یا 'main' لکھ کر main menu پر واپس جائیں۔"
        else:
            error_msg = "❌ AI Assistant encountered an error. Please try again or type 'main' to return to main menu."
        send_whatsjet_message(from_number, error_msg)
def load_products_data():
    """
    Populate the module-level ``products_df`` from ``CSV_FILE``.

    If the file is missing or cannot be read, ``products_df`` is set to an
    empty DataFrame and the problem is logged; no exception is propagated.
    """
    global products_df
    try:
        if not os.path.exists(CSV_FILE):
            logger.warning(f"⚠️ CSV file {CSV_FILE} not found")
            products_df = pd.DataFrame()
            return
        products_df = pd.read_csv(CSV_FILE)
        logger.info(f"✅ Loaded {len(products_df)} products from {CSV_FILE}")
    except Exception as e:
        logger.error(f"❌ Error loading products data: {e}")
        products_df = pd.DataFrame()


# Load products on startup
load_products_data()
# Add these functions after the existing imports and before the main functions | |
def get_product_image_path(product_name: str) -> Optional[str]:
    """
    Resolve the public cPanel image URL for a product.

    Candidate URLs have the form
    ``https://amgocus.com/uploads/images/<stem><ext>`` where ``<stem>`` is
    tried in two forms:

    1. the normalized name: whitespace, underscores and dots removed,
       lower-cased (dashes are kept);
    2. the original name, stripped, with spaces encoded as ``%20``.

    Each stem is combined with ``.png``, ``.jpg``, ``.jpeg`` and ``.webp``.
    The previous implementation "verified" a candidate with
    ``image_url.startswith('http')``, which is always true, so it always
    returned the first ``.png`` candidate and never reached the other
    extensions or the fallback stem. Candidates are now probed with an HTTP
    HEAD request and the first one answering 200 is returned.

    Args:
        product_name: Product name as stored in the catalogue.

    Returns:
        The first reachable image URL, or None if no candidate is reachable,
        the host cannot be contacted, or the name is empty/invalid.
    """
    base_url = "https://amgocus.com/uploads/images/"
    image_extensions = ('.png', '.jpg', '.jpeg', '.webp')
    # Browser-like headers, matching the ones the sender uses for its own
    # accessibility check of these URLs.
    probe_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
        'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
    }
    try:
        normalized_name = re.sub(r'[\s_\.]', '', product_name).lower()
        logger.info(f"[Image] Normalized product name: '{product_name}' -> '{normalized_name}'")
        safe_name = product_name.strip().replace(' ', '%20')
        if not normalized_name:
            logger.warning(f"[Image] Empty product name, no image lookup performed: {product_name!r}")
            return None

        # dict.fromkeys drops the duplicate when both stems are identical.
        for stem in dict.fromkeys((normalized_name, safe_name)):
            for ext in image_extensions:
                image_url = f"{base_url}{stem}{ext}"
                logger.info(f"[Image] Checking cPanel image URL: {image_url}")
                try:
                    probe = requests.head(image_url, headers=probe_headers, timeout=5, allow_redirects=True)
                except requests.RequestException as probe_error:
                    # A transport failure will hit every other candidate on
                    # the same host too, so stop instead of stacking timeouts.
                    logger.warning(f"[Image] Could not reach image host for {image_url}: {probe_error}")
                    return None
                if probe.status_code == 200:
                    logger.info(f"[Image] Found cPanel image URL: {image_url}")
                    return image_url

        logger.warning(f"[Image] No cPanel image found for product: {product_name}")
        return None
    except Exception as e:
        logger.error(f"[Image] Error generating cPanel image URL for {product_name}: {e}")
        return None
def get_product_image_media_type(image_path: str) -> str:
    """
    Map an image path's file extension to a MIME type.

    The extension is compared case-insensitively. Unknown or missing
    extensions fall back to ``image/jpeg``; a falsy ``image_path`` yields
    None.
    """
    if not image_path:
        return None

    jpeg = 'image/jpeg'
    known_types = {
        '.jpg': jpeg,
        '.jpeg': jpeg,
        '.png': 'image/png',
        '.webp': 'image/webp',
        '.gif': 'image/gif',
    }
    _, extension = os.path.splitext(image_path)
    try:
        return known_types[extension.lower()]
    except KeyError:
        return jpeg
async def send_product_with_image(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send a product's details, attaching its image when one is available.

    ``get_product_image_path`` returns an http(s) URL, but the previous check
    ``os.path.exists(image_path)`` is only meaningful for local files, so the
    image branch never ran. A remote URL is now accepted as-is and a local
    path is still checked on disk. The attachment filename uses the image's
    real extension instead of always ``.jpg``.

    If sending with media fails, or any step raises, the product details are
    sent as plain text.

    Args:
        from_number: WhatsApp number of the recipient.
        product: Catalogue row for the product (reads ``'Product Name'``).
        user_context: Conversation context passed to the response generator.
    """
    try:
        product_name = product.get('Product Name', 'Unknown Product')
        response = generate_veterinary_product_response(product, user_context)

        image_path = get_product_image_path(product_name)
        is_remote = bool(image_path) and image_path.lower().startswith(('http://', 'https://'))
        if image_path and (is_remote or os.path.exists(image_path)):
            media_type = get_product_image_media_type(image_path)
            # Keep the attachment name consistent with the actual image type.
            extension = os.path.splitext(image_path)[1] or '.jpg'
            filename = f"{str(product_name).replace(' ', '_')}{extension}"
            success = send_whatsjet_message(
                from_number,
                response,
                media_type=media_type,
                media_path=image_path,
                filename=filename
            )
            if success:
                logger.info(f"[Product] Successfully sent product with image: {product_name}")
            else:
                # Fallback to text-only if the media message was rejected.
                logger.warning(f"[Product] Failed to send image, sending text only: {product_name}")
                send_whatsjet_message(from_number, response)
        else:
            send_whatsjet_message(from_number, response)
            logger.info(f"[Product] Sent product info without image: {product_name}")
    except Exception as e:
        logger.error(f"[Product] Error sending product with image: {e}")
        # Last resort: regenerate the text and send it without media.
        response = generate_veterinary_product_response(product, user_context)
        send_whatsjet_message(from_number, response)
async def send_enhanced_pdf(from_number: str, product: Dict[str, Any], pdf_content: bytes = None):
    """
    Deliver a product PDF as a WhatsApp document.

    The PDF (generated with ``generate_veterinary_pdf`` when ``pdf_content``
    is None) is written to ``uploads/`` under a timestamped name and sent as
    an ``application/pdf`` attachment. If the send reports failure, a message
    with a download link built from ``SERVER_URL`` is sent instead. If any
    step raises, the plain-text product response is sent.

    Args:
        from_number: WhatsApp number of the recipient.
        product: Catalogue row for the product (reads ``'Product Name'``).
        pdf_content: Pre-rendered PDF bytes, or None to generate them here.
    """
    try:
        product_name = product.get('Product Name', 'Unknown_Product')

        # File name: strip punctuation, underscores for spaces, timestamp suffix.
        stem = re.sub(r'[^\w\s-]', '', product_name).replace(' ', '_')
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{stem}_Product_Info_{stamp}.pdf"

        if pdf_content is None:
            pdf_content = generate_veterinary_pdf(product)

        # Persist the PDF so it can be attached (and linked to as a fallback).
        uploads_dir = "uploads"
        os.makedirs(uploads_dir, exist_ok=True)
        pdf_path = os.path.join(uploads_dir, filename)
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(pdf_content)

        caption = (
            f"📄 *{product_name} - Detailed Product Information*\n\n"
            f"📎 Here's the complete product information in PDF format.\n"
            f"📋 Includes: Composition, Dosage, Precautions, Storage\n\n"
            f"💬 Type 'main' to return to main menu."
        )
        delivered = send_whatsjet_message(
            from_number,
            caption,
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )
        if delivered:
            logger.info(f"[PDF] Successfully sent PDF for product: {product_name}")
            return

        # Attachment failed: point the user at the saved file instead.
        server_url = os.getenv("SERVER_URL", "https://your-huggingface-space-url.hf.space")
        download_url = f"{server_url}/uploads/{filename}"
        link_message = (
            f"📄 *{product_name} - Product Information*\n\n"
            f"📎 [Download Product PDF]({download_url})\n\n"
            f"💬 *Click the link above to download the detailed product information*\n"
            f"Type 'main' to return to main menu."
        )
        send_whatsjet_message(from_number, link_message)
        logger.info(f"[PDF] Sent PDF download link for product: {product_name}")
    except Exception as e:
        logger.error(f"[PDF] Error sending enhanced PDF: {e}")
        # Fallback to the basic text response.
        send_whatsjet_message(from_number, generate_veterinary_product_response(product, {}))
# Enhanced product response function with image support | |
def generate_veterinary_product_response_with_media(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build the product detail text together with image information.

    Fixes over the previous version:

    * ``has_image`` was computed with ``os.path.exists`` only, which is always
      False for the http(s) URLs that ``get_product_image_path`` returns. A
      remote URL now counts as an available image.
    * The brochure link is taken from ``product_info`` when that dict already
      carries a ``'Brochure (PDF)'`` entry; ``Veterinary.csv`` is read only as
      a fallback, instead of on every call.
    * A non-string brochure cell no longer fails on ``.strip()``.

    Args:
        product_info: Catalogue row for the product.
        user_context: Conversation context (currently unused here).

    Returns:
        Dict with keys ``'text'`` (message body), ``'has_image'`` (bool),
        ``'image_path'`` (URL/path or None) and ``'product_name'``.
    """
    brochure_column = 'Brochure (PDF)'

    def clean_text(text):
        if text is None or pd.isna(text):
            return "Not specified"
        return str(text).strip()

    product_name = clean_text(product_info.get('Product Name', ''))
    product_type = clean_text(product_info.get('Type', ''))
    category = clean_text(product_info.get('Category', ''))
    indications = clean_text(product_info.get('Indications', ''))

    pdf_link = ""
    try:
        if brochure_column in product_info:
            brochure_link = product_info[brochure_column]
        else:
            # Fallback lookup in the CSV by product name.
            csv_data = pd.read_csv('Veterinary.csv')
            product_row = csv_data[csv_data['Product Name'] == product_name]
            brochure_link = product_row.iloc[0].get(brochure_column, '') if not product_row.empty else ''
        if pd.notna(brochure_link) and str(brochure_link).strip():
            pdf_link = str(brochure_link).strip()
    except Exception as e:
        logger.warning(f"Error checking PDF link for {product_name}: {e}")

    response_text = (
        f"🧪 *Name:* {product_name}\n"
        f"📦 *Type:* {product_type}\n"
        f"🏥 *Category:* {category}\n"
        f"💊 *Used For:* {indications}"
    )
    if pdf_link:
        response_text += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{pdf_link}"
    response_text += (
        "\n\n💬 *Available Actions:*\n"
        "1️⃣ Talk to Veterinary Consultant\n"
        "2️⃣ Inquire About Availability\n"
        "3️⃣ Back to Main Menu\n"
        "\n💬 Select an option or ask about related products"
    )

    image_path = get_product_image_path(product_name)
    is_remote = bool(image_path) and image_path.lower().startswith(('http://', 'https://'))
    has_image = bool(image_path) and (is_remote or os.path.exists(image_path))
    return {
        'text': response_text,
        'has_image': has_image,
        'image_path': image_path,
        'product_name': product_name
    }
def ensure_images_dir():
    """Create the ``static/images`` directory if it is missing and log its path."""
    target_dir = "static/images"
    # exist_ok makes repeated calls harmless.
    os.makedirs(target_dir, exist_ok=True)
    logger.info(f"[Image] Ensured images directory exists: {target_dir}")
# New feature: Send product image with caption (product details) | |
async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send a product's cPanel image with its details as the caption.

    Flow:
      1. Resolve the image URL and confirm it answers HTTP 200 to a HEAD
         request sent with browser-like headers.
      2. Post a single media message (image + caption) to the WhatsJet
         ``send-media-message`` endpoint.
      3. If that call does not return 200 or raises, send the image alone
         followed, one second later, by the details as a text message.
      4. If no image can be sent at all, send the details as text only.

    Args:
        from_number: WhatsApp number of the recipient.
        product: Catalogue row for the product (reads ``'Product Name'``).
        user_context: Conversation context passed to the response generator.
    """
    ensure_images_dir()
    product_name = product.get('Product Name', 'Unknown Product')
    details = generate_veterinary_product_response(product, user_context)
    logger.info(f"[Product] Processing cPanel image for product: {product_name}")

    async def image_then_text(url) -> bool:
        """Send the bare image, then the details text; return whether the image went out."""
        image_sent = send_whatsjet_media_image_only(from_number, url, f"{product_name.replace(' ', '_')}.jpg")
        if not image_sent:
            logger.warning(f"[Product] Failed to send cPanel image, sending text only: {product_name}")
            return False
        await asyncio.sleep(1)
        send_whatsjet_message(from_number, details)
        return True

    try:
        image_url = get_product_image_path(product_name)
        if image_url and image_url.startswith('http'):
            logger.info(f"[Product] Found cPanel image URL: {image_url}")

            # Step 1: make sure the URL is actually reachable.
            try:
                logger.info(f"[Product] Testing cPanel image URL accessibility: {image_url}")
                browser_headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.9',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
                probe = requests.head(image_url, headers=browser_headers, timeout=10, allow_redirects=True)
                if probe.status_code != 200:
                    logger.warning(f"[Product] cPanel image URL not accessible (status {probe.status_code}): {image_url}")
                    # Raised so that the handler below clears image_url.
                    raise Exception(f"cPanel image URL not accessible: {probe.status_code}")
                logger.info("[Product] cPanel image URL is accessible")
            except Exception as e:
                logger.warning(f"[Product] Failed to test cPanel image URL {image_url}: {e}")
                image_url = None

            # Steps 2-3: single captioned message, then separate messages.
            if image_url:
                logger.info(f"[Product] Attempting to send cPanel image with caption for: {product_name}")
                endpoint = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-media-message"
                api_headers = {
                    "Authorization": f"Bearer {WHATSJET_API_TOKEN}",
                    "Content-Type": "application/json"
                }
                payload = {
                    "phone_number": from_number,
                    "media_type": "image",
                    "media_url": image_url,
                    "caption": details,
                    "file_name": f"{product_name.replace(' ', '_')}.jpg"
                }
                try:
                    logger.info(f"[Product] Sending image with caption using WhatsJet API: {payload}")
                    api_response = httpx.post(endpoint, json=payload, headers=api_headers, timeout=30)
                    logger.info(f"[Product] WhatsJet response status: {api_response.status_code}")
                    logger.info(f"[Product] WhatsJet response body: {api_response.text[:500]}...")
                    if api_response.status_code == 200:
                        logger.info(f"[Product] Successfully sent cPanel image with caption for product: {product_name}")
                        return
                    logger.warning(f"[Product] Failed to send image with caption, trying separate messages: {product_name}")
                    if await image_then_text(image_url):
                        return
                except Exception as e:
                    logger.error(f"[Product] Error sending image with caption: {e}")
                    if await image_then_text(image_url):
                        return

        # Step 4: nothing could be sent with an image.
        logger.info(f"[Product] No cPanel image available, sending text only for: {product_name}")
        send_whatsjet_message(from_number, details)
    except Exception as e:
        logger.error(f"[Product] Error sending product image with caption: {e}")
        logger.info(f"[Product] Falling back to text-only message for: {product_name}")
        send_whatsjet_message(from_number, details)
# Test endpoint for product image with caption | |
async def test_product_image_with_caption(phone: str):
    """
    Test helper: send the first catalogue product, with image and caption, to ``phone``.

    Returns a dict describing the outcome; errors are reported under ``"error"``.
    """
    try:
        catalogue_empty = products_df is None or products_df.empty
        if catalogue_empty:
            return {"error": "No products loaded"}

        # The first row is used as the sample product; no user context is needed.
        sample_product = products_df.iloc[0].to_dict()
        await send_product_image_with_caption(phone, sample_product, {})
        return {
            "success": True,
            "message": f"Test product image sent to {phone}",
            "product": sample_product.get('Product Name', 'Unknown')
        }
    except Exception as e:
        logger.error(f"Error in test product image with caption: {e}")
        return {"error": str(e)}
# Test endpoint for image sending | |
async def test_image_sending(phone: str, image_url: str = "https://amgocus.com/uploads/images/respiraaidplus.png"):
    """
    Test helper: send ``image_url`` to ``phone`` as a WhatsApp image message.

    Returns a dict with ``success``, ``message`` and ``image_url``, or a dict
    with ``error`` if an exception occurs.
    """
    try:
        sent = send_whatsjet_message(
            phone,
            "🖼️ *Test Image*\n\nThis is a test image sent via WhatsApp API.",
            media_type="image/jpeg",
            media_path=image_url,
            filename="test_image.jpg"
        )
        if not sent:
            return {
                "success": False,
                "message": f"Failed to send test image to {phone}",
                "image_url": image_url
            }
        return {
            "success": True,
            "message": f"Test image sent successfully to {phone}",
            "image_url": image_url
        }
    except Exception as e:
        logger.error(f"Error in test image sending: {e}")
        return {"error": str(e)}
# Debug endpoint for WhatsJet | |
async def debug_whatsjet():
    """
    Debug helper: report the WhatsJet/OpenAI configuration currently in use.

    Secrets are never returned; the API token and OpenAI key are shown as
    ``"***"`` when set and None otherwise.
    """
    def mask(secret):
        """Hide a secret's value while still showing whether it is configured."""
        return "***" if secret else None

    try:
        return {
            "status": "success",
            "config": {
                "api_url": WHATSJET_API_URL,
                "vendor_uid": WHATSJET_VENDOR_UID,
                "api_token": mask(WHATSJET_API_TOKEN),
                "server_url": SERVER_URL,
                "openai_key": mask(OPENAI_API_KEY)
            },
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
# Test endpoint for WhatsJet payloads | |
async def test_whatsjet_payloads(phone: str):
    """
    Test helper: send a plain text message to ``phone`` through WhatsJet.

    Returns a dict whose ``status`` is ``"success"``, ``"failed"`` or
    ``"error"``, plus a timestamp.
    """
    try:
        delivered = send_whatsjet_message(
            phone,
            "🧪 *WhatsJet Test*\n\nThis is a test message to verify WhatsJet integration."
        )
        outcome = "success" if delivered else "failed"
        return {
            "status": outcome,
            "message": f"WhatsJet test message sent to {phone}",
            "success": delivered,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
# Test endpoint for cPanel image access | |
async def test_cpanel_image_access():
    """
    Test helper: check whether a known cPanel image URL is reachable when
    requested with browser-like headers.

    The request is made with ``stream=True`` so the image body is not
    downloaded. The response is used as a context manager so its connection
    is always released; previously the streamed response was never closed.

    Returns:
        Dict with ``image_url``, ``status_code``, response ``headers``,
        ``accessible`` and ``timestamp``; or ``error``, ``image_url`` and
        ``timestamp`` if the request raises.
    """
    # Bound before the try block so the error handler can always report it.
    image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        logger.info(f"[Test] Testing cPanel image URL with browser headers: {image_url}")
        with requests.get(image_url, headers=headers, timeout=10, stream=True, allow_redirects=True) as response:
            result = {
                "image_url": image_url,
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "accessible": response.status_code == 200,
                "timestamp": datetime.now().isoformat()
            }
        if result["accessible"]:
            logger.info("[Test] ✅ cPanel image URL is now accessible!")
        else:
            logger.warning(f"[Test] ❌ cPanel image URL still not accessible (status {result['status_code']})")
        return result
    except Exception as e:
        logger.error(f"[Test] Error testing cPanel image access: {e}")
        return {
            "error": str(e),
            "image_url": image_url,
            "timestamp": datetime.now().isoformat()
        }
def format_number_with_emoji(number: int) -> str:
    """
    Render a list position as keycap emoji.

    The previous lookup table stopped at 23, so longer lists switched to
    plain ``"24."`` numbering part-way through. Any positive whole number is
    now rendered by composing one keycap emoji per digit, which reproduces
    the old table for 1-23 (10 keeps its dedicated keycap-ten emoji).

    Args:
        number: Position to render.

    Returns:
        Keycap emoji for positive whole numbers; ``f"{number}."`` for zero,
        negative values and anything that is not a whole number.
    """
    # Keycap sequence = digit + VARIATION SELECTOR-16 + COMBINING ENCLOSING KEYCAP.
    keycap_suffix = "\ufe0f\u20e3"
    keycap_ten = "\U0001F51F"

    try:
        value = int(number)
    except (TypeError, ValueError, OverflowError):
        return f"{number}."
    # Rejects strings such as "3" and fractional floats, like the old dict lookup.
    if value != number or value < 1:
        return f"{number}."
    if value == 10:
        return keycap_ten
    return "".join(f"{digit}{keycap_suffix}" for digit in str(value))
async def display_all_products(from_number: str):
    """
    Send the whole catalogue to the user, five products per message.

    Does nothing if the user is already in the ``all_products_menu`` state.
    Otherwise the user's context is switched to ``all_products_menu`` (with
    the product names as menu options and the rows as available products)
    before the list is sent, followed by a short instruction message.
    """
    try:
        user_context = context_manager.get_context(from_number)
        current_state = user_context.get('current_state', 'main_menu')
        logger.info(f"[Display] display_all_products called for {from_number} in state: {current_state}")

        # Guard against listing the catalogue twice in a row.
        if current_state == 'all_products_menu':
            logger.warning(f"[Display] Already in all_products_menu state for {from_number}, skipping display")
            return

        if products_df is None or products_df.empty:
            send_whatsjet_message(from_number, "❌ No products available at the moment.")
            return

        products = products_df.to_dict('records')
        total = len(products)
        context_manager.update_context(
            from_number,
            current_state='all_products_menu',
            current_menu='all_products_menu',
            current_menu_options=[row.get('Product Name', 'Unknown') for row in products],
            available_products=products
        )
        logger.info(f"[Display] Set state to all_products_menu for {from_number}")

        # One message per page; numbering continues across pages.
        page_size = 5
        for start in range(0, total, page_size):
            page = products[start:start + page_size]
            pieces = [f"📋 *Products List ({start+1}-{min(start+page_size, total)} of {total})*\n\n"]
            for position, row in enumerate(page, start + 1):
                pieces.append(f"{format_number_with_emoji(position)} {row.get('Product Name', 'Unknown')}\n")
                if row.get('Category'):
                    pieces.append(f"   Category: {row.get('Category')}\n")
                pieces.append("\n")
            send_whatsjet_message(from_number, "".join(pieces))

        send_whatsjet_message(
            from_number,
            "💬 Type a product name to get detailed information, or type 'main' to return to main menu."
        )
    except Exception as e:
        logger.error(f"[Display] Error displaying products: {e}")
        send_whatsjet_message(from_number, "❌ Error displaying products. Please try again.")
def get_all_categories():
    """
    Return the distinct product categories, in order of first appearance.

    Rows with a missing category are skipped: previously NaN was returned as
    a category, which showed up in menus as "nan" and could never match any
    product when selected. An empty list is returned when no products are
    loaded or the ``Category`` column is absent.
    """
    if products_df is None or products_df.empty:
        return []
    if 'Category' not in products_df.columns:
        return []
    return products_df['Category'].dropna().unique().tolist()
def get_products_by_category(category: str):
    """Return every catalog row whose ``Category`` equals ``category``.

    Args:
        category: Category value to match exactly against the ``Category``
            column of the module-level ``products_df``.

    Returns:
        list: One dict per matching row (``to_dict('records')``); an empty
        list when the catalog is not loaded or is empty.
    """
    catalog = products_df
    if catalog is None or catalog.empty:
        return []
    in_category = catalog['Category'] == category
    return catalog[in_category].to_dict('records')
async def handle_category_selection(selection: str, from_number: str):
    """React to a numeric category choice sent by a user.

    Looks up ``available_categories`` in the user's stored context. A valid
    1-based number switches the user to ``category_products_menu``, stores the
    category's products in the context and sends the numbered product list.
    Any other input gets an "invalid selection" reply, and a category without
    products gets a "no products" reply. On any exception the error is logged
    and ``send_helpful_guidance`` is called for ``category_selection_menu``.

    Args:
        selection: Raw text of the user's reply.
        from_number: Identifier of the user the replies are sent to.
    """
    try:
        categories = context_manager.get_context(from_number).get('available_categories', [])

        in_range = selection.isdigit() and 1 <= int(selection) <= len(categories)
        if not in_range:
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose a valid category number.")
            return

        chosen_category = categories[int(selection) - 1]
        category_items = get_products_by_category(chosen_category)
        if not category_items:
            send_whatsjet_message(from_number, f"❌ No products found in {chosen_category} category.")
            return

        # Remember which products are on screen so a follow-up number can be resolved.
        context_manager.update_context(
            from_number,
            current_category=chosen_category,
            current_state='category_products_menu',
            current_menu='category_products_menu',
            current_menu_options=[item.get('Product Name', 'Unknown') for item in category_items],
            available_products=category_items
        )

        parts = [f"📦 *Products in {chosen_category}*\n\n"]
        for position, item in enumerate(category_items, 1):
            parts.append(f"{format_number_with_emoji(position)} {item.get('Product Name', 'Unknown')}\n")
            if item.get('Target Species'):
                parts.append(f" Target: {item.get('Target Species')}\n")
            parts.append("\n")
        parts.append("💬 Select a product number or type 'main' to return to main menu.")
        send_whatsjet_message(from_number, "".join(parts))
    except Exception as e:
        logger.error(f"[Category] Error handling category selection: {e}")
        send_helpful_guidance(from_number, 'category_selection_menu')
def get_menu_validation_message(current_state: str, user_context: dict) -> str:
    """Build the "invalid selection" reply for the menu the user is in.

    Args:
        current_state: Name of the menu state the user is currently in
            (e.g. ``'main_menu'``, ``'category_products_menu'``).
        user_context: The user's stored context. ``available_products`` and
            ``available_categories`` are read from it to state the valid
            number range. ``None`` is treated as an empty context.

    Returns:
        str: A WhatsApp-formatted message explaining which inputs are valid.
        Unknown states get a generic message.
    """
    context = user_context or {}

    # Shared footer used by every menu except the main menu and the fallback.
    shortcuts = (
        "💬 *You can also:*\n"
        "• Type a product name (e.g., 'hydropex', 'respira aid plus')\n"
        "• Type 'main' to return to main menu"
    )
    no_products = "❌ No products available. Type 'main' to return to main menu."

    if current_state == 'main_menu':
        # Option 1 uses the same label as generate_veterinary_welcome_message,
        # so the recap matches the menu the user was actually shown.
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose from the main menu:\n"
            "1️⃣ Complete Products List\n"
            "2️⃣ Browse Categories\n"
            "3️⃣ Download Catalog\n"
            "4️⃣ Chat with Veterinary AI Assistant\n\n"
            "💬 *You can also:*\n"
            "• Type a product name (e.g., 'hydropex', 'respira aid plus')\n"
            "• Type 'main' to refresh the menu"
        )

    if current_state == 'all_products_menu':
        if products_df is not None and not products_df.empty:
            return (
                "❌ *Invalid Product Selection*\n\n"
                f"Please choose a product number between 1 and {len(products_df)}.\n\n"
                + shortcuts
            )
        return no_products

    if current_state == 'category_products_menu':
        available_products = context.get('available_products', [])
        if available_products:
            return (
                "❌ *Invalid Product Selection*\n\n"
                f"Please choose a product number between 1 and {len(available_products)}.\n\n"
                + shortcuts
            )
        return "❌ No products available in this category. Type 'main' to return to main menu."

    if current_state == 'category_selection_menu':
        available_categories = context.get('available_categories', [])
        if available_categories:
            return (
                "❌ *Invalid Category Selection*\n\n"
                f"Please choose a category number between 1 and {len(available_categories)}.\n\n"
                + shortcuts
            )
        return "❌ No categories available. Type 'main' to return to main menu."

    if current_state == 'product_inquiry':
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose an option:\n"
            "1️⃣ Talk to Veterinary Consultant\n"
            "2️⃣ Inquire About Availability\n"
            "3️⃣ Back to Main Menu\n\n"
            + shortcuts
        )

    if current_state == 'intelligent_products_menu':
        # The range was previously a plain string, so the user received the
        # literal text "{len(available_products)}". Interpolate the real count.
        available_products = context.get('available_products', [])
        if available_products:
            return (
                "❌ *Invalid Selection*\n\n"
                f"Please choose a product number between 1 and {len(available_products)}.\n\n"
                + shortcuts
            )
        return no_products

    return (
        "❌ *Invalid Selection*\n\n"
        "Please choose a valid option or type 'main' to return to main menu.\n\n"
        "💬 *You can also:*\n"
        "• Type a product name (e.g., 'hydropex', 'respira aid plus')"
    )
def is_valid_menu_selection(selection: str, current_state: str, user_context: dict) -> bool:
    """Tell whether ``selection`` is acceptable in ``current_state``.

    Delegates to ``validate_menu_selection`` and returns only the first item
    of the pair it yields; the second item is discarded.
    """
    outcome = validate_menu_selection(selection, current_state, user_context)
    accepted, _ = outcome
    return accepted
def generate_veterinary_welcome_message(phone_number=None, user_context=None):
    """Return the fixed welcome text containing the main menu.

    Both parameters are accepted for call-site compatibility but are not
    used: the returned text is the same for every caller.
    """
    menu_lines = [
        "🏥 Welcome to Apex Biotical Solutions Veterinary Virtual Assistant",
        "",
        "How can I help you today?",
        "",
        "📋 Main Menu:",
        "1️⃣ Complete Products List",
        "2️⃣ Browse Categories",
        "3️⃣ Download Catalog",
        "4️⃣ Chat with Veterinary AI Assistant",
        "",
        "💬 Quick Actions:",
        "* Type a product name (e.g., 'hydropex', 'respira aid plus')",
        "* Ask about symptoms (e.g., 'respiratory problems', 'liver support')",
        "* Search by category (e.g., 'antibiotics', 'vitamins')",
        "",
        "🎤 Voice messages are supported!",
        "You can speak product names, menu numbers, or ask questions.",
    ]
    return "\n".join(menu_lines)
async def handle_veterinary_product_followup(selection: str, from_number: str) -> None:
    """Act on the option a user picked after viewing a product.

    ``'1'`` asks for the user's contact details and moves them to the
    ``contact_request`` state. ``'2'`` hands over to
    ``handle_availability_inquiry``. ``'3'`` re-sends the welcome menu and
    resets the state to ``main_menu``. Anything else gets an "invalid
    selection" reply. If the context holds no ``current_product`` the user is
    told to search for one first. On any exception the error is logged and
    the user is sent back to the main menu.

    Args:
        selection: The user's reply text.
        from_number: Identifier of the user the replies are sent to.
    """

    def show_main_menu(ctx) -> None:
        # Send the welcome text, then record that the user is at the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message(from_number, ctx))
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )

    try:
        ctx = context_manager.get_context(from_number)
        product = ctx.get('current_product')
        if not product:
            send_whatsjet_message(from_number, "❌ No product selected. Please search for a product first.")
            return

        if selection == '3':
            show_main_menu(ctx)
        elif selection == '2':
            # Inquire about product availability
            await handle_availability_inquiry(from_number, ctx)
        elif selection == '1':
            # Talk to a veterinary consultant
            name = product.get('Product Name', 'the selected product')
            send_whatsjet_message(
                from_number,
                f"📞 Contact Veterinary Consultant\n\n"
                f"Product: {name}\n\n"
                "Please provide your details:\n"
                "* Name and location\n"
                "* Specific inquiry\n\n"
                "💬 Example: Dr. Ali - Multan - Need consultation for respiratory problems\n\n"
                "Type main at any time to go to main menu."
            )
            context_manager.update_context(
                from_number,
                current_state='contact_request',
                current_menu='contact_request',
                current_menu_options=['Provide contact details']
            )
        else:
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose 1, 2, or 3.")
    except Exception as e:
        logger.error(f"Error in product follow-up: {e}")
        show_main_menu(context_manager.get_context(from_number))
# --- Voice message handling ---
def _extract_voice_media_url(msg: dict) -> Optional[str]:
    """Return the first non-empty media URL found in a voice payload, or ``None``.

    Locations are probed in this order: ``media.link``, ``media.url``,
    ``url``, ``audio.url``. A ``media`` or ``audio`` value that is missing or
    is not a dict is treated as empty instead of raising.
    """
    media = msg.get('media') if isinstance(msg.get('media'), dict) else {}
    audio = msg.get('audio') if isinstance(msg.get('audio'), dict) else {}
    candidates = (
        ('media.link', media.get('link')),
        ('media.url', media.get('url')),
        ('url', msg.get('url')),
        ('audio.url', audio.get('url')),
    )
    for location, candidate in candidates:
        if candidate:
            logger.info(f"[Voice] Found media URL in {location}: {candidate}")
            return candidate

    logger.error(f"[Voice] No media URL found in message structure")
    logger.error(f"[Voice] Available fields: {list(msg.keys())}")
    if 'media' in msg:
        logger.error(f"[Voice] Media fields: {list(media.keys())}")
    return None


def _detect_voice_language(text: str) -> str:
    """Classify transcribed text as ``'ur'`` (Urdu) or ``'en'`` (everything else).

    ``langdetect`` is tried first; any result other than English or Urdu is
    forced to English, and a detector failure also falls back to English.
    The result is then overridden to Urdu when the text contains
    Arabic-script characters or one of a fixed list of greeting words.
    """
    language = 'en'
    try:
        language = detect(text)
        logger.info(f"[Voice] Raw detected language: {language}")
    except Exception as e:
        logger.warning(f"[Voice] Language detection failed: {e}, defaulting to English")
        language = 'en'

    if language not in ('en', 'ur'):
        logger.warning(f"[Voice] Detected language '{language}' is not English or Urdu, forcing to English")
        language = 'en'

    # NOTE(review): the source's indentation was lost; this override is
    # applied whatever the detector returned - confirm that is intended.
    urdu_arabic_pattern = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]')
    islamic_greetings = ['assalamu', 'assalam', 'salam', 'salaam', 'adaab', 'namaste', 'khuda', 'allah']
    lowered = text.lower()
    has_urdu_chars = bool(urdu_arabic_pattern.search(text))
    has_islamic_greeting = any(greeting in lowered for greeting in islamic_greetings)
    if has_urdu_chars or has_islamic_greeting:
        language = 'ur'
        logger.info(f"[Voice] Overriding language detection to Urdu due to Arabic/Urdu characters or Islamic greeting")

    logger.info(f"[Voice] Final language set to: {language}")
    return language


async def handle_voice_message_complete(from_number: str, msg: dict):
    """Transcribe an incoming voice note and route it like a text message.

    Steps: check that ``OPENAI_API_KEY`` is set, find the media URL in the
    payload, download the audio, transcribe it, apply ``process_voice_input``
    corrections, classify the language as English or Urdu, translate Urdu to
    English for processing, and then either answer a greeting or forward the
    text to ``process_incoming_message``. Each failure path sends the user an
    explanatory message; unexpected exceptions are logged with a traceback.

    Args:
        from_number: Identifier of the sender; also the reply destination.
        msg: Incoming message payload. A media URL is looked up under
            ``media.link``, ``media.url``, ``url`` or ``audio.url``.
    """
    try:
        logger.info(f"[Voice] Processing voice message from {from_number}")
        logger.info(f"[Voice] Message structure: {msg}")

        # Transcription is impossible without an API key.
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number,
                "🎤 Voice messages require OpenAI API. Please send a text message or type 'main' to see the menu.")
            return

        logger.info(f"[Voice] Checking media URL locations...")
        media_url = _extract_voice_media_url(msg)
        logger.info(f"[Voice] Final extracted media URL: {media_url}")
        if not media_url:
            send_whatsjet_message(from_number, "❌ Could not process voice message. Please try again.")
            return

        # Generate a filename from the sender and the current second, then download.
        filename = f"voice_{from_number}_{int(time.time())}.ogg"
        file_path = await download_voice_file(media_url, filename)
        if not file_path:
            send_whatsjet_message(from_number, "❌ Failed to download voice message. Please try again.")
            return

        # Transcribe, and always remove the downloaded file afterwards - even
        # when transcription raises - so failed requests do not leave audio
        # files behind.
        try:
            transcribed_text = await transcribe_voice_with_openai(file_path)
        finally:
            try:
                os.remove(file_path)
            except OSError as cleanup_error:
                logger.warning(f"[Voice] Could not remove voice file {file_path}: {cleanup_error}")

        # Handle empty, failed, or unclear transcription. The sentinel is
        # compared after stripping so stray whitespace does not defeat it.
        cleaned = (transcribed_text or "").strip()
        if not cleaned or cleaned.lower() == "unclear audio":
            logger.warning(f"[Voice] Empty or unclear transcription for {from_number}: '{transcribed_text}'")
            send_whatsjet_message(from_number,
                "🎤 *Voice Message Issue*\n\n"
                "I couldn't understand your voice message clearly. This can happen due to:\n"
                "• Very short voice note\n"
                "• Background noise\n"
                "• Microphone too far away\n"
                "• Audio quality issues\n"
                "• Speaking too fast\n\n"
                "💡 *Tips for better voice notes:*\n"
                "• Speak clearly and slowly\n"
                "• Keep phone close to mouth\n"
                "• Record in quiet environment\n"
                "• Make voice note at least 2-3 seconds\n"
                "• Speak in English or Urdu only\n\n"
                "💬 *You can also:*\n"
                "• Send a text message\n"
                "• Type 'main' to see menu options\n"
                "• Try voice note again")
            return

        logger.info(f"[Voice] Transcribed: {transcribed_text}")

        # Apply transcription error corrections.
        corrected_text = process_voice_input(transcribed_text)
        if corrected_text != transcribed_text:
            logger.info(f"[Voice] Applied corrections: '{transcribed_text}' -> '{corrected_text}'")
            transcribed_text = corrected_text

        # Replies are strictly English or Urdu; Urdu input gets an Urdu reply.
        reply_language = _detect_voice_language(transcribed_text)

        # Urdu voice notes are translated to English for processing.
        processing_text = transcribed_text
        if reply_language == 'ur':
            try:
                logger.info(f"[Voice] Translating Urdu voice note to English for processing")
                translated_text = GoogleTranslator(source='ur', target='en').translate(transcribed_text)
                processing_text = translated_text
                logger.info(f"[Voice] Translated to English: {translated_text}")
            except Exception as e:
                logger.error(f"[Voice] Translation failed: {e}")
                # If translation fails, fall back to the original text.
                processing_text = transcribed_text

        logger.info(f"[Voice] Processing text: {processing_text}")
        logger.info(f"[Voice] Reply language set to: {reply_language}")

        # A greeting (checked on both original and translated text) either
        # feeds the AI chat or shows the main menu.
        if is_greeting(transcribed_text) or is_greeting(processing_text):
            logger.info(f"[Voice] Greeting detected in voice note: {transcribed_text}")
            user_context = context_manager.get_context(from_number)
            current_state = user_context.get('current_state', 'main_menu')
            if current_state == 'ai_chat_mode':
                # In AI chat mode a greeting is part of the conversation, not a menu trigger.
                logger.info(f"[Voice] User is in AI chat mode, treating greeting as AI query instead of menu trigger")
                await handle_general_query_with_ai(from_number, processing_text, user_context, reply_language)
                return
            welcome_msg = generate_veterinary_welcome_message(from_number, user_context)
            send_whatsjet_message(from_number, welcome_msg)
            context_manager.update_context(
                from_number,
                current_state='main_menu',
                current_menu='main_menu',
                current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
            )
            return

        # Route through the same state-based logic as typed messages.
        await process_incoming_message(from_number, {
            'body': processing_text,  # translated text is what gets processed
            'type': 'text',
            'reply_language': reply_language,
            'original_transcription': transcribed_text  # kept for context
        })
    except Exception as e:
        logger.error(f"[Voice] Error processing voice message: {e}")
        logger.error(f"[Voice] Full error details: {str(e)}")
        import traceback
        logger.error(f"[Voice] Traceback: {traceback.format_exc()}")
        send_whatsjet_message(from_number,
            "❌ Error processing voice message. Please try a text message.")
# Test endpoint for WhatsJet media format debugging | |
async def test_whatsjet_media_formats(phone: str):
    """Send one test image to ``phone`` using four candidate payload layouts.

    Each layout is POSTed to the WhatsJet ``contact/send-message`` URL built
    from ``WHATSJET_API_URL``, ``WHATSJET_VENDOR_UID`` and
    ``WHATSJET_API_TOKEN``. Requests go through ``httpx.AsyncClient`` so the
    event loop is not blocked while waiting on the network.

    Args:
        phone: Destination phone number placed in every payload.

    Returns:
        dict: ``status``, ``phone``, ``image_url``, ``timestamp`` and a
        ``results`` list with one entry per layout (``format``,
        ``status_code``, ``success``, ``response_text`` truncated to 500
        characters). A failing request is recorded with ``status_code`` set
        to ``"Exception"``. If the run itself fails, ``{"error": <message>}``.
    """
    try:
        test_image_url = "https://amgocus.com/uploads/images/respiraaidplus.png"
        test_message = "🧪 *Media Format Test*\n\nTesting different WhatsJet media payload formats."

        # Candidate payload layouts, tried in order.
        formats = [
            {
                "name": "Format 1 - caption",
                "payload": {
                    "phone_number": phone,
                    "caption": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url,
                    "media_filename": "test.png"
                }
            },
            {
                "name": "Format 2 - message_body",
                "payload": {
                    "phone_number": phone,
                    "message_body": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url,
                    "media_filename": "test.png"
                }
            },
            {
                "name": "Format 3 - simplified",
                "payload": {
                    "phone_number": phone,
                    "message_body": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url
                }
            },
            {
                "name": "Format 4 - different fields",
                "payload": {
                    "phone_number": phone,
                    "caption": test_message,
                    "type": "image/png",
                    "url": test_image_url
                }
            }
        ]

        results = []
        url = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-message?token={WHATSJET_API_TOKEN}"

        # One shared client for all attempts; the timeout applies per request.
        async with httpx.AsyncClient(timeout=15) as client:
            for format_info in formats:
                try:
                    logger.info(f"[Test] Trying {format_info['name']}: {format_info['payload']}")
                    response = await client.post(url, json=format_info['payload'])
                    results.append({
                        "format": format_info['name'],
                        "status_code": response.status_code,
                        "success": response.status_code == 200,
                        "response_text": response.text[:500] if response.text else "No response text"
                    })
                    if response.status_code == 200:
                        logger.info(f"[Test] ✅ {format_info['name']} succeeded!")
                    else:
                        logger.warning(f"[Test] ❌ {format_info['name']} failed: {response.status_code}")
                except Exception as e:
                    # Record the failure and keep trying the remaining layouts.
                    results.append({
                        "format": format_info['name'],
                        "status_code": "Exception",
                        "success": False,
                        "response_text": str(e)
                    })
                    logger.error(f"[Test] Exception with {format_info['name']}: {e}")

        return {
            "status": "completed",
            "phone": phone,
            "image_url": test_image_url,
            "results": results,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error in test WhatsJet media formats: {e}")
        return {"error": str(e)}
# Test endpoint for product image URL accessibility | |
async def test_product_image_url(product_name: str = "Respira Aid Plus"):
    """Check whether the image URL registered for a product can be fetched.

    The location comes from ``get_product_image_path`` and is requested with
    browser-like headers. Only the status line and headers are inspected; the
    streamed response is closed as soon as they have been read.

    Args:
        product_name: Product whose image location is looked up.

    Returns:
        dict: ``product_name``, ``image_path`` and ``accessible`` in every
        normal outcome. A completed request adds ``status_code``,
        ``content_type``, ``content_length`` and ``headers``; a missing path
        or failed request adds ``error``. If the check itself fails,
        ``{"error": <message>}``.
    """
    try:
        image_path = get_product_image_path(product_name)
        if not image_path:
            return {
                "product_name": product_name,
                "image_path": None,
                "accessible": False,
                "error": "No image path found"
            }

        # Test if the URL is accessible.
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive'
            }
            # stream=True defers the body download; the with-block releases
            # the connection, which the unclosed response previously leaked.
            with requests.get(image_path, headers=headers, timeout=10, stream=True) as response:
                result = {
                    "product_name": product_name,
                    "image_path": image_path,
                    "status_code": response.status_code,
                    "accessible": response.status_code == 200,
                    "content_type": response.headers.get('content-type', 'unknown'),
                    "content_length": response.headers.get('content-length', 'unknown'),
                    "headers": dict(response.headers)
                }
            if result["accessible"]:
                logger.info(f"[Test] ✅ Product image URL is accessible: {image_path}")
            else:
                logger.warning(f"[Test] ❌ Product image URL not accessible: {image_path} (status: {result['status_code']})")
            return result
        except Exception as e:
            return {
                "product_name": product_name,
                "image_path": image_path,
                "accessible": False,
                "error": str(e)
            }
    except Exception as e:
        logger.error(f"Error testing product image URL: {e}")
        return {"error": str(e)}
# Test endpoint for send_product_image_with_caption function | |
async def test_send_product_image(phone: str, product_name: str = "Bromacid"):
    """Send one product's image and caption to ``phone`` as a manual check.

    The product row is read from ``Veterinary.csv`` by a case-insensitive
    match on ``Product Name`` and passed, with the phone's stored context, to
    ``send_product_image_with_caption``.

    Returns:
        dict: ``status``/``phone``/``product``/``message`` after sending, or
        ``{"error": ...}`` when the product is not in the CSV or anything
        raises.
    """
    try:
        # Load the product from the CSV on every call.
        catalog = pd.read_csv('Veterinary.csv')
        name_matches = catalog['Product Name'].str.lower() == product_name.lower()
        hits = catalog[name_matches]
        if hits.empty:
            return {"error": f"Product '{product_name}' not found in CSV"}

        product = hits.iloc[0].to_dict()
        user_context = context_manager.get_context(phone)
        logger.info(f"[Test] Testing send_product_image_with_caption for product: {product_name}")
        await send_product_image_with_caption(phone, product, user_context)

        outcome = {
            "status": "sent",
            "phone": phone,
            "product": product_name,
            "message": f"Product image with caption sent for {product_name}"
        }
        return outcome
    except Exception as e:
        logger.error(f"[Test] Error testing send_product_image_with_caption: {e}")
        return {"error": str(e)}
async def handle_intelligent_product_inquiry(from_number: str, query: str, user_context: dict, reply_language: str = 'en'):
    """Resolve a free-text product query into a menu or a single product.

    Matches come from ``get_veterinary_product_matches(query)``:

    * no match - the user is asked to rephrase; returns ``None``.
    * exactly one match - the context is switched to ``product_inquiry`` with
      that product stored, and the product dict is returned so the caller
      can send its details.
    * several matches - a numbered list is sent, the context is switched to
      ``intelligent_products_menu``; returns ``None``.

    ``user_context`` and ``reply_language`` are accepted but not read here.
    Any exception is logged and answered with a generic error message.
    """
    try:
        matches = get_veterinary_product_matches(query)

        if not matches:
            # Simple one-liner for queries that match nothing.
            send_whatsjet_message(from_number, "❌ Please correct your question or type 'main' to go to main menu.")
            return

        if len(matches) <= 1:
            only_match = matches[0]
            if context_manager:
                context_manager.update_context(
                    from_number,
                    current_product=only_match,
                    current_state='product_inquiry',
                    current_menu='product_inquiry',
                    current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
                )
            # Sending the product details is left to the caller.
            return only_match

        pieces = ["Certainly, here are the relevant products for your query:\n\n"]
        for position, item in enumerate(matches, 1):
            entry = f"{position}. {item.get('Product Name', 'Unknown')}"
            category = item.get('Category', '')
            summary = item.get('Type', '') or item.get('Indications', '')
            if category:
                entry += f" - {category}"
            if summary:
                entry += f" / {summary}"
            pieces.append(entry + "\n")
        pieces.append(
            f"\nTo view detailed information about any product, reply with its number (1-{len(matches)})\n"
            "Type 'main' to return to the main menu"
        )
        send_whatsjet_message(from_number, "".join(pieces))

        if context_manager:
            context_manager.update_context(
                from_number,
                current_state='intelligent_products_menu',
                current_menu='intelligent_products_menu',
                current_menu_options=[str(number) for number in range(1, len(matches) + 1)],
                available_products=matches
            )
        return
    except Exception as e:
        logger.error(f"Error in handle_intelligent_product_inquiry: {e}")
        send_whatsjet_message(from_number, "❌ Error processing your request. Type 'main' to return to the main menu.")
async def handle_contact_request(from_number: str):
    """Ask the user for contact details and enter the ``contact_request`` state.

    If sending the prompt or updating the context raises, the error is
    logged, the welcome menu is sent instead and the state is reset to
    ``main_menu``.
    """
    try:
        prompt_lines = [
            "📞 *Contact Information*",
            "",
            "Please provide your details:",
            "• Name and location",
            "• Phone number",
            "• Specific inquiry",
            "",
            "💬 *Example:* Dr. Ali - Multan - Need consultation for liver disease",
            "",
            "💬 *Type 'main' to return to the main menu.*",
        ]
        send_whatsjet_message(from_number, "\n".join(prompt_lines))
        context_manager.update_context(
            from_number,
            current_state='contact_request',
            current_menu='contact_request',
            current_menu_options=['Provide contact details']
        )
    except Exception as e:
        logger.error(f"[Contact] Error handling contact request: {e}")
        # Rather than a generic error, take the user back to the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
# Helper for restoring English terms in translations | |
def restore_english_terms(translated_text, original_text, product_names, category_names):
    """Restore the canonical spelling of English terms inside translated text.

    Every case-insensitive occurrence of a product name, category name or
    built-in technical term found in ``translated_text`` is rewritten to the
    spelling given in the term list. Terms are applied in the order product
    names, category names, technical terms.

    Args:
        translated_text: Text to post-process. Empty or ``None`` is returned
            unchanged.
        original_text: Accepted for interface compatibility; not consulted.
        product_names: Iterable of product names, or ``None``.
        category_names: Iterable of category names, or ``None``.

    Returns:
        The text with matched terms rewritten to their canonical spelling.
    """
    if not translated_text:
        return translated_text

    # Common technical terms that should keep their English spelling.
    technical_terms = [
        'EC-Immune', 'Hydropex', 'Heposel', 'Respira Aid Plus', 'Bromacid',
        'mode of action', 'dosage', 'administration', 'composition',
        'veterinary', 'pharmaceutical', 'supplement', 'antibiotic'
    ]
    all_english_terms = [*(product_names or []), *(category_names or []), *technical_terms]

    for term in all_english_terms:
        # Skip blanks and non-string entries (e.g. NaN from an empty cell).
        if not isinstance(term, str) or not term.strip():
            continue
        if term.lower() not in translated_text.lower():
            continue
        pattern = re.compile(re.escape(term), re.IGNORECASE)
        # Use a function as the replacement so backslashes in a term are
        # inserted literally rather than parsed as escapes or group references.
        translated_text = pattern.sub(lambda _match, canonical=term: canonical, translated_text)
    return translated_text
if __name__ == "__main__":
    # Populate the product catalog before the server starts.
    load_products_data()

    # Serve the FastAPI app on all interfaces, port 7860.
    import uvicorn
    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)