Wczytuję…
Nie znaleziono wyników.

Wykorzystuj swoje umiejętności w konsoli Google Cloud

Zyskaj dostęp do ponad 700 modułów i kursów

ETL Processing on Google Cloud Using Dataflow and BigQuery (Python)

Moduł 1 godz. 30 godz. universal_currency_alt Punkty: 5 show_chart Średnio zaawansowane
info Ten moduł może zawierać narzędzia AI, które ułatwią Ci naukę.
Zyskaj dostęp do ponad 700 modułów i kursów

GSP290

Google Cloud self-paced labs logo

Overview

Dataflow is a Google Cloud service that provides unified stream and batch data processing at scale. It is built on the Apache Beam project, which is an open source model for defining both batch and streaming data-parallel processing pipelines. Using one of the open source Apache Beam SDKs, you can build a program that defines the pipeline and then use Dataflow to execute the pipeline.

In this lab, you use the Apache Beam SDK for Python to build and run a pipeline in Dataflow to ingest data from Cloud Storage to BigQuery, and then transform and enrich the data in BigQuery.

Note: Be sure to open the Python files and read the comments when instructed. This provides an understanding of what the code does.

What you'll do

In this lab, you learn how to build and run Dataflow pipelines (Python) to do the following:

  • Ingest data from Cloud Storage to BigQuery.
  • Transform and enrich data in BigQuery.
  • Join data in BigQuery and write the results to a new table.

Setup and requirements

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources are made available to you.

This hands-on lab lets you do the lab activities in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials you use to sign in and access Google Cloud for the duration of the lab.

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
Note: Use an Incognito (recommended) or private browser window to run this lab. This prevents conflicts between your personal account and the student account, which may cause extra charges incurred to your personal account.
  • Time to complete the lab—remember, once you start, you cannot pause a lab.
Note: Use only the student account for this lab. If you use a different Google Cloud account, you may incur charges to that account.

How to start your lab and sign in to the Google Cloud console

  1. Click the Start Lab button. If you need to pay for the lab, a dialog opens for you to select your payment method. On the left is the Lab Details pane with the following:

    • The Open Google Cloud console button
    • Time remaining
    • The temporary credentials that you must use for this lab
    • Other information, if needed, to step through this lab
  2. Click Open Google Cloud console (or right-click and select Open Link in Incognito Window if you are running the Chrome browser).

    The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Arrange the tabs in separate windows, side-by-side.

    Note: If you see the Choose an account dialog, click Use Another Account.
  3. If necessary, copy the Username below and paste it into the Sign in dialog.

    {{{user_0.username | "Username"}}}

    You can also find the Username in the Lab Details pane.

  4. Click Next.

  5. Copy the Password below and paste it into the Welcome dialog.

    {{{user_0.password | "Password"}}}

    You can also find the Password in the Lab Details pane.

  6. Click Next.

    Important: You must use the credentials the lab provides you. Do not use your Google Cloud account credentials. Note: Using your own Google Cloud account for this lab may incur extra charges.
  7. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Google Cloud console opens in this tab.

Note: To access Google Cloud products and services, click the Navigation menu or type the service or product name in the Search field. Navigation menu icon and Search field

Activate Cloud Shell

Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5GB home directory and runs on the Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources.

  1. Click Activate Cloud Shell Activate Cloud Shell icon at the top of the Google Cloud console.

  2. Click through the following windows:

    • Continue through the Cloud Shell information window.
    • Authorize Cloud Shell to use your credentials to make Google Cloud API calls.

When you are connected, you are already authenticated, and the project is set to your Project_ID, . The output contains a line that declares the Project_ID for this session:

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.

  1. (Optional) You can list the active account name with this command:
gcloud auth list
  1. Click Authorize.

Output:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (Optional) You can list the project ID with this command:
gcloud config list project

Output:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} Note: For full documentation of gcloud, in Google Cloud, refer to the gcloud CLI overview guide.

Task 1. Ensure that the Dataflow API is successfully enabled

  • To ensure access to the necessary API, restart the connection to the Dataflow API.

    Important: Even if the API is currently enabled, follow the steps below to disable, and then re-enable the API, to restart the API successfully.
