Uso del caricatore automatico con il catalogo Unity

Il caricatore automatico può inserire in modo sicuro i dati da posizioni esterne configurate con Unity Catalog. Per altre informazioni sulla connessione sicura dell'archiviazione con il catalogo unity, vedere Connettersi all'archiviazione di oggetti cloud tramite il catalogo unity. Il caricatore automatico si basa su Structured Streaming per l'elaborazione incrementale; per indicazioni e limitazioni, vedere Uso del catalogo Unity con Structured Streaming.

Nota

In Databricks Runtime 11.3 LTS e versioni successive è possibile usare Il caricatore automatico con modalità di accesso standard o dedicate (in precedenza modalità di accesso condiviso e utente singolo).

La modalità elenco directory è supportata per impostazione predefinita.

Specificare i percorsi per le risorse di Auto Loader per Unity Catalog

Il modello di sicurezza del catalogo Unity presuppone che tutte le posizioni di archiviazione a cui si fa riferimento in un carico di lavoro vengano gestite dal catalogo unity. Databricks consiglia di archiviare sempre le informazioni sull'evoluzione dei checkpoint e dello schema nelle posizioni di archiviazione gestite dal catalogo Unity. Il Unity Catalog non consente di inserire file di checkpoint, di inferenza dello schema o di evoluzione nella directory della tabella.

Inserire dati dall'archiviazione cloud usando Il catalogo unity

Gli esempi seguenti presuppongono che l'utente in esecuzione disponga READ FILES delle autorizzazioni sulla posizione esterna, dei privilegi di proprietario per le tabelle di destinazione e delle configurazioni e autorizzazioni seguenti.

Nota

Azure Data Lake Storage è l'unico tipo di archiviazione di Azure supportato dal catalogo unity.

Posizione di archiviazione Sovvenzione
abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data READ FILES
abfss://dev-bucket@<storage-account>.dfs.core.windows.net READ FILES, WRITE FILES, CREATE TABLE

Usare il caricatore automatico per caricare in una tabella gestita del catalogo Unity

Gli esempi seguenti illustrano come usare il caricatore automatico per inserire dati in una tabella gestita di Unity Catalog.

Python

checkpoint_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/_checkpoint/dev_table"

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load("abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable("dev_catalog.dev_database.dev_table"))

SQL

CREATE OR REFRESH STREAMING TABLE dev_catalog.dev_database.dev_table
AS SELECT * FROM STREAM read_files(
  'abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data',
  format => 'json'
);

Quando si utilizza read_files all'interno di un'istruzione CREATE STREAMING TABLE in Lakeflow Spark Declarative Pipelines, le posizioni del checkpoint e dello schema vengono gestite automaticamente.

Usare il caricatore automatico per caricare in una tabella esterna del catalogo Unity

Per mantenere i dati in una posizione di archiviazione specifica, usare una tabella esterna del catalogo Unity anziché una tabella gestita. Ad esempio, usare una tabella esterna per condividere dati con client non Databricks o per registrare i dati esistenti. Con le tabelle esterne, si imposta il percorso di archiviazione. Consultare Lavorare con tabelle esterne.

Per usare Auto Loader con una tabella esterna di Unity Catalog, registrare innanzitutto la tabella con CREATE TABLE ... LOCATION e quindi eseguire lo streaming nella tabella usando il relativo nome. La posizione della tabella deve trovarsi all'interno di una posizione esterna per la quale si dispone delle autorizzazioni CREATE EXTERNAL TABLE. La posizione del checkpoint deve anch'essa trovarsi in una posizione esterna gestita da Unity Catalog. Usare un percorso separato dai dati della tabella.

checkpoint_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/_checkpoint/dev_table"
table_path = "abfss://dev-bucket@<storage-account>.dfs.core.windows.net/external/dev_table"

# One-time: register the external table in UC.
spark.sql(f"""
  CREATE TABLE IF NOT EXISTS dev_catalog.dev_database.dev_table
  USING DELTA
  LOCATION '{table_path}'
""")

(spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "json")
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .load("abfss://autoloader-source@<storage-account>.dfs.core.windows.net/json-data")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .trigger(availableNow=True)
  .toTable("dev_catalog.dev_database.dev_table"))