Processing Data with Google Cloud Dataflow

1 hour 15 minutes 7 Credits

GSP198

Google Cloud Self-Paced Labs

Overview

In this lab you will simulate a real-time, real-world data set from a historical data set. This simulated data set will be processed from a set of text files using Python and Google Cloud Dataflow, and the resulting simulated real-time data will be stored in BigQuery. You will then use BigQuery to analyse some features of the real-time data set.

Cloud Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch (historical) modes via Java and Python APIs with the Apache Beam SDK. Cloud Dataflow provides a serverless architecture that can be used to shard and process very large batch data sets, or high-volume live streams of data, in parallel.
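
To make the batch processing model concrete, here is a minimal sketch of a batch Apache Beam pipeline in Python. This is not the lab's df05.py script, and the input and output names are placeholders; run locally, a pipeline like this uses the DirectRunner by default, and the same pipeline shape can later be submitted to Cloud Dataflow.

import apache_beam as beam

def parse_line(line):
    # Illustrative transform: split a CSV line into its fields.
    return line.split(',')

with beam.Pipeline() as pipeline:  # no options given, so the local DirectRunner is used
    (pipeline
     | 'read' >> beam.io.ReadFromText('input.csv')               # placeholder input file
     | 'parse' >> beam.Map(parse_line)
     | 'format' >> beam.Map(lambda fields: ','.join(fields))
     | 'write' >> beam.io.WriteToText('output'))                 # placeholder output prefix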

BigQuery is a RESTful web service that enables interactive analysis of very large datasets, working in conjunction with Google Cloud Storage.
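
As an aside, BigQuery can also be queried programmatically rather than through the console. The sketch below is not part of this lab; it assumes the google-cloud-bigquery client library and an authenticated environment such as Cloud Shell, and runs a query against a public sample table.

from google.cloud import bigquery

client = bigquery.Client()  # picks up the active project and credentials
query = "SELECT COUNT(*) AS num_rows FROM `bigquery-public-data.samples.shakespeare`"
for row in client.query(query).result():
    print(row.num_rows)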

The data set that is used provides historic information about internal flights in the United States retrieved from the US Bureau of Transport Statistics website. This data set can be used to demonstrate a wide range of data science concepts and techniques and will be used in all of the other labs in the Data Science on Google Cloud Platform quest.

Objectives

  • Configure a Python application to create a simulated real-time data stream from historical data.
  • Use Apache Beam to test Dataflow pipelines locally.
  • Use Apache Beam to process data using Cloud Dataflow to create a simulated real-time data set.
  • Query the simulated real-time data stream using BigQuery.

Setup and Requirements

Qwiklabs setup

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

What you need

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
  • Time to complete the lab.

Note: If you already have your own personal Google Cloud account or project, do not use it for this lab.

Note: If you are using a Chrome OS device, open an Incognito window to run this lab.

How to start your lab and sign in to the Google Cloud Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.

  2. Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Open the tabs in separate windows, side-by-side.

  3. In the Sign in page, paste the username that you copied from the left panel. Then copy and paste the password.

    Important: You must use the credentials from the left panel. Do not use your Google Cloud Training credentials. If you have your own Google Cloud account, do not use it for this lab (avoids incurring charges).

  4. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Cloud Console opens in this tab.

Activate Cloud Shell

Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources.

In the Cloud Console, in the top right toolbar, click the Activate Cloud Shell button.

Click Continue.

It takes a few moments to provision and connect to the environment. When you are connected, you are already authenticated, and the project is set to your PROJECT_ID.

gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.

You can list the active account name with this command:

gcloud auth list

(Output)

Credentialed accounts:
 - <myaccount>@<mydomain>.com (active)

(Example output)

Credentialed accounts:
 - google1623327_student@qwiklabs.net

You can list the project ID with this command:

gcloud config list project

(Output)

[core]
project = <project_ID>

(Example output)

[core]
project = qwiklabs-gcp-44776a13dea667a6

Check project permissions

Before you begin your work on Google Cloud, you need to ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu, click IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Home.

If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.

  • In the Google Cloud console, on the Navigation menu, click Home.

  • Copy the project number (e.g. 729328892908).

  • On the Navigation menu, click IAM & Admin > IAM.

  • At the top of the IAM page, click Add.

  • For New principals, type:

{project-number}-compute@developer.gserviceaccount.com

Replace {project-number} with your project number.

  • For Role, select Project (or Basic) > Editor. Click Save.

Preparing your environment

This lab uses a set of code samples and scripts developed for the Data Science on Google Cloud book from O'Reilly Media, Inc. You will clone the sample repository used in Chapter 4 from GitHub to Cloud Shell and carry out all of the lab tasks from there.

Please wait until you see the Lab Running green light on the page where you started the lab. Your environment isn't ready until you see this indicator.

Clone the Data Science sample

Enter the following commands to clone the repository:

git clone https://github.com/GoogleCloudPlatform/data-science-on-gcp/

Change to the repository source directory for this lab:

cd ~/data-science-on-gcp/04_streaming/simulate

Create an isolated Python environment:

virtualenv data-sci-env -p python3

Activate the isolated Python environment:

source data-sci-env/bin/activate

Install the Python packages that are required:

pip install timezonefinder pytz
pip install apache-beam[gcp]

These commands may report some errors related to Google utilities and incompatible packages; these can be ignored for this lab.

Test the Python data processing functions locally

Examine the file df05.py using the Google Cloud Editor. Click the Open Editor icon and, if prompted, open the file in a new tab.

Note: If you cannot find the Open Editor icon, close the left menu by clicking the Navigation menu (hamburger) icon at the top.

Open the data-science-on-gcp folder, then go to 04_streaming > simulate and then click on the df05.py file.

This file provides a number of functions that process the historical data file in order to generate corresponding departure and arrival event records that include accurate universal (UTC) timestamps, not just the local timestamps provided in the source dataset. For more information on how each of these processing functions works, consult Chapter 4 of Data Science on Google Cloud Platform.

Next you will execute the df05.py script. This version of the script carries out the tasks locally on a prepared test subset of the data called 201501_part.csv, which includes just the first 1000 records of the data. The script is configured to call Apache Beam using the DirectRunner, which executes locally rather than deploying at scale to the cloud. This allows you to test and evaluate the accuracy of the code before committing significant resources to a processing task.

Click on the Open Terminal icon and in Cloud Shell run the script:

python ./df05.py

This will take about seven minutes to complete.

Once it has completed, check the events file all_events-00000-of-00001 by running:

tail all_events-00000-of-00001

Note that departure and arrival events are now interleaved, and look like this:

2015-01-01,AA,19805,AA ... -21600.0,departed...
2015-01-01,AA,19805,AA ... -18000.0,arrived ...
...

Departure events do not contain any arrival information: they have a number of blank fields, represented by a series of commas, while arrival events have values for all fields.

Even this small sample of a thousand or so events takes several minutes to process. The code that identifies the timezones is very compute heavy, as it has to check each airport's location against a large number of intersecting polygons.
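
To illustrate the idea, here is a simplified sketch (not the book's exact code) of how the timezonefinder and pytz packages installed earlier can map an airport's coordinates to a timezone and convert a local timestamp to UTC. The helper name and the SEA coordinates are only illustrative.

from datetime import datetime
import pytz
from timezonefinder import TimezoneFinder

tf = TimezoneFinder()  # loads the timezone polygon data; each lookup is compute heavy

def as_utc(date_str, hhmm, lat, lon):
    # Hypothetical helper: convert a local date and HHMM time at (lat, lon) to UTC.
    tz_name = tf.timezone_at(lng=lon, lat=lat)   # e.g. 'America/Los_Angeles'
    naive = datetime.strptime(date_str + ' ' + hhmm, '%Y-%m-%d %H%M')
    return pytz.timezone(tz_name).localize(naive).astimezone(pytz.utc)

print(as_utc('2015-01-01', '0730', 47.45, -122.31))  # approximate SEA coordinates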

Process data using Cloud Dataflow

Now you will configure BigQuery and Cloud Dataflow for your project.

Run the following to create a BigQuery dataset to hold the simulated event data:

export PROJECT_ID=$(gcloud info --format='value(config.project)')
bq mk --project_id $PROJECT_ID flights

Test Completed Task

Click Check my progress to verify the task you performed. If you have completed the task successfully, you will be granted an assessment score.

Create a BigQuery Dataset.

You'll now copy the airport geolocation file to your Cloud Storage bucket. This file is used to identify the physical location of each airport in order to convert the local time fields to universal time.

To save you some time:

  • The Cloud Storage bucket was pre-created for you.
  • The historical flights text data files are already in the bucket.

Run the following command to make sure you are in the working directory:

cd ~/data-science-on-gcp/04_streaming/simulate

Now run these commands:

export BUCKET=${PROJECT_ID}-ml
gsutil cp airports.csv.gz gs://$BUCKET/flights/airports/airports.csv.gz

Test Completed Task

Click Check my progress to verify the task you performed. If you have completed the task successfully, you will be granted an assessment score.

Copy the airport geolocation file to your Cloud Storage bucket.

Note: The Dataflow API is enabled by default for Qwiklabs lab accounts. For your own projects you will have to enable the Dataflow API explicitly by going to APIs and Services > Library and searching for Dataflow.

Process the Data using Cloud Dataflow

You will now deploy the functions using the full Python script, df06.py, which is configured to run the tasks using Google Cloud Dataflow. This script processes the full dataset stored in the Cloud Storage bucket that was pre-populated for this lab.

The script is configured to call Apache Beam using the DataflowRunner to execute the processing tasks in parallel on Cloud Dataflow. This will substantially speed up the processing of the data. The script is also configured to save the output to the flights BigQuery dataset that you created in the previous section.

In Cloud Shell use nano to inspect and edit the source code:

nano df06.py

If you inspect the source code for df06.py you will see that the run function has been changed to use Google Cloud Dataflow and also points to the pre-configured Cloud Storage bucket that contains the historical flight data text files.

Press CTRL+X to exit the nano editor.
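
For reference, the sketch below shows in general terms how a Beam pipeline is pointed at Cloud Dataflow instead of the local DirectRunner. It is not the actual df06.py; the project, region, bucket paths, and job name are placeholders.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    runner='DataflowRunner',                 # execute on Cloud Dataflow
    project='my-project-id',                 # placeholder for $PROJECT_ID
    region='us-central1',
    temp_location='gs://my-bucket/tmp',      # placeholder for gs://$BUCKET/tmp
    job_name='flight-events-example',        # illustrative job name
)

with beam.Pipeline(options=options) as pipeline:
    (pipeline
     | 'read' >> beam.io.ReadFromText('gs://my-bucket/flights/raw/*.csv')
     | 'write' >> beam.io.WriteToText('gs://my-bucket/flights/out/events'))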

Execute the script:

python df06.py -p $PROJECT_ID -b $BUCKET -r us-central1

Note: The command above may report warnings related to Apache Beam for Python 3 and missing metadata; these can be ignored for this lab.

This script will complete in a few seconds. The complete set of tasks has been handed off to Google Cloud Dataflow and will take about 15 minutes to complete. The initial startup phase for Cloud Dataflow typically takes about 7 or 8 minutes before it starts producing the simulated event data.

The lab has been configured to provide two months of source data, covering just under 900,000 flights that took place between Jan 1, 2015 and Feb 28, 2015. The resulting simulated data set will have about 2.6 million records, as each non-cancelled flight that departs and arrives in that time window generates three events: departed, wheelsoff, and arrived.
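
As an illustration of how one flight record fans out into multiple events (this is an assumed sketch, not the book's code, and the WHEELS_OFF field name is an assumption), a generator like the following could be used:

def flight_to_events(flight):
    # 'flight' is assumed to be a dict of parsed fields from one CSV row.
    for event, notify_field in [('departed', 'DEP_TIME'),
                                ('wheelsoff', 'WHEELS_OFF'),
                                ('arrived', 'ARR_TIME')]:
        notify_time = flight.get(notify_field)
        if notify_time:                  # cancelled flights may lack these fields
            yield event, notify_time, flight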

The Cloud Dataflow job typically scales up only once while processing this dataset, but it would scale out as far as the max_num_workers parameter set for the job if the amount of data being processed were large enough.

Test Completed Task

Click Check my progress to verify the task you performed. If you have completed the task successfully, you will be granted an assessment score.

Process the Data using Cloud Dataflow (submit Dataflow job). If this step fails to score, wait a minute and resubmit the job by rerunning the previous command.

Monitor the Cloud Dataflow job and inspect the processed data

In the Google Cloud console, click Navigation menu, and in the Big Data section click on Dataflow.

Click the name of the Dataflow job to open the job details page for the events simulation job.

Here you can monitor the progress of your job. Since it will take several minutes for the pipeline to start writing events out to the flights.simevents table, a pre-processed BigQuery dataset, flightssample, has already been prepared that you can analyze immediately. If you choose to wait for the ch04timecorr job to finish, follow the instructions in the note below to modify the queries.

Note: If you chose to wait for the ch04timecorr job to finish, you must edit these queries to refer to flights.simevents rather than flightssample.simevents.

In the Console click on Navigation menu then in the Big Data section click BigQuery. If prompted, click Done.

Copy the following script and paste it in the Query editor field:

SELECT
  FL_NUM, ORIGIN, DEP_TIME, DEP_DELAY, DEST, ARR_TIME, ARR_DELAY, EVENT, NOTIFY_TIME
FROM
  `flightssample.simevents`
WHERE
  (DEP_DELAY > 15 AND ORIGIN = 'SEA') OR (ARR_DELAY > 15 AND DEST = 'SEA')
ORDER BY
  NOTIFY_TIME ASC
LIMIT 10

Wait for the query to validate, then click RUN.

Test Completed Task

Click Check my progress to verify the task you performed. If you have completed the task successfully, you will be granted an assessment score.

Compose Queries.

You will see 10 rows of data. It's now easier to see some of the differences between arrival and departure events. You will also see that there is now a third type of event, wheelsoff, that describes the moment when the plane actually leaves the ground.

BigQuery is a columnar database, which makes it inefficient to query all of the fields associated with a specific event. For the event simulation task, you need to be able to retrieve all of the data fields for each event as efficiently as possible. The processing script addresses this problem by adding a field to the table that contains the entire original event record in text format. This trade-off between storage and speed allows higher-performance querying at the expense of a larger table.
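
Here is a hedged sketch of that trade-off; the function and the parsed column list are illustrative, not the script's actual code. Each output row carries a few parsed columns plus the entire original record in a single EVENT_DATA string column, so a simulator can read one column instead of reassembling many.

def to_row(fields, original_csv_line, event, notify_time):
    # Parse a few individual columns for filtering and ordering in queries.
    row = dict(zip(['FL_NUM', 'ORIGIN', 'DEST'], fields[:3]))  # illustrative subset
    row['EVENT'] = event
    row['NOTIFY_TIME'] = notify_time
    # Store the full original record as one string: larger table, faster event replay.
    row['EVENT_DATA'] = original_csv_line
    return row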

Click Compose New Query in the top left of the BigQuery console, and then click Compose Query to confirm.

Copy this new query and paste it into the Query editor:

SELECT
  EVENT, NOTIFY_TIME, EVENT_DATA
FROM
  `flightssample.simevents`
WHERE
  NOTIFY_TIME >= TIMESTAMP('2015-01-01 00:00:00 UTC')
  AND NOTIFY_TIME < TIMESTAMP('2015-01-03 00:00:00 UTC')
ORDER BY
  NOTIFY_TIME ASC
LIMIT 10

Wait for the query to validate, then click RUN.

You can see that the table now includes a field that contains all of the event data.

Test your Understanding

Below are multiple-choice questions to reinforce your understanding of this lab's concepts. Answer them to the best of your abilities.

Congratulations!

Now you know how to use Google Cloud Dataflow to process historical batch data using Python and Apache Beam. You have also used BigQuery to analyse a dataset that contains simulated real-time event data.

Finish Your Quest

This self-paced lab is part of the Qwiklabs Quest, Data Science on Google Cloud. A Quest is a series of related labs that form a learning path. Completing this Quest earns you a badge that recognizes your achievement. You can make your badges public and link to them in your online resume or social media account. Enroll in this Quest and get immediate completion credit if you've taken this lab. See other available Qwiklabs Quests.

Take Your Next Lab

Continue your Quest with Visualize Real Time Geospatial Data with Google DataStudio, or check out these suggestions:

Next Steps / Learn More

The source of this lab: the Data Science on Google Cloud Platform book from O'Reilly Media, Inc. and its companion GitHub repository, data-science-on-gcp.

Google Cloud Training & Certification

...helps you make the most of Google Cloud technologies. Our classes include technical skills and best practices to help you get up to speed quickly and continue your learning journey. We offer fundamental to advanced level training, with on-demand, live, and virtual options to suit your busy schedule. Certifications help you validate and prove your skill and expertise in Google Cloud technologies.

Manual Last Updated March 1, 2021
Lab Last Tested March 1, 2021

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.