Iscriviti a Google Pub/Sub

Usare il connettore predefinito per sottoscrivere Google Pub/Sub. Questo connettore ha una semantica di elaborazione una e una sola volta per le righe provenienti dal sottoscrittore.

Nota

Pub/Sub potrebbe pubblicare righe duplicate oppure le righe potrebbero arrivare al sottoscrittore non in ordine. È necessario scrivere codice per gestire righe duplicate e non ordinate.

Configurare un flusso Pub/Sub

L'esempio di codice seguente illustra come configurare un flusso strutturato letto da Pub/Sub ed eseguire l'autenticazione con chiavi private.

Python

auth_options = {
    "clientId": client_id,
    "clientEmail": client_email,
    "privateKey": private_key,
    "privateKeyId": private_key_id
}

query = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(auth_options)
  .load()
)

Scala

val authOptions: Map[String, String] =
  Map("clientId" -> clientId,
      "clientEmail" -> clientEmail,
      "privateKey" -> privateKey,
      "privateKeyId" -> privateKeyId)

val query = spark.readStream
  .format("pubsub")
  // Creates a Pub/Sub subscription if one does not already exist with this ID
  .option("subscriptionId", "mysub")
  .option("topicId", "mytopic")
  .option("projectId", "myproject")
  .options(authOptions)
  .load()

SQL

CREATE OR REFRESH STREAMING TABLE pubsub_raw
AS SELECT * FROM STREAM read_pubsub(
  subscriptionId => 'mysub',
  projectId => 'myproject',
  topicId => 'mytopic',
  clientEmail => secret('pubsub-scope', 'clientEmail'),
  clientId => secret('pubsub-scope', 'clientId'),
  privateKeyId => secret('pubsub-scope', 'privateKeyId'),
  privateKey => secret('pubsub-scope', 'privateKey')
);

Per altre opzioni di configurazione, vedere Configurare le opzioni per la lettura in streaming pub/sub.

Configurare l'accesso a Pub/Sub

Le credenziali devono avere i ruoli seguenti:

Ruoli Obbligatorio o facoltativo Modalità di utilizzo del ruolo
roles/pubsub.viewer oppure roles/viewer Richiesto Controlla se la sottoscrizione esiste e ottiene la sottoscrizione.
roles/pubsub.subscriber Richiesto Recupera i dati da una sottoscrizione.
roles/pubsub.editor oppure roles/editor Facoltativo Abilita la creazione di una sottoscrizione se non esiste e consente l'uso deleteSubscriptionOnStreamStop di per eliminare le sottoscrizioni alla terminazione del flusso.

Nota

Se si concede roles/pubsub.viewer e roles/pubsub.subscriber a livello di risorsa anziché a livello di progetto, è necessario applicare entrambi i ruoli sia all'argomento che alla sottoscrizione. Se non si utilizzano i ruoli facoltativi roles/pubsub.editor o roles/editor, non è sufficiente concedere i ruoli richiesti solo all'argomento.

Databricks consiglia di usare segreti quando si usano chiavi. Per autorizzare una connessione sono necessarie le opzioni seguenti:

  • clientEmail
  • clientId
  • privateKey
  • privateKeyId

Informazioni sullo schema Pub/Sub

Lo schema del flusso corrisponde alle righe ottenute da Pub/Sub, come descritto nella tabella seguente:

Campo TIPO
messageId StringType
payload ArrayType[ByteType]
attributes StringType
publishTimestampInMillis LongType

Configurare le opzioni per la lettura in streaming Pub/Sub

Alcune opzioni di configurazione Pub/Sub utilizzano il concetto di fetch invece di micro-batch. Si tratta di un dettaglio di implementazione interno e le opzioni funzionano in modo analogo ad altri connettori Structured Streaming, ad eccezione del fatto che le righe vengono recuperate e quindi elaborate.

Per l'elenco completo delle opzioni, vedere Pub/Sub.

Usare l'elaborazione batch incrementale con Pub/Sub

È possibile usare Trigger.AvailableNow per utilizzare le righe disponibili dalle origini Pub/Sub come batch incrementale.

Azure Databricks registra il timestamp quando si inizia una lettura con l'impostazione Trigger.AvailableNow . Le righe elaborate dal batch includono tutti i dati recuperati in precedenza e tutte le righe appena pubblicate con un timestamp minore del timestamp di inizio registrato. Per altre informazioni, vedere AvailableNow: Elaborazione batch incrementale.

Monitorare le metriche di streaming pub/sub

Le metriche di stato di Structured Streaming segnalano il numero di righe recuperate e pronte per l'elaborazione, le dimensioni delle righe recuperate e pronte per l'elaborazione e il numero di duplicati rilevati dall'avvio del flusso.

Di seguito è riportato un esempio di metriche Pub/Sub:

"metrics" : {
  "numDuplicatesSinceStreamStart" : "1",
  "numRecordsReadyToProcess" : "1",
  "sizeOfRecordsReadyToProcess" : "8"
}

Limiti

Pub/Sub non supporta l'esecuzione speculativa con spark.speculation.