arrow_back

Traitement des données sans serveur avec Dataflow : pipeline d'analyse de flux avancé avec Dataflow (Java)

Accédez à plus de 700 ateliers et cours

Traitement des données sans serveur avec Dataflow : pipeline d'analyse de flux avancé avec Dataflow (Java)

Atelier 1 heure 30 minutes universal_currency_alt 5 crédits show_chart Avancé
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Accédez à plus de 700 ateliers et cours

Présentation

Au cours de cet atelier, vous allez :

  • Gérer les données en retard
  • Gérer les données mal formées en :
    1. écrivant une transformation composite pour rendre le code plus modulaire ;
    2. écrivant une transformation qui génère plusieurs sorties de types différents ;
    3. collectant les données mal formées et en les enregistrant dans un emplacement où elles pourront être examinées.

À la fin de l'atelier précédent, on évoquait un défi majeur des pipelines en temps réel : le décalage entre le moment où les événements se produisent et celui où ils sont traités, également appelé "latence". Cet atelier présente les concepts d'Apache Beam qui permettent aux concepteurs de pipelines de définir, de manière formelle, comment leurs pipelines doivent gérer la latence.

Mais la latence n'est pas le seul type de problème que les pipelines sont susceptibles de rencontrer dans un contexte de flux : chaque fois qu'une entrée provient de l'extérieur du système, il existe toujours un risque qu'elle soit, d'une manière ou d'une autre, mal formée. Cet atelier présente également des techniques permettant de traiter ces entrées.

Le pipeline obtenu à la fin de cet atelier sera semblable à celui illustré ci-dessous. Notez qu'il comporte une branche.

Schéma de l'architecture du pipeline final

Préparation

Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.

  1. Connectez-vous à Qwiklabs dans une fenêtre de navigation privée.

  2. Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
    Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.

  3. Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.

  4. Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à Google Cloud Console.

  5. Cliquez sur Ouvrir la console Google.

  6. Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
    Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.

  7. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.

Vérifier les autorisations du projet

Avant de commencer à travailler dans Google Cloud, vous devez vous assurer de disposer des autorisations adéquates pour votre projet dans IAM (Identity and Access Management).

  1. Dans la console Google Cloud, accédez au menu de navigation (Icône du menu de navigation), puis sélectionnez IAM et administration > IAM.

  2. Vérifiez que le compte de service Compute par défaut {project-number}-compute@developer.gserviceaccount.com existe et qu'il est associé au rôle editor (éditeur). Le préfixe du compte correspond au numéro du projet, disponible sur cette page : Menu de navigation > Présentation du cloud > Tableau de bord.

État de l'éditeur et nom du compte de service Compute Engine par défaut mis en évidence sur l'onglet "Autorisations"

Remarque : Si le compte n'est pas disponible dans IAM ou n'est pas associé au rôle editor (éditeur), procédez comme suit pour lui attribuer le rôle approprié.
  1. Dans la console Google Cloud, accédez au menu de navigation et cliquez sur Présentation du cloud > Tableau de bord.
  2. Copiez le numéro du projet (par exemple, 729328892908).
  3. Dans le menu de navigation, sélectionnez IAM et administration > IAM.
  4. Sous Afficher par compte principal, en haut de la table des rôles, cliquez sur Accorder l'accès.
  5. Dans le champ Nouveaux comptes principaux, saisissez :
{project-number}-compute@developer.gserviceaccount.com
  1. Remplacez {project-number} par le numéro de votre projet.
  2. Dans le champ Rôle, sélectionnez Projet (ou Basique) > Éditeur.
  3. Cliquez sur Enregistrer.

Dans cet atelier, vous utiliserez principalement un IDE Web Theia hébergé sur Google Compute Engine. Le dépôt de l'atelier y est précloné. L'IDE prend en charge les serveurs au langage Java et comprend un terminal permettant l'accès programmatique aux API Google Cloud via l'outil de ligne de commande gcloud, comme avec Cloud Shell.

  1. Pour accéder à votre IDE Theia, copiez le lien affiché dans Google Cloud Skills Boost et collez-le dans un nouvel onglet.
Remarque : Le provisionnement complet de l'environnement peut prendre entre trois et cinq minutes, même après l'affichage de l'URL. En attendant, le navigateur indiquera une erreur.

