import os
import json
from typing import Dict

import numpy as np
import pandas as pd
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score

from utils.except_dir import cust_listdir


class MetricsEvaluator:

    def __init__(self, pred_dir: str, label_dir: str, save_dir: str):
        """
        Args:
            pred_dir: directory containing the prediction CSV files
            label_dir: directory containing the ground-truth CSV files
            save_dir: directory where evaluation results are written
        """
        self.pred_dir = pred_dir
        self.label_dir = label_dir
        self.save_dir = save_dir
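
    # Assumed data layout (inferred from evaluate() and _evaluate_category() below;
    # not documented in the original source):
    #   pred_dir/<category>/<video>.csv   predictions with a 'frame' column plus one
    #                                     0/1 column per event ('falldown', 'violence', 'fire')
    #   label_dir/<category>/<video>.csv  ground truth with the same columns
    #   save_dir/                         per-category metric CSVs and JSON summaries are written here
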
    def evaluate(self) -> Dict:
        """Run the full evaluation over every category directory."""
        category_metrics = {}
        all_metrics = {
            'falldown': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []},
            'violence': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []},
            'fire': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []}
        }

        all_categories_metrics = []

        for category in cust_listdir(self.pred_dir):
            if not os.path.isdir(os.path.join(self.pred_dir, category)):
                continue

            pred_category_path = os.path.join(self.pred_dir, category)
            label_category_path = os.path.join(self.label_dir, category)
            save_category_path = os.path.join(self.save_dir, category)
            os.makedirs(save_category_path, exist_ok=True)

            # Per-video metrics for this category (the last row holds the category average).
            metrics_df = self._evaluate_category(category, pred_category_path, label_category_path)
            metrics_df['category'] = category
            metrics_df.to_csv(os.path.join(save_category_path, f"{category}_metrics.csv"), index=False)
            all_categories_metrics.append(metrics_df)

            # Keep the average row as this category's summary.
            category_metrics[category] = metrics_df.iloc[-1].to_dict()

            # Collect the averaged values per event type (columns are named '<event>_<metric>').
            for col in metrics_df.columns:
                if col != 'video_name':
                    try:
                        parts = col.split('_', 1)
                        if len(parts) == 2:
                            event_type, metric_type = parts
                            if event_type in all_metrics and metric_type in all_metrics[event_type]:
                                all_metrics[event_type][metric_type].append(category_metrics[category][col])
                    except Exception as e:
                        print(f"Warning: Could not process column {col}: {str(e)}")
                        continue

        # Concatenate the per-video rows of all categories, dropping each per-category average row.
        all_categories_metrics_without_avg = [df.iloc[:-1] for df in all_categories_metrics]
        combined_metrics_df = pd.concat(all_categories_metrics_without_avg, ignore_index=True)
        combined_metrics_df.to_csv(os.path.join(self.save_dir, "all_categories_metrics.csv"), index=False)

        print("\nCategory-wise Average Metrics:")
        for category, metrics in category_metrics.items():
            print(f"\n{category}:")
            for metric_name, value in metrics.items():
                if metric_name != "video_name":
                    try:
                        if isinstance(value, str):
                            print(f"{metric_name}: {value}")
                        elif metric_name.endswith(('_tp', '_tn', '_fp', '_fn')):
                            # Confusion-matrix counts are printed as integers; the keys carry
                            # the event prefix (e.g. 'falldown_tp'), so match on the suffix.
                            print(f"{metric_name}: {int(value)}")
                        else:
                            print(f"{metric_name}: {float(value):.3f}")
                    except (ValueError, TypeError):
                        print(f"{metric_name}: {value}")

        print("\n" + "="*50)
        print("Overall Average Metrics Across All Categories:")
        print("="*50)

        for event_type in all_metrics:
            print(f"\n{event_type}:")
            for metric_type, values in all_metrics[event_type].items():
                # Guard against categories that produced no values for a metric.
                avg_value = np.mean(values) if values else 0.0
                if metric_type in ['tp', 'tn', 'fp', 'fn']:
                    print(f"{metric_type}: {int(avg_value)}")
                else:
                    print(f"{metric_type}: {avg_value:.3f}")

        final_results = {
            "category_metrics": {},
            "overall_metrics": {}
        }

        # Per-category averages (numeric values only).
        for category, metrics in category_metrics.items():
            final_results["category_metrics"][category] = {}
            for metric_name, value in metrics.items():
                if metric_name != "video_name":
                    if isinstance(value, (int, float)):
                        final_results["category_metrics"][category][metric_name] = float(value)

        # Macro averages across categories, per event type.
        for event_type in all_metrics:
            final_results["overall_metrics"][event_type] = {}
            for metric_type, values in all_metrics[event_type].items():
                avg_value = float(np.mean(values)) if values else 0.0
                final_results["overall_metrics"][event_type][metric_type] = avg_value

        json_path = os.path.join(self.save_dir, "overall_metrics.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, indent=4)

        # Metrics recomputed from the accumulated (summed) confusion matrices.
        accumulated_metrics = self.calculate_accumulated_metrics(combined_metrics_df)
        final_results["accumulated_metrics"] = accumulated_metrics

        accumulated_json_path = os.path.join(self.save_dir, "accumulated_metrics.json")
        with open(accumulated_json_path, 'w', encoding='utf-8') as f:
            json.dump(accumulated_metrics, f, indent=4)

        return accumulated_metrics

    def _evaluate_category(self, category: str, pred_path: str, label_path: str) -> pd.DataFrame:
        """Evaluate every video of a single category and return a per-video metrics DataFrame."""
        results = []
        metrics_columns = ['video_name']

        for pred_file in cust_listdir(pred_path):
            if not pred_file.endswith('.csv'):
                continue

            video_name = os.path.splitext(pred_file)[0]
            pred_df = pd.read_csv(os.path.join(pred_path, pred_file))

            # The label file is expected to share the prediction file's name.
            label_file = f"{video_name}.csv"
            label_path_full = os.path.join(label_path, label_file)

            if not os.path.exists(label_path_full):
                print(f"Warning: Label file not found for {video_name}")
                continue

            label_df = pd.read_csv(label_path_full)

            # Every column except 'frame' is treated as a binary event channel.
            video_metrics = {'video_name': video_name}
            categories = [col for col in pred_df.columns if col != 'frame']

            for cat in categories:
                y_true = label_df[cat].values
                y_pred = pred_df[cat].values

                metrics = self._calculate_metrics(y_true, y_pred)

                for metric_name, value in metrics.items():
                    col_name = f"{cat}_{metric_name}"
                    video_metrics[col_name] = value
                    if col_name not in metrics_columns:
                        metrics_columns.append(col_name)

            results.append(video_metrics)

        metrics_df = pd.DataFrame(results, columns=metrics_columns)

        # Append an 'average' row with the column-wise means for this category.
        avg_metrics = {'video_name': 'average'}
        for col in metrics_columns[1:]:
            avg_metrics[col] = metrics_df[col].mean()

        metrics_df = pd.concat([metrics_df, pd.DataFrame([avg_metrics])], ignore_index=True)

        return metrics_df

    def calculate_accumulated_metrics(self, all_categories_metrics_df: pd.DataFrame) -> Dict:
        """Compute per-event metrics from confusion matrices accumulated over all videos."""
        accumulated_results = {"micro_avg": {}}
        # Note: 'categories' here are the event types, not the scenario directories.
        categories = ['falldown', 'violence', 'fire']

        for category in categories:
            # Sum the confusion-matrix counts of this event type over every video.
            tp = all_categories_metrics_df[f'{category}_tp'].sum()
            tn = all_categories_metrics_df[f'{category}_tn'].sum()
            fp = all_categories_metrics_df[f'{category}_fp'].sum()
            fn = all_categories_metrics_df[f'{category}_fn'].sum()

            metrics = {
                'tp': int(tp),
                'tn': int(tn),
                'fp': int(fp),
                'fn': int(fn),
                'accuracy': (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0,
                'precision': tp / (tp + fp) if (tp + fp) > 0 else 0,
                'recall': tp / (tp + fn) if (tp + fn) > 0 else 0,
                'specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
                'f1': 2 * tp / (2 * tp + fp + fn) if (2 * tp + fp + fn) > 0 else 0,
            }

            tpr = metrics['recall']
            tnr = metrics['specificity']

            # Balanced accuracy: mean of sensitivity and specificity.
            metrics['balanced_accuracy'] = (tpr + tnr) / 2

            # Geometric mean of sensitivity and specificity.
            metrics['g_mean'] = np.sqrt(tpr * tnr) if (tpr * tnr) > 0 else 0

            # Matthews correlation coefficient.
            numerator = (tp * tn) - (fp * fn)
            denominator = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
            metrics['mcc'] = numerator / denominator if denominator > 0 else 0

            # Negative predictive value.
            metrics['npv'] = tn / (tn + fn) if (tn + fn) > 0 else 0

            # False alarm rate (false positive rate).
            metrics['far'] = 1 - metrics['specificity']

            accumulated_results[category] = metrics

        # Micro average: pool the counts of all event types before computing the metrics.
        total_tp = sum(accumulated_results[cat]['tp'] for cat in categories)
        total_tn = sum(accumulated_results[cat]['tn'] for cat in categories)
        total_fp = sum(accumulated_results[cat]['fp'] for cat in categories)
        total_fn = sum(accumulated_results[cat]['fn'] for cat in categories)

        total = total_tp + total_tn + total_fp + total_fn
        accumulated_results["micro_avg"] = {
            'tp': int(total_tp),
            'tn': int(total_tn),
            'fp': int(total_fp),
            'fn': int(total_fn),
            'accuracy': (total_tp + total_tn) / total if total > 0 else 0,
            'precision': total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0,
            'recall': total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0,
            'f1': 2 * total_tp / (2 * total_tp + total_fp + total_fn) if (2 * total_tp + total_fp + total_fn) > 0 else 0,
        }

        return accumulated_results
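
    # Illustrative sanity check of the confusion-matrix formulas used above and in
    # _calculate_metrics below (made-up counts, not project results):
    #   tp=8, tn=9, fp=2, fn=1
    #   precision   = 8 / (8 + 2)         = 0.800
    #   recall      = 8 / (8 + 1)         ≈ 0.889
    #   specificity = 9 / (9 + 2)         ≈ 0.818
    #   f1          = 2*8 / (2*8 + 2 + 1) = 16/19 ≈ 0.842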
    def _calculate_metrics(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
        """Compute per-video metrics for one binary event channel."""
        # Confusion-matrix counts; labels and predictions are assumed to be 0/1.
        tn = np.sum((y_true == 0) & (y_pred == 0))
        fp = np.sum((y_true == 0) & (y_pred == 1))
        fn = np.sum((y_true == 1) & (y_pred == 0))
        tp = np.sum((y_true == 1) & (y_pred == 1))

        metrics = {
            'f1': f1_score(y_true, y_pred, zero_division=0),
            'accuracy': accuracy_score(y_true, y_pred),
            'precision': precision_score(y_true, y_pred, zero_division=0),
            'recall': recall_score(y_true, y_pred, zero_division=0),
            'specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
            'tp': int(tp),
            'tn': int(tn),
            'fp': int(fp),
            'fn': int(fn)
        }

        return metrics
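

# Minimal usage sketch (illustrative only): the paths below are hypothetical
# placeholders, not paths from the project. evaluate() writes per-category CSVs
# plus overall_metrics.json and accumulated_metrics.json under save_dir and
# returns the accumulated-metrics dictionary.
if __name__ == "__main__":
    evaluator = MetricsEvaluator(
        pred_dir="results/predictions",  # hypothetical: <category>/<video>.csv prediction files
        label_dir="data/labels",         # hypothetical: matching ground-truth CSVs
        save_dir="results/metrics",      # hypothetical: output directory
    )
    accumulated = evaluator.evaluate()
    print(json.dumps(accumulated, indent=4))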