gcloud services disable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}} --force gcloud services enable dataflow.googleapis.com --project {{{project_0.project_id|Project ID}}}

When the API has been re-enabled, the page shows the Disable option.

Click Check my progress to verify your performed task. Disable and re-enable the Dataflow API.

Task 2. Download the starter code

Download the Dataflow Python examples to use in this lab.

gcloud storage cp -r gs://spls/gsp290/dataflow-python-examples .

Task 3. Create a Cloud Storage bucket and copy files to the bucket

In Cloud Shell, create a Cloud Storage bucket, and then copy files to the bucket. These files are the Dataflow Python examples.

Create a Cloud Storage bucket

  • Still in Cloud Shell, use the make bucket command to create a new regional bucket in the region within your project:
gcloud storage buckets create gs://{{{ project_0.project_id | BUCKET_NAME }}} --location={{{ project_0.default_region | REGION }}}

Click Check my progress to verify your performed task. Create a Cloud Storage Bucket.

Copy files to your bucket

  • In Cloud Shell, use the gsutil command to copy files into the Cloud Storage bucket you just created:
gcloud storage cp gs://spls/gsp290/data_files/usa_names.csv gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/ gcloud storage cp gs://spls/gsp290/data_files/head_usa_names.csv gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/

Click Check my progress to verify your performed task. Copy Files to Your Bucket.

Task 4. Create a BigQuery dataset

Create a dataset in BigQuery dataset. This is where your tables are loaded in BigQuery.

  • In Cloud Shell, create the dataset named lake:
bq mk lake

Click Check my progress to verify your performed task. Create the BigQuery Dataset named lake.

Task 5. Review and run the data ingestion pipeline

In this task, you review the pipeline code to see how it works. You then set up and run the pipeline.

The data ingestion pipeline ingests data from Cloud Storage into the BigQuery table using a TextIO source and a BigQueryIO destination. Specifically, the pipeline:

  • Ingests the files from Cloud Storage.
  • Filters out the header row in the files.
  • Converts the lines read to dictionary objects.
  • Outputs the rows to BigQuery.

Enable Gemini Code Assist in the Cloud Shell IDE

You can use Gemini Code Assist in an integrated development environment (IDE) such as Cloud Shell to receive guidance on code or solve problems with your code. Before you can start using Gemini Code Assist, however, you need to enable it.

  1. In Cloud Shell, enable the Gemini for Google Cloud API with the following command:
gcloud services enable cloudaicompanion.googleapis.com
  1. Click Open Editor on the Cloud Shell toolbar.
Note: To open the Cloud Shell Editor, click Open Editor on the Cloud Shell toolbar. You can switch between Cloud Shell and the code Editor by clicking Open Editor or Open Terminal, as required.
  1. In the Cloud Shell Editor, navigate to Cloud Code > Help and Feedback > Change Settings.

  2. In the Settings, search for Gemini Code Assist.

  3. Locate and ensure that the checkbox is selected for Geminicodeassist: Enable, and close the Settings.

  4. Click Cloud Code - No Project in the status bar at the bottom of the screen.

  5. Authorize the plugin as instructed. If a project is not automatically selected, click Select a Google Cloud Project, and choose .

  6. Verify that your Google Cloud project () displays in the Cloud Code status message in the status bar.

Review the Python code for the data ingestion pipeline

In this section, prompt Gemini Code Assist for more information on the data ingestion pipeline to provide an overview for a new team member.

  1. In the Cloud Shell Editor's file Explorer, navigate to dataflow_python_examples > dataflow_python_examples > data_ingestion.py.

  2. Open the data_ingestion.py file. This action enables Gemini Code Assist, as indicated by the presence of the Gemini Code Assist: Smart Actions icon in the upper-right corner of the editor.

  3. Click the Gemini Code Assist: Smart Actions Gemini Code Assist: Smart Actions icon and select Explain this.

  4. Gemini Code Assist opens a chat pane with the prefilled prompt of Explain this. In the inline text box of the Code Assist chat, replace the prefilled prompt with the following, and click Send:

You are an expert Data Engineer at Cymbal AI. A new team member is unfamiliar with this pipeline code. Explain the purpose and functionality of the data ingestion pipeline defined in the data_ingestion.py. Your explanation should include: 1. A high-level summary of what the script does. 2. A breakdown of the key components, such as the DataIngestion class and the run function. 3. An explanation of how the script uses the Apache Beam pipeline to read, process, and write data. 4. The role of command-line arguments and how they are used. 5. A description of the input data format and the output BigQuery table schema. For the suggested improvements, don't update this file.

This code populates a BigQuery table with the data files from Cloud Storage. The detailed explanation for the code in the data_ingestion.py file appears in the Gemini Code Assist chat.

  1. To return to Cloud Shell, click Open Terminal.

Set up the Docker container for the Dataflow jobs

In this section, you return to your Cloud Shell session to set up the required Python libraries.

The Dataflow jobs in this lab require Python3.8. To ensure you're on the proper version, run the Dataflow processes in a Python 3.8 Docker container.

  1. Run the following in Cloud Shell to start up a Python container:
cd ~ docker run -it -e PROJECT={{{ project_0.project_id | PROJECT_ID }}} -v $(pwd)/dataflow-python-examples:/dataflow python:3.8 /bin/bash

This command pulls a Docker container with the latest stable version of Python 3.8 and executes a command shell to run the next commands within the container. The -v flag provides the source code as a volume for the container so that we can edit in Cloud Shell editor and still access it within the running container.

  1. Once the container finishes pulling and starts executing in Cloud Shell, run the following to install apache-beam in that running container:
pip install apache-beam[gcp]==2.59.0
  1. Next, in the running container in the Cloud Shell, change directories into where you linked the source code:
cd dataflow/

Run the data ingestion pipeline in the cloud

  1. Run the following code to execute the data ingestion pipeline:
python dataflow_python_examples/data_ingestion.py \ --project={{{ project_0.project_id | PROJECT_ID }}} \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \ --save_main_session

This code spins up the workers required and then shut them down when the pipeline is complete.

  1. In the console title bar, type Dataflow in the Search field, and then click Dataflow in the search results.

When the Dataflow page opens, view the status of your job.

  1. Click the job's name to watch its progress.

Once your Job Status is Succeeded, you can move to the next step. This ingestion pipeline takes approximately five minutes to start, complete the work, and then shutdown.

  1. Navigate to BigQuery (Navigation menu > BigQuery) to see that your data has been populated.

  2. Click your project name to see the usa_names table under the lake dataset.

usa_names table

  1. Click the table then navigate to the Preview tab to see examples of the usa_names data.
Note: If you don't see the usa_names table, try refreshing the page or view the tables using the classic BigQuery UI.

Click Check my progress to verify your performed task. Build a data ingestion pipeline.

Task 6. Review and run the data transformation pipeline

In this task, you review the data transformation pipeline to learn how it works. You then run the pipeline to process the Cloud Storage files and output the result to BigQuery.

The data transformation pipeline also ingests data from Cloud Storage into the BigQuery table using a TextIO source and a BigQueryIO destination, but with additional data transformations. Specifically, the pipeline:

  • Ingests the files from Cloud Storage.
  • Converts the lines read to dictionary objects.
  • Transforms the data which contains the year to a format BigQuery understands as a date.
  • Outputs the rows to BigQuery.

Review the Python code for the data transformation pipeline

In this section, you prompt Gemini Code Assist for additional information on the data transformation pipeline to assist the new team member further.

  1. In the Cloud Shell menu bar, click Open Editor.

  2. In the Cloud Shell Editor, still in the same directory, navigate to data_transformation.py file. As before, notice the Gemini Code Assist: Smart Actions icon in the upper-right corner of the editor.

  3. Click the Gemini Code Assist: Smart Actions Gemini Code Assist: Smart Actions icon and select Explain this.

  4. Gemini Code Assist opens a chat pane with the prefilled prompt of Explain this. In the inline text box of the Code Assist chat, replace the prefilled prompt with the following, and click Send:

You are an expert Data Engineer at Cymbal AI. A new team member is unfamiliar with this pipeline code. Explain the purpose and functionality of the data transformation pipeline defined in the data_transformation.py. Your explanation should include: 1. A high-level summary of what the script does, noting its differences from a simple ingestion pipeline. 2. A breakdown of the key components, specifically the DataTransformation class and the run function. 3. A detailed explanation of how the script uses the Apache Beam pipeline to read from a file, transform the data, and write it to a BigQuery table. 4. Describe how the script handles the BigQuery schema by reading it from a JSON file. 5. Explain the data transformation logic within the parse_method, particularly how it converts the year to a DATE type. 6. The role of command-line arguments and how they are used. For the suggested improvements, don't update this file.

The explanation for the code in the data_transformation.py file appears in the Gemini Code Assist chat.

Run the data transformation pipeline in the cloud

  1. Enter the following command in the Cloud Shell Terminal to run the data transformation pipeline:
python dataflow_python_examples/data_transformation.py \ --project={{{ project_0.project_id | PROJECT_ID }}} \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \ --save_main_session
  1. In the Google Cloud console title bar, type Dataflow in the Search field and then click Dataflow from the search results.

  2. Click the name of this job to view the status of your job.

This Dataflow pipeline takes approximately five minutes to start, complete the work, and then shutdown.

  1. When your Job Status is Succeeded in the Dataflow Job Status screen, navigate to BigQuery to check to see that your data has been populated.

You should see the usa_names_transformed table under the lake dataset.

  1. Click the table and navigate to the Preview tab to see examples of the usa_names_transformed data.
Note: If you don't see the usa_names_transformed table, try refreshing the page or view the tables using the classic BigQuery UI.

Click Check my progress to verify your performed task. Build a data transformation pipeline.

Task 7. Review and run the data enrichment pipeline

You now build a data enrichment pipeline that accomplishes the following:

  • Ingest the files from Cloud Storage.
  • Filter out the header row in the files.
  • Convert the lines read to dictionary objects.
  • Output the rows to BigQuery.

Review and edit the Python code for the data enrichment pipeline

In this section, you leverage the AI-powered features of Gemini Code Assist to review and edit the Python code for the data enrichment pipeline.

  1. In the Cloud Shell menu bar, click Open Editor.

  2. In the Cloud Shell Editor, still in the same directory, navigate to data_enrichment.py. As before, notice the Gemini Code Assist: Smart Actions icon in the upper-right corner of the editor.

  1. Click the Gemini Code Assist: Smart Actions Gemini Code Assist icon on the toolbar.

  2. To update the code at Line 83, paste the following prompt into the Gemini Code Assist inline text field that opens from the toolbar.

In the data_enrichment.py file, update line 83 by replacing x.decode('utf8') with x.
  1. To prompt Gemini Code Assist to modify the code accordingly, press ENTER.

  2. When prompted in the Gemini Diff view, click Accept.

The updated line 83 in the data_enrichment.py file now looks something like this:

values = [x for x in csv_row]
  1. When you have finished editing this line, remember to Save this updated file by selecting the File option in the Code Editor and clicking Save.

Run the data enrichment pipeline

  1. Enter the following command in the Cloud Shell Terminal to execute the data enrichment pipeline:
python dataflow_python_examples/data_enrichment.py \ --project={{{ project_0.project_id | PROJECT_ID }}} \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --input gs://{{{ project_0.project_id | BUCKET_NAME }}}/data_files/head_usa_names.csv \ --save_main_session
  1. In the Dataflow page, click your job to view its Job status.

This Dataflow pipeline takes approximately five minutes to start, complete the work, and then shut down.

  1. Once your Job Status is Succeed in the Dataflow Job Status screen, in the console click Navigation menu > BigQuery to check to see that your data has been populated.

You should see the usa_names_enriched table under the lake dataset.

  1. Click the table and navigate to the Preview tab to see examples of the usa_names_enriched data.
Note: If you don't see the usa_names_enriched table, try refreshing the page or view the tables using the classic BigQuery UI.

Click Check my progress to verify your performed task. Build a data enrichment pipeline.

Task 8. Review and run the data lake to data mart pipeline

Now you build a Dataflow pipeline that reads data from two BigQuery data sources and then joins the data sources. Specifically, you:

  • Ingest files from two BigQuery sources.
  • Join the two data sources.
  • Filter out the header row in the files.
  • Convert the lines read to dictionary objects.
  • Output the rows to BigQuery.

