arrow_back

Traitement de données sans serveur avec Dataflow : écrire un pipeline ETL à l'aide d'Apache Beam et de Dataflow (Java)

Accédez à plus de 700 ateliers et cours

Traitement de données sans serveur avec Dataflow : écrire un pipeline ETL à l'aide d'Apache Beam et de Dataflow (Java)

Atelier 1 heure 30 minutes universal_currency_alt 5 crédits show_chart Intermédiaire
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Accédez à plus de 700 ateliers et cours

Présentation

Au cours de cet atelier, vous allez :

  • créer un pipeline d'extraction, de transformation et de chargement (ETL, Extract-Transform-Load) par lot dans Apache Beam, qui extrait les données brutes de Google Cloud Storage et les écrit dans Google BigQuery ;
  • exécuter le pipeline Apache Beam sur Dataflow ;
  • paramétrer l'exécution du pipeline.

Prérequis :

  • Connaître les bases de Java

Préparation

Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.

  1. Connectez-vous à Qwiklabs dans une fenêtre de navigation privée.

  2. Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
    Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.

  3. Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.

  4. Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à Google Cloud Console.

  5. Cliquez sur Ouvrir la console Google.

  6. Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
    Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.

  7. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.

Activer Google Cloud Shell

Google Cloud Shell est une machine virtuelle qui contient de nombreux outils pour les développeurs. Elle comprend un répertoire d'accueil persistant de 5 Go et s'exécute sur Google Cloud.

Google Cloud Shell vous permet d'accéder à vos ressources Google Cloud grâce à une ligne de commande.

  1. Dans la barre d'outils située en haut à droite dans la console Cloud, cliquez sur le bouton "Ouvrir Cloud Shell".

    Icône Cloud Shell encadrée

  2. Cliquez sur Continuer.

Le provisionnement et la connexion à l'environnement prennent quelques instants. Une fois connecté, vous êtes en principe authentifié et le projet est défini sur votre ID_PROJET. Par exemple :

ID de projet mis en évidence dans le terminal Cloud Shell

gcloud est l'outil de ligne de commande pour Google Cloud. Il est préinstallé sur Cloud Shell et permet la complétion par tabulation.

  • Vous pouvez lister les noms des comptes actifs à l'aide de cette commande :
gcloud auth list

Résultat :

Credentialed accounts: - @.com (active)

Exemple de résultat :

Credentialed accounts: - google1623327_student@qwiklabs.net
  • Vous pouvez lister les ID de projet à l'aide de cette commande :
gcloud config list project

Résultat :

[core] project =

Exemple de résultat :

[core] project = qwiklabs-gcp-44776a13dea667a6 Remarque : Pour consulter la documentation complète sur gcloud, accédez au guide de présentation de la gcloud CLI.

Vérifier les autorisations du projet

Avant de commencer à travailler dans Google Cloud, vous devez vous assurer de disposer des autorisations adéquates pour votre projet dans IAM (Identity and Access Management).

  1. Dans la console Google Cloud, accédez au menu de navigation (Icône du menu de navigation), puis sélectionnez IAM et administration > IAM.

  2. Vérifiez que le compte de service Compute par défaut {project-number}-compute@developer.gserviceaccount.com existe et qu'il est associé au rôle editor (éditeur). Le préfixe du compte correspond au numéro du projet, disponible sur cette page : Menu de navigation > Présentation du cloud > Tableau de bord.

État de l'éditeur et nom du compte de service Compute Engine par défaut mis en évidence sur l'onglet "Autorisations"

Remarque : Si le compte n'est pas disponible dans IAM ou n'est pas associé au rôle editor (éditeur), procédez comme suit pour lui attribuer le rôle approprié.
  1. Dans la console Google Cloud, accédez au menu de navigation et cliquez sur Présentation du cloud > Tableau de bord.
  2. Copiez le numéro du projet (par exemple, 729328892908).
  3. Dans le menu de navigation, sélectionnez IAM et administration > IAM.
  4. Sous Afficher par compte principal, en haut de la table des rôles, cliquez sur Accorder l'accès.
  5. Dans le champ Nouveaux comptes principaux, saisissez :
{project-number}-compute@developer.gserviceaccount.com
  1. Remplacez {project-number} par le numéro de votre projet.
  2. Dans le champ Rôle, sélectionnez Projet (ou Basique) > Éditeur.
  3. Cliquez sur Enregistrer.

