chatbot / app.py
DreamStream-1's picture
Update app.py
5920a0a verified
#!/usr/bin/env python3
"""
Apex Biotical Veterinary WhatsApp Assistant - Premium Edition
The most effective and accurate veterinary Assistant in the market
"""
import os
import pandas as pd
import requests
import json
from fastapi import FastAPI, Request, Response, Form, HTTPException, File, UploadFile
from fastapi.responses import JSONResponse, HTMLResponse, FileResponse
import time
import re
from typing import List, Dict, Any, Optional, Tuple
import openai
from dotenv import load_dotenv
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
import uvicorn
from datetime import datetime, timedelta
from rapidfuzz import process, fuzz
from deep_translator import GoogleTranslator
import numpy as np
import logging
import base64
import tempfile
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter, A4
from reportlab.lib.units import inch
from reportlab.lib import colors
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_CENTER, TA_LEFT, TA_JUSTIFY
import io
import pathlib
from collections import defaultdict, Counter
import hashlib
import aiofiles
import asyncio
from difflib import SequenceMatcher
import httpx
import langdetect
from langdetect import detect
import threading
import shutil
# Configure advanced logging: every record is written to a UTF-8 log file and to the console
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('veterinary_bot.log', encoding='utf-8'),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()

# Initialize FastAPI app
app = FastAPI(title="Apex Biotical Veterinary Assistant", version="2.0.0")

# Ensure static and uploads directories exist before mounting
for _required_dir in ('static', 'uploads'):
    os.makedirs(_required_dir, exist_ok=True)

# Mount static files and templates
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
templates = Jinja2Templates(directory="templates")

# Global variables with enhanced data structures
CSV_FILE = "Veterinary.csv"
products_df = None
user_contexts = {}
last_products = {}
conversation_history = defaultdict(list)
product_analytics = defaultdict(int)
session_data = {}

# Environment variables
WHATSJET_API_URL = os.environ.get("WHATSJET_API_URL")
WHATSJET_VENDOR_UID = os.environ.get("WHATSJET_VENDOR_UID")
WHATSJET_API_TOKEN = os.environ.get("WHATSJET_API_TOKEN")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY")
SERVER_URL = os.environ.get("SERVER_URL", "https://your-huggingface-space-url.hf.space")

# Initialize OpenAI client (only possible when an API key is configured)
if not OPENAI_API_KEY:
    logger.warning("⚠️ OpenAI API key not found - voice transcription will be disabled")
else:
    openai.api_key = OPENAI_API_KEY
    logger.info("✅ OpenAI client initialized successfully")
# Veterinary domain-specific constants
# These three tables drive query expansion in get_veterinary_product_matches.
# Short keyword -> list of catalogue category labels. When the keyword occurs in
# a search query, every label in its list is added as an extra search term.
VETERINARY_CATEGORIES = {
'antibiotic': ['Antibiotic / Quinolone', 'Antibiotic / Respiratory Infections', 'Veterinary Injectable Solution (Antibiotic)'],
'respiratory': ['Respiratory Support', 'Respiratory / Mucolytic', 'Respiratory Support and Hygiene Enhancer'],
'liver': ['Liver & Kidney Support', 'Liver Tonic and Hepatoprotective Supplement'],
'vitamin': ['Multivitamin Supplement', 'Multivitamin Supplement for veterinary use', 'Vitamin and Amino Acid Supplement (Injectable Solution)'],
'supplement': ['Nutritional Supplement / Mycotoxins', 'Immunity Enhancer and Antioxidant Supplement'],
'mycotoxin': ['Mycotoxin Binder'],
'heat_stress': ['Heat Stress Support'],
'anticoccidial': ['Anticoccidial / Sulfonamide'],
'phytogenic': ['Phytogenic / Antibiotic Alternative']
}
# Symptom group -> related terms. If any term of a group occurs in a query,
# all terms of that group are added as extra search terms.
VETERINARY_SYMPTOMS = {
'respiratory': ['cough', 'breathing', 'respiratory', 'bronchitis', 'pneumonia', 'crd', 'coryza', 'flu'],
'liver': ['liver', 'hepatitis', 'jaundice', 'ascites', 'fatty liver'],
'diarrhea': ['diarrhea', 'diarrhoea', 'loose stool', 'gastroenteritis'],
'stress': ['stress', 'heat stress', 'transport', 'vaccination'],
'infection': ['infection', 'bacterial', 'viral', 'fungal', 'septicemia'],
'deficiency': ['vitamin deficiency', 'mineral deficiency', 'anemia'],
'mycotoxin': ['mycotoxin', 'mold', 'fungal toxin', 'aflatoxin']
}
# Species group -> animal names, expanded the same way as the symptom groups.
VETERINARY_SPECIES = {
'poultry': ['chicken', 'broiler', 'layer', 'turkey', 'duck', 'quail', 'poultry'],
'livestock': ['cattle', 'cow', 'buffalo', 'sheep', 'goat', 'livestock'],
'pet': ['dog', 'cat', 'pet', 'companion animal']
}
# Menu Configuration - Define each menu with its valid options
# Keys are conversation-state names. Each entry holds a display name, the list
# of accepted selections and a description per selection.
# validate_menu_selection and get_menu_info read this table; for the four
# product/category menus they compute the options from the user context or the
# loaded product table at call time, so those entries are left empty here.
MENU_CONFIG = {
'main_menu': {
'name': 'Main Menu',
'valid_options': ['1', '2', '3', '4'],
'option_descriptions': {
'1': 'Search Products',
'2': 'Browse Categories',
'3': 'Download Catalog',
'4': 'Chat with Veterinary AI Assistant'
}
},
'category_selection_menu': {
'name': 'Category Selection Menu',
'valid_options': [], # Will be populated dynamically based on available categories
'option_descriptions': {}
},
'category_products_menu': {
'name': 'Category Products Menu',
'valid_options': [], # Will be populated dynamically based on available products
'option_descriptions': {}
},
'all_products_menu': {
'name': 'All Products Menu',
'valid_options': [], # Will be populated dynamically based on all products
'option_descriptions': {}
},
'intelligent_products_menu': {
'name': 'Intelligent Products Menu',
'valid_options': [], # Will be populated dynamically based on available products
'option_descriptions': {}
},
'product_inquiry': {
'name': 'Product Inquiry Menu',
'valid_options': ['1', '2', '3'],
'option_descriptions': {
'1': 'Talk to Veterinary Consultant',
'2': 'Inquire about Product Availability',
'3': 'Back to Main Menu'
}
},
# AI chat accepts only the keyword 'main', which leads back to the main menu.
'ai_chat': {
'name': 'AI Chat Mode',
'valid_options': ['main'],
'option_descriptions': {
'main': 'Return to Main Menu'
}
}
}
def validate_menu_selection(selection: str, current_state: str, user_context: dict) -> tuple[bool, str]:
    """
    Check whether ``selection`` is an accepted option of the menu ``current_state``.

    Static menus take their options from MENU_CONFIG. The category/product
    menus are numbered 1..N, where N is the length of the matching list in
    ``user_context`` (or of the loaded product table for 'all_products_menu').

    Returns (is_valid, error_message); error_message is "" on success.
    """
    menu = MENU_CONFIG.get(current_state)
    if menu is None:
        return False, f"❌ Unknown menu state: {current_state}"

    # Which user-context list backs each context-driven menu
    context_backed = {
        'category_selection_menu': 'available_categories',
        'category_products_menu': 'available_products',
        'intelligent_products_menu': 'available_products',
    }

    allowed = menu['valid_options']
    if current_state in context_backed:
        entries = user_context.get(context_backed[current_state], [])
        allowed = [str(number) for number in range(1, len(entries) + 1)]
    elif current_state == 'all_products_menu':
        # Falls back to the (empty) static list when no product data is loaded
        if products_df is not None and not products_df.empty:
            allowed = [str(number) for number in range(1, len(products_df) + 1)]

    if selection in allowed:
        return True, ""

    # Build the rejection message, listing the accepted options when there are any
    if not allowed:
        return False, f"❌ Invalid selection for {menu['name']}. No options available."
    return False, f"❌ Invalid selection for {menu['name']}. Valid options: {', '.join(allowed)}"
def get_menu_info(current_state: str, user_context: dict) -> dict:
    """
    Get information about the current menu including valid options.

    Returns a dict with the keys 'name', 'valid_options' and
    'option_descriptions'. For the category/product menus the options are
    numbered 1..N from the user context (or from the loaded product table for
    'all_products_menu'). Unknown states yield a placeholder "Unknown Menu".

    The returned lists/dicts are copies, so callers may modify the result
    without altering the shared MENU_CONFIG table.
    """
    if current_state not in MENU_CONFIG:
        return {"name": "Unknown Menu", "valid_options": [], "option_descriptions": {}}

    # Copy one level deeper than dict.copy(): the nested list/dict of a static
    # menu must not be shared with the module-level configuration.
    menu_config = {
        key: (value.copy() if isinstance(value, (list, dict)) else value)
        for key, value in MENU_CONFIG[current_state].items()
    }
    context = user_context or {}

    # Collect the display label of every option for the dynamic menus.
    if current_state == 'category_selection_menu':
        labels = list(context.get('available_categories') or [])
    elif current_state in ('category_products_menu', 'intelligent_products_menu'):
        products = context.get('available_products') or []
        labels = [
            product.get('Product Name', f'Product {i}')
            for i, product in enumerate(products, 1)
        ]
    elif (current_state == 'all_products_menu'
          and products_df is not None and not products_df.empty):
        labels = [
            product.get('Product Name', f'Product {i}')
            for i, product in enumerate(products_df.to_dict('records'), 1)
        ]
    else:
        # Static menu, or no product data loaded: the configured values apply.
        return menu_config

    menu_config['valid_options'] = [str(i) for i in range(1, len(labels) + 1)]
    menu_config['option_descriptions'] = {
        str(i): label for i, label in enumerate(labels, 1)
    }
    return menu_config
# Voice processing functions
async def download_voice_file(media_url: str, filename: str) -> Optional[str]:
    """
    Download a voice file into the local ``temp_voice`` directory.

    Args:
        media_url: URL the audio is fetched from.
        filename: Name to store the file under. Only its final path component
            is used, so a value such as "../../app.py" cannot write outside
            ``temp_voice``.

    Returns:
        The path of the saved file, or None when the name is unusable or the
        download/write fails (the error is logged).
    """
    try:
        # Create temp_voice directory if it doesn't exist
        os.makedirs('temp_voice', exist_ok=True)

        # The name may come from the incoming request, so drop any directory
        # part (both separator styles) before joining it to our directory.
        safe_name = os.path.basename((filename or '').replace('\\', '/'))
        if safe_name in ('', '.', '..'):
            logger.error(f"Error downloading voice file: invalid filename {filename!r}")
            return None

        # Download the file
        async with httpx.AsyncClient() as client:
            response = await client.get(media_url)
            response.raise_for_status()

        file_path = os.path.join('temp_voice', safe_name)
        with open(file_path, 'wb') as f:
            f.write(response.content)

        logger.info(f"Voice file downloaded: {file_path}")
        return file_path
    except Exception as e:
        # Best-effort boundary: callers treat None as "no audio available"
        logger.error(f"Error downloading voice file: {e}")
        return None
def _whisper_transcribe(file_path: str, prompt: str, language: Optional[str] = None) -> str:
    """Send one blocking Whisper request for ``file_path`` and return the stripped text."""
    request = {"model": "whisper-1", "prompt": prompt}
    if language is not None:
        request["language"] = language
    with open(file_path, 'rb') as audio_file:
        transcript = openai.Audio.transcribe(file=audio_file, **request)
    return transcript.text.strip()


def _is_unusable_transcript(transcribed_text: str) -> bool:
    """Return True when a transcript is empty or shorter than two characters."""
    return not transcribed_text or len(transcribed_text.strip()) < 2


async def transcribe_voice_with_openai(file_path: str) -> Optional[str]:
    """
    Transcribe a voice file with OpenAI Whisper using veterinary-domain prompts.

    Up to three attempts are made, each only if the previous one produced an
    empty or one-character result: English, then Urdu, then no forced language.
    The blocking OpenAI call runs in a worker thread so the event loop stays
    responsive while waiting for the API.

    Returns:
        The transcript; the literal "unclear audio" when the result is empty,
        contains no Latin/Arabic-script letters (for texts over 10 characters)
        or is more than 30% punctuation; None when the file is missing/empty
        or the API call raises.
    """
    try:
        # Check if file exists and has content
        if not os.path.exists(file_path):
            logger.error(f"[Transcribe] File not found: {file_path}")
            return None
        file_size = os.path.getsize(file_path)
        if file_size == 0:
            logger.error(f"[Transcribe] Empty file: {file_path}")
            return None
        logger.info(f"[Transcribe] Transcribing file: {file_path} (size: {file_size} bytes)")

        # Comprehensive system prompt for veterinary WhatsApp assistant
        # NOTE(review): this text is passed as Whisper's `prompt` argument;
        # confirm against the API docs how much of it the model actually uses.
        system_prompt = """
You are transcribing voice messages for Apex Biotical Veterinary WhatsApp Assistant. This is a professional veterinary products chatbot.
CRITICAL: TRANSCRIBE ONLY ENGLISH OR URDU SPEECH - NOTHING ELSE
IMPORTANT RULES:
1. ONLY transcribe English or Urdu speech
2. If you hear unclear audio, transcribe as English
3. If you hear mixed languages, transcribe as English
4. Never transcribe gibberish or random characters
5. If audio is unclear, transcribe as "unclear audio"
6. Keep transcriptions simple and clean
PRODUCT NAMES (exact spelling required):
- Hydropex, Respira Aid Plus, Heposel, Bromacid, Hexatox
- APMA Fort, Para C.E, Tribiotic, PHYTO-SAL, Mycopex Super
- Eflin KT-20, Salcozine ST-30, Oftilex UA-10, Biscomin 10
- Apvita Plus, B-G Aspro-C, EC-Immune, Liverpex, Symodex
- Respira Aid, Adek Gold, Immuno DX
MENU COMMANDS:
- Numbers: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
- Navigation: main, menu, back, home, start
- Options: option, number, choice, select
GREETINGS:
- English: hi, hello, hey, good morning, good afternoon, good evening
- Urdu: salam, assalamu alaikum, adaab, namaste, khuda hafiz
TRANSCRIPTION RULES:
1. Transcribe exactly what you hear in English or Urdu
2. Convert numbers to digits (one->1, two->2, etc.)
3. Preserve product names exactly
4. If unclear, transcribe as "unclear audio"
5. Keep it simple and clean
6. No random characters or mixed languages
EXAMPLES:
- "hydropex" -> "hydropex"
- "respira aid plus" -> "respira aid plus"
- "option one" -> "1"
- "main menu" -> "main"
- "salam" -> "salam"
- "search products" -> "search products"
- Unclear audio -> "unclear audio"
"""
        # First attempt with comprehensive system prompt, forcing English
        transcribed_text = await asyncio.to_thread(
            _whisper_transcribe, file_path, system_prompt, "en"
        )
        logger.info(f"[Transcribe] Voice transcribed (English): '{transcribed_text}'")

        # If first attempt failed or seems unclear, try with Urdu-specific prompt
        if _is_unusable_transcript(transcribed_text):
            logger.warning(f"[Transcribe] First attempt failed, trying with Urdu-specific prompt")
            urdu_system_prompt = """
You are transcribing Urdu voice messages for Apex Biotical Veterinary WhatsApp Assistant.
PRODUCT NAMES (Urdu/English):
- ہائیڈروپیکس (Hydropex)
- ریسپیرا ایڈ پلس (Respira Aid Plus)
- ہیپوسیل (Heposel)
- بروماسڈ (Bromacid)
- ہیکساٹوکس (Hexatox)
- اے پی ایم اے فورٹ (APMA Fort)
- پیرا سی ای (Para C.E)
- ٹرائی بیوٹک (Tribiotic)
- فائٹو سال (PHYTO-SAL)
- مائیکوپیکس سپر (Mycopex Super)
URDU NUMBERS:
- ایک (1), دو (2), تین (3), چار (4), پانچ (5)
- چھ (6), سات (7), آٹھ (8), نو (9), دس (10)
- گیارہ (11), بارہ (12), تیرہ (13), چودہ (14), پندرہ (15)
- سولہ (16), سترہ (17), اٹھارہ (18), انیس (19), بیس (20)
- اکیس (21), بائیس (22), تئیس (23)
URDU GREETINGS:
- سلام (salam), السلام علیکم (assalamu alaikum)
- آداب (adaab), نمستے (namaste), خدا حافظ (khuda hafiz)
URDU MENU COMMANDS:
- مین مینو (main menu), آپشن (option), نمبر (number)
- تلاش (search), براؤز (browse), ڈاؤن لوڈ (download)
- کیٹلاگ (catalog), رابطہ (contact), دستیابی (availability)
TRANSCRIPTION RULES:
1. Transcribe Urdu words in Urdu script
2. Convert Urdu numbers to digits
3. Handle mixed Urdu-English speech
4. Preserve product names exactly
5. Convert menu selections to numbers
"""
            transcribed_text = await asyncio.to_thread(
                _whisper_transcribe, file_path, urdu_system_prompt, "ur"  # Force Urdu
            )
            logger.info(f"[Transcribe] Second attempt transcribed (Urdu): '{transcribed_text}'")

        # Third attempt with mixed language prompt if still failing
        if _is_unusable_transcript(transcribed_text):
            logger.warning(f"[Transcribe] Second attempt failed, trying with mixed language prompt")
            mixed_system_prompt = """
You are transcribing voice messages for a veterinary products WhatsApp assistant. The user may speak in English, Urdu, or a mix of both languages.
PRODUCT NAMES (exact spelling required):
Hydropex, Respira Aid Plus, Heposel, Bromacid, Hexatox, APMA Fort, Para C.E, Tribiotic, PHYTO-SAL, Mycopex Super, Eflin KT-20, Salcozine ST-30, Oftilex UA-10, Biscomin 10, Apvita Plus, B-G Aspro-C, EC-Immune, Liverpex, Symodex, Respira Aid, Adek Gold, Immuno DX
NUMBERS (convert to digits):
English: one->1, two->2, three->3, etc.
Urdu: aik->1, ek->1, do->2, teen->3, etc.
MENU COMMANDS:
main, menu, back, home, start, option, number, search, browse, download, catalog, contact, availability
GREETINGS:
hi, hello, salam, assalamu alaikum, adaab, namaste
TRANSCRIPTION RULES:
1. Transcribe exactly what you hear
2. Convert numbers to digits
3. Preserve product names exactly
4. Handle both languages
5. Convert menu selections to numbers
"""
            # No language is forced here, so the request omits the parameter
            transcribed_text = await asyncio.to_thread(
                _whisper_transcribe, file_path, mixed_system_prompt
            )
            logger.info(f"[Transcribe] Third attempt (mixed) transcribed: '{transcribed_text}'")

        # Final check for empty transcription or unclear audio
        if _is_unusable_transcript(transcribed_text):
            logger.warning(f"[Transcribe] Very short or empty transcription: '{transcribed_text}'")
            return "unclear audio"

        # Check for gibberish: a longer text without any Latin or Arabic-script letter
        if len(transcribed_text) > 10 and not re.search(r'[a-zA-Z\u0600-\u06FF]', transcribed_text):
            logger.warning(f"[Transcribe] Gibberish detected: '{transcribed_text}'")
            return "unclear audio"

        # Check for too many special characters (text is non-empty here, so no division by zero)
        special_char_ratio = len(re.findall(r'[^\w\s]', transcribed_text)) / len(transcribed_text)
        if special_char_ratio > 0.3:
            logger.warning(f"[Transcribe] Too many special characters: '{transcribed_text}'")
            return "unclear audio"

        return transcribed_text
    except Exception as e:
        # Best-effort boundary: any API or file error is reported as "no transcript"
        logger.error(f"[Transcribe] Error transcribing voice: {e}")
        logger.error(f"[Transcribe] File path: {file_path}")
        return None