Volet des identifiants affichant l'URL ide_url

Le dépôt de l'atelier a été cloné dans votre environnement. Chaque atelier est divisé en deux dossiers : le premier, intitulé labs, contient du code que vous devez compléter, tandis que le second, nommé solution, comporte un exemple opérationnel que vous pouvez consulter si vous rencontrez des difficultés.

  1. Cliquez sur le bouton Explorateur de fichiers pour y accéder :

Menu de l'explorateur de fichiers développé, avec le dossier "Labs" mis en évidence

Vous pouvez également créer plusieurs terminaux dans cet environnement, comme vous le feriez avec Cloud Shell :

Option "Nouveau terminal" mise en évidence dans le menu "Terminal"

Vous pouvez exécuter la commande gcloud auth list dans le terminal pour vérifier que vous êtes connecté avec un compte de service fourni et que vous disposez donc des mêmes autorisations qu'avec votre compte utilisateur pour l'atelier :

Terminal dans lequel est utilisée la commande gcloud auth list

Si votre environnement cesse de fonctionner, vous pouvez essayer de réinitialiser la VM hébergeant votre IDE depuis la console GCE. Pour cela, procédez comme suit :

Bouton "Réinitialiser" et nom de l'instance de VM mis en évidence sur la page "Instances de VM"

Partie 1 de l'atelier : Gérer les données en retard

Dans les ateliers précédents, vous avez écrit du code qui répartissait les éléments selon l'heure d'événement en fenêtres de largeur fixe, à l'aide d'un code semblable à celui-ci :

commonLogs .apply("WindowCommonLogs", Window.into( FixedWindows.of( Duration.standardMinutes( options.getWindowDuration())))) .apply("CountEventsPerWindow", Combine.globally( Count.<CommonLog>combineFn()).withoutDefaults());

Cependant, comme vous l'avez constaté à la fin du dernier atelier non SQL, les flux de données présentent souvent une latence. La latence est problématique lors du fenêtrage basé sur l'heure de l'événement (par opposition à l'heure du traitement), car elle introduit une incertitude : tous les événements correspondant à un instant donné ont-ils bien été reçus ?

De toute évidence, pour générer des résultats, le pipeline que vous avez écrit devait trancher sur la question. Pour ce faire, il a utilisé un concept appelé "watermark". Une watermark est une estimation heuristique du système indiquant à quel moment toutes les données jusqu'à un certain point dans le temps événementiel ont été reçues dans le pipeline. Une fois que la watermark dépasse la fin d'une fenêtre, tous les éléments supplémentaires dont le code temporel correspond à cette fenêtre sont considérés comme des données en retard et sont simplement ignorés. Par conséquent, le comportement par défaut du fenêtrage consiste à émettre un seul résultat, que l'on espère complet, lorsque le système est certain d'avoir toutes les données.

Apache Beam utilise plusieurs méthodes heuristiques pour estimer la watermark Cependant, cela reste des estimations. Plus précisément, ces méthodes heuristiques sont générales et ne conviennent pas à tous les cas d'utilisation. Au lieu de se fier à des méthodes heuristiques générales, les concepteurs de pipelines doivent bien réfléchir aux points suivants afin de déterminer les compromis appropriés :

  • Exhaustivité : est-il important de disposer de toutes les données avant de calculer le résultat ?
  • Latence : combien de temps sont-ils prêts à attendre les données ? Par exemple, attendent-ils d'avoir reçu toutes les données ou les traitent-ils au fur et à mesure qu'elles arrivent ?
  • Coût : quelle puissance de calcul et quel budget sont-ils prêts à consacrer pour réduire la latence ?

Une fois ces réponses en tête, il est possible d'utiliser les structures formelles d'Apache Beam pour écrire du code permettant de faire les bons compromis.

Retard autorisé

Le retard autorisé détermine combien de temps une fenêtre doit conserver son état. Une fois que la watermark atteint la fin de la période de retard autorisé, l'état est supprimé. Il serait intéressant de pouvoir conserver éternellement l'état persistant. En réalité, lorsque l'on travaille avec une source de données illimitée, il n'est souvent pas pratique de conserver l'état d'une fenêtre donnée indéfiniment, car on finit par manquer d'espace disque.

