Spaces:
Sleeping
Sleeping
from flask import Blueprint, render_template, request, session, jsonify, redirect, url_for | |
from salesforce import get_salesforce_connection | |
import os | |
import re | |
import logging | |
from urllib.parse import quote | |
from datetime import datetime, timedelta | |
# Initialize logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
menu_blueprint = Blueprint('menu', __name__) | |
# Initialize Salesforce connection | |
sf = get_salesforce_connection() | |
# Constants | |
STATIC_DIR = os.path.join(os.path.dirname(__file__), '..', 'static') | |
PLACEHOLDER_VIDEO = 'placeholder.mp4' | |
PLACEHOLDER_IMAGE = 'placeholder.jpg' | |
PLACEHOLDER_VIDEO_PATH = os.path.join(STATIC_DIR, PLACEHOLDER_VIDEO) | |
PLACEHOLDER_IMAGE_PATH = os.path.join(STATIC_DIR, PLACEHOLDER_IMAGE) | |
SECTION_ORDER = ["Best Sellers", "Starters", "Biryanis", "Curries", "Breads", | |
"Customized Dish", "Appetizer", "Desserts", "Soft Drinks"] | |
MAX_BEST_SELLERS = 4 | |
DEFAULT_VIDEO_URL = "https://yourdomain.my.salesforce.com/sfc/servlet.shepherd/version/download/" | |
# Custom Jinja2 filter | |
def slugify(text): | |
"""Convert text to a slug format (lowercase, replace spaces with hyphens)""" | |
if not text: | |
return "" | |
text = text.lower() | |
text = re.sub(r'[^\w\s-]', '', text) # Remove non-alphanumeric chars | |
text = re.sub(r'[\s-]+', '-', text) # Replace spaces and multiple hyphens with single hyphen | |
return text | |
# Register the filter with the blueprint | |
def slugify_filter(s): | |
return slugify(s) | |
def initialize_placeholders(): | |
"""Create placeholder files if they don't exist""" | |
try: | |
if not os.path.exists(PLACEHOLDER_VIDEO_PATH): | |
with open(PLACEHOLDER_VIDEO_PATH, 'wb') as f: | |
f.write(b'') # Empty file | |
logger.info(f"Created placeholder video at {PLACEHOLDER_VIDEO_PATH}") | |
if not os.path.exists(PLACEHOLDER_IMAGE_PATH): | |
with open(PLACEHOLDER_IMAGE_PATH, 'wb') as f: | |
f.write(b'') # Empty file | |
logger.info(f"Created placeholder image at {PLACEHOLDER_IMAGE_PATH}") | |
except Exception as e: | |
logger.error(f"Error creating placeholder files: {str(e)}") | |
initialize_placeholders() | |
def get_valid_media_path(media_url=None, media_type='video'): | |
""" | |
Get valid media path with placeholder fallback | |
Args: | |
media_url: URL or ID from Salesforce | |
media_type: 'video' or 'image' | |
Returns: | |
Valid URL for the media | |
""" | |
# Default placeholders | |
default_path = f"/static/{PLACEHOLDER_VIDEO if media_type == 'video' else PLACEHOLDER_IMAGE}" | |
if not media_url: | |
return default_path | |
# Handle Salesforce File ID (starts with '069') | |
if media_url.startswith('069'): | |
return f"{DEFAULT_VIDEO_URL}{media_url}" | |
# Handle complete URLs | |
if media_url.startswith(('http://', 'https://')): | |
return media_url | |
# Handle relative paths | |
if media_url.startswith('/'): | |
return media_url | |
return default_path | |
def validate_user_session(): | |
"""Validate and return user session data or redirect to login""" | |
user_email = session.get('user_email') | |
user_name = session.get('user_name') | |
if not user_email: | |
user_email = request.args.get('email') | |
user_name = request.args.get('name') | |
if user_email: | |
session['user_email'] = user_email | |
session['user_name'] = user_name | |
else: | |
return None, None, redirect(url_for("login")) | |
first_letter = user_name[0].upper() if user_name else "A" | |
return user_email, first_letter, None | |
def get_user_details(sf_connection, email): | |
"""Fetch user details from Salesforce""" | |
try: | |
query = f""" | |
SELECT Id, Referral__c, Reward_Points__c | |
FROM Customer_Login__c | |
WHERE Email__c = '{email}' | |
""" | |
result = sf_connection.query(query) | |
if not result['records']: | |
return None | |
return { | |
'referral_code': result['records'][0].get('Referral__c', 'N/A'), | |
'reward_points': result['records'][0].get('Reward_Points__c', 0) | |
} | |
except Exception as e: | |
logger.error(f"Error fetching user details: {str(e)}") | |
return None | |
def get_cart_count(sf_connection, email): | |
"""Get cart item count for user""" | |
try: | |
query = f"SELECT COUNT() FROM Cart_Item__c WHERE Customer_Email__c = '{email}'" | |
result = sf_connection.query(query) | |
return result['totalSize'] | |
except Exception as e: | |
logger.error(f"Error fetching cart count: {str(e)}") | |
return 0 | |
def fetch_menu_items(sf_connection, selected_category): | |
"""Fetch and process menu items from Salesforce""" | |
try: | |
# Query for standard menu items | |
menu_query = """ | |
SELECT Id, Name, Price__c, Description__c, Image1__c, Image2__c, | |
Veg_NonVeg__c, Section__c, Total_Ordered__c, Video1__c, | |
Is_Available__c, Spice_Level__c, Preparation_Time__c | |
FROM Menu_Item__c | |
WHERE Is_Available__c = true | |
""" | |
menu_result = sf_connection.query(menu_query) | |
food_items = menu_result.get('records', []) | |
# Query for custom dishes (last 7 days) | |
custom_dish_query = """ | |
SELECT Id, Name, Price__c, Description__c, Image1__c, Image2__c, | |
Veg_NonVeg__c, Section__c, Total_Ordered__c, Is_Available__c | |
FROM Custom_Dish__c | |
WHERE CreatedDate >= LAST_N_DAYS:7 AND Is_Available__c = true | |
""" | |
custom_dish_result = sf_connection.query(custom_dish_query) | |
custom_dishes = custom_dish_result.get('records', []) | |
# Process and merge items | |
all_items = [] | |
for item in food_items + custom_dishes: | |
processed_item = { | |
'id': item.get('Id'), | |
'name': item.get('Name', 'Unnamed Item'), | |
'price': item.get('Price__c', 0), | |
'description': item.get('Description__c', 'No description available'), | |
'image1': get_valid_media_path(item.get('Image1__c'), 'image'), | |
'image2': get_valid_media_path(item.get('Image2__c'), 'image'), | |
'video': get_valid_media_path(item.get('Video1__c'), 'video'), | |
'veg_non_veg': item.get('Veg_NonVeg__c', 'Unknown'), | |
'section': item.get('Section__c', 'Others'), | |
'total_ordered': item.get('Total_Ordered__c', 0), | |
'is_available': item.get('Is_Available__c', True), | |
'spice_level': item.get('Spice_Level__c', 'Medium'), | |
'prep_time': item.get('Preparation_Time__c', 20) | |
} | |
# Apply category filter | |
if selected_category == "Veg" and processed_item['veg_non_veg'] not in ["Veg", "both"]: | |
continue | |
if selected_category == "Non veg" and processed_item['veg_non_veg'] not in ["Non veg", "both"]: | |
continue | |
all_items.append(processed_item) | |
return all_items | |
except Exception as e: | |
logger.error(f"Error fetching menu items: {str(e)}") | |
return [] | |
def organize_menu_items(items, selected_category): | |
"""Organize menu items into sections""" | |
ordered_menu = {section: [] for section in SECTION_ORDER} | |
added_item_ids = set() | |
# Process best sellers | |
best_sellers = sorted( | |
[item for item in items if item['total_ordered'] > 0], | |
key=lambda x: x['total_ordered'], | |
reverse=True | |
)[:MAX_BEST_SELLERS] | |
if best_sellers: | |
ordered_menu["Best Sellers"] = best_sellers | |
added_item_ids.update(item['id'] for item in best_sellers) | |
# Organize other sections | |
for item in items: | |
if item['id'] in added_item_ids: | |
continue | |
section = item['section'] | |
if section not in ordered_menu: | |
section = "Others" # Fallback section | |
ordered_menu[section].append(item) | |
added_item_ids.add(item['id']) | |
# Remove empty sections | |
return {section: items for section, items in ordered_menu.items() if items} | |
def menu(): | |
# Validate session | |
user_email, first_letter, redirect_response = validate_user_session() | |
if redirect_response: | |
return redirect_response | |
selected_category = request.args.get("category", "All") | |
try: | |
# Get user details | |
user_details = get_user_details(sf, user_email) | |
if not user_details: | |
return redirect(url_for('login')) | |
# Get cart count | |
cart_item_count = get_cart_count(sf, user_email) | |
# Fetch and organize menu items | |
all_items = fetch_menu_items(sf, selected_category) | |
ordered_menu = organize_menu_items(all_items, selected_category) | |
# Prepare categories | |
categories = ["All", "Veg", "Non veg"] | |
return render_template( | |
"menu.html", | |
ordered_menu=ordered_menu, | |
categories=categories, | |
selected_category=selected_category, | |
referral_code=user_details['referral_code'], | |
reward_points=user_details['reward_points'], | |
user_name=session.get('user_name'), | |
first_letter=first_letter, | |
cart_item_count=cart_item_count | |
) | |
except Exception as e: | |
logger.error(f"Error in menu route: {str(e)}") | |
# Fallback data | |
return render_template( | |
"menu.html", | |
ordered_menu={}, | |
categories=["All", "Veg", "Non veg"], | |
selected_category=selected_category, | |
referral_code='N/A', | |
reward_points=0, | |
user_name=session.get('user_name'), | |
first_letter=first_letter, | |
cart_item_count=0, | |
error_message="We're experiencing technical difficulties. Please try again later." | |
) | |
def get_addons(): | |
try: | |
item_name = request.args.get('item_name', '').strip() | |
item_section = request.args.get('item_section', '').strip() | |
if not item_name or not item_section: | |
return jsonify({ | |
"success": False, | |
"error": "Item name and section are required." | |
}), 400 | |
# Query customization options | |
query = f""" | |
SELECT Id, Name, Customization_Type__c, Options__c, | |
Max_Selections__c, Extra_Charge__c, Extra_Charge_Amount__c, | |
Is_Active__c | |
FROM Customization_Options__c | |
WHERE Section__c = '{item_section}' | |
AND Is_Active__c = true | |
ORDER BY Display_Order__c NULLS LAST | |
""" | |
result = sf.query(query) | |
addons = result.get('records', []) | |
if not addons: | |
return jsonify({ | |
"success": False, | |
"error": "No customization options found." | |
}), 404 | |
# Format response | |
formatted_addons = [] | |
for addon in addons: | |
options = addon.get("Options__c", "") | |
options = [opt.strip() for opt in options.split(",")] if options else [] | |
formatted_addons.append({ | |
"id": addon.get("Id"), | |
"name": addon.get("Name"), | |
"type": addon.get("Customization_Type__c", "checkbox"), | |
"options": options, | |
"max_selections": addon.get("Max_Selections__c", 1), | |
"extra_charge": addon.get("Extra_Charge__c", False), | |
"extra_charge_amount": float(addon.get("Extra_Charge_Amount__c", 0)) | |
}) | |
return jsonify({ | |
"success": True, | |
"addons": formatted_addons | |
}) | |
except Exception as e: | |
logger.error(f"Error in get_addons: {str(e)}") | |
return jsonify({ | |
"success": False, | |
"error": "An error occurred while fetching options." | |
}), 500 | |
def add_to_cart(): | |
try: | |
# Validate session | |
if 'user_email' not in session: | |
return jsonify({ | |
"success": False, | |
"error": "Authentication required." | |
}), 401 | |
# Validate input | |
data = request.get_json() | |
if not data: | |
return jsonify({ | |
"success": False, | |
"error": "Invalid JSON data." | |
}), 400 | |
required_fields = ['itemName', 'itemPrice', 'itemImage', 'section'] | |
for field in required_fields: | |
if field not in data or not data[field]: | |
return jsonify({ | |
"success": False, | |
"error": f"Missing required field: {field}" | |
}), 400 | |
# Prepare data | |
item_data = { | |
'name': data['itemName'].strip(), | |
'price': float(data['itemPrice']), | |
'image': data['itemImage'].strip(), | |
'section': data['section'].strip(), | |
'category': data.get('category', ''), | |
'addons': data.get('addons', []), | |
'instructions': data.get('instructions', '').strip(), | |
'quantity': int(data.get('quantity', 1)), | |
'customer_email': session['user_email'] | |
} | |
# Calculate addons | |
addons_price = sum(float(addon.get('price', 0)) for addon in item_data['addons']) | |
addons_string = "; ".join( | |
f"{addon.get('name', '')} (${addon.get('price', 0):.2f})" | |
for addon in item_data['addons'] | |
) if item_data['addons'] else "None" | |
total_price = (item_data['price'] * item_data['quantity']) + addons_price | |
# Check if item already exists in cart | |
query = f""" | |
SELECT Id, Quantity__c, Add_Ons__c, Add_Ons_Price__c, Instructions__c | |
FROM Cart_Item__c | |
WHERE Customer_Email__c = '{item_data['customer_email']}' | |
AND Name = '{item_data['name']}' | |
LIMIT 1 | |
""" | |
result = sf.query(query) | |
existing_items = result.get('records', []) | |
if existing_items: | |
# Update existing item | |
existing_item = existing_items[0] | |
new_quantity = existing_item['Quantity__c'] + item_data['quantity'] | |
# Merge addons | |
existing_addons = existing_item.get('Add_Ons__c', "None") | |
merged_addons = existing_addons if existing_addons == "None" else f"{existing_addons}; {addons_string}" | |
# Merge instructions | |
existing_instructions = existing_item.get('Instructions__c', "") | |
merged_instructions = existing_instructions if not item_data['instructions'] else \ | |
f"{existing_instructions} | {item_data['instructions']}".strip(" | ") | |
# Update in Salesforce | |
sf.Cart_Item__c.update(existing_item['Id'], { | |
"Quantity__c": new_quantity, | |
"Add_Ons__c": merged_addons, | |
"Add_Ons_Price__c": addons_price + existing_item.get('Add_Ons_Price__c', 0), | |
"Instructions__c": merged_instructions, | |
"Price__c": total_price + existing_item.get('Price__c', 0) | |
}) | |
else: | |
# Create new cart item | |
sf.Cart_Item__c.create({ | |
"Name": item_data['name'], | |
"Price__c": total_price, | |
"Base_Price__c": item_data['price'], | |
"Quantity__c": item_data['quantity'], | |
"Add_Ons_Price__c": addons_price, | |
"Add_Ons__c": addons_string, | |
"Image1__c": item_data['image'], | |
"Customer_Email__c": item_data['customer_email'], | |
"Instructions__c": item_data['instructions'], | |
"Category__c": item_data['category'], | |
"Section__c": item_data['section'] | |
}) | |
return jsonify({ | |
"success": True, | |
"message": "Item added to cart successfully." | |
}) | |
except ValueError as e: | |
logger.error(f"Value error in add_to_cart: {str(e)}") | |
return jsonify({ | |
"success": False, | |
"error": "Invalid price or quantity format." | |
}), 400 | |
except Exception as e: | |
logger.error(f"Error in add_to_cart: {str(e)}") | |
return jsonify({ | |
"success": False, | |
"error": "An error occurred while adding to cart." | |
}), 500 |