def process_voice_input(text: str) -> str:
    """
    Clean transcribed voice text and correct known veterinary-domain mis-hearings.

    Whitespace is collapsed and stray spaces before ',' / '.' are removed.
    Corrections from the table below are then applied in a single pass:

    * only to whole words/phrases, never inside a longer word (so "download"
      and "apma fort" are left alone although they contain "do" and "for");
    * longest phrase first, and without re-scanning replaced text, so a
      correction cannot be corrupted by another one;
    * a short product name is not expanded when the full name is already
      present ("mycopex super" does not become "mycopex super super").

    Args:
        text: Raw transcript; may be empty or None.

    Returns:
        The cleaned text. It is lowercased when at least one table entry
        matched and keeps its original casing otherwise (same as before).
    """
    if not text:
        return ""

    # Clean the text: trim, collapse whitespace, basic punctuation cleanup
    processed_text = re.sub(r'\s+', ' ', text.strip())
    processed_text = processed_text.replace(' ,', ',').replace(' .', '.')

    # Veterinary domain-specific transcription error corrections
    transcription_fixes = {
        # Common menu selection errors
        'opium': 'option',
        'opium numara': 'option number',
        'opium number': 'option number',
        'opium number one': 'option number one',
        'opium number two': 'option number two',
        'opium number three': 'option number three',
        'opium one': 'option one',
        'opium two': 'option two',
        'opium three': 'option three',
        'numara': 'number',
        'numbara': 'number',
        'numbra': 'number',
        'numbra one': 'number one',
        'numbra two': 'number two',
        'numbra three': 'number three',
        'numbra 1': 'number 1',
        'numbra 2': 'number 2',
        'numbra 3': 'number 3',
        # Number fixes - only when they appear as standalone words
        'aik': '1',
        'ek': '1',
        'do': '2',
        'teen': '3',
        'char': '4',
        'panch': '5',
        # Was mapped to '3'; the Urdu number table used for transcription in
        # this file lists this numeral as 6.
        'che': '6',
        'tree': '3',
        'free': '3',
        'for': '4',
        'fiv': '5',
        'sik': '6',
        'sat': '7',
        'ath': '8',
        'nau': '9',
        'das': '10',
        # Navigation command fixes
        'man': 'main',
        'men': 'main',
        'mean': 'main',
        'mein': 'main',
        'maine': 'main',
        'menu': 'main',
        'home': 'main',
        'back': 'main',
        'return': 'main',
        # Veterinary product name corrections. Entries mapping a name to itself
        # keep the shorter entries from firing inside the full name.
        'hydro pex': 'hydropex',
        'respira aid': 'respira aid plus',
        'respira aid plus': 'respira aid plus',
        'hepo sel': 'heposel',
        'brom acid': 'bromacid',
        'hexa tox': 'hexatox',
        'apma fort': 'apma fort',
        'para c': 'para c.e',
        'para ce': 'para c.e',
        'tribiotic': 'tribiotic',
        'phyto sal': 'phyto-sal',
        'mycopex': 'mycopex super',
        'mycopex super': 'mycopex super',
        'eflin': 'eflin kt-20',
        'salcozine': 'salcozine st-30',
        'oftilex': 'oftilex ua-10',
        'biscomin': 'biscomin 10',
        'apvita': 'apvita plus',
        'bg aspro': 'b-g aspro-c',
        'ec immune': 'ec-immune',
        'liverpex': 'liverpex',
        'symodex': 'symodex',
        'adek': 'adek gold',
        'immuno': 'immuno dx'
    }

    lowered = processed_text.lower()

    # Islamic greetings: never turn "aik"/"ek" into a digit in such messages.
    # ('assalamu alaikum' contains 'assalam', so one test covers both forms.)
    skipped = {'aik', 'ek'} if 'assalam' in lowered else set()

    # Longest phrases first so that e.g. "opium number one" wins over "opium".
    candidates = sorted(
        (wrong for wrong in transcription_fixes if wrong not in skipped),
        key=len,
        reverse=True,
    )
    # (?<!\w) / (?!\w) restrict matches to whole words without requiring the
    # phrase itself to start or end with a word character.
    pattern = re.compile(
        r'(?<!\w)(?:' + '|'.join(re.escape(wrong) for wrong in candidates) + r')(?!\w)'
    )

    matched_any = False

    def _apply_fix(match):
        nonlocal matched_any
        matched_any = True
        wrong = match.group(0)
        correct = transcription_fixes[wrong]
        # If the correction merely appends a suffix that is already spoken
        # ("eflin" followed by " kt-20"), leave the text as it is.
        if correct.startswith(wrong):
            suffix = correct[len(wrong):]
            if suffix and lowered.startswith(suffix, match.end()):
                return wrong
        if correct != wrong:
            logger.info(f"Fixed transcription error: '{wrong}' -> '{correct}' in '{text}'")
        return correct

    corrected = pattern.sub(_apply_fix, lowered)
    if matched_any:
        processed_text = corrected

    logger.info(f"Voice input processed: '{text}' -> '{processed_text}'")
    return processed_text
# Note: Voice messages are now processed exactly like text messages
# The transcribed voice text is passed directly to process_incoming_message
# This ensures consistent behavior between voice and text inputs
# Enhanced product search with veterinary domain expertise
def get_veterinary_product_matches(query: str) -> List[Dict[str, Any]]:
    """
    Advanced veterinary product matching with domain-specific intelligence.

    The query is normalized, expanded with related symptom, species, category
    and product-variation terms, and compared against several product fields.
    A substring hit scores 100 x field weight; otherwise a fuzzy partial-ratio
    of the normalized query against the field (x weight) is used. Products
    whose best score exceeds 70 are returned, best first, one per product name.

    Args:
        query: Free-text search string.

    Returns:
        Product dicts (one per matching row) extended with '_score',
        '_match_type' ("exact" or "fuzzy") and '_match_details'. Empty when
        the query is empty, is a 1-2 digit menu selection, normalizes to
        nothing, or no product data is available.
    """
    if not query:
        return []
    if products_df is None:
        load_products_data()
    # Loading may fail and leave the table unset; do not crash on iteration.
    if products_df is None or products_df.empty:
        logger.warning("[Veterinary Search] No product data available")
        return []

    normalized_query = normalize(query).lower().strip()
    logger.info(f"[Veterinary Search] Searching for: '{normalized_query}'")

    # A query made only of punctuation normalizes to "", and "" is a substring
    # of every field, which would otherwise return the whole catalogue.
    if not normalized_query:
        return []

    # Skip very short queries that are likely menu selections
    if len(normalized_query) <= 2 and normalized_query.isdigit():
        logger.info(f"[Veterinary Search] Skipping menu selection: '{normalized_query}'")
        return []

    # Veterinary-specific query expansion
    expanded_queries = [normalized_query]

    # Expand by symptoms
    for symptoms in VETERINARY_SYMPTOMS.values():
        if any(symptom in normalized_query for symptom in symptoms):
            expanded_queries.extend(symptoms)

    # Expand by species
    for species in VETERINARY_SPECIES.values():
        if any(sp in normalized_query for sp in species):
            expanded_queries.extend(species)

    # Expand by category
    for category_key, categories in VETERINARY_CATEGORIES.items():
        if category_key in normalized_query:
            expanded_queries.extend(categories)

    # Common veterinary product variations
    veterinary_variations = {
        'hydropex': ['hydropex', 'hydro pex', 'electrolyte', 'dehydration', 'heat stress'],
        'heposel': ['heposel', 'hepo sel', 'liver tonic', 'hepatoprotective'],
        'bromacid': ['bromacid', 'brom acid', 'respiratory', 'mucolytic'],
        'respira aid': ['respira aid', 'respira aid plus', 'respiratory support'],
        'hexatox': ['hexatox', 'hexa tox', 'liver support', 'kidney support'],
        'apma fort': ['apma fort', 'mycotoxin', 'liver support'],
        'para c': ['para c', 'para c.e', 'heat stress', 'paracetamol'],
        'tribiotic': ['tribiotic', 'antibiotic', 'respiratory infection'],
        'phyto-sal': ['phyto-sal', 'phytogenic', 'vitamin supplement'],
        'mycopex': ['mycopex', 'mycotoxin binder', 'mold'],
        'oftilex': ['oftilex', 'ofloxacin', 'antibiotic'],
        'biscomin': ['biscomin', 'oxytetracycline', 'injectable'],
        'apvita': ['apvita', 'vitamin b', 'amino acid'],
        'bg aspro': ['bg aspro', 'aspirin', 'vitamin c'],
        'ec-immune': ['ec-immune', 'immune', 'immunity'],
        'liverpex': ['liverpex', 'liver', 'metabolic'],
        'symodex': ['symodex', 'multivitamin', 'vitamin'],
        'adek gold': ['adek gold', 'vitamin', 'multivitamin'],
        'immuno dx': ['immuno dx', 'immune', 'antioxidant']
    }

    # Add veterinary variations. The query had its punctuation stripped by
    # normalize(), so keys such as 'phyto-sal' must be compared in the same
    # normalized form or they can never match.
    for key, variations in veterinary_variations.items():
        if key in normalized_query or normalize(key) in normalized_query:
            expanded_queries.extend(variations)

    # Fields are compared in lowercase, so the expansion terms (the category
    # labels are mixed-case) must be lowercased too. Duplicates are dropped,
    # keeping first-seen order.
    expanded_queries = list(dict.fromkeys(term.lower() for term in expanded_queries))

    # The fuzzy score depends only on the normalized query, so whether fuzzy
    # matching runs at all is decided once here rather than per field.
    fuzzy_enabled = any(len(term) > 3 for term in expanded_queries)

    scored_matches = []
    for _, row in products_df.iterrows():
        best_score = 0
        best_match_type = ""
        match_details = {}

        # Search across all relevant fields with veterinary weighting
        search_fields = (
            ('Product Name', row.get('Product Name', ''), 1.0),
            ('Category', row.get('Category', ''), 0.8),
            ('Indications', row.get('Indications', ''), 0.9),
            ('Target Species', row.get('Target Species', ''), 0.7),
            ('Type', row.get('Type', ''), 0.6),
            ('Composition', row.get('Composition', ''), 0.5),
        )
        for field_name, field_value, weight in search_fields:
            if pd.isna(field_value) or not field_value:
                continue
            field_str = str(field_value).lower()

            # Exact (substring) matches have the highest priority. Every hit in
            # a field scores the same, so the first matching term is enough.
            exact_term = next(
                (term for term in expanded_queries
                 if term in field_str or field_str in term),
                None,
            )
            if exact_term is not None:
                score = 100 * weight
                if score > best_score:
                    best_score = score
                    best_match_type = "exact"
                    match_details = {"field": field_name, "query": exact_term}

            # Fuzzy matching for close matches
            if fuzzy_enabled:
                score = fuzz.partial_ratio(normalized_query, field_str) * weight
                if score > best_score and score > 70:
                    best_score = score
                    best_match_type = "fuzzy"
                    # Record the string that was actually scored
                    match_details = {"field": field_name, "query": normalized_query}

        if best_score > 70:
            product_dict = row.to_dict()
            product_dict['_score'] = best_score
            product_dict['_match_type'] = best_match_type
            product_dict['_match_details'] = match_details
            scored_matches.append(product_dict)

    scored_matches.sort(key=lambda match: match['_score'], reverse=True)

    # Remove duplicates based on product name, keeping the best-scored entry
    seen_names = set()
    unique_matches = []
    for match in scored_matches:
        if match['Product Name'] not in seen_names:
            seen_names.add(match['Product Name'])
            unique_matches.append(match)
    return unique_matches
def normalize(text: str) -> str:
    """
    Normalize text for search.

    Lowercases the text, removes every character that is neither a word
    character nor whitespace, collapses whitespace runs to a single space and
    trims both ends.

    Args:
        text: Text to normalize. Empty or non-string input (None, NaN, ...)
            yields "".

    Returns:
        The normalized string, e.g. "  Hello,   World! " -> "hello world".
    """
    if not isinstance(text, str) or not text:
        return ""
    # Remove special characters but keep spaces
    normalized = re.sub(r'[^\w\s]', '', text.lower())
    # Collapse whitespace, then trim: removing punctuation can expose leading
    # or trailing spaces ("hydropex !" -> "hydropex "), so trimming comes last.
    return re.sub(r'\s+', ' ', normalized).strip()
# Enhanced context management with veterinary domain awareness
class VeterinaryContextManager:
    """In-memory store of per-user state, chat history and product counters, keyed by phone number."""

    def __init__(self):
        self.user_contexts = {}
        self.conversation_history = defaultdict(list)
        self.product_analytics = defaultdict(int)
        self.session_data = {}

    def get_context(self, phone_number: str) -> Dict[str, Any]:
        """Return the context dict for ``phone_number``, creating a main-menu default on first use."""
        try:
            return self.user_contexts[phone_number]
        except KeyError:
            pass
        fresh_context = {
            "current_state": "main_menu",
            "current_menu": "main_menu",
            "current_menu_options": ["Search Veterinary Products", "Browse Categories", "Download Catalog"],
            "current_product": None,
            "current_category": None,
            "search_history": [],
            "product_interests": [],
            "species_preference": None,
            "symptom_context": None,
            "last_interaction": datetime.now(),
            "session_start": datetime.now(),
            "interaction_count": 0,
            "last_message": "",
            "available_categories": [],
            "available_products": []
        }
        self.user_contexts[phone_number] = fresh_context
        return fresh_context

    def update_context(self, phone_number: str, **kwargs):
        """Merge ``kwargs`` into the user's context, stamp the interaction and count it."""
        state = self.get_context(phone_number)
        state.update(kwargs)
        state["last_interaction"] = datetime.now()
        state["interaction_count"] += 1

        # Track product interests for recommendations
        viewed = kwargs.get("current_product")
        if not viewed:
            return
        viewed_name = viewed.get("Product Name", "")
        if viewed_name:
            state["product_interests"].append(viewed_name)
            self.product_analytics[viewed_name] += 1

    def add_to_history(self, phone_number: str, message: str, response: str):
        """Record one message/response exchange, keeping only the 20 most recent per user."""
        entry = {
            "timestamp": datetime.now(),
            "user_message": message,
            "bot_response": response
        }
        log = self.conversation_history[phone_number]
        log.append(entry)
        if len(log) > 20:
            self.conversation_history[phone_number] = log[-20:]

    def get_recommendations(self, phone_number: str) -> List[Dict[str, Any]]:
        """Return up to five products from the categories of the user's last three product interests."""
        state = self.get_context(phone_number)
        candidates = []

        if state["product_interests"]:
            for interest in state["product_interests"][-3:]:
                matches = get_veterinary_product_matches(interest)
                if not matches:
                    continue
                # The best match decides which category to recommend from
                category = matches[0].get("Category", "")
                if not category:
                    continue
                candidates.extend(
                    item
                    for item in get_products_by_category(category)[:3]
                    if item.get("Product Name") != interest
                )

        # Keep the first occurrence of each named product, then cap at five
        chosen = {}
        for item in candidates:
            item_name = item.get("Product Name", "")
            if item_name and item_name not in chosen:
                chosen[item_name] = item
        return list(chosen.values())[:5]


