arrow_back

使用 Dataflow 和 BigQuery (Python) 在 Google Cloud 上进行 ETL 处理

登录 加入
访问 700 多个实验和课程

使用 Dataflow 和 BigQuery (Python) 在 Google Cloud 上进行 ETL 处理

实验 1 小时 30 分钟 universal_currency_alt 5 个积分 show_chart 中级
info 此实验可能会提供 AI 工具来支持您学习。
访问 700 多个实验和课程

GSP290

Google Cloud 自学实验的徽标

概览

Dataflow 是一项 Google Cloud 服务,支持大规模的统一流式数据处理和批量数据处理操作。它基于 Apache Beam 项目构建,该项目是一种开源模型,用于定义批量数据和流式数据并行处理流水线。借助其中一个开源 Apache Beam SDK,您可以构建一个程序来定义流水线,然后使用 Dataflow 执行该流水线。

在本实验中,您将使用 Python 版 Apache Beam SDK 在 Dataflow 中构建和运行流水线,将 Cloud Storage 中的数据注入 BigQuery,然后在 BigQuery 中转换和丰富数据。

注意:请务必打开 Python 文件,并按照说明查看注释。这样可以了解代码的作用。

您将执行的操作

在本实验中,您将学习如何完成以下操作:

  • 构建并运行 Dataflow 流水线 (Python),将 Cloud Storage 中的数据注入 BigQuery。
  • 构建并运行 Dataflow 流水线 (Python),在 BigQuery 中转换和丰富数据。
  • 构建并运行 Dataflow 流水线 (Python),在 BigQuery 中执行数据联接并将结果写入新表。

设置和要求

点击“开始实验”按钮前的注意事项

请阅读以下说明。实验是计时的,并且您无法暂停实验。计时器在您点击开始实验后即开始计时,显示 Google Cloud 资源可供您使用多长时间。

此实操实验可让您在真实的云环境中开展实验活动,免受模拟或演示环境的局限。为此,我们会向您提供新的临时凭据,您可以在该实验的规定时间内通过此凭据登录和访问 Google Cloud。

为完成此实验,您需要:

  • 能够使用标准的互联网浏览器(建议使用 Chrome 浏览器)。
注意:请使用无痕模式(推荐)或无痕浏览器窗口运行此实验。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。
  • 完成实验的时间 - 请注意,实验开始后无法暂停。
注意:请仅使用学生账号完成本实验。如果您使用其他 Google Cloud 账号,则可能会向该账号收取费用。

如何开始实验并登录 Google Cloud 控制台

  1. 点击开始实验按钮。如果该实验需要付费,系统会打开一个对话框供您选择支付方式。左侧是“实验详细信息”窗格,其中包含以下各项:

    • “打开 Google Cloud 控制台”按钮
    • 剩余时间
    • 进行该实验时必须使用的临时凭据
    • 帮助您逐步完成本实验所需的其他信息(如果需要)
  2. 点击打开 Google Cloud 控制台(如果您使用的是 Chrome 浏览器,请右键点击并选择在无痕式窗口中打开链接)。

    该实验会启动资源并打开另一个标签页,显示“登录”页面。

    提示:将这些标签页安排在不同的窗口中,并排显示。

    注意:如果您看见选择账号对话框,请点击使用其他账号
  3. 如有必要,请复制下方的用户名,然后将其粘贴到登录对话框中。

    {{{user_0.username | "<用户名>"}}}

    您也可以在“实验详细信息”窗格中找到“用户名”。

  4. 点击下一步

  5. 复制下面的密码,然后将其粘贴到欢迎对话框中。

    {{{user_0.password | "<密码>"}}}

    您也可以在“实验详细信息”窗格中找到“密码”。

  6. 点击下一步

    重要提示:您必须使用实验提供的凭据。请勿使用您的 Google Cloud 账号凭据。 注意:在本实验中使用您自己的 Google Cloud 账号可能会产生额外费用。
  7. 继续在后续页面中点击以完成相应操作:

    • 接受条款及条件。
    • 由于这是临时账号,请勿添加账号恢复选项或双重验证。
    • 请勿注册免费试用。

片刻之后,系统会在此标签页中打开 Google Cloud 控制台。

注意:如需访问 Google Cloud 产品和服务,请点击导航菜单,或在搜索字段中输入服务或产品的名称。 “导航菜单”图标和“搜索”字段

激活 Cloud Shell

Cloud Shell 是一种装有开发者工具的虚拟机。它提供了一个永久性的 5GB 主目录,并且在 Google Cloud 上运行。Cloud Shell 提供可用于访问您的 Google Cloud 资源的命令行工具。

  1. 点击 Google Cloud 控制台顶部的激活 Cloud Shell “激活 Cloud Shell”图标

  2. 在弹出的窗口中执行以下操作:

    • 继续完成 Cloud Shell 信息窗口中的设置。
    • 授权 Cloud Shell 使用您的凭据进行 Google Cloud API 调用。

