Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Importante
Questa funzionalità è disponibile in anteprima pubblica.
In Databricks Runtime 14.1 e versioni successive è possibile usare Structured Streaming per trasmettere dati da Apache Pulsar in Azure Databricks.
Structured Streaming offre una semantica di elaborazione exactly-once per i dati letti da sorgenti Pulsar.
Esempio di sintassi
Di seguito è riportato un esempio di base dell'uso di Structured Streaming per leggere da Pulsar:
Python
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
)
Scala
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
Per leggere dai topic di Pulsar, è necessario fornire un service.url e una delle seguenti opzioni:
topictopicstopicsPattern
Per un elenco completo delle opzioni, vedere Configurare le opzioni per la lettura in streaming Pulsar.
Eseguire l'autenticazione a Pulsar
Azure Databricks supporta l'autenticazione tramite truststore e keystore per Pulsar. Databricks consiglia di usare i segreti per archiviare i dettagli di configurazione.
Per l'elenco completo delle opzioni di autenticazione, vedere Autenticazione.
Example
L'esempio seguente illustra la configurazione delle opzioni di autenticazione:
Python
client_auth_params = dbutils.secrets.get(scope="pulsar", key="clientAuthParams")
client_pw = dbutils.secrets.get(scope="pulsar", key="clientPw")
# clientAuthParams is a comma-separated list of key-value pairs, such as:
# "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = (spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", starting_offsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", client_auth_params)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trust_store_path)
.option("pulsar.client.tlsTrustStorePassword", client_pw)
.load()
)
Scala
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
// "keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
val query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Schema Pulsar
Quando si legge da Pulsar, lo schema delle righe dipende dagli schemi degli argomenti dell'origine.
- Per gli argomenti con lo schema Avro o JSON, i nomi dei campi e i tipi di campo vengono mantenuti nel dataframe Spark risultante.
- Per gli argomenti senza schema o con un tipo di dati semplice in Pulsar, il payload viene caricato in una
valuecolonna. - Se si configura il flusso per leggere più argomenti con schemi diversi, impostare
allowDifferentTopicSchemasper caricare il contenuto non elaborato in unavaluecolonna.
I record Pulsar hanno i campi di metadati seguenti:
| colonna | Tipo |
|---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Configurare le opzioni per la lettura in streaming Pulsar
Per l'elenco completo delle opzioni, vedere Pulsar.
Crea offset iniziali in JSON
Per usare un ID messaggio personalizzato che specifica un offset, come JSON, con l'opzione startingOffsets , vedere l'esempio seguente:
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()