# Initialize context manager
context_manager = VeterinaryContextManager()
# Enhanced product response with veterinary domain expertise
def generate_veterinary_product_response(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> str:
    """
    Build the WhatsApp message describing one product.

    The message lists name, type, category and indications, adds the brochure
    link when one is known, and ends with the product-inquiry action list.

    Args:
        product_info: Product record; the keys 'Product Name', 'Type',
            'Category', 'Indications' and optionally 'Brochure (PDF)' are read.
        user_context: Accepted for interface compatibility; not used here.

    Returns:
        The formatted message text. Missing, NaN or blank fields are shown as
        "Not specified".
    """
    def clean_text(text):
        if text is None or pd.isna(text):
            return "Not specified"
        # A blank value would otherwise render as an empty line
        return str(text).strip() or "Not specified"

    # Extract product details
    product_name = clean_text(product_info.get('Product Name', ''))
    product_type = clean_text(product_info.get('Type', ''))
    category = clean_text(product_info.get('Category', ''))
    indications = clean_text(product_info.get('Indications', ''))

    # Brochure link: prefer the value already carried by the product record,
    # which avoids re-reading the CSV from disk on every product view.
    pdf_link = ""
    brochure_link = product_info.get('Brochure (PDF)')
    if isinstance(brochure_link, str) and brochure_link.strip():
        pdf_link = brochure_link.strip()
    else:
        try:
            # Fall back to the CSV for records that do not carry the column
            csv_data = pd.read_csv(CSV_FILE)
            product_row = csv_data[csv_data['Product Name'] == product_name]
            if not product_row.empty:
                brochure_link = product_row.iloc[0].get('Brochure (PDF)', '')
                if isinstance(brochure_link, str) and brochure_link.strip():
                    pdf_link = brochure_link.strip()
        except Exception as e:
            # The link is optional; the message is still sent without it
            logger.warning(f"Error checking PDF link for {product_name}: {e}")

    # Build the response
    response = f"""🧪 *Name:* {product_name}
📦 *Type:* {product_type}
🏥 *Category:* {category}
💊 *Used For:* {indications}"""

    # Add PDF link if available, in the requested format
    if pdf_link:
        response += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{pdf_link}"

    # Add menu options
    response += """
💬 *Available Actions:*
1️⃣ Talk to Veterinary Consultant
2️⃣ Inquire About Availability
3️⃣ Back to Main Menu
💬 Select an option or ask about related products"""
    return response
def clean_text_for_pdf(text: str) -> str:
    """
    Clean text for PDF generation.

    Typographic dashes and ellipses (real characters as well as their
    UTF-8-read-as-Windows-1252 mojibake forms) are converted to ASCII, then
    every character outside word characters, whitespace and ``-.,()%:;`` is
    removed and the result is trimmed.

    Dashes are converted rather than dropped because they carry meaning in
    ranges: "10–20 ml" must become "10-20 ml", not "1020 ml".

    Args:
        text: Value to clean; None or NaN yields "N/A".

    Returns:
        The cleaned string.
    """
    if text is None or pd.isna(text):
        return "N/A"
    cleaned = str(text)

    # Ordered: the three-character mojibake sequences must be handled before
    # the bare two-character prefix they all start with.
    replacements = (
        ('\u00e2\u20ac\u201c', '-'),    # mojibake of an en dash
        ('\u00e2\u20ac\u201d', '-'),    # mojibake of an em dash
        ('\u00e2\u20ac"', '-'),         # same, with the last character flattened to ASCII
        ('\u00e2\u20ac\u00a6', '...'),  # mojibake of an ellipsis
        ('\u00e2\u20ac\u2122', ''),     # mojibake of a right single quote
        ('\u00e2\u20ac\u02dc', ''),     # mojibake of a left single quote
        ('\u00e2\u20ac\u0153', ''),     # mojibake of a left double quote
        ('\u00e2\u20ac', ''),           # remaining prefix (right double quote, last byte lost)
        ('\u2013', '-'),                # en dash
        ('\u2014', '-'),                # em dash
        ('\u2212', '-'),                # minus sign
        ('\u2026', '...'),              # ellipsis
    )
    for broken, plain in replacements:
        cleaned = cleaned.replace(broken, plain)

    # Quotes, markup characters, emoji etc. are removed entirely.
    # NOTE(review): '/' and '+' are removed as well, so "5 ml/L" becomes
    # "5 mlL" - confirm whether the allow-list should be widened.
    cleaned = re.sub(r'[^\w\s\-.,()%:;]', '', cleaned)
    return cleaned.strip()
# Enhanced PDF generation with veterinary domain expertise
def generate_veterinary_pdf(product: Dict[str, Any]) -> bytes:
    """
    Generate a one-product information sheet as PDF bytes.

    The document contains a branded title, a summary table (name, category,
    target species, type), one text section per available detail field
    (indications, composition, dosage, precautions, storage) and a fixed
    veterinary disclaimer. All product text passes through
    clean_text_for_pdf.

    Args:
        product: Product record; missing, NaN or blank detail fields are
            skipped instead of being printed as "N/A" sections.

    Returns:
        The rendered PDF document.
    """
    buffer = io.BytesIO()
    doc = SimpleDocTemplate(buffer, pagesize=A4)
    styles = getSampleStyleSheet()

    # Veterinary-specific styles
    title_style = ParagraphStyle(
        'VeterinaryTitle',
        parent=styles['Heading1'],
        fontSize=18,
        spaceAfter=25,
        alignment=TA_CENTER,
        textColor=colors.darkblue,
        fontName='Helvetica-Bold'
    )
    heading_style = ParagraphStyle(
        'VeterinaryHeading',
        parent=styles['Heading2'],
        fontSize=14,
        spaceAfter=12,
        textColor=colors.darkgreen,
        fontName='Helvetica-Bold'
    )
    normal_style = ParagraphStyle(
        'VeterinaryNormal',
        parent=styles['Normal'],
        fontSize=11,
        spaceAfter=8,
        alignment=TA_JUSTIFY,
        fontName='Helvetica'
    )

    # Build PDF content
    story = []

    # Header with veterinary branding
    # NOTE(review): the emoji is probably not available in the built-in
    # Helvetica font - confirm how it renders.
    story.append(Paragraph("🏥 APEX BIOTICAL VETERINARY PRODUCTS", title_style))
    story.append(Spacer(1, 20))

    # Product information
    product_name = clean_text_for_pdf(product.get('Product Name', 'Unknown Product'))
    story.append(Paragraph(f"<b>Product: {product_name}</b>", heading_style))
    story.append(Spacer(1, 15))

    # Clinical information table
    clinical_info = [
        ['Field', 'Information'],
        ['Product Name', clean_text_for_pdf(product.get('Product Name', 'N/A'))],
        ['Category', clean_text_for_pdf(product.get('Category', 'N/A'))],
        ['Target Species', clean_text_for_pdf(product.get('Target Species', 'N/A'))],
        ['Product Type', clean_text_for_pdf(product.get('Type', 'N/A'))]
    ]
    clinical_table = Table(clinical_info, colWidths=[2*inch, 4*inch])
    clinical_table.setStyle(TableStyle([
        ('BACKGROUND', (0, 0), (-1, 0), colors.darkblue),
        ('TEXTCOLOR', (0, 0), (-1, 0), colors.whitesmoke),
        ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
        ('FONTNAME', (0, 0), (-1, 0), 'Helvetica-Bold'),
        ('FONTSIZE', (0, 0), (-1, 0), 12),
        ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
        ('BACKGROUND', (0, 1), (-1, -1), colors.lightblue),
        ('GRID', (0, 0), (-1, -1), 1, colors.black)
    ]))
    story.append(Paragraph("Clinical Information", heading_style))
    story.append(clinical_table)
    story.append(Spacer(1, 20))

    # Clinical details: (product field, section heading), in output order
    detail_sections = (
        ('Indications', "Clinical Indications"),
        ('Composition', "Composition"),
        ('Dosage & Administration', "Dosage & Administration"),
        ('Precautions', "Precautions"),
        ('Storage', "Storage"),
    )
    for field, heading in detail_sections:
        value = product.get(field)
        # NaN (a missing cell in pandas data) is truthy, so test it explicitly
        if not value or (isinstance(value, float) and pd.isna(value)):
            continue
        if isinstance(value, str) and not value.strip():
            continue
        story.append(Paragraph(heading, heading_style))
        story.append(Paragraph(clean_text_for_pdf(value), normal_style))
        story.append(Spacer(1, 15))

    # Veterinary disclaimer
    story.append(Paragraph("Veterinary Disclaimer", heading_style))
    disclaimer_text = (
        "This product should be used under veterinary supervision. "
        "Always consult with a qualified veterinarian before administration. "
        "Follow dosage instructions precisely and monitor animal response. "
        "Store according to manufacturer guidelines and keep out of reach of children."
    )
    story.append(Paragraph(disclaimer_text, normal_style))

    # Build PDF
    doc.build(story)
    buffer.seek(0)
    return buffer.getvalue()
async def send_catalog_pdf(phone_number: str):
    """Send the product catalog to a user as a download link.

    The catalog is a fixed Google Drive direct-download URL embedded in a
    WhatsApp text message. If anything raises while composing or sending the
    message, the error is logged and a short failure notice is sent instead.

    Args:
        phone_number: Recipient, passed through to ``send_whatsjet_message``.
    """
    failure_notice = "❌ Error sending catalog. Please try again or contact our sales team for assistance."
    try:
        # Google Drive link in its direct-download form
        catalog_url = "https://drive.google.com/uc?export=download&id=1mxpkFf3DY-n3QHzZBe_CdksR-gHu2f_0"
        message_parts = [
            "📋 *Apex Biotical Veterinary Products Catalog*\n\n",
            "📄 Here's your complete product catalog with all our veterinary products:\n",
            f"📎 [Apex Biotical Veterinary Products Catalog.pdf]({catalog_url})\n\n",
            "💬 For detailed information about any specific product, type its name or contact our sales team.\n\n",
            "Type main at any time to return to the main menu.",
        ]
        send_whatsjet_message(phone_number, "".join(message_parts))
    except Exception as e:
        logger.error(f"Error sending catalog: {e}")
        send_whatsjet_message(phone_number, failure_notice)
async def send_individual_product_pdf(phone_number: str, product: Dict[str, Any]):
    """Generate a product PDF, send it as WhatsApp media, and follow up with a link.

    The PDF is rendered by ``generate_veterinary_pdf``, written to the uploads
    directory under a timestamped name, and sent as a media attachment. A text
    message containing a download link is always sent afterwards: as a backup
    when the media send succeeded, or as the only delivery path when it failed.

    Args:
        phone_number: Recipient, passed through to ``send_whatsjet_message``.
        product: Product record; ``'Product Name'`` is used for the title and
            the file name.

    Any exception is logged and answered with a generic failure notice; nothing
    is raised to the caller.
    """
    try:
        # Generate PDF for the product
        pdf_content = generate_veterinary_pdf(product)

        # Build a filesystem-safe, unique file name
        product_name = product.get('Product Name', 'Unknown_Product')
        safe_name = re.sub(r'[^\w\s-]', '', product_name).replace(' ', '_')
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{safe_name}_{timestamp}.pdf"

        # Save PDF to uploads directory
        # NOTE(review): this writes to "../uploads" while the link below points
        # at "/uploads/<file>" — confirm the served directory is the same one.
        uploads_dir = "../uploads"
        os.makedirs(uploads_dir, exist_ok=True)
        pdf_path = os.path.join(uploads_dir, filename)
        with open(pdf_path, 'wb') as f:
            f.write(pdf_content)

        # Generate download URL
        base_url = os.getenv("PUBLIC_BASE_URL", "http://localhost:8000")
        download_url = f"{base_url}/uploads/{filename}"

        # Send PDF via WhatsApp media
        success = send_whatsjet_message(
            phone_number,
            f"📄 *{product_name} - Product Information*\n\nHere's the detailed product information in PDF format.",
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )

        # The link is a backup after a successful media send, and the only
        # delivery path after a failed one; only the wording differs.
        if success:
            link_label = "Direct Download Link"
            hint = "💬 *If the PDF didn't download, use the link above*\n"
        else:
            link_label = "Download Product PDF"
            hint = "💬 *Click the link above to download the product information*\n"

        # The link line must be an f-string, otherwise the user receives the
        # literal text "{download_url}" instead of the URL.
        message = (
            f"📄 *{product_name} - Product Information*\n\n"
            f"📎 [{link_label}]({download_url})\n\n"
            f"{hint}"
            "Type 'main' to return to main menu."
        )
        send_whatsjet_message(phone_number, message)
    except Exception as e:
        logger.error(f"Error sending individual product PDF: {e}")
        send_whatsjet_message(
            phone_number,
            "❌ Error generating product PDF. Please try again or contact our sales team for assistance."
        )
# --- WhatsJet Message Sending ---
def split_message_for_whatsapp(message: str, max_length: int = 1000) -> list:
    """Cut *message* into consecutive pieces of at most *max_length* characters.

    Pieces are fixed-width slices taken from the start of the string; the last
    piece holds whatever remains. An empty message yields an empty list.
    """
    chunks = []
    for start in range(0, len(message), max_length):
        chunks.append(message[start:start + max_length])
    return chunks
def _send_whatsjet_media_url(url: str, phone_number: str, message: str, media_type: str,
                             media_url: str, filename: Optional[str]) -> bool:
    """Send media hosted at a public URL, trying each known payload shape in turn."""
    media_filename = filename or os.path.basename(media_url)
    payload_formats = [
        # Format 1: Using caption field
        {
            "phone_number": phone_number,
            "caption": message,
            "media_type": media_type,
            "media_url": media_url,
            "media_filename": media_filename
        },
        # Format 2: Using message_body instead of caption
        {
            "phone_number": phone_number,
            "message_body": message,
            "media_type": media_type,
            "media_url": media_url,
            "media_filename": media_filename
        },
        # Format 3: Simplified format without media_filename
        {
            "phone_number": phone_number,
            "message_body": message,
            "media_type": media_type,
            "media_url": media_url
        },
        # Format 4: Using different field names
        {
            "phone_number": phone_number,
            "caption": message,
            "type": media_type,
            "url": media_url
        }
    ]
    for i, payload in enumerate(payload_formats, 1):
        try:
            logger.info(f"[WhatsJet] Trying payload format {i}: {payload}")
            response = httpx.post(url, json=payload, timeout=15)
        except Exception as e:
            logger.warning(f"[WhatsJet] Format {i} exception: {e}")
            continue
        # Accept any 2xx, matching raise_for_status() in the other send paths;
        # treating e.g. 201/202 as failure would re-send the same media.
        if 200 <= response.status_code < 300:
            logger.info(f"[WhatsJet] Media URL message sent successfully with format {i} to {phone_number}")
            return True
        logger.warning(f"[WhatsJet] Format {i} failed with status {response.status_code}: {response.text[:200]}")

    logger.error(f"[WhatsJet] All media URL payload formats failed for {phone_number}")
    return False


def _send_whatsjet_media_file(url: str, phone_number: str, message: str, media_type: str,
                              media_path: str, filename: Optional[str]) -> bool:
    """Send a local file as base64-encoded media content."""
    try:
        with open(media_path, 'rb') as f:
            media_b64 = base64.b64encode(f.read()).decode('utf-8')
        payload = {
            "phone_number": phone_number,
            "message_body": message,
            'media_type': media_type,
            'media_content': media_b64,
            'media_filename': filename or os.path.basename(media_path)
        }
    except Exception as e:
        logger.error(f"[WhatsJet] Exception preparing media message: {str(e)}")
        return False

    try:
        response = httpx.post(url, json=payload, timeout=15)
        response.raise_for_status()
    except Exception as e:
        logger.error(f"[WhatsJet] Exception sending media message: {e}")
        return False
    logger.info(f"[WhatsJet] Media message sent successfully to {phone_number}")
    return True


def send_whatsjet_message(phone_number: str, message: str, media_type: Optional[str] = None,
                          media_path: Optional[str] = None, filename: Optional[str] = None) -> bool:
    """Send a message using WhatsJet API with optional media attachment or public URL.

    Args:
        phone_number: Recipient number as expected by the WhatsJet API.
        message: Text body, or the caption when media is attached.
        media_type: Media type of the attachment; media is only sent when both
            ``media_type`` and ``media_path`` are given.
        media_path: Local file path, or a public URL (anything starting with
            ``"http"``).
        filename: File name reported to WhatsJet; defaults to the basename of
            ``media_path``.

    Returns:
        True when the message (every chunk of it, for text) was accepted, or
        when there was nothing to send; False on missing configuration or any
        send failure. A text message that fails part-way returns False even
        though earlier chunks were already delivered.
    """
    if not all([WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN]):
        logger.error("[WhatsJet] Missing environment variables.")
        return False

    # The API token travels in the query string, so this URL must not be logged.
    url = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-message?token={WHATSJET_API_TOKEN}"

    # Handle media messages (local file or public URL)
    if media_type and media_path:
        if isinstance(media_path, str) and media_path.startswith("http"):
            return _send_whatsjet_media_url(url, phone_number, message, media_type, media_path, filename)
        return _send_whatsjet_media_file(url, phone_number, message, media_type, media_path, filename)

    # Handle text messages; a missing or blank body is not an error
    if not message or not message.strip():
        return True

    for chunk in split_message_for_whatsapp(message):
        try:
            response = httpx.post(
                url,
                json={"phone_number": phone_number, "message_body": chunk},
                timeout=15
            )
            response.raise_for_status()
        except Exception as e:
            logger.error(f"[WhatsJet] Exception sending text chunk: {e}")
            return False
        logger.info(f"[WhatsJet] Text chunk sent successfully to {phone_number}")

    logger.info(f"[WhatsJet] Successfully sent complete text message to {phone_number}")
    return True
def send_whatsjet_media_image_only(phone_number: str, image_url: str, filename: str = None) -> bool:
    """Send an image by URL through WhatsJet's /contact/send-media-message endpoint.

    The request authenticates with a bearer token header and carries no
    caption. ``filename`` is forwarded as ``file_name`` only when provided.

    Returns:
        True when the API answers with HTTP 200; False on missing
        configuration, any other status code, or an exception while sending.
    """
    credentials = [WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN]
    if not all(credentials):
        logger.error("[WhatsJet] Missing environment variables for media message.")
        return False

    endpoint = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-media-message"
    request_headers = {
        "Authorization": f"Bearer {WHATSJET_API_TOKEN}",
        "Content-Type": "application/json"
    }
    body = {
        "phone_number": phone_number,
        "media_type": "image",
        "media_url": image_url
    }
    if filename:
        body["file_name"] = filename

    try:
        logger.info(f"[WhatsJet] Sending image with payload: {body}")
        reply = httpx.post(endpoint, json=body, headers=request_headers, timeout=30)
        logger.info(f"[WhatsJet] Image response status: {reply.status_code}")
        logger.info(f"[WhatsJet] Image response body: {reply.text[:500]}...")
        delivered = reply.status_code == 200
        if delivered:
            logger.info(f"[WhatsJet] Image sent successfully to {phone_number}")
        else:
            logger.error(f"[WhatsJet] Failed to send image: {reply.status_code} - {reply.text}")
        return delivered
    except Exception as e:
        logger.error(f"[WhatsJet] Exception sending image: {e}")
        return False
# --- Health Check Endpoint ---
@app.get("/health")
async def health_check():
    """Report service status: timestamp, loaded product count and integration flags."""
    product_count = 0 if products_df is None else len(products_df)
    whatsjet_ready = all((WHATSJET_API_URL, WHATSJET_VENDOR_UID, WHATSJET_API_TOKEN))
    report = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "products_loaded": product_count,
        "openai_available": bool(OPENAI_API_KEY),
        "whatsjet_configured": bool(whatsjet_ready),
    }
    return report
@app.get("/test-voice")
async def test_voice():
    """Diagnostic endpoint echoing the voice-detection checks and feature flags.

    The detection values are evaluated against fixed sample inputs, so they
    describe the detection rules themselves rather than any real message.
    """
    voice_types = ['audio', 'voice']
    sample_media = {'type': 'audio'}
    detection = {
        "audio_type": "audio" in voice_types,
        "voice_type": "voice" in voice_types,
        "media_audio": sample_media.get('type') == 'audio',
    }
    return {
        "voice_detection": detection,
        "openai_available": bool(OPENAI_API_KEY),
        "langdetect_available": True,
        "deep_translator_available": True,
    }
@app.get("/catalog")
async def get_catalog():
    """Serve the complete product catalog PDF.

    Returns:
        A ``FileResponse`` for ``static/Hydropex.pdf``, offered for download as
        ``Apex_Biotical_Veterinary_Catalog.pdf``.

    Raises:
        HTTPException: 404 when the catalog file does not exist, 500 when
            building the response fails.
    """
    catalog_path = "static/Hydropex.pdf"

    # Raised outside the try block: HTTPException is itself an Exception, so
    # raising it inside would be caught below and turned into a 500.
    if not os.path.exists(catalog_path):
        logger.warning(f"Catalog PDF not found at {catalog_path}")
        raise HTTPException(status_code=404, detail="Catalog PDF not found")

    try:
        return FileResponse(
            catalog_path,
            media_type="application/pdf",
            filename="Apex_Biotical_Veterinary_Catalog.pdf"
        )
    except Exception as e:
        logger.error(f"Error serving catalog: {e}")
        raise HTTPException(status_code=500, detail="Error serving catalog") from e
@app.get("/", response_class=HTMLResponse)
async def root():
    """Serve a minimal HTML landing page listing the public endpoints."""
    # The heading and intro appear once, followed by a single well-formed list
    # (the page previously repeated them and left a stray <ul> unclosed).
    return """
    <h2>Apex Biotical Veterinary WhatsApp Assistant</h2>
    <p>The Assistant is running! Use the API endpoints for WhatsApp integration.</p>
    <ul>
        <li><b>POST /webhook</b> – WhatsApp webhook endpoint</li>
        <li><b>GET /health</b> – Health check</li>
        <li><b>GET /catalog</b> – Download product catalog PDF</li>
    </ul>
    """
# --- Webhook Endpoint for WhatsApp/WhatsJet ---
@app.post("/webhook")
async def webhook(request: Request):
    """Handle incoming WhatsApp/WhatsJet webhook messages.

    Two payload shapes are recognised:

    * WhatsJet/custom: a dict with ``contact`` and ``message`` keys.
    * WhatsApp Cloud API: a dict with an ``entry`` list of ``changes``.

    Returns:
        200 once a recognised payload has been processed or deliberately
        ignored, 400 for an unparseable, malformed or unrecognised payload,
        and 500 when processing raises unexpectedly.
    """
    try:
        try:
            data = await request.json()
        except ValueError as e:
            # A body that is not valid JSON is a client error, not a server fault
            logger.warning(f"[Webhook] Request body is not valid JSON: {e}")
            return Response(status_code=400)
        logger.info(f"[Webhook] Incoming data: {data}")

        # WhatsJet/Custom format
        if isinstance(data, dict) and 'contact' in data and 'message' in data:
            contact = data['contact']
            if not isinstance(contact, dict):
                logger.warning(f"[Webhook] contact is not a dict: {type(contact)}")
                return Response(status_code=400)
            from_number = str(contact.get('phone_number', '')).replace('+', '').replace(' ', '')

            msg = data['message']
            # Nothing to process unless the message is a dict
            if not isinstance(msg, dict):
                return Response(status_code=200)

            # Robust media type extraction: stays None when media is a list or None
            media = msg.get('media', {})
            media_type = media.get('type') if isinstance(media, dict) else None

            # Check for voice/audio messages first (they might not have body)
            if msg.get('type') in ['audio', 'voice'] or media_type == 'audio':
                logger.info(f"[Webhook] Processing voice message from {from_number}")
                await process_incoming_message(from_number, msg)
                return Response(status_code=200)

            # Ignore messages without body (only for non-voice messages)
            if msg.get('body') is None:
                return Response(status_code=200)

            # Ignore specific status updates
            if msg.get('status') in ['delivered', 'sent', 'read', 'failed']:
                return Response(status_code=200)

            # Process actual message
            await process_incoming_message(from_number, msg)
            return Response(status_code=200)

        # WhatsApp Cloud API format
        if isinstance(data, dict) and 'entry' in data and isinstance(data['entry'], list):
            for entry in data['entry']:
                if not isinstance(entry, dict):
                    logger.error(f"[Webhook] entry is not a dict: {type(entry)}")
                    continue
                changes = entry.get('changes', [])
                if not isinstance(changes, list):
                    logger.error(f"[Webhook] changes is not a list: {type(changes)}")
                    continue
                for change in changes:
                    if not isinstance(change, dict):
                        logger.error(f"[Webhook] change is not a dict: {type(change)}")
                        continue
                    value = change.get('value', {})
                    if not isinstance(value, dict):
                        logger.error(f"[Webhook] value is not a dict: {type(value)}")
                        continue
                    messages = value.get('messages', [])
                    if not isinstance(messages, list):
                        logger.error(f"[Webhook] messages is not a list: {type(messages)}")
                        continue
                    for message in messages:
                        if not isinstance(message, dict):
                            logger.error(f"[Webhook] message is not a dict: {type(message)}")
                            continue
                        from_number = message.get('from', '')
                        # Ignore status updates
                        if message.get('type') == 'status':
                            continue
                        # Convert WhatsApp format to our format; tolerate a
                        # missing or non-dict "text" field
                        text = message.get('text')
                        msg = {
                            'body': text.get('body', '') if isinstance(text, dict) else '',
                            'type': message.get('type', 'text'),
                            'media': message.get('audio') or message.get('voice') or message.get('image') or message.get('document')
                        }
                        await process_incoming_message(from_number, msg)
            return Response(status_code=200)

        logger.warning(f"[Webhook] Unrecognized or malformed payload format: {type(data)}")
        return Response(status_code=400)
    except Exception as e:
        # logger.exception records the traceback along with the message
        logger.exception(f"[Webhook] Error: {e}")
        return Response(status_code=500)
# Spoken or transcribed English token -> menu digit.
_ENGLISH_NUMBER_WORDS = {
    # Basic numbers
    'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5',
    'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10',
    'eleven': '11', 'twelve': '12', 'thirteen': '13', 'fourteen': '14', 'fifteen': '15',
    'sixteen': '16', 'seventeen': '17', 'eighteen': '18', 'nineteen': '19', 'twenty': '20',
    'twenty one': '21', 'twenty two': '22', 'twenty three': '23',
    # Common transcription errors
    'won': '1', 'to': '2', 'too': '2', 'tree': '3', 'free': '3', 'for': '4',
    'fir': '4', 'fiv': '5', 'sik': '6',
    # Ordinal numbers
    'first': '1', 'second': '2', 'third': '3', 'fourth': '4', 'fifth': '5',
    'sixth': '6', 'seventh': '7', 'eighth': '8', 'ninth': '9', 'tenth': '10',
    # Menu variations
    'option one': '1', 'option two': '2', 'option three': '3', 'option four': '4', 'option five': '5',
    'number one': '1', 'number two': '2', 'number three': '3', 'number four': '4', 'number five': '5',
    'menu one': '1', 'menu two': '2', 'menu three': '3', 'menu four': '4', 'menu five': '5',
    'choice one': '1', 'choice two': '2', 'choice three': '3', 'choice four': '4', 'choice five': '5',
    # Common transcription errors for menu selections ("option" heard as "opium")
    'opium one': '1', 'opium two': '2', 'opium three': '3', 'opium four': '4', 'opium five': '5',
    'opium numara one': '1', 'opium numara two': '2', 'opium numara three': '3',
    'opium number one': '1', 'opium number two': '2', 'opium number three': '3',
    'opium number 1': '1', 'opium number 2': '2', 'opium number 3': '3',
    # Direct digits 1..23
    **{str(n): str(n) for n in range(1, 24)},
}

