import numpy as np
import joblib
from typing import List, Dict, Any

from preprocessor import preprocess_for_classification


class TraditionalClassifier:
    """Traditional text classifier with probability distributions and metadata."""

    def __init__(
        self,
        classifier_path: str = "models/traditional_svm_classifier.joblib",
        vectorizer_path: str = "models/traditional_tfidf_vectorizer_classifier.joblib",
    ):
        self.model = joblib.load(classifier_path)
        self.vectorizer = joblib.load(vectorizer_path)
        self.model_name = classifier_path.split("/")[-1].replace(".joblib", "")

    def predict(self, text: str) -> Dict[str, Any]:
        """Predict class with full probability distribution and metadata."""
        cleaned_text = preprocess_for_classification(text)

        if self.vectorizer is not None:
            text_vector = self.vectorizer.transform([cleaned_text])
        else:
            # No standalone vectorizer: assume the model (e.g. a Pipeline)
            # accepts raw text directly.
            text_vector = [cleaned_text]

        prediction = self.model.predict(text_vector)[0]

        # Map the predicted label back to its column index in the
        # probability array.
        classes = getattr(self.model, "classes_", None)
        if classes is not None:
            prediction_index = int(np.where(classes == prediction)[0][0])
        else:
            prediction_index = (
                int(prediction) if isinstance(prediction, (int, np.integer)) else 0
            )

        if hasattr(self.model, "predict_proba"):
            probabilities = self.model.predict_proba(text_vector)[0]
            confidence = float(probabilities[prediction_index])
        elif hasattr(self.model, "decision_function"):
            # Margin-based models such as LinearSVC expose decision scores
            # instead of probabilities; convert them with a sigmoid (binary)
            # or softmax (multiclass).
            decision_scores = self.model.decision_function(text_vector)[0]
            if np.ndim(decision_scores) == 0:
                # Binary case: a single margin s, with sigmoid(-s) and
                # sigmoid(s) as the negative- and positive-class probabilities.
                probabilities = np.array(
                    [
                        1 / (1 + np.exp(decision_scores)),
                        1 / (1 + np.exp(-decision_scores)),
                    ]
                )
            else:
                # Multiclass case: numerically stable softmax over the margins.
                exp_scores = np.exp(decision_scores - np.max(decision_scores))
                probabilities = exp_scores / np.sum(exp_scores)
            confidence = float(probabilities[prediction_index])
        else:
            # Last resort: a one-hot "distribution" around the prediction.
            num_classes = len(classes) if classes is not None else 2
            probabilities = np.zeros(num_classes)
            probabilities[prediction_index] = 1.0
            confidence = 1.0

        prob_distribution = {}
        if classes is not None:
            for i, class_label in enumerate(classes):
                prob_distribution[str(class_label)] = float(probabilities[i])
        else:
            for i, prob in enumerate(probabilities):
                prob_distribution[f"class_{i}"] = float(prob)

        return {
            "prediction": str(prediction),
            "prediction_index": int(prediction_index),
            "confidence": confidence,
            "probability_distribution": prob_distribution,
            "cleaned_text": cleaned_text,
            "model_used": self.model_name,
            "prediction_metadata": {
                "max_probability": float(np.max(probabilities)),
                "min_probability": float(np.min(probabilities)),
                # Shannon entropy of the distribution; the epsilon guards
                # against log(0).
                "entropy": float(
                    -np.sum(probabilities * np.log(probabilities + 1e-10))
                ),
                "num_classes": len(probabilities),
            },
        }

    def predict_batch(self, texts: List[str]) -> List[Dict[str, Any]]:
        """Predict classes for multiple texts."""
        cleaned_texts = [preprocess_for_classification(text) for text in texts]

        if self.vectorizer is not None:
            text_vectors = self.vectorizer.transform(cleaned_texts)
        else:
            text_vectors = cleaned_texts

        predictions = self.model.predict(text_vectors)

        classes = getattr(self.model, "classes_", None)
        prediction_indices = []
        for pred in predictions:
            if classes is not None:
                pred_index = int(np.where(classes == pred)[0][0])
            else:
                pred_index = int(pred) if isinstance(pred, (int, np.integer)) else 0
            prediction_indices.append(pred_index)

        if hasattr(self.model, "predict_proba"):
            probabilities = self.model.predict_proba(text_vectors)
        elif hasattr(self.model, "decision_function"):
            decision_scores = self.model.decision_function(text_vectors)
            if decision_scores.ndim == 1:
                # Binary case: one margin per sample; stack the two class
                # probabilities column-wise.
                probabilities = np.column_stack(
                    [
                        1 / (1 + np.exp(decision_scores)),
                        1 / (1 + np.exp(-decision_scores)),
                    ]
                )
            else:
                # Multiclass case: row-wise numerically stable softmax.
                exp_scores = np.exp(
                    decision_scores - np.max(decision_scores, axis=1, keepdims=True)
                )
                probabilities = exp_scores / np.sum(
                    exp_scores, axis=1, keepdims=True
                )
        else:
            # Last resort: one-hot rows around each prediction.
            num_classes = len(classes) if classes is not None else 2
            probabilities = np.zeros((len(predictions), num_classes))
            for i, pred_idx in enumerate(prediction_indices):
                probabilities[i, pred_idx] = 1.0

        results = []
        for i, (pred, pred_idx) in enumerate(zip(predictions, prediction_indices)):
            confidence = float(probabilities[i][pred_idx])

            prob_distribution = {}
            if classes is not None:
                for j, class_label in enumerate(classes):
                    prob_distribution[str(class_label)] = float(probabilities[i][j])
            else:
                for j, prob in enumerate(probabilities[i]):
                    prob_distribution[f"class_{j}"] = float(prob)

            results.append(
                {
                    "prediction": str(pred),
                    "prediction_index": int(pred_idx),
                    "confidence": confidence,
                    "probability_distribution": prob_distribution,
                    "cleaned_text": cleaned_texts[i],
                    "model_used": self.model_name,
                    "prediction_metadata": {
                        "max_probability": float(np.max(probabilities[i])),
                        "min_probability": float(np.min(probabilities[i])),
                        "entropy": float(
                            -np.sum(
                                probabilities[i] * np.log(probabilities[i] + 1e-10)
                            )
                        ),
                        "num_classes": len(probabilities[i]),
                    },
                }
            )
        return results

    def get_model_info(self) -> Dict[str, Any]:
        """Get model information and capabilities."""
        classes = getattr(self.model, "classes_", None)
        return {
            "model_name": self.model_name,
            "model_type": type(self.model).__name__,
            "num_classes": len(classes) if classes is not None else "unknown",
            "classes": classes.tolist() if classes is not None else None,
            "has_predict_proba": hasattr(self.model, "predict_proba"),
            "has_vectorizer": self.vectorizer is not None,
            "vectorizer_type": type(self.vectorizer).__name__
            if self.vectorizer is not None
            else None,
        }
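

# Usage sketch: a minimal, non-authoritative example of exercising the
# classifier. It assumes the default joblib artifacts exist under models/
# and that preprocessor.preprocess_for_classification is importable; the
# sample texts below are hypothetical placeholders.
if __name__ == "__main__":
    clf = TraditionalClassifier()
    print(clf.get_model_info())

    # Single prediction with full metadata.
    result = clf.predict("This is a sample document to classify.")
    print(result["prediction"], result["confidence"])
    print(result["probability_distribution"])

    # Batch prediction over several texts.
    for r in clf.predict_batch(["first sample text", "second sample text"]):
        print(r["prediction"], r["prediction_metadata"]["entropy"])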