foreach_batch_sink

L'elemento @dp.foreach_batch_sink() decorator definisce un sink ForEachBatch, che elabora un flusso come una serie di micro batch gestiti in Python con logica personalizzata. Si fa riferimento al sink come in un flusso di targetaccodamento per scrivere i dati trasformati. Per indicazioni concettuali, considerazioni ed esempi, vedere Usare ForEachBatch per scrivere in sink di dati arbitrari nelle pipeline.

Syntax

from pyspark import pipelines as dp

@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
    """
    Required:
      - `df`: a Spark DataFrame representing the rows of this micro-batch.
      - `batch_id`: unique integer ID for each micro-batch in the query.
    """
    # Your custom write or transformation logic here
    # Example:
    # df.write.format("some-target-system").save("...")
    #
    # To access the sparkSession inside the batch handler, use df.sparkSession.

Parametri

Parametro Descrizione
name Facoltativo. Nome univoco per identificare il sink all'interno della pipeline. L'impostazione predefinita è il nome della funzione definita dall'utente, se non incluso.
batch_handler Si tratta della funzione definita dall'utente chiamata per ogni micro batch.
Df DataFrame Spark contenente i dati per il micro batch corrente.
batch_id ID intero del micro batch. Spark incrementa questo ID per ogni intervallo di trigger.
Un batch_id di 0 rappresenta l'inizio di un flusso o l'inizio di un aggiornamento completo. Il foreach_batch_sink codice deve gestire correttamente un aggiornamento completo per le origini dati downstream. Per altre informazioni, vedere Aggiornamento completo.