Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
CSV (durch Trennzeichen getrennte Werte) ist ein tabellarisches Nur-Text-Format, das häufig für den Datenaustausch, ETL-Pipelines und die allgemeine Datenspeicherung verwendet wird. Azure Databricks unterstützt CSV sowohl zum Lesen als auch zum Schreiben mit Apache Spark, einschließlich Schemainferenz, Komprimierung, der Behandlung fehlerhafter Datensätze und Rettungsdaten.
Hinweis
Databricks empfiehlt die read_files Tabellenwertfunktion für SQL-Benutzer, CSV-Dateien zu lesen.
read_files ist in Databricks Runtime 13.3 LTS und höher verfügbar.
Sie können auch eine temporäre Ansicht verwenden. Wenn Sie SQL verwenden, um CSV-Daten direkt zu lesen, ohne temporäre Ansichten oder read_files zu verwenden, gelten die folgenden Einschränkungen:
- Sie können keine Datenquellenoptionen angeben.
- Sie können das Schema für die Daten nicht angeben.
Voraussetzungen
Azure Databricks erfordert keine zusätzliche Konfiguration, um CSV-Dateien zu verwenden. Zum Streamen von CSV-Dateien benötigen Sie jedoch das automatische Laden.
Optionen
Verwenden Sie zum Konfigurieren von CSV-Datenquellen die Methoden .option() und .options() von DataFrameReader und DataFrameWriter. Eine vollständige Liste der unterstützten Optionen finden Sie unter DataFrameReader CSV-Optionen und DataFrameWriter CSV-Optionen.
Usage
Die folgenden Beispiele veranschaulichen das Lesen und Schreiben von CSV-Dateien, das Angeben von Schemas und das Behandeln falsch formatierter Datensätze.
Lesen von CSV-Dateien
Im folgenden Beispiel wird das Wanderbricks-Beispiel-Dataset verwendet. Es schreibt Rezensionsdaten in CSV und liest sie dann zurück.
Python
# Write wanderbricks reviews to CSV format
df = spark.read.table("samples.wanderbricks.reviews")
df.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
# Read the CSV file into a DataFrame
df = (spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv"))
display(df)
df.printSchema()
Scala
// Write wanderbricks reviews to CSV format
val reviews = spark.read.table("samples.wanderbricks.reviews")
reviews.write.format("csv").option("header", "true").save("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
// Read the CSV file into a DataFrame
val df = spark.read
.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
df.printSchema()
R
df <- read.df("/Volumes/<catalog>/<schema>/<volume>/reviews_csv", source = "csv", header = "true", inferSchema = "true")
display(df)
printSchema(df)
Lesen von CSV-Dateien mit SQL
Im folgenden SQL-Beispiel wird eine CSV-Datei mit read_files gelesen.
-- mode "FAILFAST" aborts file parsing with a RuntimeException if malformed lines are encountered
SELECT * FROM read_files(
'abfss://<bucket>@<storage-account>.dfs.core.windows.net/<path>/<file>.csv',
format => 'csv',
header => true,
mode => 'FAILFAST')
Angeben eines Schemas
Wenn das Schema der CSV-Datei bekannt ist, können Sie das gewünschte Schema für den CSV-Reader mit der schema-Option angeben.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.printSchema()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int, comment string'
)
Teilmenge von Spalten einlesen
Das Verhalten des CSV-Parsers hängt davon ab, welche Spalten gelesen werden. Wenn das angegebene Schema nicht mit dem Dateilayout übereinstimmt, können sich die Ergebnisse je nachdem, auf welche Spalten zugegriffen wird, erheblich unterscheiden. CSV weist keine Spaltennamenmetadaten auf, sodass Spark Schemafelder Spalten nach Position zuordnet – ein nicht übereinstimmender Schema verschiebt Werte in die falschen Felder.
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Read only a subset of columns by specifying a partial schema
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True)
])
df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
display(df)
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true)
))
val df = spark.read.format("csv").schema(schema).option("header", "true").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
schema => 'review_id string, rating int'
)
Behandeln falsch formatierter CSV-Einträge
Beim Lesen von CSV-Dateien mit einem angegebenen Schema kann es vorkommen, dass die Daten in den Dateien nicht zum Schema passen. Beispielsweise kann ein Feld, das den Namen der Stadt enthält, nicht als ganze Zahl analysiert werden. Die Folgen hängen vom Modus ab, in dem der Parser ausgeführt wird:
-
PERMISSIVE(Standardeinstellung): NULL-Werte werden für Felder eingefügt, die nicht ordnungsgemäß analysiert werden konnten -
DROPMALFORMED: Löscht Zeilen, die Felder enthalten, die nicht analysiert werden konnten -
FAILFAST: Bricht das Lesen ab, wenn falsch formatierte Daten gefunden werden
Verwenden Sie zum Festlegen des Modus die mode-Option.
Python
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
Scala
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE'
)
Im Modus PERMISSIVE ist es möglich, die Zeilen, die nicht korrekt geparst werden konnten, mit einer der folgenden Methoden zu untersuchen:
- Sie können einen benutzerdefinierten Pfad zur Option
badRecordsPathangeben, um beschädigte Datensätze in eine Datei aufzuzeichnen. - Sie können die Spalte
_corrupt_recorddem Schema hinzufügen, das dem DataFrameReader bereitgestellt wird, um beschädigte Datensätze im resultierenden DataFrame zu überprüfen.
Hinweis
Die Option badRecordsPath hat Vorrang vor _corrupt_record, was bedeutet, dass fehlerhafte Zeilen, die in den angegebenen Pfad geschrieben werden, nicht im resultierenden DataFrame angezeigt werden.
Standardverhalten für falsch formatierte Datensätze ändert sich bei Verwendung der geretteten Datenspalte.
Um fehlerhaft formatierte Zeilen mit _corrupt_record zu prüfen, fügen Sie es dem Schema hinzu und filtern Sie nach Werten ungleich Null:
Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
StructField("review_id", StringType(), True),
StructField("rating", IntegerType(), True),
StructField("comment", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = (spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
)
display(df.filter(df["_corrupt_record"].isNotNull()))
Scala
import org.apache.spark.sql.types._
val schema = StructType(Array(
StructField("review_id", StringType, nullable = true),
StructField("rating", IntegerType, nullable = true),
StructField("comment", StringType, nullable = true),
StructField("_corrupt_record", StringType, nullable = true)
))
val df = spark.read
.format("csv")
.option("header", "true")
.option("mode", "PERMISSIVE")
.schema(schema)
.load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
df.filter(df("_corrupt_record").isNotNull).show()
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
mode => 'PERMISSIVE',
schema => 'review_id string, rating int, comment string, _corrupt_record string'
)
WHERE _corrupt_record IS NOT NULL
Aktivieren der Spalte "Gerettete Daten"
Hinweis
Dieses Feature wird in Databricks Runtime 8.3 und höher unterstützt.
Wenn Sie den PERMISSIVE Modus verwenden, können Sie die Spalte für gerettete Daten aktivieren, um daten zu erfassen, die nicht analysiert wurden, da eines oder mehrere Felder in einem Datensatz eines der folgenden Probleme aufweisen:
- Fehlt im bereitgestellten Schema.
- Stimmt nicht mit dem Datentyp des bereitgestellten Schemas überein.
- Weist einen Fallkonflikt mit den Feldnamen im angegebenen Schema auf.
Die gerettete Datenspalte wird als JSON-Dokument zurückgegeben, das die geretteten Spalten und den Quelldateipfad des Datensatzes enthält.
Um die gerettete Datenspalte zu aktivieren, legen Sie beim Lesen die rescuedDataColumn Option auf einen Spaltennamen fest:
Python
df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
Scala
val df = spark.read.option("rescuedDataColumn", "_rescued_data").format("csv").load("/Volumes/<catalog>/<schema>/<volume>/reviews_csv")
SQL
SELECT * FROM read_files(
'/Volumes/<catalog>/<schema>/<volume>/reviews_csv',
format => 'csv',
header => true,
rescuedDataColumn => '_rescued_data'
)
Wenn Sie den Quelldateipfad aus der Spalte für die geretteten Daten entfernen möchten, legen Sie Folgendes fest:
spark.conf.set("spark.databricks.sql.rescuedDataColumn.filePath.enabled", "false")
Der CSV-Parser unterstützt drei Modi beim Parsen von Datensätzen: PERMISSIVE, DROPMALFORMED und FAILFAST. Bei Verwendung mit rescuedDataColumn führen Datentypkonflikte nicht dazu, dass Datensätze im Modus DROPMALFORMED gelöscht werden oder im Modus FAILFAST einen Fehler auslösen. Nur beschädigte Datensätze, d. h. unvollständige oder falsch formatierte CSV-Dateien, werden verworfen oder führen zu Fehlern.
Wenn der rescuedDataColumn im PERMISSIVE-Modus verwendet wird, gelten die folgenden Regeln für „beschädigte Datensätze“:
- Die erste Zeile der Datei (entweder eine Kopfzeile oder eine Datenzeile) legt die erwartete Zeilenlänge fest.
- Eine Zeile mit einer anderen Anzahl von Spalten wird als unvollständig betrachtet.
- Datentypinkompatibilitäten werden nicht als beschädigte Datensätze betrachtet.
- Nur unvollständige und falsch formatierte CSV-Datensätze werden als beschädigt betrachtet und in der Spalte
_corrupt_recordoderbadRecordsPathaufgezeichnet.
Weitere Ressourcen
- Lesen und Schreiben von Parkettdateien: Wenn Ihre Arbeitsauslastung eine bessere Abfrageleistung oder effizientere Speicherung erfordert, bietet das Spaltenlayout von Parkett erhebliche Vorteile gegenüber dem Nur-Text-Format von CSV.