Blog Arolla

Craftsmanship et Data Engineering – Episode 1 : PySpark en TDD


“Il ne suffit pas qu'un logiciel soit fonctionnel, il faut qu’il soit bien conçu”. C’est la philosophie qui est la force motrice du Craft et qui fait partie de l’ADN de tout crafter. Cette philosophie qui nous encourage, tels les artisans que nous sommes, à construire des logiciels avec beaucoup de considération pour leur qualité. Et l’un des meilleurs moyens de veiller à la qualité d’un logiciel, c’est de le tester à différents niveaux (tests unitaires, tests fonctionnels, tests de bout en bout, etc.).

Mais ce processus de pensées ne se limite pas uniquement au développement logiciel. Comme toute philosophie, c’est une façon d’être que l’on choisit d’adopter dans un domaine particulier. Et ici, le domaine que je veux aborder est le data engineering, plus précisément, le test des pipelines de données Spark.

En effet, parmi les tâches quotidiennes d’un data engineer, on retrouve régulièrement la création de pipeline de données, Spark étant souvent utilisé pour y parvenir. Bien que le développement d’une pipeline de données pose des défis bien différents d’un processus de développement traditionnel, il n’en est pas moins que le test y est tout aussi important pour assurer la qualité de la pipeline.

Spark est nativement conçu en Scala mais possède une API pour Java, Python et R. Dans cet article, nous allons nous concentrer sur l’API Python plus connu sous le nom de PySpark.

Si tu es curieux de savoir comment tu pourrais faire du TDD pour créer une pipeline de données avec PySpark, le premier épisode de cette série sur le Craftsmanship et le Data Engineering est là pour t’accompagner dans ton aventure 😉

Sans plus attendre, sautons dans le bain !

Préréquis :

  • Avoir un compte GitLab (sinon, pas de panique, tu peux en créer un)
  • Avoir Git installé et configuré sur ton poste
  • Des connaissances de base sur Python, PySpark et pytest
  • Une bouteille d'eau, parce qu'en été il faut s'hydrater

Projet fil rouge

GitLab est un repository git au même titre que GitHub. La différence majeure entre les deux est que GitLab incorpore nativement des workflows de CI/CD et de DevOps, et c’est précisément ce qui va nous intéresser dans d’autres épisodes 😉

Afin de suivre les explications, connecte-toi sur GitLab et crée un projet vide du nom de ton choix. Je nommerai le mien Tester avec PySpark.

Voilà voilà ! Maintenant que ton projet est créé sur GitLab, tu peux le cloner et passer à la prochaine partie.

Installer pyspark et pytest

Pour la suite du tutoriel, nous aurons également besoin d’installer PySpark sur notre poste ainsi que la librairie python pyspark associée. Si tu recherches un bon moyen de configurer ton environnement PySpark, tu peux consulter le tutoriel ci-après qui donne de bonnes explications : https://www.tutorialspoint.com/pyspark/pyspark_environment_setup.htm

NB : Cette configuration est très importante pour l’utilisation de la librairie python pyspark, en particulier la bonne configuration des variables d’environnement SPARK_HOME, PYTHONPATH et PATH, ainsi que le fait de veiller à avoir java installé sur son poste

Ensuite, nous allons créer un fichier requirements.txt et y renseigner pyspark et la version désirée de la librairie. Faisons de même pour pytest

Je conseille de réaliser l'installation dans un environnement virtuel. Tu peux en créer un nommé .venv par exemple à l'aide de la commande

$ python -m venv .venv

Ensuite, tu peux alors l'activer et installer les librairies de requirements.txt :

$ source .venv/bin/activate
$ pip install -r requirements.txt

Construire une fonction d’ETL avec PySpark

Un ETL, diminutif de Extract-Transform-Load, a la vocation suivante comme l'indique son nom :

  1. Extraire les données d'une source
  2. Y appliquer des transformations
  3. Charger les données transformées à une destination

C'est un modèle de pipeline de données récurrent dans le Data Engineering et donc très intéressant comme exemple car il fait partie du quotidien 😉

