Caricare i dati nel runtime di intelligenza artificiale

Importante

Il runtime di intelligenza artificiale per le attività a nodo singolo è disponibile in anteprima pubblica. L'API di training distribuita per i carichi di lavoro con più GPU rimane in beta.

Questa sezione illustra le informazioni sul caricamento dei dati nel runtime di intelligenza artificiale in modo specifico per le applicazioni ML e DL. Vedere l'esercitazione per altre informazioni su come caricare e trasformare i dati usando l'API Python Spark.

Annotazioni

Il catalogo unity è obbligatorio. Tutti gli accessi ai dati nel runtime di intelligenza artificiale passano attraverso Unity Catalog. Le tabelle e i volumi devono essere registrati nel Catalogo Unity e accessibili all'utente o al principale del servizio.

Caricare dati tabulari

Usare Spark Connect per caricare dati di Machine Learning tabulari da tabelle Delta.

Per il training a nodo singolo, è possibile convertire i DataFrame Apache Spark in DataFrame pandas utilizzando il metodo PySparktoPandas(), e quindi, facoltativamente, convertire in formato NumPy utilizzando il metodo PySparkto_numpy().

Annotazioni

Spark Connect rinvia l'analisi e la risoluzione dei nomi al tempo di esecuzione, che può modificare il comportamento del codice. Vedi Confronta Spark Connect con Spark Classic.

Spark Connect supporta la maggior parte delle API PySpark, tra cui Spark SQL, API Pandas in Spark, Structured Streaming e MLlib (basata su dataframe). Vedere la documentazione di riferimento dell'API PySpark per le API supportate più recenti.

Per altre limitazioni, vedere Limitazioni di calcolo serverless.

Caricare tabelle Delta di grandi dimensioni usando volumi

Per tabelle Delta di grandi dimensioni troppo grandi per la conversione con toPandas(), esportare i dati in un volume di Catalogo Unity e caricarli direttamente usando PyTorch o Hugging Face:

# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

Questo approccio evita il sovraccarico di Spark durante il training e funziona bene sia per i flussi di lavoro di training a GPU singola che per i flussi di lavoro di training distribuiti.

Caricare dati non strutturati da volumi con UCVolumeDataset

Per i dati non strutturati, ad esempio immagini, audio e file di testo archiviati nei volumi del catalogo Unity, usare UCVolumeDataset dal serverless_gpu.data pacchetto. UCVolumeDataset è un PyTorch IterableDataset che copia ogni file dal volume a una cache locale veloce al primo accesso e restituisce il percorso del file locale memorizzato nella cache. Gestisce le prestazioni e gli aspetti legati alla distribuzione che altrimenti dovresti implementare manualmente:

  • Memorizzazione nella cache locale. I file vengono copiati dal punto di montaggio FUSE in una directory di cache locale al primo accesso e vengono poi letti dalla cache, quindi l'addestramento su più epoche non richiede una nuova lettura del volume.
  • Partizionamento automatico. Quando torch.distributed viene inizializzato, i file vengono partizionati tra i ranghi e quindi suddivisi ulteriormente tra DataLoader i ruoli di lavoro, quindi ogni (rank, worker) coppia riceve una sezione non sovrapposta senza alcuna configurazione aggiuntiva.

Annotazioni

UCVolumeDataset e serverless_gpu.data.DataLoader richiedono l'ambiente GPU 5 o versione successiva.

UCVolumeDataset restituisce percorsi di file locali non elaborati. Per decodificare questi file in tensori, racchiuderli in un secondo IterableDataset che utilizza il flusso di percorsi e applica la logica di parsing. In questo modo, le problematiche di I/O e analisi sono separate.

from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF

class ImageDataset(IterableDataset):
    """Decodes each cached file path from UCVolumeDataset into a tensor."""

    def __init__(self, path_dataset: UCVolumeDataset):
        self._path_dataset = path_dataset

    def __iter__(self):
        for local_path in self._path_dataset:
            image = Image.open(local_path).convert("RGB")
            yield TF.to_tensor(image)

