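# NOTE(review): "Spaces: Runtime error / Runtime error" - stray page-status text that was captured ahead of the imports; it is not Python code. Confirm and remove.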
from flask import Blueprint, render_template, request, session, jsonify, redirect, url_for | |
import os | |
import base64 | |
import logging | |
from salesforce import get_salesforce_connection | |
# Configure logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

menu_blueprint = Blueprint('menu', __name__)

# Initialize Salesforce connection (shared by every handler in this module)
sf = get_salesforce_connection()

# Constants for video handling
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
PLACEHOLDER_VIDEO = 'placeholder.mp4'
PLACEHOLDER_PATH = os.path.join(STATIC_DIR, PLACEHOLDER_VIDEO)

# Display order of the menu sections; sections not listed here are appended
# after these in the order they are first encountered.
SECTION_ORDER = ["Best Sellers", "Starters", "Biryanis", "Curries", "Breads", "Customized dish", "Apetizer", "Desserts", "Soft Drinks"]

# Create an empty placeholder video at startup if it doesn't exist.
# The static directory is created first so a fresh checkout without a
# `static/` folder does not fail, and an unwritable filesystem is logged
# instead of aborting the import of this module.
if not os.path.exists(PLACEHOLDER_PATH):
    try:
        os.makedirs(STATIC_DIR, exist_ok=True)
        with open(PLACEHOLDER_PATH, 'wb'):
            pass  # opening in 'wb' mode is enough to create the empty file
        logger.info(f"Created placeholder video at {PLACEHOLDER_PATH}")
    except OSError as exc:
        logger.error(f"Could not create placeholder video at {PLACEHOLDER_PATH}: {exc}")
def get_valid_video_path(item_name, video_url=None):
    """Return a usable video URL for an item, falling back to the placeholder.

    Args:
        item_name: Name of the item. Not used by the lookup; kept so existing
            callers keep working.
        video_url: Stored video reference, or None/empty. Recognised forms:
            an absolute ``http://`` / ``https://`` URL or a path starting
            with ``/`` (both returned unchanged), and a value starting with
            ``069``, which is appended to the Salesforce version-download URL.

    Returns:
        The resolved URL string, or ``/static/<placeholder>`` when
        ``video_url`` is missing or has an unrecognised form.
    """
    if video_url:
        if video_url.startswith(('http://', 'https://', '/')):
            return video_url
        if video_url.startswith('069'):
            return f"https://biryanihub-dev-ed.develop.my.salesforce.com/sfc/servlet.shepherd/version/download/{video_url}"

    # Re-create the placeholder if it went missing after startup. This is a
    # best-effort step: the URL is returned either way, so a filesystem
    # problem is logged rather than failing the request.
    if not os.path.exists(PLACEHOLDER_PATH):
        try:
            placeholder_dir = os.path.dirname(PLACEHOLDER_PATH)
            if placeholder_dir:
                os.makedirs(placeholder_dir, exist_ok=True)
            with open(PLACEHOLDER_PATH, 'wb'):
                pass  # opening in 'wb' mode creates the empty file
            logger.info(f"Created missing placeholder video at {PLACEHOLDER_PATH}")
        except OSError as exc:
            logger.error(f"Could not create placeholder video at {PLACEHOLDER_PATH}: {exc}")
    return f"/static/{PLACEHOLDER_VIDEO}"
def menu():
    """Render the menu page for the current customer.

    Reads the optional ``category`` query parameter ("All", "Veg" or
    "Non veg"). The customer is taken from the session, or from the ``email``
    and ``name`` query parameters when the session has no email. The handler
    then loads the customer record, the cart item count, all menu items and
    the custom dishes created in the last 7 days from Salesforce, groups the
    items by section and renders ``menu.html``.

    Returns:
        The rendered ``menu.html`` response; a redirect to the ``login``
        endpoint when no identity is available or no customer record matches
        the email; or a JSON error body with status 500 when the customer
        query fails.
    """
    selected_category = request.args.get("category", "All")
    user_email = session.get('user_email')

    # Handle user authentication
    if not user_email:
        user_email = request.args.get("email")
        user_name = request.args.get("name")
        if user_email and user_name:
            # NOTE(review): identity is accepted from plain query parameters
            # here, so anyone who knows an email can open that customer's
            # session - confirm this is intended.
            session['user_email'] = user_email
            session['user_name'] = user_name
        else:
            return redirect(url_for("login"))
    else:
        user_name = session.get('user_name')

    first_letter = user_name[0].upper() if user_name else "A"
    user_image = session.get('user_image')

    # The email may come straight from the query string, so escape the SOQL
    # string-literal metacharacters (backslash first, then single quote)
    # before embedding it in a query.
    safe_email = str(user_email).replace("\\", "\\\\").replace("'", "\\'")

    # NOTE(review): describe() runs on every request; consider caching.
    try:
        describe_result = sf.Customer_Login__c.describe()
        fields = [field['name'] for field in describe_result['fields']]
        avatar_field_exists = 'Avatar__c' in fields
    except Exception as e:
        logger.error(f"Error describing Customer_Login__c object: {str(e)}")
        avatar_field_exists = False

    query_fields = ["Id", "Referral__c", "Reward_Points__c"]
    if avatar_field_exists:
        query_fields.append("Avatar__c")
    user_query = f"SELECT {', '.join(query_fields)} FROM Customer_Login__c WHERE Email__c = '{safe_email}'"

    try:
        user_result = sf.query(user_query)
    except Exception as e:
        logger.error(f"Error querying user data: {str(e)}")
        return jsonify({"success": False, "error": "Failed to fetch user data from Salesforce"}), 500
    if not user_result.get('records'):
        logger.warning(f"No user found with email: {user_email}")
        return redirect(url_for('login'))

    user_record = user_result['records'][0]
    user_id = user_record['Id']
    # `or` (not a .get default) so a key that is present with a null value
    # still falls back to the default.
    referral_code = user_record.get('Referral__c') or 'N/A'
    reward_points = user_record.get('Reward_Points__c') or 0

    if not user_image and avatar_field_exists and user_record.get('Avatar__c'):
        session['user_image'] = user_record['Avatar__c']
        user_image = session['user_image']

    cart_query = f"SELECT COUNT() FROM Cart_Item__c WHERE Customer_Email__c = '{safe_email}'"
    try:
        cart_count_result = sf.query(cart_query)
        cart_item_count = cart_count_result.get('totalSize', 0)
    except Exception as e:
        logger.error(f"Error fetching cart item count: {str(e)}")
        cart_item_count = 0

    menu_query = """
        SELECT Name, Price__c, Description__c, Image1__c, Image2__c,
               Veg_NonVeg__c, Section__c, Total_Ordered__c, Video1__c,
               IngredientsInfo__c, NutritionalInfo__c, Allergens__c
        FROM Menu_Item__c
    """
    try:
        menu_result = sf.query_all(menu_query)
        food_items = menu_result.get('records', [])
    except Exception as e:
        logger.error(f"Error fetching menu items: {str(e)}")
        food_items = []

    # Display fallbacks for menu-item fields that come back empty.
    menu_item_defaults = {
        'Section__c': "Others",
        'Description__c': "No description available",
        'IngredientsInfo__c': "Not specified",
        'NutritionalInfo__c': "Not available",
        'Allergens__c': "None listed",
    }
    for item in food_items:
        item['Total_Ordered__c'] = item.get('Total_Ordered__c') or 0
        item['Video1__c'] = get_valid_video_path(item['Name'], item.get('Video1__c'))
        for field_name, fallback in menu_item_defaults.items():
            item[field_name] = item.get(field_name) or fallback
        item['is_menu_item'] = True

    custom_dish_query = """
        SELECT Name, Price__c, Description__c, Image1__c, Image2__c,
               Veg_NonVeg__c, Section__c, Total_Ordered__c
        FROM Custom_Dish__c
        WHERE CreatedDate >= LAST_N_DAYS:7
    """
    try:
        custom_dish_result = sf.query_all(custom_dish_query)
        custom_dishes = custom_dish_result.get('records', [])
    except Exception as e:
        logger.error(f"Error fetching custom dishes: {str(e)}")
        custom_dishes = []

    for item in custom_dishes:
        item['Total_Ordered__c'] = item.get('Total_Ordered__c') or 0
        # Custom dishes have no video field in the query, so this always
        # resolves to the placeholder.
        item['Video1__c'] = get_valid_video_path(item['Name'])
        item['Section__c'] = item.get('Section__c') or "Customized dish"
        item['Description__c'] = item.get('Description__c') or "No description available"
        item['is_menu_item'] = False

    all_items = food_items + custom_dishes

    # Values of Veg_NonVeg__c accepted by the selected filter; None means
    # no filtering ("All" or any unrecognised category).
    allowed_types = {
        "Veg": ("Veg", "both"),
        "Non veg": ("Non veg", "both"),
    }.get(selected_category)

    def matches_category(item):
        """Return True when the item passes the selected Veg/Non-veg filter."""
        if allowed_types is None:
            return True
        # Items without a Veg_NonVeg__c value are treated as "both", for the
        # best-seller list and the regular sections alike.
        return (item.get("Veg_NonVeg__c") or "both") in allowed_types

    ordered_menu = {section: [] for section in SECTION_ORDER}

    # Best sellers: the four most-ordered items that pass the filter.
    best_sellers = sorted(all_items, key=lambda x: x['Total_Ordered__c'], reverse=True)
    ordered_menu["Best Sellers"] = [item for item in best_sellers if matches_category(item)][:4]

    # Place each item in its section, keeping only the first item per name.
    added_item_names = set()
    for item in all_items:
        section = item['Section__c']
        if section not in ordered_menu:
            ordered_menu[section] = []
        if item['Name'] in added_item_names:
            continue
        if not matches_category(item):
            continue
        ordered_menu[section].append(item)
        added_item_names.add(item['Name'])

    # Drop sections that ended up empty.
    ordered_menu = {section: items for section, items in ordered_menu.items() if items}
    categories = ["All", "Veg", "Non veg"]

    return render_template(
        "menu.html",
        ordered_menu=ordered_menu,
        categories=categories,
        selected_category=selected_category,
        referral_code=referral_code,
        reward_points=reward_points,
        user_name=user_name,
        first_letter=first_letter,
        cart_item_count=cart_item_count,
        user_image=user_image,
        user_id=user_id
    )
def search():
    """Render the search page with every menu item and recent custom dish.

    Requires ``user_email`` in the session. Loads the cart item count, all
    menu items and the custom dishes created in the last 7 days from
    Salesforce, fills in display fallbacks for empty fields and renders
    ``search.html``. A failed Salesforce query is logged and treated as an
    empty result (or a cart count of 0).

    Returns:
        The rendered ``search.html`` response, or a redirect to the ``login``
        endpoint when the session has no email.
    """
    user_email = session.get('user_email')
    if not user_email:
        return redirect(url_for("login"))

    user_name = session.get('user_name')
    first_letter = user_name[0].upper() if user_name else "A"
    user_image = session.get('user_image')

    # Escape SOQL string-literal metacharacters (backslash first, then single
    # quote) before embedding the email in a query.
    safe_email = str(user_email).replace("\\", "\\\\").replace("'", "\\'")

    # Get cart item count
    cart_query = f"SELECT COUNT() FROM Cart_Item__c WHERE Customer_Email__c = '{safe_email}'"
    try:
        cart_count_result = sf.query(cart_query)
        cart_item_count = cart_count_result.get('totalSize', 0)
    except Exception as e:
        logger.error(f"Error fetching cart item count: {str(e)}")
        cart_item_count = 0

    # Fetch all menu items for search functionality
    menu_query = """
        SELECT Name, Price__c, Description__c, Image1__c, Image2__c,
               Veg_NonVeg__c, Section__c, Total_Ordered__c, Video1__c,
               IngredientsInfo__c, NutritionalInfo__c, Allergens__c
        FROM Menu_Item__c
    """
    try:
        menu_result = sf.query_all(menu_query)
        food_items = menu_result.get('records', [])
    except Exception as e:
        logger.error(f"Error fetching menu items: {str(e)}")
        food_items = []

    # Display fallbacks for menu-item fields that come back empty. `or` is
    # used instead of a .get default so a key that is present with a null
    # value still falls back.
    menu_item_defaults = {
        'Section__c': "Others",
        'Description__c': "No description available",
        'IngredientsInfo__c': "Not specified",
        'NutritionalInfo__c': "Not available",
        'Allergens__c': "None listed",
    }
    for item in food_items:
        item['Total_Ordered__c'] = item.get('Total_Ordered__c') or 0
        item['Video1__c'] = get_valid_video_path(item['Name'], item.get('Video1__c'))
        for field_name, fallback in menu_item_defaults.items():
            item[field_name] = item.get(field_name) or fallback
        item['is_menu_item'] = True

    custom_dish_query = """
        SELECT Name, Price__c, Description__c, Image1__c, Image2__c,
               Veg_NonVeg__c, Section__c, Total_Ordered__c
        FROM Custom_Dish__c
        WHERE CreatedDate >= LAST_N_DAYS:7
    """
    try:
        custom_dish_result = sf.query_all(custom_dish_query)
        custom_dishes = custom_dish_result.get('records', [])
    except Exception as e:
        logger.error(f"Error fetching custom dishes: {str(e)}")
        custom_dishes = []

    for item in custom_dishes:
        item['Total_Ordered__c'] = item.get('Total_Ordered__c') or 0
        # Custom dishes have no video field in the query, so this always
        # resolves to the placeholder.
        item['Video1__c'] = get_valid_video_path(item['Name'])
        item['Section__c'] = item.get('Section__c') or "Customized dish"
        item['Description__c'] = item.get('Description__c') or "No description available"
        item['is_menu_item'] = False

    all_items = food_items + custom_dishes

    return render_template(
        "search.html",
        all_items=all_items,
        user_name=user_name,
        first_letter=first_letter,
        cart_item_count=cart_item_count,
        user_image=user_image
    )
# Existing routes remain unchanged... | |
def upload_avatar():
    """Stub for ``upload_avatar``.

    The original marks this body as "(unchanged)" and omits it, so nothing
    is done here.

    Returns:
        None.
    """
    return None
def delete_avatar():
    """Stub for ``delete_avatar``.

    The original marks this body as "(unchanged)" and omits it, so nothing
    is done here.

    Returns:
        None.
    """
    return None
def get_addons():
    """Stub for ``get_addons``.

    The original marks this body as "(unchanged)" and omits it, so nothing
    is done here.

    Returns:
        None.
    """
    return None
def add_to_cart():
    """Stub for ``add_to_cart``.

    The original marks this body as "(unchanged)" and omits it, so nothing
    is done here.

    Returns:
        None.
    """
    return None