import os
import pandas as pd
import numpy as np
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score
from typing import Dict, List
import json
from utils.except_dir import cust_listdir
class MetricsEvaluator:
def __init__(self, pred_dir: str, label_dir: str, save_dir: str):
"""
Args:
pred_dir: 예측 csv 파일들이 있는 디렉토리 경로
label_dir: 정답 csv 파일들이 있는 디렉토리 경로
save_dir: 결과를 저장할 디렉토리 경로
"""
self.pred_dir = pred_dir
self.label_dir = label_dir
self.save_dir = save_dir
    def evaluate(self) -> Dict:
        """Run the full evaluation."""
        category_metrics = {}  # per-category average metrics
        all_metrics = {  # metrics pooled across all categories
            'falldown': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []},
            'violence': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []},
            'fire': {'f1': [], 'accuracy': [], 'precision': [], 'recall': [], 'specificity': []}
        }
        # list collecting the metrics DataFrame of every category
        all_categories_metrics = []
for category in cust_listdir(self.pred_dir):
if not os.path.isdir(os.path.join(self.pred_dir, category)):
continue
pred_category_path = os.path.join(self.pred_dir, category)
label_category_path = os.path.join(self.label_dir, category)
save_category_path = os.path.join(self.save_dir, category)
os.makedirs(save_category_path, exist_ok=True)
            # build the per-category results DataFrame
metrics_df = self._evaluate_category(category, pred_category_path, label_category_path)
metrics_df['category'] = category
metrics_df.to_csv(os.path.join(save_category_path, f"{category}_metrics.csv"), index=False)
all_categories_metrics.append(metrics_df)
            # store the per-category averages (the last row of metrics_df)
            category_metrics[category] = metrics_df.iloc[-1].to_dict()
            # collect the metrics that feed the overall average
            for col in metrics_df.columns:
                if col != 'video_name':
                    try:
                        # split into event type and metric type at the first underscore
                        parts = col.split('_', 1)  # maxsplit=1: event names contain no underscore
                        if len(parts) == 2:
                            event_type, metric_type = parts
                            if event_type in all_metrics and metric_type in all_metrics[event_type]:
                                all_metrics[event_type][metric_type].append(category_metrics[category][col])
                    except Exception as e:
                        print(f"Warning: Could not process column {col}: {str(e)}")
                        continue
        # drop the last row (the per-category average) from each DataFrame
        all_categories_metrics_without_avg = [df.iloc[:-1] for df in all_categories_metrics]
        # concatenate the metrics of all categories into one DataFrame
        combined_metrics_df = pd.concat(all_categories_metrics_without_avg, ignore_index=True)
        # save the combined metrics alongside the JSON output
        combined_metrics_df.to_csv(os.path.join(self.save_dir, "all_categories_metrics.csv"), index=False)
        # print the results
# print("\nCategory-wise Average Metrics:")
# for category, metrics in category_metrics.items():
# print(f"\n{category}:")
# for metric_name, value in metrics.items():
# if metric_name != "video_name":
# print(f"{metric_name}: {value:.3f}")
print("\nCategory-wise Average Metrics:")
for category, metrics in category_metrics.items():
print(f"\n{category}:")
for metric_name, value in metrics.items():
if metric_name != "video_name":
try:
if isinstance(value, str):
print(f"{metric_name}: {value}")
elif metric_name in ['tp', 'tn', 'fp', 'fn']:
print(f"{metric_name}: {int(value)}")
else:
print(f"{metric_name}: {float(value):.3f}")
except (ValueError, TypeError):
print(f"{metric_name}: {value}")
        # compute and print the overall averages
print("\n" + "="*50)
print("Overall Average Metrics Across All Categories:")
print("="*50)
for event_type in all_metrics:
print(f"\n{event_type}:")
for metric_type, values in all_metrics[event_type].items():
avg_value = np.mean(values)
                if metric_type in ['tp', 'tn', 'fp', 'fn']:  # integer counts
                    print(f"{metric_type}: {int(avg_value)}")
                else:  # floating-point metrics
                    print(f"{metric_type}: {avg_value:.3f}")
##################################################################################################
        # dictionary holding the final results
final_results = {
"category_metrics": {},
"overall_metrics": {}
}
        # store the per-category metrics
for category, metrics in category_metrics.items():
final_results["category_metrics"][category] = {}
for metric_name, value in metrics.items():
if metric_name != "video_name":
if isinstance(value, (int, float)):
final_results["category_metrics"][category][metric_name] = float(value)
        # compute and store the overall averages
        for event_type in all_metrics:
            final_results["overall_metrics"][event_type] = {}
            for metric_type, values in all_metrics[event_type].items():
                avg_value = float(np.mean(values))
                final_results["overall_metrics"][event_type][metric_type] = avg_value
        # compute the accumulated (micro) metrics from the pooled confusion-matrix counts
        accumulated_metrics = self.calculate_accumulated_metrics(combined_metrics_df)
        # attach the accumulated metrics before writing so they are included in the JSON
        final_results["accumulated_metrics"] = accumulated_metrics
        # save the final results as JSON
        json_path = os.path.join(self.save_dir, "overall_metrics.json")
        with open(json_path, 'w', encoding='utf-8') as f:
            json.dump(final_results, f, indent=4)
        # also save the accumulated metrics on their own
        accumulated_json_path = os.path.join(self.save_dir, "accumulated_metrics.json")
        with open(accumulated_json_path, 'w', encoding='utf-8') as f:
            json.dump(accumulated_metrics, f, indent=4)
        return accumulated_metrics
    def _evaluate_category(self, category: str, pred_path: str, label_path: str) -> pd.DataFrame:
        """Evaluate a single category."""
results = []
metrics_columns = ['video_name']
for pred_file in cust_listdir(pred_path):
if not pred_file.endswith('.csv'):
continue
video_name = os.path.splitext(pred_file)[0]
pred_df = pd.read_csv(os.path.join(pred_path, pred_file))
            # load the ground-truth CSV for this video
label_file = f"{video_name}.csv"
label_path_full = os.path.join(label_path, label_file)
if not os.path.exists(label_path_full):
print(f"Warning: Label file not found for {video_name}")
continue
label_df = pd.read_csv(label_path_full)
            # compute the metrics for each event-type column
video_metrics = {'video_name': video_name}
categories = [col for col in pred_df.columns if col != 'frame']
            for cat in categories:
                # ground-truth and predicted values
                y_true = label_df[cat].values
                y_pred = pred_df[cat].values
                # compute the metrics
                metrics = self._calculate_metrics(y_true, y_pred)
                # store the results
for metric_name, value in metrics.items():
col_name = f"{cat}_{metric_name}"
video_metrics[col_name] = value
if col_name not in metrics_columns:
metrics_columns.append(col_name)
results.append(video_metrics)
        # convert the results into a DataFrame
        metrics_df = pd.DataFrame(results, columns=metrics_columns)
        # append a row holding the per-column averages
        avg_metrics = {'video_name': 'average'}
        for col in metrics_columns[1:]:  # skip video_name
            avg_metrics[col] = metrics_df[col].mean()
metrics_df = pd.concat([metrics_df, pd.DataFrame([avg_metrics])], ignore_index=True)
return metrics_df
    def calculate_accumulated_metrics(self, all_categories_metrics_df: pd.DataFrame) -> Dict:
        """Compute per-category metrics from the accumulated confusion-matrix counts."""
        accumulated_results = {"micro_avg": {}}
        categories = ['falldown', 'violence', 'fire']
        for category in categories:
            # accumulate the confusion-matrix counts for this category
tp = all_categories_metrics_df[f'{category}_tp'].sum()
tn = all_categories_metrics_df[f'{category}_tn'].sum()
fp = all_categories_metrics_df[f'{category}_fp'].sum()
fn = all_categories_metrics_df[f'{category}_fn'].sum()
            # basic metrics
metrics = {
'tp': int(tp),
'tn': int(tn),
'fp': int(fp),
'fn': int(fn),
'accuracy': (tp + tn) / (tp + tn + fp + fn) if (tp + tn + fp + fn) > 0 else 0,
'precision': tp / (tp + fp) if (tp + fp) > 0 else 0,
'recall': tp / (tp + fn) if (tp + fn) > 0 else 0,
'specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
'f1': 2 * tp / (2 * tp + fp + fn) if (2 * tp + fp + fn) > 0 else 0,
}
            # additional metrics
tpr = metrics['recall'] # TPR = recall
tnr = metrics['specificity'] # TNR = specificity
# Balanced Accuracy
metrics['balanced_accuracy'] = (tpr + tnr) / 2
# G-Mean
metrics['g_mean'] = np.sqrt(tpr * tnr) if (tpr * tnr) > 0 else 0
# MCC (Matthews Correlation Coefficient)
numerator = (tp * tn) - (fp * fn)
denominator = np.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
metrics['mcc'] = numerator / denominator if denominator > 0 else 0
# NPV (Negative Predictive Value)
metrics['npv'] = tn / (tn + fn) if (tn + fn) > 0 else 0
# FAR (False Alarm Rate) = FPR = 1 - specificity
metrics['far'] = 1 - metrics['specificity']
accumulated_results[category] = metrics
        # totals pooled across all categories
total_tp = sum(accumulated_results[cat]['tp'] for cat in categories)
total_tn = sum(accumulated_results[cat]['tn'] for cat in categories)
total_fp = sum(accumulated_results[cat]['fp'] for cat in categories)
total_fn = sum(accumulated_results[cat]['fn'] for cat in categories)
        # micro average, computed from the pooled counts across all categories
        total = total_tp + total_tn + total_fp + total_fn
        micro = {
            'tp': int(total_tp),
            'tn': int(total_tn),
            'fp': int(total_fp),
            'fn': int(total_fn),
            'accuracy': (total_tp + total_tn) / total if total > 0 else 0,
            'precision': total_tp / (total_tp + total_fp) if (total_tp + total_fp) > 0 else 0,
            'recall': total_tp / (total_tp + total_fn) if (total_tp + total_fn) > 0 else 0,
            'specificity': total_tn / (total_tn + total_fp) if (total_tn + total_fp) > 0 else 0,
            'f1': 2 * total_tp / (2 * total_tp + total_fp + total_fn) if (2 * total_tp + total_fp + total_fn) > 0 else 0,
        }
        # the remaining metrics follow the same formulas as the per-category block above
        micro['balanced_accuracy'] = (micro['recall'] + micro['specificity']) / 2
        micro['g_mean'] = np.sqrt(micro['recall'] * micro['specificity'])
        numerator = (total_tp * total_tn) - (total_fp * total_fn)
        denominator = np.sqrt((total_tp + total_fp) * (total_tp + total_fn) * (total_tn + total_fp) * (total_tn + total_fn))
        micro['mcc'] = numerator / denominator if denominator > 0 else 0
        micro['npv'] = total_tn / (total_tn + total_fn) if (total_tn + total_fn) > 0 else 0
        micro['far'] = 1 - micro['specificity']
        accumulated_results["micro_avg"] = micro
return accumulated_results
def _calculate_metrics(self, y_true: np.ndarray, y_pred: np.ndarray) -> Dict:
"""성능 지표 계산"""
tn = np.sum((y_true == 0) & (y_pred == 0))
fp = np.sum((y_true == 0) & (y_pred == 1))
fn = np.sum((y_true == 1) & (y_pred == 0))
tp = np.sum((y_true == 1) & (y_pred == 1))
metrics = {
'f1': f1_score(y_true, y_pred, zero_division=0),
'accuracy': accuracy_score(y_true, y_pred),
'precision': precision_score(y_true, y_pred, zero_division=0),
'recall': recall_score(y_true, y_pred, zero_division=0),
'specificity': tn / (tn + fp) if (tn + fp) > 0 else 0,
'tp': int(tp),
'tn': int(tn),
'fp': int(fp),
'fn': int(fn)
}
return metrics
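
if __name__ == "__main__":
    # Minimal usage sketch, not part of the original module: the paths below are
    # hypothetical placeholders and assume the directory layout described in
    # MetricsEvaluator.__init__.
    evaluator = MetricsEvaluator(
        pred_dir="results/predictions",  # hypothetical path
        label_dir="data/labels",         # hypothetical path
        save_dir="results/metrics",      # hypothetical path
    )
    accumulated = evaluator.evaluate()
    print(json.dumps(accumulated, indent=4))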