Configurer votre IDE

Dans cet atelier, vous utiliserez principalement un IDE Web Theia hébergé sur Google Compute Engine. Le dépôt de l'atelier y est précloné. L'IDE prend en charge les serveurs au langage Java et comprend un terminal permettant l'accès programmatique aux API Google Cloud via l'outil de ligne de commande gcloud, comme avec Cloud Shell.

  1. Pour accéder à votre IDE Theia, copiez le lien affiché dans Google Cloud Skills Boost et collez-le dans un nouvel onglet.
Remarque : Le provisionnement complet de l'environnement peut prendre entre trois et cinq minutes, même après l'affichage de l'URL. En attendant, le navigateur indiquera une erreur.

Volet des identifiants affichant l'URL ide_url

Le dépôt de l'atelier a été cloné dans votre environnement. Chaque atelier est divisé en deux dossiers : le premier, intitulé labs, contient du code que vous devez compléter, tandis que le second, nommé solution, comporte un exemple opérationnel que vous pouvez consulter si vous rencontrez des difficultés.

  1. Cliquez sur le bouton Explorateur de fichiers pour y accéder :

Menu de l'explorateur de fichiers développé, avec le dossier "Labs" mis en évidence

Vous pouvez également créer plusieurs terminaux dans cet environnement, comme vous le feriez avec Cloud Shell :

Option "Nouveau terminal" mise en évidence dans le menu "Terminal"

Vous pouvez exécuter la commande gcloud auth list dans le terminal pour vérifier que vous êtes connecté avec un compte de service fourni et que vous disposez donc des mêmes autorisations qu'avec votre compte utilisateur pour l'atelier :

Terminal dans lequel est utilisée la commande gcloud auth list

Si votre environnement cesse de fonctionner, vous pouvez essayer de réinitialiser la VM hébergeant votre IDE depuis la console GCE. Pour cela, procédez comme suit :

Bouton "Réinitialiser" et nom de l'instance de VM mis en évidence sur la page "Instances de VM"

Apache Beam et Dataflow

Environ 5 minutes

Dataflow est un service Google Cloud entièrement géré permettant d'exécuter des pipelines de traitement de données Apache Beam par lot et par flux.

Apache Beam est un modèle de programmation Open Source avancé, unifié et portable pour le traitement des données. Il permet à l'utilisateur final de définir des pipelines de traitement parallèle des données par lot et par flux en Java, Python ou Go. Les pipelines Apache Beam peuvent être exécutés sur votre ordinateur de développement local pour les petits ensembles de données, et à grande échelle sur Dataflow. Cependant, comme Apache Beam est Open Source, vous pouvez aussi exécuter les pipelines Beam sur Apache Flink et Apache Spark, entre autres.

Diagramme de l'architecture du modèle Beam

Partie 1 de l'atelier : Écrire un pipeline ETL en partant de zéro

Introduction

Dans cette section, vous allez écrire un pipeline ETL Apache Beam en partant de zéro.

Examen de l'ensemble de données et du cas d'utilisation

Dans chaque atelier de cette quête, les données d'entrée doivent être semblables aux journaux de serveur Web Common Log Format, ainsi qu'aux autres données que peut potentiellement contenir un serveur Web. Dans ce premier atelier, les données sont traitées comme source par lot. Dans les prochains ateliers, les données seront traitées comme source par flux. Votre tâche consiste à lire, à analyser, puis à écrire les données dans BigQuery, un entrepôt de données sans serveur, afin de les analyser ultérieurement.

Ouvrir l'atelier approprié

  • Si ce n'est pas déjà fait, créez un terminal dans votre environnement IDE, puis copiez et collez la commande suivante :
