読み込んでいます...
一致する結果は見つかりませんでした。

Google Cloud コンソールでスキルを試す

700 以上のラボとコースにアクセス

Dataflow と BigQuery を使用した Google Cloud での ETL 処理(Python)

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 中級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

GSP290

Google Cloud セルフペース ラボのロゴ

概要

Dataflow は、統合されたストリーム データ処理とバッチデータ処理を大規模に実現できる Google Cloud サービスです。Dataflow は Apache Beam プロジェクトに基づいて構築されています。Apache Beam プロジェクトは、バッチデータ処理とストリーミングデータ処理を並列して実行できるパイプラインを定義するためのオープンソース モデルです。オープンソースの Apache Beam SDK のいずれかを使用して、パイプラインを定義するプログラムを構築し、Dataflow を使用してそのパイプラインを実行できます。

このラボでは、Apache Beam SDK for Python を使用してパイプラインを構築し、Dataflow 内で実行します。このパイプラインは、Cloud Storage から BigQuery にデータを取り込み、BigQuery でデータを変換、拡充します。

注: 指示があった場合は、Python ファイルを開き、コメントを読んでください。これにより、コードの動作を把握できます。

演習内容

このラボでは、次の方法について学びます。

  • Dataflow パイプライン(Python)を構築して実行し、Cloud Storage から BigQuery にデータを取り込む。
  • Dataflow パイプライン(Python)を構築して実行し、BigQuery でデータを変換、拡充する。
  • Dataflow パイプライン(Python)を構築して実行し、BigQuery でデータを結合して、その結果を新しいテーブルに書き込む。

設定と要件

[ラボを開始] ボタンをクリックする前に

こちらの説明をお読みください。ラボには時間制限があり、一時停止することはできません。タイマーは、Google Cloud のリソースを利用できる時間を示しており、[ラボを開始] をクリックするとスタートします。

このハンズオンラボでは、シミュレーションやデモ環境ではなく実際のクラウド環境を使って、ラボのアクティビティを行います。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
注: このラボの実行には、シークレット モード(推奨)またはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウント間の競合を防ぎ、個人アカウントに追加料金が発生しないようにすることができます。
  • ラボを完了するための時間(開始後は一時停止できません)
注: このラボでは、受講者アカウントのみを使用してください。別の Google Cloud アカウントを使用すると、そのアカウントに料金が発生する可能性があります。

ラボを開始して Google Cloud コンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるダイアログでお支払い方法を選択してください。 左側の [ラボの詳細] ペインには、以下が表示されます。

    • [Google Cloud コンソールを開く] ボタン
    • 残り時間
    • このラボで使用する必要がある一時的な認証情報
    • このラボを行うために必要なその他の情報(ある場合)
  2. [Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウで開く] を選択します)。

    ラボでリソースがスピンアップし、別のタブで [ログイン] ページが表示されます。

    ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。

    注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
  3. 必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。

    {{{user_0.username | "Username"}}}

    [ラボの詳細] ペインでもユーザー名を確認できます。

  4. [次へ] をクリックします。

  5. 以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。

    {{{user_0.password | "Password"}}}

    [ラボの詳細] ペインでもパスワードを確認できます。

  6. [次へ] をクリックします。

    重要: ラボで提供された認証情報を使用する必要があります。Google Cloud アカウントの認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
  7. その後次のように進みます。

    • 利用規約に同意してください。
    • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
    • 無料トライアルには登録しないでください。

その後、このタブで Google Cloud コンソールが開きます。

注: Google Cloud のプロダクトやサービスにアクセスするには、ナビゲーション メニューをクリックするか、[検索] フィールドにサービス名またはプロダクト名を入力します。 ナビゲーション メニュー アイコンと検索フィールド

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン 「Cloud Shell をアクティブにする」アイコン をクリックします。

  2. ウィンドウで次の操作を行います。

    • Cloud Shell 情報ウィンドウで操作を進めます。
    • Cloud Shell が認証情報を使用して Google Cloud API を呼び出すことを承認します。