Dans cette partie, nous allons supposer que nous avons un DataFrame contenant différentes ventes effectuées dans une pharmacie (désignation, date de vente, quantité).

Nous sommes Data Engineer pour cette pharmacie et nous avons besoin d’écrire une fonction de transformation qui regroupe les ventes par désignation afin de déterminer les quantités totales vendues pour chaque produit pharmaceutique.

Comme c’est de TDD qu’il s’agit ici, je vais commencer par montrer comment écrire le test pour cette fonction 😉

Petit rappel sur l'écriture de test
Un test est généralement composé de 3 parties : arrange, act et assert
Durant la phase “arrange”, on prépare le contexte nécessaire à l’exécution du test
Ensuite, dans la phase “act”, nous exécutons le comportement que nous voulons tester
Enfin, la phase “assert” réalise des assertions pour s’assurer que le comportement en question produit les résultats attendus

C’est un pattern de test très répandu

Tout d’abord, nous allons créer un fichier que nous allons nommer test_etl.py

Puisque lors des tests nous aurons besoin d’une session Spark, créons une fixture représentant la session Spark.

import pytest
from pyspark.sql import SparkSession


@pytest.fixture
def spark():
    return SparkSession.builder.master("local[*]").appName("test_app").getOrCreate()

Maintenant, créons le test pour notre transformation. Cela se fera pas à pas en plusieurs étapes au cours desquelles je vous guiderai.

Etape 1 : Tester un cas trivial

Nous allons commencer par tester un cas trivial de notre fonction. Testons que lorsque la fonction reçoit un DataFrame vide en entrée, elle retourne un DataFrame vide en sortie.

