Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
Disponibile nelle tabelle Delta Lake in Databricks Runtime 15.4 LTS e versioni successive, l'estensione dei tipi consente di modificare i tipi di dati delle colonne in un tipo più ampio senza riscrivere i file di dati.
Per impostazione predefinita, tutte le tabelle gestite del catalogo Unity usano Delta Lake. Vedi le tabelle gestite di Unity Catalog per Delta Lake e Apache Iceberg.
Nota
L'abilitazione dell'ampliamento dei tipi aggiorna i protocolli di lettura e scrittura. Ciò potrebbe influire sulla compatibilità con i client Delta Lake esterni. Vedere Compatibilità e protocolli delle funzionalità delta Lake.
Le tabelle con tipo esteso abilitato possono essere lette solo da Databricks Runtime 15.4 LTS e versioni successive.
Cambiamenti di tipo supportati
È possibile ampliare i tipi in base alle regole seguenti:
| Tipo di sorgente | Tipi più ampi supportati |
|---|---|
BYTE |
SHORT, INT, BIGINT, DECIMALDOUBLE |
SHORT |
INT, BIGINT, DECIMALDOUBLE |
INT |
BIGINT, DECIMAL, DOUBLE |
BIGINT |
DECIMAL |
FLOAT |
DOUBLE |
DECIMAL |
DECIMAL con maggiore precisione e scalabilità |
DATE |
TIMESTAMP_NTZ |
VOID |
Qualsiasi tipo |
Le modifiche ai tipi sono supportate per le colonne di primo livello e i campi annidati all'interno di struct, mappe e matrici.
Nota
VOID per qualsiasi tipo non è necessario abilitare l'estensione del tipo nella tabella. Qualsiasi operazione che aggiorna il tipo di una VOID colonna ha esito positivo senza una configurazione aggiuntiva.
VOID L'estensione dei tipi è disponibile in Databricks Runtime 18.2 e versioni successive.
Comportamento decimale
Spark tronca la parte frazionaria di un valore per impostazione predefinita quando un'operazione promuove un tipo intero in un decimal o double e un'operazione di inserimento a valle scrive nuovamente il valore in una colonna intero. Per informazioni dettagliate sul comportamento dei criteri di assegnazione, vedere Assegnazione dello Store.
Quando si modifica un tipo numerico in decimal, la precisione totale deve essere uguale o maggiore della precisione iniziale. Se si aumenta anche la scala, la precisione totale deve aumentare di una quantità corrispondente.
L'obiettivo minimo per i tipi byte, shorte int è decimal(10,0). L'obiettivo minimo per long è decimal(20,0).
Se si desidera aggiungere due cifre decimali a un campo con decimal(10,1), il valore minimo è decimal(12,3).
Abilitare l'estensione del tipo
Nota
L'abilitazione dell'ampliamento del tipo aggiorna i protocolli di lettura e scrittura. Ciò potrebbe influire sulla compatibilità con i client Delta Lake esterni. Vedere Compatibilità e protocolli delle funzionalità delta Lake.
È possibile abilitare l'ampliamento del tipo in una tabella esistente impostando la proprietà della tabella delta.enableTypeWidening su true:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
È anche possibile abilitare l'ampliamento dei tipi durante la creazione della tabella:
CREATE TABLE T(c1 INT) TBLPROPERTIES('delta.enableTypeWidening' = 'true')
Applicare manualmente una modifica di tipo
Usare il ALTER COLUMN comando per modificare manualmente i tipi:
ALTER TABLE <table_name> ALTER COLUMN <col_name> TYPE <new_type>
Questa operazione aggiorna lo schema della tabella senza riscrivere i file di dati sottostanti. Per altri dettagli, vedere ALTER TABLE.
Ampliare i tipi con evoluzione automatica dello schema
Usare l'evoluzione dello schema con l'estensione del tipo per aggiornare i tipi di dati nelle tabelle di destinazione in modo che corrispondano al tipo di dati in ingresso.
Nota
Senza l'estensione del tipo abilitata, l'evoluzione dello schema tenta sempre di eseguire il downcast dei dati in modo che corrispondano ai tipi di colonna nella tabella di destinazione. Se non si vogliono ampliare automaticamente i tipi di dati nelle tabelle di destinazione, è necessario disattivare l'estensione dei tipi prima di eseguire carichi di lavoro con l'evoluzione dello schema abilitata.
Per usare l'evoluzione dello schema per ampliare il tipo di dati di una colonna durante l'inserimento, è necessario soddisfare le condizioni seguenti:
- Il comando di scrittura viene eseguito con l'evoluzione automatica dello schema abilitata.
- La tabella di destinazione è abilitata per l'ampliamento del tipo.
- Il tipo di colonna di origine è più ampio del tipo di colonna di destinazione.
- L'ampliamento del tipo supporta la modifica del tipo.
Le discrepanze di tipo che non soddisfano tutte queste condizioni seguono le normali regole di applicazione dello schema. Vedi Applicazione dello schema.
Example
Negli esempi seguenti viene illustrato il funzionamento dell'estensione del tipo con l'evoluzione dello schema.
Python
Creare una tabella di destinazione con una INT colonna e una tabella di origine con una BIGINT colonna:
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
Usare saveAsTable() con l'evoluzione dello schema per ampliare automaticamente la INT colonna a BIGINT durante un'aggiunta:
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
Usare MERGE INTO con l'evoluzione dello schema:
from delta.tables import DeltaTable
source_df = spark.table("source_table")
target_table = DeltaTable.forName(spark, "target_table")
(target_table.alias("target")
.merge(source_df.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatchedUpdateAll()
.whenNotMatchedInsertAll()
.execute()
)
Scala
Creare una tabella di destinazione con una INT colonna e una tabella di origine con una BIGINT colonna:
spark.sql("CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true')")
spark.sql("CREATE TABLE source_table (id BIGINT, data STRING)")
Usare saveAsTable() con l'evoluzione dello schema per ampliare automaticamente la INT colonna a BIGINT durante un'aggiunta:
spark.table("source_table").write.mode("append").option("mergeSchema", "true").saveAsTable("target_table")
Usare MERGE INTO con l'evoluzione dello schema:
import io.delta.tables.DeltaTable
val sourceDf = spark.table("source_table")
val targetTable = DeltaTable.forName(spark, "target_table")
targetTable.alias("target")
.merge(sourceDf.alias("source"), "target.id = source.id")
.withSchemaEvolution()
.whenMatched().updateAll()
.whenNotMatched().insertAll()
.execute()
SQL
Creare una tabella di destinazione con una INT colonna e una tabella di origine con una BIGINT colonna:
CREATE TABLE target_table (id INT, data STRING) TBLPROPERTIES ('delta.enableTypeWidening' = 'true');
CREATE TABLE source_table (id BIGINT, data STRING);
Utilizzare INSERT INTO con l'evoluzione dello schema per estendere automaticamente la colonna INT a BIGINT durante un'operazione di append:
INSERT WITH SCHEMA EVOLUTION INTO target_table SELECT * FROM source_table;
Usare MERGE INTO con l'evoluzione dello schema:
MERGE WITH SCHEMA EVOLUTION INTO target_table
USING source_table
ON target_table.id = source_table.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
Caricatore automatico
Importante
Il supporto per l'ampliamento dei tipi nel caricatore automatico è disponibile in anteprima pubblica.
Auto Loader supporta l'ampliamento dei tipi con l'evoluzione automatica dello schema. Quando si usa il caricatore automatico per inserire i dati in una tabella Delta Lake con tipo esteso ed evoluzione dello schema abilitata, i tipi di colonna vengono automaticamente ampliati in modo che corrispondano ai dati in ingresso.
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", "<path-to-schema-location>")
.load("<path-to-source-data>")
.writeStream
.option("mergeSchema", "true")
.option("checkpointLocation", "<path-to-checkpoint>")
.trigger(availableNow=True)
.toTable("table_name")
)
Vedere Ampliamento automatico dei tipi con il caricatore automatico. Inoltre, per la tabella di destinazione deve essere abilitato l'ampliamento dei tipi di dati. Vedere Abilitare l'estensione del tipo.
Disabilitare la funzionalità della tabella di ampliamento del tipo
È possibile impedire l'ampliamento accidentale dei tipi nelle tabelle abilitate impostando la proprietà su false:
ALTER TABLE <table_name> SET TBLPROPERTIES ('delta.enableTypeWidening' = 'false')
Questa impostazione impedisce modifiche future del tipo nella tabella, ma non rimuove la funzionalità di ampliamento del tipo nella tabella o annulla le modifiche del tipo precedenti.
Se è necessario rimuovere completamente le funzionalità di tabella con estensione del tipo, è possibile usare il comando DROP FEATURE come illustrato nell'esempio seguente:
ALTER TABLE <table-name> DROP FEATURE 'typeWidening' [TRUNCATE HISTORY]
Nota
Le tabelle che hanno abilitato l'ampliamento del tipo usando Databricks Runtime 15.4 LTS richiedono invece di eliminare la funzionalità typeWidening-preview .
Quando si elimina l'allargamento dei tipi, Databricks riscrive tutti i file di dati non conformi allo schema di tabella corrente. Vedere Eliminare una funzionalità di tabella Delta Lake e effettuare il downgrade del protocollo di tabella.
Streaming da una tabella di Delta Lake
Il supporto per l'ampliamento dei tipi in Structured Streaming è disponibile in Databricks Runtime 16.4 LTS e versioni successive.
Quando si esegue lo streaming da una tabella Delta Lake con tipo esteso abilitato, è possibile configurare l'estensione automatica dei tipi per le query di streaming abilitando l'evoluzione dello schema con l'opzione mergeSchema nella tabella di destinazione. È necessario che la tabella di destinazione abbia abilitata l'estensione del tipo. Vedere Abilitare l'estensione del tipo.
Python
(spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
)
Scala
spark.readStream
.table("delta_source_table")
.writeStream
.option("checkpointLocation", "/path/to/checkpointLocation")
.option("mergeSchema", "true")
.toTable("output_table")
Quando mergeSchema è abilitato e l'ampliamento del tipo è attivato nella tabella di destinazione:
- Le modifiche al tipo vengono applicate automaticamente alla tabella a valle senza la necessità di intervento manuale.
- Le nuove colonne vengono aggiunte automaticamente allo schema della tabella downstream.
Senza mergeSchema abilitato, i valori vengono gestiti secondo la configurazione spark.sql.storeAssignmentPolicy, che per impostazione predefinita esegue il casting verso il basso dei valori per farli corrispondere al tipo di colonna di destinazione. Per altre informazioni sul comportamento dei criteri di assegnazione, vedere Assegnazione dello Store.
Gestire le modifiche dei tipi in un flusso
Quando si esegue lo streaming da una tabella Delta Lake, è possibile fornire un percorso di rilevamento dello schema per tenere traccia delle modifiche dello schema non additive, incluse le modifiche al tipo. Fornire un percorso di rilevamento dello schema è necessario in Databricks Runtime 18.0 e versioni successive ed è facoltativo in Databricks Runtime 18.1 e versioni successive.
Non è possibile impostare un schemaTrackingLocation oggetto usando SQL. Vedere Funzionalità non supportate.
schemaTrackingLocation deve essere impostato su una posizione all'interno dello stesso percorso del checkpoint di streaming. Per esempio:
Python
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
Scala
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
Dopo aver impostato una posizione di monitoraggio dello schema, il flusso aggiorna lo schema monitorato quando rileva un cambiamento di tipo e quindi si arresta. A quel punto, è necessario gestire il cambiamento di tipo, ad esempio abilitando l'estensione del tipo nella tabella a valle o aggiornando la query di streaming.
Per riprendere l'elaborazione, impostare la configurazione Spark spark.databricks.delta.streaming.allowSourceColumnTypeChange oppure l'opzione del reader DataFrameallowSourceColumnTypeChange, come nell'esempio seguente:
Python
checkpoint_path = "/path/to/checkpointLocation"
(spark.readStream
.option("schemaTrackingLocation", checkpoint_path)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
# alternatively to allow all future type changes for this stream:
# .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpoint_path)
.toTable("output_table")
)
Scala
val checkpointPath = "/path/to/checkpointLocation"
spark.readStream
.option("schemaTrackingLocation", checkpointPath)
.option("allowSourceColumnTypeChange", "<delta_source_table_version>")
// alternatively to allow all future type changes for this stream:
// .option("allowSourceColumnTypeChange", "always")
.table("delta_source_table")
.writeStream
.option("checkpointLocation", checkpointPath)
.toTable("output_table")
SQL
-- To unblock for this particular stream just for this series of schema change(s):
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange.ckpt_<checkpoint_id> = "<delta_source_table_version>"
-- To unblock for this particular stream:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "<delta_source_table_version>"
-- To unblock for all streams:
SET spark.databricks.delta.streaming.allowSourceColumnTypeChange = "always"
Quando il flusso si arresta, viene visualizzato un messaggio di errore che visualizza l'ID <checkpoint_id> del checkpoint e la versione della <delta_source_table_version>tabella di origine Delta Lake.
Per un elenco completo delle opzioni delta Lake di streaming, vedere Delta Lake.
Pipeline dichiarative di Lakeflow Spark
È possibile abilitare l'ampliamento dei tipi per le pipeline dichiarative di Lakeflow Spark a livello di pipeline o per singole tabelle. L'estensione dei tipi consente di ampliare automaticamente i tipi di colonna durante l'esecuzione della pipeline senza richiedere un aggiornamento completo delle tabelle di streaming. Le modifiche ai tipi nelle viste materializzate attivano sempre una ricomputazione completa e quando una modifica del tipo viene applicata a una tabella di origine, le viste materializzate che dipendono da tale tabella richiedono una ricomputazione completa per riflettere i nuovi tipi.
Abilitare l'ampliamento dei tipi per un'intera pipeline
Per abilitare l'ampliamento dei tipi per tutte le tabelle in una pipeline, impostare la configurazione della pipeline pipelines.enableTypeWidening.
JSON
{
"configuration": {
"pipelines.enableTypeWidening": "true"
}
}
YAML
configuration:
pipelines.enableTypeWidening: 'true'
Abilitare l'ampliamento dei tipi per tabelle specifiche
È anche possibile abilitare l'ampliamento del tipo per le singole tabelle impostando la proprietà delta.enableTypeWideningtable :
Python
import dlt
@dlt.table(
table_properties={"delta.enableTypeWidening": "true"}
)
def my_table():
return spark.readStream.table("source_table")
SQL
CREATE OR REFRESH STREAMING TABLE my_table
TBLPROPERTIES ('delta.enableTypeWidening' = 'true')
AS SELECT * FROM source_table
Compatibilità con i lettori downstream
Le tabelle con tipo esteso abilitato possono essere lette solo in Databricks Runtime 15.4 LTS e versioni successive. Se si vuole che una tabella con tipo esteso abilitato nella pipeline sia leggibile dai lettori in Databricks Runtime 14.3 e versioni successive, è necessario:
- Disattivare l'ampliamento del tipo rimuovendo la proprietà
delta.enableTypeWidening/pipelines.enableTypeWideningo impostandola su false e attivando un aggiornamento completo della tabella. - Abilitare la modalità di compatibilità nella tabella.
OpenSharing
Nota
Il supporto di Type Widening in OpenSharing è disponibile in Databricks Runtime 16.1 e versioni successive.
La condivisione di una tabella Delta Lake con l’ampliamento dei tipi abilitato è supportata da Databricks-to-Databricks OpenSharing. Il provider e il destinatario devono trovarsi in Databricks Runtime 16.1 o versione successiva.
Per leggere il feed delle modifiche ai dati da una tabella Delta Lake con l'estensione dei tipi abilitata tramite OpenSharing, è necessario impostare il formato della risposta su delta:
spark.read
.format("deltaSharing")
.option("responseFormat", "delta")
.option("readChangeFeed", "true")
.option("startingVersion", "<start version>")
.option("endingVersion", "<end version>")
.load("<table>")
La lettura del feed di dati delle modifiche tra le modifiche al tipo non è supportata. È invece necessario suddividere l'operazione in due letture separate, una che termina alla versione della tabella contenente la modifica del tipo e l'altra a partire dalla versione contenente la modifica del tipo.
Limitations
Compatibilità di Apache Iceberg
Apache Iceberg non supporta tutte le modifiche ai tipi previste dall'ampliamento dei tipi. Vedere Iceberg Schema Evolution.
Le modifiche al tipo non supportate includono quanto segue:
-
byte,short,int,longadecimalodouble - aumento della scala decimale
-
dateatimestampNTZ
Quando si abilita UniForm con la compatibilità Iceberg in una tabella Delta Lake, l'applicazione di una delle modifiche al tipo precedente genera un errore. Vedi Leggi le tabelle Delta Lake con i client Iceberg usando UniForm.
Se si applica una di queste modifiche di tipo non supportate a una tabella Delta Lake, sono disponibili due opzioni:
Rigenerare i metadati dell'Iceberg: usare il comando seguente per rigenerare i metadati di Iceberg senza la funzionalità di tabella di tipo esteso:
ALTER TABLE <table-name> SET TBLPROPERTIES ('delta.universalFormat.config.icebergCompatVersion' = '<version>')In questo modo è possibile mantenere la compatibilità uniforme dopo l'applicazione di modifiche di tipo incompatibili.
Rimuovere la funzionalità di ampliamento dei tipi della tabella: vedere Disabilitare la funzionalità di ampliamento dei tipi della tabella.
Funzioni dipendenti dal tipo
Alcune funzioni SQL restituiscono risultati che dipendono dal tipo di dati di input. Ad esempio, la funzione restituiscehash valori hash diversi per lo stesso valore logico se il tipo di argomento è diverso: hash(1::INT) restituisce un risultato diverso da hash(1::BIGINT).
Altre funzioni dipendenti dal tipo sono: xxhash64, bit_get, bit_reverse, typeof.
Per ottenere risultati stabili nelle query che utilizzano queste funzioni, è necessario convertire esplicitamente i valori nel tipo desiderato:
Python
spark.read.table("table_name") \
.selectExpr("hash(CAST(column_name AS BIGINT))")
Scala
spark.read.table("main.johan_lasperas.dlt_type_widening_bronze2")
.selectExpr("hash(CAST(a AS BIGINT))")
SQL
-- Use explicit casting for stable hash values
SELECT hash(CAST(column_name AS BIGINT)) FROM table_name
Funzionalità non supportate
- Non è possibile impostare una posizione di rilevamento dello schema usando SQL durante lo streaming da una tabella Delta Lake con una modifica del tipo.
- Non è possibile condividere una tabella con l'estensione dei tipi abilitata con consumer esterni a Databricks tramite OpenSharing.