如果您连接成功,即表示您已通过身份验证,且项目 ID 会被设为您的 Project_ID 。输出内容中有一行说明了此会话的 Project_ID

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud 是 Google Cloud 的命令行工具。它已预先安装在 Cloud Shell 上,且支持 Tab 自动补全功能。

  1. (可选)您可以通过此命令列出活跃账号名称:
gcloud auth list
  1. 点击授权

输出:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (可选)您可以通过此命令列出项目 ID:
gcloud config list project

输出:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} 注意:如需查看在 Google Cloud 中使用 gcloud 的完整文档,请参阅 gcloud CLI 概览指南

任务 1. 确保 Dataflow API 已成功启用

为了确保能访问这个必要的 API,请重新启动与 Dataflow API 的连接。

重要提示:即使该 API 目前已启用,也请按照以下第 1 步至第 4 步,停用并重新启用该 API,以便成功将其重新启动。
  1. 在 Google Cloud 控制台标题栏的搜索字段中输入 Dataflow API,然后点击搜索结果中的 Dataflow API

  2. 点击管理

  3. 点击停用 API

如果系统要求您确认,请点击停用

  1. 点击启用

重新启用该 API 后,页面会显示停用选项。

点击检查我的进度以验证您已完成的任务。

停用并重新启用 Dataflow API。

任务 2. 下载起始代码

下载要在本实验中使用的 Dataflow Python 示例。

  1. 在 Cloud Shell 中运行以下命令,从 Google Cloud 的专业服务 GitHub 中获取 Dataflow Python 示例:
gcloud storage cp -r gs://spls/gsp290/dataflow-python-examples .
  1. 为项目 ID 设置一个变量。
export PROJECT={{{ project_0.project_id }}} gcloud config set project $PROJECT

任务 3. 创建 Cloud Storage 存储桶并将文件复制到该存储桶

在 Cloud Shell 中创建一个 Cloud Storage 存储桶,然后将文件复制到该存储桶。这些文件是 Dataflow Python 示例。

创建 Cloud Storage 存储桶

  • 仍然在 Cloud Shell 中,使用 make bucket 命令在项目的 区域中新建一个区域级存储桶:
gcloud storage buckets create gs://$PROJECT --location={{{ project_0.default_region | REGION }}}

点击检查我的进度以验证您已完成的任务。

创建 Cloud Storage 存储桶。

将文件复制到存储桶

  • 在 Cloud Shell 中,使用 gsutil 命令将文件复制到刚才创建的 Cloud Storage 存储桶中:
gcloud storage cp gs://spls/gsp290/data_files/usa_names.csv gs://$PROJECT/data_files/ gcloud storage cp gs://spls/gsp290/data_files/head_usa_names.csv gs://$PROJECT/data_files/

点击检查我的进度以验证您已完成的任务。

将文件复制到存储桶。

任务 4. 创建 BigQuery 数据集

创建一个 BigQuery 数据集。您的表将加载到 BigQuery 中的这个数据集中。

在 Cloud Shell 中,创建名为 lake 的数据集:

bq mk lake

点击检查我的进度以验证您已完成的任务。

创建名为“lake”的 BigQuery 数据集。

任务 5. 查看和运行数据注入流水线

在此任务中,您将查看流水线代码,了解其工作原理。然后,您将设置并运行流水线。

数据注入流水线使用 TextIO 作为来源、BigQueryIO 作为目的地,将 Cloud Storage 中的数据注入到 BigQuery 表中。具体而言,该流水线将执行以下操作:

  • 从 Cloud Storage 提取文件。
  • 滤除文件中的标题行。
  • 将读取的行转换为字典对象。
  • 将行输出到 BigQuery。

查看数据注入流水线的 Python 代码

使用 Cloud Shell 代码编辑器查看流水线代码。

  1. 在 Cloud Shell 菜单栏中点击打开编辑器

  2. 前往 dataflow_python_examples > dataflow_python_examples,然后打开 data_ingestion.py 文件。

  3. 查看文件中的注释,这些注释解释了代码的作用。

这些代码会使用 Cloud Storage 中的数据文件填充 BigQuery 表。

  1. 如需返回 Cloud Shell,请点击打开终端

为 Dataflow 作业设置 Docker 容器

  1. 返回 Cloud Shell 会话,设置所需的 Python 库。

