Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
@@ -2561,8 +2561,8 @@ load_products_data()
|
|
2561 |
|
2562 |
def get_product_image_path(product_name: str) -> str:
|
2563 |
"""
|
2564 |
-
Get the
|
2565 |
-
cPanel public URL format: https://amgocus.com/uploads/images/<normalized_name>.<ext>
|
2566 |
Normalized: lowercase, remove spaces/underscores/dots, preserve dashes.
|
2567 |
"""
|
2568 |
try:
|
@@ -2572,37 +2572,29 @@ def get_product_image_path(product_name: str) -> str:
|
|
2572 |
logger.info(f"[Image] Normalized product name: '{product_name}' -> '{normalized_name}'")
|
2573 |
image_extensions = ['.png', '.jpg', '.jpeg', '.webp']
|
2574 |
base_url = "https://amgocus.com/uploads/images/"
|
2575 |
-
|
|
|
2576 |
for ext in image_extensions:
|
2577 |
image_url = f"{base_url}{normalized_name}{ext}"
|
2578 |
-
logger.info(f"[Image] Checking
|
2579 |
-
# For
|
2580 |
if image_url.startswith('http'):
|
2581 |
-
|
2582 |
-
if product_name.lower().replace(' ', '').replace('_', '').replace('.', '') == 'b-gaspro-c':
|
2583 |
-
logger.info(f"[Image] Special debug: Checking for B-G Aspro-C at {image_url}")
|
2584 |
-
# If you want to verify with requests, uncomment below:
|
2585 |
-
# try:
|
2586 |
-
# resp = requests.get(image_url, timeout=5, stream=True)
|
2587 |
-
# if resp.status_code == 200:
|
2588 |
-
# logger.info(f"[Image] Found public image URL: {image_url}")
|
2589 |
-
# return image_url
|
2590 |
-
# except Exception as e:
|
2591 |
-
# logger.warning(f"[Image] Error checking image URL {image_url}: {e}")
|
2592 |
-
logger.info(f"[Image] Found public image URL: {image_url}")
|
2593 |
return image_url
|
|
|
2594 |
# Fallback: try original name with spaces as %20
|
2595 |
safe_name = product_name.strip().replace(' ', '%20')
|
2596 |
for ext in image_extensions:
|
2597 |
image_url = f"{base_url}{safe_name}{ext}"
|
2598 |
-
logger.info(f"[Image] Checking fallback image URL: {image_url}")
|
2599 |
if image_url.startswith('http'):
|
2600 |
-
logger.info(f"[Image] Found
|
2601 |
return image_url
|
2602 |
-
|
|
|
2603 |
return None
|
2604 |
except Exception as e:
|
2605 |
-
logger.error(f"[Image] Error generating
|
2606 |
return None
|
2607 |
|
2608 |
def get_product_image_media_type(image_path: str) -> str:
|
@@ -2782,44 +2774,25 @@ def ensure_images_dir():
|
|
2782 |
async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
|
2783 |
"""
|
2784 |
Send product image (if available) with product details as caption in a single WhatsApp message.
|
|
|
2785 |
If image is not available, send only the product details as text.
|
2786 |
-
Now supports 'Images' column in CSV (Google Drive or direct links).
|
2787 |
"""
|
2788 |
ensure_images_dir()
|
2789 |
product_name = product.get('Product Name', 'Unknown Product')
|
2790 |
details = generate_veterinary_product_response(product, user_context)
|
2791 |
-
image_url = product.get('Images', '').strip() if 'Images' in product else ''
|
2792 |
-
|
2793 |
-
# Force image URL for Respira Aid Plus (use cPanel public URL)
|
2794 |
-
if product_name.lower().strip() == "respira aid plus":
|
2795 |
-
image_url = "https://amgocus.com/uploads/images/Respira%20Aid%20Plus.jpg"
|
2796 |
|
2797 |
-
logger.info(f"[Product] Processing image for product: {product_name}")
|
2798 |
-
logger.info(f"[Product] Image URL from CSV: {image_url}")
|
2799 |
|
2800 |
try:
|
2801 |
-
#
|
2802 |
-
|
2803 |
-
|
2804 |
-
|
2805 |
-
|
2806 |
-
if '/d/' in image_url:
|
2807 |
-
file_id = image_url.split('/d/')[1].split('/')[0]
|
2808 |
-
elif 'id=' in image_url:
|
2809 |
-
file_id = image_url.split('id=')[1].split('&')[0]
|
2810 |
-
else:
|
2811 |
-
file_id = ''
|
2812 |
-
if file_id:
|
2813 |
-
image_url = f"https://drive.google.com/uc?export=download&id={file_id}"
|
2814 |
-
logger.info(f"[Product] Converted to direct download URL: {image_url}")
|
2815 |
-
|
2816 |
-
# Use the public URL directly for WhatsApp API
|
2817 |
-
media_type = 'image/jpeg'
|
2818 |
-
filename = f"{product_name.replace(' ', '_')}.jpg"
|
2819 |
|
2820 |
-
# Test the image URL
|
2821 |
try:
|
2822 |
-
logger.info(f"[Product] Testing image URL accessibility: {image_url}")
|
2823 |
headers = {
|
2824 |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
2825 |
'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
|
@@ -2830,81 +2803,36 @@ async def send_product_image_with_caption(from_number: str, product: Dict[str, A
|
|
2830 |
}
|
2831 |
test_response = requests.head(image_url, headers=headers, timeout=10, allow_redirects=True)
|
2832 |
if test_response.status_code != 200:
|
2833 |
-
logger.warning(f"[Product]
|
2834 |
-
raise Exception(f"
|
2835 |
-
logger.info(f"[Product]
|
2836 |
except Exception as e:
|
2837 |
-
logger.warning(f"[Product] Failed to test image URL {image_url}: {e}")
|
2838 |
image_url = None
|
2839 |
|
2840 |
-
# Send using
|
2841 |
if image_url:
|
2842 |
-
logger.info(f"[Product] Attempting to send image
|
|
|
|
|
|
|
2843 |
success = send_whatsjet_message(
|
2844 |
from_number,
|
2845 |
details,
|
2846 |
media_type=media_type,
|
2847 |
-
media_path=image_url, # Use
|
2848 |
filename=filename
|
2849 |
)
|
2850 |
|
2851 |
if success:
|
2852 |
-
logger.info(f"[Product] Successfully sent image
|
2853 |
return
|
2854 |
else:
|
2855 |
-
logger.warning(f"[Product] Failed to send image
|
2856 |
-
|
2857 |
-
# Fallback 1: Try with a known public test image
|
2858 |
-
logger.info(f"[Product] Trying public test image for: {product_name}")
|
2859 |
-
test_image_url = "https://www.w3schools.com/w3images/lights.jpg"
|
2860 |
-
media_type = 'image/jpeg'
|
2861 |
-
filename = f"{product_name.replace(' ', '_')}.jpg"
|
2862 |
-
|
2863 |
-
success = send_whatsjet_message(
|
2864 |
-
from_number,
|
2865 |
-
details,
|
2866 |
-
media_type=media_type,
|
2867 |
-
media_path=test_image_url,
|
2868 |
-
filename=filename
|
2869 |
-
)
|
2870 |
-
|
2871 |
-
if success:
|
2872 |
-
logger.info(f"[Product] Successfully sent test image with caption for product: {product_name}")
|
2873 |
-
return
|
2874 |
|
2875 |
-
#
|
2876 |
-
logger.info(f"[Product]
|
2877 |
-
|
2878 |
-
if image_path and (image_path.startswith('http') or os.path.exists(image_path)):
|
2879 |
-
media_type = get_product_image_media_type(image_path)
|
2880 |
-
filename = f"{product_name.replace(' ', '_')}.jpg"
|
2881 |
-
|
2882 |
-
# If it's already a public URL, use it directly
|
2883 |
-
if image_path.startswith('http'):
|
2884 |
-
media_path = image_path
|
2885 |
-
logger.info(f"[Product] Using existing public URL: {media_path}")
|
2886 |
-
else:
|
2887 |
-
# Convert local path to public URL
|
2888 |
-
media_path = f"https://dreamstream-1-chatbot.hf.space/uploads/{os.path.basename(image_path).replace(' ', '%20')}"
|
2889 |
-
logger.info(f"[Product] Converted local path to public URL: {media_path}")
|
2890 |
-
|
2891 |
-
success = send_whatsjet_message(
|
2892 |
-
from_number,
|
2893 |
-
details,
|
2894 |
-
media_type=media_type,
|
2895 |
-
media_path=media_path, # Use public URL
|
2896 |
-
filename=filename
|
2897 |
-
)
|
2898 |
-
|
2899 |
-
if success:
|
2900 |
-
logger.info(f"[Product] Successfully sent image with caption for product: {product_name}")
|
2901 |
-
else:
|
2902 |
-
logger.warning(f"[Product] Failed to send image, sending text only: {product_name}")
|
2903 |
-
send_whatsjet_message(from_number, details)
|
2904 |
-
else:
|
2905 |
-
# No image available, send text only
|
2906 |
-
logger.info(f"[Product] No image available, sending text only for: {product_name}")
|
2907 |
-
send_whatsjet_message(from_number, details)
|
2908 |
|
2909 |
except Exception as e:
|
2910 |
logger.error(f"[Product] Error sending product image with caption: {e}")
|
@@ -2937,7 +2865,7 @@ async def test_product_image_with_caption(phone: str):
|
|
2937 |
|
2938 |
# Test endpoint for image sending
|
2939 |
@app.get("/test-image-sending")
|
2940 |
-
async def test_image_sending(phone: str, image_url: str = "https://
|
2941 |
"""Test endpoint for sending images via WhatsApp"""
|
2942 |
try:
|
2943 |
filename = "test_image.jpg"
|
@@ -3055,13 +2983,6 @@ async def test_cpanel_image_access():
|
|
3055 |
"timestamp": datetime.now().isoformat()
|
3056 |
}
|
3057 |
|
3058 |
-
def convert_drive_link(link: str) -> str:
|
3059 |
-
"""Convert Google Drive link to direct download link"""
|
3060 |
-
if 'drive.google.com' in link:
|
3061 |
-
file_id = link.split('/')[-2] if '/d/' in link else link.split('/')[-1]
|
3062 |
-
return f"https://drive.google.com/uc?export=download&id={file_id}"
|
3063 |
-
return link
|
3064 |
-
|
3065 |
def format_number_with_emoji(number: int) -> str:
|
3066 |
"""Format number with emoji"""
|
3067 |
emoji_map = {
|
@@ -3663,4 +3584,4 @@ if __name__ == "__main__":
|
|
3663 |
|
3664 |
# Launch FastAPI app
|
3665 |
import uvicorn
|
3666 |
-
uvicorn.run(app, host="0.0.0.0", port=7860)
|
|
|
2561 |
|
2562 |
def get_product_image_path(product_name: str) -> str:
|
2563 |
"""
|
2564 |
+
Get the cPanel image URL for a product based on its name.
|
2565 |
+
Only uses cPanel public URL format: https://amgocus.com/uploads/images/<normalized_name>.<ext>
|
2566 |
Normalized: lowercase, remove spaces/underscores/dots, preserve dashes.
|
2567 |
"""
|
2568 |
try:
|
|
|
2572 |
logger.info(f"[Image] Normalized product name: '{product_name}' -> '{normalized_name}'")
|
2573 |
image_extensions = ['.png', '.jpg', '.jpeg', '.webp']
|
2574 |
base_url = "https://amgocus.com/uploads/images/"
|
2575 |
+
|
2576 |
+
# Check for all possible extensions
|
2577 |
for ext in image_extensions:
|
2578 |
image_url = f"{base_url}{normalized_name}{ext}"
|
2579 |
+
logger.info(f"[Image] Checking cPanel image URL: {image_url}")
|
2580 |
+
# For cPanel URLs, assume they are accessible if they start with http
|
2581 |
if image_url.startswith('http'):
|
2582 |
+
logger.info(f"[Image] Found cPanel image URL: {image_url}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2583 |
return image_url
|
2584 |
+
|
2585 |
# Fallback: try original name with spaces as %20
|
2586 |
safe_name = product_name.strip().replace(' ', '%20')
|
2587 |
for ext in image_extensions:
|
2588 |
image_url = f"{base_url}{safe_name}{ext}"
|
2589 |
+
logger.info(f"[Image] Checking fallback cPanel image URL: {image_url}")
|
2590 |
if image_url.startswith('http'):
|
2591 |
+
logger.info(f"[Image] Found cPanel image URL (fallback): {image_url}")
|
2592 |
return image_url
|
2593 |
+
|
2594 |
+
logger.warning(f"[Image] No cPanel image found for product: {product_name}")
|
2595 |
return None
|
2596 |
except Exception as e:
|
2597 |
+
logger.error(f"[Image] Error generating cPanel image URL for {product_name}: {e}")
|
2598 |
return None
|
2599 |
|
2600 |
def get_product_image_media_type(image_path: str) -> str:
|
|
|
2774 |
async def send_product_image_with_caption(from_number: str, product: Dict[str, Any], user_context: Dict[str, Any]):
|
2775 |
"""
|
2776 |
Send product image (if available) with product details as caption in a single WhatsApp message.
|
2777 |
+
Only uses cPanel images from https://amgocus.com/uploads/images/
|
2778 |
If image is not available, send only the product details as text.
|
|
|
2779 |
"""
|
2780 |
ensure_images_dir()
|
2781 |
product_name = product.get('Product Name', 'Unknown Product')
|
2782 |
details = generate_veterinary_product_response(product, user_context)
|
|
|
|
|
|
|
|
|
|
|
2783 |
|
2784 |
+
logger.info(f"[Product] Processing cPanel image for product: {product_name}")
|
|
|
2785 |
|
2786 |
try:
|
2787 |
+
# Get cPanel image URL for the product
|
2788 |
+
image_url = get_product_image_path(product_name)
|
2789 |
+
|
2790 |
+
if image_url and image_url.startswith('http'):
|
2791 |
+
logger.info(f"[Product] Found cPanel image URL: {image_url}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2792 |
|
2793 |
+
# Test if the cPanel image URL is accessible
|
2794 |
try:
|
2795 |
+
logger.info(f"[Product] Testing cPanel image URL accessibility: {image_url}")
|
2796 |
headers = {
|
2797 |
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
|
2798 |
'Accept': 'image/webp,image/apng,image/*,*/*;q=0.8',
|
|
|
2803 |
}
|
2804 |
test_response = requests.head(image_url, headers=headers, timeout=10, allow_redirects=True)
|
2805 |
if test_response.status_code != 200:
|
2806 |
+
logger.warning(f"[Product] cPanel image URL not accessible (status {test_response.status_code}): {image_url}")
|
2807 |
+
raise Exception(f"cPanel image URL not accessible: {test_response.status_code}")
|
2808 |
+
logger.info(f"[Product] cPanel image URL is accessible")
|
2809 |
except Exception as e:
|
2810 |
+
logger.warning(f"[Product] Failed to test cPanel image URL {image_url}: {e}")
|
2811 |
image_url = None
|
2812 |
|
2813 |
+
# Send using cPanel URL
|
2814 |
if image_url:
|
2815 |
+
logger.info(f"[Product] Attempting to send cPanel image for: {product_name}")
|
2816 |
+
media_type = 'image/jpeg'
|
2817 |
+
filename = f"{product_name.replace(' ', '_')}.jpg"
|
2818 |
+
|
2819 |
success = send_whatsjet_message(
|
2820 |
from_number,
|
2821 |
details,
|
2822 |
media_type=media_type,
|
2823 |
+
media_path=image_url, # Use cPanel URL directly
|
2824 |
filename=filename
|
2825 |
)
|
2826 |
|
2827 |
if success:
|
2828 |
+
logger.info(f"[Product] Successfully sent cPanel image with caption for product: {product_name}")
|
2829 |
return
|
2830 |
else:
|
2831 |
+
logger.warning(f"[Product] Failed to send cPanel image, sending text only: {product_name}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2832 |
|
2833 |
+
# No cPanel image available, send text only
|
2834 |
+
logger.info(f"[Product] No cPanel image available, sending text only for: {product_name}")
|
2835 |
+
send_whatsjet_message(from_number, details)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2836 |
|
2837 |
except Exception as e:
|
2838 |
logger.error(f"[Product] Error sending product image with caption: {e}")
|
|
|
2865 |
|
2866 |
# Test endpoint for image sending
|
2867 |
@app.get("/test-image-sending")
|
2868 |
+
async def test_image_sending(phone: str, image_url: str = "https://amgocus.com/uploads/images/respiraaidplus.png"):
|
2869 |
"""Test endpoint for sending images via WhatsApp"""
|
2870 |
try:
|
2871 |
filename = "test_image.jpg"
|
|
|
2983 |
"timestamp": datetime.now().isoformat()
|
2984 |
}
|
2985 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2986 |
def format_number_with_emoji(number: int) -> str:
|
2987 |
"""Format number with emoji"""
|
2988 |
emoji_map = {
|
|
|
3584 |
|
3585 |
# Launch FastAPI app
|
3586 |
import uvicorn
|
3587 |
+
uvicorn.run(app, host="0.0.0.0", port=7860)
|