# Spoken or transcribed Urdu token (Roman Urdu and Urdu script) -> menu digit.
_URDU_NUMBER_WORDS = {
    # Roman Urdu numbers
    'aik': '1', 'ek': '1', 'do': '2', 'teen': '3', 'char': '4', 'panch': '5',
    'che': '6', 'sat': '7', 'ath': '8', 'nau': '9', 'das': '10',
    'gyara': '11', 'bara': '12', 'tera': '13', 'choda': '14', 'pandra': '15',
    'sola': '16', 'satara': '17', 'athara': '18', 'unnees': '19', 'bees': '20',
    'ikkees': '21', 'baees': '22', 'tees': '23',
    # Urdu script numbers
    'ایک': '1', 'دو': '2', 'تین': '3', 'چار': '4', 'پانچ': '5',
    'چھ': '6', 'سات': '7', 'آٹھ': '8', 'نو': '9', 'دس': '10',
    'گیارہ': '11', 'بارہ': '12', 'تیرہ': '13', 'چودہ': '14', 'پندرہ': '15',
    'سولہ': '16', 'سترہ': '17', 'اٹھارہ': '18', 'انیس': '19', 'بیس': '20',
    'اکیس': '21', 'بائیس': '22', 'تئیس': '23',
    # Menu variations in Urdu
    'نمبر ایک': '1', 'نمبر دو': '2', 'نمبر تین': '3', 'نمبر چار': '4', 'نمبر پانچ': '5',
    'آپشن ایک': '1', 'آپشن دو': '2', 'آپشن تین': '3', 'آپشن چار': '4', 'آپشن پانچ': '5',
    'اختیار ایک': '1', 'اختیار دو': '2', 'اختیار تین': '3', 'اختیار چار': '4', 'اختیار پانچ': '5',
    # Common transcription errors in Urdu
    # NOTE(review): these three map to the word 'number', not a digit — kept
    # as-is because callers may rely on it; confirm whether that is intended.
    'numara': 'number', 'numbara': 'number', 'numbra': 'number',
    'numbra one': '1', 'numbra two': '2', 'numbra three': '3', 'numbra 1': '1', 'numbra 2': '2', 'numbra 3': '3',
}

# Combined lookup; Urdu entries win where a token appears in both tables.
_NUMBER_WORD_TO_DIGIT = {**_ENGLISH_NUMBER_WORDS, **_URDU_NUMBER_WORDS}

# Regexes whose first group captures the selected number, tried in order.
_NUMBER_DIGIT_PATTERNS = [re.compile(pattern) for pattern in (
    r'opium\s+numara?\s*(\d+)',    # "opium numara 1" -> "1"
    r'opium\s+number?\s*(\d+)',    # "opium number 1" -> "1"
    r'opium\s+(\d+)',              # "opium 1" -> "1"
    r'numara?\s*(\d+)',            # "numara 1" -> "1"
    r'number?\s*(\d+)\s*[.!]?',    # "number 1" or "number 1." -> "1"
    r'option\s*(\d+)\s*[.!]?',     # "option 1" or "option 1." -> "1"
    r'choice\s*(\d+)\s*[.!]?',     # "choice 1" or "choice 1." -> "1"
    r'menu\s*(\d+)\s*[.!]?',       # "menu 1" or "menu 1." -> "1"
    r'(\d+)\s*[.!]?\s*$',          # trailing number: "22." -> "22"
    r'^(\d+)\s*[.!]?\s*',          # leading number
)]

# Fuzzy matches must score strictly above this rapidfuzz ratio (0-100).
_NUMBER_FUZZY_THRESHOLD = 80


def _normalize_digit_string(digits: str) -> str:
    """Convert any Unicode decimal digits (e.g. '۳') to ASCII, keeping leading zeros."""
    return "".join(str(int(ch)) for ch in digits)


def map_spoken_number_to_digit(text: str) -> str:
    """
    Enhanced number mapping for voice input - supports both English and Urdu number systems
    Handles various transcription errors and number formats

    Matching is attempted in this order: exact table lookup (on the lowercased
    text, then on the text with punctuation removed), keyword/digit regexes,
    best fuzzy match against table words longer than two characters, and
    finally the first run of digits anywhere in the text.

    Args:
        text: Raw transcription or typed input.

    Returns:
        The mapped value (digits are always ASCII), ``""`` for empty input, or
        the original ``text`` unchanged when nothing matches.
    """
    if not text:
        return ""

    # Clean and normalize the text
    text_lower = text.lower().strip()
    # Punctuation-free, whitespace-collapsed variant ("ek." -> "ek")
    text_clean = " ".join(re.sub(r'[^\w\s]', '', text_lower).split())

    # First, try exact matches; the cleaned form catches transcriptions that
    # end in punctuation, which are too short for fuzzy matching to rescue.
    for candidate in (text_lower, text_clean):
        if candidate in _NUMBER_WORD_TO_DIGIT:
            return _NUMBER_WORD_TO_DIGIT[candidate]

    # Try pattern matching for common transcription errors
    for pattern in _NUMBER_DIGIT_PATTERNS:
        match = pattern.search(text_lower)
        if match:
            return _normalize_digit_string(match.group(1))

    # Try fuzzy matching: keep the best-scoring word rather than the first one
    # above the threshold, so "eightee" resolves to 18 and not 8.
    best_word = None
    best_digit = None
    best_score = _NUMBER_FUZZY_THRESHOLD
    if text_clean:
        for number_word, digit in _NUMBER_WORD_TO_DIGIT.items():
            if len(number_word) <= 2:  # Only fuzzy match longer words
                continue
            score = fuzz.ratio(text_clean, number_word)
            if score > best_score:
                best_word, best_digit, best_score = number_word, digit, score
    if best_word is not None:
        logger.info(f"Fuzzy matched '{text_lower}' to '{best_word}' -> '{best_digit}'")
        return best_digit

    # Try extracting numbers from mixed text
    number_match = re.search(r'(\d+)', text_clean)
    if number_match:
        return _normalize_digit_string(number_match.group(1))

    # If no match found, return original text
    logger.warning(f"No number mapping found for: '{text}'")
    return text
# Words/phrases that send the user back to the main menu.
_VOICE_NAVIGATION_COMMANDS = (
    'main', 'menu', 'start', 'home', 'back', 'return', 'go back', 'main menu',
    'مین', 'مینو', 'شروع', 'گھر', 'واپس', 'ریٹرن', 'مین مینو',
    'main menu please', 'go to main', 'back to main',
)

# (log label, regex) pairs tried in order; group 1 captures the number.
_VOICE_DIGIT_PATTERNS = (
    ("Number pattern 1", re.compile(r'number\s*(\d+)\s*[.!]?')),    # "Number X"
    ("Option pattern", re.compile(r'option\s*(\d+)\s*[.!]?')),      # "Option X"
    ("Product pattern", re.compile(r'product\s*(\d+)\s*[.!]?')),    # "Product X"
    ("Category pattern", re.compile(r'category\s*(\d+)\s*[.!]?')),  # "Category X"
    ("Number pattern 2", re.compile(r'(\d+)\s*[.!]?\s*$')),         # number at the end
    ("Number pattern 3", re.compile(r'^(\d+)\s*[.!]?\s*')),         # number at the beginning
    ("Any number pattern", re.compile(r'(\d+)')),                   # fallback: first number
)

# Spoken numbers in English and Urdu -> digit.
_VOICE_SPOKEN_NUMBERS = {
    # English spoken numbers
    'one': '1', 'first': '1', '1st': '1',
    'two': '2', 'second': '2', '2nd': '2', 'to': '2', 'too': '2',
    'three': '3', 'third': '3', '3rd': '3', 'tree': '3',
    'four': '4', 'fourth': '4', '4th': '4', 'for': '4',
    'five': '5', 'fifth': '5', '5th': '5',
    'six': '6', 'sixth': '6', '6th': '6',
    'seven': '7', 'seventh': '7', '7th': '7',
    'eight': '8', 'eighth': '8', '8th': '8',
    'nine': '9', 'ninth': '9', '9th': '9',
    'ten': '10', 'tenth': '10', '10th': '10',
    'eleven': '11', 'eleventh': '11', '11th': '11',
    'twelve': '12', 'twelfth': '12', '12th': '12',
    'thirteen': '13', 'thirteenth': '13', '13th': '13',
    'fourteen': '14', 'fourteenth': '14', '14th': '14',
    'fifteen': '15', 'fifteenth': '15', '15th': '15',
    'sixteen': '16', 'sixteenth': '16', '16th': '16',
    'seventeen': '17', 'seventeenth': '17', '17th': '17',
    'eighteen': '18', 'eighteenth': '18', '18th': '18',
    'nineteen': '19', 'nineteenth': '19', '19th': '19',
    'twenty': '20', 'twentieth': '20', '20th': '20',
    'twenty one': '21', 'twenty-first': '21', '21st': '21',
    'twenty two': '22', 'twenty-second': '22', '22nd': '22',
    'twenty three': '23', 'twenty-third': '23', '23rd': '23',
    # Urdu spoken numbers
    'ایک': '1', 'پہلا': '1', 'پہلی': '1',
    'دو': '2', 'دوسرا': '2', 'دوسری': '2',
    'تین': '3', 'تیسرا': '3', 'تیسری': '3',
    'چار': '4', 'چوتھا': '4', 'چوتھی': '4',
    'پانچ': '5', 'پانچواں': '5', 'پانچویں': '5',
    'چھ': '6', 'چھٹا': '6', 'چھٹی': '6',
    'سات': '7', 'ساتواں': '7', 'ساتویں': '7',
    'آٹھ': '8', 'آٹھواں': '8', 'آٹھویں': '8',
    'نو': '9', 'نواں': '9', 'نویں': '9',
    'دس': '10', 'دسواں': '10', 'دسویں': '10',
    'گیارہ': '11', 'گیارہواں': '11', 'گیارہویں': '11',
    'بارہ': '12', 'بارہواں': '12', 'بارہویں': '12',
    'تیرہ': '13', 'تیرہواں': '13', 'تیرہویں': '13',
    'چودہ': '14', 'چودہواں': '14', 'چودہویں': '14',
    'پندرہ': '15', 'پندرہواں': '15', 'پندرہویں': '15',
    'سولہ': '16', 'سولہواں': '16', 'سولہویں': '16',
    'سترہ': '17', 'سترہواں': '17', 'سترہویں': '17',
    'اٹھارہ': '18', 'اٹھارہواں': '18', 'اٹھارہویں': '18',
    'انیس': '19', 'انیسواں': '19', 'انیسویں': '19',
    'بیس': '20', 'بیسواں': '20', 'بیسویں': '20',
    'اکیس': '21', 'اکیسواں': '21', 'اکیسویں': '21',
    'بائیس': '22', 'بائیسواں': '22', 'بائیسویں': '22',
    'تئیس': '23', 'تئیسواں': '23', 'تئیسویں': '23',
}

# Common transcription errors and variations -> canonical command.
_VOICE_TRANSCRIPTION_FIXES = {
    'bye': 'hi',  # Common transcription error for "hi"
    'hi': 'hi',
    'hello': 'hi',
    'hey': 'hi',
    'main': 'main',
    'menu': 'main',
    'start': 'main',
    'home': 'main',
    'back': 'main',
    'return': 'main',
    'go back': 'main',
    'main menu': 'main',
    'main menu please': 'main',
    'go to main': 'main',
    'back to main': 'main',
}


def _compile_voice_phrase_matcher(phrases):
    """Build a regex matching any of *phrases* as whole words, longest phrase first."""
    # Longest first so "twenty one" is preferred over "twenty" at the same spot.
    alternatives = sorted(phrases, key=len, reverse=True)
    body = '|'.join(re.escape(phrase) for phrase in alternatives)
    # Lookarounds instead of \b so phrases need not start/end with a word char
    return re.compile(r'(?<!\w)(?:' + body + r')(?!\w)')


_VOICE_SPOKEN_NUMBER_RE = _compile_voice_phrase_matcher(_VOICE_SPOKEN_NUMBERS)
_VOICE_TRANSCRIPTION_FIX_RE = _compile_voice_phrase_matcher(_VOICE_TRANSCRIPTION_FIXES)


def _voice_ascii_digits(digits: str) -> str:
    """Convert any Unicode decimal digits (e.g. '۳') to ASCII, keeping leading zeros."""
    return "".join(str(int(ch)) for ch in digits)


