# Page-capture residue (Hugging Face "Spaces" header; status shown: Sleeping).
import os
import random
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import List, Dict

import numpy as np
import torch
from datasets import load_dataset
from fastapi import APIRouter
from sklearn.metrics import accuracy_score
from transformers import pipeline, AutoConfig

from .utils.evaluation import TextEvaluationRequest
from .utils.emissions import tracker, clean_emissions_data, get_space_info
# Disable torch compile
os.environ["TORCH_COMPILE_DISABLE"] = "1"

# FastAPI router object exposed by this module.
router = APIRouter()

# Reported as "model_description" in the results returned by evaluate_text.
# NOTE(review): still reads "Random Baseline" although evaluate_text runs a
# TextClassifier pipeline -- confirm whether the description should change.
DESCRIPTION = "Random Baseline"

# Reported as "api_route" in the results returned by evaluate_text.
ROUTE = "/text"
class TextClassifier:
    """Text-classification pipeline wrapper that returns integer label ids.

    The pipeline and the label -> id mapping are both loaded from the
    checkpoint named by ``MODEL_NAME``; inference runs on CPU.
    """

    # Single source of truth for the checkpoint used by config and pipeline.
    MODEL_NAME = "camillebrl/ModernBERT-envclaims-overfit"

    def __init__(self):
        self.config = AutoConfig.from_pretrained(self.MODEL_NAME)
        self.label2id = self.config.label2id
        self.classifier = pipeline(
            "text-classification",
            self.MODEL_NAME,
            device="cpu",
            batch_size=16,
        )

    def _label_id(self, pred) -> int:
        """Map one pipeline output item to its integer label id.

        The pipeline yields a single ``{"label", "score"}`` dict per input by
        default, but a list of such dicts per input when several scores are
        requested; in the latter case the first entry is used.
        """
        if isinstance(pred, (list, tuple)):
            pred = pred[0]
        return self.label2id[pred["label"]]

    def process_batch(self, batch: List[str]) -> List[int]:
        """
        Process a batch of texts and return their predictions

        Args:
            batch: Texts to classify.

        Returns:
            One integer label id per input text, in input order. An empty
            list is returned for an empty batch, and also (after printing the
            error) when inference fails, so callers must check the length.
        """
        texts = list(batch)
        if not texts:
            return []
        try:
            batch_preds = self.classifier(texts)
            return [self._label_id(pred) for pred in batch_preds]
        except Exception as e:
            # Best-effort: report the failure and let the caller decide how
            # to handle the missing predictions.
            print(f"Error processing batch: {e}")
            return []
async def evaluate_text(request: TextEvaluationRequest):
    """
    Evaluate text classification for climate disinformation detection.

    Current Model: TextClassifier (text-classification pipeline)
    - Classifies the "quote" column of the dataset's "test" split
    - Compares the predicted label ids with the mapped integer labels (0-7)

    Args:
        request: Evaluation request. ``dataset_name`` selects the dataset to
            load; ``test_size`` and ``test_seed`` are only echoed back in the
            returned ``dataset_config``.

    Returns:
        A dict with the space info, timestamp, accuracy, energy/emissions
        figures and the dataset configuration.

    NOTE(review): no route decorator is visible on this function in this
    file -- confirm that it is registered on ``router`` somewhere.
    """
    # Get space info
    username, space_url = get_space_info()

    # Define the label mapping
    LABEL_MAPPING = {
        "0_not_relevant": 0,
        "1_not_happening": 1,
        "2_not_human": 2,
        "3_not_bad": 3,
        "4_solutions_harmful_unnecessary": 4,
        "5_science_unreliable": 5,
        "6_proponents_biased": 6,
        "7_fossil_fuels_needed": 7,
    }

    # Load and prepare the dataset
    dataset = load_dataset(request.dataset_name)

    # Convert string labels to integers
    dataset = dataset.map(lambda x: {"label": LABEL_MAPPING[x["label"]]})

    # Only the test split is evaluated
    test_dataset = dataset["test"]

    # Start tracking emissions
    tracker.start()
    tracker.start_task("inference")

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE CODE HERE
    # Everything between these markers runs inside the tracked "inference" task.
    #--------------------------------------------------------------------------------------------

    true_labels = test_dataset["label"]

    # Build the model for this request
    classifier = TextClassifier()

    # os.cpu_count() may return None; fall back to one worker in that case.
    max_workers = min(os.cpu_count() or 1, 4)  # Limit to 4 workers or CPU count
    print(f"Processing with {max_workers} workers")

    predictions = _predict_in_batches(
        classifier,
        test_dataset["quote"],
        batch_size=32,
        max_workers=max_workers,
    )

    #--------------------------------------------------------------------------------------------
    # YOUR MODEL INFERENCE STOPS HERE
    #--------------------------------------------------------------------------------------------

    # Stop tracking emissions
    emissions_data = tracker.stop_task()

    # Calculate accuracy
    accuracy = accuracy_score(true_labels, predictions)
    print("accuracy : ", accuracy)

    # Prepare results dictionary
    results = {
        "username": username,
        "space_url": space_url,
        "submission_timestamp": datetime.now().isoformat(),
        "model_description": DESCRIPTION,
        "accuracy": float(accuracy),
        "energy_consumed_wh": emissions_data.energy_consumed * 1000,
        "emissions_gco2eq": emissions_data.emissions * 1000,
        "emissions_data": clean_emissions_data(emissions_data),
        "api_route": ROUTE,
        "dataset_config": {
            "dataset_name": request.dataset_name,
            "test_size": request.test_size,
            "test_seed": request.test_seed,
        },
    }
    print("results : ", results)
    return results


def _predict_in_batches(classifier, quotes, batch_size, max_workers, failed_label=-1):
    """Classify *quotes* batch by batch on a thread pool, preserving input order.

    A batch that raises, or that returns a different number of predictions
    than it has inputs, is filled with ``failed_label`` so the result always
    has exactly ``len(quotes)`` entries. The default ``-1`` is outside the
    0-7 label space, so such entries are always scored as incorrect.
    """
    batches = [
        quotes[start:start + batch_size]
        for start in range(0, len(quotes), batch_size)
    ]

    predictions = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all batches for processing
        futures = [
            executor.submit(classifier.process_batch, batch)
            for batch in batches
        ]
        # Collect results in submission order so they line up with the labels
        for batch, future in zip(batches, futures):
            try:
                batch_preds = list(future.result())
            except Exception as e:
                print(f"Batch processing failed: {e}")
                batch_preds = []
            if len(batch_preds) != len(batch):
                # Partial output cannot be aligned with its inputs; mark the
                # whole batch as failed instead of shifting later predictions.
                print(
                    f"Batch returned {len(batch_preds)} predictions for "
                    f"{len(batch)} inputs; marking batch as failed"
                )
                batch_preds = [failed_label] * len(batch)
            predictions.extend(batch_preds)
    return predictions