# Change directory into the lab cd 1_Basic_ETL/labs export BASE_DIR=$(pwd)

Modifier le fichier pom.xml

Avant de pouvoir modifier le code du pipeline, vous devez ajouter les dépendances nécessaires.

  1. Ajoutez les dépendances suivantes au fichier pom.xml, qui se trouve dans 1_Basic_ETL/labs, à l'intérieur de la balise <dependencies> :
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-core</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-direct-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-runners-google-cloud-dataflow-java</artifactId> <version>${beam.version}</version> </dependency> <dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId> <version>${beam.version}</version> </dependency>
  1. Une balise <beam.version> a déjà été ajoutée dans le fichier pom.xml pour indiquer la version de Beam à installer.

  2. Enregistrez le fichier.

  3. Enfin, téléchargez ces dépendances pour les utiliser dans votre pipeline :

# Download dependencies listed in pom.xml mvn clean dependency:resolve

Écrire votre premier pipeline

1 heure

Tâche 1 : Générer des données synthétiques

  1. Exécutez la commande suivante dans le shell pour cloner un dépôt contenant des scripts permettant de générer des journaux de serveur Web synthétiques :
# Change to the directory containing the relevant code cd $BASE_DIR/../.. # Create GCS buckets and BQ dataset source create_batch_sinks.sh # Run a script to generate a batch of web server log events bash generate_batch_events.sh # Examine some sample events head events.json

Le script crée un fichier nommé events.json contenant des lignes semblables à ce qui suit :

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

Il copie ensuite automatiquement ce fichier dans votre bucket Google Cloud Storage à l'adresse gs://<YOUR-PROJECT-ID>/events.json.

  1. Accédez à Google Cloud Storage et vérifiez que votre bucket de stockage contient un fichier nommé events.json.

Cliquez sur Vérifier ma progression pour valider l'objectif. Générer des données synthétiques

Tâche 2 : Lire les données de votre source

Si vous rencontrez des difficultés dans cette section ou dans les sections suivantes, n'hésitez pas à consulter la solution.

  1. Ouvrez MyPipeline.java dans votre IDE. Ce fichier se trouve dans 1_Basic_ETL/labs/src/main/java/com/mypackage/pipeline. Assurez-vous que les packages suivants sont importés :
import com.google.gson.Gson; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.PipelineResult; import org.apache.beam.sdk.io.TextIO; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.schemas.JavaFieldSchema; import org.apache.beam.sdk.schemas.annotations.DefaultSchema; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
  1. Faites défiler la page jusqu'à la méthode run(). Cette méthode contient actuellement un pipeline qui ne réalise aucune opération. Notez qu'un objet Pipeline est créé à l'aide d'un objet PipelineOptions. La dernière ligne de la méthode exécute le pipeline.
Pipeline pipeline = Pipeline.create(options); // Do stuff pipeline.run();

Toutes les données des pipelines Apache Beam résident dans des PCollections. Pour créer la première PCollection du pipeline, appliquez une transformation racine à l'objet de pipeline. Une transformation racine crée une PCollection à partir d'une source de données externe ou de certaines données locales que vous spécifiez.

Il existe deux types de transformations racines dans les SDK Beam : Read et Create. Les transformations Read lisent des données provenant d'une source externe, comme un fichier texte ou une table de base de données. Les transformations Create créent une PCollection à partir d'une java.util.Collection en mémoire et sont particulièrement utiles pendant les tests.

L'exemple de code suivant montre comment appliquer une transformation racine TextIO.Read pour lire des données à partir d'un fichier texte. Cette transformation est appliquée à un objet Pipeline p et renvoie un ensemble de données de pipeline sous la forme d'une PCollection<String>. "ReadLines" est le nom de la transformation, qui sera utile ultérieurement lorsque vous travaillerez avec des pipelines plus volumineux :

