arrow_back

Traitement de données sans serveur avec Dataflow : pipelines d'analyse par lot avec Dataflow (Python)

Accédez à plus de 700 ateliers et cours

Traitement de données sans serveur avec Dataflow : pipelines d'analyse par lot avec Dataflow (Python)

Atelier 2 heures universal_currency_alt 5 crédits show_chart Avancé
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Accédez à plus de 700 ateliers et cours

Présentation

Au cours de cet atelier, vous allez :

  • écrire un pipeline qui agrège le trafic de site par utilisateur ;
  • écrire un pipeline qui agrège le trafic de site par minute ;
  • implémenter le fenêtrage sur des données de séries temporelles.

Prérequis

REMARQUE : Cet atelier est d'un niveau avancé et nécessite une bonne connaissance de Python.

Préparation

Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.

  1. Connectez-vous à Qwiklabs dans une fenêtre de navigation privée.

  2. Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
    Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.

  3. Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.

  4. Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à Google Cloud Console.

  5. Cliquez sur Ouvrir la console Google.

  6. Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
    Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.

  7. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.

Partie A : configurer l'environnement de développement des instances Workbench

Dans cet atelier, vous allez exécuter toutes les commandes dans un terminal intégré au notebook de votre instance Workbench.

  1. Dans le menu de navigation (Menu de navigation) de la console Google Cloud, sélectionnez Vertex AI ou accédez au tableau de bord Vertex AI.

  2. Cliquez sur Activer toutes les API recommandées. Vérifions maintenant que l'API Notebook est activée.

  3. Dans le menu de navigation, cliquez sur Workbench.

    En haut de la page "Workbench", vérifiez que vous vous trouvez dans la vue Instances.

  4. Cliquez sur Créer (boîte de dialogue d'ajout).

  5. Configurez l'instance :

Nom Région Zone Options avancées (facultatif)
lab-workbench Si nécessaire, cliquez sur "Options avancées" pour personnaliser davantage l'instance (par exemple, type de machine, taille du disque).
  1. Cliquez sur Créer.

La création de l'instance prend quelques minutes. Une coche verte apparaît à côté de son nom quand elle est prête.

  1. Cliquez sur Ouvrir JupyterLab à côté du nom de l'instance pour lancer l'interface JupyterLab. Un nouvel onglet s'ouvre alors dans votre navigateur.

  2. Cliquez ensuite sur Terminal. Le terminal qui s'affiche vous permet d'exécuter toutes les commandes de cet atelier.

Télécharger le dépôt de code

Maintenant, vous allez télécharger le dépôt de code que vous utiliserez dans cet atelier.

  1. Dans le terminal que vous venez d'ouvrir, saisissez le code suivant :
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. Dans le panneau de gauche de votre environnement de notebook, dans l'explorateur de fichiers, vous pouvez noter que le dépôt training-data-analyst a été ajouté.

  2. Accédez au dépôt cloné /training-data-analyst/quests/dataflow_python/. Chaque atelier correspond à un dossier, divisé en deux sous-dossiers : le premier, intitulé lab, contient du code que vous devez compléter, tandis que le second, nommé solution, comporte un exemple concret que vous pouvez utiliser si vous rencontrez des difficultés.

Option "Explorer" mise en évidence dans le menu "Affichage" développé

Remarque : Si vous souhaitez ouvrir un fichier pour le modifier, il vous suffit de le trouver et de cliquer dessus. Une fois ce fichier ouvert, vous pourrez modifier le code ou en ajouter.

Cliquez sur Vérifier ma progression pour valider l'objectif. Créer une instance de notebook et cloner le dépôt du cours

Tâche 1 : agréger le trafic de site par utilisateur

Dans cette partie de l'atelier, vous allez écrire un pipeline qui :

  1. lit le trafic de la journée à partir d'un fichier dans Cloud Storage ;
  2. convertit chaque événement en objet CommonLog ;
  3. additionne le nombre de hits pour chaque utilisateur unique en regroupant les objets par ID utilisateur et en combinant les valeurs pour obtenir le nombre total de hits pour cet utilisateur ;
  4. effectue des agrégations supplémentaires sur chaque utilisateur ;
  5. écrit les données obtenues dans BigQuery.

Tâche 2 : générer des données synthétiques

Comme dans les ateliers précédents, la première étape consiste à générer les données que le pipeline va traiter. Vous allez ouvrir l'environnement de l'atelier et générer les données comme précédemment :

Ouvrir l'atelier approprié

  • Dans le terminal de votre environnement IDE, exécutez les commandes suivantes :
# Change directory into the lab cd 3_Batch_Analytics/lab export BASE_DIR=$(pwd)

Configurer l'environnement virtuel et les dépendances

Avant de pouvoir modifier le code du pipeline, assurez-vous d'avoir installé les dépendances nécessaires.

  1. Exécutez la commande suivante pour créer un environnement virtuel pour cet atelier :
sudo apt-get update && sudo apt-get install -y python3-venv # Create and activate virtual environment python3 -m venv df-env source df-env/bin/activate
  1. Installez ensuite les packages dont vous aurez besoin pour exécuter votre pipeline :
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Vérifiez que l'API Dataflow est activée :
gcloud services enable dataflow.googleapis.com

Configurer l'environnement de données

# Create GCS buckets and BQ dataset cd $BASE_DIR/../.. source create_batch_sinks.sh # Generate event dataflow source generate_batch_events.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Le script crée un fichier nommé events.json contenant des lignes semblables à ce qui suit :

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Il copie ensuite automatiquement ce fichier dans votre bucket Google Cloud Storage à l'adresse .

  • Accédez à Google Cloud Storage et vérifiez que votre bucket de stockage contient bien un fichier nommé "events.json".

Cliquez sur Vérifier ma progression pour valider l'objectif. Générer des données synthétiques

Tâche 3 : compter les pages vues par utilisateur

Cette tâche comporte deux minichallenges. Vous pouvez consulter la solution.

Dans l'explorateur de fichiers, accédez au chemin d'accès ci-dessous et ouvrez le fichier batch_user_traffic_pipeline.py.

training-data-analyst/quests/dataflow_python/3_Batch_Analytics/lab

Minichallenge – #TODO 1 :

Ce pipeline contient déjà le code nécessaire pour accepter des options de ligne de commande (chemin d'entrée, nom de table de sortie) ainsi que le code permettant de lire les événements depuis Google Cloud Storage, de les analyser et d'écrire les résultats dans BigQuery. Cependant, il manque des éléments importants, identifiés par la mention #TODO.

Voici une tâche de modélisation des données. Dans cette étape, après avoir regroupé tous les événements de journal par utilisateur, vous devez déterminer les informations à calculer pour chacun d'eux.

Vous devez définir la structure (le schéma) qui contiendra les résultats de votre agrégation. Vous devez examiner la classe CommonLog pour voir les champs que vous pouvez agréger.

Comment faire ?

  1. Identifiez la clé : le nom de la classe est PerUserAggregation, la principale information à conserver est donc user_id.

  2. Choisissez les métriques à calculer : que pouvez-vous calculer à partir de la collection des entrées CommonLog d'un utilisateur ?

    • Un nombre : combien de fois l'utilisateur a-t-il accédé au serveur ?
    • Une somme : quelle quantité totale d'octets (num_bytes) cet utilisateur a-t-il téléchargée ?
    • Une valeur min/max : quels sont les codes temporels correspondant à la première et à la dernière activité ?
  3. Par exemple :

user_id: str page_views: int ...

Minichallenge – #TODO 2 :

Voici une exigence technique du framework Apache Beam. Le but de ce challenge est de voir si vous savez comment Beam gère les types de données personnalisés.

Lorsque Apache Beam exécute un pipeline, il doit souvent envoyer des données entre différents ordinateurs (appelés nœuds de calcul). Pour ce faire, il doit sérialiser votre objet Python (par exemple, une instance PerUserAggregation) sous forme de flux d'octets, l'envoyer sur le réseau, puis le désérialiser pour qu'il reprenne sa forme initiale une fois arrivé à destination. Pour effectuer ces opérations, Beam se base sur un objet Coder.

Si vous n'indiquez pas à Beam comment encoder/décoder votre classe PerUserAggregation personnalisée, le pipeline échouera et générera une erreur.

Comment faire ?

La solution se trouve sur la ligne juste au-dessus des #TODO. Beam fournit un objet RowCoder qui fonctionne parfaitement avec les classes NamedTuple. Il vous suffit d'enregistrer un RowCoder pour votre nouvelle classe PerUserAggregation, comme vous l'avez fait pour CommonLog.

  1. Par exemple :
beam.coders.registry.register_coder(PerUserAggregation, ...)

Tâche 4 : exécuter le pipeline

Revenez au terminal et exécutez la commande suivante pour lancer votre pipeline à l'aide du service Cloud Dataflow. Si vous rencontrez des difficultés, vous pouvez l'exécuter avec DirectRunner ou consulter la solution.

  • Dans l'extrait de code ci-dessous, remplacez les champs ENTER_REGION_ID et ENTER_ZONE_ID par les valeurs indiquées dans le tableau ci-dessous.
Région Zone
  • Remplacez les valeurs de la région et de la zone en vous reportant aux spécifications de votre atelier.
# 1. Set all environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export ZONE=ENTER_ZONE_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic # 2. Double-check the input file exists # This command should NOT return an error. echo "Verifying input file exists at ${INPUT_PATH}..." gcloud storage ls ${INPUT_PATH} # 3. Execute the pipeline script echo "Running the Dataflow pipeline..." python3 batch_user_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --worker_zone=${ZONE} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME}
  • Vous pouvez consulter le job que vous avez envoyé dans le tableau de bord Dataflow.

  • Une fois que le job s'est exécuté correctement, vérifiez les résultats dans BigQuery. Pour effectuer cette tâche, attendez que le pipeline se termine (l'opération prend quelques minutes), puis accédez à BigQuery et interrogez la table user_traffic.

Cliquez sur Vérifier ma progression pour valider l'objectif. Agréger le trafic de site par utilisateur et exécuter le pipeline

Partie B : agréger le trafic de site par minute

Dans cette partie de l'atelier, vous allez créer un pipeline nommé batch_minute_traffic. Ce pipeline s'appuie sur les principes de base de l'analyse par lots utilisés dans batch_user_traffic. Cependant, au lieu d'agréger les données par utilisateur sur l'ensemble du lot, il agrège les événements en fonction du moment où ils se sont produits. Il y a encore plusieurs #TODO que vous devez corriger. Vous pouvez vous aider de la solution.

Dans l'IDE, accédez au chemin ci-dessous et ouvrez le fichier batch_minute_traffic_pipeline.py.

3_Batch_Analytics/lab Remarque : Avant de commencer à travailler avec le script principal (batch_minute_traffic_pipeline.py), veillez à parcourir les deux fichiers d'aide pipeline_utils.py et setup.py. Nous allons nous en servir pour exécuter les jobs Dataflow. En effet, pipeline_utils contient une logique Python personnalisée (classes et fonctions) dans un module distinct qui peut être empaquetée et envoyée aux nœuds de calcul Dataflow distants. Le fichier setup.py sert de manuel d'instructions pour Dataflow : il lui indique exactement comment convertir votre fichier pipeline_utils.py en package pouvant être installé sur chaque nœud de calcul.

Tâche 5 : ajouter des codes temporels à chaque élément

Ces éléments #TODO vous guident pour créer un pipeline d'agrégation de séries temporelles classique dans Apache Beam. Chacun correspond à un concept de base du traitement par lot et du traitement par flux. L'objectif de ce pipeline est de traiter un fichier JSON d'événements Web (events.json), de compter le nombre d'événements qui se produisent chaque minute et de reporter ces données dans une table BigQuery.

Le flux du pipeline se présente comme suit : Lecture du texte -> Analyse dans CommonLog -> Éléments TODO -> Écriture dans BigQuery

Les éléments #TODO correspondent au concept de base du pipeline (logique d'agrégation).

Minichallenge – #TODO 1 :

Vous disposez d'une collection d'objets CommonLog. L'étape suivante du pipeline consiste à regrouper ces événements par heure (WindowByMinute). Pour ce faire, Apache Beam doit connaître l'heure d'événement de chaque donnée. Le challenge consiste à indiquer à Beam comment trouver ce code temporel dans votre objet CommonLog.

Comment faire ?

  1. Pour effectuer le fenêtrage, la fonction add_timestamp (définie dans pipeline_utils.py) analyse la chaîne de code temporel de chaque enregistrement de journal et l'associe à l'élément pour que Beam puisse la reconnaître.

  2. Par exemple :

| 'AddEventTimestamp' >> beam.Map(...)

Tâche 6 : définir des fenêtres d'une minute

Passons à la deuxième tâche du script, qui consiste à transformer les éléments des groupes pour atteindre les objectifs du challenge.

Minichallenge – #TODO 2 :

  1. Cette transformation regroupe les éléments dans des fenêtres fixes et sans chevauchement de 60 secondes (une minute) en fonction du code temporel des événements.

  2. Par exemple :

| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(...))
  1. Pour en savoir plus sur les autres types de fenêtrage, consultez la section 8.2 de la documentation Apache Beam, consacrée aux fonctions de fenêtrage disponibles.

