# NOTE: the three lines below are web-page header residue (not Python), kept as comments:
# jisujang's picture
# first
# a005c19
import os
import shutil
from devmacs_core.devmacs_core import DevMACSCore
import json
from typing import Dict, List, Tuple
from pathlib import Path
import pandas as pd
from utils.except_dir import cust_listdir
def load_config(config_path: str) -> Dict:
    """Read the UTF-8 JSON file at ``config_path`` and return the parsed content."""
    with open(config_path, mode='r', encoding='utf-8') as handle:
        raw_text = handle.read()
    return json.loads(raw_text)
# Directory-name constants used to build the benchmark folder layout.
DATA_SET = "dataset"  # sub-directory of the benchmark root holding the per-category data
CFG = "CFG"  # configuration directory name (at the benchmark root and under each model folder)
VECTOR = "vector"  # vector directory name, placed under each model folder
TEXT = "text"  # text sub-directory of the vector directory
VIDEO = "video"  # video sub-directory of the vector directory
# NOTE(review): not referenced in the visible code; the name looks like a typo of
# "EXCEPT" -- presumably entries to skip when listing directories. Confirm before renaming.
EXECPT = ["@eaDir", "README.md"]
ALRAM = "alarm"  # alarm output directory name (identifier is spelled "ALRAM"; renaming requires updating every reference)
METRIC = "metric"  # metric output directory name
MSRVTT = "MSRVTT"  # default model name of PiaBenchMark
MODEL = "models"  # directory under the benchmark root that holds the model-name folders
class PiaBenchMark:
    """Prepare the folder layout and frame-level label CSVs of a benchmark directory.

    Paths derived from ``benchmark_path`` (directory names come from the module
    constants)::

        <benchmark_path>/dataset/<category>/            label JSON / CSV files
        <benchmark_path>/CFG/
        <benchmark_path>/models/<model_name>/CFG/<cfg_name>/alarm
        <benchmark_path>/models/<model_name>/CFG/<cfg_name>/metric
        <benchmark_path>/models/<model_name>/vector/text
        <benchmark_path>/models/<model_name>/vector/video/<category>
    """

    def __init__(self, benchmark_path, cfg_target_path: str = None, model_name: str = MSRVTT, token: str = None):
        """Store the settings, load the JSON config and derive every working path.

        Nothing is created on disk here; see ``preprocess_structure``.

        Args:
            benchmark_path: Root directory of the benchmark.
            cfg_target_path: Path of the JSON configuration file. Its stem
                becomes ``cfg_name`` and its content ``cfg_dict``.
            model_name: Name of the model folder under ``models`` and of the
                repository loaded by ``extract_visual_vector``.
            token: Access token forwarded to ``DevMACSCore.from_huggingface``.

        Raises:
            TypeError: If ``cfg_target_path`` is ``None``.
        """
        if cfg_target_path is None:
            # Path(None) would raise an opaque TypeError anyway; keep the
            # exception type but say what is actually missing.
            raise TypeError("cfg_target_path is required: pass the path of a JSON config file")

        self.benchmark_path = benchmark_path
        self.token = token
        self.model_name = model_name
        self.devmacs_core = None  # set by extract_visual_vector()
        self.cfg_target_path = cfg_target_path
        self.cfg_name = Path(cfg_target_path).stem
        self.cfg_dict = load_config(self.cfg_target_path)

        self.dataset_path = os.path.join(benchmark_path, DATA_SET)
        self.cfg_path = os.path.join(benchmark_path, CFG)

        self.model_path = os.path.join(self.benchmark_path, MODEL)
        self.model_name_path = os.path.join(self.model_path, self.model_name)
        self.model_name_cfg_path = os.path.join(self.model_name_path, CFG)
        self.model_name_cfg_name_path = os.path.join(self.model_name_cfg_path, self.cfg_name)
        self.alram_path = os.path.join(self.model_name_cfg_name_path, ALRAM)
        self.metric_path = os.path.join(self.model_name_cfg_name_path, METRIC)

        self.vector_path = os.path.join(self.model_name_path, VECTOR)
        self.vector_text_path = os.path.join(self.vector_path, TEXT)
        self.vector_video_path = os.path.join(self.vector_path, VIDEO)

        # Category folder names; filled by preprocess_structure() or
        # preprocess_label_to_csv().
        self.categories = []

    def _create_frame_labels(self, label_data: Dict, total_frames: int) -> pd.DataFrame:
        """Build a per-frame 0/1 label table for one video.

        Args:
            label_data: Parsed label JSON. Reads ``label_data['clips']``, a
                mapping whose values each provide ``'category'`` and a
                two-element ``'timestamp'`` (start frame, end frame).
            total_frames: Number of rows (frames) of the table.

        Returns:
            A DataFrame with a ``frame`` column (0 .. total_frames - 1) and one
            column per entry of ``self.categories`` (sorted), holding 1 on the
            frames covered by a clip of that category and 0 elsewhere.
        """
        columns = ['frame'] + sorted(self.categories)
        df = pd.DataFrame(0, index=range(total_frames), columns=columns)
        df['frame'] = range(total_frames)

        for clip_info in label_data['clips'].values():
            category = clip_info['category']
            # Clips of categories that are not tracked have no column: skip them.
            if category not in self.categories:
                continue
            start_frame, end_frame = clip_info['timestamp']
            # Label-based slicing with .loc includes both end points.
            df.loc[start_frame:end_frame, category] = 1

        return df

    def preprocess_label_to_csv(self):
        """Convert the JSON labels of every category into frame-level CSV files.

        If ``self.categories`` is empty it is first filled with the
        sub-directories of the dataset directory. Each ``<name>.json`` found in
        a category directory is written as ``<name>.csv`` next to it.

        A label counts as already processed only when a CSV with the same name
        exists in the same category directory (comparing file counts, as was
        done before, can be fooled by an unrelated CSV). Only labels without a
        CSV are converted; existing CSV files are left untouched.

        Raises:
            ValueError: If no ``.json`` file exists in any category directory.
        """
        # Only discover categories when none are known yet.
        if not self.categories:
            self.categories.extend(
                entry for entry in cust_listdir(self.dataset_path)
                if os.path.isdir(os.path.join(self.dataset_path, entry))
            )

        json_files = []
        csv_stems = set()
        for category in self.categories:
            category_path = os.path.join(self.dataset_path, category)
            for file_name in cust_listdir(category_path):
                relative_path = os.path.join(category, file_name)
                if file_name.endswith('.json'):
                    json_files.append(relative_path)
                elif file_name.endswith('.csv'):
                    csv_stems.add(os.path.splitext(relative_path)[0])

        if not json_files:
            raise ValueError("No JSON files found in any category directory")

        pending = [
            json_file for json_file in json_files
            if os.path.splitext(json_file)[0] not in csv_stems
        ]
        if not pending:
            print("All JSON files have already been processed to CSV. No further processing needed.")
            return

        for json_file in pending:
            json_path = os.path.join(self.dataset_path, json_file)
            video_name = os.path.splitext(json_file)[0]

            label_info = load_config(json_path)
            total_frames = label_info['video_info']['total_frame']

            df = self._create_frame_labels(label_info, total_frames)
            output_path = os.path.join(self.dataset_path, f"{video_name}.csv")
            df.to_csv(output_path, index=False)
        print("Complete !")

    def preprocess_structure(self):
        """Create the working directories and determine ``self.categories``.

        * If the dataset directory already contains sub-directories, those
          names become the categories.
        * Otherwise every directory directly under the benchmark root is moved
          into the dataset directory and recorded as a category, except names
          starting with ``@`` and the reserved layout names.

        Finally one directory per category is created under the video vector
        directory.
        """
        for directory in (
            self.dataset_path,
            self.cfg_path,
            self.vector_text_path,
            self.vector_video_path,
            self.alram_path,
            self.metric_path,
            self.model_name_cfg_name_path,
        ):
            os.makedirs(directory, exist_ok=True)

        # The dataset directory exists at this point, so only its content matters.
        existing_categories = [
            entry for entry in cust_listdir(self.dataset_path)
            if os.path.isdir(os.path.join(self.dataset_path, entry))
        ]

        if existing_categories:
            # Layout already prepared: reuse the category folders as they are.
            self.categories = existing_categories
        else:
            # First run: gather the category folders from the benchmark root.
            reserved_names = {METRIC, "README.md", MODEL, CFG, DATA_SET, VECTOR, ALRAM}
            for item in cust_listdir(self.benchmark_path):
                item_path = os.path.join(self.benchmark_path, item)
                if item.startswith("@") or item in reserved_names or not os.path.isdir(item_path):
                    continue
                target_path = os.path.join(self.dataset_path, item)
                if not os.path.exists(target_path):
                    shutil.move(item_path, target_path)
                self.categories.append(item)

        for category in self.categories:
            os.makedirs(os.path.join(self.vector_video_path, category), exist_ok=True)

        print("Folder preprocessing completed.")

    def extract_visual_vector(self):
        """Load the model and write visual results for the dataset.

        Builds ``self.devmacs_core`` with ``DevMACSCore.from_huggingface`` from
        the repository ``PIA-SPACE-LAB/<model_name>`` and calls its
        ``save_visual_results`` with the dataset directory as ``vid_dir`` and
        the video vector directory as ``result_dir``.
        """
        self.devmacs_core = DevMACSCore.from_huggingface(token=self.token, repo_id=f"PIA-SPACE-LAB/{self.model_name}")
        self.devmacs_core.save_visual_results(
            vid_dir=self.dataset_path,
            result_dir=self.vector_video_path
        )
if __name__ == "__main__":
    import os

    from dotenv import load_dotenv

    # Run load_dotenv() first so ACCESS_TOKEN can be read from the environment
    # below (presumably supplied through a local .env file).
    load_dotenv()

    runner = PiaBenchMark(
        "/home/jungseoik/data/Abnormal_situation_leader_board/assets/PIA",
        cfg_target_path="/home/jungseoik/data/Abnormal_situation_leader_board/assets/PIA/CFG/topk.json",
        model_name="T2V_CLIP4CLIP_MSRVTT",
        token=os.getenv("ACCESS_TOKEN"),
    )

    # Build the folder layout first, then turn the JSON labels into CSV files.
    runner.preprocess_structure()
    runner.preprocess_label_to_csv()
    print("Categories identified:", runner.categories)