# Spaces: Sleeping  (presumably the Hugging Face Spaces status banner captured with the page; not part of the program)
# app.py
import functools
import os
import re

import gradio as gr
import numpy as np
import pandas as pd
from datasets import load_dataset
from sklearn.metrics.pairwise import cosine_similarity
# -------- Config --------
# Number of leading dataset rows to load; read from the SAMPLE_SIZE environment
# variable when set (small by default for CPU Spaces).
SAMPLE_SIZE = int(os.getenv("SAMPLE_SIZE", "3000"))
# Seed shared by the sample shuffle and the text generator.
RANDOM_STATE = 42
# Text pre-filled in the UI input box.
DEFAULT_INPUT = "I am so happy with this product"
# -------- Helpers --------
def clean_text(text: str) -> str:
    """Normalize a tweet before it is embedded.

    The text is lower-cased, then URLs (``http...``), ``@mentions`` and
    ``#hashtags`` are removed, every character that is neither a word
    character nor whitespace is dropped, and runs of whitespace are collapsed
    to a single space.

    Args:
        text: Raw text. ``None`` or an empty string is treated as ``""``.

    Returns:
        The cleaned text with leading/trailing whitespace stripped.
    """
    text = (text or "").lower()
    # Order matters: URLs, mentions and hashtags are removed as whole tokens
    # *before* punctuation is stripped; otherwise the "@"/"#"/":" markers would
    # vanish first and the words behind them would survive.
    for pattern in (r"http\S+", r"@\w+", r"#\w+", r"[^\w\s]"):
        text = re.sub(pattern, "", text)
    return re.sub(r"\s+", " ", text).strip()
def _to_numpy(x):
    """Return *x* as a NumPy array.

    A ``torch.Tensor`` is detached, moved to the CPU and converted with
    ``.numpy()``; any other value goes through ``np.asarray``. torch is an
    optional dependency here: when it cannot be imported, *x* is simply handed
    to ``np.asarray``.
    """
    # Only the optional import is guarded. A failure while converting an
    # actual tensor is a real error and is allowed to propagate instead of
    # being silently replaced by an ``np.asarray`` attempt on the tensor.
    try:
        import torch
    except (ImportError, OSError):
        torch = None
    if torch is not None and isinstance(x, torch.Tensor):
        return x.detach().cpu().numpy()
    return np.asarray(x)
def _l2norm(x: np.ndarray) -> np.ndarray:
    """Scale every row of *x* to unit Euclidean length.

    Args:
        x: A 2-D array of row vectors, or a 1-D vector (treated as a single
            row). Array-likes such as nested lists are accepted as well.

    Returns:
        A float32 array of shape ``(n, d)``. All-zero rows stay zero because a
        small epsilon is added to the norm before dividing.
    """
    # np.asarray also accepts lists and avoids a copy when x is already float32.
    x = np.asarray(x, dtype=np.float32)
    if x.ndim == 1:
        x = x.reshape(1, -1)
    norms = np.linalg.norm(x, axis=1, keepdims=True)
    return x / (norms + 1e-12)
# -------- Load sample data once (FAST: only a slice) --------
@functools.lru_cache(maxsize=1)
def load_sample_df():
    """Load, filter, clean and shuffle the tweet sample.

    Only the first ``SAMPLE_SIZE`` rows of the ``train`` split are requested
    (a slice instead of the full 1.6M). Rows with a missing ``text`` or
    ``sentiment`` are dropped, tweets shorter than 5 or longer than 280
    characters are removed, a ``clean_text`` column is derived with
    ``clean_text`` and the rows are shuffled with ``RANDOM_STATE``.

    The result is memoized so the dataset is fetched and cleaned once per
    process rather than on every request. The same DataFrame object is
    returned on each call, so callers must treat it as read-only.

    Returns:
        A DataFrame with columns ``text`` and ``clean_text`` and a fresh
        0..n-1 index.
    """
    ds = load_dataset("sentiment140", split=f"train[:{SAMPLE_SIZE}]")
    df = ds.to_pandas().dropna(subset=["text", "sentiment"])
    lengths = df["text"].str.len()
    # between() is inclusive on both ends: 5 <= len <= 280.
    df = df[lengths.between(5, 280)].copy()
    df["clean_text"] = df["text"].apply(clean_text)
    df = df.sample(frac=1.0, random_state=RANDOM_STATE).reset_index(drop=True)
    return df[["text", "clean_text"]]
# -------- Lazy model loaders --------
@functools.lru_cache(maxsize=4)
def load_sentence_model(model_id: str):
    """Return the SentenceTransformer for *model_id*, building it at most once.

    ``sentence_transformers`` is imported inside the function so the module
    can be imported without paying for it up front. Instances are memoized per
    model id, so repeated requests reuse the already-loaded model instead of
    constructing it again.

    Args:
        model_id: Model identifier passed to ``SentenceTransformer``.

    Returns:
        The (shared) ``SentenceTransformer`` instance.
    """
    from sentence_transformers import SentenceTransformer

    return SentenceTransformer(model_id)