Par conséquent, tout système de traitement réel capable de gérer des données dans le désordre doit fournir un moyen de limiter la durée de vie des fenêtres qu'il traite. Une façon simple et concise de procéder consiste à définir un horizon pour le retard autorisé dans le système, c'est-à-dire à fixer une limite au-delà de laquelle un enregistrement donné (par rapport à la watermark) ne sera plus traité par le système. Toutes les données arrivant après cet horizon sont simplement ignorées. Une fois que vous avez défini la limite de retard pour les données individuelles, vous savez exactement combien de temps conserver l'état des fenêtres : jusqu'à ce que la watermark dépasse l'horizon de retard autorisé pour la fin de la fenêtre.

Tâche 1 : Préparer l'environnement

Comme dans les ateliers précédents, la première étape consiste à générer les données que le pipeline va traiter. Vous allez ouvrir l'environnement de l'atelier et générer les données comme précédemment.

Ouvrir l'atelier approprié

  1. Si ce n'est pas déjà fait, créez un terminal dans votre environnement IDE, puis copiez et collez la commande suivante :
# Change directory into the lab cd 7_Advanced_Streaming_Analytics/labs # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Configurez l'environnement de données :
# Create GCS buckets, BQ dataset, and Pubsub Topic cd $BASE_DIR/../.. source create_streaming_sinks.sh # Change to the directory containing the practice version of the code cd $BASE_DIR

Cliquez sur Vérifier ma progression pour valider l'objectif. Préparer l'environnement

Tâche 2 : Définir le retard autorisé

  1. Dans votre IDE, ouvrez StreamingMinuteTrafficPipeline.java, qui se trouve dans 7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline.

Dans Apache Beam, le retard autorisé est défini à l'aide de la méthode withAllowedLateness(), comme dans l'exemple ci-dessous :

PCollection<String> items = ...; PCollection<String> windowed_items = items.apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(1)));
  1. Pour effectuer cette tâche, examinez la transformation de fenêtrage et ajoutez un appel à .withAllowedLateness() en transmettant une durée Duration valide construite à partir du paramètre de ligne de commande approprié. Déterminez une valeur raisonnable et mettez à jour la ligne de commande en indiquant les unités correctes.

Déclencheurs

Les concepteurs de pipelines peuvent également décider quand produire des résultats préliminaires. Par exemple, supposons que la watermark pour la fin d'une fenêtre n'ait pas encore été atteinte, mais que 75 % des données attendues soient déjà arrivées. Dans de nombreux cas, on peut supposer qu'un tel échantillon est représentatif, ce qui justifie de l'afficher aux utilisateurs finaux.

Les déclencheurs déterminent à quel moment les résultats seront produits pendant le traitement. Chaque sortie spécifique d'une fenêtre est appelée "volet" de la fenêtre. Les déclencheurs génèrent des volets lorsque leurs conditions sont remplies. Dans Apache Beam, ces conditions incluent la progression de la watermark, la progression du temps de traitement (qui progresse uniformément, quelle que soit la quantité de données réellement reçues), le nombre d'éléments (par exemple, lorsqu'une certaine quantité de nouvelles données arrive) et les déclencheurs dépendants des données, comme la détection de la fin d'un fichier.

Les conditions d'un déclencheur peuvent l'amener à produire un volet à plusieurs reprises. Par conséquent, il est également nécessaire de spécifier comment cumuler ces résultats. Apache Beam prend actuellement en charge deux modes d'accumulation : l'un qui cumule les résultats et l'autre qui renvoie uniquement les parties du résultat qui sont nouvelles depuis le dernier volet produit.

Tâche 3 : Définir un déclencheur

Lorsque vous définissez un fenêtrage pour une PCollection à l'aide de la transformation Window, vous pouvez également spécifier un déclencheur.

Vous définissez le ou les déclencheurs pour une PCollection en invoquant la méthode .triggering() sur le résultat de votre transformation Window.into(). Window.triggering() accepte un déclencheur comme argument. Apache Beam propose plusieurs déclencheurs prédéfinis :

  • AfterWatermark déclenche l'émission de résultats lorsque la watermark dépasse un code temporel déterminé soit par la fin de la fenêtre, soit par l'arrivée du premier élément dans un volet.
  • AfterProcessingTime déclenche l'émission de résultats après qu'un certain temps de traitement s'est écoulé (généralement depuis l'arrivée du premier élément dans volet).
  • AfterPane déclenche l'émission d'une propriété pour les éléments du volet actuel, comme le nombre d'éléments qui ont été attribués au volet actuel.