在本实验中,Dataflow 作业需要使用 Python3.8。为确保您使用的是正确的版本,请在 Python 3.8 Docker 容器中运行 Dataflow 进程。

  1. 在 Cloud Shell 中运行以下命令来启动 Python 容器:
cd ~ docker run -it -e PROJECT=$PROJECT -v $(pwd)/dataflow-python-examples:/dataflow python:3.8 /bin/bash

此命令会拉取包含 Python 3.8 最新稳定版的 Docker 容器,并执行一个命令 shell,以便在容器内运行后续命令。-v 标志会将源代码作为容器的一个提供,这样,我们在 Cloud Shell 编辑器中修改源代码的同时,仍能在运行的容器中访问源代码。

  1. 容器完成拉取并开始在 Cloud Shell 中执行后,运行以下命令将 apache-beam 安装在运行的容器中:
pip install apache-beam[gcp]==2.59.0
  1. 接下来,在 Cloud Shell 中运行的容器内,将目录切换到链接源代码的位置:
cd dataflow/
  1. 在容器中设置项目 ID:
export PROJECT={{{ project_0.project_id }}}

在云端运行数据注入流水线

  1. 运行以下代码以执行数据注入流水线:
python dataflow_python_examples/data_ingestion.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session

这些代码会启动所需的工作器,并在流水线完成后将它们关闭。

  1. 在控制台标题栏的搜索字段中,输入 Dataflow,然后点击搜索结果中的 Dataflow

Dataflow 页面打开后,查看作业的状态。

  1. 点击作业的名称以查看其进度。

作业状态显示为成功后,您便可执行下一步。此注入流水线从启动、完成工作到关闭,大约需要五分钟时间。

  1. 前往 BigQuery(导航菜单 > BigQuery),确认是否已填充数据。

  2. 点击项目名称,查看 lake 数据集下的 usa_names 表。

usa_names 表

  1. 点击此表,然后前往预览标签页,查看 usa_names 数据的示例。
注意:如果您没有看到 usa_names 表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。

点击检查我的进度以验证您已完成的任务。

构建数据注入流水线。

任务 6. 查看和运行数据转换流水线

在此任务中,您将查看数据转换流水线,了解其工作原理。然后,您将运行该流水线来处理 Cloud Storage 文件,并将结果输出到 BigQuery。

数据转换流水线还将使用 TextIO 作为来源、BigQueryIO 作为目的地,将 Cloud Storage 中的数据注入 BigQuery 表中,但会进行额外的数据转换。具体而言,该流水线将执行以下操作:

  • 从 Cloud Storage 提取文件。
  • 将读取的行转换为字典对象。
  • 将包含年份的数据转换为 BigQuery 视为日期的格式。
  • 将行输出到 BigQuery。

查看数据转换流水线的 Python 代码

  • 在代码编辑器中,打开 data_transformation.py

查看文件中的注释,这些注释解释了代码的作用。

在云端运行数据转换流水线

  1. 运行以下代码以运行数据转换流水线:
python dataflow_python_examples/data_transformation.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. 在 Google Cloud 控制台标题栏的搜索字段中输入 Dataflow,然后点击搜索结果中的 Dataflow

  2. 点击此作业名称可查看作业状态。

此 Dataflow 流水线从启动、完成工作到关闭,大约需要五分钟时间。

  1. 当 Dataflow“作业状态”屏幕中的作业状态显示为成功后,前往 BigQuery 确认是否已填充数据。

您应该可在 lake 数据集下看到 usa_names_transformed 表。

  1. 点击此表,然后前往预览标签页,查看 usa_names_transformed 数据的示例。
注意:如果您没有看到 usa_names_transformed 表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。

点击检查我的进度以验证您已完成的任务。

构建数据转换流水线。

任务 7. 查看并运行数据丰富化流水线

现在,您将构建一个数据丰富化流水线,以实现以下目标:

  • 从 Cloud Storage 提取文件。
  • 滤除文件中的标题行。
  • 将读取的行转换为字典对象。
  • 将行输出到 BigQuery。

查看和修改数据丰富化流水线的 Python 代码

  1. 在代码编辑器中,打开 data_enrichment.py

  2. 查看注释,这些注释解释了代码的作用。此代码会在 BigQuery 中填充数据。

第 83 行现如下所示:

values = [x.decode('utf8') for x in csv_row]
  1. 将该行修改为如下所示的内容:
values = [x for x in csv_row]
  1. 将该行代码修改完毕后,请务必在代码编辑器中选择文件选项并点击保存,以保存更新后的文件。

运行数据丰富化流水线

  1. 运行以下代码以执行数据丰富化流水线:
python dataflow_python_examples/data_enrichment.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. 在 Dataflow 页面中,点击您的作业以查看作业状态

