import multiprocessing as mp
import os
from pathlib import Path

import backoff
import numpy as np
import tiktoken
from datasets import load_dataset, DownloadConfig


def process_data(item):
    """
    Tokenize a single dataset item with the GPT-2 BPE encoding.

    Replace this with your actual processing logic if you need something
    other than tokenization.
    """
    # get_encoding caches the constructed Encoding object per process,
    # so calling it for every item is cheap after the first call.
    encoder = tiktoken.get_encoding('gpt2')
    text = item.get('text', '')
    tokens = encoder.encode(text)
    return tokens
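
# Illustrative round trip (not executed here; 'hello world' is an arbitrary example):
# for item = {'text': 'hello world'}, process_data(item) returns a list of integer
# GPT-2 token ids, and tiktoken.get_encoding('gpt2').decode(process_data(item))
# reproduces 'hello world'.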


@backoff.on_exception(backoff.expo, Exception, max_tries=5)
def fetch_data(item):
    """
    Wrapper for process_data with exponential backoff for retries.
    """
    return process_data(item)
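
# Note (behavioral sketch of the backoff defaults): backoff.expo produces
# exponentially growing wait times (1s, 2s, 4s, ... before jitter) and
# on_exception gives up after max_tries=5 attempts. Plain tokenization rarely
# raises, so the retries mainly matter if process_data is extended to do
# network or disk I/O.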


def main():
    """
    Load the FineWeb-Edu sample, tokenize it in parallel, and save the result.
    """
    remote_name = "sample-10BT"
    output_dir = "./data"
    os.makedirs(output_dir, exist_ok=True)

    download_config = DownloadConfig(
        max_retries=5,  # retry failed HTTP requests up to 5 times
        num_proc=4,     # download and prepare shards with 4 processes
        cache_dir=Path.home() / ".cache" / "huggingface" / "datasets"
    )

    try:
        print("Loading dataset...")
        dataset = load_dataset(
            'HuggingFaceFW/fineweb-edu',
            name=remote_name,
            split='train',
            download_mode="reuse_dataset_if_exists",
            download_config=download_config
        )
        print(f"Dataset loaded with {len(dataset)} items.")

        nprocs = min(mp.cpu_count(), 4)
        print(f"Using {nprocs} processes for multiprocessing.")

        # Each item (a dict) is pickled to a worker process and tokenized there.
        with mp.Pool(nprocs) as pool:
            results = pool.map(fetch_data, dataset)

        # Documents have different lengths, so save the token lists as an object
        # array; a plain np.save(output_path, results) would fail on the ragged
        # nested sequences with recent NumPy versions.
        output_path = os.path.join(output_dir, "processed_fineweb_edu.npy")
        np.save(output_path, np.array(results, dtype=object), allow_pickle=True)
        print(f"Processed dataset saved to {output_path}")

    except Exception as e:
        print(f"Error loading or processing dataset: {e}")
        raise


if __name__ == '__main__':
    mp.freeze_support()
    main()
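
# Example of reading the output back later (illustrative; the path assumes the
# defaults above, and np.load needs allow_pickle=True for object arrays):
#   docs = np.load("./data/processed_fineweb_edu.npy", allow_pickle=True)
#   print(len(docs), len(docs[0]))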