Un grafico del flusso di dati è una pipeline di elaborazione componibile che trasforma i dati mentre si sposta tra origini e destinazioni. Un flusso di dati standard segue una sequenza fissa di arricchimento, filtro, mappa. Un grafico del flusso di dati consente di concatenare le trasformazioni in qualsiasi ordine, diramazione in percorsi paralleli e di aggregare i dati nelle finestre temporali.
Questo articolo illustra in dettaglio la creazione di un grafo del flusso di dati. Per una panoramica dei grafici del flusso di dati e delle trasformazioni disponibili, vedere Panoramica dei grafici del flusso di dati.
Importante
I grafici del flusso di dati supportano attualmente solo endpoint MQTT, Kafka e OpenTelemetry. Altri tipi di endpoint come Data Lake, Microsoft Fabric OneLake, Esplora dati di Azure e Archiviazione locale non sono supportati. Per altre informazioni, vedere Problemi noti.
Prerequisiti
La interfaccia della riga di comando di Azure versione 2.62.0 o successiva installata nel computer di sviluppo. Usare az --version per controllare la versione e az upgrade per aggiornarla, se necessario. Per altre informazioni, vedere Installare il interfaccia della riga di comando di Azure.
Estensione Operazioni di Azure IoT per il interfaccia della riga di comando di Azure. Usare il comando seguente per aggiungere l'estensione o aggiornarla alla versione più recente:
az extension add --upgrade --name azure-iot-ops
Operazioni di Azure IoT versione 1.2 o successiva.
Profilo del flusso di dati. È possibile usare il profilo predefinito.
Endpoint del flusso di dati per l'origine e la destinazione. L'endpoint broker MQTT predefinito funziona per iniziare.
Creare un grafo del flusso di dati
Un grafico del flusso di dati contiene tre tipi di elementi: origini in cui vengono inseriti i dati, trasformati e destinazioni che lo inviano. Connetterli nell'ordine in cui si desidera che i dati vengano trasmessi.
Nell'esperienza di Operazioni, vai alla tua istanza di Operazioni IoT di Azure.
Selezionare Grafico flusso di dati>Crea grafico flusso di dati.
Immettere un nome per il grafico del flusso di dati e selezionare un profilo del flusso di dati. Il profilo predefinito è selezionato per impostazione predefinita.
Costruisci la tua pipeline aggiungendo elementi alla tela.
Aggiungere un'origine: selezionare l'endpoint di origine e configurare gli argomenti per sottoscrivere i messaggi in arrivo.
-
Aggiungi trasformazioni: selezionare una o più trasformazioni per elaborare i dati. Le trasformazioni disponibili includono mappa, filtro, ramo, concatenazione e finestra. Per informazioni dettagliate su ogni tipo di trasformazione, vedere Panoramica dei grafici del flusso di dati.
Aggiungere una destinazione: selezionare l'endpoint di destinazione e configurare l'argomento o il percorso in cui inviare i dati elaborati.
Connettere gli elementi nell'ordine in cui si desidera che i dati vengano trasmessi.
Selezionare Salva per distribuire il grafico del flusso di dati.
Il interfaccia della riga di comando di Azure applica un grafico del flusso di dati da un singolo file di configurazione JSON che contiene tutti i nodi e le connessioni. Usare az iot ops dataflowgraph apply per creare o sostituire il grafico. L'esempio seguente legge i dati relativi alla temperatura, li converte in Fahrenheit e li invia a un argomento di destinazione.
Creare un graph.json file con le proprietà del grafico del flusso di dati.
graph.json Nel file le regole di ogni trasformazione vengono archiviate nel value campo come stringa JSON di escape. Per il formato leggibile delle regole di ogni trasformazione, vedere la procedura per tale tipo di trasformazione.
{
"mode": "Enabled",
"nodes": [
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"telemetry/temperature"
]
}
},
{
"nodeType": "Graph",
"name": "convert",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"*\"],\"output\":\"*\"},{\"inputs\":[\"temperature\"],\"output\":\"temperature_f\",\"expression\":\"cToF($1)\"}]}"
}
]
}
},
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "telemetry/converted"
}
}
],
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "convert"
}
},
{
"from": {
"name": "convert"
},
"to": {
"name": "output"
}
}
]
}
Applicare il file di configurazione. L'oggetto extendedLocation viene aggiunto automaticamente dall'istanza e dal gruppo di risorse, quindi non includerlo nel file.
az iot ops dataflowgraph apply \
--name temperature-processing \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP> \
--config-file graph.json
Il grafico è associato al profilo del default flusso di dati. Per usare un profilo diverso, aggiungere --profile <PROFILE_NAME>.
Creare un file Bicep .bicep con la struttura seguente. In questo esempio viene creato un grafico del flusso di dati che legge i dati relativi alla temperatura, li converte in Fahrenheit e li invia a un argomento di destinazione.
param aioInstanceName string = '<AIO_INSTANCE_NAME>'
param customLocationName string = '<CUSTOM_LOCATION_NAME>'
resource aioInstance 'Microsoft.IoTOperations/instances@2026-03-01' existing = {
name: aioInstanceName
}
resource customLocation 'Microsoft.ExtendedLocation/customLocations@2021-08-31-preview' existing = {
name: customLocationName
}
resource defaultDataflowProfile 'Microsoft.IoTOperations/instances/dataflowProfiles@2026-03-01' existing = {
parent: aioInstance
name: 'default'
}
resource dataflowGraph 'Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs@2026-03-01' = {
parent: defaultDataflowProfile
name: 'temperature-processing'
extendedLocation: {
name: customLocation.id
type: 'CustomLocation'
}
properties: {
profileRef: 'default'
mode: 'Enabled'
nodes: [
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
]
}
}
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["*"],"output":"*"},{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/converted'
}
}
]
nodeConnections: [
{
from: { name: 'sensors' }
to: { name: 'convert' }
}
{
from: { name: 'convert' }
to: { name: 'output' }
}
]
}
}
Distribuire il file Bicep:
az deployment group create --resource-group <RESOURCE_GROUP> --template-file <FILE>.bicep
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
Creare un file manifesto .yaml Kubernetes con la struttura seguente. In questo esempio viene creato un grafico del flusso di dati che legge i dati relativi alla temperatura, li converte in Fahrenheit e li invia a un argomento di destinazione.
apiVersion: connectivity.iotoperations.azure.com/v1
kind: DataflowGraph
metadata:
name: temperature-processing
namespace: azure-iot-operations
spec:
profileRef: default
nodes:
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["*"],
"output": "*"
},
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/converted
nodeConnections:
- from:
name: sensors
to:
name: convert
- from:
name: convert
to:
name: output
Applicare il manifesto:
kubectl apply -f <FILE>.yaml
L'origine definisce dove i dati entrano nella pipeline. Specificare un riferimento all'endpoint e uno o più argomenti.
Nell'editor del grafico del flusso di dati selezionare l'elemento di origine e configurare:
| Impostazione |
Descrizione |
|
Punto finale |
Endpoint del flusso di dati da usare. Selezionare il valore predefinito per il broker MQTT locale. |
|
Topics |
Uno o più argomenti a cui sottoscrivere i messaggi in arrivo. |
La CLI applica l'intero grafo in una sola volta, quindi configura l'origine come nodo Source nel file di configurazione graph.json, quindi esegui az iot ops dataflowgraph apply:
{
"nodeType": "Source",
"name": "sensors",
"sourceSettings": {
"endpointRef": "default",
"dataSources": [
"telemetry/temperature",
"telemetry/humidity"
]
}
}
{
nodeType: 'Source'
name: 'sensors'
sourceSettings: {
endpointRef: 'default'
dataSources: [
'telemetry/temperature'
'telemetry/humidity'
]
}
}
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
- nodeType: Source
name: sensors
sourceSettings:
endpointRef: default
dataSources:
- telemetry/temperature
- telemetry/humidity
Trasforma i dati del processo tra l'origine e la destinazione. Ogni trasformazione fa riferimento a un artefatto predefinito e viene configurato con le regole.
Le trasformazioni predefinite disponibili sono:
| Trasformazione |
Artefatto |
Descrizione |
|
Mappa |
azureiotoperations/graph-dataflow-map:1.0.0 |
Rinominare, ristrutturare, calcolare e copiare i campi. |
|
Filter |
azureiotoperations/graph-dataflow-filter:1.0.0 |
Eliminare i messaggi che corrispondono a una condizione. |
|
Branch |
azureiotoperations/graph-dataflow-branch:1.0.0 |
Instrada ogni messaggio verso un percorso true o false in base a una condizione. |
|
Concatenate |
azureiotoperations/graph-dataflow-concatenate:1.0.0 |
Unire nuovamente due o più percorsi in un unico percorso. |
|
Window |
azureiotoperations/graph-dataflow-window:1.0.0 |
Raccogliere i messaggi in un intervallo di tempo, quindi aggregare. |
Per altre informazioni sull'arricchimento dei messaggi con dati esterni, vedere Arricchire i dati esterni.
Nell'editor del grafico del flusso di dati selezionare Aggiungi trasformazione e scegliere il tipo di trasformazione. Configurare le regole nell'editor visivo.
Ogni trasformazione è un nodo con nodeType impostato su Graph nel graph.json file di configurazione. Le regole della trasformazione sono un oggetto JSON, come questa mappa che converte la temperatura in Fahrenheit:
{
"map": [
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
La proprietà configuration accetta queste regole come stringa, quindi il JSON delle regole viene sottoposto all'escape e inserito nel campo value. Applicare il grafico completo con az iot ops dataflowgraph apply:
{
"nodeType": "Graph",
"name": "convert",
"graphSettings": {
"registryEndpointRef": "default",
"artifact": "azureiotoperations/graph-dataflow-map:1.0.0",
"configuration": [
{
"key": "rules",
"value": "{\"map\":[{\"inputs\":[\"temperature\"],\"output\":\"temperature_f\",\"expression\":\"cToF($1)\"}]}"
}
]
}
}
Suggerimento
Per generare la stringa di escape, salvare le regole in un file come rules.json, quindi eseguire jq -c . rules.json e incollare l'output a riga singola nel value campo.
Ogni trasformazione è un nodo con nodeType: 'Graph'. La configuration proprietà passa le regole come stringa JSON:
{
nodeType: 'Graph'
name: 'convert'
graphSettings: {
registryEndpointRef: 'default'
artifact: 'azureiotoperations/graph-dataflow-map:1.0.0'
configuration: [
{
key: 'rules'
value: '{"map":[{"inputs":["temperature"],"output":"temperature_f","expression":"cToF($1)"}]}'
}
]
}
}
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
Ogni trasformazione è un nodo con nodeType: Graph. La configuration proprietà passa le regole come stringa JSON:
- nodeType: Graph
name: convert
graphSettings:
registryEndpointRef: default
artifact: azureiotoperations/graph-dataflow-map:1.0.0
configuration:
- key: rules
value: |
{
"map": [
{
"inputs": ["temperature"],
"output": "temperature_f",
"expression": "cToF($1)"
}
]
}
È possibile concatenare qualsiasi numero di trasformazioni. Connetterli nella nodeConnections sezione nell'ordine in cui si vuole che i dati vengano trasmessi:
Trascinare le connessioni tra trasformazioni nell'area di disegno per definire l'ordine di elaborazione.
Definire l'ordine di elaborazione nella nodeConnections sezione del file di graph.json configurazione:
"nodeConnections": [
{
"from": {
"name": "sensors"
},
"to": {
"name": "remove-bad-data"
}
},
{
"from": {
"name": "remove-bad-data"
},
"to": {
"name": "convert"
}
},
{
"from": {
"name": "convert"
},
"to": {
"name": "output"
}
}
]
nodeConnections: [
{ from: { name: 'sensors' }, to: { name: 'remove-bad-data' } }
{ from: { name: 'remove-bad-data' }, to: { name: 'convert' } }
{ from: { name: 'convert' }, to: { name: 'output' } }
]
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
nodeConnections:
- from: { name: sensors }
to: { name: remove-bad-data }
- from: { name: remove-bad-data }
to: { name: convert }
- from: { name: convert }
to: { name: output }
La destinazione definisce dove vengono inviati i dati elaborati. Specificare un riferimento all'endpoint e un argomento o un percorso.
Selezionare l'elemento di destinazione e configurare:
| Impostazione |
Descrizione |
|
Punto finale |
Endpoint del flusso di dati a cui inviare i dati. |
|
Topic |
Argomento o percorso in cui pubblicare i dati elaborati. |
Configura la destinazione come un nodo Destination nel file di configurazione graph.json, quindi applica il grafo completo con az iot ops dataflowgraph apply:
{
"nodeType": "Destination",
"name": "output",
"destinationSettings": {
"endpointRef": "default",
"dataDestination": "telemetry/processed"
}
}
{
nodeType: 'Destination'
name: 'output'
destinationSettings: {
endpointRef: 'default'
dataDestination: 'telemetry/processed'
}
}
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
- nodeType: Destination
name: output
destinationSettings:
endpointRef: default
dataDestination: telemetry/processed
Per il routing di argomenti dinamici in base al contenuto dei messaggi, vedere Instradare i messaggi a diversi argomenti.
Verificare che il grafico del flusso di dati funzioni
Dopo aver distribuito un grafo del flusso di dati, verificare che sia in esecuzione:
Nell'esperienza Operazioni selezionare il grafico del flusso di dati per visualizzarne lo stato. Un grafico integro mostra uno stato In esecuzione.
Usare az iot ops dataflowgraph show per visualizzare i dettagli di un grafico:
az iot ops dataflowgraph show \
--name temperature-processing \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP>
Per elencare tutti i grafici del flusso di dati associati a un profilo, usare az iot ops dataflowgraph list:
az iot ops dataflowgraph list \
--instance <INSTANCE_NAME> \
--resource-group <RESOURCE_GROUP>
Controllare lo stato della DataflowGraph risorsa:
az resource show --resource-group <RESOURCE_GROUP> --resource-type Microsoft.IoTOperations/instances/dataflowProfiles/dataflowGraphs --name <GRAPH_NAME> --parent instances/<INSTANCE_NAME>/dataflowProfiles/<PROFILE_NAME>
Importante
L'uso dei manifesti di distribuzione Kubernetes non è supportato negli ambienti di produzione e deve essere usato solo per il debug e il test.
kubectl get dataflowgraph temperature-processing -n azure-iot-operations
Controllare i log dei pod per eventuali errori:
kubectl logs -l app=dataflow -n azure-iot-operations --tail=50
Passaggi successivi