path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)

Il wrapper riceve percorsi locali già memorizzati nella cache, quindi il passaggio di analisi non tocca mai il montaggio FUSE. È possibile concatenare wrapper aggiuntivi per l'aumento, la tokenizzazione o il filtro.

Per ottenere prestazioni ottimali, abbina UCVolumeDataset a serverless_gpu.data.DataLoader anziché a DataLoader di PyTorch standard. È ottimizzato per l'I/O della GPU serverless e recupera e memorizza nella cache i file contemporaneamente mentre la GPU calcola. Vedere Prestazioni di caricamento dei dati.

Caricare i dati all'interno del decoratore @distributed

Quando si utilizza l'API GPU serverless per l'addestramento distribuito, spostare il codice di caricamento dei dati all'interno del decoratore @distributed. Le dimensioni del dataset possono superare la dimensione massima consentita da pickle, pertanto è consigliabile generare il dataset all'interno del decoratore, come illustrato di seguito:

from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
    # Load data inside the decorator to avoid pickle serialization issues
    dataset = get_dataset(file_path)
    ...

Quando si crea un UCVolumeDataset all'interno del decoratore, questo legge le informazioni sui rank torch.distributed al momento dell'iterazione e suddivide automaticamente i file tra i rank, quindi non è necessario un DistributedSampler per i dati di volume basati su file.

Prestazioni di caricamento dei dati

/Workspace e /Volumes le directory sono ospitate nell'archiviazione remota del catalogo Unity. Se il set di dati viene archiviato in Unity Catalog, la velocità di caricamento dei dati è limitata dalla larghezza di banda di rete disponibile. Se si stanno eseguendo più epoche di training, l'approccio consigliato consiste nell'utilizzare UCVolumeDataset, che esegue automaticamente questa memorizzazione nella cache: copia ogni file nell'archiviazione locale al primo accesso e fornisce le letture successive dalla copia locale. Per i set di dati archiviati nei volumi, è preferibile usarlo rispetto a una copia manuale shutil.copytree, che copia fin da subito l'intero albero anche se l'addestramento ne utilizza solo una parte.

Se il set di dati è di grandi dimensioni, le tecniche seguenti possono migliorare la velocità effettiva:

  • Usare serverless_gpu.data.DataLoader per parallelizzare il recupero. Si tratta di una sottoclasse drop-in della torcia DataLoader ottimizzata per l'I/O GPU serverless: num_workers il valore predefinito è 6 e prefetch_factor 4 (rispetto a 0 e 2 di PyTorch), quindi i file vengono recuperati e memorizzati nella cache contemporaneamente mentre la GPU viene calcolata. Registra inoltre i tempi di caricamento per batch nella run MLflow attiva, il che aiuta a individuare i colli di bottiglia nel caricamento dei dati.

    from serverless_gpu.data import DataLoader
    
    loader = DataLoader(
        dataset,
        batch_size=32,
        pin_memory=True,
        # num_workers=6, by default
        # prefetch_factor=4, by default
        # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
    )
    

    Tutti i ranghi devono usare lo stesso num_workers valore, perché UCVolumeDataset partiziona i file usando uno stride globale tra world_size × num_workers gli slot. I valori non corrispondenti causano la duplicazione o l'omissione dei file.

  • Aumentare la dimensione del batch. I batch più grandi ammortizzano il sovraccarico di caricamento dei dati per batch su più campioni e riducono il numero di operazioni di recupero file per passaggio. Se la memoria della GPU è il fattore limitante, combina un batch size più grande con l'accumulo del gradiente per preservare la dimensione effettiva del batch.

Set di dati di streaming

Per set di dati di dimensioni molto grandi che non rientrano nella memoria, usare gli approcci di streaming: