import streamlit as st
import pandas as pd
import numpy as np
import tensorflow as tf
import joblib
import os
import zipfile
import tempfile

# Static file paths (all assets live next to this script)
BASE_DIR = os.path.dirname(__file__)
ZIP_MODEL_PATH = os.path.join(BASE_DIR, "recommender_model.zip")
MOVIES_PATH = os.path.join(BASE_DIR, "movies.csv")
ENCODINGS_PATH = os.path.join(BASE_DIR, "encodings.pkl")


@st.cache_resource
def load_model():
    try:
        # Extract into a writable temp location (the app directory may be read-only)
        extract_dir = os.path.join(tempfile.gettempdir(), "recommender_model_extracted")

        # Only extract if not already done
        if not os.path.exists(extract_dir):
            with zipfile.ZipFile(ZIP_MODEL_PATH, "r") as zip_ref:
                zip_ref.extractall(extract_dir)

        # Load the model from the extracted directory
        # (assumes the zip contains the saved model files at its root)
        return tf.keras.models.load_model(extract_dir)
    except Exception as e:
        st.error(f"❌ Failed to load model:\n\n{e}")
        st.stop()


@st.cache_data
def load_assets():
    try:
        df_movies = pd.read_csv(MOVIES_PATH)
    except FileNotFoundError:
        st.error("❌ movies.csv not found.")
        st.stop()

    try:
        user_map, movie_map = joblib.load(ENCODINGS_PATH)
    except FileNotFoundError:
        st.error("❌ encodings.pkl not found.")
        st.stop()

    return df_movies, user_map, movie_map


# Load model and assets
model = load_model()
movies_df, user2idx, movie2idx = load_assets()
reverse_movie_map = {v: k for k, v in movie2idx.items()}

# App UI
st.title("🎬 TensorFlow Movie Recommender")
st.write("Select some movies you've liked to get personalized recommendations:")

# Movie selection UI
movie_titles = movies_df.set_index("movieId")["title"].to_dict()
movie_choices = [movie_titles[mid] for mid in movie2idx if mid in movie_titles]
selected_titles = st.multiselect("🎞️ Liked movies", sorted(movie_choices))

# Treat every selected movie as a maximum-rating "like"
user_ratings = {}
for title in selected_titles:
    movie_id = next((k for k, v in movie_titles.items() if v == title), None)
    if movie_id:
        user_ratings[movie_id] = 5.0

# Generate recommendations
if st.button("🎯 Get Recommendations"):
    if not user_ratings:
        st.warning("Please select at least one movie.")
    else:
        liked_indices = [movie2idx[m] for m in user_ratings if m in movie2idx]
        if not liked_indices:
            st.error("⚠️ No valid movie encodings found.")
            st.stop()

        # NOTE: assumes model.layers[3] is the movie Embedding layer; adjust the
        # index if your architecture differs. The same layer must be used for the
        # liked movies and the candidates so the dot products compare like with like.
        movie_embedding_layer = model.layers[3]

        # Average the embeddings of the liked movies, then score every movie
        # by its dot product with that average (item-item similarity)
        avg_embedding = tf.reduce_mean(
            movie_embedding_layer(tf.constant(liked_indices)), axis=0, keepdims=True
        )
        all_movie_indices = tf.range(len(movie2idx))
        movie_embeddings = movie_embedding_layer(all_movie_indices)
        scores = tf.reduce_sum(avg_embedding * movie_embeddings, axis=1).numpy()
        top_indices = np.argsort(scores)[::-1]

        # Top 10 recommendations, excluding movies the user already selected
        recommended = []
        for idx in top_indices:
            mid = reverse_movie_map.get(int(idx))
            if mid is not None and mid not in user_ratings and mid in movie_titles:
                recommended.append((movie_titles[mid], scores[idx]))
            if len(recommended) >= 10:
                break

        # Display recommendations
        st.subheader("🍿 Top 10 Recommendations")
        for title, score in recommended:
            st.write(f"**{title}** — Score: `{score:.3f}`")