Dans cet article,vous aprendez à
- Ingérer des données externes dans des lakehouses Fabric avec Spark
- Configurer l’authentification et l’optimisation de la source externe
- Charger des données dans un lakehouse sous forme de fichiers ou de tables Delta
Vous devez disposer d’une licence Microsoft Fabric pour effectuer cet exercice. Pour plus d’informations sur l’activation d’une licence d’essai Fabric gratuite, consultez Bien démarrer avec Fabric.
Introduction
Les notebooks Fabric offrent la flexibilité quant à la possibilité d’extraire, de charger et de transformer des données externes dans votre lakehouse, en s’adaptant à votre workflow. Bien qu’une expérience antérieure avec Spark ou un notebook puisse être utile, elle n’est pas obligatoire.
Les notebooks Fabric sont le meilleur choix si vous :
- Gérez de grands jeux de données externes
- Avez Besoin de transformations complexes
Autre avantages avec les notebooks fabric
– Contrairement aux chargements manuels, les notebooks fournissent une automatisation, garantissant une approche fluide et systématique.
– Les flux de données offrent une expérience d’interface utilisateur, mais ils ne sont pas aussi performants avec des jeux de données volumineux.
– Les pipelines vous permettent d’orchestrer les Copier des données et peuvent nécessiter des flux de données ou notebooks pour les transformations.
– Les notebooks fournissent ainsi une solution complète et automatisée pour l’ingestion et la transformation.
À l’instar des autres notebooks, les notebooks Fabric vous permettent d’avoir plusieurs codes ou cellules Markdown. Les notebooks sont excellents pour les tests initiaux, car vous pouvez voir la sortie du code directement en ligne avec le code et apporter de rapides modifications. Vous pouvez également exécuter des cellules individuelles, figer des cellules ou exécuter toutes les cellules d’un notebook.
Par défaut, les notebooks Fabric utilisent PySpark. Ce dernier utilise le moteur Spark pour autoriser une transaction distribuée multithread pour des processus rapides. Vous pouvez également utiliser HTML, Spark (Scala), Spark SQL et SparkR (R), mais ils peuvent ne pas pleinement bénéficier du système distribué.
Les notebooks Fabric sont facilement mis à l’échelle pour les données volumineuses. L’optimisation de la lecture et de l’écriture est donc essentielle. Envisagez ces fonctions d’optimisation pour une ingestion de données plus performante.
V-Order permet des lectures plus rapides et efficaces par différents moteurs de calcul, tels que Power BI, SQL et Spark.V-order effectue un tri, une distribution, un encodage et une compression spéciaux sur les fichiers Parquet au moment de l’écriture.
Optimiser l’écriture améliore les performances et la fiabilité, en réduisant le nombre de fichiers écrits et en augmentant leur taille. Cela est utile pour les scénarios avec des tables Delta ayant des tailles de fichier non optimales ou non standard, ou encore avec une latence d’écriture supplémentaire tolérable.
# Enable V-Order
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
# Enable automatic Delta optimized write
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
Conseil
Pour plus d’informations, consultez Optimisation et V-Order de la table Delta Lake.
CAS PRATIQUE : Ingérer des données avec des notebooks Spark et Microsoft Fabric
1. Créer un espace de travail
Avant de travailler avec des données dans Fabric, créez un espace de travail avec la version d’essai Fabric activée.
- Sur la page d’accueil de Microsoft Fabric , sélectionnez Synapse Data Engineering .
- Dans la barre de menu de gauche, sélectionnez Espaces de travail (l’icône ressemble à 🗇).
- Créez un nouvel espace de travail avec le nom de votre choix, en sélectionnant un mode de licence qui inclut la capacité Fabric ( Trial , Premium ou Fabric ).
-
Lorsque votre nouvel espace de travail s’ouvre, il devrait être vide.
2. Créer un espace de travail et une destination Lakehouse
Commencez par créer un nouveau Lakehouse et un dossier de destination dans le Lakehouse.
-
Depuis votre espace de travail, sélectionnez + Nouveau > Lakehouse , fournissez un nom et Créer .
Remarque : La création d’un nouveau Lakehouse sans Tables ni Files peut prendre quelques minutes .
-
Dans Fichiers , sélectionnez le […] pour créer un nouveau sous-dossier nommé RawData .
-
Dans Lakehouse Explorer dans Lakehouse, sélectionnez Files > … > Properties .
-
Copiez le chemin ABFS du dossier RawData dans un bloc-notes vide pour une utilisation ultérieure, qui devrait ressembler à ceci :
abfss://{workspace_name}@onelake.dfs.fabric.microsoft.com/{lakehouse_name}.Lakehouse/Files/{folder_name}/{file_name}
Vous devriez maintenant disposer d’un espace de travail avec un Lakehouse et un dossier de destination RawData.
3. Créer un notebook Fabric et charger des données externes
Créez un nouveau bloc-notes Fabric et connectez-vous à une source de données externe avec PySpark.
-
Dans le menu supérieur de Lakehouse, sélectionnez Ouvrir le bloc-notes > Nouveau bloc-notes , qui s’ouvrira une fois créé.
Conseil : Vous avez accès à l’explorateur Lakehouse à partir de ce bloc-notes et pouvez actualiser pour voir la progression à mesure que vous terminez cet exercice.
-
Dans la cellule par défaut, notez que le code est défini sur PySpark (Python) .
- Déclarer les paramètres pour la chaîne de connexion
- Construire la chaîne de connexion
- Lire des données dans un DataFrameInsérez le code suivant dans la cellule de code, ce qui :
-
# Azure Blob Storage access info blob_account_name = "azureopendatastorage" blob_container_name = "nyctlc" blob_relative_path = "yellow" # Construct connection path wasbs_path = f'wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}' print(wasbs_path) # Read parquet data from Azure Blob Storage path blob_df = spark.read.parquet(wasbs_path) -
Sélectionnez ▷ Run Cell à côté de la cellule de code pour vous connecter et lire les données dans un DataFrame.
Résultat attendu : votre commande doit réussir et s’imprimer
wasbs://nyctlc@azureopendatastorage.blob.core.windows.net/yellowRemarque : Une session Spark démarre lors de la première exécution du code, son exécution peut donc prendre plus de temps.
-
Pour écrire les données dans un fichier, vous avez maintenant besoin de ce chemin ABFS pour votre dossier RawData .
-
Insérez le code suivant dans une nouvelle cellule de code :
# Declare file name file_name = "yellow_taxi" # Construct destination path output_parquet_path = f"**InsertABFSPathHere**/{file_name}" print(output_parquet_path) # Load the first 1000 rows as a Parquet file blob_df.limit(1000).write.mode("overwrite").parquet(output_parquet_path) -
Ajoutez votre chemin RawData ABFS et sélectionnez ▷ Run Cell pour écrire 1 000 lignes dans un fichier yellow_taxi.parquet.
-
Votre chemin_parquet_sortie devrait ressembler à :
abfss://Spark@onelake.dfs.fabric.microsoft.com/DPDemo.Lakehouse/Files/RawData/yellow_taxi - Pour confirmer le chargement des données à partir de Lakehouse Explorer, sélectionnez Fichiers > … > Actualiser .
Vous devriez maintenant voir votre nouveau dossier RawData avec un « fichier » yellow_taxi.parquet – qui s’affiche sous la forme d’un dossier contenant des fichiers de partition dans .
4. Transformer et charger des données dans une table Delta
Il est probable que votre tâche d’ingestion de données ne se termine pas uniquement par le chargement d’un fichier. Les tables Delta dans un Lakehouse permettent des requêtes et un stockage évolutifs et flexibles, nous allons donc en créer un également.
-
Créez une nouvelle cellule de code et insérez le code suivant :
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month # Read the parquet data from the specified path raw_df = spark.read.parquet(output_parquet_path) # Add dataload_datetime column with current timestamp filtered_df = raw_df.withColumn("dataload_datetime", current_timestamp()) # Filter columns to exclude any NULL values in storeAndFwdFlag filtered_df = filtered_df.filter(raw_df["storeAndFwdFlag"].isNotNull()) # Load the filtered data into a Delta table table_name = "yellow_taxi" # Replace with your desired table name filtered_df.write.format("delta").mode("append").saveAsTable(table_name) # Display results display(filtered_df.limit(1)) -
Sélectionnez ▷ Exécuter la cellule à côté de la cellule de code.
- Cela ajoutera une colonne d’horodatage dataload_datetime pour enregistrer le moment où les données ont été chargées dans une table Delta.
- Filtrer les valeurs NULL dans storeAndFwdFlag
- Charger les données filtrées dans une table Delta
- Afficher une seule ligne pour validation
-
Examinez et confirmez les résultats affichés, quelque chose de similaire à l’image suivante :
Vous êtes maintenant connecté avec succès aux données externes, vous les avez écrites dans un fichier Parquet, vous avez chargé les données dans un DataFrame, vous avez transformé les données et vous les avez chargées dans une table Delta.
5. Optimiser les écritures de la table Delta
Vous utilisez probablement du Big Data dans votre organisation et c’est pourquoi vous avez choisi les notebooks Fabric pour l’ingestion de données. Voyons donc également comment optimiser l’ingestion et les lectures de vos données. Tout d’abord, nous répéterons les étapes de transformation et d’écriture dans une table Delta avec les optimisations d’écriture incluses.
-
Créez une nouvelle cellule de code et insérez le code suivant :
from pyspark.sql.functions import col, to_timestamp, current_timestamp, year, month # Read the parquet data from the specified path raw_df = spark.read.parquet(output_parquet_path) # Add dataload_datetime column with current timestamp opt_df = raw_df.withColumn("dataload_datetime", current_timestamp()) # Filter columns to exclude any NULL values in storeAndFwdFlag opt_df = opt_df.filter(opt_df["storeAndFwdFlag"].isNotNull()) # Enable V-Order spark.conf.set("spark.sql.parquet.vorder.enabled", "true") # Enable automatic Delta optimized write spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true") # Load the filtered data into a Delta table table_name = "yellow_taxi_opt" # New table name opt_df.write.format("delta").mode("append").saveAsTable(table_name) # Display results display(opt_df.limit(1)) -
Confirmez que vous obtenez les mêmes résultats qu’avant le code d’optimisation.
Maintenant, prenez note des temps d’exécution des deux blocs de code. Vos temps varient, mais vous pouvez constater une nette amélioration des performances avec le code optimisé.
6. Analyser les données de la table Delta avec des requêtes SQL
Cet atelier se concentre sur l’ingestion de données, ce qui explique vraiment le processus d’extraction, de transformation et de chargement , mais il est également utile de prévisualiser les données.
-
Créez une nouvelle cellule de code et insérez le code ci-dessous :
# Load table into df delta_table_name = "yellow_taxi" table_df = spark.read.format("delta").table(delta_table_name) # Create temp SQL table table_df.createOrReplaceTempView("yellow_taxi_temp") # SQL Query table_df = spark.sql('SELECT * FROM yellow_taxi_temp') # Display 10 results display(table_df.limit(10)) -
Créez une autre cellule de code et insérez également ce code :
# Load table into df delta_table_name = "yellow_taxi_opt" opttable_df = spark.read.format("delta").table(delta_table_name) # Create temp SQL table opttable_df.createOrReplaceTempView("yellow_taxi_opt") # SQL Query to confirm opttable_df = spark.sql('SELECT * FROM yellow_taxi_opt') # Display results display(opttable_df.limit(10)) -
Maintenant, sélectionnez la flèche ▼ à côté du bouton Exécuter la cellule pour la première de ces deux requêtes, et dans la liste déroulante, sélectionnez Exécuter cette cellule et ci-dessous .
Cela exécutera les deux dernières cellules de code. Notez la différence de temps d’exécution entre l’interrogation de la table avec des données non optimisées et une table avec des données optimisées.
7. Nettoyer les ressources
Dans cet exercice, vous avez utilisé des notebooks avec PySpark dans Fabric pour charger des données et les enregistrer dans Parquet. Vous avez ensuite utilisé ce fichier Parquet pour transformer davantage les données et optimisé les écritures de la table Delta. Enfin, vous avez utilisé SQL pour interroger les tables Delta.
Une fois l’exploration terminée, vous pouvez supprimer l’espace de travail que vous avez créé pour cet exercice.
- Dans la barre de gauche, sélectionnez l’icône de votre espace de travail pour afficher tous les éléments qu’il contient.
- Dans le menu … de la barre d’outils, sélectionnez Paramètres de l’espace de travail .
- Dans la section Autre , sélectionnez Supprimer cet espace de travail .
Note : Source https://learn.microsoft.com/
Aller plus loin
L’analytique dans Microsoft Fabric vous interesse? Passez au niveau supérieur, obtenez la certification DP 600 en suivant le parcours de formation en ligne offert gratuitement par Microsoft sur son site d’apprentissage officiel.
Vous pouvez y accéder directement via ce lien😉 : https://learn.microsoft.com/fr-fr/credentials/certifications/exams/dp-600/
Créer un bouton pour exporter les données du visuel PowerBI vers Excel ou CSV
Découvrir la DataScience dans Microsoft Fabric
Organisez votre lakehouse Fabric à l’aide de la conception d’architecture en médaillon
Bien démarrer avec les entrepôts de données dans Microsoft Fabric
La version d’essai de Microsoft Fabric à nouveau disponible
Utiliser des tables Delta dans Apache Spark
Analysez les données avec Apache Spark
Bien démarrer avec les Lakehouses dans Fabric
Activez et utilisez Microsoft Fabric


