Write a pipeline that aggregates site traffic by user.
Write a pipeline that aggregates site traffic by minute.
Implement windowing on time series data.
Prerequisites
Note: This lab is advanced and requires solid working knowledge of Python.
Setup and requirements
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Part A. Workbench Instances development environment setup
For this lab, you will be running all commands in a terminal from your Workbench instance notebook.
In the Google Cloud console, open the Navigation menu and select Vertex AI (or go directly to the Vertex AI dashboard).
Click Enable All Recommended APIs. Next, confirm that the Notebook API is enabled.
In the Navigation menu, click Workbench.
At the top of the Workbench page, ensure you are in the Instances view.
Click Create New.
Configure the instance:
Name: lab-workbench
Region: as specified in your lab instructions
Zone: as specified in your lab instructions
Advanced Options (optional): if needed, click Advanced Options for further customization (for example, machine type or disk size).
Click Create.
This will take a few minutes to create the instance. A green checkmark will appear next to its name when it's ready.
Click Open JupyterLab next to the instance name to launch the JupyterLab interface. This will open a new tab in your browser.
Next, click Terminal. This will open up a terminal where you can run all the commands in this lab.
Download Code Repository
Next you will download a code repository for use in this lab.
In the terminal you just opened, enter the following:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
On the left panel of your notebook environment, in the file browser, you will notice the training-data-analyst repo added.
Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab. Each lab folder is further divided into a lab sub-folder with code for you to complete and a solution sub-folder with a fully working example to reference if you get stuck.
Note: To open a file for editing purposes, simply navigate to the file and click on it. This will open the file, where you can add or modify code.
Click Check my progress to verify the objective.
Create notebook instance and clone course repo
Task 1. Aggregating site traffic by user
In this part of the lab, you write a pipeline that:
Reads the day’s traffic from a file in Cloud Storage.
Converts each event into a CommonLog object.
Sums the number of hits for each unique user by grouping the objects by user ID and combining the values.
Performs additional aggregations on each user.
Writes the resulting data to BigQuery.
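For orientation, here is a minimal, self-contained sketch of the aggregation step run on toy in-memory rows. This is an assumption-based illustration, not the lab's starter code: the input field names (user_id, num_bytes) are taken from this lab's description, while the output names (page_views, total_bytes) and the GroupBy/aggregate_field approach are one possible way to express the aggregation.

import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn

# Toy stand-in for the parsed CommonLog rows (field names assumed).
events = [
    beam.Row(user_id='alice', num_bytes=100),
    beam.Row(user_id='alice', num_bytes=250),
    beam.Row(user_id='bob', num_bytes=80),
]

with beam.Pipeline() as p:
    (p
     | 'CreateEvents' >> beam.Create(events)
     # Group by user and compute a count of hits and a sum of bytes per user
     | 'PerUserAggregations' >> beam.GroupBy('user_id')
         .aggregate_field('user_id', CountCombineFn(), 'page_views')
         .aggregate_field('num_bytes', sum, 'total_bytes')
     | 'Print' >> beam.Map(print))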
Task 2. Generate synthetic data
As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:
Open the appropriate lab
In the terminal in your IDE environment, run the following commands:
# Change directory into the lab
cd 3_Batch_Analytics/lab
export BASE_DIR=$(pwd)
Setting up virtual environment and dependencies
Before you begin editing the pipeline code, ensure that the necessary dependencies are installed in your environment.
Then execute the following to create the Cloud Storage buckets, the BigQuery dataset, and the input events used in this lab:
# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_batch_sinks.sh
# Generate event data
source generate_batch_events.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR
The script creates a file called events.json containing JSON-formatted log events, one per line.
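Purely for illustration (the exact fields come from the generator script), and using only field names referenced later in this lab, a hypothetical line might look roughly like:

{"user_id": "1234", "timestamp": "2024-01-01T00:00:00.000Z", "num_bytes": 512, ...}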
This pipeline already contains the necessary code to accept command-line options for the input path and the output table name, as well as code to read in events from Google Cloud Storage, parse those events, and write results to BigQuery. However, some important parts are missing and are marked as #TODOs.
Mini Challenge - #TODO 1:
This is a data modeling task. After you group all the log events by user, what information do you want to calculate for each one?
You need to define the structure (the schema) that will hold the results of your aggregation. You must look at the CommonLog class to see what fields are available to aggregate.
How to solve it:
Identify the Key: The class name is PerUserAggregation, so the primary piece of information you need to keep is the user_id.
Choose Metrics to Calculate: What can you calculate from a user's collection of CommonLog entries?
A count: How many times did the user access the server?
A sum: What is the total number of bytes (num_bytes) they downloaded?
A min/max: What was their first and last activity timestamp?
For example:
user_id: str
page_views: int
...
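A possible completed schema, assuming you keep the metrics listed above (every field name other than user_id is illustrative):

import typing

class PerUserAggregation(typing.NamedTuple):
    # The grouping key
    user_id: str
    # Aggregated metrics chosen above
    page_views: int      # count of log entries for this user
    total_bytes: int     # sum of num_bytes
    max_timestamp: str   # last activity
    min_timestamp: str   # first activity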
Mini Challenge - #TODO 2:
This is a technical requirement of the Apache Beam framework. The challenge is testing your knowledge of how Beam handles custom data types.
When Apache Beam runs a pipeline, it often needs to send data between different computers (called workers). To do this, it must serialize your Python object (like a PerUserAggregation instance) into a stream of bytes, send it over the network, and then deserialize it back into an object on the other side. A Coder is the object that tells Beam how to perform this serialization and deserialization.
If you don't tell Beam how to encode/decode your custom PerUserAggregation class, the pipeline will fail with an error.
How to solve it:
The solution is right in the line above the #TODOs. Beam provides a RowCoder that works perfectly with NamedTuple classes. You simply need to register a RowCoder for your new PerUserAggregation class, just as it was done for CommonLog.
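A minimal sketch of that registration, assuming PerUserAggregation is the NamedTuple class you defined in #TODO 1:

import apache_beam as beam

# Tell Beam how to serialize/deserialize PerUserAggregation between workers,
# mirroring the existing registration for CommonLog.
beam.coders.registry.register_coder(PerUserAggregation, beam.coders.RowCoder)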
Return to the terminal and execute the following commands to run your pipeline using the Cloud Dataflow service. If you run into trouble, you can run it with DirectRunner instead, or refer to the solution.
In the code snippet below, replace ENTER_REGION_ID and ENTER_ZONE_ID with the region and zone specified in your lab instructions.
# 1. Set all environment variables
export PROJECT_ID=$(gcloud config get-value project)
export REGION=ENTER_REGION_ID
export ZONE=ENTER_ZONE_ID
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.user_traffic
# 2. Double-check the input file exists
# This command should NOT return an error.
echo "Verifying input file exists at ${INPUT_PATH}..."
gcloud storage ls ${INPUT_PATH}
# 3. Execute the pipeline script
echo "Running the Dataflow pipeline..."
python3 batch_user_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--worker_zone=${ZONE} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_path=${INPUT_PATH} \
--table_name=${TABLE_NAME}
Once the job status is Succeeded, verify the results in BigQuery. To complete this task, wait a few minutes for the pipeline to finish, then navigate to BigQuery and query the user_traffic table.
Click Check my progress to verify the objective.
Aggregating site traffic by user and running your pipeline
Part B. Aggregating site traffic by minute
In this part of the lab, you create a new pipeline called batch_minute_traffic. It expands on the basic batch analysis principles used in batch_user_traffic and, instead of aggregating by user across the entire batch, aggregates by when events occurred. There are again multiple #TODOs to complete; refer to the solution if you get stuck.
In the IDE, navigate to 3_Batch_Analytics/lab and open the file batch_minute_traffic_pipeline.py.
Note: Before you start working with the main script (batch_minute_traffic_pipeline.py), make sure you go through the two helper files, pipeline_utils.py and setup.py. The Dataflow jobs in this lab rely on them: pipeline_utils.py holds custom Python logic (classes and functions) in a separate module that can be packaged and sent to remote Dataflow workers, while setup.py acts as the instruction manual for Dataflow, telling it exactly how to bundle pipeline_utils.py into a proper package to be installed on every worker.
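The exact contents may differ in your repo, but a minimal setup.py that packages a single helper module for Dataflow workers looks roughly like this (the name and version shown are illustrative):

import setuptools

# Packaging config so Dataflow can install pipeline_utils.py on every worker.
setuptools.setup(
    name='dataflow-lab-pipeline',
    version='0.0.1',
    py_modules=['pipeline_utils'],
)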
Task 5. Add timestamps to each element
These #TODO items guide you through building a classic time-series aggregation pipeline in Apache Beam. Each one represents a core concept in batch and stream processing.
The goal of this pipeline is to process a JSON file of web events (events.json), count how many events occurred every minute, and write these minute-by-minute counts to a BigQuery table.
The pipeline flow looks like this:
Read Text -> Parse to CommonLog -> TODOs -> Write to BigQuery
The #TODOs are the core concept of the pipeline, where the actual aggregation logic happens.
Mini Challenge - #TODO 1:
You have a collection of CommonLog objects. The next step in the pipeline is to group these events by time (WindowByMinute). Apache Beam cannot do this unless it knows the event time for each piece of data. Your challenge is to tell Beam how to find this timestamp within your CommonLog object.
How to solve it:
The add_timestamp function (defined in pipeline_utils.py) parses the timestamp string from each log record and attaches it to the element as a proper Beam timestamp, which is required for windowing.
For example:
| 'AddEventTimestamp' >> beam.Map(...)
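The exact implementation lives in pipeline_utils.py; a sketch of the idea (the timestamp format string here is an assumption) looks like:

import apache_beam as beam
from datetime import datetime

def add_timestamp(element):
    # Parse the event-time string carried on the CommonLog element
    ts = datetime.strptime(element.timestamp, '%Y-%m-%dT%H:%M:%S.%fZ').timestamp()
    # Re-emit the element with that time attached as its Beam event timestamp
    return beam.window.TimestampedValue(element, ts)

# Applied in the pipeline as:
# | 'AddEventTimestamp' >> beam.Map(add_timestamp)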
Task 6. Window into one-minute windows
Let's move to the second task of the script, where we group elements into windows.
Mini Challenge - #TODO 2:
This transform groups elements into fixed-size, non-overlapping windows of 60 seconds (one minute) based on their event timestamps.
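In Beam this is the standard WindowInto transform with FixedWindows; for one-minute windows the step looks like:

| 'WindowByMinute' >> beam.WindowInto(beam.window.FixedWindows(60))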
Task 7. Count events per window
Let's move to the third task of the script, where we count the events in each window.
Mini Challenge - #TODO 3:
This combiner counts the number of elements within each one-minute window. Use .without_defaults() to make sure that no output is generated for empty windows.
To complete this task, add a transform that counts all the elements in each window. Remember to refer to the solution if you get stuck.
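One way to express this uses the CountCombineFn combiner from apache_beam.transforms.combiners (the transform label below is illustrative):

from apache_beam.transforms.combiners import CountCombineFn

# In the pipeline:
| 'CountPerMinute' >> beam.CombineGlobally(CountCombineFn()).without_defaults()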
Task 8. Convert back to a row and add timestamp
Let's move to the final task of the script, where we convert each count back to a row and add the window timestamp.
Mini Challenge - #TODO 4:
The GetTimestampFn (defined in pipeline_utils.py) takes the integer count for each window and formats it into a dictionary, adding the window's start time as a string to match the BigQuery schema.
To complete this task, write a ParDo function that accepts elements of type int, passes in the additional parameter to access window information. Note that the timestamp field in the BigQuery table schema is a STRING, so you will have to convert the timestamp to a string.
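The exact implementation is in pipeline_utils.py; the general shape of such a DoFn (the output field names here are assumptions, chosen to match a schema with a count column and a STRING timestamp) is roughly:

import apache_beam as beam

class GetTimestampFn(beam.DoFn):
    def process(self, element, window=beam.DoFn.WindowParam):
        # Beam injects the element's window via the WindowParam DoFn parameter
        window_start = window.start.to_utc_datetime().strftime('%Y-%m-%dT%H:%M:%S')
        # Convert the per-window count into a dict row; the timestamp is a string
        yield {'page_views': element, 'timestamp': window_start}

# Applied in the pipeline as:
# | 'AddWindowTimestamp' >> beam.ParDo(GetTimestampFn())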
Once you've finished coding, run the pipeline using the command below. Keep in mind that, while testing your code, it is much faster to change the RUNNER environment variable to DirectRunner, which runs the pipeline locally. For now, run the pipeline on Dataflow.
Replace the regional and zonal values as per your lab specifications.
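The command itself is not reproduced above; assuming it mirrors the batch_user_traffic run from Part A (same custom options, with the script name, output table, and a --setup_file flag for the helper module swapped in), it will look roughly like the following:

# 1. Set all environment variables
export PROJECT_ID=$(gcloud config get-value project)
export REGION=ENTER_REGION_ID
export ZONE=ENTER_ZONE_ID
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic
# 2. Execute the pipeline script
python3 batch_minute_traffic_pipeline.py \
--project=${PROJECT_ID} \
--region=${REGION} \
--worker_zone=${ZONE} \
--staging_location=${PIPELINE_FOLDER}/staging \
--temp_location=${PIPELINE_FOLDER}/temp \
--runner=${RUNNER} \
--input_path=${INPUT_PATH} \
--table_name=${TABLE_NAME} \
--setup_file=./setup.py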
To complete this task, wait a few minutes for the pipeline to execute, then navigate to BigQuery and query the minute_traffic table.
Click Check my progress to verify the objective.
Aggregating site traffic by minute and running the pipeline
End your lab
When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
1 star = Very dissatisfied
2 stars = Dissatisfied
3 stars = Neutral
4 stars = Satisfied
5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.