チェックポイント
Create a Cloud Storage Bucket
/ 20
Copy Files to Your Bucket
/ 10
Create the BigQuery Dataset (name: lake)
/ 20
Build a Data Ingestion Dataflow Pipeline
/ 10
Build a Data Transformation Dataflow Pipeline
/ 10
Build a Data Enrichment Dataflow Pipeline
/ 10
Build a Data lake to Mart Dataflow Pipeline
/ 20
Dataflow と BigQuery を使用した Google Cloud での ETL 処理
GSP290
概要
このラボでは、以下の Google Cloud サービスを使用して複数のデータ パイプラインを構築し、一般公開されているデータセットから BigQuery にデータを取り込みます。
- Cloud Storage
- Dataflow
- BigQuery
固有のデータ パイプライン(設計上の考慮事項と実装の詳細を反映)を作成して、プロトタイプが要件を満たすようにします。指示がある場合は、Python ファイルを開き、コメントを読んでください。
設定
[ラボを開始] ボタンをクリックする前に
こちらの手順をお読みください。ラボの時間は記録されており、一時停止することはできません。[ラボを開始] をクリックするとスタートするタイマーは、Google Cloud のリソースを利用できる時間を示しています。
この Qwiklabs ハンズオンラボでは、シミュレーションやデモ環境ではなく、実際のクラウド環境を使ってご自身でラボのアクティビティを行うことができます。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
必要なもの
このラボを完了するためには、下記が必要です。
- 標準的なインターネット ブラウザ(Chrome を推奨)
- ラボを完了するために十分な時間
注: すでに個人の Google Cloud アカウントやプロジェクトをお持ちの場合でも、ラボでは使用しないでください。
注: Chrome OS デバイスを使用している場合は、シークレット ウィンドウを開いてこのラボを実行してください。
ラボを開始して Google Cloud コンソールにログインする方法
-
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるポップアップでお支払い方法を選択してください。 左側の [ラボの詳細] パネルには、以下が表示されます。
- [Google コンソールを開く] ボタン
- 残り時間
- このラボで使用する必要がある一時的な認証情報
- このラボを行うために必要なその他の情報(ある場合)
-
[Google コンソールを開く] をクリックします。 ラボでリソースが起動し、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
注: [アカウントの選択] ダイアログが表示されたら、[別のアカウントを使用] をクリックします。 -
必要に応じて、[ラボの詳細] パネルから [ユーザー名] をコピーして [ログイン] ダイアログに貼り付けます。[次へ] をクリックします。
-
[ラボの詳細] パネルから [パスワード] をコピーして [ようこそ] ダイアログに貼り付けます。[次へ] をクリックします。
重要: 認証情報は左側のパネルに表示されたものを使用してください。Google Cloud Skills Boost の認証情報は使用しないでください。 注: このラボでご自身の Google Cloud アカウントを使用すると、追加料金が発生する場合があります。 -
その後次のように進みます。
- 利用規約に同意してください。
- 一時的なアカウントなので、復元オプションや 2 要素認証プロセスは設定しないでください。
- 無料トライアルには登録しないでください。
その後このタブで Cloud Console が開きます。
Google Cloud Shell の有効化
Google Cloud Shell は、デベロッパー ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Google Cloud Shell では、コマンドラインで GCP リソースにアクセスできます。
GCP Console の右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。
[続行] をクリックします。
環境のプロビジョニングと接続には少し時間がかかります。接続すると、すでに認証されており、プロジェクトは PROJECT_ID に設定されています。例えば:
gcloud
は Google Cloud Platform のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list
出力:
ACTIVE: *
ACCOUNT: student-01-xxxxxxxxxxxx@qwiklabs.net
To set the active account, run:
$ gcloud config set account `ACCOUNT`
次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project
出力:
[core]
project = <project_ID>
出力例:
[core]
project = qwiklabs-gcp-44776a13dea667a6
Dataflow API が正常に有効になっていることを確認する
必要な API に確実にアクセスするには、Dataflow API への接続を再開します。
- Cloud Console で、上部の検索バーに「Dataflow API」と入力します。 Dataflow API の結果をクリックします。
- [管理] をクリックします
- [API を無効にする] をクリックします。
- [有効にする] をクリックします。
確認を求められたら、[無効にする] をクリックします。
API が再度有効になると、ページに無効にするオプションが表示されます。
スターター コードをダウンロードする
Cloud Shell でセッションを開き、次のコマンドを実行して、GCP のプロフェッショナル サービスの GitHub からコードをチェックアウトします。
gsutil -m cp -R gs://spls/gsp290/dataflow-python-examples .
変数にプロジェクト ID を設定します。<YOUR-PROJECT-ID>
は Qwiklabs プロジェクト ID に置き換えてください。
export PROJECT=<YOUR-PROJECT-ID>
gcloud config set project $PROJECT
Cloud Storage バケットを作成する
バケット作成コマンドを使用して、プロジェクト内の us-central1
リージョンに新しいリージョン バケットを作成します。
gsutil mb -c regional -l us-central1 gs://$PROJECT
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
ファイルをバケットにコピーする
gsutil
コマンドを使用して、作成した GCS バケットにファイルをコピーします。
gsutil cp gs://spls/gsp290/data_files/usa_names.csv gs://$PROJECT/data_files/
gsutil cp gs://spls/gsp290/data_files/head_usa_names.csv gs://$PROJECT/data_files/
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
BigQuery データセットを作成する
BigQuery 内に lake
というデータセットを作成します。BigQuery で使用するテーブルはすべてここに読み込まれます。
bq mk lake
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
Dataflow パイプラインを構築する
このセクションでは、データの追加のみ可能な Dataflow を作成し、BigQuery テーブルにデータを取り込みます。組み込みのコードエディタを使用すると、GCP Console でコードを表示、編集できます。
コードエディタを開く
Cloud Shell のエディタを開くのアイコンをクリックしてソースコードに移動します。
プロンプトが表示されたら、[新しいウィンドウで開く] をクリックします。新しいウィンドウでコードエディタが開きます。
データの取り込み
データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。
- Cloud Storage からファイルを取り込む
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
パイプラインの Python コードを確認する
コードエディタで、dataflow-python-examples
> dataflow_python_examples
に移動し、data_ingestion.py
ファイルを開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードでは、BigQuery にデータが入力されます。
Apache Beam パイプラインを実行する
このステップでは Google Cloud Shell セッションに戻ります。必要な Python ライブラリの設定を行います。
このラボの Dataflow ジョブには Python3.7
が必要です。適切なバージョンを使用するには、Python 3.7 Docker コンテナでプロセスを実行します。
クラウドシェルで次を実行します:
docker run -it -e PROJECT=$PROJECT -v $(pwd)/dataflow-python-examples:/dataflow python:3.7 /bin/bash
このコマンドは、Python 3.7 の最新の安定バージョンを使用して Docker コンテナを抽出し、コマンドシェルを実行して、コンテナ内で次のコマンドを実行します。-v
フラグは、コンテナの volume
としてソースコードを提供するため、Cloud Shell エディタで編集しても、コンテナ内でアクセスできます。
アーカイブの抽出が完了したら、次のコマンドを実行して apache-beam
をインストールします。
pip install apache-beam[gcp]==2.24.0
次に、ディレクトリをソースコードをリンクした場所に変更します。
cd dataflow/
クラウドで Dataflow パイプラインを実行します。
以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。
python dataflow_python_examples/data_ingestion.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session
Cloud Console に戻り、ナビゲーション メニュー > [Dataflow] を開いて、ジョブのステータスを確認します。
ジョブの名前をクリックして進行状況を確認します。[ジョブ ステータス] が「完了しました」になったら、BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。
プロジェクト名をクリックして、lake
データセットの usa_names テーブルを表示します。
そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names
のデータの例を確認します。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
データの変換
データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。
- Cloud Storage からファイルを取り込む
- 読み取った行を辞書オブジェクトに変換する
- 年を含むデータを、BigQuery が日付として認識できる形式に変換する
- BigQuery に行を出力する
パイプラインの Python コードを確認する
data_transformation.py
に移動し、コードエディタで開きます。コードの機能を説明したファイル内のコメントに目を通します。
Apache Beam パイプラインを実行する
クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。
次のコマンドを実行します。
python dataflow_python_examples/data_transformation.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session
ナビゲーション メニュー > [Dataflow] に移動し、このジョブの名前をクリックしてステータスを確認します。
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
lake
データセットに usa_names_transformed テーブルが表示されます。
そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_transformed
のデータの例を確認します。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
データ拡充
データの読み取りに TextIO、書き込みに BigQueryIO を使用して Dataflow パイプラインを構築し、BigQuery にデータを取り込みます。具体的には以下を行います。
- Cloud Storage からファイルを取り込む
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
パイプラインの Python コードを確認する
data_enrichment.py
に移動し、コードエディタで開きます。コードの機能を説明したコメントを確認します。このコードでは、BigQuery にデータが入力されます。
現在、行 83 は次のように見えます
values = [x.decode('utf8') for x in csv_row]
次のように編集します
values = [x for x in csv_row]
Apache Beam パイプラインを実行する
ここでは、クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。
python dataflow_python_examples/data_enrichment.py --project=$PROJECT --region=us-central1 --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --input gs://$PROJECT/data_files/head_usa_names.csv --save_main_session
ナビゲーション メニュー > [Dataflow] に移動して、ジョブのステータスを確認します。
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
lake
データセットに usa_names_enriched テーブルが表示されます。
そのテーブルをクリックし、[プレビュー] タブに移動して、usa_names_enriched
のデータの例を確認します。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
データレイクからデータマートへ
次は、2 つの BigQuery データソースからデータを読み取り、データソースを結合するDataflowパイプラインを作成します。具体的には以下を行います。
- 2 つの BigQuery からファイルを取り込む
- 2 つのデータソースを結合する
- ファイルのヘッダー行を除外する
- 読み取った行を辞書オブジェクトに変換する
- BigQuery に行を出力する
パイプラインの Python コードを確認する
data_lake_to_mart.py
に移動し、コードエディタで開きます。コードの機能を説明したファイル内のコメントに目を通します。このコードでは、BigQuery にデータが入力されます。
Apache Beam パイプラインを実行する
ここでは、クラウドで Dataflow パイプラインを実行します。以下のコマンドを実行すると必要なワーカーが起動され、完了するとシャットダウンされます。
python dataflow_python_examples/data_lake_to_mart.py --worker_disk_type="compute.googleapis.com/projects//zones//diskTypes/pd-ssd" --max_num_workers=4 --project=$PROJECT --runner=DataflowRunner --staging_location=gs://$PROJECT/test --temp_location gs://$PROJECT/test --save_main_session --region=us-central1
ナビゲーション メニュー > [Dataflow] に移動し、この新しいジョブの名前をクリックしてステータスを確認します。
Dataflow のジョブ ステータス画面で [ジョブ ステータス] が「完了しました」になったら、BigQuery に移動してデータが入力されていることを確認します。
lake
データセットに orders_denormalized_sideinput テーブルが表示されます。
そのテーブルをクリックし、[プレビュー] タブに移動して、orders_denormalized_sideinput
のデータの例を確認します。
完了したタスクをテストする
[進行状況を確認] をクリックして、実行したタスクを確認します。
理解度を確認する
今回のラボで学習した内容の理解を深めるため、以下の選択問題を用意しました。正解を目指して頑張ってください。
お疲れさまでした
Python ファイルを使用して、Dataflow で BigQuery にデータを取り込みました。
クエストの修了
このセルフペースラボは Qwiklabs 「Data Engineering」クエストの一部です。クエストとは学習パスを構成する一連のラボのことで、完了すると成果が認められて上のようなバッジが贈られます。バッジは公開して、オンライン レジュメやソーシャル メディア アカウントにリンクできます。 このラボの修了後、次のクエストに登録すれば、すぐにクレジットを受け取ることができます。受講可能なその他の Qwiklabs のクエストもご確認ください。
次のラボを受講
Predict Visitor Purchases with a Classification Model in BQML でクエストを続行してください。または、次のクエストもご確認ください。
- Predict Housing Prices with Tensorflow and AI Platform
- Cloud Composer: Copy BigQuery Tables Across Different Locations
次のステップと詳細情報
さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。
- Google Dataflow
- Google BigQuery
- より高度な概念については、Apache Beam プログラミング ガイドをご覧ください。
Google Cloud Training & Certification
Google Cloud 技術を最大限に活用できるようになります。このクラスでは、必要な技術力とベスト プラクティスを習得し、継続的に学習することができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、仮想環境など、多忙なスケジュールに対応できるオプションが用意されています。認定資格を取得することで、Google Cloud の技術のスキルと知識を証明できます。
マニュアルの最終更新日: 2022 年 03 月 16 日
ラボの最終テスト日: 2022 年 03 月 16 日
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。