def process_intelligent_voice_command(message_body: str, current_state: str, user_context: dict) -> str:
    """
    Process voice commands intelligently for all menu states
    Maps voice commands to appropriate menu selections consistently with text logic

    Checks run in this order: navigation commands, digits in the text, spoken
    number words, then greeting/navigation transcription fixes. Number words
    and fixes are matched as whole words, so "information" or "phone" no longer
    map to a digit and "twenty one" maps to 21 rather than 1.

    Args:
        message_body: Transcribed (or typed) message.
        current_state: Current conversation state; used for logging only.
        user_context: User context; not read by this function.

    Returns:
        ``'main'`` for navigation, an ASCII digit string for a selection,
        ``'hi'`` for a greeting, or ``message_body`` unchanged when nothing
        matches (including empty input).
    """
    if not message_body:
        return message_body

    # Clean and normalize the input
    cleaned_text = message_body.strip().lower()
    logger.info(f"[Voice Command] Processing: '{message_body}' in state: {current_state}")

    # First, check for navigation commands (main, menu, back, etc.)
    # Kept precise to avoid false positives from transcription errors
    words = set(cleaned_text.split())
    for cmd in _VOICE_NAVIGATION_COMMANDS:
        if cleaned_text == cmd:
            logger.info(f"[Voice Command] Exact navigation command detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.startswith(cmd + ' '):
            logger.info(f"[Voice Command] Navigation command at start detected: '{message_body}' -> 'main'")
            return 'main'
        if cleaned_text.endswith(' ' + cmd):
            logger.info(f"[Voice Command] Navigation command at end detected: '{message_body}' -> 'main'")
            return 'main'
        # Whole whitespace-delimited token, never part of a larger word
        if cmd in words:
            logger.info(f"[Voice Command] Navigation command as word detected: '{message_body}' -> 'main'")
            return 'main'

    # Handle number patterns: keyword + number first, then positional fallbacks
    for label, pattern in _VOICE_DIGIT_PATTERNS:
        match = pattern.search(cleaned_text)
        if match:
            # Menu handling compares against ASCII digits
            number = _voice_ascii_digits(match.group(1))
            logger.info(f"[Voice Command] {label} detected: '{message_body}' -> '{number}'")
            return number

    # Handle spoken numbers in English and Urdu (leftmost whole-word match)
    spoken_match = _VOICE_SPOKEN_NUMBER_RE.search(cleaned_text)
    if spoken_match:
        digit = _VOICE_SPOKEN_NUMBERS[spoken_match.group(0)]
        logger.info(f"[Voice Command] Spoken number detected: '{message_body}' -> '{digit}'")
        return digit

    # Handle common transcription errors and variations; this also catches
    # navigation words followed by punctuation, e.g. "main."
    fix_match = _VOICE_TRANSCRIPTION_FIX_RE.search(cleaned_text)
    if fix_match:
        correction = _VOICE_TRANSCRIPTION_FIXES[fix_match.group(0)]
        logger.info(f"[Voice Command] Transcription fix applied: '{message_body}' -> '{correction}'")
        return correction

    # If no pattern matches, return the original message for further processing
    logger.info(f"[Voice Command] No specific pattern matched, returning original: '{message_body}'")
    return message_body
async def process_incoming_message(from_number: str, msg: dict):
"""Process incoming message and send appropriate response with full intelligence"""
try:
# Safety check for message body
message_body = msg.get('body') if isinstance(msg, dict) else None
message_type = msg.get('type', 'text') if isinstance(msg, dict) else 'text'
reply_language = msg.get('reply_language', 'en') # Default to English
# Robust media type extraction
media = msg.get('media', {}) if isinstance(msg, dict) else {}
media_type = None
if isinstance(media, dict):
media_type = media.get('type')
# If media is a list or None, media_type stays None
# Handle voice messages FIRST - before checking message_body
if message_type in ['audio', 'voice'] or media_type == 'audio':
logger.info(f"[Process] Processing voice message from {from_number}")
await handle_voice_message_complete(from_number, msg)
return
# For text messages, check if body exists
if message_body is None:
logger.info(f"[Process] Skipping message from {from_number} - no body content")
return
message_body = message_body.strip()
logger.info(f"[Process] Processing {message_type} message from {from_number}: {message_body}")
# Get user context
user_context = context_manager.get_context(from_number)
current_state = user_context.get('current_state', 'main_menu')
# Update context with last message for intelligent responses
context_manager.update_context(from_number, last_message=message_body)
# Debug logging
logger.info(f"[Process] Current state: {current_state}, Message: '{message_body}' from {from_number}")
# Handle text messages
if not message_body:
return
# 🎯 PRIORITY 1: Check for greetings FIRST (before any other processing)
if is_greeting(message_body):
logger.info(f"[Process] Greeting detected: '{message_body}' -> showing welcome message")
# Always show welcome message for greetings, regardless of current state
welcome_msg = generate_veterinary_welcome_message()
send_whatsjet_message(from_number, welcome_msg)
context_manager.update_context(
from_number,
current_state='main_menu',
current_menu='main_menu',
current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
)
return
# 🎯 PRIORITY 2: Navigation commands - work from ANY state
# Check for "main" command - now works for both text and voice
if current_state != 'main_menu' and current_state != 'ai_chat_mode': # Only check for main if not already in main menu and not in AI chat mode
mapped_navigation = process_intelligent_voice_command(message_body, current_state, user_context)
if mapped_navigation == 'main':
logger.info(f"[Process] Navigation command detected: '{message_body}' -> 'main'")
welcome_msg = generate_veterinary_welcome_message()
send_whatsjet_message(from_number, welcome_msg)
context_manager.update_context(
from_number,
current_state='main_menu',
current_menu='main_menu',
current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
)
return
# Also check for text-based navigation commands
if message_body.lower() in ['main', 'menu', 'start', 'home', 'back']:
welcome_msg = generate_veterinary_welcome_message()
send_whatsjet_message(from_number, welcome_msg)
context_manager.update_context(
from_number,
current_state='main_menu',
current_menu='main_menu',
current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
)
return
# 🎯 PRIORITY 3: State-specific handling (contact_request, availability_request, ai_chat_mode)
if current_state == 'contact_request':
await handle_contact_request_response(from_number, message_body)
return
elif current_state == 'availability_request':
await handle_availability_request_response(from_number, message_body)
return
elif current_state == 'ai_chat_mode':
await handle_ai_chat_mode(from_number, message_body, reply_language)
return
# 🎯 PRIORITY 4: Menu selections - check if this is a valid menu selection for current state
if current_state in ['main_menu', 'category_selection_menu', 'category_products_menu', 'all_products_menu', 'product_inquiry', 'intelligent_products_menu']:
# Validate menu selection
is_valid, error_msg = validate_menu_selection(message_body, current_state, user_context)
if is_valid:
# Handle valid menu selection
if current_state == 'main_menu':
if message_body == '1':
# Search Products
await display_all_products(from_number)
elif message_body == '2':
# Browse Categories
categories = get_all_categories()
if categories:
context_manager.update_context(
from_number,
current_state='category_selection_menu',
current_menu='category_selection_menu',
current_menu_options=categories,
available_categories=categories
)
message = "📁 *Select a Category:*\n\n"
for i, category in enumerate(categories, 1):
message += f"{format_number_with_emoji(i)} {category}\n"
message += "\n💬 Type a category number or 'main' to return to main menu."
send_whatsjet_message(from_number, message)
else:
send_whatsjet_message(from_number, "❌ No categories available. Type 'main' to return to main menu.")
elif message_body == '3':
# Download Catalog
await send_catalog_pdf(from_number)
elif message_body == '4':
# AI Chat Mode
context_manager.update_context(
from_number,
current_state='ai_chat_mode',
current_menu='ai_chat_mode',
current_menu_options=['main'],
reply_language='ur'
)
message = (
"🤖 ویٹرنری اے آئی اسسٹنٹ\n\n"
"آپ مجھ سے پوچھ سکتے ہیں:\n"
"• ویٹرنری سوالات\n"
"• پروڈکٹ کی سفارشات\n"
"• علاج کے مشورے\n"
"• عمومی معلومات\n\n"
"💬 'main' لکھ کر مین مینو پر واپس جائیں۔"
)
send_whatsjet_message(from_number, message)
elif current_state == 'category_selection_menu':
await handle_category_selection(message_body, from_number)
elif current_state == 'category_products_menu':
# Handle product selection from category
available_products = user_context.get('available_products', [])
if message_body.isdigit() and 1 <= int(message_body) <= len(available_products):
selected_product = available_products[int(message_body) - 1]
context_manager.update_context(
from_number,
current_product=selected_product,
current_state='product_inquiry',
current_menu='product_inquiry',
current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
)
await send_product_image_with_caption(from_number, selected_product, user_context)
else:
send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))
elif current_state == 'all_products_menu':
# Handle product selection from all products
if products_df is not None and not products_df.empty:
all_products = products_df.to_dict('records')
if message_body.isdigit() and 1 <= int(message_body) <= len(all_products):
selected_product = all_products[int(message_body) - 1]
context_manager.update_context(
from_number,
current_product=selected_product,
current_state='product_inquiry',
current_menu='product_inquiry',
current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
)
await send_product_image_with_caption(from_number, selected_product, user_context)
else:
send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))
else:
send_whatsjet_message(from_number, "❌ No products available. Type 'main' to return to main menu.")
elif current_state == 'product_inquiry':
await handle_veterinary_product_followup(message_body, from_number)
elif current_state == 'intelligent_products_menu':
# Handle product selection from intelligent products menu
available_products = user_context.get('available_products', [])
if message_body.isdigit() and 1 <= int(message_body) <= len(available_products):
selected_product = available_products[int(message_body) - 1]
context_manager.update_context(
from_number,
current_product=selected_product,
current_state='product_inquiry',
current_menu='product_inquiry',
current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
)
await send_product_image_with_caption(from_number, selected_product, user_context)
return
else:
send_whatsjet_message(from_number, get_menu_validation_message(current_state, user_context))
return
return # Exit after handling menu selection
# 🎯 PRIORITY 5: Check for company/about queries first (before product search)
query_lower = message_body.lower().strip()
if any(keyword in query_lower for keyword in ['apex', 'company', 'about', 'who', 'what is']):
# Use OpenAI for company information
await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
return
# 🎯 PRIORITY 6: Check for product-specific questions (mode of action, dosage, etc.)
product_question_keywords = ['mode of action', 'dosage', 'administration', 'composition', 'indications', 'precautions', 'storage', 'how to use', 'side effects']
if any(keyword in query_lower for keyword in product_question_keywords):
# Use OpenAI for product-specific questions
await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
return
# 🎯 PRIORITY 7: Product names - works from ANY menu state
# This ensures users can say product names like "hydropex", "respira aid plus", etc. from any menu
logger.info(f"[Process] Checking for product name in message: '{message_body}' from state: {current_state}")
products = get_veterinary_product_matches(message_body)
# --- NEW LOGIC: Check for exact match first ---
normalized_input = normalize(message_body).lower().strip()
exact_match = None
for product in products:
product_name = product.get('Product Name', '')
normalized_product_name = normalize(product_name).lower().strip()
if normalized_product_name == normalized_input:
exact_match = product
break
if exact_match:
logger.info(f"[Process] Exact product match found: {exact_match.get('Product Name', 'Unknown')}")
context_manager.update_context(
from_number,
current_product=exact_match,
current_state='product_inquiry',
current_menu='product_inquiry',
current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
)
# Only send one reply: image+caption if possible, else text
await send_product_image_with_caption(from_number, exact_match, user_context)
return
# --- END NEW LOGIC ---
if products:
logger.info(f"[Process] Product name detected: '{message_body}' -> Found {len(products)} products")
# Check if this is a specific product name search or a category/symptom search
is_specific_product = False
# Check for exact product name match (indicating specific product search)
normalized_input = normalize(message_body).lower().strip()
for product in products:
product_name = product.get('Product Name', '')
normalized_product_name = normalize(product_name).lower().strip()
if normalized_product_name == normalized_input:
is_specific_product = True
break
# If it's a specific product name, show only that product
if is_specific_product and len(products) == 1:
selected_product = products[0]
product_name = selected_product.get('Product Name', 'Unknown')
logger.info(f"[Process] Specific product found: {product_name}")
context_manager.update_context(
from_number,
current_product=selected_product,
current_state='product_inquiry',
current_menu='product_inquiry',
current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
)
await send_product_image_with_caption(from_number, selected_product, user_context)
return
# If it's a category/symptom search with multiple products, show all products
else:
logger.info(f"[Process] Category/symptom search with {len(products)} products")
# Use intelligent product inquiry to show all matching products
await handle_intelligent_product_inquiry(from_number, message_body, user_context, reply_language)
return
else:
# Check for specific query types before falling back to generic response
query_lower = message_body.lower().strip()
# Check for company/about queries
if any(keyword in query_lower for keyword in ['apex', 'company', 'about', 'who', 'what is']):
# Use OpenAI for company information
await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
return
# Check for product-specific questions (mode of action, dosage, etc.)
product_question_keywords = ['mode of action', 'dosage', 'administration', 'composition', 'indications', 'precautions', 'storage', 'how to use', 'side effects']
if any(keyword in query_lower for keyword in product_question_keywords):
# Use OpenAI for product-specific questions
await handle_general_query_with_ai(from_number, message_body, user_context, reply_language)
return
# Check for general veterinary questions
veterinary_keywords = ['weather', 'temperature', 'disease', 'symptoms', 'treatment', 'prevention', 'vaccination', 'nutrition', 'health']
if any(keyword in query_lower for keyword in veterinary_keywords):
# Simple redirect for non-veterinary topics
send_whatsjet_message(from_number, "❌ Please ask the correct question related to Apex Biotical Solutions or type 'main' to go to main menu.")
return
# Simple one-liner for wrong queries
send_whatsjet_message(from_number, "❌ Please correct your question or type 'main' to go to main menu.")
# 🎯 PRIORITY 8: Default: treat as general query with intelligent product inquiry
await handle_intelligent_product_inquiry(from_number, message_body, user_context, reply_language)
except Exception as e:
logger.error(f"Error in process_incoming_message: {e}")
# Instead of sending a generic error, return to main menu
welcome_msg = generate_veterinary_welcome_message()
send_whatsjet_message(from_number, welcome_msg)
context_manager.update_context(from_number, current_state='main_menu', current_menu='main_menu', current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()))
async def handle_general_query_with_ai(from_number: str, query: str, user_context: dict, reply_language: str = 'en'):
    """
    Answer a free-form query with OpenAI and send the reply over WhatsApp.

    The caller-supplied ``reply_language`` is overridden: replies are always
    produced in Urdu. The model answer is machine-translated to Urdu, product,
    category and company names are restored to English, and the result is sent
    to ``from_number``. If translation fails, the untranslated answer is sent.

    Args:
        from_number: WhatsApp number of the user; also the reply destination.
        query: Raw user message.
        user_context: Conversation context; ``current_state`` and
            ``current_product`` are read to enrich the prompt.
        reply_language: Accepted for interface compatibility; ignored.
    """
    reply_language = 'ur'
    logger.info("[AI General] Forcing reply_language to Urdu for Option 4.")
    try:
        # Without an API key there is nothing to ask; tell the user and stop.
        if not openai.api_key:
            send_whatsjet_message(
                from_number,
                "❌ AI assistance is not available. Please try searching for a specific product or type 'main' for the menu."
            )
            return

        current_state = user_context.get('current_state', 'main_menu')
        current_product = user_context.get('current_product')
        # The prompt embeds the whole products table so the model can answer
        # product questions from it.
        prompt = f"""
You are a professional veterinary product assistant for Apex Biotical Solutions, helping users on WhatsApp.
Always answer in a clear, accurate, and helpful manner with proper formatting and emojis.
User Query: "{query}"
Current State: {current_state}
Current Product: {current_product.get('Product Name', 'None') if current_product else 'None'}
IMPORTANT INSTRUCTIONS:
1. If the user asks about "Apex" or "Apex Biotical" - provide a brief, professional overview of Apex Biotical Solutions as a veterinary pharmaceutical company, including their expertise, product range, and commitment to animal health. Keep it concise and welcoming.
2. If the user asks about specific product details (mode of action, dosage, administration, composition, etc.) - search through the veterinary products database and provide detailed, accurate information about the specific product mentioned. If the product exists in the database, provide comprehensive details. If not found, suggest similar products.
3. If the user asks about products (e.g., 'poultry products', 'respiratory medicine'), list ALL relevant products from the database with a short description for each.
4. If the user asks a general veterinary question, provide a concise, expert answer.
5. Always keep responses professional, concise, and user-friendly with proper formatting.
6. Use emojis and bullet points for better readability.
7. If you don't have specific information, say so clearly and suggest alternatives.
CRITICAL NAMING RULES:
- ALWAYS keep company name "Apex Biotical Solutions" in English, even in Urdu responses
- ALWAYS keep product names in English (e.g., "EC-Immune", "Hydropex", "Heposel")
- ALWAYS keep technical terms in English when possible
- Only translate descriptive text, not proper nouns or brand names
Available Products Database: {products_df.to_dict('records') if products_df is not None else 'No products loaded'}
RESPONSE FORMAT:
- Keep responses concise and to the point
- Use emojis sparingly but effectively
- Avoid long titles or headers
- Focus on providing accurate, helpful information
- Preserve English company names and product names
"""
        completion = openai.ChatCompletion.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=300
        )
        ai_response = completion.choices[0].message.content.strip()

        if reply_language != 'ur':
            send_whatsjet_message(from_number, ai_response)
        else:
            try:
                # Collect the names that must stay in English after translation.
                product_names = []
                category_names = []
                if products_df is not None and not products_df.empty:
                    records = products_df.to_dict('records')
                    product_names = [
                        str(row.get('Product Name', ''))
                        for row in records
                        if row.get('Product Name')
                    ]
                    category_names = list({
                        str(row.get('Category', ''))
                        for row in records
                        if row.get('Category')
                    })
                company_names = ['Apex Biotical Solutions', 'Apex Biotical', 'Apex']

                urdu_text = GoogleTranslator(source='auto', target='ur').translate(ai_response)
                urdu_text = restore_english_terms(
                    urdu_text,
                    ai_response,
                    product_names + company_names,
                    category_names
                )
                send_whatsjet_message(from_number, urdu_text)
            except Exception as e:
                # Best effort: fall back to the untranslated answer.
                logger.error(f"[AI] Translation error: {e}")
                send_whatsjet_message(from_number, ai_response)

        # History keeps the original (untranslated) model answer.
        if context_manager:
            context_manager.add_to_history(from_number, query, ai_response)
    except Exception as e:
        logger.error(f"[AI] Error handling general query: {e}")
        send_whatsjet_message(from_number, "❌ AI Assistant encountered an error. Please try again or type 'main' to return to main menu.")
async def handle_contact_request(from_number: str):
    """
    Ask the user for their contact details and move them to 'contact_request'.

    On any error the welcome message is sent instead and the user's context is
    reset to the main menu.
    """
    try:
        prompt_parts = [
            "📞 *Contact Information*\n\n",
            "Please provide your details:\n",
            "• Name and location\n",
            "• Phone number\n",
            "• Specific inquiry\n\n",
            "💬 *Example:* Dr. Ali - Multan - Need consultation for liver disease\n\n",
            "💬 *Type 'main' to return to the main menu.*",
        ]
        send_whatsjet_message(from_number, "".join(prompt_parts))

        # The next free-text message from this user is treated as their details.
        next_state = 'contact_request'
        context_manager.update_context(
            from_number,
            current_state=next_state,
            current_menu=next_state,
            current_menu_options=['Provide contact details']
        )
    except Exception as e:
        logger.error(f"[Contact] Error handling contact request: {e}")
        # Recover by returning the user to the main menu rather than
        # sending a generic error.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
async def handle_contact_request_response(from_number: str, response: str):
    """
    Record a user's contact details, forward them to the admin and confirm.

    The inquiry is appended as one JSON line to
    ``contacts/contact_inquiries.json``. The first line of ``response`` is
    treated as "name and location"; any further lines are the details. The
    user is then returned to the main menu. On any error the welcome message
    is sent and the context is reset to the main menu.
    """
    try:
        # Persist the raw inquiry first (one JSON object per line).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/contact_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Admin number that receives forwarded inquiries.
        receiving_number = "923102288328"

        # Split on the first newline: head is name/location, tail is details.
        cleaned = response.strip()
        first_line, newline, remainder = cleaned.partition('\n')
        if newline:
            name_location = first_line.strip()
            details = remainder.strip()
        else:
            # Single-line reply: treat all of it as name/location.
            name_location = cleaned
            details = "No specific details provided"

        inquiry_message = "".join([
            f"📞 *Follow Up Inquiry*\n\n",
            f"Name and Location: {name_location}\n",
            f"Phone: {from_number}\n",
            f"Details: {details}",
        ])
        send_whatsjet_message(receiving_number, inquiry_message)

        # Confirm receipt to the user.
        send_whatsjet_message(
            from_number,
            "✅ Thank you! Your inquiry has been received. Our team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )
    except Exception as e:
        logger.error(f"[Contact] Error handling contact response: {e}")
        # Recover by returning the user to the main menu rather than
        # sending a generic error.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
async def handle_availability_inquiry(from_number: str, user_context: dict):
    """
    Ask the user for the details needed for an availability inquiry.

    Requires ``user_context['current_product']``; without it the user is told
    to search for a product first. Otherwise the prompt is sent and the user
    is moved to the 'availability_request' state. On any error the welcome
    message is sent and the context is reset to the main menu.
    """
    try:
        selected = user_context.get('current_product')
        if not selected:
            send_whatsjet_message(
                from_number,
                "❌ No product selected. Please search for a product first."
            )
            return

        product_name = selected.get('Product Name', 'N/A')
        prompt_parts = [
            f"📦 *Availability Inquiry*\n\n",
            f"Product: {product_name}\n\n",
            "Please provide:\n",
            "• Your name and location\n",
            "• Required quantity\n",
            "• Delivery preferences\n\n",
            "💬 *Example:* Dr. Ali – Multan, 50 bottles\n\n",
            "💬 *Type 'main' to return to the main menu.*",
        ]
        send_whatsjet_message(from_number, "".join(prompt_parts))

        # The next free-text message is treated as the availability details.
        next_state = 'availability_request'
        context_manager.update_context(
            from_number,
            current_state=next_state,
            current_menu=next_state,
            current_menu_options=['Provide availability details']
        )
    except Exception as e:
        logger.error(f"[Availability] Error handling availability inquiry: {e}")
        # Recover by returning the user to the main menu rather than
        # sending a generic error.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
async def handle_availability_request_response(from_number: str, response: str):
    """
    Record an availability inquiry, forward it to the admin and confirm.

    The inquiry is appended as one JSON line to
    ``contacts/availability_inquiries.json``. Non-empty lines of ``response``
    are read positionally as name/location, quantity and delivery preferences;
    missing lines get placeholder text. The user is then returned to the main
    menu. On any error the welcome message is sent and the context is reset to
    the main menu.
    """
    try:
        # Persist the raw inquiry first (one JSON object per line).
        record = {
            'phone_number': from_number,
            'inquiry': response,
            'timestamp': datetime.now().isoformat()
        }
        os.makedirs('contacts', exist_ok=True)
        with open('contacts/availability_inquiries.json', 'a', encoding='utf-8') as log_file:
            log_file.write(json.dumps(record, ensure_ascii=False) + '\n')

        # Admin number that receives forwarded inquiries.
        receiving_number = "923102288328"
        selected = context_manager.get_context(from_number).get('current_product', {})
        product_name = selected.get('Product Name', 'N/A') if selected else 'N/A'

        # Positional parse: line 1 = name/location, 2 = quantity, 3 = delivery.
        placeholders = ["Not provided", "Not specified", "Not specified"]
        provided = [
            stripped
            for stripped in (raw.strip() for raw in response.strip().split('\n'))
            if stripped
        ][:3]
        name_location, quantity, delivery_preferences = provided + placeholders[len(provided):]

        inquiry_message = "".join([
            f"📦 *Product Availability Inquiry*\n\n",
            f"Product: {product_name}\n",
            f"Name and Location: {name_location}\n",
            f"Quantity: {quantity}\n",
            f"Delivery Preferences: {delivery_preferences}\n",
            f"Phone: {from_number}",
        ])
        send_whatsjet_message(receiving_number, inquiry_message)

        # Confirm receipt to the user.
        send_whatsjet_message(
            from_number,
            "✅ Thank you! Your availability inquiry has been received. Our sales team will contact you soon.\n\n"
            "Type 'main' to return to the main menu."
        )
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )
    except Exception as e:
        logger.error(f"[Availability] Error handling availability response: {e}")
        # Recover by returning the user to the main menu rather than
        # sending a generic error.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        main_menu_options = list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=main_menu_options
        )
def send_helpful_guidance(from_number: str, current_state: str) -> None:
"""
Send a short hint describing the input expected in ``current_state``.

Each known conversation state has its own canned WhatsApp message; any other
state gets the main-menu hint. If anything raises, the welcome message is
sent and the user's context is reset to the main menu.

Args:
    from_number: WhatsApp number the hint is sent to.
    current_state: Name of the conversation state the user is currently in.
"""
try:
# One canned hint per conversation state.
if current_state == 'all_products_menu':
# NOTE(review): the "1-23" range is hard-coded in the text; presumably it
# should track the number of loaded products — confirm.
send_whatsjet_message(from_number,
"📋 *Products Menu*\n\n"
"Select a product number (1-23) to view detailed information.\n"
"Type 'main' to return to the main menu.\n"
"You can also type a product name to search.")
elif current_state == 'product_inquiry':
send_whatsjet_message(from_number,
"📦 *Product Details*\n\n"
"Select an option:\n"
"1️⃣ Contact Sales\n"
"2️⃣ Check Availability\n"
"3️⃣ Back to Main Menu\n"
"Type 'main' to return to main menu.")
elif current_state == 'category_selection_menu':
send_whatsjet_message(from_number,
"📁 *Category Selection*\n\n"
"Select a category number to view products.\n"
"Type 'main' to return to main menu.")
elif current_state == 'category_products_menu':
send_whatsjet_message(from_number,
"📦 *Category Products*\n\n"
"Select a product number to view details.\n"
"Type 'main' to return to main menu.")
elif current_state == 'contact_request':
send_whatsjet_message(from_number,
"📞 *Contact Request*\n\n"
"Please provide your name, location, and quantity.\n"
"Format: 'Name - Location, Quantity'\n"
"Example: 'Dr. Ali - Multan, 50 bottles'")
elif current_state == 'availability_request':
send_whatsjet_message(from_number,
"📦 *Availability Inquiry*\n\n"
"Please provide your location and quantity.\n"
"Format: 'Location, Quantity'\n"
"Example: 'Multan, 50 bottles'")
else:
# Fallback for any state not listed above: show the main-menu hint.
# NOTE(review): only three options are listed here, while the main-menu
# handler in this file also accepts '4' (AI Chat Mode) — confirm whether
# the hint should mention it.
send_whatsjet_message(from_number,
"💬 *Main Menu*\n\n"
"Available options:\n"
"1️⃣ Search Veterinary Products\n"
"2️⃣ Browse Categories\n"
"3️⃣ Download Catalog\n\n"
"Select an option or ask about specific products.")
# Reset the stored conversation state to the main menu.
# NOTE(review): indentation is not visible in this view, so it is unclear
# whether this reset belongs to the fallback branch only or runs after every
# hint — confirm against the original file before restructuring.
context_manager.update_context(
from_number,
current_state='main_menu',
current_menu='main_menu',
current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
)
except Exception as e:
logger.error(f"Error sending helpful guidance: {e}")
# Recover by showing the welcome message and resetting to the main menu.
welcome_msg = generate_veterinary_welcome_message()
send_whatsjet_message(from_number, welcome_msg)
context_manager.update_context(
from_number,
current_state='main_menu',
current_menu='main_menu',
current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
)
def is_greeting(text):
    """
    Return True when ``text`` is a greeting or pleasantry, False otherwise.

    Detection runs in this order:
      1. Exact match against a table of known greeting spellings.
      2. Anchored regular expressions for the same greetings.
      3. Negative guards: known non-greeting words and product/menu inquiry
         keywords. These run BEFORE the fuzzy heuristics so that they can
         actually veto a match (e.g. "hire" vs "hie", or "how are you, I need
         medicine"); exact/regex greetings above are unaffected.
      4. Fuzzy match (rapidfuzz ratio >= 90) against every known spelling.
      5. Substring match for "how are you"-style questions (English and Urdu).
      6. A greeting followed only by modifier words ("hello everyone").
      7. Looser fuzzy match (>= 85) for very short messages (2-4 characters).

    Args:
        text: Raw user message; None or empty yields False.

    Returns:
        bool: True if the message is considered a greeting.
    """
    if not text:
        return False
    text_lower = text.lower().strip()

    # Core greeting patterns that can be extended with variations
    core_greetings = {
        'hello': ['hello', 'hallo', 'helo', 'hlo', 'hallo', 'heloo', 'helloo'],
        'hi': ['hi', 'hy', 'hii', 'hiii', 'hiiii', 'hie', 'hye', 'hai', 'hay'],
        'hey': ['hey', 'heyy', 'heyyy', 'heey', 'heeyy', 'hay', 'hae'],
        'good_morning': ['good morning', 'goodmorning', 'gm', 'gud morning', 'gudmorning'],
        'good_afternoon': ['good afternoon', 'goodafternoon', 'ga', 'gud afternoon', 'gudafternoon'],
        'good_evening': ['good evening', 'goodevening', 'ge', 'gud evening', 'gudevening'],
        'good_night': ['good night', 'goodnight', 'gn', 'gud night', 'gudnight'],
        'morning': ['morning', 'mornin', 'morn'],
        'afternoon': ['afternoon', 'aftrnoon', 'aftr'],
        'evening': ['evening', 'evnin', 'evn'],
        'night': ['night', 'nite', 'nyt'],
        'how_are_you': ['how are you', 'how r u', 'how are u', 'how r you', 'howru', 'howru'],
        'whats_up': ['whats up', 'whats up', 'what is up', 'wassup', 'wassup', 'sup', 'sup'],
        'assalamu_alaikum': ['assalamu alaikum', 'assalam alaikum', 'assalamu alaikom', 'assalam alaikom', 'asalamu alaikum', 'asalam alaikum', 'as-salamu alaykum', 'as salam alaykum', 'assalamu alaykum', 'assalam alaykum'],
        'salam': ['salam', 'salaam', 'assalam', 'assalaam', 'salaam alaikum', 'salaam alaikom'],
        'adaab': ['adaab', 'adaab arz hai', 'adaab arz', 'adaab arz karta hun'],
        'namaste': ['namaste', 'namaskar', 'pranam', 'pranaam'],
        'khuda_hafiz': ['khuda hafiz', 'allah hafiz', 'fi amanillah'],
        'thank_you': ['thank you', 'thanks', 'shukriya', 'shukran', 'thnx', 'thx', 'tnx']
    }
    # Flatten all variations into a single list for exact and fuzzy matching
    all_greeting_variations = [
        variation
        for variations in core_greetings.values()
        for variation in variations
    ]

    # 1. Exact match check (fastest)
    if text_lower in all_greeting_variations:
        return True

    # 2. Anchored greeting patterns (tolerate flexible inner whitespace)
    greeting_patterns = [
        r'^(hi|hello|hey|hy|hii|hiii|hallo|helo|hlo|heyy|heyyy|heey|heeyy|hay|hae|hai|hye|hie)\s*$',
        r'^(good\s+(morning|afternoon|evening|night)|gm|ga|ge|gn|gud\s+(morning|afternoon|evening|night))\s*$',
        r'^(morning|afternoon|evening|night|mornin|morn|aftrnoon|aftr|evnin|evn|nite|nyt)\s*$',
        r'^(how\s+(are\s+)?(you|u|r\s+u)|howru|howru)\s*$',
        r'^(whats?\s+up|wassup|sup)\s*$',
        r'^(assalamu?\s+alaik(um|om)|asalamu?\s+alaik(um|om)|salaam\s+alaik(um|om)|as-salamu?\s+alaykum|as\s+salam\s+alaykum)\s*$',
        r'^(salam|salaam|assalam|assalaam)\s*$',
        r'^(adaab(?:\s+arz(?:\s+(hai|karta\s+hun))?)?)\s*$',
        r'^(namaste|namaskar|pranam|pranaam)\s*$',
        r'^(khuda\s+hafiz|allah\s+hafiz|fi\s+amanillah)\s*$',
        r'^(thank\s+you|thanks|shukriya|shukran|thnx|thx|tnx)\s*$'
    ]
    if any(re.match(pattern, text_lower) for pattern in greeting_patterns):
        return True

    # 3. Negative guards. They must precede the heuristic checks below:
    #    placed after them they could only return False right before the
    #    final `return False`, i.e. they never prevented a false positive.
    non_greeting_words = [
        'help', 'here', 'her', 'his', 'him', 'hot', 'how', 'history', 'high',
        'hint', 'hit', 'hill', 'hire', 'a', 'b', 'c', 'what', 'when', 'where', 'why'
    ]
    if text_lower in non_greeting_words:
        return False

    # Product/menu intent anywhere in the message means it is not just a greeting.
    inquiry_keywords = [
        'need', 'want', 'looking', 'find', 'show', 'tell', 'give', 'products',
        'medicine', 'antibiotics', 'veterinary', 'animals', 'cattle', 'poultry',
        'catalog', 'price', 'availability', 'consultation', 'appointment',
        'main', 'menu', 'start', 'home', 'back', '1', '2', '3', '4', '5'
    ]
    if any(keyword in text_lower for keyword in inquiry_keywords):
        return False

    # 4. Fuzzy matching for typos and variations (using rapidfuzz).
    #    High threshold to keep precision.
    FUZZY_THRESHOLD = 90
    for greeting in all_greeting_variations:
        similarity = fuzz.ratio(text_lower, greeting)
        if similarity >= FUZZY_THRESHOLD:
            logger.info(f"Fuzzy greeting match: '{text_lower}' -> '{greeting}' (similarity: {similarity}%)")
            return True

    # 5. Greeting questions contained anywhere in the message
    greeting_questions = [
        'how are you', 'how r u', 'how are u', 'how do you do', 'how\'s it going',
        'how is it going', 'how\'s everything', 'how is everything',
        'what\'s up', 'whats up', 'what is up', 'how\'s life', 'how is life',
        'آپ کیسے ہیں', 'آپ کیسے ہو', 'کیسے ہیں', 'کیسے ہو', 'کیا حال ہے', 'کیسا ہے'
    ]
    if any(question in text_lower for question in greeting_questions):
        return True

    # 6. Greeting followed only by common modifiers ("hello everyone")
    greeting_modifiers = ['there', 'everyone', 'all', 'guys', 'folks', 'people']
    words = text_lower.split()
    if len(words) >= 2:
        first_word = words[0]
        remaining_words = words[1:]
        # Cheap membership test first; fuzzy comparison only when it can matter.
        if all(word in greeting_modifiers for word in remaining_words):
            for greeting in all_greeting_variations:
                if fuzz.ratio(first_word, greeting) >= FUZZY_THRESHOLD:
                    return True

    # 7. Very short messages (2-4 chars): slightly lower fuzzy threshold
    if 2 <= len(text_lower) <= 4:
        short_greetings = ['hi', 'hy', 'hii', 'hey', 'heyy', 'hay', 'hae', 'hai', 'hye', 'hie']
        for short_greeting in short_greetings:
            if fuzz.ratio(text_lower, short_greeting) >= 85:
                logger.info(f"Short greeting match: '{text_lower}' -> '{short_greeting}'")
                return True

    return False
