Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Der @dp.foreach_batch_sink() Dekorateur definiert eine ForEachBatch-Spüle, die einen Datenstrom als Eine Reihe von Mikrobatches verarbeitet, die Sie in Python mit benutzerdefinierter Logik verarbeiten. Sie verweisen auf die Spüle als targetAnfügefluss , um die transformierten Daten zu schreiben. Konzeptionelle Anleitungen, Überlegungen und Beispiele finden Sie unter Verwenden von ForEachBatch zum Schreiben in beliebige Datensenken in Pipelines.
Syntax
from pyspark import pipelines as dp
@dp.foreach_batch_sink(name="<name>")
def batch_handler(df, batch_id):
"""
Required:
- `df`: a Spark DataFrame representing the rows of this micro-batch.
- `batch_id`: unique integer ID for each micro-batch in the query.
"""
# Your custom write or transformation logic here
# Example:
# df.write.format("some-target-system").save("...")
#
# To access the sparkSession inside the batch handler, use df.sparkSession.
Parameter
| Parameter | Beschreibung |
|---|---|
| name | Optional. Ein eindeutiger Name zum Identifizieren der Spüle innerhalb der Pipeline. Der Standardwert ist der Name der UDF, wenn dieser nicht angegeben ist. |
| batch_handler | Dies ist die benutzerdefinierte Funktion (UDF), die für jeden Mikrobatch aufgerufen wird. |
| Df | Spark DataFrame mit Daten für den aktuellen Mikrobatch. |
| batch_id | Die ganzzahlige ID des Mikrobatches. Spark erhöht diese ID für jedes Triggerintervall. Ein batch_id von 0 stellt den Beginn eines Streams oder den Anfang eines vollständigen Aktualisierungsprozesses dar. Der foreach_batch_sink Code sollte eine vollständige Aktualisierung für nachgeschaltete Datenquellen ordnungsgemäß verarbeiten. Weitere Informationen finden Sie unter "Vollständige Aktualisierung". |