Tâche 7 : compter les événements par fenêtre

Passons à la troisième tâche du script, qui consiste à compter les événements de chaque fenêtre.

Minichallenge – #TODO 3 :

  1. Cet argument "combiner" compte le nombre d'éléments intervenus dans chaque fenêtre d'une minute. Utilisez .without_defaults() pour qu'aucun résultat ne soit généré en cas de fenêtre vide.

  2. Par exemple :

| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn())...()
  1. Pour effectuer cette tâche, ajoutez une transformation qui compte tous les éléments de chaque fenêtre. N'hésitez pas à consulter la solution si vous rencontrez des difficultés.

Tâche 8 : reconvertir les éléments en objets Row et ajouter un code temporel

Passons à la dernière tâche du script, qui consiste à reconvertir les données en ligne et à ajouter un code temporel.

Minichallenge – #TODO 4 :

  1. La fonction GetTimestampFn (définie dans pipeline_utils.py) récupère le nombre entier correspondant à chaque fenêtre et le met en forme dans un dictionnaire, en ajoutant l'heure de début de la fenêtre sous forme de chaîne pour correspondre au schéma BigQuery.

  2. Pour effectuer cette tâche, écrivez une fonction ParDo qui accepte des éléments de type "int" et transmet le paramètre supplémentaire pour accéder aux informations de la fenêtre. Notez que le champ "timestamp" du schéma de table BigQuery est de type STRING. Vous devrez donc convertir le code temporel en chaîne.

  3. Par exemple :