接続した時点で認証が完了しており、プロジェクトに各自の Project_ID が設定されます。出力には、このセッションの PROJECT_ID を宣言する次の行が含まれています。

Your Cloud Platform project in this session is set to {{{project_0.project_id | "PROJECT_ID"}}}

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  1. (省略可)次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
  1. [承認] をクリックします。

出力:

ACTIVE: * ACCOUNT: {{{user_0.username | "ACCOUNT"}}} To set the active account, run: $ gcloud config set account `ACCOUNT`
  1. (省略可)次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project = {{{project_0.project_id | "PROJECT_ID"}}} 注: Google Cloud における gcloud ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。

タスク 1. Dataflow API が有効になっていることを確認する

必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。

重要: API が現在有効になっている場合でも、API を再起動するために、以下の手順 1~4 を行い、API を無効にしてから再度有効にしてください。
  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow API」と入力し、検索結果から [Dataflow API] をクリックします。

  2. [管理] をクリックします。

  3. [API を無効にする] をクリックします。

確認を求められたら、[無効にする] をクリックします。

  1. [有効にする] をクリックします。

API が再度有効になると、ページに [無効にする] オプションが表示されます。

[進行状況を確認] をクリックして、実行したタスクを確認します。

Dataflow API を無効にし、再度有効にする

タスク 2. スターター コードをダウンロードする

このラボで使用する Dataflow Python の例をダウンロードします。

  1. Cloud Shell で以下のコマンドを実行して、Google Cloud のプロフェッショナル サービスの GitHub から Dataflow Python の例を取得します。
gcloud storage cp -r gs://spls/gsp290/dataflow-python-examples .
  1. プロジェクト ID の変数を設定します。
export PROJECT={{{ project_0.project_id }}} gcloud config set project $PROJECT

タスク 3. Cloud Storage バケットを作成して、そのバケットにファイルをコピーする

Cloud Shell で Cloud Storage バケットを作成し、そのバケットにファイルをコピーします。これらのファイルは Dataflow Python の例です。

Cloud Storage バケットを作成する

  • 引き続き Cloud Shell で、バケット作成コマンドを使用して、プロジェクト内で リージョンに新しいリージョン バケットを作成します。
gcloud storage buckets create gs://$PROJECT --location={{{ project_0.default_region | REGION }}}

[進行状況を確認] をクリックして、実行したタスクを確認します。

Cloud Storage バケットを作成する。

ファイルをバケットにコピーする

  • Cloud Shell で gsutil コマンドを使用して、先ほど作成した Cloud Storage バケットにファイルをコピーします。
gcloud storage cp gs://spls/gsp290/data_files/usa_names.csv gs://$PROJECT/data_files/ gcloud storage cp gs://spls/gsp290/data_files/head_usa_names.csv gs://$PROJECT/data_files/

[進行状況を確認] をクリックして、実行したタスクを確認します。

ファイルをバケットにコピーする。

タスク 4. BigQuery データセットを作成する

このタスクでは、BigQuery データセットを作成します。BigQuery データセットは、BigQuery 内でテーブルが作成される場所です。

Cloud Shell で、lake という名前のデータセットを作成します。

bq mk lake

[進行状況を確認] をクリックして、実行したタスクを確認します。

lake という名前の BigQuery データセットを作成する。

タスク 5. データ取り込みパイプラインを確認して実行する

このタスクでは、パイプライン コードを確認して、その動作を理解します。次に、パイプラインを設定して実行します。

データ取り込みパイプラインは、TextIO ソースと BigQueryIO 宛先を使用して、Cloud Storage から BigQuery テーブルにデータを取り込みます。このパイプラインが実行する具体的な処理は以下のとおりです。

  • Cloud Storage からファイルを取り込む。
  • ファイルのヘッダー行を除外する。
  • 読み取った行を辞書オブジェクトに変換する。
  • BigQuery に行を出力する。

データ取り込みパイプラインの Python コードを確認する

