Lesen und Schreiben von Protokollpuffern

Protokollpuffer (Protobuf) ist ein sprachneutrales binäres Serialisierungsformat, das von Google entwickelt wurde. Azure Databricks-Nutzer begegnen ihm am häufigsten bei der Verarbeitung von binärkodierten Datensätzen aus Event-Streaming-Systemen wie Apache Kafka. Azure Databricks unterstützt das Lesen und Schreiben von Protobuf-Daten mit Apache Spark über die from_protobuf und to_protobuf Funktionen, die zwischen binären Protobuf- und Spark SQL-Strukturtypen für Streaming- und Batchworkloads konvertieren.

Voraussetzungen

Protobuf-Funktionen erfordern Databricks Runtime 12.2 LTS und höher.

Funktionssyntax

Verwenden Sie from_protobuf, um eine Binärspalte in eine Struct-Spalte umzuwandeln, und to_protobuf, um eine Struct-Spalte in Binärformat umzuwandeln. Sie müssen entweder eine Beschreibungsdatei angeben, die durch das descFilePath Argument identifiziert wird, oder eine Schemaregistrierung, die mit dem options Argument angegeben ist. Eine vollständige Liste der Optionen finden Sie unter Protobuf.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Options

Übergeben Sie Optionen an from_protobuf und to_protobuf mithilfe des options-Arguments. Eine vollständige Liste der unterstützten Optionen finden Sie unter Protobuf.

Optionen für die Schema Registry

Die folgenden Optionen sind spezifisch für die Schemaregistrierungsverwendung und werden in der allgemeinen Optionsreferenz nicht behandelt.

Option Erforderlich Standard Description
schema.registry.schema.evolution.mode Nein "restart" Wie Schemaänderungen behandelt werden, wenn eine neuere Schema-ID in einem eingehenden Datensatz erkannt wird. "restart" beendet die Abfrage mit einem UnknownFieldException; konfiguriert Aufträge, um neu zu starten, wenn Änderungen nicht aufgenommen werden. "none" ignoriert Schema-ID-Änderungen und analysiert neuere Datensätze mit dem ursprünglichen Schema.
confluent.schema.registry.<option> Nein Geben Sie jede Confluent Schema Registry-Clientoption mithilfe des Präfixes "confluent.schema.registry" an. Legen Sie zum Beispiel "confluent.schema.registry.basic.auth.credentials.source" auf "USER_INFO" und "confluent.schema.registry.basic.auth.user.info" auf "<KEY>:<SECRET>" fest, um Basic-Auth zu konfigurieren.

Usage

In den folgenden Beispielen wird das Wanderbricks-Dataset verwendet, um die Serialisierung von Apache Spark-Strukturen für binäre Protobuf-Datensätze mit to_protobuf() und deserialisieren von binären Protobuf-Datensätzen mit from_protobuf()zu veranschaulichen.

Verwenden von Protokollpuffern mit der Schemaregistrierung von Confluent

Azure Databricks unterstützt die Verwendung der Confluent Schema Registry zum Definieren von Protobuf.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Serialize Wanderbricks reviews to binary Protobuf using schema registry
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), options=schema_registry_options).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", options=schema_registry_options).alias("proto_event")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Serialize Wanderbricks reviews to binary Protobuf using schema registry
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), options = schemaRegistryOptions.asJava)
        .as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
        .as("proto_event")
)
reviewsRestoredDF.show()

Authentifizieren Sie sich bei einer externen Confluent-Schemaregistrierung

Aktualisieren Sie für die Authentifizierung bei einer externen Schemaregistrierung von Confluent Ihre Schemaregistrierungsoptionen so, dass sie Authentifizierungsanmeldeinformationen und API-Schlüssel enthalten.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Verwenden von Truststore- und Keystore-Dateien in Unity Catalog-Volumes

In Databricks Runtime 14.3 LTS und höher können Sie Truststore- und Keystoredateien in Unity-Katalogvolumes verwenden, um sich bei einer Confluent-Schemaregistrierung zu authentifizieren. Aktualisieren Sie die Schemaregistrierungsoptionen gemäß dem folgenden Beispiel:

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Verwendung von Protobuf mit einer Descriptor-Datei

Sie können auch auf eine protobuf-Deskriptordatei verweisen, die für Ihren Rechencluster verfügbar ist. Stellen Sie sicher, dass Sie je nach Speicherort über die richtigen Berechtigungen zum Lesen der Datei verfügen.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
from pyspark.sql.functions import struct

descriptor_file = "/path/to/proto_descriptor.desc"

# Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
reviews_df = spark.read.table("samples.wanderbricks.reviews")
proto_bytes_df = reviews_df.select(
    to_protobuf(struct("review_id", "rating", "comment"), "Review", descriptor_file).alias("proto_bytes")
)

# Deserialize binary Protobuf records back to a struct
reviews_restored_df = proto_bytes_df.select(
    from_protobuf("proto_bytes", "Review", descFilePath=descriptor_file).alias("review")
)
display(reviews_restored_df)

Scala

import org.apache.spark.sql.protobuf.functions._
import org.apache.spark.sql.functions.struct

val descriptorFile = "/path/to/proto_descriptor.desc"

// Serialize Wanderbricks reviews to binary Protobuf using a descriptor file
val reviewsDF = spark.read.table("samples.wanderbricks.reviews")
val protoBytesDF = reviewsDF.select(
    to_protobuf(struct($"review_id", $"rating", $"comment"), "Review", descriptorFile).as("proto_bytes")
)

// Deserialize binary Protobuf records back to a struct
val reviewsRestoredDF = protoBytesDF.select(
    from_protobuf($"proto_bytes", "Review", descFilePath=descriptorFile).as("review")
)
reviewsRestoredDF.show()

Weitere Ressourcen

  • Avro-Streamingdaten lesen und schreiben: Wenn Ihre Streaming-Workload Avro-Serialisierung anstelle von Protobuf verwendet, siehe die Avro-Streamingfunktionen für die entsprechenden Funktionen from_avro und to_avro.