arrow_back

Traitement de données sans serveur avec Dataflow : tests avec Apache Beam (Java)

Accédez à plus de 700 ateliers et cours

Traitement de données sans serveur avec Dataflow : tests avec Apache Beam (Java)

Atelier 1 heure 30 minutes universal_currency_alt 7 crédits show_chart Avancé
info Cet atelier peut intégrer des outils d'IA pour vous accompagner dans votre apprentissage.
Accédez à plus de 700 ateliers et cours

Présentation

Au cours de cet atelier, vous allez :

  • Écrire des tests unitaires pour les DoFn et les PTransform à l'aide des outils de test d'Apache Beam
  • Effectuer un test d'intégration du pipeline
  • Utiliser la classe TestStream pour tester le comportement de fenêtrage d'un pipeline de traitement en flux continu

Le test de votre pipeline constitue une étape particulièrement importante dans le développement d'une solution de traitement de données efficace. Du fait de la nature indirecte du modèle Beam, déboguer des échecs d'exécution peut s'avérer complexe.

Dans cet atelier, nous allons voir comment effectuer des tests unitaires en local avec les outils du package testing du SDK Beam, à l'aide de DirectRunner.

Préparation

Pour chaque atelier, nous vous attribuons un nouveau projet Google Cloud et un nouvel ensemble de ressources pour une durée déterminée, sans frais.

  1. Connectez-vous à Qwiklabs dans une fenêtre de navigation privée.

  2. Vérifiez le temps imparti pour l'atelier (par exemple : 01:15:00) : vous devez pouvoir le terminer dans ce délai.
    Une fois l'atelier lancé, vous ne pouvez pas le mettre en pause. Si nécessaire, vous pourrez le redémarrer, mais vous devrez tout reprendre depuis le début.

  3. Lorsque vous êtes prêt, cliquez sur Démarrer l'atelier.

  4. Notez vos identifiants pour l'atelier (Nom d'utilisateur et Mot de passe). Ils vous serviront à vous connecter à Google Cloud Console.

  5. Cliquez sur Ouvrir la console Google.

  6. Cliquez sur Utiliser un autre compte, puis copiez-collez les identifiants de cet atelier lorsque vous y êtes invité.
    Si vous utilisez d'autres identifiants, des messages d'erreur s'afficheront ou des frais seront appliqués.

  7. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.

Vérifier les autorisations du projet

Avant de commencer à travailler dans Google Cloud, vous devez vous assurer de disposer des autorisations adéquates pour votre projet dans IAM (Identity and Access Management).

  1. Dans la console Google Cloud, accédez au menu de navigation (Icône du menu de navigation), puis sélectionnez IAM et administration > IAM.

  2. Vérifiez que le compte de service Compute par défaut {project-number}-compute@developer.gserviceaccount.com existe et qu'il est associé au rôle editor (éditeur). Le préfixe du compte correspond au numéro du projet, disponible sur cette page : Menu de navigation > Présentation du cloud > Tableau de bord.

État de l'éditeur et nom du compte de service Compute Engine par défaut mis en évidence sur l'onglet "Autorisations"

Remarque : Si le compte n'est pas disponible dans IAM ou n'est pas associé au rôle editor (éditeur), procédez comme suit pour lui attribuer le rôle approprié.
  1. Dans la console Google Cloud, accédez au menu de navigation et cliquez sur Présentation du cloud > Tableau de bord.
  2. Copiez le numéro du projet (par exemple, 729328892908).
  3. Dans le menu de navigation, sélectionnez IAM et administration > IAM.
  4. Sous Afficher par compte principal, en haut de la table des rôles, cliquez sur Accorder l'accès.
  5. Dans le champ Nouveaux comptes principaux, saisissez :
{project-number}-compute@developer.gserviceaccount.com
  1. Remplacez {project-number} par le numéro de votre projet.
  2. Dans le champ Rôle, sélectionnez Projet (ou Basique) > Éditeur.
  3. Cliquez sur Enregistrer.

Configurer votre IDE

