

Version texte du graphique ci-dessus
Cette liste de commandes vise à donner une vision de tout le cycle de vie des données sur la plateforme Databricks et à servir de guide d’étude pour les aspirants ingénieurs de données.
Préparation
Avant chaque projet, l’essentiel est de nettoyer l’espace de travail. Databricks possède son propre système de fichiers à la linux
- Clean de l’espace de travail
dbutils.fs.rm('aws-lakehouse-devday/lakehouse', True)
dbutils.fs.mkdirs('aws-lakehouse-devday/lakehouse')
- Copier les données dans un espace de travail
%fs CP r /repertoire/origin /monWorkspace/repertoire/destination
- RESET de la base de données
%sql DROP DATABASE IF EXISTS bddKomeet CASCADE %sql CREATE DATABASE bddKomeet
- Check des données
%fs ls /monWorkspace/repertoire/destination
Récupération des données
Il existe différentes façons de récupérer des données dans Databricks, choisis celle qui te convient le mieux
- Convert to Delta
%sql CONVERT TO DELTA parquet.`/repertoire/data`; SELECT * FROM delta.`/repertoire/data`
- Copie directement au format Delta
%sql COPY INTO delta.`/repertoire/data` FROM '/repertoire/data' FILEFORMAT = PARQUET;
- Convertir JSON en Delta automatiquement
%sql USE maBaseDeDonnee; CREATE TABLE IF NOT EXISTS maTableSilver USING delta AS SELECT * FROM json.`/mesDonneesIOT.json`;
- Delta Auto Loader : récupération des données en temps réel à partir du dépôt des fichiers dans le bucket et lecture consistent
schema = StructType([…])
iot_df = spark.readStream.format("cloudFiles") \
.option("cloudFiles.format", "json") \
.schema(schema) \
.load("/repertoire/de/donnees")
- Activer l’évolution automatique du schéma
%sql
SET spark.databricks.delta.schema.autoMerge.enabled = true
Unification
- MERGE INTO (Upsert)
%sql
MERGE INTO tableSilver
USING tableBronze ON tableBronze.id_societe = tableSilver.id_societe
WHEN MATCHED THEN UPDATE SET date_modif = current_date()
WHEN NOT MATCHED THEN INSERT (id_societe, date_creation) VALUES (id_societe, current_date());
- Time travel
- Lister l’historique :
%sql
DESCRIBE HISTORY tableSilver
- Deux manières pour voyager dans le temps :
-> par version :
%sql
SELECT * FROM tableSilver VERSION AS OF 1
-> par temps exact :
%sql
SELECT * FROM tableSilver TIMESTAMP AS OF '2022-06-03T04:52:55.000+0000'
Analyse
- SQL
%sql
CREATE TABLE IF NOT EXISTS tableGold
USING delta AS
SELECT
id_societe,
ville,
effectif
FROM tableBronze
GROUP BY ROLLUP id_societe, ville, effectif)
ORDER BY effectif ASC, ville DESC
IA
- Entraînement
import nltk
from nltk.sentiment.vader import *
from pyspark.sql.types import *
from pyspark.sql.functions import col
def sentimentNLTK(text):
nltk.download('vader_lexicon')
return SentimentIntensityAnalyzer().polarity_scores(text)
spark.udf.register('analyzeSentiment', sentimentNLTK, MapType(StringType(), DoubleType()))
Optimisation
- Classement et regroupement des données
%sql
OPTIMIZE tableSilver ZORDER BY id_societe
- Delta Clone
Shallow : copie des metadonnées d’une table. Idéal pour tester et créer des tables staging.
%sql
CREATE TABLE tableDestination SHALLOW CLONE delta.`/repertoire/data`;
Deep : les metadonnées et les données sont copiés dans la table destination. Idéal pour archiver, migrer ou partager les données via Data Sharing.
%sql
CREATE TABLE tableDestination CLONE delta.`/repertoire/data`;
- Suppression automatique – RGPD
%sql
VACUUM tableBronze RETAIN 720 HOURS
Exposition
Delta Sharing nous permet de partager des données avec un destinataire externe sans créer de copie des données. Une fois autorisés, les destinataires peuvent accéder et télécharger directement vos données.
- Delta Sharing
- Enregistrer une table Delta Lake sur le serveur Delta Sharing
- Création d’un RECIPIENT : consommateur de la table à partager
- Création d’un SHARE : liste des tables à partager
- GRANT SELECT : assigner les droits de consultation aux tables pour les consommateurs
- Une fois la table partagée, le consommateur peut réaliser des requêtes aux tables