# safe-talk/train_abuse_model.py
# Install core packages (uncomment in a notebook environment)
# !pip install -U transformers datasets accelerate
import threading
import logging
import io
import os
import time
import gradio as gr  # ✅ required for progress bar
from datetime import datetime
from pathlib import Path
import queue
# Data / ML packages
import pandas as pd
import numpy as np
import torch
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, precision_recall_fscore_support
# Hugging Face Hub
from huggingface_hub import hf_hub_download
# Hugging Face transformers
import transformers
from transformers import (
TrainerCallback,
AutoTokenizer,
DebertaV2Tokenizer,
BertTokenizer,
BertForSequenceClassification,
AutoModelForSequenceClassification,
Trainer,
TrainingArguments
)
from utils import (
map_to_3_classes,
convert_to_label_strings,
tune_thresholds,
label_map,
label_row_soft,
AbuseDataset,
save_and_yield_eval
)
# Create evaluation results directory if it doesn't exist
Path("/home/user/app/results_eval").mkdir(parents=True, exist_ok=True)
PERSIST_DIR = Path("/home/user/app")
MODEL_DIR = PERSIST_DIR / "saved_model"
LOG_FILE = PERSIST_DIR / "training.log"
# configure logging
log_buffer = io.StringIO()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
logging.FileHandler(LOG_FILE),
logging.StreamHandler(log_buffer)
]
)
logger = logging.getLogger(__name__)
# Check versions
logger.info(f"Transformers version: {transformers.__version__}")
# Check for GPU availability
logger.info("torch.cuda.is_available(): %s", torch.cuda.is_available())
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class GradioLoggerCallback(TrainerCallback):
def __init__(self, gr_queue):
self.gr_queue = gr_queue
def on_log(self, args, state, control, logs=None, **kwargs):
if logs:
            msg = f"📊 Step {state.global_step}: {logs}"
logger.info(msg)
self.gr_queue.put(msg)
def evaluate_model_with_thresholds(trainer, test_dataset):
"""Run full evaluation with automatic threshold tuning."""
logger.info("\nπŸ” Running model predictions...")
yield "\nπŸ” Running model predictions..."
predictions = trainer.predict(test_dataset)
probs = torch.sigmoid(torch.tensor(predictions.predictions)).numpy()
true_soft = np.array(predictions.label_ids)
logger.info("\nπŸ”Ž Tuning thresholds...")
yield "\nπŸ”Ž Tuning thresholds..."
best_low, best_high, best_f1 = tune_thresholds(probs, true_soft)
logger.info(f"\nβœ… Best thresholds: low={best_low:.2f}, high={best_high:.2f} (macro F1={best_f1:.3f})")
yield f"\nβœ… Best thresholds: low={best_low:.2f}, high={best_high:.2f} (macro F1={best_f1:.3f})"
final_pred_soft = map_to_3_classes(probs, best_low, best_high)
final_pred_str = convert_to_label_strings(final_pred_soft)
true_str = convert_to_label_strings(true_soft)
logger.info("\nπŸ“Š Final Evaluation Report (multi-class per label):\n")
yield "\nπŸ“Š Final Evaluation Report (multi-class per label):\n "
report = classification_report(
true_str,
final_pred_str,
labels=["no", "plausibly", "yes"],
digits=3,
zero_division=0
)
logger.info(report)
yield from save_and_yield_eval(report)
    # Also keep a plain-text copy of the report
    with open("/home/user/app/results_eval/eval_report.txt", "w") as f:
        f.write(report)
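
# Illustrative only (never called): utils.map_to_3_classes is assumed to bucket
# each per-label probability using the two tuned thresholds, roughly as sketched
# below; the real implementation lives in utils.py.
def _example_threshold_mapping(probs: np.ndarray, low: float, high: float) -> np.ndarray:
    # p < low -> 0.0 ("no"); low <= p < high -> 0.5 ("plausibly"); p >= high -> 1.0 ("yes")
    soft = np.zeros_like(probs)
    soft[(probs >= low) & (probs < high)] = 0.5
    soft[probs >= high] = 1.0
    return soft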
def load_saved_model_and_tokenizer():
tokenizer = DebertaV2Tokenizer.from_pretrained(MODEL_DIR)
model = AutoModelForSequenceClassification.from_pretrained(MODEL_DIR).to(device)
return tokenizer, model
def evaluate_saved_model(progress=gr.Progress(track_tqdm=True)):
if MODEL_DIR.exists():
yield "βœ… Trained model found! Skipping training...\n"
else:
yield "❌ No trained model found. Please train the model first.\n"
return
try:
logger.info("πŸ” Loading saved model for evaluation...")
yield "πŸ” Loading saved model for evaluation...\n"
tokenizer, model = load_saved_model_and_tokenizer()
test_dataset = AbuseDataset(test_texts, test_labels, tokenizer)
trainer = Trainer(
model=model,
args=TrainingArguments(
output_dir="./results_eval",
per_device_eval_batch_size=4,
logging_dir="./logs_eval",
disable_tqdm=True
),
eval_dataset=test_dataset
)
# Re-yield from generator
for line in evaluate_model_with_thresholds(trainer, test_dataset):
yield line
logger.info("βœ… Evaluation complete.\n")
yield "\nβœ… Evaluation complete.\n"
except Exception as e:
logger.exception(f"❌ Evaluation failed: {e}")
yield f"❌ Evaluation failed: {e}\n"
token = os.environ.get("HF_TOKEN")  # read the HF token from the Space's secret store
# Load dataset from Hugging Face Hub
path = hf_hub_download(
    repo_id="rshakked/abusive-relashionship-stories",
    filename="Abusive Relationship Stories - Technion & MSF.xlsx",
    repo_type="dataset",
    token=token,
)
df = pd.read_excel(path)
# Define text and label columns
text_column = "post_body"
label_columns = [
'emotional_violence', 'physical_violence', 'sexual_violence', 'spiritual_violence',
'economic_violence', 'past_offenses', 'social_isolation', 'refuses_treatment',
'suicidal_threats', 'mental_condition', 'daily_activity_control', 'violent_behavior',
'unemployment', 'substance_use', 'obsessiveness', 'jealousy', 'outbursts',
'ptsd', 'hard_childhood', 'emotional_dependency', 'prevention_of_care',
'fear_based_relationship', 'humiliation', 'physical_threats',
'presence_of_others_in_assault', 'signs_of_injury', 'property_damage',
'access_to_weapons', 'gaslighting'
]
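
# For orientation only: label_row_soft (imported from utils) is assumed to turn one
# spreadsheet row into a soft multi-label vector. A hypothetical sketch, assuming
# the annotations use "yes"/"plausibly"/"no" style values:
def _example_label_row_soft(row: pd.Series, columns: list) -> list:
    mapping = {"yes": 1.0, "plausibly": 0.5, "no": 0.0}  # assumed annotation values
    return [mapping.get(str(row[col]).strip().lower(), 0.0) for col in columns]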
logger.info("Raw dataset shape: %s", df.shape)
# Keep only the text and label columns, then drop rows with no post text
df = df[[text_column] + label_columns]
logger.info("After column selection: %s", df.shape)
df = df.dropna(subset=[text_column])
logger.info("After dropping empty posts: %s", df.shape)
df["label_vector"] = df.apply(lambda row: label_row_soft(row, label_columns), axis=1)
label_matrix = df["label_vector"].tolist()
# Proper 3-way split: 20% test first, then 10% of the remainder for validation
train_val_texts, test_texts, train_val_labels, test_labels = train_test_split(
df[text_column].tolist(), label_matrix, test_size=0.2, random_state=42
)
train_texts, val_texts, train_labels, val_labels = train_test_split(
train_val_texts, train_val_labels, test_size=0.1, random_state=42
)
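# Sanity-check the split sizes (roughly 72% train / 8% val / 20% test, since 20%
# is held out first and 10% of the remaining 80% goes to validation)
logger.info(
    "Split sizes -> train: %d, val: %d, test: %d",
    len(train_texts), len(val_texts), len(test_texts),
)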
#model_name = "onlplab/alephbert-base"
model_name = "microsoft/deberta-v3-base"
def run_training(progress=gr.Progress(track_tqdm=True)):
log_queue = queue.Queue()
if MODEL_DIR.exists():
yield "βœ… Trained model found! Skipping training...\n"
for line in evaluate_saved_model():
yield line
return
yield "πŸš€ Starting training...\n"
try:
logger.info("Starting training run...")
# Load pretrained model for fine-tuning
tokenizer = DebertaV2Tokenizer.from_pretrained(model_name)
model = AutoModelForSequenceClassification.from_pretrained(
model_name,
num_labels=len(label_columns),
problem_type="multi_label_classification"
        ).to(device)  # move model to GPU when one is available
# gradient checkpointing helps cut memory use:
model.gradient_checkpointing_enable()
# Freeze bottom 6 layers of DeBERTa encoder
for name, param in model.named_parameters():
if any(f"encoder.layer.{i}." in name for i in range(0, 6)):
param.requires_grad = False
        train_dataset = AbuseDataset(train_texts, train_labels, tokenizer)
        val_dataset = AbuseDataset(val_texts, val_labels, tokenizer)
        test_dataset = AbuseDataset(test_texts, test_labels, tokenizer)
# TrainingArguments for HuggingFace Trainer (logging, saving)
training_args = TrainingArguments(
output_dir="./results",
num_train_epochs=3,
per_device_train_batch_size=8,
per_device_eval_batch_size=8,
evaluation_strategy="epoch",
save_strategy="epoch",
logging_dir="./logs",
logging_steps=500,
disable_tqdm=True
)
# Train using HuggingFace Trainer
trainer = Trainer(
model=model,
args=training_args,
train_dataset=train_dataset,
eval_dataset=val_dataset,
callbacks=[GradioLoggerCallback(log_queue)]
)
logger.info("Training started with %d samples", len(train_dataset))
yield "πŸ”„ Training started...\n"
progress(0.01)
        # Run training in a background thread so logs can stream to the UI.
        # (The previous version called trainer.train() a second time after the
        # thread finished, training the model twice.)
        train_thread = threading.Thread(target=trainer.train)
        train_thread.start()
        # Drain the log queue live while training runs
        percent = 0
        while train_thread.is_alive() or not log_queue.empty():
            while not log_queue.empty():
                log_msg = log_queue.get()
                yield log_msg
            # Nudge the progress bar toward (but never past) 98% until training ends
            if percent < 98:
                percent += 1
                progress(percent / 100)
            time.sleep(1)
        train_thread.join()
        progress(1.0)
        yield "✅ Progress: 100%\n"
# Save the model and tokenizer
MODEL_DIR.mkdir(parents=True, exist_ok=True)
model.save_pretrained(MODEL_DIR)
tokenizer.save_pretrained(MODEL_DIR)
logger.info(" Training completed and model saved.")
yield f"πŸŽ‰ Training complete! Model saved on {MODEL_DIR.resolve()}.\n"
except Exception as e:
        logger.exception(f"❌ Training failed: {e}")
yield f"❌ Training failed: {e}\n"
    # Evaluation
    try:
        if 'trainer' in locals():
            for line in evaluate_model_with_thresholds(trainer, test_dataset):
                yield line
            logger.info("Evaluation completed")
            yield "📈 Evaluation completed\n"
    except Exception as e:
        logger.exception(f"Evaluation failed: {e}")
        yield f"❌ Evaluation failed: {e}\n"
    return
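
# Note: precision_recall_fscore_support (imported above) is never wired into the
# Trainer. A minimal compute_metrics sketch for multi-label training, assuming a
# plain 0.5 sigmoid cutoff (the tuned thresholds are applied only at final
# evaluation); it could be passed as Trainer(..., compute_metrics=compute_metrics).
def compute_metrics(eval_pred):
    logits, labels = eval_pred
    preds = (torch.sigmoid(torch.tensor(logits)).numpy() >= 0.5).astype(int)
    hard_labels = (np.array(labels) >= 0.5).astype(int)  # binarize the soft labels
    precision, recall, f1, _ = precision_recall_fscore_support(
        hard_labels, preds, average="macro", zero_division=0
    )
    return {"precision": precision, "recall": recall, "f1": f1}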
def push_model_to_hub():
try:
logger.info("πŸ”„ Pushing model to Hugging Face Hub...")
tokenizer, model = load_saved_model_and_tokenizer()
        model.push_to_hub("rshakked/abuse-detector-he-en", token=token)
        tokenizer.push_to_hub("rshakked/abuse-detector-he-en", token=token)
        return "✅ Model pushed to hub successfully!"
except Exception as e:
logger.exception("❌ Failed to push model to hub.")
return f"❌ Failed to push model: {e}"