Dans cet atelier, vous utiliserez principalement un IDE Web Theia hébergé sur Google Compute Engine. Le dépôt de l'atelier y est précloné. L'IDE prend en charge les serveurs au langage Java et comprend un terminal permettant l'accès programmatique aux API Google Cloud via l'outil de ligne de commande gcloud, comme avec Cloud Shell.

  1. Pour accéder à votre IDE Theia, copiez le lien affiché dans Google Cloud Skills Boost et collez-le dans un nouvel onglet.
Remarque : Le provisionnement complet de l'environnement peut prendre entre trois et cinq minutes, même après l'affichage de l'URL. En attendant, le navigateur indiquera une erreur.

Volet des identifiants affichant l'URL ide_url

Le dépôt de l'atelier a été cloné dans votre environnement. Chaque atelier est divisé en deux dossiers : le premier, intitulé labs, contient du code que vous devez compléter, tandis que le second, nommé solution, comporte un exemple opérationnel que vous pouvez consulter si vous rencontrez des difficultés.

  1. Cliquez sur le bouton Explorateur de fichiers pour y accéder :

Menu de l'explorateur de fichiers développé, avec le dossier "Labs" mis en évidence

Vous pouvez également créer plusieurs terminaux dans cet environnement, comme vous le feriez avec Cloud Shell :

Option "Nouveau terminal" mise en évidence dans le menu "Terminal"

Vous pouvez exécuter la commande gcloud auth list dans le terminal pour vérifier que vous êtes connecté avec un compte de service fourni et que vous disposez donc des mêmes autorisations qu'avec votre compte utilisateur pour l'atelier :

Terminal dans lequel est utilisée la commande gcloud auth list

Si votre environnement cesse de fonctionner, vous pouvez essayer de réinitialiser la VM hébergeant votre IDE depuis la console GCE. Pour cela, procédez comme suit :

Bouton "Réinitialiser" et nom de l'instance de VM mis en évidence sur la page "Instances de VM"

Le code de l'atelier est réparti dans deux dossiers : 8a_Batch_Testing_Pipeline/lab et 8b_Stream_Testing_Pipeline/lab. Si vous rencontrez des difficultés, vous trouverez la solution dans les dossiers solution correspondants.

Partie 1 de l'atelier : Effectuer des tests unitaires pour les DoFn et les PTransform

Dans cette partie de l'atelier, nous allons effectuer des tests unitaires sur les DoFn et les PTransform d'un pipeline par lot qui calcule des statistiques à partir de données de capteurs météorologiques. Pour tester les transformations que vous avez créées, vous pouvez utiliser ce modèle et ces transformations fournis par Beam :

  • Créez un objet TestPipeline.
  • Créez des données d'entrée de test et utilisez la transformation Create pour créer une PCollection de vos données d'entrée.
  • Appliquez votre transformation à la PCollection d'entrée et enregistrez la PCollection qui en résulte.
  • Utilisez DataflowAssert et ses sous-classes pour vérifier que les résultats de PCollection contiennent les éléments attendus.

TestPipeline est une classe spéciale incluse dans le SDK Beam, qui permet de tester vos transformations et la logique de votre pipeline.

  • Pour les tests, utilisez TestPipeline au lieu de Pipeline lorsque vous créez l'objet de pipeline :
TestPipeline p = TestPipeline.create();

La transformation Create prend une collection d'objets en mémoire (un itérable Java) et crée une PCollection à partir de cette collection. L'objectif est de disposer d'un petit ensemble de données d'entrée de test, pour lesquelles nous connaissons le résultat attendu de PCollection, à partir de nos PTransforms.

List<String> input = Arrays.asList(testInput); // Some code to create a TestPipeline p outputPColl = p.apply(Create.of(input).apply(...);

Enfin, nous voulons vérifier que le résultat de PCollection correspond à celui attendu. Nous utilisons la classe PAssert pour le vérifier. Par exemple, nous pouvons utiliser la méthode containsInAnyOrder pour vérifier que le résultat de PCollection contient les bons éléments :

PAssert.that(outputPColl).containsInAnyOrder(expectedOutput);

Tâche 1 : Explorer le code du pipeline principal

  1. Dans votre IDE, accédez à 8a_Batch_Testing_Pipeline/lab.