| "AddWindowTimestamp" >> beam.ParDo(...()) | 'WriteToBQ' >> beam.io.WriteToBigQuery( table_name, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

Tâche 9 : exécuter le pipeline

Une fois que vous avez terminé de coder, exécutez le pipeline à l'aide de la commande ci-dessous. N'oubliez pas que lorsque vous testez le code, il est beaucoup plus rapide de remplacer la variable d'environnement RUNNER par DirectRunner, qui exécute le pipeline en local. Pour l'instant, nous exécutons le pipeline avec Dataflow.

  • Remplacez les valeurs de la région et de la zone en vous reportant aux spécifications de votre atelier.
Région
export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic python3 batch_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME} \ --setup_file=./setup.py

Tâche 10 : vérifier les résultats

  • Pour effectuer cette tâche, attendez que le pipeline se termine (l'opération prend quelques minutes), puis accédez à BigQuery et interrogez la table minute_traffic.

Cliquez sur Vérifier ma progression pour valider l'objectif. Agréger le trafic du site par minute et exécuter le pipeline

Terminer l'atelier

Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.

Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.

Le nombre d'étoiles correspond à votre degré de satisfaction :

  • 1 étoile = très insatisfait(e)
  • 2 étoiles = insatisfait(e)
  • 3 étoiles = ni insatisfait(e), ni satisfait(e)
  • 4 étoiles = satisfait(e)
  • 5 étoiles = très satisfait(e)

Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.

Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.

Copyright 2020 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Ouvrez une fenêtre de navigateur en mode navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.