Présentation
Dans cet atelier, vous allez reprendre de nombreux concepts présentés dans un contexte de lot et les appliquer dans un contexte de flux continu pour créer un pipeline semblable à batch_minute_traffic_pipeline, mais qui fonctionne en temps réel. Le pipeline final lira d'abord les messages JSON de Pub/Sub et les analysera avant de créer des ramifications. Une branche écrit des données brutes dans BigQuery et note l'événement et l'heure de son traitement. L'autre branche procède au fenêtrage des données et les agrège, puis écrit les résultats dans BigQuery.
Objectifs
- Lire des données provenant d'une source de flux continu
- Écrire des données dans un récepteur de flux continu
- Effectuer le fenêtrage des données dans un contexte de flux continu
- Vérifier de façon expérimentale les effets du décalage
Vous allez créer le pipeline suivant :

Préparation
Avant de cliquer sur le bouton "Démarrer l'atelier"
Remarque : Lisez ces instructions.
Les ateliers sont minutés, et vous ne pouvez pas les mettre en pause. Le minuteur, qui démarre lorsque vous cliquez sur Démarrer l'atelier, indique combien de temps les ressources Google Cloud resteront accessibles.
Cet atelier pratique Qwiklabs vous permet de suivre vous-même les activités dans un véritable environnement cloud, et non dans un environnement de simulation ou de démonstration. Des identifiants temporaires vous sont fournis pour vous permettre de vous connecter à Google Cloud le temps de l'atelier.
Conditions requises
Pour réaliser cet atelier, vous devez :
- avoir accès à un navigateur Internet standard (nous vous recommandons d'utiliser Chrome) ;
- disposer de suffisamment de temps pour effectuer l'atelier en une fois.
Remarque : Si vous possédez déjà votre propre compte ou projet Google Cloud, veillez à ne pas l'utiliser pour réaliser cet atelier.
Remarque : Si vous utilisez un Pixelbook, veuillez exécuter cet atelier dans une fenêtre de navigation privée.
Démarrer votre atelier et vous connecter à la console
-
Cliquez sur le bouton Démarrer l'atelier. Si l'atelier est payant, un pop-up s'affiche pour vous permettre de sélectionner un mode de paiement.
Sur la gauche, vous verrez un panneau contenant les identifiants temporaires à utiliser pour cet atelier.

-
Copiez le nom d'utilisateur, puis cliquez sur Ouvrir la console Google.
L'atelier lance les ressources, puis la page Sélectionner un compte dans un nouvel onglet.
Remarque : Ouvrez les onglets dans des fenêtres distinctes, placées côte à côte.
-
Sur la page "Sélectionner un compte", cliquez sur Utiliser un autre compte. La page de connexion s'affiche.

-
Collez le nom d'utilisateur que vous avez copié dans le panneau "Détails de connexion". Copiez et collez ensuite le mot de passe.
Remarque : Vous devez utiliser les identifiants fournis dans le panneau "Détails de connexion", et non vos identifiants Google Cloud Skills Boost. Si vous possédez un compte Google Cloud, ne vous en servez pas pour cet atelier (vous éviterez ainsi que des frais vous soient facturés).
- Accédez aux pages suivantes :
- Acceptez les conditions d'utilisation.
- N'ajoutez pas d'options de récupération ni d'authentification à deux facteurs (ce compte est temporaire).
- Ne vous inscrivez pas aux essais offerts.
Après quelques instants, la console Cloud s'ouvre dans cet onglet.
Remarque : Vous pouvez afficher le menu qui contient la liste des produits et services Google Cloud en cliquant sur le menu de navigation en haut à gauche.
Configuration de l'environnement de développement des instances Workbench
Dans cet atelier, vous allez exécuter toutes les commandes dans un terminal à partir de votre notebook d'instance.
-
Dans le menu de navigation (
) de la console Google Cloud, sélectionnez Vertex AI.
-
Cliquez sur Activer toutes les API recommandées.
-
Dans le menu de navigation, cliquez sur Workbench.
En haut de la page "Workbench", vérifiez que vous vous trouvez dans la vue Instances.
-
Cliquez sur
Créer.
-
Configurez l'instance :
-
Nom : lab-workbench
-
Région : définissez la région sur
-
Zone : définissez la zone sur
-
Options avancées (facultatif) : si nécessaire, cliquez sur "Options avancées" pour une personnalisation plus avancée (par exemple, type de machine, taille du disque).

- Cliquez sur Créer.
La création de l'instance prend quelques minutes. Une coche verte apparaît à côté de son nom quand elle est prête.
- Cliquez sur Ouvrir JupyterLab à côté du nom de l'instance pour lancer l'interface JupyterLab. Un nouvel onglet s'ouvre alors dans votre navigateur.

- Cliquez ensuite sur Terminal. Le terminal qui s'affiche vous permet d'exécuter toutes les commandes de cet atelier.
Télécharger le dépôt de code
Maintenant, vous allez télécharger le dépôt de code que vous utiliserez dans cet atelier.
- Dans le terminal que vous venez d'ouvrir, saisissez le code suivant :
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
-
Dans le panneau de gauche de votre environnement de notebook, dans l'explorateur de fichiers, vous pouvez noter que le dépôt training-data-analyst a été ajouté.
-
Accédez au dépôt cloné /training-data-analyst/quests/dataflow_python/
. Chaque atelier correspond à un dossier, divisé en deux sous-dossiers : le premier, intitulé lab
, contient du code que vous devez compléter, tandis que le second, nommé solution
, comporte un exemple concret que vous pouvez consulter si vous rencontrez des difficultés.

Remarque : Si vous souhaitez ouvrir un fichier pour le modifier, il vous suffit de le trouver et de cliquer dessus. Une fois ce fichier ouvert, vous pourrez y ajouter du code ou en modifier.
Ouvrir l'atelier approprié
- Dans votre terminal, exécutez les commandes suivantes pour accéder au répertoire que vous utiliserez pour cet atelier :
# Change directory into the lab
cd 5_Streaming_Analytics/lab
export BASE_DIR=$(pwd)
Configurer les dépendances
Avant de pouvoir modifier le code du pipeline, assurez-vous d'avoir installé les dépendances nécessaires.
- Exécutez les commandes suivantes pour installer les packages dont vous aurez besoin pour exécuter votre pipeline :
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
- Vérifiez que l'API Dataflow est activée :
gcloud services enable dataflow.googleapis.com
- Enfin, attribuez le rôle
dataflow.worker
au compte de service Compute Engine par défaut :
PROJECT_ID=$(gcloud config get-value project)
export PROJECT_NUMBER=$(gcloud projects list --filter="$PROJECT_ID" --format="value(PROJECT_NUMBER)")
export serviceAccount=""$PROJECT_NUMBER"-compute@developer.gserviceaccount.com"
-
Dans la console Cloud, accédez à IAM ET ADMINISTRATION > IAM, puis cliquez sur l'icône Modifier le compte principal pour le compte de service par défaut de Compute Engine
.
-
Ajoutez le rôle Nœud de calcul Dataflow, puis cliquez sur Enregistrer.
Configurer l'environnement de données
# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_streaming_sinks.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR
Cliquez sur Vérifier ma progression pour valider l'objectif.
Configurer l'environnement de données
Tâche 1 : Lire des données à partir d'une source de flux continu
Dans les ateliers précédents, vous avez utilisé beam.io.ReadFromText
pour lire des données depuis Google Cloud Storage. Dans cet atelier, vous allez utiliser Pub/Sub au lieu de Google Cloud Storage. Pub/Sub est un service de messagerie en temps réel entièrement géré qui permet aux diffuseurs d'envoyer des messages vers un "sujet" auquel il est possible de s'abonner par un "abonnement".

Le pipeline que vous créez s'abonne au sujet appelé my_topic
que vous venez de créer à l'aide du script create_streaming_sinks.sh
. En production, ce sujet est souvent créé par l'équipe de publication. Vous pouvez le consulter dans la section Pub/Sub de la console.
- Dans l'explorateur de fichiers, accédez à
training-data-analyst/quest/dataflow_python/5_Streaming_Analytics/lab/
et ouvrez le fichier streaming_minute_traffic_pipeline.py
.
- Pour lire des données depuis Pub/Sub à l'aide des connecteurs d'E/S d'Apache Beam, ajoutez une transformation au pipeline qui utilise la classe
beam.io.ReadFromPubSub()
. Cette classe comporte des attributs permettant de spécifier le sujet source ainsi que l'attribut timestamp_attribute
. Par défaut, cet attribut est défini sur l'heure de publication du message.
Remarque :
L'heure de publication correspond au moment où le service Pub/Sub reçoit le message pour la première fois. Dans certains systèmes, il peut y avoir un décalage entre l'heure réelle de l'événement et l'heure de publication (c'est-à-dire des données tardives). Si vous souhaitez en tenir compte, le code client qui publie le message doit définir un attribut de métadonnées "timestamp" sur le message et fournir le code temporel réel de l'événement. En effet, Pub/Sub ne saura pas, de manière native, comment extraire le code temporel de l'événement intégré à la charge utile. Vous pouvez voir le code client qui génère les messages que vous allez utiliser ici.
Pour effectuer cette tâche :
- Ajoutez une transformation qui lit les données du sujet Pub/Sub spécifié par le paramètre de ligne de commande
input_topic
.
- Ensuite, utilisez la fonction fournie,
parse_json
, avec beam.Map
pour convertir chaque chaîne JSON en instance CommonLog
.
- Collectez les résultats de cette transformation dans une
PCollection
d'instances CommonLog
à l'aide de with_output_types()
.
- Dans la première occurrence
#TODO
, ajoutez le code suivant :
beam.io.ReadFromPubSub(input_topic)
Tâche 2 : Effectuer le fenêtrage des données
Dans l'atelier précédent (non SQL), vous avez implémenté le fenêtrage à durée fixe afin de regrouper les événements par heure d'événement dans des fenêtres de taille fixe mutuellement exclusives. Faites la même chose ici avec les entrées en flux continu. N'hésitez pas à vous reporter au code de l'atelier précédent ou à la solution si vous êtes bloqué.
Regrouper en fenêtres d'une minute
Pour effectuer cette tâche :
- Ajoutez à votre pipeline une transformation qui accepte la
PCollection
de données CommonLog
et qui regroupe les éléments dans des fenêtres d'une durée de window_duration
secondes, où window_duration
est un autre paramètre de ligne de commande.
- Utilisez le code suivant pour ajouter à votre pipeline une transformation qui regroupe les éléments dans des fenêtres d'une minute :
"WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(60))
Tâche 3 : Agréger les données
Dans l'atelier précédent, vous avez utilisé l'argument CountCombineFn()
pour compter le nombre d'événements par fenêtre. Faites de même ici.
Compter les événements par fenêtre
Pour effectuer cette tâche :
- Transmettez la
PCollection
regroupée en fenêtre sous forme d'entrée à une transformation qui compte le nombre d'événements par fenêtre.
- Ensuite, utilisez le
DoFn
fourni, GetTimestampFn
, avec beam.ParDo
pour inclure le code temporel de début de la fenêtre.
- Utilisez le code suivant pour ajouter à votre pipeline une transformation qui compte le nombre d'événements par fenêtre :
"CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()
Tâche 4 : Écrire dans BigQuery
Ce pipeline écrit dans BigQuery dans deux branches distinctes. La première branche écrit les données agrégées dans BigQuery. La deuxième branche, qui a déjà été créée pour vous, écrit des métadonnées concernant chaque événement brut, y compris le code temporel de l'événement et le code temporel du traitement réel. Les deux écrivent directement dans BigQuery à l'aide d'insertions en flux continu.
Écrire les données agrégées dans BigQuery
L'écriture dans BigQuery a été largement abordée dans les ateliers précédents. Nous ne reviendrons donc pas en détail sur les mécanismes de base.
Pour effectuer cette tâche :
- Créez un paramètre de ligne de commande nommé
agg_table_name
pour la table destinée à héberger les données agrégées.
- Ajoutez une transformation comme précédemment, pour écrire dans BigQuery.
Remarque :
Dans un contexte de flux continu de données, beam.io.WriteToBigQuery()
n'est pas compatible avec write_disposition
de type WRITE_TRUNCATE
, qui supprime et recrée la table. Dans cet exemple, utilisez WRITE_APPEND
.
Méthode d'insertion dans BigQuery
beam.io.WriteToBigQuery
utilisera par défaut soit les insertions en flux continu pour les PCollections illimitées, soit les jobs de chargement de fichiers par lot pour les PCollections limitées. Les insertions en flux continu peuvent être particulièrement utiles lorsque vous souhaitez que les données apparaissent immédiatement dans les agrégations, mais elles entraînent des frais supplémentaires. Dans les cas d'utilisation de flux continu où vous acceptez des importations par lot périodiques toutes les deux minutes environ, vous pouvez spécifier ce comportement à l'aide de l'argument mot clé method
et définir la fréquence avec l'argument mot clé triggering_frequency
. Pour en savoir plus, consultez la section "Write data to BigQuery" de la documentation du module apache_beam.io.gcp.bigquery.
- Utilisez le code suivant pour ajouter une transformation à votre pipeline qui écrit les données agrégées dans la table BigQuery.
'WriteAggToBQ' >> beam.io.WriteToBigQuery(
agg_table_name,
schema=agg_table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
)
Tâche 5 : Exécuter le pipeline
- Revenez au terminal et exécutez le code suivant pour exécuter votre pipeline :
export PROJECT_ID=$(gcloud config get-value project)
export REGION='{{{project_0.default_region|Region}}}'
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic
export WINDOW_DURATION=60
export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.windowed_traffic
export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw
python3 streaming_minute_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_topic=${PUBSUB_TOPIC} \
--window_duration=${WINDOW_DURATION} \
--agg_table_name=${AGGREGATE_TABLE_NAME} \
--raw_table_name=${RAW_TABLE_NAME}
Remarque : Si vous obtenez une erreur indiquant que le pipeline Dataflow n'a pas pu ouvrir le fichier pipeline.py
, exécutez à nouveau le pipeline. Il devrait s'exécuter sans problème.
Vérifiez dans l'UI Dataflow que le job s'exécute correctement et sans erreur. Notez qu'aucune donnée n'est encore créée ni ingérée par le pipeline : il s'exécute, mais ne traite rien. Vous allez introduire des données à l'étape suivante.
Cliquez sur Vérifier ma progression pour valider l'objectif.
Exécuter le pipeline
Tâche 6 : Générer des entrées en flux continu sans décalage
Comme il s'agit d'un pipeline de flux de données, il s'abonne à la source du flux et attend une entrée, qui n'est pas disponible pour le moment. Dans cette section, vous allez générer des données sans décalage. Les données réelles contiennent presque toujours un décalage. Toutefois, il est utile de comprendre les entrées en flux continu sans décalage.
Le code de cette quête inclut un script permettant de publier des événements JSON à l'aide de Pub/Sub.
- Pour effectuer cette tâche et commencer à publier des messages, ouvrez un nouveau terminal à côté de celui que vous utilisez actuellement et exécutez le script suivant. Il continuera à publier des messages jusqu'à ce que vous arrêtiez le script. Assurez-vous de vous trouver dans le dossier
training-data-analyst/quests/dataflow_python
.
bash generate_streaming_events.sh
Cliquez sur Vérifier ma progression pour valider l'objectif.
Générer des entrées en flux continu sans décalage
Examiner les résultats
- Attendez quelques minutes que les données commencent à s'afficher. Accédez ensuite à BigQuery et interrogez la table
logs.minute_traffic
avec la requête suivante :
SELECT timestamp, page_views
FROM `logs.windowed_traffic`
ORDER BY timestamp ASC
Vous devez constater que le nombre de pages vues se situe autour de 100 vues par minute.
- Vous pouvez également utiliser l'outil de ligne de commande BigQuery pour vérifier rapidement que les résultats sont en cours d'écriture :
bq head logs.raw
bq head logs.windowed_traffic
- Saisissez la requête suivante :
SELECT
UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS event_millis,
UNIX_MILLIS(TIMESTAMP(processing_timestamp)) - min_millis.min_event_millis AS processing_millis,
user_id,
-- added as unique label so we see all the points
CAST(UNIX_MILLIS(TIMESTAMP(event_timestamp)) - min_millis.min_event_millis AS STRING) AS label
FROM
`logs.raw`
CROSS JOIN (
SELECT
MIN(UNIX_MILLIS(TIMESTAMP(event_timestamp))) AS min_event_millis
FROM
`logs.raw`) min_millis
WHERE
event_timestamp IS NOT NULL
ORDER BY
event_millis ASC
Cette requête illustre l'écart entre l'heure de l'événement et l'heure de traitement. Cependant, il peut être difficile d'avoir une vue d'ensemble en se contentant de regarder les données tabulaires brutes. Nous allons utiliser Looker Studio, un moteur d'informatique décisionnelle et de visualisation de données léger.
- Pour activer Looker Studio :
- Accédez à Looker Studio.
- En haut à gauche, cliquez sur Créer.
- Cliquez sur Rapport.
- Sélectionnez un nom pour le champ Pays et saisissez un nom dans le champ Société. Cochez la case pour confirmer que vous avez lu et que vous acceptez les conditions d'utilisation supplémentaires de Google Looker Studio, puis cliquez sur Continuer.
- Sélectionnez "Non" pour toutes les options, puis cliquez sur Continuer.
- Revenez à l'UI de BigQuery.
- Dans l'UI BigQuery, cliquez sur le menu déroulant Ouvrir dans et sélectionnez Looker Studio.
Une nouvelle fenêtre s'affiche.
- Cliquez sur Commencer.
Notez que des visualisations par défaut sont créées pour les données.
-
Pour supprimer les visualisations par défaut, faites un clic droit sur chacune d'elles et sélectionnez Supprimer.
-
Dans la barre de menu supérieure, cliquez sur Ajouter un graphique.
-
Sélectionnez le type Graphique à nuage de points.
-
Dans la colonne Données du panneau de droite, définissez les valeurs suivantes :
- Dimension : label
- Métrique X : event_millis
- Métrique Y : processing_millis
Le graphique se transforme en nuage de points, où tous les points se trouvent sur la diagonale. En effet, dans les données en flux continu actuellement générées, les événements sont traités immédiatement après leur génération. Il n'y a donc pas de décalage. Si vous avez démarré le script de génération de données rapidement, c'est-à-dire avant que le job Dataflow ne soit entièrement opérationnel, vous pouvez voir une courbe en forme de crosse de hockey, car des messages étaient mis en file d'attente dans Pub/Sub et ont été traités plus ou moins en même temps.
Mais en situation réelle, les pipelines doivent gérer le décalage.

Tâche 7 : Introduire un décalage dans l'entrée en flux continu
Le script d'événement de flux est capable de générer des événements avec un décalage simulé.
Cela représente les scénarios dans lesquels il existe un décalage entre le moment où les événements sont générés et celui où ils sont publiés dans Pub/Sub. Par exemple, lorsqu'un client mobile passe en mode hors connexion si un utilisateur n'a pas de réseau, les événements sont collectés sur l'appareil et publiés en une seule fois lorsque l'appareil est de nouveau en ligne.
Générer des entrées en flux continu avec un décalage
-
Commencez par fermer la fenêtre Looker Studio.
-
Ensuite, pour activer le décalage, revenez au terminal et arrêtez le script en cours d'exécution à l'aide de CTRL+C
.
-
Exécutez ensuite la commande suivante :
bash generate_streaming_events.sh true
Examiner les résultats
- Revenez à l'UI de BigQuery, réexécutez la requête, puis recréez la vue Looker Studio comme précédemment. Les nouvelles données qui arrivent (elles doivent apparaître sur la droite du graphique) ne doivent plus être parfaites. Certaines apparaîtront au-dessus de la diagonale, indiquant qu'elles ont été traitées après les événements.
Type de graphique : nuage de points
- Dimension : label
- Métrique X : event_millis
- Métrique Y : processing_millis

Terminer l'atelier
Une fois l'atelier terminé, cliquez sur Terminer l'atelier. Google Cloud Skills Boost supprime les ressources que vous avez utilisées, puis efface le compte.
Si vous le souhaitez, vous pouvez noter l'atelier. Sélectionnez un nombre d'étoiles, saisissez un commentaire, puis cliquez sur Envoyer.
Le nombre d'étoiles correspond à votre degré de satisfaction :
- 1 étoile = très insatisfait(e)
- 2 étoiles = insatisfait(e)
- 3 étoiles = ni insatisfait(e), ni satisfait(e)
- 4 étoiles = satisfait(e)
- 5 étoiles = très satisfait(e)
Si vous ne souhaitez pas donner votre avis, vous pouvez fermer la boîte de dialogue.
Pour soumettre des commentaires, suggestions ou corrections, veuillez accéder à l'onglet Assistance.
Copyright 2020 Google LLC Tous droits réservés. Google et le logo Google sont des marques de Google LLC. Tous les autres noms d'entreprises et de produits peuvent être des marques des entreprises auxquelles ils sont associés.