Ce répertoire contient un fichier pom.xml pour définir les dépendances, ainsi que le dossier src, qui contient deux sous-répertoires. Le dossier src/main contient le code du package de pipeline, et le dossier src/test contiendra notre code de test.

  1. Ouvrez d'abord 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherRecord.java.

Ce fichier contient la définition de la classe WeatherRecord que nous allons utiliser dans notre pipeline. La classe WeatherRecord est associée à un schéma, et les étapes de définition du schéma à l'aide de l'annotation @DefaultSchema devraient vous être familières. Toutefois, notez que nous devons remplacer la méthode equals lors de la définition de la classe.

@Override public boolean equals(final Object obj){ if(obj instanceof WeatherRecord){ final WeatherRecord other = (WeatherRecord) obj; return (locId.equals(other.locId)) && (Double.compare(lat, other.lat) == 0) && (Double.compare(lng, other.lng) == 0) && (date.equals(other.date)) && (Double.compare(lowTemp, other.lowTemp) == 0) && (Double.compare(highTemp, other.highTemp) == 0) && (Double.compare(precip, other.precip) == 0); } else{ return false; } }

Pourquoi ? PAssert utilise la méthode equals pour vérifier l'appartenance dans le résultat de PCollection. Toutefois, la méthode equals par défaut pour un POJO (Plain Old Java Object) compare uniquement les adresses des objets. Alors que nous, c'est le contenu des objets que nous souhaitons comparer. C'est très simple, comme le montre l'exemple ci-dessus.

  1. Ouvrez 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java.

Il s'agit du code principal de notre pipeline. Les concepts de ce pipeline ont pour la plupart été abordés dans les ateliers précédents, mais veillez à explorer plus en détail les éléments suivants :

  • Les fonctions DoFn ConvertCsvToWeatherRecord (à partir de la ligne 65) et ConvertTempUnits (à partir de la ligne 81). Nous effectuerons des tests unitaires sur ces DoFn ultérieurement.
  • La PTransform ComputeStatistics (à partir de la ligne 103). Il s'agit d'un exemple de transformation composite que nous pourrons tester de la même manière qu'une DoFn.
  • La PTransform WeatherStatsTransform (à partir de la ligne 123). Cette PTransform contient la logique de traitement de l'ensemble de notre pipeline (moins les transformations de source et de récepteur), ce qui nous permet d'effectuer un petit test d'intégration de pipeline sur des données synthétiques créées par une transformation Create.

Si vous remarquez une erreur logique dans le code de traitement, ne la corrigez pas tout de suite. Nous verrons plus tard comment repérer l'erreur grâce aux tests.

Tâche 2 : Ajouter des dépendances pour les tests

  1. Ouvrez maintenant le fichier 8a_Batch_Testing_Pipeline/lab/pom.xml.

Nous devons ajouter quelques dépendances pour les tests. Tout code Java Beam destiné aux tests doit être lié à JUnit et Hamcrest. Dans Maven, il suffit de mettre à jour le fichier pom.xml.

  1. Pour effectuer cette tâche, copiez le code XML suivant et collez-le dans le fichier pom.xml à l'endroit indiqué dans un commentaire :
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-core</artifactId> <version>2.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.hamcrest</groupId> <artifactId>hamcrest-library</artifactId> <version>2.1</version> <scope>test</scope> </dependency>

Notez que le champ d'application de ces dépendances est "test". Nous aurons besoin de ces packages lorsque nous exécuterons un test avec mvn test, mais pas lors de l'exécution du pipeline principal.

Tâche 3 : Écrire le premier test unitaire DoFn dans Apache Beam

  1. Accédez à 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java.

Ce fichier contient le code de nos tests unitaires DoFn et PTransform. La majeure partie du code est actuellement mise en commentaire, mais nous allons la faire passer en code au fur et à mesure.

Nous allons commencer par examiner un test unitaire DoFn pour notre DoFn ConvertCsvToWeatherRecord (à partir de la ligne 43).

  1. Nous créons d'abord une classe pour tester notre pipeline et créons un objet TestPipeline :
@RunWith(JUnit4.class) public class WeatherStatisticsPipelineTest { @Rule public final transient TestPipeline p = TestPipeline.create();

Nous allons utiliser cet objet TestPipeline dans tous les tests suivants. Nous n'avons pas à nous soucier des effets secondaires liés à la réutilisation d'un même objet, car nous avons utilisé le mot clé transient lors de sa création.

