import logging
import os

from celery import shared_task

from app.core.celery_app import get_celery_app
from app.services.hf_datasets import (
    refresh_datasets_cache,
    fetch_and_cache_all_datasets,
)

# Configure logging
logger = logging.getLogger(__name__)

# Get Celery app instance
celery_app = get_celery_app()

# Cache TTL constants (seconds)
DATASET_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days
BATCH_PROGRESS_CACHE_TTL = 60 * 60 * 24 * 7  # 7 days for batch progress
DATASET_SIZE_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days for size info


@celery_app.task(name="app.tasks.dataset_tasks.refresh_hf_datasets_cache")
def refresh_hf_datasets_cache():
    """Celery task to refresh the HuggingFace datasets cache in Redis."""
    logger.info("Starting refresh of HuggingFace datasets cache via Celery task.")
    try:
        refresh_datasets_cache()
        logger.info("Successfully refreshed HuggingFace datasets cache.")
        return {"status": "success"}
    except Exception as e:
        logger.error(f"Failed to refresh HuggingFace datasets cache: {e}")
        return {"status": "error", "error": str(e)}


@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_datasets_page(self, offset, limit):
    """
    Celery task to fetch and cache a single page of datasets from Hugging Face.
    Retries on failure.
    """
    logger.info(f"[fetch_datasets_page] ENTRY: offset={offset}, limit={limit}")
    try:
        # Imported inside the task rather than at module level, keeping the
        # import out of the worker's startup path.
        from app.services.hf_datasets import process_datasets_page

        logger.info(f"[fetch_datasets_page] Calling process_datasets_page with offset={offset}, limit={limit}")
        result = process_datasets_page(offset, limit)
        logger.info(f"[fetch_datasets_page] SUCCESS: offset={offset}, limit={limit}, result={result}")
        return result
    except Exception as exc:
        logger.error(f"[fetch_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True)
        raise self.retry(exc=exc)
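

# A sketch (an assumption, not part of the original module) of how the
# per-page task above could be fanned out across workers with a Celery
# group; the page size and total count are illustrative.
def dispatch_all_dataset_pages(total_count: int, page_size: int = 500):
    """Queue one fetch_datasets_page task per page of the dataset listing."""
    from celery import group

    job = group(
        fetch_datasets_page.s(offset, page_size)
        for offset in range(0, total_count, page_size)
    )
    # apply_async() enqueues all page tasks at once; each page retries
    # independently via the task's max_retries/default_retry_delay settings.
    return job.apply_async()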


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_hf_datasets_full_cache(self):
    """Celery task to fetch and cache the full Hugging Face datasets listing. Retries on failure."""
    logger.info("[refresh_hf_datasets_full_cache] Starting full Hugging Face datasets cache refresh.")
    try:
        token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        if not token:
            logger.error("[refresh_hf_datasets_full_cache] HUGGINGFACEHUB_API_TOKEN not set.")
            return {"status": "error", "error": "HUGGINGFACEHUB_API_TOKEN not set"}
        count = fetch_and_cache_all_datasets(token)
        logger.info(f"[refresh_hf_datasets_full_cache] Cached {count} datasets.")
        return {"status": "ok", "cached": count}
    except Exception as exc:
        logger.error(f"[refresh_hf_datasets_full_cache] ERROR: {exc}", exc_info=True)
        raise self.retry(exc=exc)
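

# A sketch (an assumption, not part of the original module) of how these
# tasks might be scheduled with Celery beat. The schedule names and cron
# timings are illustrative; the full-cache entry relies on shared_task's
# default module-path task naming.
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "refresh-hf-datasets-cache-hourly": {
        "task": "app.tasks.dataset_tasks.refresh_hf_datasets_cache",
        "schedule": crontab(minute=0),  # top of every hour
    },
    "refresh-hf-datasets-full-cache-nightly": {
        "task": "app.tasks.dataset_tasks.refresh_hf_datasets_full_cache",
        "schedule": crontab(minute=0, hour=3),  # 03:00 daily
    },
}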