Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Apache Avro è un formato di serializzazione dei dati basato su righe che fornisce strutture di dati avanzate e una codifica binaria compatta e veloce. Gli utenti di Azure Databricks lo incontrano più spesso durante l'acquisizione di dati da sistemi di streaming di eventi come Apache Kafka e Google Pub/Sub, dove Avro è il formato di serializzazione predominante. Azure Databricks supporta Avro sia per la lettura che per la scrittura con Apache Spark, inclusa la conversione automatica dello schema tra tipi Avro e Spark SQL, partizionamento, compressione e nomi di record personalizzati.
Se stai leggendo record codificati in Avro da Apache Kafka o da un altro bus di messaggi anziché da file, consulta Leggere e scrivere dati Avro in streaming, che illustra le funzioni from_avro e to_avro usate per la deserializzazione in streaming.
Prerequisiti
Azure Databricks non richiede una configurazione aggiuntiva per l'uso dei file Avro. Tuttavia, per trasmettere i file Avro, è necessario il caricatore automatico.
Options
Usare i metodi .option() e .options() di DataFrameReader e DataFrameWriter per configurare le origini dati Avro. Per un elenco completo delle opzioni supportate, vedere DataFrameReader Opzioni avro e DataFrameWriter opzioni avro.
Usage
Gli esempi seguenti usano il set di dati Wanderbricks per illustrare la lettura e la scrittura di file Avro usando l'API Spark DataFrame e SQL.
Leggere i file Avro con SQL
Per eseguire query sui file Avro senza registrare una tabella, usare read_files. Le autorizzazioni del catalogo Unity per la posizione esterna vengono applicate automaticamente.
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_avro',
format => 'avro'
)
Leggere e scrivere file Avro
Usare l'API DataFrame apache Spark quando è necessario leggere o scrivere file Avro per un sistema downstream, applicare trasformazioni prima del caricamento o opzioni di controllo, ad esempio il partizionamento e lo schema in fase di scrittura.
Gli esempi seguenti usano il set di dati di esempio Wanderbricks .
Pitone
from pyspark.sql.functions import year, month
# Write wanderbricks reviews to Avro format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read an Avro file into a DataFrame
df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
display(df)
# Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Read using a custom Avro schema to select specific fields
avro_schema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
df = spark.read.format("avro").option("avroSchema", avro_schema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
# Write partitioned Avro files by year and month
df = spark.read.table("samples.wanderbricks.bookings")
df_with_parts = df.withColumn("year", year("check_in")).withColumn("month", month("check_in"))
df_with_parts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
# Write with a custom record name and namespace for Schema Registry compatibility
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("avro").options(
recordName="Review",
recordNamespace="com.wanderbricks"
).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
Linguaggio di programmazione Scala
import org.apache.spark.sql.functions.{year, month}
// Write wanderbricks reviews to Avro format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("avro").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read an Avro file into a DataFrame
val df = spark.read.format("avro").load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
df.show()
// Write with overwrite mode
df.write.format("avro").mode("overwrite").save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Read using a custom Avro schema to select specific fields
val avroSchema = """
{
"type": "record",
"name": "Review",
"fields": [
{"name": "review_id", "type": "string"},
{"name": "rating", "type": "int"},
{"name": "comment", "type": ["null", "string"]}
]
}
"""
val filtered = spark.read.format("avro").option("avroSchema", avroSchema).load("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
// Write partitioned Avro files by year and month
val bookings = spark.read.table("samples.wanderbricks.bookings")
val bookingsWithParts = bookings.withColumn("year", year(col("check_in"))).withColumn("month", month(col("check_in")))
bookingsWithParts.write.format("avro").partitionBy("year", "month").save("/Volumes/<catalog>/<schema>/<volume>/bookings_avro_partitioned")
// Write with a custom record name and namespace for Schema Registry compatibility
reviews.write.format("avro").options(Map(
"recordName" -> "Review",
"recordNamespace" -> "com.wanderbricks"
)).save("/Volumes/<catalog>/<schema>/<volume>/reviews_avro")
SQL
-- Write wanderbricks reviews to Avro format
CREATE TABLE reviews_avro
USING AVRO
AS SELECT * FROM samples.wanderbricks.reviews;
-- Write partitioned Avro files by year and month
CREATE TABLE bookings_avro_partitioned
USING AVRO
PARTITIONED BY (year, month)
AS SELECT *, year(check_in) AS year, month(check_in) AS month
FROM samples.wanderbricks.bookings;
SELECT * FROM bookings_avro_partitioned;
Risorse aggiuntive
- Leggere e scrivere file Parquet: se il carico di lavoro è principalmente analitico e di lettura piuttosto che in streaming o in scrittura, il layout a colonne di Parquet offre prestazioni di query più efficienti rispetto all'archiviazione basata su righe di Avro.