"""Celery tasks for refreshing and caching Hugging Face dataset metadata in Redis."""

import logging
import os

from celery import shared_task

from app.core.celery_app import get_celery_app
from app.services.hf_datasets import (
    fetch_and_cache_all_datasets,
    refresh_datasets_cache,
)

# Configure logging
logger = logging.getLogger(__name__)

# Get Celery app instance
celery_app = get_celery_app()
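
# A worker that consumes these tasks can be started with the standard Celery
# CLI; the "-A" target below is an assumption inferred from the
# get_celery_app import above, not a confirmed project entry point:
#
#     celery -A app.core.celery_app worker --loglevel=info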

# Constants
DATASET_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days
BATCH_PROGRESS_CACHE_TTL = 60 * 60 * 24 * 7  # 7 days for batch progress
DATASET_SIZE_CACHE_TTL = 60 * 60 * 24 * 30  # 30 days for size info

@celery_app.task(name="app.tasks.dataset_tasks.refresh_hf_datasets_cache")
def refresh_hf_datasets_cache():
    """Celery task to refresh the HuggingFace datasets cache in Redis."""
    logger.info("Starting refresh of HuggingFace datasets cache via Celery task.")
    try:
        refresh_datasets_cache()
        logger.info("Successfully refreshed HuggingFace datasets cache.")
        return {"status": "success"}
    except Exception as e:
        logger.error(f"Failed to refresh HuggingFace datasets cache: {e}")
        return {"status": "error", "error": str(e)}

@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def fetch_datasets_page(self, offset, limit):
    """
    Celery task to fetch and cache a single page of datasets from Hugging Face.
    Retries on failure.
    """
    logger.info(f"[fetch_datasets_page] ENTRY: offset={offset}, limit={limit}")
    try:
        # Imported at call time rather than module load (e.g. to avoid circular imports).
        from app.services.hf_datasets import process_datasets_page
        logger.info(f"[fetch_datasets_page] Calling process_datasets_page with offset={offset}, limit={limit}")
        result = process_datasets_page(offset, limit)
        logger.info(f"[fetch_datasets_page] SUCCESS: offset={offset}, limit={limit}, result={result}")
        return result
    except Exception as exc:
        logger.error(f"[fetch_datasets_page] ERROR: offset={offset}, limit={limit}, exc={exc}", exc_info=True)
        raise self.retry(exc=exc)
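
# Sketch of a parallel fan-out (an assumption about intended use, based on the
# offset/limit signature): pages could be dispatched as a Celery group once a
# total count is known. "total_count" and the page size of 500 are
# hypothetical values, not taken from this project:
#
#     from celery import group
#     page_size = 500
#     job = group(
#         fetch_datasets_page.s(offset, page_size)
#         for offset in range(0, total_count, page_size)
#     )
#     group_result = job.apply_async()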

@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def refresh_hf_datasets_full_cache(self):
    """
    Celery task to fetch and cache the complete Hugging Face datasets listing.
    Requires HUGGINGFACEHUB_API_TOKEN in the environment; retries on failure.
    """
    logger.info("[refresh_hf_datasets_full_cache] Starting full Hugging Face datasets cache refresh.")
    try:
        token = os.environ.get("HUGGINGFACEHUB_API_TOKEN")
        if not token:
            logger.error("[refresh_hf_datasets_full_cache] HUGGINGFACEHUB_API_TOKEN not set.")
            return {"status": "error", "error": "HUGGINGFACEHUB_API_TOKEN not set"}
        count = fetch_and_cache_all_datasets(token)
        logger.info(f"[refresh_hf_datasets_full_cache] Cached {count} datasets.")
        return {"status": "ok", "cached": count}
    except Exception as exc:
        logger.error(f"[refresh_hf_datasets_full_cache] ERROR: {exc}", exc_info=True)
        raise self.retry(exc=exc)
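
# Minimal Celery beat sketch (an assumption; the project's real schedule, if
# any, lives in its Celery configuration). This would re-run the lightweight
# cache refresh once a day:
#
#     celery_app.conf.beat_schedule = {
#         "refresh-hf-datasets-cache": {
#             "task": "app.tasks.dataset_tasks.refresh_hf_datasets_cache",
#             "schedule": 60 * 60 * 24,  # seconds
#         },
#     }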