Cet exemple de code définit un déclencheur basé sur le temps pour une PCollection qui émet des résultats une minute après le traitement du premier élément de cette fenêtre. La dernière ligne de l'exemple de code, .discardingFiredPanes(), définit le mode d'accumulation de la fenêtre :

PCollection<String> pc = ...; pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1)) .discardingFiredPanes());
  • Pour effectuer cette tâche, ajoutez un appel à Window.triggering() dans la transformation de fenêtrage, en transmettant un déclencheur valide. Lorsque vous concevez votre déclencheur, gardez à l'esprit ce cas d'utilisation : les données sont regroupées dans des fenêtres d'une minute et peuvent arriver en retard.

Si vous souhaitez voir un exemple de déclencheur, consultez la solution.

Partie 2 de l'atelier : Traiter des données mal formées

Selon la façon dont vous avez configuré votre déclencheur, si vous exécutez le pipeline maintenant et le comparez à celui de l'atelier précédent, vous remarquerez peut-être que le nouveau pipeline présente les résultats plus tôt. Il est également possible que ses résultats soient plus précis si les méthodes heuristiques n'ont pas permis de prédire le comportement du flux et que le retard autorisé est mieux adapté.

Cependant, bien que le pipeline actuel soit plus performant face aux retards, il reste vulnérable aux données mal formées. Si vous deviez exécuter le pipeline et publier un message contenant autre chose qu'une chaîne JSON bien formée pouvant être analysée dans un CommonLog, le pipeline générerait une erreur. Bien que des outils comme Cloud Logging facilitent la lecture de ces erreurs, un pipeline mieux conçu les stockera dans un emplacement prédéfini afin qu'elles puissent être examinées ultérieurement.

Dans cette section, vous allez ajouter des composants au pipeline pour le rendre plus modulaire et plus robuste.

Tâche 1 : Collecter les données mal formées

Pour être plus robuste face aux données mal formées, le pipeline doit pouvoir filtrer ces données et créer des branches pour les traiter différemment. Vous avez déjà vu une façon de créer une branche dans un pipeline : en utilisant une PCollection comme entrée pour plusieurs transformations.

Cette méthode de création de branches est très efficace. Cependant, elle s'avère inefficace dans certains cas d'utilisation. Supposons que vous souhaitiez créer deux sous-ensembles différents de la même PCollection. Avec la méthode des transformations multiples, il vous faudrait créer une transformation de filtrage pour chaque sous-ensemble, puis les appliquer toutes deux à la PCollection d'origine. Cependant, chaque élément serait traité deux fois.

Une autre méthode pour générer un pipeline avec des branches consiste à faire en sorte qu'une seule transformation produise plusieurs sorties tout en traitant la PCollection d'entrée une seule fois. Dans cette tâche, vous allez écrire une transformation qui produit plusieurs sorties : la première sera constituée des résultats issus des données correctement formées, et la seconde contiendra les éléments mal formés provenant du flux d'entrée d'origine.

Pour émettre plusieurs résultats tout en ne créant qu'une seule PCollection, Apache Beam utilise une classe appelée PCollectionTuple. Un PCollectionTuple est un tuple immuable de PCollection de types hétérogènes, "indexé" par TupleTag.

Voici un exemple d'instanciation d'un PCollectionTuple avec deux types de PCollection différents. Ces PCollection sont ensuite récupérées à l'aide de la méthode PCollectionTuple.get() :

PCollection<String> pc1 = ...; PCollection<Integer> pc2 = ...; TupleTag<String> tag1 = new TupleTag<>(); TupleTag<Integer> tag2 = new TupleTag<>(); PCollectionTuple pcs = PCollectionTuple.of(tag1, pc1) .and(tag2, pc2); PCollection<Integer> pcX = pcs.get(tag1); PCollection<String> pcY = pcs.get(tag2);

Pour utiliser cette méthode dans le contexte d'une PTransform, vous pouvez écrire du code comme dans l'exemple suivant, qui attribue un TupleTag à un élément en fonction de son contenu :