async def handle_ai_chat_mode(from_number: str, query: str, reply_language: str = 'en'):
    """
    Handle one message while the user is in AI chat mode (menu option 4).

    Navigation commands and greetings send the welcome message and return the
    user to the main menu. Any other message is answered by OpenAI, using up to
    the first 50 catalogue products as context; the answer is translated to
    Urdu (product, category and company names restored to English) and sent.
    The caller-supplied ``reply_language`` is overridden: replies are always
    in Urdu.

    Args:
        from_number: WhatsApp number of the user; also the reply destination.
        query: Raw user message.
        reply_language: Accepted for interface compatibility; ignored.
    """
    # Force Urdu replies for Option 4
    reply_language = 'ur'
    logger.info("[AI Chat] Forcing reply_language to Urdu for Option 4.")

    def _return_to_main_menu():
        # Show the welcome message and reset the conversation to the main menu.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message())
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )

    try:
        logger.info(f"[AI Chat] Processing query: '{query}' for {from_number} in {reply_language}")

        # Navigation commands leave AI chat mode.
        if query.lower().strip() in ['main', 'menu', 'start', 'home', 'back']:
            logger.info(f"[AI Chat] Navigation command detected: '{query}' -> returning to main menu")
            _return_to_main_menu()
            return

        # Greetings also return the user to the main menu.
        if is_greeting(query):
            logger.info(f"[AI Chat] Greeting detected: '{query}' -> returning to main menu")
            _return_to_main_menu()
            return

        # Without an API key the assistant cannot answer.
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number, "❌ AI Assistant requires OpenAI API. Please contact support.")
            return

        # Gather catalogue rows to ground the model's answer.
        all_products = []
        if products_df is not None and not products_df.empty:
            all_products = products_df.to_dict('records')

        products_context = ""
        if all_products:
            context_chunks = ["Available Veterinary Products:\n"]
            # Only the first 50 products are included, to bound prompt size.
            for position, item in enumerate(all_products[:50], 1):
                context_chunks.append(f"{position}. {item.get('Product Name', 'N/A')} - {item.get('Category', 'N/A')}\n")
                context_chunks.append(f" Composition: {item.get('Composition', 'N/A')}\n")
                context_chunks.append(f" Target Species: {item.get('Target Species', 'N/A')}\n\n")
            products_context = "".join(context_chunks)

        # Build the prompt in the reply language.
        if reply_language == 'ur':
            prompt = f"""
آپ Apex Biotical کے Veterinary AI Assistant ہیں۔ آپ کو veterinary products اور treatments کے بارے میں معلومات فراہم کرنی ہیں۔
یوزر کا سوال: {query}
دستیاب veterinary products:
{products_context}
براہ کرم:
1. یوزر کے سوال کا جواب دیں
2. اگر یہ veterinary products سے متعلق ہے تو relevant products کی معلومات دیں
3. اگر یہ general veterinary advice ہے تو professional guidance دیں
4. اردو میں جواب دیں
5. جواب professional اور helpful ہو
جواب:
"""
        else:
            prompt = f"""
You are Apex Biotical's Veterinary AI Assistant. You provide information about veterinary products and treatments.
User Query: {query}
Available Veterinary Products:
{products_context}
Please:
1. Answer the user's question
2. If it's related to veterinary products, provide relevant product information
3. If it's general veterinary advice, provide professional guidance
4. Answer in English
5. Keep the response professional and helpful
Response:
"""

        completion = openai.ChatCompletion.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
            temperature=0.7,
            max_tokens=500
        )
        ai_response = completion.choices[0].message.content.strip()

        # Footer telling the user how to leave AI chat mode.
        ai_response += "\n\n💬 *Type 'main' to return to main menu*"

        if reply_language != 'ur':
            send_whatsjet_message(from_number, ai_response)
        else:
            try:
                # Names that must stay in English after translation.
                product_names = [
                    str(item.get('Product Name', ''))
                    for item in all_products
                    if item.get('Product Name')
                ]
                category_names = list({
                    str(item.get('Category', ''))
                    for item in all_products
                    if item.get('Category')
                })
                company_names = ['Apex Biotical Solutions', 'Apex Biotical', 'Apex']

                urdu_text = GoogleTranslator(source='auto', target='ur').translate(ai_response)
                urdu_text = restore_english_terms(
                    urdu_text,
                    ai_response,
                    product_names + company_names,
                    category_names
                )
                send_whatsjet_message(from_number, urdu_text)
            except Exception as e:
                # Best effort: fall back to the untranslated answer.
                logger.error(f"[AI] Translation error: {e}")
                send_whatsjet_message(from_number, ai_response)

        # Stay in AI chat mode and remember the last exchange.
        context_manager.update_context(
            from_number,
            current_state='ai_chat_mode',
            current_menu='ai_chat_mode',
            current_menu_options=['main'],
            last_ai_query=query,
            last_ai_response=ai_response
        )
        context_manager.add_to_history(from_number, query, ai_response)
        logger.info(f"[AI Chat] Response sent successfully to {from_number}")
    except Exception as e:
        logger.error(f"[AI Chat] Error processing query: {e}")
        error_msg = (
            "❌ AI Assistant میں error آ گیا ہے۔ براہ کرم دوبارہ کوشش کریں یا 'main' لکھ کر main menu پر واپس جائیں۔"
            if reply_language == 'ur'
            else "❌ AI Assistant encountered an error. Please try again or type 'main' to return to main menu."
        )
        send_whatsjet_message(from_number, error_msg)
# Load products on startup
def load_products_data():
    """
    Populate the module-level ``products_df`` from ``CSV_FILE``.

    If the file is missing or cannot be read, ``products_df`` is set to an
    empty DataFrame and the problem is logged; nothing is raised.
    """
    global products_df
    try:
        if not os.path.exists(CSV_FILE):
            logger.warning(f"⚠️ CSV file {CSV_FILE} not found")
            products_df = pd.DataFrame()
            return
        products_df = pd.read_csv(CSV_FILE)
        logger.info(f"✅ Loaded {len(products_df)} products from {CSV_FILE}")
    except Exception as e:
        logger.error(f"❌ Error loading products data: {e}")
        products_df = pd.DataFrame()


# Runs once at import time so the catalogue is available to the handlers.
load_products_data()
# Product image helpers: build a product's hosted image URL and derive its media type.
def get_product_image_path(product_name: str, verify_exists: bool = False) -> Optional[str]:
    """
    Build the public cPanel image URL for a product.

    URL format: https://amgocus.com/uploads/images/<normalized_name>.<ext>
    Normalized: lowercase, remove spaces/underscores/dots, preserve dashes.
    The name is percent-encoded so characters such as '#', '?' or '&' cannot
    truncate or alter the URL.

    Args:
        product_name: Product name as stored in the catalogue.
        verify_exists: When False (default) no network request is made and the
            ``.png`` URL of the normalized name is returned, which is what this
            function has always produced. When True, every candidate URL
            (normalized name, then the original name, each with .png/.jpg/
            .jpeg/.webp) is probed with a blocking HTTP HEAD request and the
            first one answered with a 2xx status is returned.

    Returns:
        The image URL, or None when the name is empty or not a string, when
        verification finds no image, or when an unexpected error occurs.
    """
    from urllib.parse import quote

    # Nothing to look up: an empty name used to produce ".../images/.png".
    if not isinstance(product_name, str):
        return None
    normalized_name = re.sub(r'[\s_\.]', '', product_name).lower()
    if not normalized_name:
        return None

    try:
        logger.info(f"[Image] Normalized product name: '{product_name}' -> '{normalized_name}'")
        image_extensions = ['.png', '.jpg', '.jpeg', '.webp']
        base_url = "https://amgocus.com/uploads/images/"

        encoded_name = quote(normalized_name, safe='')
        candidates = [f"{base_url}{encoded_name}{ext}" for ext in image_extensions]

        if not verify_exists:
            # Unverified mode: the first candidate is assumed to exist.
            logger.info(f"[Image] Using cPanel image URL (not verified): {candidates[0]}")
            return candidates[0]

        # Fallback candidates: original name with spaces etc. percent-encoded.
        fallback_name = quote(product_name.strip(), safe='')
        candidates += [f"{base_url}{fallback_name}{ext}" for ext in image_extensions]

        for image_url in candidates:
            logger.info(f"[Image] Checking cPanel image URL: {image_url}")
            if _image_url_exists(image_url):
                logger.info(f"[Image] Found cPanel image URL: {image_url}")
                return image_url

        logger.warning(f"[Image] No cPanel image found for product: {product_name}")
        return None
    except Exception as e:
        logger.error(f"[Image] Error generating cPanel image URL for {product_name}: {e}")
        return None


def _image_url_exists(url: str, timeout: float = 5.0) -> bool:
    """Return True when an HTTP HEAD request to ``url`` gets a 2xx response."""
    from urllib.request import Request, urlopen

    try:
        with urlopen(Request(url, method="HEAD"), timeout=timeout) as resp:
            return 200 <= resp.status < 300
    except (OSError, ValueError):
        # URLError/HTTPError are OSError subclasses; ValueError covers bad URLs.
        return False
def get_product_image_media_type(image_path: str) -> str:
    """
    Map the file extension of an image path to a MIME type.

    Returns None for an empty or None path. Extensions are compared
    case-insensitively; anything that is not .png, .webp or .gif
    (including .jpg/.jpeg and unknown or missing extensions) maps to
    'image/jpeg'.
    """
    if not image_path:
        return None

    _, suffix = os.path.splitext(image_path)
    suffix = suffix.lower()

    if suffix == '.png':
        return 'image/png'
    if suffix == '.webp':
        return 'image/webp'
    if suffix == '.gif':
        return 'image/gif'
    # .jpg, .jpeg and every unrecognised extension share the JPEG default.
    return 'image/jpeg'
