Spaces:
Runtime error
Runtime error
File size: 5,758 Bytes
55535c7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 |
"""
出力ディレクトリの古いファイルをクリーンアップする機能
"""
import os
import time
from pathlib import Path
from datetime import datetime, timedelta
import threading
import logging
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FileCleanupManager:
"""定期的に古いファイルを削除するマネージャー"""
def __init__(self, output_dir: Path, max_age_hours: float = 24, check_interval_minutes: int = 60):
"""
Args:
output_dir: 監視対象のディレクトリ
max_age_hours: ファイルを保持する最大時間(時間)
check_interval_minutes: チェック間隔(分)
"""
self.output_dir = output_dir
self.max_age_seconds = max_age_hours * 3600
self.check_interval = check_interval_minutes * 60
self._running = False
self._thread = None
def start(self):
"""クリーンアップ処理を開始"""
if self._running:
logger.warning("Cleanup manager is already running")
return
self._running = True
self._thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self._thread.start()
logger.info(f"Started cleanup manager for {self.output_dir}")
def stop(self):
"""クリーンアップ処理を停止"""
self._running = False
if self._thread:
self._thread.join()
logger.info("Stopped cleanup manager")
def _cleanup_loop(self):
"""定期的にクリーンアップを実行"""
while self._running:
try:
self.cleanup_now()
except Exception as e:
logger.error(f"Error during cleanup: {e}")
# 次のチェックまで待機
time.sleep(self.check_interval)
def cleanup_now(self):
"""今すぐクリーンアップを実行"""
if not self.output_dir.exists():
logger.warning(f"Output directory does not exist: {self.output_dir}")
return
current_time = time.time()
deleted_count = 0
deleted_size = 0
# MP4ファイルをチェック
for file_path in self.output_dir.glob("*.mp4"):
try:
# ファイルの最終変更時刻を取得
file_age = current_time - file_path.stat().st_mtime
if file_age > self.max_age_seconds:
file_size = file_path.stat().st_size
file_path.unlink()
deleted_count += 1
deleted_size += file_size
logger.debug(f"Deleted old file: {file_path.name} (age: {file_age/3600:.1f} hours)")
except Exception as e:
logger.error(f"Failed to delete {file_path}: {e}")
if deleted_count > 0:
logger.info(f"Cleanup completed: deleted {deleted_count} files, freed {deleted_size/1024/1024:.1f} MB")
def get_directory_stats(self):
"""ディレクトリの統計情報を取得"""
if not self.output_dir.exists():
return {
"total_files": 0,
"total_size_mb": 0,
"oldest_file_hours": None
}
files = list(self.output_dir.glob("*.mp4"))
if not files:
return {
"total_files": 0,
"total_size_mb": 0,
"oldest_file_hours": None
}
total_size = sum(f.stat().st_size for f in files)
current_time = time.time()
oldest_age = max((current_time - f.stat().st_mtime) for f in files)
return {
"total_files": len(files),
"total_size_mb": total_size / 1024 / 1024,
"oldest_file_hours": oldest_age / 3600
}
# グローバルインスタンス(アプリケーションで使用)
cleanup_manager = None
def initialize_cleanup(output_dir: Path, max_age_hours: float = 24):
"""クリーンアップマネージャーを初期化して開始"""
global cleanup_manager
if cleanup_manager is not None:
cleanup_manager.stop()
cleanup_manager = FileCleanupManager(output_dir, max_age_hours)
cleanup_manager.start()
return cleanup_manager
def get_cleanup_status():
"""クリーンアップの状態を取得"""
if cleanup_manager is None:
return "Cleanup manager not initialized"
stats = cleanup_manager.get_directory_stats()
return f"""
クリーンアップ状態:
- ファイル数: {stats['total_files']}
- 合計サイズ: {stats['total_size_mb']:.1f} MB
- 最古のファイル: {stats['oldest_file_hours']:.1f} 時間前 (最古のファイルがある場合)
"""
if __name__ == "__main__":
# テスト実行
output_dir = Path("./output")
output_dir.mkdir(exist_ok=True)
# テスト用のダミーファイルを作成
test_file = output_dir / "test.mp4"
test_file.write_text("dummy content")
# 古いファイルをシミュレート(2日前)
old_time = time.time() - (48 * 3600)
os.utime(test_file, (old_time, old_time))
# クリーンアップマネージャーを初期化
manager = FileCleanupManager(output_dir, max_age_hours=24)
# 即座にクリーンアップを実行
print("Before cleanup:", manager.get_directory_stats())
manager.cleanup_now()
print("After cleanup:", manager.get_directory_stats()) |