Cloud Shell コードエディタを使用して、パイプラインのコードを確認します。

  1. Cloud Shell のメニューバーで、[エディタを開く] をクリックします。

  2. dataflow_python_examples > dataflow_python_examples に移動し、data_ingestion.py ファイルを開きます。

  3. コードを実行した際の挙動を説明したファイル内のコメントを読みます。

このコードは、Cloud Storage のデータファイルを使用して BigQuery テーブルにデータを入力します。

  1. Cloud Shell に戻るには、[ターミナルを開く] をクリックします。

Dataflow ジョブ用の Docker コンテナを設定する

  1. Cloud Shell セッションに戻り、必要な Python ライブラリを設定します。

このラボの Dataflow ジョブには Python 3.8 が必要です。適切なバージョンで作業するには、Python 3.8 Docker コンテナで Dataflow プロセスを実行します。

  1. Cloud Shell で以下のコマンドを実行して、Python コンテナを起動します。
cd ~ docker run -it -e PROJECT=$PROJECT -v $(pwd)/dataflow-python-examples:/dataflow python:3.8 /bin/bash

このコマンドによって、Docker コンテナと Python 3.8 の最新の安定版が pull され、コマンドシェルが開き、コンテナ内の次のコマンドが実行されます。-v フラグでソースコードをコンテナの volume として指定しているため、Cloud Shell エディタで編集できるうえ、実行中のコンテナ内でもアクセスできます。

  1. コンテナが pull されて Cloud Shell で実行され始めたら、以下のコマンドを実行し、実行中のコンテナに apache-beam をインストールします。
pip install apache-beam[gcp]==2.59.0
  1. Cloud Shell で実行中のコンテナで、ソースコードがリンクされているディレクトリに移動します。
cd dataflow/
  1. コンテナ内でプロジェクト ID を設定します。
export PROJECT={{{ project_0.project_id }}}

クラウドでデータ取り込みパイプラインを実行する

  1. 次のコードを実行して、データ取り込みパイプラインを実行します。
python dataflow_python_examples/data_ingestion.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session

このコードは、必要なワーカーをスピンアップし、パイプラインが完了したらそのワーカーをシャットダウンします。

  1. コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力し、検索結果から [Dataflow] をクリックします。

Dataflow ページが開いたら、ジョブのステータスを確認します。

  1. ジョブの名前をクリックして進行状況を確認します。

[ジョブ ステータス] が「完了しました」になったら、次のステップに進みます。この取り込みパイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  1. BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。

  2. プロジェクト名をクリックして、lake データセットの usa_names テーブルを表示します。

usa_names テーブル

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names データの例を確認します。
注: usa_names テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

[進行状況を確認] をクリックして、実行したタスクを確認します。

データ取り込みパイプラインを構築する。

タスク 6. データ変換パイプラインを確認して実行する

このタスクでは、データ変換パイプラインを確認して、その動作を理解します。次に、パイプラインを実行して Cloud Storage ファイルを処理し、その結果を BigQuery に出力します。

データ変換パイプラインも、TextIO ソースと BigQueryIO 宛先を使用して Cloud Storage から BigQuery テーブルにデータを取り込みますが、さらにデータ変換を行います。このパイプラインが実行する具体的な処理は以下のとおりです。

  • Cloud Storage からファイルを取り込む。
  • 読み取った行を辞書オブジェクトに変換する。
  • 年を含むデータを、BigQuery が日付として認識できる形式に変換する。
  • BigQuery に行を出力する。

データ変換パイプラインの Python コードを確認する

  • コードエディタで data_transformation.py を開きます。

コードを実行した際の挙動を説明したファイル内のコメントを読みます。

クラウドでデータ変換パイプラインを実行する

  1. 次のコードを実行して、データ変換パイプラインを実行します。
python dataflow_python_examples/data_transformation.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力し、検索結果から [Dataflow] をクリックします。

  2. このジョブの名前をクリックして、ジョブのステータスを確認します。

この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  1. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに usa_names_transformed テーブルが表示されます。

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_transformed データの例を確認します。
注: usa_names_transformed テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