async def send_product_with_image(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send product information, attaching the product image when it is available.

    The image location comes from get_product_image_path(). It is used only
    when it is an existing local file or an http(s) URL that answers a HEAD
    request with status 200. In every other case, and whenever the media send
    reports failure, the product text is sent on its own.

    Args:
        from_number: Recipient identifier passed to send_whatsjet_message.
        product: Product record; 'Product Name' is read from it.
        user_context: Context forwarded to generate_veterinary_product_response.
    """
    response = None
    try:
        product_name = product.get('Product Name', 'Unknown Product')
        # Generate product response
        response = generate_veterinary_product_response(product, user_context)
        # Try to get product image
        image_path = get_product_image_path(product_name)
        if _product_image_is_available(image_path):
            media_type = get_product_image_media_type(image_path)
            # Keep the attachment name consistent with the real image type.
            extension = os.path.splitext(image_path)[1].lower() or '.jpg'
            filename = f"{product_name.replace(' ', '_')}{extension}"
            success = send_whatsjet_message(
                from_number,
                response,
                media_type=media_type,
                media_path=image_path,
                filename=filename
            )
            if success:
                logger.info(f"[Product] Successfully sent product with image: {product_name}")
            else:
                # Fallback to text-only if image send fails
                logger.warning(f"[Product] Failed to send image, sending text only: {product_name}")
                send_whatsjet_message(from_number, response)
        else:
            # Send text-only response
            send_whatsjet_message(from_number, response)
            logger.info(f"[Product] Sent product info without image: {product_name}")
    except Exception as e:
        logger.error(f"[Product] Error sending product with image: {e}")
        # Fallback to text-only; reuse the text if it was already generated.
        if response is None:
            response = generate_veterinary_product_response(product, user_context)
        send_whatsjet_message(from_number, response)


def _product_image_is_available(image_path: Optional[str]) -> bool:
    """Return True for an existing local file or an http(s) URL answering HEAD with 200."""
    if not image_path:
        return False
    if image_path.startswith(('http://', 'https://')):
        # os.path.exists() is always False for URLs, so remote images are probed instead.
        probe_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8'
        }
        try:
            probe = requests.head(image_path, headers=probe_headers, timeout=10, allow_redirects=True)
        except requests.RequestException as e:
            logger.warning(f"[Product] Image URL check failed for {image_path}: {e}")
            return False
        return probe.status_code == 200
    return os.path.exists(image_path)
async def send_enhanced_pdf(from_number: str, product: Dict[str, Any], pdf_content: bytes = None):
    """
    Write a product PDF into the uploads directory and send it as a document.

    The PDF bytes are generated with generate_veterinary_pdf() when
    ``pdf_content`` is None. If the media send reports failure, a message
    with a download link built from the SERVER_URL environment variable is
    sent instead. Any exception falls back to the plain product text.
    """
    try:
        product_name = product.get('Product Name', 'Unknown_Product')
        # Strip everything except word characters, whitespace and dashes.
        cleaned_name = re.sub(r'[^\w\s-]', '', product_name)
        stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"{cleaned_name.replace(' ', '_')}_Product_Info_{stamp}.pdf"

        if pdf_content is None:
            pdf_content = generate_veterinary_pdf(product)

        target_dir = "uploads"
        os.makedirs(target_dir, exist_ok=True)
        pdf_path = os.path.join(target_dir, filename)
        with open(pdf_path, 'wb') as pdf_file:
            pdf_file.write(pdf_content)

        caption = (
            f"📄 *{product_name} - Detailed Product Information*\n\n"
            f"📎 Here's the complete product information in PDF format.\n"
            f"📋 Includes: Composition, Dosage, Precautions, Storage\n\n"
            f"💬 Type 'main' to return to main menu."
        )
        delivered = send_whatsjet_message(
            from_number,
            caption,
            media_type="application/pdf",
            media_path=pdf_path,
            filename=filename
        )
        if delivered:
            logger.info(f"[PDF] Successfully sent PDF for product: {product_name}")
            return

        # Document send failed: offer a download link instead.
        server_url = os.getenv("SERVER_URL", "https://your-huggingface-space-url.hf.space")
        download_url = f"{server_url}/uploads/{filename}"
        link_message = (
            f"📄 *{product_name} - Product Information*\n\n"
            f"📎 [Download Product PDF]({download_url})\n\n"
            f"💬 *Click the link above to download the detailed product information*\n"
            f"Type 'main' to return to main menu."
        )
        send_whatsjet_message(from_number, link_message)
        logger.info(f"[PDF] Sent PDF download link for product: {product_name}")
    except Exception as e:
        logger.error(f"[PDF] Error sending enhanced PDF: {e}")
        # Last resort: plain product text.
        send_whatsjet_message(from_number, generate_veterinary_product_response(product, {}))
# Enhanced product response function with image support
def generate_veterinary_product_response_with_media(product_info: Dict[str, Any], user_context: Dict[str, Any]) -> Dict[str, Any]:
    """
    Build the product summary text together with image information.

    Returns a dict with the keys 'text', 'has_image', 'image_path' and
    'product_name'. A brochure link is appended to the text when
    'Veterinary.csv' has a non-blank 'Brochure (PDF)' value for the product.
    """
    def _text_or_placeholder(value):
        # Missing values (None / NaN) are rendered as a fixed placeholder.
        if value is None or pd.isna(value):
            return "Not specified"
        return str(value).strip()

    product_name = _text_or_placeholder(product_info.get('Product Name', ''))
    product_type = _text_or_placeholder(product_info.get('Type', ''))
    category = _text_or_placeholder(product_info.get('Category', ''))
    indications = _text_or_placeholder(product_info.get('Indications', ''))

    brochure_url = ""
    try:
        catalogue = pd.read_csv('Veterinary.csv')
        matches = catalogue[catalogue['Product Name'] == product_name]
        if not matches.empty:
            candidate = matches.iloc[0].get('Brochure (PDF)', '')
            if pd.notna(candidate) and candidate.strip():
                brochure_url = candidate.strip()
    except Exception as e:
        logger.warning(f"Error checking PDF link for {product_name}: {e}")

    response_text = (
        f"🧪 *Name:* {product_name}\n"
        f"📦 *Type:* {product_type}\n"
        f"🏥 *Category:* {category}\n"
        f"💊 *Used For:* {indications}"
    )
    if brochure_url:
        response_text += f"\n\n📄 Product Brochure Available\n🔗 {product_name} PDF:\n{brochure_url}"
    response_text += (
        "\n\n💬 *Available Actions:*\n"
        "1️⃣ Talk to Veterinary Consultant\n"
        "2️⃣ Inquire About Availability\n"
        "3️⃣ Back to Main Menu\n"
        "\n💬 Select an option or ask about related products"
    )

    image_path = get_product_image_path(product_name)
    # NOTE(review): os.path.exists() is a local-filesystem check; if
    # get_product_image_path returns a URL this is presumably always False - confirm.
    has_image = image_path is not None and os.path.exists(image_path)

    return {
        'text': response_text,
        'has_image': has_image,
        'image_path': image_path,
        'product_name': product_name
    }
def ensure_images_dir():
    """Create the static/images directory when it is missing, then log it."""
    target_dir = "static/images"
    os.makedirs(target_dir, exist_ok=True)
    logger.info(f"[Image] Ensured images directory exists: {target_dir}")
# New feature: Send product image with caption (product details)
async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
    """
    Send the product image with the product details as its caption.

    Flow:
      1. Resolve the image URL and check it with a HEAD request (status 200).
      2. POST image + caption to the WhatsJet send-media-message endpoint.
      3. If that does not return 200 (or raises), send the image alone and
         then the details as a separate text message.
      4. If no image could be delivered, send the details as text only.
    """
    ensure_images_dir()
    product_name = product.get('Product Name', 'Unknown Product')
    details = generate_veterinary_product_response(product, user_context)
    logger.info(f"[Product] Processing cPanel image for product: {product_name}")

    async def _send_image_then_text(media_url: str) -> bool:
        """Send the image on its own followed by the details; True when the image went out."""
        delivered = send_whatsjet_media_image_only(from_number, media_url, f"{product_name.replace(' ', '_')}.jpg")
        if not delivered:
            logger.warning(f"[Product] Failed to send cPanel image, sending text only: {product_name}")
            return False
        await asyncio.sleep(1)
        send_whatsjet_message(from_number, details)
        return True

    try:
        # Get cPanel image URL for the product
        image_url = get_product_image_path(product_name)
        if image_url and image_url.startswith('http'):
            logger.info(f"[Product] Found cPanel image URL: {image_url}")
            # Probe the URL with browser-like headers before handing it to WhatsJet.
            try:
                logger.info(f"[Product] Testing cPanel image URL accessibility: {image_url}")
                browser_headers = {
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                    'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.9',
                    'Accept-Encoding': 'gzip, deflate, br',
                    'Connection': 'keep-alive',
                    'Upgrade-Insecure-Requests': '1'
                }
                probe = requests.head(image_url, headers=browser_headers, timeout=10, allow_redirects=True)
                if probe.status_code != 200:
                    logger.warning(f"[Product] cPanel image URL not accessible (status {probe.status_code}): {image_url}")
                    raise Exception(f"cPanel image URL not accessible: {probe.status_code}")
                logger.info(f"[Product] cPanel image URL is accessible")
            except Exception as e:
                logger.warning(f"[Product] Failed to test cPanel image URL {image_url}: {e}")
                image_url = None

            if image_url:
                logger.info(f"[Product] Attempting to send cPanel image with caption for: {product_name}")
                endpoint = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-media-message"
                api_headers = {
                    "Authorization": f"Bearer {WHATSJET_API_TOKEN}",
                    "Content-Type": "application/json"
                }
                payload = {
                    "phone_number": from_number,
                    "media_type": "image",
                    "media_url": image_url,
                    "caption": details,
                    "file_name": f"{product_name.replace(' ', '_')}.jpg"
                }
                try:
                    logger.info(f"[Product] Sending image with caption using WhatsJet API: {payload}")
                    api_reply = httpx.post(endpoint, json=payload, headers=api_headers, timeout=30)
                    logger.info(f"[Product] WhatsJet response status: {api_reply.status_code}")
                    logger.info(f"[Product] WhatsJet response body: {api_reply.text[:500]}...")
                    if api_reply.status_code == 200:
                        logger.info(f"[Product] Successfully sent cPanel image with caption for product: {product_name}")
                        return
                    logger.warning(f"[Product] Failed to send image with caption, trying separate messages: {product_name}")
                    if await _send_image_then_text(image_url):
                        return
                except Exception as e:
                    logger.error(f"[Product] Error sending image with caption: {e}")
                    if await _send_image_then_text(image_url):
                        return

        # Reached when no image was delivered by any of the paths above.
        logger.info(f"[Product] No cPanel image available, sending text only for: {product_name}")
        send_whatsjet_message(from_number, details)
    except Exception as e:
        logger.error(f"[Product] Error sending product image with caption: {e}")
        logger.info(f"[Product] Falling back to text-only message for: {product_name}")
        send_whatsjet_message(from_number, details)
# Test endpoint for product image with caption
@app.get("/test-product-image-with-caption")
async def test_product_image_with_caption(phone: str):
    """Send the first loaded product (image plus caption) to ``phone`` and report the outcome."""
    try:
        no_products = products_df is None or products_df.empty
        if no_products:
            return {"error": "No products loaded"}

        # The first catalogue row serves as the sample product.
        sample_product = products_df.iloc[0].to_dict()
        await send_product_image_with_caption(phone, sample_product, {})

        return {
            "success": True,
            "message": f"Test product image sent to {phone}",
            "product": sample_product.get('Product Name', 'Unknown')
        }
    except Exception as e:
        logger.error(f"Error in test product image with caption: {e}")
        return {"error": str(e)}
# Test endpoint for image sending
@app.get("/test-image-sending")
async def test_image_sending(phone: str, image_url: str = "https://amgocus.com/uploads/images/respiraaidplus.png"):
    """Send a fixed test caption with ``image_url`` as media to ``phone`` and report success or failure."""
    try:
        delivered = send_whatsjet_message(
            phone,
            "🖼️ *Test Image*\n\nThis is a test image sent via WhatsApp API.",
            media_type="image/jpeg",
            media_path=image_url,
            filename="test_image.jpg"
        )
        if not delivered:
            return {
                "success": False,
                "message": f"Failed to send test image to {phone}",
                "image_url": image_url
            }
        return {
            "success": True,
            "message": f"Test image sent successfully to {phone}",
            "image_url": image_url
        }
    except Exception as e:
        logger.error(f"Error in test image sending: {e}")
        return {"error": str(e)}
# Debug endpoint for WhatsJet
@app.get("/debug-whatsjet")
async def debug_whatsjet():
    """Report the WhatsJet/OpenAI configuration; secrets are shown as "***" when set, None otherwise."""
    def _masked(secret):
        # Never echo the secret itself, only whether one is configured.
        return "***" if secret else None

    try:
        settings = {
            "api_url": WHATSJET_API_URL,
            "vendor_uid": WHATSJET_VENDOR_UID,
            "api_token": _masked(WHATSJET_API_TOKEN),
            "server_url": SERVER_URL,
            "openai_key": _masked(OPENAI_API_KEY)
        }
        return {
            "status": "success",
            "config": settings,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
# Test endpoint for WhatsJet payloads
@app.get("/test-whatsjet-payloads")
async def test_whatsjet_payloads(phone: str):
    """Send a fixed text message to ``phone`` through send_whatsjet_message and report the result."""
    try:
        probe_text = "🧪 *WhatsJet Test*\n\nThis is a test message to verify WhatsJet integration."
        delivered = send_whatsjet_message(phone, probe_text)
        outcome = "success" if delivered else "failed"
        return {
            "status": outcome,
            "message": f"WhatsJet test message sent to {phone}",
            "success": delivered,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        return {
            "status": "error",
            "error": str(e),
            "timestamp": datetime.now().isoformat()
        }
# Test endpoint for cPanel image access
@app.get("/test-cpanel-image-access")
async def test_cpanel_image_access():
    """
    Test endpoint to check if a cPanel image URL is accessible with browser-like headers.

    Issues a streamed GET (the body is not read) and reports the status code
    and response headers. The response is always closed so the underlying
    connection is released.

    Returns:
        A dict with 'image_url', 'status_code', 'headers', 'accessible' and
        'timestamp', or a dict with 'error', 'image_url' and 'timestamp' when
        the request raises.
    """
    # Defined outside the try block so the error payload can always reference it.
    image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"
    try:
        # Test with browser-like headers
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
            'Accept-Language': 'en-US,en;q=0.9',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1'
        }
        logger.info(f"[Test] Testing cPanel image URL with browser headers: {image_url}")
        response = requests.get(image_url, headers=headers, timeout=10, stream=True, allow_redirects=True)
        try:
            result = {
                "image_url": image_url,
                "status_code": response.status_code,
                "headers": dict(response.headers),
                "accessible": response.status_code == 200,
                "timestamp": datetime.now().isoformat()
            }
        finally:
            # With stream=True the connection stays open until the body is
            # consumed or the response is closed; the body is never read here.
            response.close()
        if result["accessible"]:
            logger.info(f"[Test] ✅ cPanel image URL is now accessible!")
        else:
            logger.warning(f"[Test] ❌ cPanel image URL still not accessible (status {result['status_code']})")
        return result
    except Exception as e:
        logger.error(f"[Test] Error testing cPanel image access: {e}")
        return {
            "error": str(e),
            "image_url": image_url,
            "timestamp": datetime.now().isoformat()
        }
def format_number_with_emoji(number: int) -> str:
    """Return the keycap-emoji label for 1 through 23, or "<number>." for any other value."""
    # Position in this tuple (starting at 1) is the number being labelled.
    keycaps = (
        "1️⃣", "2️⃣", "3️⃣", "4️⃣", "5️⃣",
        "6️⃣", "7️⃣", "8️⃣", "9️⃣", "🔟",
        "1️⃣1️⃣", "1️⃣2️⃣", "1️⃣3️⃣", "1️⃣4️⃣", "1️⃣5️⃣",
        "1️⃣6️⃣", "1️⃣7️⃣", "1️⃣8️⃣", "1️⃣9️⃣", "2️⃣0️⃣",
        "2️⃣1️⃣", "2️⃣2️⃣", "2️⃣3️⃣",
    )
    labels = dict(enumerate(keycaps, start=1))
    return labels.get(number, f"{number}.")
async def display_all_products(from_number: str):
    """
    Send the full product list in messages of five products each.

    Does nothing (beyond logging) when the user's state is already
    'all_products_menu'. Otherwise the context is switched to that state and
    the product records are stored as ``available_products`` before sending.
    """
    try:
        user_context = context_manager.get_context(from_number)
        current_state = user_context.get('current_state', 'main_menu')
        logger.info(f"[Display] display_all_products called for {from_number} in state: {current_state}")

        if current_state == 'all_products_menu':
            logger.warning(f"[Display] Already in all_products_menu state for {from_number}, skipping display")
            return
        if products_df is None or products_df.empty:
            send_whatsjet_message(from_number, "❌ No products available at the moment.")
            return

        # Record the menu state before any list message goes out.
        products = products_df.to_dict('records')
        context_manager.update_context(
            from_number,
            current_state='all_products_menu',
            current_menu='all_products_menu',
            current_menu_options=[item.get('Product Name', 'Unknown') for item in products],
            available_products=products
        )
        logger.info(f"[Display] Set state to all_products_menu for {from_number}")

        total = len(products)
        chunk_size = 5
        for start in range(0, total, chunk_size):
            batch = products[start:start + chunk_size]
            parts = [f"📋 *Products List ({start+1}-{min(start+chunk_size, total)} of {total})*\n\n"]
            # Numbering continues across messages.
            for position, item in enumerate(batch, start + 1):
                parts.append(f"{format_number_with_emoji(position)} {item.get('Product Name', 'Unknown')}\n")
                if item.get('Category'):
                    parts.append(f" Category: {item.get('Category')}\n")
                parts.append("\n")
            send_whatsjet_message(from_number, "".join(parts))

        send_whatsjet_message(from_number,
            "💬 Type a product name to get detailed information, or type 'main' to return to main menu.")
    except Exception as e:
        logger.error(f"[Display] Error displaying products: {e}")
        send_whatsjet_message(from_number, "❌ Error displaying products. Please try again.")
def get_all_categories():
    """List the unique values of the 'Category' column, or [] when no products are loaded."""
    if products_df is None or products_df.empty:
        return []
    unique_values = products_df['Category'].unique()
    return list(unique_values)
def get_products_by_category(category: str):
    """Return the product records whose 'Category' equals ``category`` exactly ([] when nothing is loaded)."""
    if products_df is not None and not products_df.empty:
        in_category = products_df['Category'] == category
        return products_df[in_category].to_dict('records')
    return []
async def handle_category_selection(selection: str, from_number: str):
    """
    Resolve a numeric category choice and send that category's product list.

    ``selection`` must be a 1-based index into the ``available_categories``
    stored in the user's context. On success the context moves to
    'category_products_menu' with the category's products attached.
    """
    try:
        user_context = context_manager.get_context(from_number)
        available_categories = user_context.get('available_categories', [])

        choice_in_range = selection.isdigit() and 1 <= int(selection) <= len(available_categories)
        if not choice_in_range:
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose a valid category number.")
            return

        selected_category = available_categories[int(selection) - 1]
        products = get_products_by_category(selected_category)
        if not products:
            send_whatsjet_message(from_number, f"❌ No products found in {selected_category} category.")
            return

        # Update context with category products
        context_manager.update_context(
            from_number,
            current_category=selected_category,
            current_state='category_products_menu',
            current_menu='category_products_menu',
            current_menu_options=[item.get('Product Name', 'Unknown') for item in products],
            available_products=products
        )

        # Build the numbered product list for the chosen category.
        parts = [f"📦 *Products in {selected_category}*\n\n"]
        for position, item in enumerate(products, 1):
            parts.append(f"{format_number_with_emoji(position)} {item.get('Product Name', 'Unknown')}\n")
            if item.get('Target Species'):
                parts.append(f" Target: {item.get('Target Species')}\n")
            parts.append("\n")
        parts.append("💬 Select a product number or type 'main' to return to main menu.")
        send_whatsjet_message(from_number, "".join(parts))
    except Exception as e:
        logger.error(f"[Category] Error handling category selection: {e}")
        send_helpful_guidance(from_number, 'category_selection_menu')
def get_menu_validation_message(current_state: str, user_context: dict) -> str:
    """
    Get the "invalid selection" message appropriate for the current menu state.

    Args:
        current_state: Name of the menu state the user is in.
        user_context: The user's context; 'available_products' and
            'available_categories' are read from it for the numbered menus.

    Returns:
        The message text. Numbered menus state the valid range; a menu with
        nothing to choose from gets a short "nothing available" message.
    """
    tips = (
        "💬 *You can also:*\n"
        "• Type a product name (e.g., 'hydropex', 'respira aid plus')\n"
        "• Type 'main' to return to main menu"
    )

    def numbered_prompt(title: str, noun: str, upper: int) -> str:
        # Shared layout for every "pick a number between 1 and N" menu.
        return (
            f"❌ *{title}*\n\n"
            f"Please choose a {noun} number between 1 and {upper}.\n\n"
            f"{tips}"
        )

    if current_state == 'main_menu':
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose from the main menu:\n"
            "1️⃣ Search Veterinary Products\n"
            "2️⃣ Browse Categories\n"
            "3️⃣ Download Catalog\n"
            "4️⃣ Chat with Veterinary AI Assistant\n\n"
            "💬 *You can also:*\n"
            "• Type a product name (e.g., 'hydropex', 'respira aid plus')\n"
            "• Type 'main' to refresh the menu"
        )
    elif current_state == 'all_products_menu':
        if products_df is not None and not products_df.empty:
            return numbered_prompt("Invalid Product Selection", "product", len(products_df))
        return "❌ No products available. Type 'main' to return to main menu."
    elif current_state == 'category_products_menu':
        available_products = user_context.get('available_products', [])
        if available_products:
            return numbered_prompt("Invalid Product Selection", "product", len(available_products))
        return "❌ No products available in this category. Type 'main' to return to main menu."
    elif current_state == 'category_selection_menu':
        available_categories = user_context.get('available_categories', [])
        if available_categories:
            return numbered_prompt("Invalid Category Selection", "category", len(available_categories))
        return "❌ No categories available. Type 'main' to return to main menu."
    elif current_state == 'product_inquiry':
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose an option:\n"
            "1️⃣ Talk to Veterinary Consultant\n"
            "2️⃣ Inquire About Availability\n"
            "3️⃣ Back to Main Menu\n\n"
            f"{tips}"
        )
    elif current_state == 'intelligent_products_menu':
        # The range must be interpolated from the context; without it the
        # user would see the raw "{len(available_products)}" placeholder.
        available_products = user_context.get('available_products', [])
        if available_products:
            return numbered_prompt("Invalid Selection", "product", len(available_products))
        return "❌ No products available. Type 'main' to return to main menu."
    else:
        return (
            "❌ *Invalid Selection*\n\n"
            "Please choose a valid option or type 'main' to return to main menu.\n\n"
            "💬 *You can also:*\n"
            "• Type a product name (e.g., 'hydropex', 'respira aid plus')"
        )
def is_valid_menu_selection(selection: str, current_state: str, user_context: dict) -> bool:
    """Return only the validity flag (first element) of validate_menu_selection's two-item result."""
    verdict, _details = validate_menu_selection(selection, current_state, user_context)
    return verdict
def generate_veterinary_welcome_message(phone_number=None, user_context=None):
    """Return the fixed welcome / main-menu text; both arguments are accepted but not used."""
    lines = [
        "🏥 Welcome to Apex Biotical Solutions Veterinary Virtual Assistant",
        "",
        "How can I help you today?",
        "",
        "📋 Main Menu:",
        "1️⃣ Complete Products List",
        "2️⃣ Browse Categories",
        "3️⃣ Download Catalog",
        "4️⃣ Chat with Veterinary AI Assistant",
        "",
        "💬 Quick Actions:",
        "* Type a product name (e.g., 'hydropex', 'respira aid plus')",
        "* Ask about symptoms (e.g., 'respiratory problems', 'liver support')",
        "* Search by category (e.g., 'antibiotics', 'vitamins')",
        "",
        "🎤 Voice messages are supported!",
        "You can speak product names, menu numbers, or ask questions.",
    ]
    # Empty entries become the blank lines between sections.
    return "\n".join(lines)
async def handle_veterinary_product_followup(selection: str, from_number: str) -> None:
    """
    Act on the user's choice from the product follow-up menu.

    '1' asks for contact details and moves to 'contact_request',
    '2' delegates to handle_availability_inquiry, and '3' returns to the
    main menu. Any other value gets an "invalid selection" reply. If an
    exception occurs the user is sent back to the main menu.
    """
    def _return_to_main_menu(context) -> None:
        # Welcome text first, then the state reset.
        send_whatsjet_message(from_number, generate_veterinary_welcome_message(from_number, context))
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )

    try:
        user_context = context_manager.get_context(from_number)
        current_product = user_context.get('current_product')
        if not current_product:
            send_whatsjet_message(from_number, "❌ No product selected. Please search for a product first.")
            return

        if selection not in ('1', '2', '3'):
            send_whatsjet_message(from_number, "❌ Invalid selection. Please choose 1, 2, or 3.")
            return

        if selection == '3':
            _return_to_main_menu(user_context)
            return

        if selection == '2':
            # Inquire about Product Availability
            await handle_availability_inquiry(from_number, user_context)
            return

        # selection == '1': Talk to Veterinary Consultant
        product_name = current_product.get('Product Name', 'the selected product')
        consultant_msg = (
            f"📞 Contact Veterinary Consultant\n\n"
            f"Product: {product_name}\n\n"
            "Please provide your details:\n"
            "* Name and location\n"
            "* Specific inquiry\n\n"
            "💬 Example: Dr. Ali - Multan - Need consultation for respiratory problems\n\n"
            "Type main at any time to go to main menu."
        )
        send_whatsjet_message(from_number, consultant_msg)
        context_manager.update_context(
            from_number,
            current_state='contact_request',
            current_menu='contact_request',
            current_menu_options=['Provide contact details']
        )
    except Exception as e:
        logger.error(f"Error in product follow-up: {e}")
        _return_to_main_menu(context_manager.get_context(from_number))