Run the data ingestion pipeline to perform a data join and write the resulting table to BigQuery

You first review the data_lake_to_mart.py code to gain understanding of what it does. You then run the pipeline in the cloud.

  1. In the Code Editor, open data_lake_to_mart.py file.

Read the comments in the file, which explain what the code is doing. This code joins two tables and write the results to new table in BigQuery.

Note: To learn more about the data lake to data mart pipeline, you can again ask Gemini Code Assist to explain the code, similar to how you used its AI-powered features in Tasks 5 and 6.
  1. Run the following code block to execute the pipeline:
python dataflow_python_examples/data_lake_to_mart.py \ --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" \ --max_num_workers=4 \ --project={{{ project_0.project_id | PROJECT_ID }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --temp_location gs://{{{ project_0.project_id | BUCKET_NAME }}}/test \ --save_main_session \ --region={{{ project_0.default_region | REGION }}}
  1. In the Google Cloud console title bar, type Dataflow in the Search field, and then click Dataflow from the search results.

  2. Click this new job to view the status.

This Dataflow pipeline takes approximately five minutes to start, complete the work, and then shutdown.

  1. Once your Job Status is Succeeded in the Dataflow Job Status screen, click Navigation menu > BigQuery to check that your data has been populated.

You should see the orders_denormalized_sideinput table under the lake dataset.

  1. Click the table and navigate to the Preview section to see examples of orders_denormalized_sideinput data.
Note: If you don't see the orders_denormalized_sideinput table, try refreshing the page or view the tables using the classic BigQuery UI.

Click Check my progress to verify your performed task. Build a Data lake to Mart Dataflow Pipeline

Test your understanding

Below is a multiple-choice question to reinforce your understanding of this lab's concepts. Answer it to the best of your ability.

Congratulations!

You executed Python code using Dataflow with input from Gemini Code Assist to ingest data from Cloud Storage into BigQuery and then transform and enrich the data in BigQuery.

Next steps / Learn more

Looking for more? Check out the official documentation on:

Google Cloud training and certification

...helps you make the most of Google Cloud technologies. Our classes include technical skills and best practices to help you get up to speed quickly and continue your learning journey. We offer fundamental to advanced level training, with on-demand, live, and virtual options to suit your busy schedule. Certifications help you validate and prove your skill and expertise in Google Cloud technologies.

Manual Last Updated September 1, 2025

Lab Last Tested September 1, 2025

Copyright 2025 Google LLC. All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.

Zanim zaczniesz

  1. Moduły tworzą projekt Google Cloud i zasoby na określony czas.
  2. Moduły mają ograniczenie czasowe i nie mają funkcji wstrzymywania. Jeśli zakończysz moduł, musisz go zacząć od początku.
  3. Aby rozpocząć, w lewym górnym rogu ekranu kliknij Rozpocznij moduł.

Użyj przeglądania prywatnego

  1. Skopiuj podaną nazwę użytkownika i hasło do modułu.
  2. Kliknij Otwórz konsolę w trybie prywatnym.

Zaloguj się w konsoli

  1. Zaloguj się z użyciem danych logowania do modułu. Użycie innych danych logowania może spowodować błędy lub naliczanie opłat.
  2. Zaakceptuj warunki i pomiń stronę zasobów przywracania.
  3. Nie klikaj Zakończ moduł, chyba że właśnie został przez Ciebie zakończony lub chcesz go uruchomić ponownie, ponieważ spowoduje to usunięcie wyników i projektu.

Ta treść jest obecnie niedostępna

Kiedy dostępność się zmieni, wyślemy Ci e-maila z powiadomieniem

Świetnie

Kiedy dostępność się zmieni, skontaktujemy się z Tobą e-mailem

Jeden moduł, a potem drugi

Potwierdź, aby zakończyć wszystkie istniejące moduły i rozpocząć ten

Aby uruchomić moduł, użyj przeglądania prywatnego

Uruchom ten moduł w oknie incognito lub przeglądania prywatnego. Dzięki temu unikniesz konfliktu między swoim kontem osobistym a kontem do nauki, co mogłoby spowodować naliczanie dodatkowych opłat na koncie osobistym.