[進行状況を確認] をクリックして、実行したタスクを確認します。

データ変換パイプラインを構築する。

タスク 7. データ拡充パイプラインを確認して実行する

次の処理を行うデータ拡充パイプラインを構築します。

  • Cloud Storage からファイルを取り込む
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

データ拡充パイプラインの Python コードを確認して編集する

  1. コードエディタで data_enrichment.py を開きます。

  2. コードを実行した際の挙動を説明したコメントを確認します。このコードは BigQuery にデータを入力します。

83 行目は以下のようになっています。

values = [x.decode('utf8') for x in csv_row]
  1. これを次のように編集します。
values = [x for x in csv_row]
  1. この行を編集し終えたら、コードエディタの [File] オプションを選択して [Save] をクリックし、更新したファイルを保存してください。

データ拡充パイプラインを実行する

  1. 次のコードを実行して、データ拡充パイプラインを実行します。
python dataflow_python_examples/data_enrichment.py \ --project=$PROJECT \ --region={{{ project_0.default_region | REGION }}} \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --input gs://$PROJECT/data_files/head_usa_names.csv \ --save_main_session
  1. Dataflow ページでジョブをクリックして、ジョブ ステータスを表示します。

この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  1. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、コンソールで ナビゲーション メニュー > [BigQuery] をクリックして、データが入力されていることを確認します。

lake データセットに usa_names_enriched テーブルが表示されます。

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_enriched データの例を確認します。
注: usa_names_enriched テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

[進行状況を確認] をクリックして、実行したタスクを確認します。

データ拡充パイプラインを構築する。

タスク 8. データレイクからデータマートへのパイプラインを確認して実行する

2 つの BigQuery データソースからデータを読み取りデータソースを結合する Dataflow パイプラインを構築します。具体的には以下を行います。

  • 2 つの BigQuery ソースからファイルを取り込む
  • 2 つのデータソースを結合する
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

データ取り込みパイプラインを実行してデータを結合し、その結果作られるテーブルを BigQuery に書き込む

まず、data_lake_to_mart.py のコードを確認して、その処理内容を理解します。次に、クラウドでパイプラインを実行します。

  1. コードエディタdata_lake_to_mart.py ファイルを開きます。

コードを実行した際の挙動を説明したファイル内のコメントを読みます。このコードは、2 つのテーブルを結合し、その結果を BigQuery の新しいテーブルに書き込みます。

  1. 次のコードブロックを実行してパイプラインを実行します。
python dataflow_python_examples/data_lake_to_mart.py \ --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" \ --max_num_workers=4 \ --project=$PROJECT \ --runner=DataflowRunner \ --machine_type=e2-standard-2 \ --staging_location=gs://$PROJECT/test \ --temp_location gs://$PROJECT/test \ --save_main_session \ --region={{{ project_0.default_region | REGION }}}
  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力し、検索結果から [Dataflow] をクリックします。

  2. この新しいジョブをクリックしてステータスを確認します。

この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。

  1. Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、ナビゲーション メニュー > [BigQuery] をクリックして、データが入力されていることを確認します。

lake データセットに orders_denormalized_sideinput テーブルが表示されます。

  1. そのテーブルをクリックし、[プレビュー] タブに移動して、orders_denormalized_sideinput データの例を確認します。
注: orders_denormalized_sideinput テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。

[進行状況を確認] をクリックして、実行したタスクを確認します。

データレイクからデータマートへの Dataflow パイプラインを構築する

理解度チェック

今回のラボで学習した内容の理解を深めていただくために、以下の選択問題を用意しました。正解を目指して頑張ってください。

お疲れさまでした

Dataflow を使用して Python コードを実行し、Cloud Storage から BigQuery にデータを取り込み、BigQuery 内のデータを変換、拡充しました。

次のステップと詳細情報

さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。

Google Cloud トレーニングと認定資格

Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。

マニュアルの最終更新日: 2025 年 4 月 1 日

ラボの最終テスト日: 2025 年 4 月 1 日

Copyright 2025 Google LLC. All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。