# --- Voice message handling ---
async def handle_voice_message_complete(from_number: str, msg: dict):
    """
    Process a voice message: download, transcribe, then handle it like text.

    Steps:
      1. Find the media URL in ``msg`` (media.link, media.url, url, audio.url).
      2. Download the audio and transcribe it with OpenAI; the downloaded
         file is removed afterwards even if transcription raises.
      3. Detect the language. Only English and Urdu are kept; anything else
         is treated as English, and Urdu/Arabic script or a listed greeting
         word forces Urdu.
      4. Urdu transcriptions are translated to English for processing.
      5. Greetings show the main menu (or go to the AI chat when the user is
         in 'ai_chat_mode'); everything else is passed to
         process_incoming_message as a text message.

    Args:
        from_number: Sender identifier used for replies and context lookup.
        msg: Incoming message payload containing the media reference.
    """
    try:
        logger.info(f"[Voice] Processing voice message from {from_number}")
        logger.info(f"[Voice] Message structure: {msg}")
        # Check if OpenAI is available
        if not OPENAI_API_KEY:
            send_whatsjet_message(from_number,
                "🎤 Voice messages require OpenAI API. Please send a text message or type 'main' to see the menu.")
            return
        # Extract media URL from different possible locations
        media_url = None
        logger.info(f"[Voice] Checking media URL locations...")
        if msg.get('media', {}).get('link'):
            media_url = msg.get('media', {}).get('link')
            logger.info(f"[Voice] Found media URL in media.link: {media_url}")
        elif msg.get('media', {}).get('url'):
            media_url = msg.get('media', {}).get('url')
            logger.info(f"[Voice] Found media URL in media.url: {media_url}")
        elif msg.get('url'):
            media_url = msg.get('url')
            logger.info(f"[Voice] Found media URL in url: {media_url}")
        elif msg.get('audio', {}).get('url'):
            media_url = msg.get('audio', {}).get('url')
            logger.info(f"[Voice] Found media URL in audio.url: {media_url}")
        else:
            logger.error(f"[Voice] No media URL found in message structure")
            logger.error(f"[Voice] Available fields: {list(msg.keys())}")
            if 'media' in msg:
                logger.error(f"[Voice] Media fields: {list(msg['media'].keys())}")
        logger.info(f"[Voice] Final extracted media URL: {media_url}")
        if not media_url:
            send_whatsjet_message(from_number, "❌ Could not process voice message. Please try again.")
            return
        # Generate unique filename
        filename = f"voice_{from_number}_{int(time.time())}.ogg"
        # Download voice file
        file_path = await download_voice_file(media_url, filename)
        if not file_path:
            send_whatsjet_message(from_number, "❌ Failed to download voice message. Please try again.")
            return
        # Transcribe with OpenAI; always clean up the downloaded file,
        # including when transcription raises.
        try:
            transcribed_text = await transcribe_voice_with_openai(file_path)
        finally:
            try:
                os.remove(file_path)
            except FileNotFoundError:
                # Already gone - nothing to clean up.
                pass
            except OSError as cleanup_error:
                logger.warning(f"[Voice] Could not remove voice file {file_path}: {cleanup_error}")
        # Handle empty, failed, or unclear transcription
        if not transcribed_text or transcribed_text.strip() == "" or transcribed_text.lower() == "unclear audio":
            logger.warning(f"[Voice] Empty or unclear transcription for {from_number}: '{transcribed_text}'")
            send_whatsjet_message(from_number,
                "🎤 *Voice Message Issue*\n\n"
                "I couldn't understand your voice message clearly. This can happen due to:\n"
                "• Very short voice note\n"
                "• Background noise\n"
                "• Microphone too far away\n"
                "• Audio quality issues\n"
                "• Speaking too fast\n\n"
                "💡 *Tips for better voice notes:*\n"
                "• Speak clearly and slowly\n"
                "• Keep phone close to mouth\n"
                "• Record in quiet environment\n"
                "• Make voice note at least 2-3 seconds\n"
                "• Speak in English or Urdu only\n\n"
                "💬 *You can also:*\n"
                "• Send a text message\n"
                "• Type 'main' to see menu options\n"
                "• Try voice note again")
            return
        # Process transcribed text with full intelligence
        logger.info(f"[Voice] Transcribed: {transcribed_text}")
        # Apply transcription error corrections
        corrected_text = process_voice_input(transcribed_text)
        if corrected_text != transcribed_text:
            logger.info(f"[Voice] Applied corrections: '{transcribed_text}' -> '{corrected_text}'")
            transcribed_text = corrected_text
        # Detect language of transcribed text - STRICTLY ENGLISH OR URDU ONLY
        detected_lang = 'en'  # Default to English
        try:
            detected_lang = detect(transcribed_text)
            logger.info(f"[Voice] Raw detected language: {detected_lang}")
            # Only allow English and Urdu, reject everything else
            if detected_lang in ['en', 'ur']:
                reply_language = detected_lang
            else:
                # Force any other language to English
                reply_language = 'en'
                logger.warning(f"[Voice] Detected language '{detected_lang}' is not English or Urdu, forcing to English")
            # Check if text contains Urdu/Arabic characters or Islamic greetings
            urdu_arabic_pattern = re.compile(r'[\u0600-\u06FF\u0750-\u077F\u08A0-\u08FF\uFB50-\uFDFF\uFE70-\uFEFF]')
            islamic_greetings = ['assalamu', 'assalam', 'salam', 'salaam', 'adaab', 'namaste', 'khuda', 'allah']
            has_urdu_chars = bool(urdu_arabic_pattern.search(transcribed_text))
            has_islamic_greeting = any(greeting in transcribed_text.lower() for greeting in islamic_greetings)
            if has_urdu_chars or has_islamic_greeting:
                detected_lang = 'ur'
                reply_language = 'ur'
                logger.info(f"[Voice] Overriding language detection to Urdu due to Arabic/Urdu characters or Islamic greeting")
            logger.info(f"[Voice] Final language set to: {reply_language}")
        except Exception as e:
            logger.warning(f"[Voice] Language detection failed: {e}, defaulting to English")
            reply_language = 'en'
        # For Urdu voice notes, translate to English for processing
        processing_text = transcribed_text
        if reply_language == 'ur' and detected_lang == 'ur':
            try:
                logger.info(f"[Voice] Translating Urdu voice note to English for processing")
                translated_text = GoogleTranslator(source='ur', target='en').translate(transcribed_text)
                processing_text = translated_text
                logger.info(f"[Voice] Translated to English: {translated_text}")
            except Exception as e:
                logger.error(f"[Voice] Translation failed: {e}")
                # If translation fails, use original text
                processing_text = transcribed_text
        # Determine reply language - always respond in English or Urdu
        if detected_lang == 'ur':
            reply_language = 'ur'  # Urdu voice notes get Urdu replies
        else:
            reply_language = 'en'  # All other languages get English replies
        logger.info(f"[Voice] Processing text: {processing_text}")
        logger.info(f"[Voice] Reply language set to: {reply_language}")
        # Check if this is a greeting in voice note (check both original and translated)
        if is_greeting(transcribed_text) or is_greeting(processing_text):
            logger.info(f"[Voice] Greeting detected in voice note: {transcribed_text}")
            # Check if user is currently in AI chat mode - if so, don't trigger menu mode
            user_context = context_manager.get_context(from_number)
            current_state = user_context.get('current_state', 'main_menu')
            if current_state == 'ai_chat_mode':
                logger.info(f"[Voice] User is in AI chat mode, treating greeting as AI query instead of menu trigger")
                # Treat greeting as a general query in AI chat mode
                await handle_general_query_with_ai(from_number, processing_text, user_context, reply_language)
                return
            else:
                # Only trigger menu mode if not in AI chat mode
                welcome_msg = generate_veterinary_welcome_message(from_number, user_context)
                send_whatsjet_message(from_number, welcome_msg)
                context_manager.update_context(from_number, current_state='main_menu', current_menu='main_menu', current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values()))
                return
        # Process the translated text using the same strict state-based logic as text messages
        # This ensures voice messages follow the same menu and state rules as text messages
        await process_incoming_message(from_number, {
            'body': processing_text,  # Use translated text for processing
            'type': 'text',
            'reply_language': reply_language,
            'original_transcription': transcribed_text  # Keep original for context
        })
    except Exception as e:
        logger.error(f"[Voice] Error processing voice message: {e}")
        logger.error(f"[Voice] Full error details: {str(e)}")
        import traceback
        logger.error(f"[Voice] Traceback: {traceback.format_exc()}")
        send_whatsjet_message(from_number,
            "❌ Error processing voice message. Please try a text message.")
# Test endpoint for WhatsJet media format debugging
@app.get("/test-whatsjet-media-formats")
async def test_whatsjet_media_formats(phone: str):
    """Debug endpoint: send the same test image using several WhatsJet payload shapes.

    Each candidate payload is POSTed to the WhatsJet ``contact/send-message``
    URL built from ``WHATSJET_API_URL``, ``WHATSJET_VENDOR_UID`` and
    ``WHATSJET_API_TOKEN``. The outcome of every attempt is collected so the
    caller can see which payload shape is accepted.

    Args:
        phone: Value placed in the ``phone_number`` field of every payload.

    Returns:
        A dict with ``status``, ``phone``, ``image_url``, a ``results`` list
        (one entry per format with ``format``, ``status_code``, ``success``
        and ``response_text``) and an ISO ``timestamp``; or ``{"error": ...}``
        if something fails outside the per-format handling.
    """
    try:
        test_image_url = "https://amgocus.com/uploads/images/respiraaidplus.png"
        test_message = "🧪 *Media Format Test*\n\nTesting different WhatsJet media payload formats."
        # Candidate payload shapes; only the field names differ between them.
        formats = [
            {
                "name": "Format 1 - caption",
                "payload": {
                    "phone_number": phone,
                    "caption": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url,
                    "media_filename": "test.png"
                }
            },
            {
                "name": "Format 2 - message_body",
                "payload": {
                    "phone_number": phone,
                    "message_body": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url,
                    "media_filename": "test.png"
                }
            },
            {
                "name": "Format 3 - simplified",
                "payload": {
                    "phone_number": phone,
                    "message_body": test_message,
                    "media_type": "image/png",
                    "media_url": test_image_url
                }
            },
            {
                "name": "Format 4 - different fields",
                "payload": {
                    "phone_number": phone,
                    "caption": test_message,
                    "type": "image/png",
                    "url": test_image_url
                }
            }
        ]
        results = []
        url = f"{WHATSJET_API_URL}/{WHATSJET_VENDOR_UID}/contact/send-message?token={WHATSJET_API_TOKEN}"
        # This handler is a coroutine: a blocking httpx.post() here would stall the
        # event loop for up to 15 s per format. Use the async client and await each
        # request instead; one client is shared by all attempts.
        async with httpx.AsyncClient(timeout=15) as client:
            for format_info in formats:
                try:
                    logger.info(f"[Test] Trying {format_info['name']}: {format_info['payload']}")
                    response = await client.post(url, json=format_info['payload'])
                    result = {
                        "format": format_info['name'],
                        "status_code": response.status_code,
                        "success": response.status_code == 200,
                        "response_text": response.text[:500] if response.text else "No response text"
                    }
                    results.append(result)
                    if response.status_code == 200:
                        logger.info(f"[Test] ✅ {format_info['name']} succeeded!")
                    else:
                        logger.warning(f"[Test] ❌ {format_info['name']} failed: {response.status_code}")
                except Exception as e:
                    # A failure of one format must not prevent the others from being tried.
                    result = {
                        "format": format_info['name'],
                        "status_code": "Exception",
                        "success": False,
                        "response_text": str(e)
                    }
                    results.append(result)
                    logger.error(f"[Test] Exception with {format_info['name']}: {e}")
        return {
            "status": "completed",
            "phone": phone,
            "image_url": test_image_url,
            "results": results,
            "timestamp": datetime.now().isoformat()
        }
    except Exception as e:
        logger.error(f"Error in test WhatsJet media formats: {e}")
        return {"error": str(e)}
# Test endpoint for product image URL accessibility
@app.get("/test-product-image-url")
async def test_product_image_url(product_name: str = "Respira Aid Plus"):
    """Debug endpoint: check whether a product's image URL can be fetched.

    The image location is resolved with ``get_product_image_path`` and then
    requested with browser-like headers.

    Args:
        product_name: Product whose image path should be looked up.

    Returns:
        A dict describing the probe. When a response was received it contains
        ``product_name``, ``image_path``, ``status_code``, ``accessible``,
        ``content_type``, ``content_length`` and the response ``headers``.
        When no path is found or the request raises, it contains
        ``accessible: False`` and an ``error`` string. Unexpected failures
        yield ``{"error": ...}``.
    """
    try:
        image_path = get_product_image_path(product_name)
        if not image_path:
            return {
                "product_name": product_name,
                "image_path": None,
                "accessible": False,
                "error": "No image path found"
            }
        # Test if the URL is accessible
        try:
            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
                'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
                'Accept-Language': 'en-US,en;q=0.9',
                'Accept-Encoding': 'gzip, deflate, br',
                'Connection': 'keep-alive'
            }
            # stream=True defers the body download, so only status and headers are
            # read. A streamed response keeps its connection checked out until it is
            # closed; the context manager guarantees that release.
            # NOTE(review): requests.get is blocking inside an async handler — acceptable
            # for a debug endpoint, but consider an async client if this is used often.
            with requests.get(image_path, headers=headers, timeout=10, stream=True) as response:
                result = {
                    "product_name": product_name,
                    "image_path": image_path,
                    "status_code": response.status_code,
                    "accessible": response.status_code == 200,
                    "content_type": response.headers.get('content-type', 'unknown'),
                    "content_length": response.headers.get('content-length', 'unknown'),
                    "headers": dict(response.headers)
                }
            if result["accessible"]:
                logger.info(f"[Test] ✅ Product image URL is accessible: {image_path}")
            else:
                logger.warning(f"[Test] ❌ Product image URL not accessible: {image_path} (status: {result['status_code']})")
            return result
        except Exception as e:
            return {
                "product_name": product_name,
                "image_path": image_path,
                "accessible": False,
                "error": str(e)
            }
    except Exception as e:
        logger.error(f"Error testing product image URL: {e}")
        return {"error": str(e)}
# Test endpoint for send_product_image_with_caption function
@app.get("/test-send-product-image")
async def test_send_product_image(phone: str, product_name: str = "Bromacid"):
    """
    Debug endpoint that exercises send_product_image_with_caption for one product.

    The product row is looked up in 'Veterinary.csv' by a case-insensitive match
    on the 'Product Name' column; the first matching row is converted to a dict
    and passed, with the phone's stored context, to send_product_image_with_caption.
    Returns a status dict on success, or {"error": ...} when the product is not
    found or anything raises.
    """
    try:
        # Read the catalogue and keep only rows whose name matches (case-insensitive)
        catalog = pd.read_csv('Veterinary.csv')
        name_matches = catalog['Product Name'].str.lower() == product_name.lower()
        matching_rows = catalog[name_matches]
        if matching_rows.empty:
            return {"error": f"Product '{product_name}' not found in CSV"}

        # First match wins
        product = matching_rows.iloc[0].to_dict()
        user_context = context_manager.get_context(phone)
        logger.info(f"[Test] Testing send_product_image_with_caption for product: {product_name}")
        await send_product_image_with_caption(phone, product, user_context)

        outcome = {
            "status": "sent",
            "phone": phone,
            "product": product_name,
            "message": f"Product image with caption sent for {product_name}"
        }
        return outcome
    except Exception as e:
        logger.error(f"[Test] Error testing send_product_image_with_caption: {e}")
        return {"error": str(e)}
async def handle_intelligent_product_inquiry(from_number: str, query: str, user_context: dict, reply_language: str = 'en'):
    """Resolve a free-text product query into a product menu or a single product.

    Behaviour, by number of matches from get_veterinary_product_matches(query):
      * none     - sends a one-line "please correct your question" message.
      * several  - sends a numbered list and stores it in the user's context
                   under the 'intelligent_products_menu' state; returns None.
      * exactly one - stores it as the current product under the
                   'product_inquiry' state and returns the product; sending the
                   details is left to the caller.

    user_context and reply_language are accepted but not read in this function.
    Any exception is logged and answered with a generic error message.
    """
    try:
        matches = get_veterinary_product_matches(query)

        if not matches:
            # Simple one-liner for wrong queries
            send_whatsjet_message(from_number, "❌ Please correct your question or type 'main' to go to main menu.")
            return None

        if len(matches) > 1:
            # Build one menu line per product: "<n>. <name>[ - <category>][ / <type or indications>]"
            menu_lines = []
            for position, item in enumerate(matches, 1):
                product_name = item.get('Product Name', 'Unknown')
                category = item.get('Category', '')
                short_desc = item.get('Type', '') or item.get('Indications', '')
                entry = f"{position}. {product_name}"
                if category:
                    entry = f"{entry} - {category}"
                if short_desc:
                    entry = f"{entry} / {short_desc}"
                menu_lines.append(entry + "\n")

            header = "Certainly, here are the relevant products for your query:\n\n"
            footer = (f"\nTo view detailed information about any product, reply with its number (1-{len(matches)})\n"
                      "Type 'main' to return to the main menu")
            send_whatsjet_message(from_number, header + "".join(menu_lines) + footer)

            if context_manager:
                numeric_choices = [str(n) for n in range(1, len(matches) + 1)]
                context_manager.update_context(
                    from_number,
                    current_state='intelligent_products_menu',
                    current_menu='intelligent_products_menu',
                    current_menu_options=numeric_choices,
                    available_products=matches
                )
            return

        # Single match: remember it and hand it back to the caller
        selected_product = matches[0]
        if context_manager:
            context_manager.update_context(
                from_number,
                current_product=selected_product,
                current_state='product_inquiry',
                current_menu='product_inquiry',
                current_menu_options=list(MENU_CONFIG['product_inquiry']['option_descriptions'].values())
            )
        return selected_product
    except Exception as e:
        logger.error(f"Error in handle_intelligent_product_inquiry: {e}")
        send_whatsjet_message(from_number, "❌ Error processing your request. Type 'main' to return to the main menu.")
async def handle_contact_request(from_number: str):
    """Ask the user for their contact details and enter the 'contact_request' state.

    Sends the contact-information prompt and records 'contact_request' as the
    current state and menu in the user's context. If that fails, the error is
    logged and the user is sent the welcome message and put back in the
    'main_menu' state instead of receiving a generic error.

    Args:
        from_number: Identifier of the user the prompt is sent to and whose
            context is updated.
    """
    try:
        message = (
            "📞 *Contact Information*\n\n"
            "Please provide your details:\n"
            "• Name and location\n"
            "• Phone number\n"
            "• Specific inquiry\n\n"
            "💬 *Example:* Dr. Ali - Multan - Need consultation for liver disease\n\n"
            "💬 *Type 'main' to return to the main menu.*"
        )
        send_whatsjet_message(from_number, message)
        context_manager.update_context(
            from_number,
            current_state='contact_request',
            current_menu='contact_request',
            current_menu_options=['Provide contact details']
        )
    except Exception as e:
        logger.error(f"[Contact] Error handling contact request: {e}")
        # Instead of sending a generic error, return to main menu.
        # The welcome message is built with the user's number and context, the
        # same call form the voice-greeting path in this file uses, rather than
        # calling the helper with no arguments.
        user_context = context_manager.get_context(from_number)
        welcome_msg = generate_veterinary_welcome_message(from_number, user_context)
        send_whatsjet_message(from_number, welcome_msg)
        context_manager.update_context(
            from_number,
            current_state='main_menu',
            current_menu='main_menu',
            current_menu_options=list(MENU_CONFIG['main_menu']['option_descriptions'].values())
        )
# Helper for restoring English terms in translations
def restore_english_terms(translated_text: str, original_text: str,
                          product_names: Optional[List[str]],
                          category_names: Optional[List[str]]) -> str:
    """
    Restore English terms (company names, product names, technical terms) in translated text.

    Every case-insensitive occurrence of a known term in ``translated_text`` is
    rewritten to the term's canonical spelling. Terms are applied in order:
    product names, category names, then a built-in list of technical terms.

    Args:
        translated_text: Text to normalise. Empty or ``None`` is returned unchanged.
        original_text: Accepted for interface compatibility; not read here.
        product_names: Canonical product names; ``None`` is treated as empty.
        category_names: Canonical category names; ``None`` is treated as empty.

    Returns:
        ``translated_text`` with every matched term replaced by its canonical form.
    """
    if not translated_text:
        return translated_text
    # Add common technical terms that should remain in English
    technical_terms = [
        'EC-Immune', 'Hydropex', 'Heposel', 'Respira Aid Plus', 'Bromacid',
        'mode of action', 'dosage', 'administration', 'composition',
        'veterinary', 'pharmaceutical', 'supplement', 'antibiotic'
    ]
    # Combine all terms that should remain in English; tolerate None and non-list iterables
    all_english_terms = list(product_names or []) + list(category_names or []) + technical_terms
    for term in all_english_terms:
        # Skip blanks and non-string entries (e.g. NaN cells coming from a DataFrame)
        if not isinstance(term, str) or not term.strip():
            continue
        # Cheap containment pre-check avoids building a regex for absent terms
        if term.lower() in translated_text.lower():
            pattern = re.compile(re.escape(term), re.IGNORECASE)
            # Use a function replacement so the term is inserted literally. A plain
            # string would be parsed as a replacement template, where backslashes
            # and group references in a product name raise re.error or are misread.
            translated_text = pattern.sub(lambda _match, canonical=term: canonical, translated_text)
    return translated_text
if __name__ == "__main__":
    # Script entry point: load the products data first, then start the server.
    load_products_data()
    # uvicorn is already imported at module level, so no local import is needed.
    # Serve the FastAPI app on all interfaces, port 7860.
    uvicorn.run(app=app, host="0.0.0.0", port=7860)