Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Protokollpuffer (Protobuf) ist ein sprachneutrales binäres Serialisierungsformat, das von Google entwickelt wurde. Azure Databricks-Nutzer begegnen ihm am häufigsten bei der Verarbeitung von binärkodierten Datensätzen aus Event-Streaming-Systemen wie Apache Kafka. Azure Databricks unterstützt das Lesen und Schreiben von Protobuf-Daten mit Apache Spark über die from_protobuf und to_protobuf Funktionen, die zwischen binären Protobuf- und Spark SQL-Strukturtypen für Streaming- und Batchworkloads konvertieren.
Voraussetzungen
Protobuf-Funktionen erfordern Databricks Runtime 12.2 LTS und höher.
Funktionssyntax
Verwenden Sie from_protobuf, um eine Binärspalte in eine Struct-Spalte umzuwandeln, und to_protobuf, um eine Struct-Spalte in Binärformat umzuwandeln. Sie müssen entweder eine Beschreibungsdatei angeben, die durch das descFilePath Argument identifiziert wird, oder eine Schemaregistrierung, die mit dem options Argument angegeben ist. Eine vollständige Liste der Optionen finden Sie unter Protobuf.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
Options
Übergeben Sie Optionen an from_protobuf und to_protobuf mithilfe des options-Arguments. Eine vollständige Liste der unterstützten Optionen finden Sie unter Protobuf.
Optionen für die Schema Registry
Die folgenden Optionen sind spezifisch für die Schemaregistrierungsverwendung und werden in der allgemeinen Optionsreferenz nicht behandelt.
| Option | Erforderlich | Standard | Description |
|---|---|---|---|
schema.registry.schema.evolution.mode |
Nein | "restart" |
Wie Schemaänderungen behandelt werden, wenn eine neuere Schema-ID in einem eingehenden Datensatz erkannt wird.
"restart" beendet die Abfrage mit einem UnknownFieldException; konfiguriert Aufträge, um neu zu starten, wenn Änderungen nicht aufgenommen werden.
"none" ignoriert Schema-ID-Änderungen und analysiert neuere Datensätze mit dem ursprünglichen Schema. |
confluent.schema.registry.<option> |
Nein | — | Geben Sie jede Confluent Schema Registry-Clientoption mithilfe des Präfixes "confluent.schema.registry" an. Legen Sie zum Beispiel "confluent.schema.registry.basic.auth.credentials.source" auf "USER_INFO" und "confluent.schema.registry.basic.auth.user.info" auf "<KEY>:<SECRET>" fest, um Basic-Auth zu konfigurieren. |
Usage
In den folgenden Beispielen wird das Wanderbricks-Dataset verwendet, um die Serialisierung von Apache Spark-Strukturen für binäre Protobuf-Datensätze mit to_protobuf() und deserialisieren von binären Protobuf-Datensätzen mit from_protobuf()zu veranschaulichen.
Verwenden von Protokollpuffern mit der Schemaregistrierung von Confluent
Azure Databricks unterstützt die Verwendung der Confluent Schema Registry zum Definieren von Protobuf.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
reviewsRestoredDF.show()
Authentifizieren Sie sich bei einer externen Confluent-Schemaregistrierung
Aktualisieren Sie für die Authentifizierung bei einer externen Schemaregistrierung von Confluent Ihre Schemaregistrierungsoptionen so, dass sie Authentifizierungsanmeldeinformationen und API-Schlüssel enthalten.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Verwenden von Truststore- und Keystore-Dateien in Unity Catalog-Volumes
In Databricks Runtime 14.3 LTS und höher können Sie Truststore- und Keystoredateien in Unity-Katalogvolumes verwenden, um sich bei einer Confluent-Schemaregistrierung zu authentifizieren. Aktualisieren Sie die Schemaregistrierungsoptionen gemäß dem folgenden Beispiel:
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
Verwendung von Protobuf mit einer Descriptor-Datei
Sie können auch auf eine protobuf-Deskriptordatei verweisen, die für Ihren Rechencluster verfügbar ist. Stellen Sie sicher, dass Sie je nach Speicherort über die richtigen Berechtigungen zum Lesen der Datei verfügen.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct
descriptor_file = "/path/to/proto_descriptor.desc"
# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)
# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)
Scala
import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
val descriptorFile = "/path/to/proto_descriptor.desc"
// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)
// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()
Weitere Ressourcen
-
Avro-Streamingdaten lesen und schreiben: Wenn Ihre Streaming-Workload Avro-Serialisierung anstelle von Protobuf verwendet, siehe die Avro-Streamingfunktionen für die entsprechenden Funktionen
from_avroundto_avro.