Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
È possibile usare transformWithState per sviluppare applicazioni di streaming stateful e per implementare soluzioni a bassa latenza e quasi in tempo reale. Con gli operatori con stato personalizzati, è possibile creare logica arbitraria con stato che consente di creare nuovi casi d'uso operativi che non sono possibili con l'elaborazione tradizionale di Structured Streaming.
Nota
Per le operazioni con stato, ad esempio aggregazioni, deduplicazione e join su flussi di dati, Databricks consiglia di usare gli operatori predefiniti di Structured Streaming anziché logiche personalizzate. Consulta Che cos'è lo streaming con stato?.
Databricks consiglia di usare transformWithState anziché operatori legacy, ad esempio flatMapGroupsWithState e mapGroupsWithState, per trasformazioni arbitrarie dello stato. Consultare operatori legacy arbitrari con stato.
Requisiti
Gli operatori transformWithState e transformWithStateInPandas hanno i seguenti requisiti:
- Disponibile in Databricks Runtime 16.2 e versioni successive.
- Per la modalità in tempo reale, usare Databricks Runtime 17.3 LTS o versione successiva. Consultare la Modalità in tempo reale nel Structured Streaming.
- Per la modalità di accesso standard, Python è disponibile in Databricks Runtime 16.3 e versioni successive e Scala è disponibile in Databricks Runtime 17.3 e versioni successive.
- RocksDB è il provider dell'archivio stati predefinito in Databricks Runtime 17.3 e versioni successive.
Per Databricks Runtime 17.2 e versioni successive, è necessario configurare il provider dell'archivio stati RocksDB. Databricks consiglia di abilitare RocksDB nella configurazione di Spark.
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
Che cos'è transformWithState?
L'operatore transformWithState applica un processore con stato personalizzato a una query Structured Streaming. È necessario implementare un processore con stato personalizzato per usare transformWithState. Structured Streaming include API per la compilazione del processore con stato usando Python, Scala o Java.
Usare transformWithState per applicare la logica personalizzata a una chiave di raggruppamento. Di seguito viene descritta la progettazione generale:
- Definire una o più variabili di stato.
- Le informazioni sullo stato vengono mantenute per ogni chiave di raggruppamento. È possibile accedere a ogni variabile di stato nel codice definito dall'utente.
- Per ogni micro-batch elaborato, tutte le righe associate alla chiave sono disponibili sotto forma di iteratore.
- Usare
StatefulProcessorHandlecon timer e condizioni definite dall'utente per controllare come emettere righe. - Per gestire la scadenza dello stato e la dimensione dello stato, i valori dello stato supportano definizioni individuali del time-to-live (TTL).
Poiché transformWithState supporta l'evoluzione dello schema nell'archivio stati, è possibile scorrere e aggiornare le applicazioni di produzione senza perdere informazioni sullo stato cronologico. Dopo aver aggiornato lo schema dello stato, non è necessario rielaborare le righe, semplificando le distribuzioni e la manutenzione del codice. Vedere Evoluzione dello schema nell'archivio stati.
Importante
Azure Databricks documentazione usa transformWithState per descrivere le implementazioni di Python e Scala:
- PySpark supporta sia l'API basata su
transformWithStaterighe che l'operatore basato sutransformWithStateInPandasPandas.-
transformWithStateInPandasnon è supportato in modalità in tempo reale. Usare invecetransformWithState. Per informazioni dettagliate, vederetransformWithStatein modalità in tempo reale.
-
- Scala supporta solo l'API basata su
transformWithStaterighe.
Le implementazioni di Scala e Python di transformWithState hanno le stesse funzionalità, ma con alcune differenze nella sintassi.
Definizione di un oggetto StatefulProcessor
Per definire un processore con stato, estendere la StatefulProcessor classe e implementarne i metodi.
Spark passa un StatefulProcessorHandle al metodo init del tuo StatefulProcessor. Usa l'handle per creare variabili di stato e interagire con lo store dello stato.
transformWithState supporta tre tipi di stato: ValueState, ListStatee MapState. Ogni tipo archivia lo stato per ogni chiave di raggruppamento usando una struttura di dati sottostante diversa.
Implementare i metodi seguenti per definire la logica personalizzata:
- Implementare
handleInputRowsper controllare il modo in cui l'applicazione elabora i dati, aggiorna lo stato e genera righe per ogni micro-batch. Vedere Gestire le righe di input. - Implementare
handleExpiredTimerper eseguire la logica basata sul tempo indipendentemente dal fatto che la chiave di raggruppamento riceva nuove righe in un micro batch. Vedere Gestire i timer scaduti. - Facoltativamente, implementare
handleInitialStateper precompilare lo stato prima che l'applicazione elabori le righe di input. Vedere Gestire lo stato iniziale.
Nella tabella seguente vengono confrontati i comportamenti funzionali di questi metodi:
| Comportamento | handleInputRows |
handleExpiredTimer |
|---|---|---|
| Ottenere, inserire, aggiornare o cancellare i valori di stato | Sì | Sì |
| Creare o eliminare un timer | Sì | Sì |
| Emetti righe | Sì | Sì |
| Eseguire l'iterazione delle righe nel micro batch corrente | Sì | NO |
| Logica di attivazione in base al tempo trascorso | NO | Sì |
È possibile combinare sia handleInputRows che handleExpiredTimer per implementare logica complessa in base alle esigenze.
Ad esempio, è possibile implementare un'applicazione che usa handleInputRows per aggiornare i valori di stato per ogni micro batch e impostare un timer di 10 secondi in futuro. Se non vengono elaborate righe aggiuntive, è possibile usare handleExpiredTimer per emettere i valori correnti nell'archivio di stato. Se vengono elaborate nuove righe per la chiave di raggruppamento, è possibile cancellare il timer esistente e impostare un nuovo timer.
StatefulProcessorHandle
In PySpark la StatefulProcessorHandle classe consente di accedere alle funzioni che controllano il modo in cui il codice usa le informazioni sullo stato.
Quando si inizializza un oggetto StatefulProcessor, si deve sempre importare e passare il StatefulProcessorHandle alla variabile handle. La variabile handle collega la variabile locale nella classe Python alla variabile di stato.
Nota
Scala usa il metodo getHandle.
Tipi di stato personalizzati
È possibile implementare più oggetti di stato in un singolo operatore con stato.
Scegliere un tipo di stato in base alla logica dell'applicazione completa. Ad esempio, è possibile tenere traccia delle sessioni utilizzando un ValueState, raggruppate per user_id e session_id. In alternativa, per valutare le condizioni su più sessioni, utilizza un MapState raggruppato per user_id con session_id come chiave di mappa.
Se l'oggetto di stato usa un StructType, è necessario definire nomi univoci per ogni campo nello struct per lo schema. Questi nomi sono visibili durante la lettura dell'archivio di stato. Consulta le informazioni sullo stato dello Streaming Strutturato.
Le sezioni seguenti descrivono i tipi di stato supportati da transformWithState:
ValueState
ValueState archivia un valore per ogni chiave di raggruppamento.
Uno stato valore può includere tipi complessi, ad esempio un struct o una tupla. Per ValueState, è necessario implementare la logica per sostituire l'intero valore.
Il tempo di validità di uno stato del valore viene reimpostato quando il valore viene aggiornato. Se elabori una chiave sorgente per ValueState senza aggiornare il ValueState memorizzato, il time-to-live non viene reimpostato.
ListState
ListState archivia un elenco per ogni chiave di raggruppamento.
Uno stato elenco è una raccolta di valori, ognuno dei quali può includere tipi complessi. Ogni valore di un elenco ha una durata specifica.
È possibile aggiungere elementi a un elenco aggiungendo singoli elementi, aggiungendo un elenco di elementi o sovrascrivendo l'intero elenco con un put. Per reimpostare il time-to-live, è necessario utilizzare un'operazione put.
MapState
MapState archivia una mappa per ogni chiave di raggruppamento. Le mappe sono l'equivalente di Apache Spark a un dizionario Python (dict).
Uno stato della mappa è una raccolta di chiavi distinte mappate a un valore, ognuna delle quali può includere tipi complessi. Ogni coppia chiave-valore in una mappa ha una propria durata.
È possibile aggiornare il valore di una chiave specifica oppure rimuovere una chiave e il relativo valore. È possibile restituire un singolo valore usando la chiave, elencare tutte le chiavi, elencare tutti i valori o restituire un iteratore per lavorare con il set completo di coppie chiave-valore nella mappa.
Importante
Le chiavi di raggruppamento descrivono i campi specificati nella clausola GROUP BY della query Structured Streaming. Gli stati della mappa possono contenere un numero arbitrario di coppie chiave-valore per una chiave di raggruppamento.
Ad esempio, se la query usa GROUP BY user_id e si vuole definire una mappa per ogni session_id, la chiave di raggruppamento è user_id e la MapState chiave è session_id:
Python
class SessionTracker(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
self.sessions = handle.getMapState("sessions", StringType(), LongType())
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
for row in rows:
session_id = row["session_id"] # session_id is the MapState key
count = self.sessions.getValue(session_id)[0] if self.sessions.containsKey(session_id) else 0
new_count = count + 1
self.sessions.updateValue(session_id, (new_count,))
yield from []
def close(self) -> None:
pass
df.groupBy("user_id").transformWithState(SessionTracker(), ...) # user_id is the grouping key
Linguaggio di programmazione Scala
case class Event(userId: String, sessionId: String)
class SessionTracker extends StatefulProcessor[String, Event, Row] {
@transient private var sessions: MapState[String, Long] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
sessions = getHandle.getMapState[String, Long]("sessions", Encoders.STRING, Encoders.scalaLong, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
rows: Iterator[Event],
timerValues: TimerValues): Iterator[Row] = {
rows.foreach { event =>
val count = if (sessions.containsKey(event.sessionId)) sessions.getValue(event.sessionId) else 0L
sessions.updateValue(event.sessionId, count + 1) // sessionId is the MapState key
}
Iterator.empty
}
}
df.as[Event]
.groupByKey(_.userId) // userId is the grouping key
.transformWithState(new SessionTracker(), TimeMode.None(), OutputMode.Update())
Creare una variabile di stato personalizzata in StatefulProcessor
Quando si inizializza il StatefulProcessor, si crea una variabile locale per ogni oggetto stato che consente di interagire con gli oggetti di stato nella logica personalizzata. Definire e inizializzare le variabili di stato eseguendo l'override del metodo predefinito init nella StatefulProcessor classe .
È possibile definire un numero qualsiasi di oggetti di stato usando i getValueStatemetodi , getListStatee getMapState in StatefulProcessor.
Ogni oggetto di stato deve avere quanto segue:
- Nome univoco
- Uno schema
- In Python è necessario specificare lo schema.
- In Scala è possibile passare un oggetto
Encoderper specificare lo schema dello stato.
Facoltativamente, è anche possibile fornire una durata TTL (Time-to-Live) in millisecondi. Se si implementa uno stato della mappa, è necessario fornire una definizione di schema separata per le chiavi della mappa e i valori.
Nota
StatefulProcessor gestisce separatamente la logica per interrogare, aggiornare ed emettere informazioni sullo stato. Vedere Usare le variabili di stato nei metodi con logica personalizzata.
Usare le variabili di stato nei metodi con logica personalizzata
Gli oggetti State dispongono di metodi per ottenere lo stato, aggiornare le informazioni sullo stato esistenti e cancellare lo stato corrente.
Ogni chiave di raggruppamento include informazioni sullo stato dedicate.
-
StatefulProcessorGenera righe in base alla logica personalizzata e allo schema di output specificato. Vedere Creare righe. - Usa il
statestorelettore per accedere ai valori nell'archivio di stato. Questo lettore è destinato ai carichi di lavoro batch e non è destinato ai carichi di lavoro a bassa latenza. Consulta le informazioni sullo stato dello Streaming Strutturato. - La logica specificata usando
handleInputRowsviene eseguita solo se le righe per la chiave sono presenti in un micro batch. Vedere Gestire le righe di input. - Usa
handleExpiredTimerper implementare la logica temporizzata che non dipende dall'osservazione delle righe per attivarsi. Vedere Gestire i timer scaduti.
Nota
Gli oggetti di stato sono isolati raggruppando le chiavi con le implicazioni seguenti:
- I valori di stato non possono essere influenzati dalle righe associate a una chiave di raggruppamento diversa.
- Non è possibile implementare una logica che dipenda dal confronto tra i valori o dall'aggiornamento dello stato tra le chiavi di raggruppamento.
È possibile confrontare i valori all'interno di una chiave di raggruppamento. Usare un MapState per implementare la logica con una seconda chiave che la tua logica personalizzata può utilizzare. Ad esempio, raggruppare in base a user_id e usare ip_address per la chiave MapState consente di tenere traccia delle sessioni utente simultanee.
Considerazioni avanzate per l'uso dello stato
Gli aggiornamenti dello stato sono tolleranti agli errori. Se un'attività si arresta in modo anomalo prima del completamento dell'elaborazione di un micro batch, il nuovo tentativo usa il valore dell'ultimo micro batch riuscito.
Per ottimizzare le prestazioni, Databricks consiglia di elaborare tutti i valori nell'iteratore per una determinata chiave e di eseguire il commit degli aggiornamenti in un'unica scrittura. Quando si scrive in una variabile di stato, viene attivata una scrittura in RocksDB.
I valori di stato non hanno valori predefiniti. Se la logica richiede la lettura delle informazioni sullo stato esistenti, usare il exists metodo .
Per implementare la logica per lo stato Null, MapState le variabili consentono di verificare la presenza di singole chiavi o elencare tutte le chiavi.
Gestire le righe di input
Utilizza il metodo handleInputRows per definire come l'applicazione elabora le righe e aggiorna i valori dello stato. Questo metodo viene eseguito ogni volta che la query Structured Streaming elabora le righe per una chiave di raggruppamento.
Per la maggior parte delle applicazioni con stato implementate con transformWithState, la logica di base viene definita usando handleInputRows.
Per ogni aggiornamento del micro-batch elaborato, tutte le righe del micro-batch per una determinata chiave di raggruppamento sono disponibili tramite un iteratore. La logica definita dall'utente può interagire con tutte le righe del microbatch e dei valori correnti nel statestore.
Gestisci i timer scaduti
Usare il handleExpiredTimer metodo per implementare la logica personalizzata in base al tempo trascorso.
All'interno di una chiave di raggruppamento, i timer vengono identificati in modo univoco dal timestamp.
Quando un timer scade, il risultato viene determinato dalla logica implementata nell'applicazione. I modelli comuni includono:
- Emissione di informazioni archiviate in una variabile di stato.
- Rimozione delle informazioni sullo stato archiviate.
- Creazione di un nuovo timer.
I timer scaduti si attivano anche se in un micro-batch non vengono elaborate righe relative alla chiave associata.
Specifica la modalità oraria
Quando si passa il proprio StatefulProcessor a transformWithState, è necessario specificare la modalità temporale utilizzando il parametro timeMode.
Sono supportate le opzioni seguenti:
| Modalità orario | Descrizione |
|---|---|
ProcessingTime |
I timer e la durata (TTL) sono entrambi supportati e vengono valutati in base all'ora in cui Apache Spark elabora ogni micro batch. Usare ProcessingTime quando si desidera che i timer vengano attivati a un intervallo fisso rispetto al momento dell'elaborazione delle righe, indipendentemente dai timestamp nei dati. |
EventTime |
I timer sono supportati e vengono valutati in base al watermark del tempo dell'evento. La filigrana avanza man mano che Apache Spark osserva i timestamp nei dati di input. TTL non è supportato con EventTime. Usare EventTime quando i dati contengono timestamp e si desidera che i timer scattino in base all’avanzamento di tali timestamp. Quando si usa EventTime, è necessario specificare anche il eventTimeColumnName parametro . Vedete eventTimeColumnName. |
NoTime o TimeMode.None() |
Timer e TTL non sono supportati. Usare NoTime quando l'applicazione con stato non richiede logica basata sul tempo. |
eventTimeColumnName
Quando si usa la EventTime modalità ora, il eventTimeColumnName parametro specifica il nome della colonna nello schema di output che contiene il timestamp dell'evento. Apache Spark usa questa colonna per propagare il watermark al flusso di output, consentendo corrette operazioni downstream basate sul tempo.
Python
eventTimeColumnName è un argomento aggiuntivo per transformWithState o transformWithStateInPandas:
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=MyProcessor(),
outputStructType=output_schema,
outputMode="Append",
timeMode="EventTime",
eventTimeColumnName="outputTimestamp",
)
.writeStream...
)
Linguaggio di programmazione Scala
transformWithState accetta eventTimeColumnName al posto di timeMode. Questo approccio usa sempre la EventTime modalità:
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new MyProcessor(),
"outputTimestamp",
OutputMode.Append(),
)
.writeStream...
Valori timer predefiniti
Databricks sconsiglia di richiamare l'orologio di sistema nella tua applicazione personalizzata con stato, poiché ciò può portare a tentativi di ripetizione inaffidabili in caso di fallimento del task. Usare i metodi nella classe TimerValues quando è necessario accedere al tempo di elaborazione o alla filigrana:
TimerValues |
Descrizione |
|---|---|
getCurrentProcessingTimeInMs |
Restituisce il timestamp del tempo di elaborazione del batch corrente in millisecondi dall'epoca. |
getCurrentWatermarkInMs |
Restituisce il timestamp della filigrana per il batch corrente in millisecondi dall'epoca. |
Nota
Il tempo di elaborazione descrive il tempo in cui il micro batch viene elaborato da Apache Spark. Molte origini di streaming, ad esempio Kafka, includono anche il tempo di elaborazione del sistema.
Le filigrane sulle query di streaming vengono spesso definite in base all'ora dell'evento o al tempo di elaborazione dell'origine di streaming. Vedere Applicare filigrane per controllare le soglie di elaborazione dati.
Sia le filigrane sia le finestre possono essere usate in combinazione con transformWithState. È possibile implementare una funzionalità simile nella vostra applicazione con stato personalizzata sfruttando TTL, timer e la funzionalità MapState o ListState.
Tempo di vita (TTL) per i tipi di stato
Per evitare errori di memoria insufficiente e rimuovere valori di tipo di stato non aggiornati, transformWithState supporta un valore TTL (Time to Live) facoltativo per ogni valore del tipo di stato. Dopo la scadenza, TTL rimuove automaticamente i valori del tipo di stato. TTL non esegue handleExpiredTimer né alcuna logica personalizzata. Per eseguire il codice alla scadenza dello stato, usare invece un timer.
Importante
Se non implementi il TTL, devi gestire l'eliminazione dello stato per evitare errori di memoria esaurita.
Per tutti i tipi di stato, il TTL viene reimpostato quando si aggiornano le informazioni di stato. Il TTL viene applicato per ogni valore del tipo di stato, con regole diverse per ciascun tipo di stato:
- Le variabili di stato hanno come ambito il raggruppamento delle chiavi.
- Per gli oggetti
ValueState, viene archiviato solo un singolo valore per ogni chiave di raggruppamento. Il valore di TTL si applica a questo parametro. - Per gli oggetti
ListState, l'elenco può contenere molti valori. La durata (TTL) si applica a ogni valore in un elenco in modo indipendente.- Anche se il TTL si applica ai singoli valori in un
ListState, l’unico modo per aggiornare un singolo valore è con il metodoput, che sovrascrive l’intero contenuto della variabileListStatee reimposta il TTL per tutti i valori nell’elenco.
- Anche se il TTL si applica ai singoli valori in un
- Per gli oggetti
MapState, ogni chiave della mappa ha un valore di stato associato. La durata (TTL, tempo di vita) si applica in modo indipendente a ciascuna coppia chiave-valore di una mappa.
Nota
I timer consentono di definire logica personalizzata oltre la rimozione dello stato, inclusa l'emissione di righe. Facoltativamente, è possibile usare i timer per cancellare le informazioni sullo stato per un determinato valore di stato e generare valori o attivare la logica condizionale. Vedere Gestire i timer scaduti.
Esempio di applicazione con stato
Nell'esempio seguente viene definito un processore con stato personalizzato, SimpleCounterProcessor, incluse le variabili di stato di esempio.
SimpleCounterProcessor usa ValueState, ListStatee MapState per contare le righe per ogni chiave di raggruppamento.
Python (Pandas)
import pandas as pd
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
# Schema can also be defined using strings and SQL DDL syntax
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows, timerValues) -> Iterator[pd.DataFrame]:
count = 0
for pdf in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
pdf_count = pdf.count()
count += pdf_count.get("value")
self.value_state.update((count,)) # Count is passed as a tuple
iter = self.list_state.get()
list_state_value = next(iter)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield pd.DataFrame({"id": key, "countAsString": str(count)})
q = (df.groupBy("key")
.transformWithStateInPandas(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Python (basato su righe)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
spark.conf.set("spark.sql.streaming.stateStore.providerClass", "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
output_schema = StructType(
[
StructField("id", StringType(), True),
StructField("countAsString", StringType(), True),
]
)
class SimpleCounterProcessor(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
value_state_schema = StructType([StructField("count", IntegerType(), True)])
list_state_schema = StructType([StructField("count", IntegerType(), True)])
self.value_state = handle.getValueState(stateName="valueState", schema=value_state_schema)
self.list_state = handle.getListState(stateName="listState", schema=list_state_schema)
self.map_state = handle.getMapState(stateName="mapState", userKeySchema="name string", valueSchema="count int")
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = 0
for row in rows:
list_state_rows = [(120,), (20,)] # A list of tuples
self.list_state.put(list_state_rows)
self.list_state.appendValue((111,))
self.list_state.appendList(list_state_rows)
count += 1
self.value_state.update((count,)) # Count is passed as a tuple
iter_list = self.list_state.get()
list_state_value = next(iter_list)[0]
value = count
user_key = ("user_key",)
if self.map_state.exists():
if self.map_state.containsKey(user_key):
value += self.map_state.getValue(user_key)[0]
self.map_state.updateValue(user_key, (value,)) # Value is a tuple
yield Row(id=key, countAsString=str(count))
q = (
df.groupBy("key")
.transformWithState(
statefulProcessor=SimpleCounterProcessor(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
)
.writeStream...
)
Linguaggio di programmazione Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.{Dataset, Encoder, Encoders , DataFrame}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.streaming.stateStore.providerClass","org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
class SimpleCounterProcessor extends StatefulProcessor[String, (String, String), (String, String)] {
@transient private var countState: ValueState[Int] = _
@transient private var listState: ListState[Int] = _
@transient private var mapState: MapState[String, Int] = _
private val longEncoder = Encoders.scalaLong
private val intEncoder = Encoders.scalaInt
private val stringEncoder = Encoders.STRING
override def init(
outputMode: OutputMode,
timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState",
intEncoder, TTLConfig.NONE)
listState = getHandle.getListState[Int]("listState",
intEncoder, TTLConfig.NONE)
mapState = getHandle.getMapState[String, Int]("mapState",
stringEncoder, intEncoder, TTLConfig.NONE)
}
override def handleInputRows(
key: String,
inputRows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
var count = countState.getOption().getOrElse(0)
for (row <- inputRows) {
val listData = Array(120, 20)
listState.put(listData)
listState.appendValue(count)
listState.appendList(listData)
count += 1
}
val iter = listState.get()
var listStateValue = 0
if (iter.hasNext) {
listStateValue = iter.next()
}
countState.update(count)
var value = count
val userKey = "userKey"
if (mapState.exists()) {
if (mapState.containsKey(userKey)) {
value += mapState.getValue(userKey)
}
}
mapState.updateValue(userKey, value)
Iterator((key, count.toString))
}
}
val q = spark
.readStream
.format("delta")
.load("$srcDeltaTableDir")
.as[(String, String)]
.groupByKey(x => x._1)
.transformWithState(
new SimpleCounterProcessor(),
TimeMode.None(),
OutputMode.Update(),
)
.writeStream...
Per altri esempi, vedere Esempio di applicazioni con stato.
Nota
In Python i valori di stato sono tuple. Passate tuple a put e update, e aspettate tuple da get.
Ad esempio, se lo schema per l'oggetto ValueState è un singolo numero intero:
current_value_tuple = value_state.get() # Returns the value state as a tuple
current_value = current_value_tuple[0] # Extracts the first item in the tuple
new_value = current_value + 1 # Calculate a new value
value_state.update((new_value,)) # Pass the new value formatted as a tuple
Usa questo approccio anche per gli elementi in un ListState o per i valori in un MapState.
Emetti righe
È necessario usare handleInputRows o handleExpiredTimer per definire il modo in cui transformWithState genera righe per ogni chiave di raggruppamento. Vedi Gestire le righe di dati in input e Gestire i timer scaduti.
Le applicazioni stateful personalizzate non fanno supposizioni su come utilizzare le informazioni sullo stato. Per una determinata condizione, l'applicazione potrebbe non generare righe, una riga o molte righe.
Nota
È possibile implementare più valori di stato e definire più condizioni per la creazione di righe, ma tutte le righe devono usare lo stesso schema.
Python (Pandas)
Con transformWithStateInPandas, definire lo schema di output con la outputStructType parola chiave .
Generare righe usando un oggetto DataFrame pandas e yield.
Facoltativamente, è possibile yield usare un dataframe vuoto. Se si utilizza la modalità di output update e si emette un DataFrame vuoto, i valori della chiave di raggruppamento vengono aggiornati a null.
Python (basato su righe)
Con transformWithState, definire lo schema di output con la outputStructType parola chiave .
Emettere righe usando un oggetto Row e yield.
Facoltativamente, è possibile restituire un iteratore vuoto. Se si utilizza la modalità di output update e si genera un iteratore vuoto, questo aggiorna i valori della chiave di raggruppamento impostandoli su null.
Linguaggio di programmazione Scala
In Scala, si emettono righe usando un oggetto Iterator. Lo schema deriva automaticamente dallo schema delle righe generate.
Facoltativamente, è possibile restituire un oggetto vuoto Iterator. Se si utilizza la modalità di output update e si emette un Iterator vuoto, questo aggiorna i valori della chiave di raggruppamento impostandoli su null.
Gestire lo stato iniziale
Facoltativamente, è possibile passare uno stato iniziale al primo micro batch.
Ad esempio, è possibile usarlo per:
- Eseguire la migrazione di un flusso di lavoro esistente a una nuova applicazione personalizzata.
- Aggiornare un operatore con stato per modificare lo schema o la logica.
- Risolvere un guasto che non può essere corretto automaticamente e richiede un intervento manuale.
Nota
Usare il lettore dell'archivio di stato per interrogare le informazioni sullo stato da un checkpoint esistente. Consulta le informazioni sullo stato dello Streaming Strutturato.
Se si converte una tabella Delta esistente in un'applicazione con stato, leggere la tabella usando spark.read.table("table_name") e passare il dataframe risultante. È possibile selezionare o modificare, se lo si desidera, i campi per conformarsi alla nuova applicazione stateful.
Si specifica uno stato iniziale usando un dataframe con lo stesso schema di chiave di raggruppamento delle righe di input.
Nota
Python usa handleInitialState per specificare lo stato iniziale durante la definizione di un StatefulProcessor. Scala usa la classe unica StatefulProcessorWithInitialState.
L'esempio seguente inizializza un contatore per chiave a partire da una tabella Delta esistente:
Python (basato su righe)
from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from typing import Iterator
class CounterWithInitialState(StatefulProcessor):
def init(self, handle: StatefulProcessorHandle) -> None:
state_schema = StructType([StructField("count", IntegerType(), True)])
self.count_state = handle.getValueState("countState", state_schema)
def handleInitialState(self, key, initialState: Row, timerValues) -> None:
self.count_state.update((initialState["count"],))
def handleInputRows(self, key, rows: Iterator[Row], timerValues) -> Iterator[Row]:
count = self.count_state.get()[0] if self.count_state.exists() else 0
for _ in rows:
count += 1
self.count_state.update((count,))
yield Row(id=key[0], count=count)
def close(self) -> None:
pass
output_schema = StructType([
StructField("id", StringType(), True),
StructField("count", IntegerType(), True),
])
# Load existing counts as initial state — must use the same grouping key as the input
initial_state = spark.read.table("existing_counts").groupBy("id")
q = (
df.groupBy("id")
.transformWithState(
statefulProcessor=CounterWithInitialState(),
outputStructType=output_schema,
outputMode="Update",
timeMode="None",
initialState=initial_state,
)
.writeStream...
)
Linguaggio di programmazione Scala
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.Encoders
class CounterWithInitialState
extends StatefulProcessorWithInitialState[String, (String, String), (String, String), (String, Int)] {
@transient private var countState: ValueState[Int] = _
override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
countState = getHandle.getValueState[Int]("countState", Encoders.scalaInt, TTLConfig.NONE)
}
override def handleInitialState(
key: String, initialState: (String, Int), timerValues: TimerValues): Unit = {
countState.update(initialState._2)
}
override def handleInputRows(
key: String,
rows: Iterator[(String, String)],
timerValues: TimerValues): Iterator[(String, String)] = {
val count = if (countState.exists()) countState.get() else 0
val newCount = count + rows.size
countState.update(newCount)
Iterator((key, newCount.toString))
}
}
// Load existing counts as initial state — must use the same grouping key as the input
val initialState = spark.read.table("existing_counts")
.as[(String, Int)]
.groupByKey(_._1)
val q = spark
.readStream
.format("delta")
.load(srcDeltaTableDir)
.as[(String, String)]
.groupByKey(_._1)
.transformWithState(
new CounterWithInitialState(),
TimeMode.None(),
OutputMode.Update(),
initialState,
)
.writeStream...
Usare transformWithState nelle pipeline dichiarative di Lakeflow Spark
Usare l'operatore transformWithState all'interno di Pipeline dichiarative di Lakeflow Spark per implementare logica arbitraria con stato nelle pipeline di streaming usando Python.
A questo scopo, esegui i passaggi seguenti:
- Definire lo schema di output e la logica del processore con stato per le trasformazioni arbitrarie con stato. Per esempi, vedere Esempi di applicazioni con stato.
- Creare un flusso dichiarativo di Lakeflow Spark pipelines che richiama l'operatore
transformWithStatein un dataframe. Vedi Tutorial: Crea la tua prima pipeline con l'editor di Lakeflow Pipelines. - Esegui la pipeline e convalida i risultati nella tabella di destinazione o nel sink.
Per un esempio che usa transformWithState per monitorare gli heartbeat dei sensori, vedere Esempio: Usare transformWithState per monitorare gli heartbeat dei sensori.