  1. Examinons à présent le code (incomplet) de notre premier test :
@Test @Category(NeedsRunner.class) public void testConvertCsvToWeatherRecord() throws Exception { String testInput = "x,31.4,-39.2,2/2/21,4.0,7.5,0.1"; List<String> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(/* Create PCollection from in-memory object */) .apply(ParDo.of(new ConvertCsvToWeatherRecord())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); // Include PAssert statement to check for correct results p.run().waitUntilFinish(); }

Nous annotons la méthode que nous allons utiliser pour tester notre pipeline avec @Test. Nous créons une seule entrée de test (testInput) représentant une ligne d'un fichier CSV (le format d'entrée attendu pour notre pipeline) et la plaçons dans une entrée d'objet List.

Il manque quelques éléments dans le reste du code du test.

  1. Pour effectuer cette tâche, commencez par ajouter la transformation Create pour convertir l'entrée en PCollection.

  2. Ensuite, incluez une instruction PAssert à l'aide de la méthode containsInAnyOrder pour comparer l'entrée avec testOutput.

Si vous rencontrez des difficultés, vous pouvez vous reporter aux tests commentés ultérieurs ou aux solutions.

Tâche 4 : Exécuter le premier test unitaire DoFn

  1. Si ce n'est pas déjà fait, créez un terminal dans votre environnement IDE, puis copiez et collez la commande suivante :
# Change directory into the lab cd 8a_Batch_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)

Nous sommes maintenant prêts à lancer notre test.

  1. Pour ce faire, exécutez simplement la commande suivante dans votre terminal :
mvn test

Si vous avez correctement effectué la tâche précédente, vous devriez voir ce qui suit dans votre terminal une fois le test terminé (le temps écoulé exact sera différent) :

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 5.479 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Tâche 5 : Exécuter le deuxième test unitaire DoFn et déboguer le pipeline

  1. Accédez à 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java et annulez la mise en commentaire du code pour le deuxième test unitaire (vers les lignes 67-80). Pour ce faire, sélectionnez le code et appuyez sur Ctrl + / (ou Cmd + / sur macOS). Le code est présenté ci-dessous à titre de référence :
@Test @Category(NeedsRunner.class) public void testConvertTempUnits() throws Exception { WeatherRecord testInput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 4.0, 7.5, 0.1); List<WeatherRecord> input = Arrays.asList(testInput); PCollection<WeatherRecord> output = p.apply(Create.of(input)) .apply(ParDo.of(new ConvertTempUnits())); WeatherRecord testOutput = new WeatherRecord("x", 31.4, -39.2, "2/2/21", 39.2, 45.5, 0.1); PAssert.that(output).containsInAnyOrder(testOutput); p.run().waitUntilFinish(); }

Ce test permet de s'assurer que la fonction DoFn ConvertTempUnits() fonctionne comme prévu.

  1. Enregistrez WeatherStatisticsPipelineTest.java et revenez à votre terminal.

  2. Exécutez à nouveau la commande suivante pour lancer les tests :

mvn test

Cette fois, le test échoue. Si nous parcourons le résultat, nous pouvons trouver les informations suivantes sur l'échec du test :

[ERROR] Failures: [ERROR] WeatherStatisticsPipelineTest.testConvertTempUnits:76 ParDo(ConvertTempUnits)/ParMultiDo(ConvertTempUnits).output: Expected: iterable with items [<com.mypackage.pipeline.WeatherRecord@e3daa587>] in any order but: not matched: <com.mypackage.pipeline.WeatherRecord@e3cb2587>

À première vue, ce message d'erreur ne semble pas très utile. Cependant, nous pouvons voir que l'objet WeatherRecord attendu dans testOutput n'a pas été trouvé. Peut-être que la conversion de température n'a pas été effectuée correctement.

  1. Revenez à 8a_Batch_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > WeatherStatisticsPipeline.java et faites défiler la page jusqu'à la définition de ConvertTempUnits (vers la ligne 81).

  2. Pour effectuer cette tâche, identifiez l'erreur dans la logique de traitement DoFn et réexécutez la commande mvn test pour vérifier que le test réussit. Pour rappel, voici la formule de conversion des degrés Celsius en degrés Fahrenheit :