PCollection<String> lines = pipeline.apply("ReadLines", TextIO.read().from("gs://path/to/input.txt"));
  1. Dans la méthode run(), créez une constante de chaîne nommée "input" et définissez sa valeur sur gs://<YOUR-PROJECT-ID>/events.json. Dans un prochain atelier, vous utiliserez des paramètres de ligne de commande pour transmettre ces informations.

  2. Créez une PCollection contenant les chaînes de tous les événements du fichier events.json en appelant la transformation TextIO.read().

  3. Ajoutez toute instruction d'importation appropriée en haut du fichier MyPipeline.java, dans ce cas import org.apache.beam.sdk.values.PCollection;.

Tâche 3 : Exécuter le pipeline pour vérifier qu'il fonctionne

  • Retournez dans le terminal, accédez au dossier $BASE_DIR et exécutez la commande mvn compile exec:java :
cd $BASE_DIR # Set up environment variables export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} Remarque : Si la compilation échoue, exécutez la commande mvn clean install.

Pour l'instant, votre pipeline ne réalise aucune opération. Il lit simplement les données. Cependant, son exécution illustre un workflow utile, dans lequel vous vérifiez le pipeline localement et à moindre coût à l'aide de DirectRunner sur votre ordinateur local avant d'effectuer des calculs plus coûteux. Pour exécuter le pipeline avec Dataflow, vous pouvez remplacer runner par DataflowRunner.

Tâche 4 : Ajouter une transformation

Si vous rencontrez des difficultés, consultez la solution.

Les transformations modifient vos données. Dans Apache Beam, les transformations sont effectuées par la classe PTransform. Au moment de l'exécution, ces opérations seront effectuées sur un certain nombre de nœuds de calcul indépendants. L'entrée et le résultat de chaque PTransform sont des PCollections. Même si vous ne l'avez peut-être pas remarqué, vous avez déjà utilisé une PTransform pour lire des données provenant de Google Cloud Storage. Que vous l'ayez affectée ou non à une variable, cela a créé une PCollection de chaînes.

Étant donné que Beam utilise une méthode d'application générique pour les PCollection, vous pouvez enchaîner les transformations de manière séquentielle. Par exemple, vous pouvez les enchaîner pour créer un pipeline séquentiel comme celui-ci :

[Final Output PCollection] = [Initial Input PCollection].apply([First Transform]) .apply([Second Transform]) .apply([Third Transform]);

Pour cette tâche, vous allez utiliser un nouveau type de transformation : ParDo. ParDo est une transformation Beam pour le traitement parallèle générique. Le paradigme de traitement ParDo est semblable à la phase de mappage d'un algorithme de type mappage/brassage/réduction (Map/Shuffle/Reduce) : une transformation ParDo prend en compte chaque élément de la PCollection d'entrée, exécute une fonction de traitement (votre code utilisateur) sur cet élément et émet zéro, un ou plusieurs éléments dans une PCollection de sortie.

L'opération ParDo est utile pour diverses opérations de traitement de données, par exemple :

  • Filtrer un ensemble de données : vous pouvez utiliser ParDo pour prendre en compte chaque élément d'une PCollection et transmettre cet élément en sortie dans une nouvelle collection ou le supprimer.
  • Formater ou convertir le type de chaque élément d'un ensemble de données : si votre PCollection d'entrée contient des éléments dont le type ou le format ne vous convient pas, vous pouvez utiliser ParDo pour effectuer la conversion de chaque élément et générer le résultat dans une nouvelle PCollection.
  • Extraire des parties de chaque élément d'un ensemble de données : si votre PCollection contient des enregistrements avec plusieurs champs, vous pouvez par exemple utiliser ParDo pour n'extraire que les champs à prendre en compte dans une nouvelle PCollection.
  • Effectuer des calculs sur chaque élément d'un ensemble de données : vous pouvez utiliser ParDo pour effectuer des calculs simples ou complexes sur chaque élément ou certains éléments d'une PCollection et générer les résultats dans une nouvelle PCollection.
  1. Pour réaliser cette tâche, vous devez écrire une transformation ParDo qui lit une chaîne JSON représentant un seul événement, qui l'analyse à l'aide de Gson et qui génère en sortie l'objet personnalisé renvoyé par Gson.

