
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Disable and re-enable the Dataflow API
/ 10
Create a Cloud Storage Bucket
/ 10
Copy Files to Your Bucket
/ 10
Create the BigQuery Dataset (name: lake)
/ 20
Build a Data Ingestion Dataflow Pipeline
/ 10
Build a Data Transformation Dataflow Pipeline
/ 10
Build a Data Enrichment Dataflow Pipeline
/ 10
Build a Data lake to Mart Dataflow Pipeline
/ 20
Dataflow は、統合されたストリーム データ処理とバッチデータ処理を大規模に実現できる Google Cloud サービスです。Dataflow は Apache Beam プロジェクトに基づいて構築されています。Apache Beam プロジェクトは、バッチデータ処理とストリーミングデータ処理を並列して実行できるパイプラインを定義するためのオープンソース モデルです。オープンソースの Apache Beam SDK のいずれかを使用して、パイプラインを定義するプログラムを構築し、Dataflow を使用してそのパイプラインを実行できます。
このラボでは、Apache Beam SDK for Python を使用してパイプラインを構築し、Dataflow 内で実行します。このパイプラインは、Cloud Storage から BigQuery にデータを取り込み、BigQuery でデータを変換、拡充します。
このラボでは、次の方法について学びます。
こちらの説明をお読みください。ラボには時間制限があり、一時停止することはできません。タイマーは、Google Cloud のリソースを利用できる時間を示しており、[ラボを開始] をクリックするとスタートします。
このハンズオンラボでは、シミュレーションやデモ環境ではなく実際のクラウド環境を使って、ラボのアクティビティを行います。そのため、ラボの受講中に Google Cloud にログインおよびアクセスするための、新しい一時的な認証情報が提供されます。
このラボを完了するためには、下記が必要です。
[ラボを開始] ボタンをクリックします。ラボの料金をお支払いいただく必要がある場合は、表示されるダイアログでお支払い方法を選択してください。 左側の [ラボの詳細] ペインには、以下が表示されます。
[Google Cloud コンソールを開く] をクリックします(Chrome ブラウザを使用している場合は、右クリックして [シークレット ウィンドウで開く] を選択します)。
ラボでリソースがスピンアップし、別のタブで [ログイン] ページが表示されます。
ヒント: タブをそれぞれ別のウィンドウで開き、並べて表示しておきましょう。
必要に応じて、下のユーザー名をコピーして、[ログイン] ダイアログに貼り付けます。
[ラボの詳細] ペインでもユーザー名を確認できます。
[次へ] をクリックします。
以下のパスワードをコピーして、[ようこそ] ダイアログに貼り付けます。
[ラボの詳細] ペインでもパスワードを確認できます。
[次へ] をクリックします。
その後次のように進みます。
その後、このタブで Google Cloud コンソールが開きます。
Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
Google Cloud コンソールの上部にある「Cloud Shell をアクティブにする」アイコン をクリックします。
ウィンドウで次の操作を行います。
接続した時点で認証が完了しており、プロジェクトに各自の Project_ID、
gcloud
は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
出力:
出力:
gcloud
ドキュメントの全文については、gcloud CLI の概要ガイドをご覧ください。
必要な API にアクセスできることを確認するには、Dataflow API への接続をリセットします。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow API」と入力し、検索結果から [Dataflow API] をクリックします。
[管理] をクリックします。
[API を無効にする] をクリックします。
確認を求められたら、[無効にする] をクリックします。
API が再度有効になると、ページに [無効にする] オプションが表示されます。
[進行状況を確認] をクリックして、実行したタスクを確認します。
このラボで使用する Dataflow Python の例をダウンロードします。
Cloud Shell で Cloud Storage バケットを作成し、そのバケットにファイルをコピーします。これらのファイルは Dataflow Python の例です。
[進行状況を確認] をクリックして、実行したタスクを確認します。
gsutil
コマンドを使用して、先ほど作成した Cloud Storage バケットにファイルをコピーします。[進行状況を確認] をクリックして、実行したタスクを確認します。
このタスクでは、BigQuery データセットを作成します。BigQuery データセットは、BigQuery 内でテーブルが作成される場所です。
Cloud Shell で、lake
という名前のデータセットを作成します。
[進行状況を確認] をクリックして、実行したタスクを確認します。
このタスクでは、パイプライン コードを確認して、その動作を理解します。次に、パイプラインを設定して実行します。
データ取り込みパイプラインは、TextIO ソースと BigQueryIO 宛先を使用して、Cloud Storage から BigQuery テーブルにデータを取り込みます。このパイプラインが実行する具体的な処理は以下のとおりです。
Cloud Shell コードエディタを使用して、パイプラインのコードを確認します。
Cloud Shell のメニューバーで、[エディタを開く] をクリックします。
dataflow_python_examples
> dataflow_python_examples
に移動し、data_ingestion.py
ファイルを開きます。
コードを実行した際の挙動を説明したファイル内のコメントを読みます。
このコードは、Cloud Storage のデータファイルを使用して BigQuery テーブルにデータを入力します。
このラボの Dataflow ジョブには Python 3.8
が必要です。適切なバージョンで作業するには、Python 3.8 Docker コンテナで Dataflow プロセスを実行します。
このコマンドによって、Docker コンテナと Python 3.8 の最新の安定版が pull され、コマンドシェルが開き、コンテナ内の次のコマンドが実行されます。-v
フラグでソースコードをコンテナの volume
として指定しているため、Cloud Shell エディタで編集できるうえ、実行中のコンテナ内でもアクセスできます。
apache-beam
をインストールします。このコードは、必要なワーカーをスピンアップし、パイプラインが完了したらそのワーカーをシャットダウンします。
Dataflow ページが開いたら、ジョブのステータスを確認します。
[ジョブ ステータス] が「完了しました」になったら、次のステップに進みます。この取り込みパイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
BigQuery(ナビゲーション メニュー > [BigQuery])に移動して、データが入力されていることを確認します。
プロジェクト名をクリックして、lake
データセットの usa_names テーブルを表示します。
usa_names
データの例を確認します。usa_names
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
[進行状況を確認] をクリックして、実行したタスクを確認します。
このタスクでは、データ変換パイプラインを確認して、その動作を理解します。次に、パイプラインを実行して Cloud Storage ファイルを処理し、その結果を BigQuery に出力します。
データ変換パイプラインも、TextIO ソースと BigQueryIO 宛先を使用して Cloud Storage から BigQuery テーブルにデータを取り込みますが、さらにデータ変換を行います。このパイプラインが実行する具体的な処理は以下のとおりです。
data_transformation.py
を開きます。コードを実行した際の挙動を説明したファイル内のコメントを読みます。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力し、検索結果から [Dataflow] をクリックします。
このジョブの名前をクリックして、ジョブのステータスを確認します。
この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
lake
データセットに usa_names_transformed テーブルが表示されます。
usa_names_transformed
データの例を確認します。usa_names_transformed
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
[進行状況を確認] をクリックして、実行したタスクを確認します。
次の処理を行うデータ拡充パイプラインを構築します。
コードエディタで data_enrichment.py
を開きます。
コードを実行した際の挙動を説明したコメントを確認します。このコードは BigQuery にデータを入力します。
83 行目は以下のようになっています。
この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
lake
データセットに usa_names_enriched テーブルが表示されます。
usa_names_enriched
データの例を確認します。usa_names_enriched
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
[進行状況を確認] をクリックして、実行したタスクを確認します。
2 つの BigQuery データソースからデータを読み取りデータソースを結合する Dataflow パイプラインを構築します。具体的には以下を行います。
まず、data_lake_to_mart.py
のコードを確認して、その処理内容を理解します。次に、クラウドでパイプラインを実行します。
data_lake_to_mart.py
ファイルを開きます。コードを実行した際の挙動を説明したファイル内のコメントを読みます。このコードは、2 つのテーブルを結合し、その結果を BigQuery の新しいテーブルに書き込みます。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Dataflow」と入力し、検索結果から [Dataflow] をクリックします。
この新しいジョブをクリックしてステータスを確認します。
この Dataflow パイプラインは、起動して処理が完了し、シャットダウンするまでに約 5 分かかります。
lake
データセットに orders_denormalized_sideinput テーブルが表示されます。
orders_denormalized_sideinput
データの例を確認します。orders_denormalized_sideinput
テーブルが表示されない場合は、ページを更新するか、従来の BigQuery UI を使用してテーブルを表示してください。
[進行状況を確認] をクリックして、実行したタスクを確認します。
今回のラボで学習した内容の理解を深めていただくために、以下の選択問題を用意しました。正解を目指して頑張ってください。
Dataflow を使用して Python コードを実行し、Cloud Storage から BigQuery にデータを取り込み、BigQuery 内のデータを変換、拡充しました。
さらに情報を探す場合は、以下の公式ドキュメントをご確認ください。
Google Cloud トレーニングと認定資格を通して、Google Cloud 技術を最大限に活用できるようになります。必要な技術スキルとベスト プラクティスについて取り扱うクラスでは、学習を継続的に進めることができます。トレーニングは基礎レベルから上級レベルまであり、オンデマンド、ライブ、バーチャル参加など、多忙なスケジュールにも対応できるオプションが用意されています。認定資格を取得することで、Google Cloud テクノロジーに関するスキルと知識を証明できます。
マニュアルの最終更新日: 2025 年 4 月 1 日
ラボの最終テスト日: 2025 年 4 月 1 日
Copyright 2025 Google LLC. All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください