Building Dynamic Pipelines in Cloud Data Fusion Using Macros

1 hour 30 minutes 7 Credits

GSP809


Overview

  • In Cloud Data Fusion, you can use macros to introduce dynamic variables into plugin configurations, so that the variable substitutions can be specified at runtime.

  • Macros are variables wrapped inside ${ } notation. Plugins with macro-enabled fields display a blue M icon.

10-01-overview.png

  • A typical use case for macros is variable substitution when the pipeline is run: instead of hard-coded paths, you can use dynamic paths and specify which particular file to process when the pipeline is invoked. In addition to macros, you'll use wildcards as well.

Objectives

In this lab, you will learn to:

  • Create a simple batch pipeline

  • Modify the batch pipeline to use Macros for certain plugin configurations

  • Deploy the dynamic pipeline and run it using runtime arguments

Setup

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Qwiklabs using an incognito window.

  2. Note the lab's access time (for example, 02:00:00), and make sure you can finish within that time. There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.

  5. Click Open Google Console.

  6. Click Use another account and copy/paste the credentials for this lab into the prompts. If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Log in to Google Cloud Console

  1. In the browser tab or window you are using for this lab session, copy the Username from the Connection Details panel and click the Open Google Console button.
Note: If you are asked to choose an account, click Use another account.
  2. Paste in the Username, and then the Password as prompted.
  3. Click Next.
  4. Accept the terms and conditions.

Since this is a temporary account, which will last only as long as this lab:

  • Do not add recovery options

  • Do not sign up for free trials

  5. Once the console opens, view the list of services by clicking the Navigation menu (Navigation menu icon) at the top-left.

Navigation menu

Activate Cloud Shell

Cloud Shell is a virtual machine that contains development tools. It offers a persistent 5-GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources. gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab completion.

  1. Click the Activate Cloud Shell button (Activate Cloud Shell icon) at the top right of the console.

  2. Click Continue. It takes a few moments to provision and connect to the environment. When you are connected, you are also authenticated, and the project is set to your PROJECT_ID.

Sample commands

  • List the active account name:

gcloud auth list

(Output)

Credentialed accounts:
- <myaccount>@<mydomain>.com (active)

(Example output)

Credentialed accounts:
- google1623327_student@qwiklabs.net

  • List the project ID:

gcloud config list project

(Output)

[core]
project = <project_ID>

(Example output)

[core]
project = qwiklabs-gcp-44776a13dea667a6

Note: Full documentation of gcloud is available in the gcloud CLI overview guide.

Check project permissions

Before you begin working on Google Cloud, you must ensure that your project has the correct permissions within Identity and Access Management (IAM).

  1. In the Google Cloud console, on the Navigation menu (Navigation menu icon), click IAM & Admin > IAM.

  2. Confirm that the default compute Service Account {project-number}-compute@developer.gserviceaccount.com is present and has the editor role assigned. The account prefix is the project number, which you can find on Navigation menu > Cloud overview.

Default compute service account

If the account is not present in IAM or does not have the editor role, follow the steps below to assign the required role.

  1. In the Google Cloud console, on the Navigation menu, click Cloud overview.

  2. From the Project info card, copy the Project number.

  3. On the Navigation menu, click IAM & Admin > IAM.

  4. At the top of the IAM page, click Add.

  5. For New principals, type:

{project-number}-compute@developer.gserviceaccount.com

Replace {project-number} with your project number.

  6. For Select a role, select Basic (or Project) > Editor.

  7. Click Save.
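If you prefer the command line, the console steps above can be approximated from Cloud Shell. This is a sketch, not part of the lab: the project number is a placeholder, and the hypothetical run() wrapper only prints each command, so nothing changes until you swap in real execution.

```shell
# Sketch of the console steps above using gcloud (assumption: run in Cloud
# Shell). PROJECT and PROJECT_NUMBER are placeholders; in the lab you could
# derive the number with:
#   gcloud projects describe $GOOGLE_CLOUD_PROJECT --format='value(projectNumber)'
PROJECT=${GOOGLE_CLOUD_PROJECT:-my-project}
PROJECT_NUMBER=123456789012

run() { echo "+ $*"; }   # dry-run wrapper: prints the command instead of running it

# Grant the Editor role to the default Compute Engine service account.
run gcloud projects add-iam-policy-binding "$PROJECT" \
  --member="serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com" \
  --role="roles/editor"
```

Replacing `echo "+ $*"` with `"$@"` inside run() would execute the command for real.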

Load the data

Next, you will create a Cloud Storage bucket in your project so that you can load some sample data for Wrangling. Cloud Data Fusion will later read data out of this storage bucket.

  1. In Cloud Shell, execute the following commands to create a new bucket:

export BUCKET=$GOOGLE_CLOUD_PROJECT
gsutil mb gs://$BUCKET

This creates a bucket with the same name as your Project ID.

  2. Run the commands below to copy the CSV files into your bucket:

gsutil cp gs://cloud-training/OCBL165/titanic.csv gs://$BUCKET
gsutil cp gs://cloud-training/OCBL165/incremental/titanic*.csv gs://$BUCKET/incremental/

Click Check my progress to verify the objective. Load the data

Add necessary permissions for your Cloud Data Fusion instance

Next, you will grant permissions to the service account associated with the instance, using the following steps.

  1. In the Cloud Console, from the Navigation menu select Data Fusion > Instances. You should see a Cloud Data Fusion instance already set up and ready for use.

  2. Click on the instance name. On the Instance details page copy the Service Account to your clipboard.

service_account.png

  3. In the Console, navigate to IAM & Admin > IAM.

  4. On the IAM Permissions page, click Add.

  5. In the New principals field, paste the service account.

  6. Click into the Select a role field and start typing Cloud Data Fusion API Service Agent, then select it.

00-4-DataFusion-API-Service.png

  7. Click Save.
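The same grant can be sketched as a single gcloud command. Hedged assumptions: SA_EMAIL below is a placeholder for the instance service account you copied, and roles/datafusion.serviceAgent is my assumption for the role ID behind the "Cloud Data Fusion API Service Agent" label; the run() wrapper only prints the command.

```shell
# Sketch only: gcloud equivalent of the console steps above. SA_EMAIL is a
# placeholder for the instance service account you copied; the role ID
# roles/datafusion.serviceAgent is an assumption for the console's
# "Cloud Data Fusion API Service Agent" role.
PROJECT=${GOOGLE_CLOUD_PROJECT:-my-project}
SA_EMAIL="instance-sa@example.iam.gserviceaccount.com"   # placeholder, not a real account

run() { echo "+ $*"; }   # dry-run wrapper: prints instead of executing

run gcloud projects add-iam-policy-binding "$PROJECT" \
  --member="serviceAccount:${SA_EMAIL}" \
  --role="roles/datafusion.serviceAgent"
```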

Click Check my progress to verify the objective. Add Cloud Data Fusion API Service Agent role to service account

Grant service account user permission

  1. In the console, on the Navigation menu, click IAM & admin > IAM.

  2. Select the Include Google-provided role grants checkbox.

  3. Scroll down the list to find the Google-managed Cloud Data Fusion service account that looks like service-{project-number}@gcp-sa-datafusion.iam.gserviceaccount.com and then copy the service account name to your clipboard.

Google-managed Cloud Data Fusion service account listing

  4. Next, navigate to IAM & Admin > Service Accounts.

  5. Click on the default Compute Engine account that looks like {project-number}-compute@developer.gserviceaccount.com, and select the Permissions tab on the top navigation.

  6. Click on the Grant Access button.

  7. In the New principals field, paste the service account you copied earlier.

  8. In the Role dropdown menu, select Service Account User.

  9. Click Save.
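The console flow above amounts to one service-account-level IAM binding, sketched below under stated assumptions: the project number is a placeholder, and the run() wrapper only prints the command rather than executing it.

```shell
# Sketch only (assumption: run in Cloud Shell): grants the Google-managed
# Data Fusion service account the Service Account User role on the default
# Compute Engine service account. PROJECT_NUMBER is a placeholder.
PROJECT_NUMBER=123456789012
COMPUTE_SA="${PROJECT_NUMBER}-compute@developer.gserviceaccount.com"
DF_AGENT="service-${PROJECT_NUMBER}@gcp-sa-datafusion.iam.gserviceaccount.com"

run() { echo "+ $*"; }   # dry-run wrapper: prints instead of executing

run gcloud iam service-accounts add-iam-policy-binding "$COMPUTE_SA" \
  --member="serviceAccount:${DF_AGENT}" \
  --role="roles/iam.serviceAccountUser"
```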

Build a Batch Pipeline

  1. In the Cloud Console, from the Navigation Menu, click on Data Fusion, then click the View Instance link next to your Data Fusion instance. Select your lab credentials to sign in. If prompted to take a tour of the service click on No, Thanks. You should now be in the Cloud Data Fusion UI.

  2. On the Cloud Data Fusion Control Center, use the Navigation menu to expose the left menu, then choose Wrangler.

  3. When Wrangler loads, on the left side is a panel with the pre-configured connections to your data, including the Cloud Storage connection.

  4. Under Google Cloud Storage, select Cloud Storage Default.

  5. Click on the bucket corresponding to your Project ID.

  6. Click on titanic.csv. The data is loaded into the Wrangler screen in row/column form.

    wrangler_cloud_storage_default.png

Once the file has been loaded into Wrangler you can start applying the data transformations iteratively.

  1. Click the dropdown icon from the first column (body) heading, select the Parse menu item, then CSV from the submenu.

    parse_csv.png

  2. In the raw data you can see that the first row consists of column headings, so select the option to Set first row as header in the dialog box for Parse as CSV and click Apply.

parse_as_csv.png

  3. At this stage, the raw data is parsed and you can see the columns generated by this operation on the right of the screen:

    csv-in-wrangler.png

  4. Now it’s time for some cleanup. You no longer need the body column that represents the raw CSV data, so remove it using the column's menu option.

batch6.png

For the purpose of this lab, the two transformations are sufficient to create the ETL pipeline.

titanic_columns.png

  5. Click on the Create a Pipeline button to jump into the next section, where you’ll see how the ETL pipeline comes together.

  6. When presented with the next dialog, select Batch pipeline to continue. This launches the Pipeline Studio.

Pipeline Studio

  1. The rest of the pipeline building tasks will take place in the Pipeline Studio, the UI that lets you compose data pipelines visually. At the moment, you have the E (Extract) and T (Transform) nodes of your ETL pipeline. To complete the pipeline you will add a BigQuery sink, the L (Load) portion of the ETL.

    batch10.png

  2. To add the BigQuery sink to the pipeline navigate to the Sink section on the left of the screen and click on the BigQuery icon to place it on the canvas.

    BQ_sink.png

  3. Once the sink has been placed on the canvas, connect the Wrangler node with the BigQuery node by dragging the arrow from the Wrangler node to the BigQuery node as illustrated below. All that’s left to do now is to specify some configuration options so that you can write the data to the dataset you want.

    ETL_BQ.png

  4. Hover your mouse over the BigQuery node and a Properties button will be displayed. Click on this button to open up the plugin’s configuration settings.

  5. Set the properties of the BigQuery Sink as follows:

a. Under Reference Name, enter Titanic_BQ

b. Under Dataset, enter demo

c. Under Table, enter titanic

BQ_properties.png

  6. Click on the Validate button on the top right to confirm there are no errors. After validation completes, click on the X to close the BigQuery Properties dialog box.

Test and deploy the pipeline

  1. First, make sure to name and save your draft so you won’t lose any of your work. On the top of your canvas, click on Name your pipeline and add a Name (like Static-ETL) and an appropriate Description for your pipeline as illustrated below then click Save.

    pipeline_name_static.png

  2. To test your pipeline, click on the Preview icon, then click on the Run icon to run the pipeline in preview mode. While the pipeline runs in preview mode, no data is actually written to the BigQuery table, but you can confirm that data is being read properly and will be written as expected once the pipeline is deployed. The Preview button is a toggle, so be sure to click it again to exit preview mode when done.

    pipeline_preview.png

  3. While still in preview mode, click on the Wrangler node. If all went well, you should see the raw data that came in from the input node on the left, and the parsed records that will be emitted as output to the node on the right. Each node that operates on data should show similar output. This is a good way to check your work and make sure you are on the right track before deploying your pipeline. If you encounter any errors, you can easily fix them while in draft mode.

    batch19.png

Click on the X on the top right to close the properties box.

  4. If everything looks good so far, click on Preview again to exit preview mode. Next, deploy the pipeline by clicking on the Deploy icon.

2.png

  5. You will see a confirmation dialog that your pipeline is being deployed.

  6. Once your pipeline has successfully deployed, you’re ready to process some actual data and perform an ETL. Now it’s time to run the pipeline and load some data into BigQuery.

  7. Click on the Run icon to execute the ETL job.

  8. When done, you should see the pipeline status change to Succeeded, indicating that the pipeline ran successfully.

  9. As data is processed by the pipeline, you will see metrics emitted by each node indicating how many records have been processed.

    pipeline_succeeded.png

Note: The pipeline takes 5 - 10 minutes to reach the Succeeded state.
  10. You have now built and deployed an ETL pipeline. In the next section you’ll learn how to turn this simple pipeline into something more dynamic, so that you can process any number of records across many files and load them using the same transformation steps as before.

Click Check my progress to verify the objective. Build and execute a batch pipeline

Dynamic loading and writing data

The goal of this section is to take a sequence of files, [titanic1.csv, titanic2.csv, …, titanicN.csv], that you copied into your Cloud Storage bucket, load them all through the same pipeline, and apply the same transformations in parallel.

In order to include all the files that need to be processed as input to the E phase of the ETL, the input string must accommodate the parameterization of the file in sequence.

  1. To get started, duplicate the pipeline you previously deployed. Click on the Actions button and select Duplicate from the drop-down menu. This loads the pipeline into the Studio in draft mode and automatically appends a version number to the pipeline name.

Duplicate.png

In the steps below, you will edit the Path field of the Cloud Storage source plugin to use macro notation. You will do the same for the BigQuery sink, specifying the Dataset and the Table as macros.

  1. Click on the Properties of the Cloud Storage node.

  2. Under the Path setting, click the Macro icon to the right of the form field. This will clear out any text that was previously in the field and pre-populate the ${ } macro notation.

Lab06-02.png

There are a number of choices available for consideration at this point. The goal is to convert the existing static path to one that will meet your needs in terms of how you organize your data, or the level of granularity you need for specifying paths.

For this example, you will read the five CSV files (titanic1.csv through titanic5.csv) from your Cloud Storage bucket.

  3. Set the Path as: gs://YOUR-BUCKET-NAME/incremental/${file.name}. Be sure to replace YOUR-BUCKET-NAME with the name of your actual Cloud Storage bucket.

gcs_path_macros.png

The path can be replaced using multiple macros to split the bucket, folder, and file portions as follows: gs://${bucket.name}/${folder}/${file.name}, but for the sake of simplicity, you will replace the file name portion with ${file.name}.
  4. Click on the Validate button on the top right. Once validated, close the dialog box.

  5. Now do the same for the BigQuery node of your pipeline. Click on the Properties of the BigQuery node.

  6. Replace the Dataset field with ${bq.dataset} and the Table field with ${table.name}.

bq_macros.png

You may have noticed that the macro naming syntax used so far is written in dot notation to separate the words of the placeholder or key. You can use camel case, dashes, or underscores, but whatever you choose, make sure that you are consistent in your use of the parameters so that there is no ambiguity or confusion when you come across macros later.
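To make the substitution mechanics concrete, here is a small plain-bash illustration (not Data Fusion code) of how a macro-ized path resolves once runtime values are supplied. The resolve() helper is hypothetical, written just for this demonstration.

```shell
# Plain-bash illustration (not Data Fusion itself) of macro substitution:
# keys wrapped in ${ } are replaced by runtime values.
template='gs://${bucket.name}/${folder}/${file.name}'

resolve() {   # usage: resolve TEMPLATE KEY=VALUE...
  local out=$1 kv key val
  shift
  for kv in "$@"; do
    key=${kv%%=*}
    val=${kv#*=}
    out=${out//"\${$key}"/$val}   # replace every literal ${key} with its value
  done
  printf '%s\n' "$out"
}

resolve "$template" bucket.name=my-bucket folder=incremental file.name='titanic*.csv'
# prints: gs://my-bucket/incremental/titanic*.csv
```

The dot-notation keys work here only because the pattern is matched literally, mirroring how Data Fusion treats a macro name as an opaque key.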
  7. The changes to the pipeline nodes are now complete. Rename the pipeline as Dynamic_ETL and update the description as appropriate.

  8. Click Deploy to deploy the pipeline.

  9. Next, click Run. A new dialog box is presented showing the macro names, with empty fields where you can type in the desired values:

a. For file.name, enter titanic*.csv

b. For bq.dataset, enter demo

c. For table.name, enter titanic

Click Save at the bottom of the dialog box and click Run to run the pipeline with the provided runtime values.

To process all the CSV files, you specify the file name with a wildcard * so that the pipeline reads every file that begins with titanic and ends with .csv. This selectively reads only the titanic files and filters out any other CSV files that may exist in that directory. Optionally, if you only have CSV files in that directory, you can simply set the value to an asterisk ( * ) or a forward slash ( / ) to read all the files in the directory.
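The wildcard behaves like the shell-style globbing you may know from local file systems. This local sketch (temporary directory and file names invented for illustration) shows that titanic*.csv matches only the titanic files:

```shell
# Local illustration of the titanic*.csv wildcard: it matches only files
# beginning with "titanic" and ending with ".csv", skipping other CSVs.
# (Temporary directory and file names are invented for this demo.)
tmp=$(mktemp -d)
cd "$tmp"
touch titanic1.csv titanic2.csv titanic3.csv passengers.csv notes.txt

ls titanic*.csv    # lists titanic1.csv titanic2.csv titanic3.csv only
```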

pipeline_runtime_arguments.png
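Behind the Run dialog, the runtime arguments are passed to the pipeline's workflow when it starts. As a hedged sketch only: the endpoint URL and token below are placeholders, and the CDAP-style REST path is my assumption about how Data Fusion exposes pipeline starts; the run() wrapper prints the command rather than sending it.

```shell
# Sketch (assumption: Data Fusion exposes a CDAP-style REST API; the endpoint
# and token are placeholders, not real values). Starting Dynamic_ETL with the
# same runtime arguments entered in the Run dialog.
ENDPOINT="https://example.datafusion.googleusercontent.com/api"   # placeholder
TOKEN="placeholder-token"   # in Cloud Shell: gcloud auth print-access-token

run() { echo "+ $*"; }   # dry-run wrapper: prints instead of executing

run curl -s -X POST \
  -H "Authorization: Bearer ${TOKEN}" \
  -d '{"file.name":"titanic*.csv","bq.dataset":"demo","table.name":"titanic"}' \
  "${ENDPOINT}/v3/namespaces/default/apps/Dynamic_ETL/workflows/DataPipelineWorkflow/start"
```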

Click Check my progress to verify the objective. Build and execute dynamic pipeline using macro

Congratulations!

You have now learned how to leverage macros to make your Cloud Data Fusion pipelines more dynamic and provide runtime arguments for your pipeline runs.

CloudDataFusion_advanced_125x135.png

Continue Your Quest

This self-paced lab is part of the Qwiklabs Building Advanced Codeless Pipelines on Cloud Data Fusion Quest. A Quest is a series of related labs that form a learning path. Completing this Quest earns you the badge above, to recognize your achievement. You can make your badge (or badges) public and link to them in your online resume or social media account. Enroll in this Quest and get immediate completion credit if you've taken this lab. See other available Qwiklabs Quests.

Take Your Next Lab

Continue your Quest with Creating Reusable Pipelines in Cloud Data Fusion

Manual Last Updated October 20, 2021
Lab Last Tested October 20, 2021

©2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.