@functools.lru_cache(maxsize=1)
def _load_generator_pipeline():
    """Build the distilgpt2 text-generation pipeline; memoized after the first call."""
    from transformers import pipeline

    return pipeline("text-generation", model="distilgpt2")


def load_generator():
    """Return the text-generation pipeline with the RNG freshly seeded.

    The pipeline itself is built once and reused. The seed is still reset to
    ``RANDOM_STATE`` on every call, and it is reset *after* the pipeline is
    obtained so that sampling starts from the same RNG state whether the
    pipeline was just constructed or came from the cache.
    """
    from transformers import set_seed

    generator = _load_generator_pipeline()
    set_seed(RANDOM_STATE)
    return generator
# HF model ids, keyed by the label used for them in the UI widgets.
EMBEDDERS = {
    "MiniLM (fast)": "sentence-transformers/all-MiniLM-L6-v2",
    "MPNet (heavier)": "sentence-transformers/all-mpnet-base-v2",
    "DistilRoBERTa (paraphrase)": "sentence-transformers/paraphrase-distilroberta-base-v1",
}
# Cache for corpus embeddings per model (filled by ensure_corpus_embeddings).
_CORPUS_CACHE = {}
def _encode_norm(model, texts):
    """Encode *texts* with *model* and return L2-normalized rows.

    Only ``model.encode(texts, show_progress_bar=False)`` is used, and its
    result is passed through ``_to_numpy`` so the function works whether the
    installed sentence-transformers version hands back tensors or arrays.

    Args:
        model: Object exposing ``encode(texts, show_progress_bar=...)``.
        texts: List of strings to embed.

    Returns:
        A NumPy array of shape ``(n, d)`` with unit-length rows.
    """
    raw = model.encode(texts, show_progress_bar=False)
    return _l2norm(_to_numpy(raw))
def ensure_corpus_embeddings(model_name: str, texts: list):
    """Return normalized embeddings of *texts* for *model_name*, cached per model.

    One entry is kept per model name in ``_CORPUS_CACHE``. The entry stores a
    fingerprint of the texts it was computed from, so calling this with a
    different corpus recomputes the embeddings instead of returning stale ones.

    Args:
        model_name: Key into ``EMBEDDERS``.
        texts: List of strings forming the corpus.

    Returns:
        The array produced by ``_encode_norm`` for the corpus.

    Raises:
        KeyError: If *model_name* is not a key of ``EMBEDDERS``.
    """
    # Length + content hash identify the corpus; cheap compared to encoding.
    fingerprint = (len(texts), hash(tuple(texts)))
    cached = _CORPUS_CACHE.get(model_name)
    if cached is not None and cached[0] == fingerprint:
        return cached[1]
    model = load_sentence_model(EMBEDDERS[model_name])
    emb = _encode_norm(model, texts)
    _CORPUS_CACHE[model_name] = (fingerprint, emb)
    return emb
# -------- Retrieval --------
def top3_for_each_model(user_input: str, selected_models: list):
    """Find the three corpus tweets most similar to *user_input* for each model.

    Args:
        user_input: Free text typed by the user; cleaned with ``clean_text``
            before it is embedded.
        selected_models: Keys of ``EMBEDDERS`` to compare. ``None`` or an
            empty list yields an empty table.

    Returns:
        A DataFrame with columns ``Model``, ``Rank``, ``Similarity``,
        ``Tweet (clean)`` and ``Tweet (orig)``. A model that fails contributes
        a single placeholder row carrying the error message, so one broken
        model does not hide the results of the others.
    """
    columns = ["Model", "Rank", "Similarity", "Tweet (clean)", "Tweet (orig)"]
    rows = []
    if not selected_models:
        # Nothing to compare: skip the dataset load entirely.
        return pd.DataFrame(rows, columns=columns)

    df = load_sample_df()
    texts = df["clean_text"].tolist()
    # The cleaned query does not depend on the model, so compute it once.
    query_text = clean_text(user_input)

    for name in selected_models:
        try:
            model = load_sentence_model(EMBEDDERS[name])
            corpus_emb = ensure_corpus_embeddings(name, texts)
            q = _encode_norm(model, [query_text])
            sims = cosine_similarity(q, corpus_emb)[0]
            # argsort is ascending: take the last three, then reverse to get
            # the best match first.
            top_idx = sims.argsort()[-3:][::-1]
            for rank, idx in enumerate(top_idx, start=1):
                i = int(idx)
                rows.append({
                    "Model": name,
                    "Rank": rank,
                    "Similarity": float(sims[i]),
                    "Tweet (clean)": texts[i],
                    "Tweet (orig)": df.loc[i, "text"],
                })
        except Exception as e:  # UI boundary: report per-model failures in the table
            rows.append({
                "Model": name, "Rank": "-", "Similarity": "-",
                "Tweet (clean)": f"[Error: {e}]", "Tweet (orig)": "",
            })
    return pd.DataFrame(rows, columns=columns)
