Streaming da Apache Pulsar

Importante

Questa funzionalità è disponibile in anteprima pubblica.

In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.

Structured Streaming offre una semantica di elaborazione exactly-once per i dati letti da sorgenti Pulsar.

Esempio di sintassi

Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:

Python

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()
)

Scala

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .load()

Per leggere dai topic di Pulsar, è necessario fornire un service.url e una delle seguenti opzioni:

  • topic
  • topics
  • topicsPattern

Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.

Eseguire l'autenticazione a Pulsar

Azure Databricks supporta l'autenticazione tramite truststore e keystore per Pulsar. Databricks consiglia di usare i segreti per archiviare i dettagli di configurazione.

Per l'elenco completo delle opzioni di autenticazione, vedere Autenticazione.

Example

L'esempio seguente illustra la configurazione delle opzioni di autenticazione:

Python

client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")

# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

query = (spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", starting_offsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", client_auth_params)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trust_store_path)
  .option("pulsar.client.tlsTrustStorePassword", client_pw)
  .load()
)

Scala

val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")

// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"

val query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topics", "topic1,topic2")
  .option("startingOffsets", startingOffsets)
  .option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
  .option("pulsar.client.authParams", clientAuthParams)
  .option("pulsar.client.useKeyStoreTls", "true")
  .option("pulsar.client.tlsTrustStoreType", "JKS")
  .option("pulsar.client.tlsTrustStorePath", trustStorePath)
  .option("pulsar.client.tlsTrustStorePassword", clientPw)
  .load()

Schema Pulsar

Quando si legge da Pulsar, lo schema delle righe dipende dagli schemi degli argomenti dell'origine.

  • Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
  • Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una value colonna.
  • Se si configura il flusso per leggere più argomenti con schemi diversi, impostare allowDifferentTopicSchemas per caricare il contenuto non elaborato in una value colonna.

I record Pulsar hanno i campi di metadati seguenti:

colonna Tipo
__key binary
__topic string
__messageId binary
__publishTime timestamp
__eventTime timestamp
__messageProperties map<String, String>

Configurare le opzioni per la lettura in streaming Pulsar

Per l'elenco completo delle opzioni, vedere Pulsar.

Crea offset iniziali in JSON

Per usare un ID messaggio personalizzato che specifica un offset, come JSON, con l'opzione startingOffsets , vedere l'esempio seguente:

import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl

val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))

query = spark.readStream
  .format("pulsar")
  .option("service.url", "pulsar://broker.example.com:6650")
  .option("topic", topic)
  .option("startingOffsets", startOffsets)
  .load()