final TupleTag<String> aTag = new TupleTag<String>(){}; final TupleTag<String> bTag = new TupleTag<String>(){}; PCollectionTuple mixedCollection = input.apply(ParDo .of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element().startsWith("A")) { // Emit to main output, which is the output with tag aTag. c.output(c.element()); } else if(c.element().startsWith("B")) { // Emit to output with tag bTag. c.output(bTag, c.element()); } } }) // Specify main output. In this example, it is the output // with tag startsWithATag. .withOutputTags(aTag, // Specify the output with tag bTag, as a TupleTagList. TupleTagList.of(bTag))); // Get subset of the output with tag bTag. mixedCollection.get(aTag).apply(...); // Get subset of the output with tag startsWithBTag. mixedCollection.get(bTag).apply(...);
  • Pour réaliser cette tâche, vous devez déclarer deux constantes TupleTag en haut de la classe et modifier la transformation JsonToCommonLog pour qu'elle renvoie un PCollectionTuple et qu'elle tague les éléments non analysés avec un tag et les éléments analysés avec l'autre. Au lieu d'un bloc if/then/else, utilisez une instruction try/catch.

Tâche 2 : Rendre le code plus modulaire avec une transformation composite

Les transformations peuvent avoir une structure imbriquée, selon laquelle une transformation complexe effectue plusieurs transformations plus simples (comme plusieurs transformations ParDo, Combine, GroupByKey ou même d'autres transformations composites). Ces transformations sont appelées transformations composites. L'imbrication de plusieurs transformations au sein d'une même transformation composite peut rendre votre code plus modulaire et plus facile à comprendre.

  1. Pour créer votre propre transformation composite, générez une sous-classe de la classe PTransform et remplacez la méthode "expand" pour spécifier la logique de traitement réelle. Avec les paramètres de type de la classe PTransform, vous transmettez les types de PCollection que votre transformation exploite en tant qu'entrée et produit en tant que sortie.

L'exemple de code suivant montre comment déclarer une PTransform qui accepte une PCollection de chaînes String en tant qu'entrée et une PCollection d'entiers Integer :

#TODO: JsonToRow

static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
  1. Dans votre sous-classe PTransform, vous devez remplacer la méthode "expand". La méthode "expand" précise l'endroit où vous ajoutez la logique de traitement pour la PTransform. Le remplacement de la méthode "expand" signifie que vous devez accepter le type approprié de PCollection d'entrée en tant que paramètre et spécifier la PCollection de sortie en tant que valeur de renvoi.
static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // transform logic goes here ... } }
  1. Pour appeler la transformation, utilisez PCollection.apply() sur la PCollection et transmettez une instance de la transformation composite :
PCollection<Integer> i = stringPColl.apply("CompositeTransform", new MyCompositeTransform());
  1. Pour effectuer cette tâche, prenez la transformation JsonToCommonLog que vous venez de modifier et convertissez-la en transformation composite. Notez que cela va poser un problème avec la transformation d'écriture actuelle, qui attend des instances de CommonLog. Enregistrez les résultats de la transformation composite dans un nouveau PCollectionTuple et utilisez .get() pour récupérer la PCollection attendue par la transformation d'écriture.

Tâche 3 : Écrire les données mal formées pour analyse ultérieure

Pour résoudre le problème à l'origine des données mal formées en amont, il est important de pouvoir analyser ces données. Cela nécessite de les stocker quelque part. Dans cette tâche, vous allez écrire des données mal formées dans Google Cloud Storage. Ce modèle est appelé "stockage des lettres mortes".

Dans les ateliers précédents, vous écriviez directement à partir d'une source limitée (traitement par lot) dans Cloud Storage à l'aide de TextIO.write(). Toutefois, lorsque l'on écrit à partir d'une source illimitée (traitement par flux), cette approche doit être légèrement modifiée.

Tout d'abord, avant la transformation d'écriture, vous devez utiliser un déclencheur pour spécifier quand, selon le temps de traitement, écrire les données. Sinon, si vous conservez les valeurs par défaut, l'écriture ne se produira jamais. Par défaut, chaque événement appartient à la fenêtre globale. Lorsque vous traitez les données par lot, cela ne pose pas de problème, car l'ensemble de données complet est connu au moment de l'exécution. En revanche, avec des sources illimitées, la taille de l'ensemble de données complet est inconnue, et les volets de la fenêtre globale ne se déclenchent jamais, car ils ne se terminent jamais.

Comme vous utilisez un déclencheur, vous devez également utiliser une fonction Window. Toutefois, vous n'avez pas forcément besoin de changer de fenêtre. Dans les ateliers et tâches précédents, vous avez utilisé des transformations de fenêtrage pour remplacer la fenêtre globale par une fenêtre de durée fixe en fonction du temps des événements. Dans ce cas, ce n'est pas tant le regroupement des éléments qui importe, mais plutôt que les résultats soient produits de manière utile et à un rythme approprié.

Dans l'exemple ci-dessous, la fenêtre déclenche le volet de fenêtre globale toutes les 10 secondes en fonction du temps de traitement, mais elle écrit uniquement les nouveaux événements :

pCollection.apply("FireEvery10s", Window.<String>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .discardingFiredPanes())

Une fois que vous avez défini un déclencheur, vous devez modifier l'appel à TextIO.write() pour effectuer les écritures. Lorsque vous écrivez des données en aval d'une transformation de fenêtrage, enchaînez un appel à withWindowedWrites() et spécifiez un certain nombre de fragments pour que l'écriture puisse se faire en parallèle :

fixedWindowedItems.apply( "WriteWindowedPCollection", TextIO .write() .to("gs://path/to/somewhere") .withWindowedWrites() .withNumShards(NUM_SHARDS));
  • Pour effectuer cette tâche, créez une transformation à l'aide de .get() sur le PCollectionTuple afin de récupérer les données mal formées. En vous basant sur ce que vous savez des déclencheurs, définissez des conditions de déclenchement appropriées pour ce déclencheur.

Tâche 4 : Exécuter le pipeline

  1. Pour exécuter votre pipeline, créez une commande semblable à l'exemple ci-dessous. Notez que vous devrez la modifier pour qu'elle indique les noms des options de ligne de commande que vous avez définies.
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --allowedLateness=${ALLOWED_LATENESS} \ --outputTableName=${OUTPUT_TABLE_NAME} \ --deadletterBucket=${DEADLETTER_BUCKET}"

Le code de cette quête inclut un script permettant de publier des événements JSON à l'aide de Pub/Sub.

  1. Pour effectuer cette tâche et commencer à publier des messages, ouvrez un nouveau terminal à côté de celui que vous utilisez actuellement et exécutez le script suivant. Il continuera à publier des messages jusqu'à ce que vous arrêtiez le script. Assurez-vous de vous trouver dans le dossier training-data-analyst/quests/dataflow.
Remarque : L'option true ajoute les événements en retard au flux. bash generate_streaming_events.sh true

Cliquez sur Vérifier ma progression pour valider l'objectif. Exécuter le pipeline

Tâche 5 : Tester votre pipeline

  1. Dans la barre de titre de la console Google Cloud, saisissez Pub/Sub dans le champ Recherche, puis cliquez sur Pub/Sub dans la section Produits et pages.

  2. Cliquez sur Sujets, puis sur le sujet my_topic.

  3. Cliquez sur l'onglet Messages, puis sur le bouton Publier un message.

  4. Sur la page suivante, saisissez un message à publier.

Si le message ne respecte pas parfaitement la spécification JSON de CommonLog, il devrait rapidement apparaître dans le bucket de lettres mortes Cloud Storage. Vous pouvez suivre son parcours dans le pipeline en revenant à la fenêtre de surveillance du pipeline et en cliquant sur un nœud de la branche chargée de traiter les messages non analysés.

  1. Une fois qu'un élément a été ajouté à cette branche, vous pouvez accéder à Cloud Storage pour vérifier que le message a bien été écrit sur le disque :
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID}/deadletter gsutil ls $BUCKET gsutil cat $BUCKET/*/*

Cliquez sur Vérifier ma progression pour valider l'objectif. Tester votre pipeline

Terminer l'atelier

Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.

Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.

Le nombre d'étoiles correspond à votre degré de satisfaction :

  • 1 étoile = très insatisfait(e)
  • 2 étoiles = insatisfait(e)
  • 3 étoiles = ni insatisfait(e), ni satisfait(e)
  • 4 étoiles = satisfait(e)
  • 5 étoiles = très satisfait(e)

Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.

Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.

Copyright 2020 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Ouvrez une fenêtre de navigateur en mode navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.