# -------- Generation + scoring (with progress) --------
def generate_and_pick_best(prompt: str, n_sequences: int, max_length: int,
                           temperature: float, scorer_model_name: str,
                           progress=gr.Progress()):
    """Sample several continuations of *prompt* and rank them by similarity to it.

    Args:
        prompt: Text the generator continues; also the reference for scoring.
        n_sequences: Number of candidates to sample.
        max_length: Number of NEW tokens to generate per candidate.
        temperature: Sampling temperature.
        scorer_model_name: Key of ``EMBEDDERS`` selecting the embedding model
            used to score candidates against the prompt.
        progress: Gradio progress tracker (the ``gr.Progress()`` default is
            how Gradio injects it into the handler).

    Returns:
        A tuple ``(best_text, best_similarity, table)`` where ``table`` lists
        all candidates sorted by descending similarity with ranks 1..n.

    Raises:
        gr.Error: If *prompt* is empty or only whitespace.
    """
    if not (prompt or "").strip():
        raise gr.Error("Please enter some text to generate from.")

    progress(0.0, desc="Loading models…")
    gen = load_generator()
    scorer = load_sentence_model(EMBEDDERS[scorer_model_name])

    progress(0.3, desc="Generating candidates…")
    outputs = gen(
        prompt,
        max_new_tokens=int(max_length),  # number of NEW tokens to generate
        num_return_sequences=int(n_sequences),
        do_sample=True,
        temperature=float(temperature),
        pad_token_id=50256,
    )
    candidates = [o["generated_text"].strip() for o in outputs]

    progress(0.7, desc="Scoring candidates…")
    q = _encode_norm(scorer, [prompt])
    cand_vecs = _encode_norm(scorer, candidates)
    sims = cosine_similarity(q, cand_vecs)[0]
    best_idx = int(sims.argmax())

    # A single ordering drives every column so that rank, score and text of a
    # row always belong to the same candidate. The stable sort keeps the
    # earlier candidate first on ties, which matches argmax above.
    order = np.argsort(-sims, kind="stable")
    table = pd.DataFrame({
        "Rank": np.arange(1, len(order) + 1),
        "Similarity": sims[order],
        "Generated Tweet": [candidates[i] for i in order],
    })

    progress(1.0)
    return candidates[best_idx], float(sims[best_idx]), table
# ---------------- UI ----------------
# Two sections share one input box: similarity search over the tweet sample,
# and candidate generation scored against the same input.
with gr.Blocks(title="Sentiment140 Embeddings + Generation") as demo:
    gr.Markdown(
        """
        # 🧪 Sentiment140 — Embeddings & Tweet Generator
        Type a tweet, get similar tweets from Sentiment140, and generate a new one.
        """
    )

    # --- Similarity search ---
    with gr.Row():
        test_input = gr.Textbox(label="Your input", value=DEFAULT_INPUT, lines=2)
        models = gr.CheckboxGroup(
            choices=list(EMBEDDERS.keys()),
            value=["MiniLM (fast)"],
            label="Embedding models to compare",
        )
    run_btn = gr.Button("🔎 Find Top‑3 Similar Tweets")
    table_out = gr.Dataframe(interactive=False)
    run_btn.click(top3_for_each_model, inputs=[test_input, models], outputs=table_out)

    gr.Markdown("---")

    # --- Generation + scoring ---
    gr.Markdown("## 📝 Generate Tweets and Pick the Best")
    with gr.Row():
        n_seq = gr.Slider(1, 8, value=4, step=1, label="Number of candidates")
        max_len = gr.Slider(20, 80, value=40, step=1, label="Max length (new tokens)")
        temp = gr.Slider(0.7, 1.3, value=0.9, step=0.05, label="Temperature")
        scorer_model = gr.Dropdown(list(EMBEDDERS.keys()), value="MiniLM (fast)", label="Scorer embedding")
    gen_btn = gr.Button("✨ Generate & Score")
    best_txt = gr.Textbox(label="Best generated tweet")
    best_score = gr.Number(label="Similarity (best)")
    gen_table = gr.Dataframe(interactive=False)
    # The input order must match generate_and_pick_best's positional parameters.
    gen_btn.click(
        generate_and_pick_best,
        inputs=[test_input, n_seq, max_len, temp, scorer_model],
        outputs=[best_txt, best_score, gen_table],
    )

# Queue requests (at most 32 waiting) and start the app.
demo.queue(max_size=32).launch()