Spaces:
Runtime error
Runtime error
File size: 5,758 Bytes
55535c7 |
|
"""
出力ディレクトリの古いファイルをクリーンアップする機能
"""
import os
import time
from pathlib import Path
from datetime import datetime, timedelta
import threading
import logging
# ロギング設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FileCleanupManager:
"""定期的に古いファイルを削除するマネージャー"""
def __init__(self, output_dir: Path, max_age_hours: float = 24, check_interval_minutes: int = 60):
"""
Args:
output_dir: 監視対象のディレクトリ
max_age_hours: ファイルを保持する最大時間(時間)
check_interval_minutes: チェック間隔(分)
"""
self.output_dir = output_dir
self.max_age_seconds = max_age_hours * 3600
self.check_interval = check_interval_minutes * 60
self._running = False
self._thread = None
def start(self):
"""クリーンアップ処理を開始"""
if self._running:
logger.warning("Cleanup manager is already running")
return
self._running = True
self._thread = threading.Thread(target=self._cleanup_loop, daemon=True)
self._thread.start()
logger.info(f"Started cleanup manager for {self.output_dir}")
def stop(self):
"""クリーンアップ処理を停止"""
self._running = False
if self._thread:
self._thread.join()
logger.info("Stopped cleanup manager")
def _cleanup_loop(self):
"""定期的にクリーンアップを実行"""
while self._running:
try:
self.cleanup_now()
except Exception as e:
logger.error(f"Error during cleanup: {e}")
# 次のチェックまで待機
time.sleep(self.check_interval)
def cleanup_now(self):
"""今すぐクリーンアップを実行"""
if not self.output_dir.exists():
logger.warning(f"Output directory does not exist: {self.output_dir}")
return
current_time = time.time()
deleted_count = 0
deleted_size = 0
# MP4ファイルをチェック
for file_path in self.output_dir.glob("*.mp4"):
try:
# ファイルの最終変更時刻を取得
file_age = current_time - file_path.stat().st_mtime
if file_age > self.max_age_seconds:
file_size = file_path.stat().st_size
file_path.unlink()
deleted_count += 1
deleted_size += file_size
logger.debug(f"Deleted old file: {file_path.name} (age: {file_age/3600:.1f} hours)")
except Exception as e:
logger.error(f"Failed to delete {file_path}: {e}")
if deleted_count > 0:
logger.info(f"Cleanup completed: deleted {deleted_count} files, freed {deleted_size/1024/1024:.1f} MB")
def get_directory_stats(self):
"""ディレクトリの統計情報を取得"""
if not self.output_dir.exists():
return {
"total_files": 0,
"total_size_mb": 0,
"oldest_file_hours": None
}
files = list(self.output_dir.glob("*.mp4"))
if not files:
return {
"total_files": 0,
"total_size_mb": 0,
"oldest_file_hours": None
}
total_size = sum(f.stat().st_size for f in files)
current_time = time.time()
oldest_age = max((current_time - f.stat().st_mtime) for f in files)
return {
"total_files": len(files),
"total_size_mb": total_size / 1024 / 1024,
"oldest_file_hours": oldest_age / 3600
}
# グローバルインスタンス(アプリケーションで使用)
cleanup_manager = None
def initialize_cleanup(output_dir: Path, max_age_hours: float = 24):
"""クリーンアップマネージャーを初期化して開始"""
global cleanup_manager
if cleanup_manager is not None:
cleanup_manager.stop()
cleanup_manager = FileCleanupManager(output_dir, max_age_hours)
cleanup_manager.start()
return cleanup_manager
def get_cleanup_status():
"""クリーンアップの状態を取得"""
if cleanup_manager is None:
return "Cleanup manager not initialized"
stats = cleanup_manager.get_directory_stats()
return f"""
クリーンアップ状態:
- ファイル数: {stats['total_files']}
- 合計サイズ: {stats['total_size_mb']:.1f} MB
- 最古のファイル: {stats['oldest_file_hours']:.1f} 時間前 (最古のファイルがある場合)
"""
if __name__ == "__main__":
# テスト実行
output_dir = Path("./output")
output_dir.mkdir(exist_ok=True)
# テスト用のダミーファイルを作成
test_file = output_dir / "test.mp4"
test_file.write_text("dummy content")
# 古いファイルをシミュレート(2日前)
old_time = time.time() - (48 * 3600)
os.utime(test_file, (old_time, old_time))
# クリーンアップマネージャーを初期化
manager = FileCleanupManager(output_dir, max_age_hours=24)
# 即座にクリーンアップを実行
print("Before cleanup:", manager.get_directory_stats())
manager.cleanup_now()
print("After cleanup:", manager.get_directory_stats()) |