tempF = tempC * 1.8 + 32.0

Si vous rencontrez des difficultés, vous pouvez consulter les solutions.

Tâche 6 : Exécuter un test unitaire de PTransform et tester le pipeline de bout en bout

  1. Revenez à 8a_Batch_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > WeatherStatisticsPipelineTest.java et annulez la mise en commentaire du code pour les deux derniers tests (à partir de la ligne 84 environ).

Le premier test que nous venons de passer en code teste la PTransform composite ComputeStatistics. Une version tronquée du code est présentée ci-dessous à titre de référence :

@Test @Category(NeedsRunner.class) public void testComputeStatistics() throws Exception { WeatherRecord[] testInputs = new WeatherRecord[3]; //Define Testing Inputs (Omitted here) List<WeatherRecord> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new ComputeStatistics()); String testOutputs[] = new String[]{"[\"x\",34.2,45.5,0.4]", "[\"y\",72.5,82.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }

Vous remarquerez que cela ressemble beaucoup aux tests unitaires DoFn que nous avons effectués précédemment. La seule différence réelle (en dehors des entrées et résultats de test) est que nous appliquons la PTransform plutôt que ParDo(new DoFn()).

Le dernier test concerne le pipeline de bout en bout. Dans le code du pipeline (WeatherStatisticsPipeline.java), l'intégralité du pipeline de bout en bout, à l'exception de la source et du récepteur, a été incluse dans une seule PTransform WeatherStatsTransform.

  1. Pour tester le pipeline de bout en bout, nous pouvons répéter une opération semblable à celle que nous avons effectuée ci-dessus, mais en utilisant cette PTransform :
@Test @Category(NeedsRunner.class) public void testWeatherStatsTransform() throws Exception { String[] testInputs = new String[] //Define Testing Inputs (Omitted here) List<String> input = Arrays.asList(testInputs); PCollection<String> output = p.apply(Create.of(input)) .apply(new WeatherStatsTransform()); String testOutputs[] = new String[]{"[\"x\",38.3,45.5,0.4]", "[\"y\",54.5,63.5,0.5]"}; PAssert.that(output).containsInAnyOrder(testOutputs); p.run().waitUntilFinish(); }
  1. Revenez à votre terminal et exécutez la commande suivante pour lancer les tests une nouvelle fois :
mvn test

Si vous avez correctement effectué les tâches précédentes, vous devriez voir ce qui suit dans le terminal une fois les tests terminés :

[INFO] ------------------------------------------------------- [INFO] T E S T S [INFO] ------------------------------------------------------- [INFO] Running com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 13.602 s - in com.mypackage.pipeline.WeatherStatisticsPipelineTest [INFO] [INFO] Results: [INFO] [INFO] Tests run: 4, Failures: 0, Errors: 0, Skipped: 0 [INFO]

Cliquez sur Vérifier ma progression pour valider l'objectif. Effectuer des tests unitaires pour les DoFn et les PTransform

Partie 2 de l'atelier : Tester la logique de traitement par flux avec TestStream

Dans cette partie de l'atelier, nous allons effectuer des tests unitaires pour un pipeline de traitement par flux qui calcule le nombre de courses de taxi par fenêtre. Pour tester les transformations que vous avez créées, vous pouvez utiliser ce modèle et ces transformations fournis par Beam :

  • Créez un objet TestPipeline.
  • Utilisez la classe TestStream pour générer un flux de données. Cela inclut la génération d'une série d'événements, l'avancement de la watermark et l'avancement du temps de traitement.
  • Utilisez PAssert et ses sous-classes pour vérifier que le résultat de PCollection contient les éléments attendus dans des fenêtres spécifiques.

Lorsqu'un pipeline qui lit à partir d'un TestStream est exécuté, la lecture attend que toutes les conséquences de chaque événement soient terminées avant de passer à l'événement suivant, y compris lorsque le temps de traitement avance et que les déclencheurs appropriés se déclenchent. TestStream permet d'observer et de tester l'effet du déclenchement et de la tolérance au retard sur un pipeline. Cela inclut la logique concernant les déclencheurs tardifs et les données perdues en raison de leur retard.

Tâche 1 : Explorer le code du pipeline principal

  1. Dans votre IDE, accédez à 8b_Stream_Testing_Pipeline/lab.

Ce répertoire contient un fichier pom.xml pour définir les dépendances, ainsi que le dossier src, qui contient deux sous-répertoires. Le dossier src/main contient le code du package de pipeline, et le dossier src/test contiendra notre code de test.

  1. Commencez par ouvrir 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiRide.java.

Ce fichier contient la définition de la classe TaxiRide que nous allons utiliser dans notre pipeline. La classe TaxiRide est associée à un schéma, et les étapes de définition du schéma à l'aide de l'annotation @DefaultSchema devraient vous être familières.

  1. Ouvrez 8b_Stream_Testing_Pipeline > lab > src > main > java > com > mypackage > pipeline > TaxiStreamingPipeline.java.

Il s'agit du code principal de notre pipeline. Les concepts de ce pipeline ont pour la plupart été abordés dans les ateliers précédents, mais veillez à explorer plus en détail les éléments suivants :

  • Le DoFn JsonToTaxiRide (à partir de la ligne 94) utilisé pour convertir les messages Pub/Sub entrants en objets de la classe TaxiRide.
  • La PTransform TaxiCountTransform (à partir de la ligne 113). Cette PTransform contient la logique principale de comptage et de fenêtrage du pipeline. Nos tests porteront sur cette PTransform.

Le résultat de TaxiCountTransform doit être un décompte de toutes les courses de taxi enregistrées par fenêtre. Cependant, nous aurons plusieurs événements par trajet (prise en charge, dépose, etc.).

  1. Nous allons filtrer les données sur la propriété ride_status pour nous assurer que chaque course n'est comptabilisée qu'une seule fois. Pour cela, nous ne conservons que les éléments dont la valeur de ride_status est "pickup" (prise en charge) :
.apply("FilterForPickups", Filter.<TaxiRide>create().whereFieldName("ride_status", status -> "pickup".equals(status)))

Voici la logique de fenêtrage utilisée dans notre pipeline :

.apply("WindowByMinute", Window.<TaxiRide>into(FixedWindows.of(Duration.standardSeconds(60))) .triggering(AfterWatermark.pastEndOfWindow() .withLateFirings(AfterProcessingTime.pastFirstElementInPane())) .withAllowedLateness(Duration.standardMinutes(1)) .accumulatingFiredPanes())

Nous allons créer des fenêtres fixes d'une durée de 60 secondes. Nous n'avons pas de déclencheur anticipé, mais nous émettrons les résultats une fois que la watermark aura dépassé la fin de la fenêtre. Nous incluons les déclenchements tardifs avec chaque nouvel élément entrant, mais uniquement avec une tolérance de retard d'une minute. Enfin, nous allons accumuler l'état dans des fenêtres jusqu'à ce que le délai de retard autorisé soit dépassé.

Tâche 2 : Explorer l'utilisation de TestStream et exécuter le premier test

  1. Ouvrez 8b_Stream_Testing_Pipeline > lab > src > test > java > com > mypackage > pipeline > TaxiStreamingPipelineTest.java.

Le premier objectif sera de comprendre l'utilisation de TestStream dans notre code de test. Rappelez-vous que la classe TestStream nous permet de simuler un flux de messages en temps réel tout en contrôlant la progression du temps de traitement et la watermark.

Le code du premier test (à partir de la ligne 66) est inclus ci-dessous :

TestStream<String> createEvents = TestStream.create(StringUtf8Coder.of()) .advanceWatermarkTo(startTime) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime), TimestampedValue.of(json.format(json, "enroute"), startTime), TimestampedValue.of(json.format(json, "pickup"), startTime)) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(1)))) .advanceWatermarkTo(startTime.plus(Duration.standardMinutes(1))) .addElements( TimestampedValue.of(json.format(json, "pickup"), startTime.plus(Duration.standardMinutes(2)))) .advanceWatermarkToInfinity();

