arrow_back

Dataflow と BigQuery を使用した Google Cloud での ETL 処理

参加 ログイン

Dataflow と BigQuery を使用した Google Cloud での ETL 処理

1時間 クレジット: 5

GSP290

Google Cloud セルフペース ラボ

概要

このラボでは、以下の Google Cloud サービスを使用して複数のデータ パイプラインを構築し、一般公開されているデータセットから BigQuery にデータを取り込みます。

  • Cloud Storage
  • Dataflow
  • BigQuery

固有のデータ パイプライン(設計上の考慮事項と実装の詳細を反映)を作成して、プロトタイプが要件を満たすようにします。指示がある場合は、Python ファイルを開き、コメントを読んでください。

設定

[ラボを開始] ボタンをクリックする前に

こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。

この Qwiklabs ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。

必要なもの

このラボを完了するためには、下記が必要です。

  • 標準的なインターネット ブラウザ(Chrome を推奨)
  • ラボを完了するために十分な時間

注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。

注: Chrome OS デバイスを使用している場合は、シークレット ウィンドウを開いてこのラボを実行してください。

ラボを開始して Google Cloud コンソールにログインする方法

  1. [ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。

    • [Google コンソールを開く] ボタン
    • 残り時間
    • このラボで使用する必要がある一時的な認証情報
    • このラボを行うために必要なその他の情報(ある場合)
  2. [Google コンソールを開く] をクリックします。 ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。

    ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。

    注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。
  3. 必要に応じて、[ラボの詳細] パネルから [ユーザー名] をコピーして [ログイン] ダイアログに貼り付けます。[次へ] をクリックします。

  4. [ラボの詳細] パネルから [パスワード] をコピーして [ようこそ] ダイアログに貼り付けます。[次へ] をクリックします。

    重要: 認証情報は左側のパネルに表示されたものを使用してください。Google Cloud Skills Boost の認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。
  5. その後次のように進みます。

    • 利用規約に同意してください。
    • 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
    • 無料トライアルには登録しないでください。

その後このタブで Cloud Console が開きます。

注: 左上にある [ナビゲーション メニュー] をクリックすると、Google Cloud のプロダクトやサービスのリストが含まれるメニューが表示されます。 ナビゲーション メニュー アイコン

Google Cloud Shell の有効化

Google Cloud Shell は、デベロッパー ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Google Cloud Shell では、コマンドラインで GCP リソースにアクセスできます。

GCP Console の右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

Cloud Shell アイコン

[続行] をクリックします。

cloudshell_continue

環境のプロビジョニングと接続には少し時間がかかります。接続すると、すでに認証されており、プロジェクトは PROJECT_ID に設定されています。例えば:

Cloud Shell 端末

gcloud は Google Cloud Platform のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

次のコマンドを使用すると、有効なアカウント名を一覧表示できます。

gcloud auth list

出力:

ACTIVE: *
ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net
To set the active account, run:
    $ gcloud config set account `ACCOUNT`
	

次のコマンドを使用すると、プロジェクト ID を一覧表示できます。

gcloud config list project
	

出力:

[core]
project = <project_ID>
	

出力例:

[core]
project = qwiklabs-gcp-44776a13dea667a6
	

Dataflow API が正常に有効になっていることを確認する

必要な API に確実にアクセスするには、Dataflow API への接続を再開します。

  1. Cloud Console で、上部の検索バーに「Dataflow API」と入力します。 Dataflow API の結果をクリックします。
  2. [管理] をクリックします
  3. [API を無効にする] をクリックします。
  4. 確認を求められたら、[無効にする] をクリックします。

  5. [有効にする] をクリックします。

API が再度有効になると、ページに無効にするオプションが表示されます。

dataflow_api.png

スターター コードをダウンロードする

Cloud Shell でセッションを開き、次のコマンドを実行して、GCP のプロフェッショナル サービスの GitHub からコードをチェックアウトします。

gsutil -m cp -R gs://spls/gsp290/dataflow-python-examples .

変数にプロジェクト ID を設定します。<YOUR-PROJECT-ID> は Qwiklabs プロジェクト ID に置き換えてください。

export PROJECT=<YOUR-PROJECT-ID>
gcloud config set project $PROJECT

Cloud Storage バケットを作成する

バケット作成コマンドを使用して、プロジェクト内の us-central1 リージョンに新しいリージョン バケットを作成します。

gsutil mb -c regional -l us-central1 gs://$PROJECT

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

Cloud Storage バケットを作成する

ファイルをバケットにコピーする

gsutil コマンドを使用して、作成した GCS バケットにファイルをコピーします。

gsutil cp gs://spls/gsp290/data_files/usa_names.csv gs://$PROJECT/data_files/
gsutil cp gs://spls/gsp290/data_files/head_usa_names.csv gs://$PROJECT/data_files/

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

ファイルをバケットにコピーする

BigQuery データセットを作成する

BigQuery 内に lake というデータセットを作成します。BigQuery で使用するテーブルはすべてここに読み込まれます。

bq mk lake

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

BigQuery データセット(名前: lake)を作成する

Dataflow パイプラインを構築する

このセクションでは、データの追加のみ可能な Dataflow を作成し、BigQuery テーブルにデータを取り込みます。組み込みのコードエディタを使用すると、GCP Console でコードを表示、編集できます。

3b1ddc75045cae6b.png

コードエディタを開く

Cloud Shell のエディタを開くのアイコンをクリックしてソースコードに移動します。

editor.png

プロンプトが表示されたら、[新しいウィンドウで開く] をクリックします。新しいウィンドウでコードエディタが開きます。

データの取り込み

データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。

  • Cloud Storage からファイルを取り込む
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

パイプラインの Python コードを確認する

コードエディタで、dataflow-python-examples > dataflow_python_examples に移動し、data_ingestion.py ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードでは、BigQuery にデータが入力されます。

21efa9aaf4e0d11.png

Apache Beam パイプラインを実行する

このステップでは Google Cloud Shell セッションに戻ります。必要な Python ライブラリの設定を行います。

このラボの Dataflow ジョブには Python3.7 が必要です。適切なバージョンを使用するには、Python 3.7 Docker コンテナでプロセスを実行します。

クラウドシェルで次を実行します:

docker run -it -e PROJECT=$PROJECT -v $(pwd)/dataflow-python-examples:/dataflow python:3.7 /bin/bash

このコマンドは、Python 3.7 の最新の安定バージョンを使用して Docker コンテナを抽出し、コマンドシェルを実行して、コンテナ内で次のコマンドを実行します。-v フラグは、コンテナの volume としてソースコードを提供するため、Cloud Shell エディタで編集しても、コンテナ内でアクセスできます。

アーカイブの抽出が完了したら、次のコマンドを実行して apache-beam をインストールします。

pip install apache-beam[gcp]==2.24.0

次に、ディレクトリをソースコードをリンクした場所に変更します。

cd dataflow/

クラウドで Dataflow パイプラインを実行します。

以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。

python dataflow_python_examples/data_ingestion.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session

Cloud Console に戻り、ナビゲーション メニュー > [Dataflow] を開いて、ジョブのステータスを確認します。

df.png

ジョブの名前をクリックして進行状況を確認します。[ジョブ ステータス] が「完了しました」になったら、BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。

bq.png

プロジェクト名をクリックして、lake データセットの usa_names テーブルを表示します。

bq_lake_one.png

そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names のデータの例を確認します。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

「データの取り込み」の Dataflow パイプラインをビルドする

データの変換

データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。

  • Cloud Storage からファイルを取り込む
  • 読み取った行を辞書オブジェクトに変換する
  • 年を含むデータを、BigQuery が日付として認識できる形式に変換する
  • BigQuery に行を出力する

パイプラインの Python コードを確認する

data_transformation.py に移動し、コードエディタで開きます。コードの機能を説明したファイル内のコメントに目を通します。

Apache Beam パイプラインを実行する

クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。

次のコマンドを実行します。

python dataflow_python_examples/data_transformation.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session

ナビゲーション メニュー > [Dataflow] に移動し、このジョブの名前をクリックしてステータスを確認します。

Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに usa_names_transformed テーブルが表示されます。

そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_transformed のデータの例を確認します。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

「データの変換」の Dataflow パイプラインを構築する

データ拡充

データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。

  • Cloud Storage からファイルを取り込む
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

パイプラインの Python コードを確認する

data_enrichment.py に移動し、コードエディタで開きます。コードの機能を説明したコメントを確認します。このコードでは、BigQuery にデータが入力されます。

現在、行 83 は次のように見えます

values = [x.decode('utf8') for x in csv_row]

次のように編集します

values = [x for x in csv_row]

Apache Beam パイプラインを実行する

ここでは、クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。

python dataflow_python_examples/data_enrichment.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session

ナビゲーション メニュー > [Dataflow] に移動して、ジョブのステータスを確認します。

Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに usa_names_enriched テーブルが表示されます。

そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_enriched のデータの例を確認します。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

「データ拡充」の Dataflow パイプラインをビルドする

データレイクからデータマートへ

次は、2 つの BigQuery データソースからデータを読み取り、データソースを結合するDataflowパイプラインを作成します。具体的には以下を行います。

  • 2 つの BigQuery からファイルを取り込む
  • 2 つのデータソースを結合する
  • ファイルのヘッダー行を除外する
  • 読み取った行を辞書オブジェクトに変換する
  • BigQuery に行を出力する

パイプラインの Python コードを確認する

data_lake_to_mart.py に移動し、コードエディタで開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードでは、BigQuery にデータが入力されます。

Apache Beam パイプラインを実行する

ここでは、クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。

python dataflow_python_examples/data_lake_to_mart.py --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" --max_num_workers=4 --project=$PROJECT --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --save_main_session --region=us-central1

ナビゲーション メニュー > [Dataflow] に移動し、この新しいジョブの名前をクリックしてステータスを確認します。

Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。

lake データセットに orders_denormalized_sideinput テーブルが表示されます。

そのテーブルをクリックし、[プレビュー] タブに移動して、orders_denormalized_sideinput のデータの例を確認します。

完了したタスクをテストする

[進行状況を確認] をクリックして、実行したタスクを確認します。

「データレイクからデータマートへ」の Dataflow パイプラインを構築する

理解度を確認する

今回のラボで学習した内容の理解を深めるため、以下の選択問題を用意しました。正解を目指して頑張ってください。

お疲れさまでした

Python ファイルを使用して、Dataflow で BigQuery にデータを取り込みました。

クエストの修了

Data_Engineering_badge_125.png

このセルフペースラボは Qwiklabs 「Data Engineering」クエストの一部です。クエストとは学習パスを構成する一連のラボのことで、完了すると成果が認められて上のようなバッジが贈られます。バッジは公開して、オンライン レジュメやソーシャル メディア アカウントにリンクできます。 このラボの修了後、次のクエストに登録すれば、すぐにクレジットを受け取ることができます。受講可能なその他の Qwiklabs のクエストもご確認ください。

次のラボを受講

Predict Visitor Purchases with a Classification Model in BQML でクエストを続行してください。または、次のクエストもご確認ください。

次のステップと詳細情報

さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。

Google Cloud Training & Certification

Google Cloud 技術を最大限に活用できるようになります。このクラスでは、必要な技術力とベスト プラクティスを習得し、継続的に学習することができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、仮想環境など、多忙なスケジュールに対応できるオプションが用意されています。認定資格を取得することで、Google Cloud の技術のスキルと知識を証明できます。

マニュアルの最終更新日: 2022 年 03 月 16 日
ラボの最終テスト日: 2022 年 03 月 16 日

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。