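# Hugging Face Spaces page banner ("Spaces: Running") captured together with
# the source listing; it is not part of the program.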
import os | |
import torch | |
import torch.nn as nn | |
import torch.optim as optim | |
from torch.utils.data import DataLoader, Dataset | |
from transformers import AutoTokenizer, get_scheduler | |
import gradio as gr | |
import matplotlib.pyplot as plt | |
import numpy as np | |
import pandas as pd | |
import io | |
from PIL import Image | |
import openai | |
import time | |
# Read the OpenAI API key from the environment (configured as a secret).
openai.api_key = os.environ.get("OPENAI_API_KEY")

# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# PIQA train/validation splits, read as JSON Lines from a public GitHub URL.
_PIQA_URLS = {
    "train": "https://raw.githubusercontent.com/epfml/Deep_Learning_Projects/master/PIQA/data/train.jsonl",
    "validation": "https://raw.githubusercontent.com/epfml/Deep_Learning_Projects/master/PIQA/data/valid.jsonl",
}
dataset = {split: pd.read_json(url, lines=True) for split, url in _PIQA_URLS.items()}

# Tokenizer of the "bert-base-uncased" checkpoint; only its token ids are used.
tokenizer = AutoTokenizer.from_pretrained("bert-base-uncased")
def tokenize_choices(example):
    """Tokenize both candidate solutions of one PIQA example.

    Each solution is appended to the goal (separated by a single space) and
    encoded to exactly 128 token ids (truncated or padded to that length).

    Args:
        example: Mapping with "goal", "sol1", "sol2" and "label" entries.

    Returns:
        Dict with "input_ids_0" and "input_ids_1" (1-D id tensors for
        goal+sol1 and goal+sol2) and "label" (int).
    """
    encoded = []
    for solution_key in ("sol1", "sol2"):
        batch = tokenizer(
            example["goal"] + " " + example[solution_key],
            truncation=True,
            padding="max_length",
            max_length=128,
            return_tensors="pt",
        )
        # The tokenizer returns a batch of one; keep just that single row.
        encoded.append(batch["input_ids"][0])
    first_ids, second_ids = encoded
    return {
        "input_ids_0": first_ids,
        "input_ids_1": second_ids,
        "label": int(example["label"]),
    }
def _tokenize_head(split, limit):
    """Tokenize the first `limit` rows of the given split of `dataset`."""
    rows = dataset[split].head(limit)
    return [tokenize_choices(row) for _, row in rows.iterrows()]


# Only a small prefix of each split is tokenized up front.
train_data = _tokenize_head("train", 500)
val_data = _tokenize_head("validation", 200)
class PIQADataset(Dataset):
    """Map-style dataset over pre-tokenized PIQA examples.

    `data` is a sequence of dicts holding "input_ids_0", "input_ids_1" and
    "label" (as produced by `tokenize_choices`).
    """

    def __init__(self, data):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """Return one example, or a sub-dataset when `idx` is a slice.

        An integer index yields a dict with the two id tensors and the label
        as a tensor. A slice yields a new dataset over the selected examples;
        without this branch `dataset[:n]` would index a plain list with a
        string key and raise TypeError.
        """
        if isinstance(idx, slice):
            return type(self)(self.data[idx])
        example = self.data[idx]
        return {
            "input_ids_0": example["input_ids_0"],
            "input_ids_1": example["input_ids_1"],
            "label": torch.tensor(example["label"]),
        }
# Wrap the tokenized examples so they can be handed to a DataLoader.
train_dataset, val_dataset = (
    PIQADataset(examples) for examples in (train_data, val_data)
)
class EvoTransformer(nn.Module):
    """Small Transformer encoder that scores one tokenized sequence.

    Token ids are embedded, passed through a stack of Transformer encoder
    layers, and the hidden state at position 0 is mapped to a single scalar
    by a two-layer MLP.

    NOTE(review): the forward pass adds no positional encoding and passes no
    padding mask to the encoder, so padded positions take part in attention.

    Args:
        vocab_size: Number of rows in the embedding table.
        d_model: Embedding / hidden size of the encoder.
        nhead: Number of attention heads per encoder layer.
        dim_feedforward: Width of each encoder layer's feed-forward sublayer.
        num_layers: Number of stacked encoder layers.
        classifier_hidden: Hidden width of the scoring MLP.

    The defaults reproduce the original fixed configuration.
    """

    def __init__(
        self,
        vocab_size=30522,
        d_model=384,
        nhead=6,
        dim_feedforward=1024,
        num_layers=6,
        classifier_hidden=128,
    ):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, d_model)
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            batch_first=True,
        )
        self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.classifier = nn.Sequential(
            nn.Linear(d_model, classifier_hidden),
            nn.ReLU(),
            nn.Linear(classifier_hidden, 1),
        )

    def forward(self, input_ids):
        """Return one score per sequence.

        Args:
            input_ids: Integer tensor of token ids, indexed (batch, position).

        Returns:
            Tensor of shape (batch,) with one unnormalized score per row.
        """
        x = self.embedding(input_ids)
        x = self.encoder(x)
        # Score from the first position only; drop the trailing size-1 dim.
        return self.classifier(x[:, 0, :]).squeeze(-1)
def gpt35_answer(prompt):
    """Ask gpt-3.5-turbo a single-turn question and return its reply text.

    Works with both generations of the `openai` package: the module-level
    `openai.chat.completions` client (openai>=1.0) and the legacy
    `openai.ChatCompletion` interface, which was removed in 1.0.

    Args:
        prompt: The user message to send.

    Returns:
        The stripped reply text (possibly empty), or "[Error: ...]" if the
        request failed for any reason.
    """
    request = {
        "model": "gpt-3.5-turbo",
        "messages": [{"role": "user", "content": prompt}],
        "max_tokens": 20,
        "temperature": 0,
    }
    try:
        if hasattr(openai, "OpenAI"):
            # openai>=1.0: the module-level client honours `openai.api_key`.
            response = openai.chat.completions.create(**request)
            content = response.choices[0].message.content
        else:
            # Legacy (<1.0) interface.
            response = openai.ChatCompletion.create(**request)
            content = response["choices"][0]["message"]["content"]
        # `content` can be None; normalise to an empty string.
        return (content or "").strip()
    except Exception as e:
        # Deliberately broad: the demo should keep running and show the
        # failure in the UI instead of crashing the whole request.
        return f"[Error: {e}]"
def _pair_logits(model, batch):
    """Score both candidate solutions of a batch.

    Returns `(logits, labels)` where `logits[:, k]` is the model score of
    solution k, so the pair can be treated as a 2-way classification.
    """
    x0 = batch["input_ids_0"].to(device)
    x1 = batch["input_ids_1"].to(device)
    labels = batch["label"].to(device)
    logits = torch.stack([model(x0), model(x1)], dim=1)
    return logits, labels


def _accuracy_plot(accs, n_train):
    """Render per-epoch validation accuracy to a PIL image (PNG-encoded)."""
    fig, ax = plt.subplots()
    try:
        ax.plot(accs, marker='o')
        ax.set_title(f"Validation Accuracy ({n_train} examples)")
        ax.set_xlabel("Epoch")
        ax.set_ylabel("Accuracy")
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # pyplot keeps every figure alive until it is closed; without this
        # each call of the demo would leak one figure.
        plt.close(fig)
    buf.seek(0)
    img = Image.open(buf)
    img.load()  # decode now so the image no longer depends on lazy reads
    return img


def train_and_demo(few_shot_size):
    """Train a fresh EvoTransformer on a PIQA subset and compare it to GPT-3.5.

    Trains for up to 3 epochs on the first `few_shot_size` training examples,
    evaluating on the validation set after every epoch and stopping early
    after 2 epochs without improvement. Then runs the model and GPT-3.5 on
    the first two validation examples.

    Args:
        few_shot_size: Number of training examples to use. It is converted
            to int and clamped to the range [1, len(train_dataset)].

    Returns:
        Tuple `(image, best_accuracy_text, report_text)`: the accuracy plot
        as a PIL image, a "Best Accuracy: ..." string, and the side-by-side
        comparison followed by the architecture summary.
    """
    start_time = time.time()
    # The UI may deliver a float, and the request must not exceed the data.
    n_train = max(1, min(int(few_shot_size), len(train_dataset)))

    model = EvoTransformer().to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.AdamW(model.parameters(), lr=5e-5)
    # Subset instead of `train_dataset[:n]`: slicing the dataset object goes
    # through its __getitem__, which only handles integer indices.
    train_subset = torch.utils.data.Subset(train_dataset, range(n_train))
    loader = DataLoader(train_subset, batch_size=8, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32)
    num_epochs = 3
    scheduler = get_scheduler(
        "linear",
        optimizer=optimizer,
        num_warmup_steps=0,
        num_training_steps=num_epochs * len(loader),
    )

    best_val = 0
    accs = []
    patience = 2
    early_stop = 0
    for epoch in range(num_epochs):
        model.train()
        for batch in loader:
            optimizer.zero_grad()
            logits, labels = _pair_logits(model, batch)
            loss = criterion(logits, labels)
            loss.backward()
            optimizer.step()
            scheduler.step()

        model.eval()
        correct = 0
        with torch.no_grad():
            for batch in val_loader:
                logits, labels = _pair_logits(model, batch)
                preds = torch.argmax(logits, dim=1)
                correct += (preds == labels).sum().item()
        acc = correct / len(val_dataset)
        accs.append(acc)

        if acc > best_val:
            best_val = acc
            early_stop = 0
        else:
            early_stop += 1
            if early_stop >= patience:
                break

    img = _accuracy_plot(accs, n_train)

    # Side-by-side comparison with GPT-3.5 on the first validation examples.
    model.eval()
    validation = dataset["validation"]
    comparisons = []
    for i in range(min(2, len(validation))):
        ex = validation.iloc[i]
        goal, sol1, sol2 = ex["goal"], ex["sol1"], ex["sol2"]
        label = int(ex["label"])

        # Inference only: no autograd graph is needed here.
        with torch.no_grad():
            x0 = tokenizer(goal + " " + sol1, return_tensors="pt", padding="max_length", max_length=128, truncation=True)["input_ids"].to(device)
            x1 = tokenizer(goal + " " + sol2, return_tensors="pt", padding="max_length", max_length=128, truncation=True)["input_ids"].to(device)
            score_a = model(x0).item()
            score_b = model(x1).item()
        pred_evo = 0 if score_a > score_b else 1
        correct_evo = "✅" if pred_evo == label else "❌"

        gpt_prompt = f"Q: {goal}\nA) {sol1}\nB) {sol2}\nWhich is more appropriate? Answer with A or B only."
        gpt_out = gpt35_answer(gpt_prompt)
        # Slice instead of index so an empty reply cannot raise IndexError.
        pred_gpt = gpt_out.strip()[:1].upper()
        if pred_gpt not in ("A", "B"):
            # Not a usable answer (e.g. an error message): show it verbatim.
            pred_gpt = gpt_out or "?"
        gpt_is_right = (pred_gpt == 'A' and label == 0) or (pred_gpt == 'B' and label == 1)
        correct_gpt = "✅" if gpt_is_right else "❌"

        comparisons.append(
            f"Q: {goal}\nA) {sol1}\nB) {sol2}\n\n"
            f"EvoTransformer: {'A' if pred_evo == 0 else 'B'} {correct_evo}\n"
            f"GPT-3.5: {pred_gpt} {correct_gpt}\n\n"
        )
    output = "".join(comparisons)

    # Report the real parameter count instead of a hard-coded estimate.
    n_params = sum(p.numel() for p in model.parameters())
    architecture_info = "\n".join([
        "EvoTransformer v2.1 Configuration:",
        "- Embedding Dim: 384",
        "- Transformer Layers: 6",
        "- Attention Heads: 6",
        "- Feedforward Size: 1024",
        f"- Parameters: ~{n_params / 1e6:.0f}M",
        f"- Training Time: {time.time() - start_time:.2f}s",
    ])
    return img, f"Best Accuracy: {best_val:.4f}", output.strip() + "\n\n" + architecture_info
# Gradio app: one slider picks the number of training examples; the outputs
# are the accuracy plot, the best validation accuracy and the comparison text.
demo = gr.Interface(
    fn=train_and_demo,
    inputs=gr.Slider(10, 300, step=10, value=50, label="Training Samples"),
    outputs=[
        gr.Image(label="Accuracy Plot"),
        gr.Textbox(label="Best Accuracy"),
        gr.Textbox(label="Evo vs GPT-3.5 Output"),
    ],
    # The title emoji had been mis-decoded into unreadable characters.
    title="🧬 EvoTransformer v2.1 Benchmark",
    description="Train EvoTransformer on PIQA and compare predictions against GPT-3.5.",
)
demo.launch()