Nous créons un TestStream à l'aide de la méthode create, tout en spécifiant l'encodeur. Nous allons transmettre le message JSON sous forme de chaîne pour pouvoir utiliser StringUtf8Coder. Que fait le flux TestStream ci-dessus ?

Le flux TestStream effectue les tâches suivantes :

  • Il définit la watermark initiale sur la variable startTime (Instant(0)).
  • Il ajoute trois éléments à la chaîne avec un code temporel d'événement de startTime. Deux de ces événements seront comptabilisés (ride_status = "pickup"), mais pas l'autre.
  • Il ajoute un autre événement "pickup", mais avec un code temporel d'événement d'une minute après startTime.
  • Il fait avancer la watermark d'une minute après startTime pour déclencher la première fenêtre.
  • Il ajoute un autre événement "pickup", mais avec un code temporel d'événement de deux minutes après startTime.
  • Il fait avancer la watermark jusqu'à "infini". Cela signifie que toutes les fenêtres seront désormais fermées et que toutes les nouvelles données dépasseront la limite de retard autorisée.
  1. Le reste du code du premier test est semblable à l'exemple de traitement par lot précédent, mais nous utilisons désormais TestStream au lieu de la transformation Create :
PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); IntervalWindow window2 = new IntervalWindow(startTime.plus(Duration.standardMinutes(1)), startTime.plus(Duration.standardMinutes(2))); IntervalWindow window3 = new IntervalWindow(startTime.plus(Duration.standardMinutes(2)), startTime.plus(Duration.standardMinutes(3))); PAssert.that(outputCount).inWindow(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inWindow(window2).containsInAnyOrder(1L); PAssert.that(outputCount).inWindow(window3).containsInAnyOrder(1L); p.run().waitUntilFinish();

Dans le code ci-dessus, nous définissons le résultat de PCollection (outputCount) en créant le TestStream et en appliquant la PTransform TaxiCountTransform. Nous utilisons la classe InvervalWindow pour définir les fenêtres que nous souhaitons vérifier, puis nous utilisons PAssert avec la méthode inWindow pour vérifier les résultats par fenêtre.

  1. Revenez au terminal de votre IDE (ou ouvrez un nouveau terminal) et exécutez les commandes suivantes pour vous déplacer vers le bon répertoire et installer les dépendances :
# Change directory into the lab cd $BASE_DIR/../../8b_Stream_Testing_Pipeline/lab # Download dependencies mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. Lancez maintenant le test ci-dessus à l'aide de la commande suivante :
mvn test

Le résultat suivant doit s'afficher après le test (le temps écoulé peut varier) :

[INFO] Tests run: 1, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 31.629 s [INFO] Finished at: 2021-05-13T12:24:20-04:00 [INFO] ------------------------------------------------------------------------

Tâche 3 : Créer TestStream pour tester la gestion des données tardives

Dans cette tâche, vous allez écrire du code pour un TestStream afin de tester la logique de gestion des données tardives.

  1. Revenez à 8b_Stream_Testing_Pipeline/lab/src/test/java/com/mypackage/pipeline/TaxiStreamingPipelineTest.java et faites défiler la page jusqu'à la méthode testTaxiRideLateData, qui est mise en commentaire (vers la ligne 104).

  2. Annulez la mise en commentaire du code pour ce test, car nous allons l'exécuter :

@Test @Category(NeedsRunner.class) public void testTaxiRideLateData() throws Exception { Instant startTime = new Instant(0); String json = "{\"ride_id\":\"x\",\"point_idx\":1,\"latitude\":0.0," + "\"longitude\":0.0,\"timestamp\":\"00:00:00\",\"meter_reading\":1.0," + "\"meter_increment\":0.1,\"ride_status\":\"%s\",\"passenger_count\":1}"; TestStream<String> createEvents = /* CreateTestStream */ PCollection<Long> outputCount = p .apply(createEvents) .apply(new TaxiCountTransform()); IntervalWindow window1 = new IntervalWindow(startTime, startTime.plus(Duration.standardMinutes(1))); PAssert.that(outputCount).inOnTimePane(window1).containsInAnyOrder(2L); PAssert.that(outputCount).inFinalPane(window1).containsInAnyOrder(3L); p.run().waitUntilFinish(); }

Le code du test est complet, à l'exception de la création de TestStream.

  1. Pour effectuer cette tâche, créez un objet TestStream qui effectue les tâches suivantes :
  • Il fait avancer la watermark jusqu'à startTime.
  • Il ajoute deux TimestampedValue avec la valeur json.format(json, "pickup") et le code temporel startTime.
  • Il fait avancer la watermark à une minute après startTime
  • Il ajoute une autre TimestamedValue avec la valeur json.format(json, "pickup") et le code temporel startTime.
  • Il fait avancer la watermark à deux minutes après startTime.
  • Il ajoute une autre TimestamedValue avec la valeur json.format(json, "pickup") et le code temporel startTime.
  • Il fait avancer la watermark à l'infini.

Cela crée un TestStream avec quatre éléments appartenant à notre première fenêtre. Les deux premiers éléments sont à l'heure, le troisième est en retard (mais dans les limites autorisées), et le dernier est en retard au-delà des limites autorisées. Comme nous accumulons les fenêtres déclenchées, le premier déclencheur doit compter deux événements et le dernier déclencheur doit compter trois événements. Le quatrième événement ne doit pas être inclus. Pour le vérifier, nous pouvons utiliser les méthodes inOnTimePane et inFinalPane de la classe PAssert.

Si vous rencontrez des difficultés, vous pouvez consulter les solutions.

Tâche 4 : Exécuter un test pour la gestion des données tardives

  • Revenez à votre terminal et exécutez la commande suivante pour lancer les tests une nouvelle fois :
mvn test

Si vous avez correctement effectué les tâches précédentes, vous devriez voir ce qui suit dans le terminal une fois les tests terminés :

[INFO] Results: [INFO] [INFO] Tests run: 2, Failures: 0, Errors: 0, Skipped: 0 [INFO] [INFO] ------------------------------------------------------------------------ [INFO] BUILD SUCCESS [INFO] ------------------------------------------------------------------------ [INFO] Total time: 24.849 s [INFO] Finished at: 2021-05-13T13:10:32-04:00 [INFO] ------------------------------------------------------------------------

Cliquez sur Vérifier ma progression pour valider l'objectif. Tester la logique de traitement par flux avec TestStream

Terminer l'atelier

Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.

Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.

Le nombre d'étoiles correspond à votre degré de satisfaction :

  • 1 étoile = très insatisfait(e)
  • 2 étoiles = insatisfait(e)
  • 3 étoiles = ni insatisfait(e), ni satisfait(e)
  • 4 étoiles = satisfait(e)
  • 5 étoiles = très satisfait(e)

Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.

Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.

Copyright 2020 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.

Avant de commencer

  1. Les ateliers créent un projet Google Cloud et des ressources pour une durée déterminée.
  2. Les ateliers doivent être effectués dans le délai imparti et ne peuvent pas être mis en pause. Si vous quittez l'atelier, vous devrez le recommencer depuis le début.
  3. En haut à gauche de l'écran, cliquez sur Démarrer l'atelier pour commencer.

Utilisez la navigation privée

  1. Copiez le nom d'utilisateur et le mot de passe fournis pour l'atelier
  2. Cliquez sur Ouvrir la console en navigation privée

Connectez-vous à la console

  1. Connectez-vous à l'aide des identifiants qui vous ont été attribués pour l'atelier. L'utilisation d'autres identifiants peut entraîner des erreurs ou des frais.
  2. Acceptez les conditions d'utilisation et ignorez la page concernant les ressources de récupération des données.
  3. Ne cliquez pas sur Terminer l'atelier, à moins que vous n'ayez terminé l'atelier ou que vous ne vouliez le recommencer, car cela effacera votre travail et supprimera le projet.

Ce contenu n'est pas disponible pour le moment

Nous vous préviendrons par e-mail lorsqu'il sera disponible

Parfait !

Nous vous contacterons par e-mail s'il devient disponible

Un atelier à la fois

Confirmez pour mettre fin à tous les ateliers existants et démarrer celui-ci

Utilisez la navigation privée pour effectuer l'atelier

Ouvrez une fenêtre de navigateur en mode navigation privée pour effectuer cet atelier. Vous éviterez ainsi les conflits entre votre compte personnel et le compte temporaire de participant, qui pourraient entraîner des frais supplémentaires facturés sur votre compte personnel.