Lorsqu'on fait du TDD, il est souvent recommandé d'écrire la phase act en premier. Cela a l'avantage de nous faire penser tout de suite à l'essentiel (ce qu'on veut tester) et en déduire le contexte nécessaire (arrange) ainsi que les assertions (assert) selon le cas de figure envisagé dans le test.

Dans ce cas précis, notre objectif est de créer une fonction qui groupe les produits pharmaceutiques vendus par désignation de produit. Passons donc l'acte !

def test_group_by_product_with_empty_dataframe(spark):
    # Act
    transformed_df = group_by_product(input_df)

Voilà exactement à quoi ça ressemblerait d'utiliser notre fonction. Maintenant, tentons d'exécuter notre test tel quel avec la commande pytest.

Eh oui ! Il se casse la figure ! Et c'est tout à fait ce qu'on veut 😉 En effet, le premier lancement de test en TDD une fois que nous avons écrit l'acte est généralement supposé échouer : c'est la phase de red
Le message d'échec nous fait comprendre que nous n'avons pas encore créé la fonction group_by_product.

Nous devons ensuite écrire le code minimal permettant de corriger l'erreur qui s'est manifestée. Pour cela, créons la fonction group_by_product dans un fichier que nous appelerons etl.py et importons-la dans le test. Elle se contentera de retourner None dans un premier temps.

etl.py

def group_by_product(df):
    return None

test_etl.py

from etl import group_by_product

Maintenant, relançons le test !

Tu peux alors constater que le test échoue toujours mais que le message d'erreur est différent. Cette fois-ci, l'erreur est que le paramètre input_df utilisé par la fonction n'est pas défini. En d'autres termes, cet échec nous invite à créer le dataframe à utiliser pendant la phase d'act : c'est donc le moment de faire un arrange !

Faire un arrange nécessite de tenir compte deux choses : ce dont on a besoin pour la phase act et le contexte dans le cas de figure envisagé pour le test. Ici, la phase act nous a révélé que nous avions besoin d'un dataframe en paramètre de notre fonction. Ensuite, le cas de figure envisagé pour notre test trivial est celui d'un dataframe d'entrée vide. Enfin, la création d'un dataframe en elle-même nécessite des données et un schéma, tous les deux vides dans ce cas de figure.

Il s'ensuit donc la mise à jour suivante sur notre code de test :

def test_group_by_product_with_empty_dataframe(spark):
    # Arrange
    input_schema = StructType([])
    input_data = []
    input_df = spark.createDataFrame(data=input_data, schema=input_schema)

    # Act
    transformed_df = group_by_product(input_df)

Et là tadam ! Notre test passe (ENFIN) !

Il manque juste un dernier ingrédient, très essentiel. Tu t'en souviens bien ? 😉

La phase assert

Jusqu'ici nous n'avons fait que tester qu'il est possible d'appeler la fonction avec un dataframe vide. Maintenant, la dernière question est de savoir si le résultat est bel et bien un dataframe vide. Etant donné la manière dont les dataframes sont constitués, nous allons vérifier à la fois le schéma et les données afin de s'assurer qu'ils sont tous les deux vides.

def test_group_by_product_with_empty_dataframe(spark):
    # Arrange
    input_schema = StructType([])
    input_data = []
    input_df = spark.createDataFrame(data=input_data, schema=input_schema)

    # Act
    transformed_df = group_by_product(input_df)

    # Assert
    assert not transformed_df.schema
    assert transformed_df.count() == 0

Cela produira un nouvel échec de pytest car la fonction group_by_product retourne None et pas un dataframe vide comme attendu ici, ce qu'on résoudra en écrivant à nouveau le code minimal permettant de faire passer le test :

def group_by_product(df):
    return df

Tu remarqueras qu'on se contente de retourner le dataframe - ici vide - afin d'obtenir un dataframe... vide.
Tu peux relancer pytest. Le test passera sans accroc 🙂

Et voilà pour le cas trivial !

Même si au premier abord développer de cette manière peut paraître très enfantin en raison de la décomposition de travail très (parfois très très) fine, cette manière de procéder possède l'avantage de nous amener à nous poser les bonnes questions dès le départ sur la fonction que nous créons en partant de ce qu'on veut tester vers ce dont on a besoin pour le tester. N'oublie pas, nous sommes des crafters ! Notre objectif est de bien concevoir notre solution.

Etape 2 : Tester un cas ordinaire

Maintenant que nous avons testé un cas trivial, nous avons pu construire une base pour notre fonction group_by_product. Maintenant nous allons tester un cas ordinaire où nous avons en entrée un dataframe valide contenant les ventes de médicaments et nous voulons en sortie un dataframe faisant le groupement de ses ventes par produit pharmaceutique.

L'acte est exactement le même qu'à l'étape précédente.

def test_group_by_product(spark):
    transformed_df = group_by_product(input_df)

Et ceci nous amène à créer un dataframe factice représentant nos données d'origine pour les besoins du test.

def test_group_by_product(spark):
    # Arrange :
    # DataFrame representing our pharmaceutical sells data
    input_schema = StructType(
        [
            StructField("SellID", IntegerType(), True),
            StructField("Name", StringType(), True),
            StructField("Date", StringType(), True),
            StructField("Quantity", IntegerType(), True),
        ]
    )

    input_data = [
        (1, "Paracetamol", "2022-01-28", 5),
        (2, "Doliprane", "2022-01-28", 7),
        (3, "Paracetamol", "2022-01-28", 7),
        (4, "Doliprane", "2022-02-01", 9),
        (5, "Doliprane", "2022-02-01", 1),
        (6, "Paracetamol", "2022-02-02", 3),
        (7, "Probiolog", "2022-02-03", 3),
    ]
    input_df = spark.createDataFrame(data=input_data, schema=input_schema)

    # Act:
    # Apply grouping function(transformation)
    transformed_df = group_by_product(input_df)

De façon similaire à l'étape 1, la création d'un dataframe nécessite dans un premier temps de créer un schéma (input_schema), ce qui implique d’importer les classes StructType, StructField, IntegerType et StringType de pyspark.sql.types.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

Si nous lançons pytest pour l'instant, tous nos tests passent.

Mais notre fonction de transformation ne fait absolument rien pour le moment. Et tu te doutes bien que nous n’allons pas en rester là !

Maintenant, nous allons passer à la phase d’assert de notre test. Plus précisément, nous allons faire des assertions pour vérifier que la fonction de transformation effectue bien les groupements comme prévus en comparant le dataframe retourné par la fonction au dataframe attendu. Pour cela, nous allons comparer leurs schémas et les données qu’ils contiennent.

Commençons par créer le dataframe attendu en sortie de la fonction de groupement :

	# Expected DataFrame output of the grouping transformation
	expected_schema = StructType(
		[
			StructField("Name", StringType(), True),
			StructField("TotalQuantity", LongType(), True),
		]
	)

	expected_data = [
		("Doliprane", 17),
		("Paracetamol", 15),
		("Probiolog", 3),
	]
	expected_df = spark.createDataFrame(data=expected_data, schema=expected_schema)

Ensuite, procédons à la comparaison entre le dataframe attendu et celui produit par group_by_product

field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
transformed_fields = [*map(field_list, transformed_df.schema.fields)]
expected_fields = [*map(field_list, expected_df.schema.fields)]

assert set(transformed_fields) == set(expected_fields)  # schema comparison
assert sorted(transformed_df.collect()) == sorted(
    expected_df.collect()
)  # data comparison

Nous obtenons finalement le code suivant pour notre test dans le cas de figure ordinaire :

def test_group_by_product(spark):
    # Arrange :
    # DataFrame representing our pharmaceutical sells data
    input_schema = StructType(
        [
            StructField("SellID", IntegerType(), True),
            StructField("Name", StringType(), True),
            StructField("Date", StringType(), True),
            StructField("Quantity", IntegerType(), True),
        ]
    )

    input_data = [
        (1, "Paracetamol", "2022-01-28", 5),
        (2, "Doliprane", "2022-01-28", 7),
        (3, "Paracetamol", "2022-01-28", 7),
        (4, "Doliprane", "2022-02-01", 9),
        (5, "Doliprane", "2022-02-01", 1),
        (6, "Paracetamol", "2022-02-02", 3),
        (7, "Probiolog", "2022-02-03", 3),
    ]
    input_df = spark.createDataFrame(data=input_data, schema=input_schema)

    # Act:
    # Apply grouping function(transformation)
    transformed_df = group_by_product(input_df)

    # Assert:
    # Assert the output of the transformation is the same as the expected dataframe

    # Expected DataFrame output of the grouping transformation
    expected_schema = StructType(
        [
            StructField("Name", StringType(), True),
            StructField("TotalQuantity", LongType(), True),
        ]
    )

    expected_data = [
        ("Doliprane", 17),
        ("Paracetamol", 15),
        ("Probiolog", 3),
    ]
    expected_df = spark.createDataFrame(data=expected_data, schema=expected_schema)

    field_list = lambda fields: (fields.name, fields.dataType, fields.nullable)
    transformed_fields = [*map(field_list, transformed_df.schema.fields)]
    expected_fields = [*map(field_list, expected_df.schema.fields)]

    assert set(transformed_fields) == set(expected_fields)  # schema comparison
    assert sorted(transformed_df.collect()) == sorted(
        expected_df.collect()
    )  # data comparison

Lorsque nous lançons pytest, le test se remet à échouer pour la simple raison que notre fonction pour l’instant se contente de retourner None et n’adopte pas le comportement attendu par les assertions.

Écrivons le code faisant passer ces assertions dans etl.py.

def group_by_product(df):
    if df.count() == 0:
        return df
    grouped_df = df.groupBy(
        "Name",
    ).agg(sum("Quantity").alias("TotalQuantity"))
    return grouped_df

NB : Le morceau de code if df.count() == 0: permet de nous assurer que le code continue de passer le test trivial.

Champomy pour tout le monde ! Le test passe et notre pipeline fonctionne comme on veut !

Tu peux retrouver le code complet de ce que nous avons fait ensemble ici.

Take Away

Développer en TDD permet d'établir dès le départ une réflexion orientée vers les objectifs de notre code. Les tests servent à la fois à nous protéger de la correction et à nous documenter sur ce qu'est supposé faire notre code.
Maintenant que nous avons développé notre fonction de transformation en TDD, que dirais-tu d’automatiser les tests à l’aide d’une CI ? Je te parlerai de tout cela dans le prochain épisode. A très bientôt 😉

Consultant Python/Data à Arolla | Plus de publications

Comments are closed.