此 Dataflow 流水线从启动、完成工作到关闭,大约需要五分钟时间。

  1. Dataflow 作业状态屏幕中的作业状态显示为成功后,在控制台中依次点击导航菜单 > BigQuery,确认是否已填充数据。

您应该可在 lake 数据集下看到 usa_names_enriched 表。

  1. 点击此表,然后前往预览标签页,查看 usa_names_enriched 数据的示例。
注意:如果您没有看到 usa_names_enriched 表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。

点击检查我的进度以验证您已完成的任务。

构建数据丰富化流水线。

任务 8. 查看和运行从数据湖到数据集市的流水线

接下来,您需要构建一条 Dataflow 流水线,从两个 BigQuery 数据源读取数据,然后联接这两个数据源。具体操作如下:

  • 从两个 BigQuery 数据源提取文件。
  • 联接两个数据源。
  • 滤除文件中的标题行。
  • 将读取的行转换为字典对象。
  • 将行输出到 BigQuery。

运行数据注入流水线以执行数据联接,并将生成的表写入 BigQuery

您首先要查看 data_lake_to_mart.py 代码,了解其作用。然后,您将在云端运行该流水线。

  1. 代码编辑器中,打开 data_lake_to_mart.py 文件。

查看文件中的注释,这些注释解释了代码的作用。此代码会联接两个表,并将结果写入 BigQuery 中的新表。

  1. 运行以下代码块以执行流水线:
python dataflow_python_examples/data_lake_to_mart.py \ --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" \ --max_num_workers=4 \ --project=$PROJECT \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --save_main_session \ --region={{{ project_0.default_region | REGION }}}
  1. 在 Google Cloud 控制台标题栏的搜索字段中输入 Dataflow,然后点击搜索结果中的 Dataflow

  2. 点击此新作业以查看作业状态。

此 Dataflow 流水线从启动、完成工作到关闭,大约需要五分钟时间。

  1. 当 Dataflow“作业状态”屏幕中的作业状态显示为成功后,点击导航菜单 > BigQuery 确认是否已填充数据。

您应该可在 lake 数据集下看到 orders_denormalized_sideinput 表。

  1. 点击此表,然后前往预览部分,查看 orders_denormalized_sideinput 数据的示例。
注意:如果您没有看到 orders_denormalized_sideinput 表,请尝试刷新页面或使用经典版 BigQuery 界面来查看此表。

点击检查我的进度以验证您已完成的任务。

构建从数据湖到数据集市的 Dataflow 流水线

检验您的掌握情况

下面有一道选择题可强化您对此实验所涉概念的理解。请尽您所能回答。

恭喜!

您使用 Dataflow 执行了 Python 代码,将 Cloud Storage 中的数据注入 BigQuery,然后在 BigQuery 中转换并丰富了数据。

后续步骤/了解详情

想要查找更多参考信息?请点击以下链接查看官方文档:

Google Cloud 培训和认证

…可帮助您充分利用 Google Cloud 技术。我们的课程会讲解各项技能与最佳实践,可帮助您迅速上手使用并继续学习更深入的知识。我们提供从基础到高级的全方位培训,并有点播、直播和虚拟三种方式选择,让您可以按照自己的日程安排学习时间。各项认证可以帮助您核实并证明您在 Google Cloud 技术方面的技能与专业知识。

上次更新手册的时间:2025 年 4 月 1 日

上次测试实验的时间:2025 年 4 月 1 日

版权所有 2025 Google LLC 保留所有权利。Google 和 Google 徽标是 Google LLC 的商标。其他所有公司名和产品名可能是其各自相关公司的商标。

准备工作

  1. 实验会创建一个 Google Cloud 项目和一些资源,供您使用限定的一段时间
  2. 实验有时间限制,并且没有暂停功能。如果您中途结束实验,则必须重新开始。
  3. 在屏幕左上角,点击开始实验即可开始

使用无痕浏览模式

  1. 复制系统为实验提供的用户名密码
  2. 在无痕浏览模式下,点击打开控制台

登录控制台

  1. 使用您的实验凭证登录。使用其他凭证可能会导致错误或产生费用。
  2. 接受条款,并跳过恢复资源页面
  3. 除非您已完成此实验或想要重新开始,否则请勿点击结束实验,因为点击后系统会清除您的工作并移除该项目

此内容目前不可用

一旦可用,我们会通过电子邮件告知您

太好了!

一旦可用,我们会通过电子邮件告知您

一次一个实验

确认结束所有现有实验并开始此实验

使用无痕浏览模式运行实验

请使用无痕模式或无痕式浏览器窗口运行此实验。这可以避免您的个人账号与学生账号之间发生冲突,这种冲突可能导致您的个人账号产生额外费用。