# app.py
"""Gradio demo: retrieve similar Sentiment140 tweets for a user input,
and generate new tweet candidates with distilgpt2, scored by embedding
similarity to the prompt."""
import functools
import os
import re

import gradio as gr
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity

# -------- Config --------
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000"))  # small by default for CPU Spaces
RANDOM_STATE = 42
DEFAULT_INPUT = "I am so happy with this product"


# -------- Helpers --------
def clean_text(text: str) -> str:
    """Lowercase *text* and strip URLs, @mentions, #hashtags, punctuation,
    and redundant whitespace. ``None`` is treated as the empty string."""
    text = (text or "").lower()
    text = re.sub(r"http\S+", "", text)
    text = re.sub(r"@\w+", "", text)
    text = re.sub(r"#\w+", "", text)
    text = re.sub(r"[^\w\s]", "", text)
    text = re.sub(r"\s+", " ", text).strip()
    return text


def _to_numpy(x):
    """Return *x* as a numpy array, detaching/moving torch tensors if torch
    is installed; any import/inspection failure falls through to asarray."""
    try:
        import torch
        if hasattr(torch, "Tensor") and isinstance(x, torch.Tensor):
            return x.detach().cpu().numpy()
    except Exception:
        pass
    return np.asarray(x)


def _l2norm(x: np.ndarray) -> np.ndarray:
    """L2-normalize rows of *x* as float32, always returning shape (n, d).

    A 1-D input is treated as a single row. The small epsilon guards
    against division by zero for all-zero vectors.
    """
    x = x.astype(np.float32, copy=False)
    if x.ndim == 1:
        x = x.reshape(1, -1)
    return x / (np.linalg.norm(x, axis=1, keepdims=True) + 1e-12)


# -------- Load sample data once (FAST: only a slice) --------
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Load and preprocess a small slice of Sentiment140.

    Returns a shuffled DataFrame with columns ``text`` (original) and
    ``clean_text`` (normalized via :func:`clean_text`), filtered to
    tweet-like lengths (5–280 chars). Cached so the dataset is fetched once.
    """
    # Load only a slice (e.g., first 3000 rows) instead of the full 1.6M
    ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]")
    df = ds.to_pandas()
    df = df.dropna(subset=["text", "sentiment"]).copy()
    df["text_length"] = df["text"].str.len()
    df = df[(df["text_length"] >= 5) & (df["text_length"] <= 280)].copy()
    df["clean_text"] = df["text"].apply(clean_text)
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)
    return df[["text", "clean_text"]]


# -------- Lazy model loaders --------
@functools.lru_cache(maxsize=None)
def load_sentence_model(model_id: str):
    """Lazily load and cache a SentenceTransformer by HF model id."""
    from sentence_transformers import SentenceTransformer
    return SentenceTransformer(model_id)


@functools.lru_cache(maxsize=None)
def load_generator():
    """Lazily load and cache the distilgpt2 text-generation pipeline,
    seeding transformers for reproducible sampling."""
    from transformers import pipeline, set_seed
    set_seed(RANDOM_STATE)
    return pipeline("text-generation", model="distilgpt2")


# HF model ids
EMBEDDERS = {
    "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
    "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
    "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}

# Cache for corpus embeddings per model (display name -> (n, d) float32 array)
_CORPUS_CACHE = {}


def _encode_norm(model, texts):
    """Encode compatibly across sentence-transformers versions; return L2-normalized numpy (n,d)."""
    out = model.encode(texts, show_progress_bar=False)
    out = _to_numpy(out)
    return _l2norm(out)


def ensure_corpus_embeddings(model_name: str, texts: list):
    """Return (and memoize) normalized corpus embeddings for *model_name*.

    NOTE: the cache is keyed by model name only — it assumes *texts* is the
    same corpus (from load_sample_df) on every call.
    """
    if model_name in _CORPUS_CACHE:
        return _CORPUS_CACHE[model_name]
    model = load_sentence_model(EMBEDDERS[model_name])
    emb = _encode_norm(model, texts)
    _CORPUS_CACHE[model_name] = emb
    return emb


# -------- Retrieval --------
def top3_for_each_model(user_input: str, selected_models: list):
    """For each selected embedder, return the 3 corpus tweets most similar
    to *user_input* (cosine similarity on normalized embeddings).

    Errors for a given model are reported as a row rather than raised, so
    one failing model does not break the whole comparison.
    """
    df = load_sample_df()
    texts = df["clean_text"].tolist()
    rows = []
    for name in selected_models:
        try:
            model = load_sentence_model(EMBEDDERS[name])
            corpus_emb = ensure_corpus_embeddings(name, texts)
            q = _encode_norm(model, [clean_text(user_input)])
            sims = cosine_similarity(q, corpus_emb)[0]
            top_idx = sims.argsort()[-3:][::-1]  # indices of 3 largest, descending
            for rank, i in enumerate(top_idx, start=1):
                rows.append({
                    "Model": name,
                    "Rank": rank,
                    "Similarity": float(sims[i]),
                    "Tweet (clean)": texts[i],
                    "Tweet (orig)": df.loc[i, "text"],
                })
        except Exception as e:
            rows.append({
                "Model": name, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": ""
            })
    return pd.DataFrame(rows, columns=["Model", "Rank", "Similarity", "Tweet (clean)", "Tweet (orig)"])


# -------- Generation + scoring (with progress) --------
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    """Sample *n_sequences* continuations of *prompt*, score each by cosine
    similarity to the prompt under the chosen embedder, and return
    (best_text, best_score, results_table) sorted by similarity descending.
    """
    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),  # number of NEW tokens to generate
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        pad_token_id=50256,  # GPT-2 EOS token, silences the missing-pad warning
    )
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]
    best_idx = int(sims.argmax())

    # BUGFIX: the Rank column previously held argsort indices (candidate
    # positions), not ranks, and row order came from sorting (−sim, text)
    # tuples, which breaks ties on text. Use one permutation consistently.
    order = np.argsort(-sims)
    table = pd.DataFrame({
        "Rank": np.arange(1, len(candidates) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })
    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table


# ---------------- UI ----------------
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
    gr.Markdown(
        """
        # 🧪 Sentiment140 — Embeddings & Tweet Generator
        Type a tweet, get similar tweets from Sentiment140, and generate a new one.
        """
    )
    with gr.Row():
        test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
        models = gr.CheckboxGroup(
            choices=list(EMBEDDERS.keys()),
            value=["MiniLM (fast)"],
            label="Embedding models to compare",
        )
    run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
    table_out = gr.Dataframe(interactive=False)
    run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)

    gr.Markdown("---")
    gr.Markdown("## 📝 Generate Tweets and Pick the Best")
    with gr.Row():
        n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
        max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
        temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
        scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)",
                                   label="Scorer embedding")
    gen_btn = gr.Button("✨ Generate & Score")
    best_txt = gr.Textbox(label="Best generated tweet")
    best_score = gr.Number(label="Similarity (best)")
    gen_table = gr.Dataframe(interactive=False)
    gen_btn.click(
        generate_and_pick_best,
        inputs=[test_input, n_seq, max_len, temp, scorer_model],
        outputs=[best_txt, best_score, gen_table],
    )

demo.queue(max_size=32).launch()