Les fonctions ParDo peuvent être implémentées de manière intégrée ou en tant que classe statique. Pour exécuter des fonctions ParDo intégrées, écrivez-les comme suit :

pCollection.apply(ParDo.of(new DoFn<T1, T2>() { @ProcessElement public void processElement(@Element T1 i, OutputReceiver<T2> r) { // Do something r.output(0); } }));

Vous pouvez également les implémenter en tant que classes statiques qui étendent DoFn. Cela permet de les intégrer plus facilement aux frameworks de test.

static class MyDoFn extends DoFn<T1, T2> { @ProcessElement public void processElement(@Element T1 json, OutputReceiver<T2> r) throws Exception { // Do something r.output(0); } }

Et dans le pipeline lui-même :

[Initial Input PCollection].apply(ParDo.of(new MyDoFn());
  1. Pour utiliser Gson, vous devez créer une classe interne dans MyPipeline. Pour profiter des schémas Beam, ajoutez l'annotation @DefaultSchema. Nous y reviendrons plus tard. Voici un exemple d'utilisation de Gson :
// Elsewhere @DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; String field2; } // Within the DoFn Gson gson = new Gson(); MyClass myClass = gson.fromJson(jsonString, MyClass.class);
  1. Nommez votre classe interne CommmonLog. Pour construire cette classe interne avec les bonnes variables d'état, reportez-vous à l'exemple JSON ci-dessus : la classe doit comporter une variable d'état pour chaque clé du JSON entrant, et cette variable doit correspondre en type et en nom à la valeur et à la clé.

  2. Utilisez String pour "timestamp" pour le moment, Long pour "INTEGER" (l'entier BigQuery est INT64), Double pour "FLOAT" (le type FLOAT de BigQuery est FLOAT64) et suivez le schéma BigQuery suivant :

Page avec l&#39;onglet &quot;Schéma CommonLog&quot; qui inclut les informations de journal telles que user_id, timestamp et num_bytes.

Pour rappel, vous pouvez consulter la solution si vous rencontrez des difficultés.

Tâche 5 : Écrire dans un récepteur

À ce stade, le pipeline lit un fichier depuis Google Cloud Storage, analyse chaque ligne et émet un CommonLog pour chaque élément. L'étape suivante consiste à écrire ces objets CommonLog dans une table BigQuery.

Bien que vous puissiez demander à votre pipeline de créer une table BigQuery si nécessaire, vous devrez créer l'ensemble de données à l'avance. Cette opération a déjà été effectuée par le script generate_batch_events.sh.

Vous pouvez examiner l'ensemble de données :

# Examine dataset bq ls # No tables yet bq ls logs

Pour générer les PCollection finales du pipeline, vous devez appliquer une transformation Write à cette PCollection. Les transformations Write peuvent générer les éléments d'une PCollection vers un récepteur de données externe, tel qu'une table de base de données. Vous pouvez utiliser une transformation Write pour générer une PCollection à tout moment dans le pipeline, bien que l'écriture de données soit généralement effectuée à la fin du pipeline.

L'exemple de code suivant montre comment appliquer une transformation TextIO.Write pour écrire une PCollection de chaînes dans un fichier texte :

PCollection<String> pCollection = ...; pCollection.apply("WriteMyFile", TextIO.write().to("gs://path/to/output"));
  1. Dans ce cas, utilisez BigQueryIO.write() au lieu de TextIO.write().

Cette fonction nécessite de spécifier un certain nombre d'éléments, y compris la table dans laquelle écrire ainsi que le schéma de cette table. Vous pouvez indiquer si vous souhaitez ajouter des données à une table existante, recréer des tables existantes (ce qui est utile pour la première itération du pipeline) ou créer la table si elle n'existe pas. Par défaut, cette transformation créera des tables qui n'existent pas et n'écrira pas dans une table qui n'est pas vide.

Depuis l'ajout des schémas Beam au SDK, vous pouvez demander à la transformation d'inférer le schéma de la table à partir de l'objet qui lui est transmis en utilisant .useBeamSchema() et en marquant le type d'entrée. Vous pouvez également fournir explicitement le schéma avec .withSchema(), mais vous devrez créer un objet TableSchema BigQuery à transmettre. Comme vous avez annoté la classe CommonLog avec @DefaultSchema(JavaFieldSchema.class), chaque transformation connaît les noms et les types des champs de l'objet, y compris BigQueryIO.write().

  1. Examinez les différentes alternatives dans la section "Writing" de BigQueryIO. Dans ce cas, comme vous avez annoté votre objet CommonLog, utilisez .useBeamSchema() et ciblez la table <YOUR-PROJECT-ID>:logs.logs comme suit :
pCollection.apply(BigQueryIO.<MyObject>write() .to("my-project:output_dataset.output_table") .useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) ); Remarque  : WRITE_TRUNCATE supprime et recrée votre table à chaque fois. Cette opération est utile lors de la première itération du pipeline, en particulier sur votre schéma. Toutefois, elle peut facilement entraîner des problèmes inattendus en production. Il est plus prudent d'utiliser WRITE_APPEND ou WRITE_EMPTY.

L'ensemble des types disponibles dans les schémas Beam se trouve dans la documentation Schema.FieldType. Vous trouverez tous les types de données BigQuery possibles en SQL standard dans la documentation sur setType. Si vous êtes curieux, vous pouvez inspecter la conversion du schéma Beam en schéma BigQuery.

Tâche 6 : Exécuter le pipeline

Retournez dans le terminal, remplacez la valeur de la variable d'environnement RUNNER par DataflowRunner et exécutez votre pipeline en vous servant de la même commande que précédemment. Une fois qu'il a démarré, accédez à la page du produit Dataflow et notez l'organisation de votre pipeline. Si vous avez nommé vos transformations, les noms s'affichent. En cliquant sur chacun d'eux, vous verrez en temps réel le nombre d'éléments traités chaque seconde.

La forme globale doit correspondre à un seul chemin d'accès, commençant par la transformation Read et se terminant par la transformation Write. À mesure que votre pipeline s'exécute, des nœuds de calcul sont ajoutés automatiquement, car le service détermine les besoins de votre pipeline, puis ils disparaissent lorsqu'ils ne sont plus nécessaires. Pour observer cela, accédez à Compute Engine, où vous devriez voir les machines virtuelles créées par le service Dataflow.

Remarque : Si vous avez réussi à créer votre pipeline, mais que vous constatez de nombreuses erreurs dues au code ou à une mauvaise configuration dans le service Dataflow, vous pouvez redéfinir RUNNER sur "DirectRunner" pour l'exécuter en local et obtenir plus rapidement des retours. Dans notre cas présent, c'est une approche viable car l'ensemble de données est de taille réduite et vous n'utilisez aucune fonctionnalité non compatible avec DirectRunner. # Set up environment variables export PROJECT_ID=$(gcloud config get-value project) export REGION='{{{project_0.default_region|Region}}}' export PIPELINE_FOLDER=gs://${PROJECT_ID} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER}"
  • Une fois votre pipeline terminé, retournez dans la fenêtre BigQuery du navigateur et interrogez votre table.

Si votre code ne fonctionne pas comme prévu et que vous ne savez pas quoi faire pour y remédier, consultez la solution.

Remarque : Accédez à Dataflow > Jobs et attendez que l'état du job passe à Réussie.

Cliquez sur Vérifier ma progression pour valider l'objectif. Exécuter le pipeline

Partie 2 de l'atelier : Paramétrer un pipeline ETL de base

Environ 20 minutes

La plupart des tâches des ingénieurs de données sont prévisibles, comme les jobs récurrents, ou bien elles se ressemblent. Cependant, le processus d'exécution des pipelines nécessite une expertise en ingénierie. Repensez aux étapes que vous venez de terminer :

  1. Vous avez créé un environnement de développement et développé un pipeline. L'environnement comprenait le SDK Apache Beam ainsi que d'autres dépendances.
  2. Vous avez exécuté le pipeline à partir de l'environnement de développement. Le SDK Apache Beam a préparé des fichiers dans Cloud Storage, a créé un fichier de requête de job et l'a envoyé au service Dataflow.

Ce serait beaucoup mieux s'il existait un moyen de démarrer un job avec un appel d'API ou sans avoir à configurer un environnement de développement (une opération inaccessible aux utilisateurs non techniques). Cela vous permettrait également d'exécuter des pipelines.

Les modèles Dataflow cherchent à résoudre ce problème en modifiant la représentation créée lorsqu'un pipeline est compilé afin de le rendre paramétrable. Malheureusement, il ne suffit pas d'exposer des paramètres de ligne de commande, même si cette approche sera abordée dans un prochain atelier. Avec les modèles Dataflow, le workflow ci-dessus se présente comme suit :

  1. Les développeurs créent un environnement de développement et développent leur pipeline. L'environnement comprend le SDK Apache Beam ainsi que d'autres dépendances.
  2. Les développeurs exécutent le pipeline et créent un modèle. Le SDK Apache Beam prépare les fichiers dans Cloud Storage, crée un fichier de modèle (comparable à une requête de job), puis l'enregistre dans Cloud Storage.
  3. Les utilisateurs non techniques, ainsi que d'autres outils de workflow comme Airflow, peuvent facilement exécuter des jobs à l'aide de la console Google Cloud, de l'outil de ligne de commande gcloud ou de l'API REST pour envoyer des requêtes d'exécution de fichier de modèle au service Dataflow.

Dans cet atelier, vous allez vous entraîner à utiliser l'un des nombreux modèles Dataflow créés par Google pour effectuer la même tâche que le pipeline que vous avez créé dans la partie 1.

Tâche 1 : Créer un fichier de schéma JSON

Bien que vous n'ayez pas eu à transmettre d'objet TableSchema à la transformation BigQueryIO.writeTableRows() puisque vous avez utilisé .usedBeamSchema(), vous devez transmettre au modèle Dataflow un fichier JSON représentant le schéma de cet exemple.

  1. Ouvrez le terminal et revenez au répertoire principal. Exécutez la commande suivante pour récupérer le schéma de votre table logs.logs existante :
cd $BASE_DIR/../.. bq show --schema --format=prettyjson logs.logs
  1. Capturez ensuite le résultat dans un fichier et importez-le dans GCS. Les commandes sed supplémentaires permettent de créer un objet JSON complet attendu par Dataflow.
bq show --schema --format=prettyjson logs.logs | sed '1s/^/{"BigQuery Schema":/' | sed '$s/$/}/' > schema.json cat schema.json export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp schema.json gs://${PROJECT_ID}/

Cliquez sur Vérifier ma progression pour valider l'objectif. Créer un fichier de schéma JSON

Tâche 2 : Écrire une fonction définie par l'utilisateur en JavaScript

Le modèle Dataflow de Cloud Storage vers BigQuery nécessite une fonction JavaScript pour convertir le texte brut en code JSON valide. Dans notre cas, chaque ligne de texte est déjà du code JSON valide. Cette fonction est donc sans grand intérêt ici.

Pour effectuer cette tâche, utilisez l'IDE pour créer un fichier .js avec le contenu ci-dessous, puis copiez-le dans Google Cloud Storage.

  1. Copiez la fonction ci-dessous dans son propre fichier transform.js dans le dossier 1_Basic_ETL/ :
function transform(line) { return line; }
  1. Exécutez ensuite la commande suivante pour copier le fichier dans Google Cloud Storage :
cd 1_Basic_ETL/ export PROJECT_ID=$(gcloud config get-value project) gcloud storage cp *.js gs://${PROJECT_ID}/

Cliquez sur Vérifier ma progression pour valider l'objectif. Écrire une fonction définie par l'utilisateur en JavaScript

Tâche 3 : Exécuter un modèle Dataflow

  1. Accédez à l'interface utilisateur Web de Dataflow.

  2. Cliquez sur CRÉER UN JOB À PARTIR D'UN MODÈLE.

  3. Saisissez un nom pour votre job Cloud Dataflow.

  4. Pour Point de terminaison régional, sélectionnez la région .

  5. Sous "Modèle Dataflow", sélectionnez le modèle Fichiers texte dans Cloud Storage vers BigQuery dans la section Traiter les données de façon groupée (lot), PAS dans la section "Flux".

  6. Pour Chemin d'accès d'entrée Cloud Storage, saisissez le chemin d'accès vers le fichier events.json au format gs://<YOUR-PROJECT-ID>/events.json.

  7. Pour Emplacement Cloud Storage de votre fichier de schéma BigQuery, écrivez le chemin d'accès à votre fichier schema.json, au format gs://<YOUR-PROJECT-ID>/schema.json

  8. Dans le champ Table BigQuery de sortie, saisissez <myprojectid>:logs.logs.

  9. Sous Répertoire BigQuery temporaire, indiquez un nouveau dossier dans ce même bucket. Le job va le créer pour vous.

  10. Sous Emplacement temporaire, indiquez un deuxième nouveau dossier dans ce même bucket.

  11. Laissez le champ Chiffrement défini sur Clé gérée par Google.

  12. Cliquez pour ouvrir Paramètres facultatifs.

  13. Pour le Chemin d'accès aux UDF JavaScript dans Cloud Storage, saisissez le chemin d'accès à votre fichier .js, au format gs://<YOUR-PROJECT-ID>/transform.js.

  14. Dans le champ Nom des UDF JavaScript définies par l'utilisateur, saisissez transform.

  15. Cliquez sur le bouton Exécuter le job.

Vous pouvez inspecter votre job à partir de l'UI Web de Dataflow pendant son exécution.

Remarque : Accédez à Dataflow > Jobs et attendez que l'état du job passe à Réussie.

Cliquez sur Vérifier ma progression pour valider l'objectif. Exécuter un modèle Dataflow

Tâche 4 : Inspecter le code du modèle Dataflow

  1. Rappelez-vous le code du modèle Dataflow que vous venez d'utiliser.

  2. Faites défiler la page jusqu'à la méthode main. Le code doit ressembler au pipeline que vous avez créé.

  • Il commence par un objet Pipeline, créé à l'aide d'un objet PipelineOptions.
  • Il se compose d'une chaîne de PTransform, commençant par une transformation TextIO.read().
  • La PTransform après la transformation Read est légèrement différente. Elle permet d'utiliser du code JavaScript pour transformer les chaînes d'entrée si, par exemple, le format source ne correspond pas tout à fait au format de la table BigQuery. Pour en savoir plus sur l'utilisation de cette fonctionnalité, consultez cette page.
  • La PTransform après les UDF JavaScript utilise une fonction de bibliothèque pour convertir le code JSON en ligne de table. Vous pouvez inspecter ce code ici.
  • La PTransform Write est légèrement différente, car au lieu d'utiliser un schéma connu au moment de la compilation du graphe, le code est destiné à accepter des paramètres qui ne seront connus qu'au moment de l'exécution. C'est la classe NestedValueProvider qui rend cela possible. Il utilise également un schéma défini explicitement, contrairement à celui que vous avez inféré du schéma Beam à l'aide de .useBeamSchema().
  1. Ne manquez pas le prochain atelier, qui abordera la création de pipelines allant au-delà des simples chaînes de PTransform. Vous découvrirez également comment adapter un pipeline que vous avez créé pour en faire un modèle Dataflow personnalisé.

Terminer l'atelier

Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.

Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.

Le nombre d'étoiles correspond à votre degré de satisfaction :

  • 1 étoile = très insatisfait(e)
  • 2 étoiles = insatisfait(e)
  • 3 étoiles = ni insatisfait(e), ni satisfait(e)
  • 4 étoiles = satisfait(e)
  • 5 étoiles = très satisfait(e)

Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.

Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.

Copyright 2020 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Ouvrez une fenêtre de navigateur en mode navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.