arrow_back

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用したバッチ分析パイプライン(Python)

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用したバッチ分析パイプライン(Python)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • ユーザー別にサイト トラフィックを集計するパイプラインを作成する。
  • 分単位でサイト トラフィックを集計するパイプラインを作成する。
  • 時系列データのウィンドウ処理を実装する。

前提条件

注: これは上級レベルのラボです。このラボには、Python の高度な知識が必要です。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

パート A. Workbench インスタンスの開発環境の設定

このラボでは、すべてのコマンドを Workbench インスタンス ノートブックのターミナルで実行します。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー)で [Vertex AI] を選択するか、Vertex AI ダッシュボードに移動します。

  2. [すべての推奨 API を有効化] をクリックします。次に、Notebook API が有効になっていることを確認します。

  3. ナビゲーション メニューで [ワークベンチ] をクリックします。

    [ワークベンチ] ページの上部で、[インスタンス] ビューになっていることを確認します。

  4. [ボックスを追加する新規作成] をクリックします。

  5. インスタンスの構成:

名前 リージョン ゾーン 詳細オプション(省略可能)
lab-workbench 必要に応じて [詳細オプション] をクリックして、さらにカスタマイズします(マシンタイプ、ディスクサイズなど)。
  1. [作成] をクリックします。

インスタンスが作成されるまで数分かかります。作成が終了するとインスタンスの名前の横に緑色のチェックマークが付きます。

  1. インスタンスの名前の横に表示されている [JupyterLab を開く] をクリックして JupyterLab インターフェースを起動します。ブラウザで新しいタブが開きます。

  2. 次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。

コード リポジトリをダウンロードする

このラボで使用するコード リポジトリをダウンロードします。

  1. 開いたターミナルで、次のコマンドを入力します。
git clone https://github.com/GoogleCloudPlatform/training-data-analyst cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  1. ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。

  2. クローン リポジトリ /training-data-analyst/quests/dataflow_python/ に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution サブフォルダとに分けられています。

展開された [表示] メニューでハイライト表示されているエクスプローラ オプション

注: 編集のためにファイルを開くには、目的のファイルに移動してクリックします。 ファイルが開き、コードを追加または変更できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ノートブック インスタンスを作成し、コース リポジトリのクローンを作成する

タスク 1. ユーザー別にサイト トラフィックを集計する

ラボのこのパートでは、次のようなパイプラインを作成します。

  1. Cloud Storage 内のファイルからその日のトラフィックを読み取る。
  2. 各イベントを CommonLog オブジェクトに変換する。
  3. 個々のオブジェクトをユーザー ID 別にグループ化し、値を足し合わせてその特定ユーザーのヒットの合計数を取得して、ユニーク ユーザーごとの総ヒット数を合計する。
  4. 別の集計を各ユーザーに対して行う。
  5. 得られたデータを BigQuery に書き込む。

タスク 2. 合成データを生成する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

該当するラボを開く

  • IDE 環境のターミナルで、次のコマンドを実行します。
# ディレクトリをラボに変更する cd 3_Batch_Analytics/lab export BASE_DIR=$(pwd)

仮想環境と依存関係を設定する

実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。

  1. 次のコマンドを実行して、このラボでの作業用に仮想環境を作成します。
sudo apt-get update && sudo apt-get install -y python3-venv # 仮想環境を作成して有効化する python3 -m venv df-env source df-env/bin/activate
  1. 次に、パイプラインを実行するために必要なパッケージをインストールします。
python3 -m pip install -q --upgrade pip setuptools wheel python3 -m pip install apache-beam[gcp]
  1. Dataflow API が有効になっていることを確認します。
gcloud services enable dataflow.googleapis.com

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

このスクリプトにより、次のような行が含まれている events.json というファイルが作成されます。

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

このファイルは にある Google Cloud Storage バケットに自動的にコピーされます。

  • Google Cloud Storage に移動し、ストレージ バケットに events.json というファイルが含まれていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 合成データを生成する

タスク 3. ユーザー別にページビュー数を合計する

このタスクには 2 つのミニチャレンジがあります。解決方法は、ソリューションで確認できます。

ファイル エクスプローラで以下のパスに移動し、batch_user_traffic_pipeline.py ファイルを開きます。

training-data-analyst/quests/dataflow_python/3_Batch_Analytics/lab

ミニチャレンジ - #TODO 1:

このパイプラインには、入力パスと出力テーブル名のコマンドライン オプションを受け取るために必要なコードと、Google Cloud Storage からイベントを読み取り、それらのイベントを解析して、結果を BigQuery に書き込むコードがすでに含まれています。ただし、いくつかの重要な部分が欠けています。それらの部分は #TODO でマークされています。

これはデータ モデリングのタスクです。このステップでは、すべてのログイベントをユーザーごとにグループ化した後、各ユーザーについてどのような情報を計算したいかを検討する必要があります。

集計結果を保持する構造(スキーマ)を定義する必要があります。集計に使用できるフィールドを調べるには、CommonLog クラスを確認する必要があります。

解決方法:

  1. キーを特定する: クラス名は PerUserAggregation であるため、保持する必要がある主要な情報は user_id です。

  2. 計算する指標を選択する: ユーザーの CommonLog エントリのコレクションから何を計算できますか。

    • カウント: ユーザーがサーバーにアクセスした回数
    • 合計: ユーザーがダウンロードしたバイト数(num_bytes)の合計
    • 最小 / 最大: 最初と最後のアクティビティのタイムスタンプ
  3. 例:

user_id: str page_views: int ...

ミニチャレンジ - #TODO 2:

これは Apache Beam フレームワークの技術要件です。このチャレンジでは、Beam がカスタムデータ型を処理する仕組みについての知識をテストします。

多くの場合、Apache Beam がパイプラインを実行する際は、異なる複数のコンピュータ(ワーカーと呼ばれる)間でデータを送信する必要があります。そのためには、Python オブジェクト(PerUserAggregation インスタンスなど)をバイト ストリームにシリアル化してから、ネットワーク経由で送信し、送信先でオブジェクトに逆シリアル化する必要があります。Coder は、このシリアル化と逆シリアル化を実行する方法を Beam に指示するオブジェクトです。

カスタムクラス PerUserAggregation のエンコード / デコード方法を Beam に指示しなければ、パイプラインはエラーで失敗します。

解決方法:

解決方法は、#TODO の上の行に示されています。Beam には、NamedTuple クラスと完全に連携する RowCoder が用意されています。CommonLog の場合と同様に、新しい PerUserAggregation クラスの RowCoder を登録するだけで済みます。

  1. 例:
beam.coders.registry.register_coder(PerUserAggregation, ...)

タスク 4. パイプラインを実行する

ターミナルに戻り、次のコマンドを実行して Cloud Dataflow サービスを使用したパイプラインを実行します。問題が発生した場合は DirectRunner で実行するか、ソリューションを参照してください。

  • 以下のコード スニペットで、ENTER_REGION_ID フィールドと ENTER_ZONE_ID フィールドを下の表に従って置き換えます。
リージョン ゾーン
  • リージョンとゾーンの値は、ラボの仕様に合わせて置き換えてください。
# 1. すべての環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export ZONE=ENTER_ZONE_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.user_traffic # 2. 入力ファイルが存在することを再確認する # このコマンドがエラーを返さないことを確認する。 echo "Verifying input file exists at ${INPUT_PATH}..." gcloud storage ls ${INPUT_PATH} # 3. パイプライン スクリプトを実行する echo "Running the Dataflow pipeline..." python3 batch_user_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --worker_zone=${ZONE} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME}
  • 送信したジョブは Dataflow ダッシュボードで確認できます。

  • ジョブ ステータスが「Successful」になったら、BigQuery で結果を確認します。このタスクを完了するには、パイプラインが完了するまで数分待ってから BigQuery に移動し、user_traffic テーブルに対してクエリを実行します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 ユーザー別にサイト トラフィックを集計し、パイプラインを実行する

パート B: 分単位でサイト トラフィックを集計する

ラボのこのパートでは、batch_minute_traffic という新しいパイプラインを作成します。このパイプラインは batch_user_traffic で使用されたバッチ分析の基本原則を拡張して、バッチ全体でユーザー別に集計するのではなく、イベントが発生した時間ごとに集計します。ここにも複数の #TODO があります。基本的には自分で修正する必要がありますが、ソリューションを参考にすることもできます。

IDE で以下のパスに移動して、batch_minute_traffic_pipeline.py ファイルを開きます。

3_Batch_Analytics/lab 注: メイン スクリプト(batch_minute_traffic_pipeline.py)で作業を開始する前に、2 つのヘルパー ファイル pipeline_utils.py および setup.py を確認してください。Dataflow ジョブを実行するために、これらのヘルパー ファイルを利用します。pipeline_utils では、カスタムの Python ロジック(クラスと関数)が別のモジュールに保持されており、このロジックをパッケージ化してリモートの Dataflow ワーカーに送信できます。一方、setup.py は Dataflow の取扱説明書として機能し、pipeline_utils.py ファイルを適切なパッケージにバンドルしてすべてのワーカーにインストールする方法を正確に指示します。

タスク 5. 各要素にタイムスタンプを追加する

これらの #TODO 項目は、Apache Beam で従来の時系列集計パイプラインを構築する手順を説明するものです。それぞれが、バッチ処理とストリーム処理のコアコンセプトを表しています。 このパイプラインの目標は、ウェブイベントの JSON ファイル(events.json)を処理して、1 分あたりのイベント発生件数をカウントし、これらの分単位のカウントを BigQuery テーブルに書き込むことです。

パイプラインのフローは次のようになります。 テキストを読み取る -> CommonLog に解析する -> TODO -> BigQuery に書き込む

#TODO はパイプラインのコアコンセプトであり、ここで実際の集計ロジックが実行されます。

ミニチャレンジ - #TODO 1:

CommonLog オブジェクトのコレクションがあります。パイプラインの次のステップでは、これらのイベントを時間別にグループ化します(WindowByMinute)。Apache Beam がこれを行うには、各データのイベント時間を把握していなければなりません。ここでの課題は、このタイムスタンプを CommonLog オブジェクト内で見つける方法を Beam に指示することです。

解決方法:

  1. add_timestamp 関数(pipeline_utils.py で定義)は、各ログレコードに含まれるタイムスタンプ文字列を解析し、ウィンドウ処理に必要となる適切な Beam タイムスタンプとして当該要素に付加します。

  2. 例:

| 'AddEventTimestamp' >> beam.Map(...)

タスク 6. 1 分間のウィンドウにウィンドウ処理する

スクリプトの 2 つ目のタスクに取り掛かりましょう。ここでは、チャレンジに従って要素のグループ変換を行います。

ミニチャレンジ - #TODO 2:

  1. この変換では、イベントのタイムスタンプに基づいて、固定サイズで互いに重なり合わない 60 秒(1 分)のウィンドウに要素をグループ化します。

  2. 例:

| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(...))
  1. 他の種類のウィンドウ処理の詳細については、Apache Beam ドキュメントのセクション 8.2. Provided windowing functions を参照してください。

タスク 7. ウィンドウごとのイベント数をカウントする

スクリプトの 3 つ目のタスクに進んで、ウィンドウごとのイベント数をカウントしましょう。

ミニチャレンジ - #TODO 3:

  1. この combiner は、1 分間の各ウィンドウに含まれる要素の数をカウントします。.without_defaults() を使用して、空のウィンドウに対して出力が生成されないようにします。

  2. 例:

| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn())...()
  1. このタスクを完了するには、各ウィンドウ内のすべての要素をカウントする変換を追加します。行き詰まった場合は、ソリューションを参照してください。

タスク 8. 行に戻してタイムスタンプを追加する

スクリプトの最後のタスクに進みましょう。ここでは、行に戻してタイムスタンプを追加します。

ミニチャレンジ - #TODO 4:

  1. GetTimestampFn(pipeline_utils.py で定義)は、各ウィンドウの整数カウントを取得し、それを辞書形式に変換して、BigQuery スキーマに一致するようにウィンドウの開始時間を文字列として追加します。

  2. このタスクを完了するには、int 型の要素を受け取り、ウィンドウ情報にアクセスするための追加パラメータを渡す ParDo 関数を作成します。BigQuery テーブル スキーマのタイムスタンプ フィールドは STRING であり、タイムスタンプを文字列に変換する必要があるので注意してください。

  3. 例:

| "AddWindowTimestamp" >> beam.ParDo(...()) | 'WriteToBQ' >> beam.io.WriteToBigQuery( table_name, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

タスク 9. パイプラインを実行する

コーディングが完了したら、以下のコマンドを使用してパイプラインを実行します。コードをテストするときは、RUNNER 環境変数を DirectRunner に変更すると、パイプラインがローカルで実行されるため、はるかに高速になります。ここでは、Dataflow を使用してパイプラインを実行します。

  • リージョンとゾーンの値は、ラボの仕様に合わせて置き換えてください。
リージョン
export PROJECT_ID=$(gcloud config get-value project) export REGION=ENTER_REGION_ID export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic python3 batch_minute_traffic_pipeline.py \ --project=${PROJECT_ID} \ --region=${REGION} \ --staging_location=${PIPELINE_FOLDER}/staging \ --temp_location=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --input_path=${INPUT_PATH} \ --table_name=${TABLE_NAME} \ --setup_file=./setup.py

タスク 10. 結果を確認する

  • このタスクを完了するには、パイプラインが実行されるまで数分待ってから BigQuery に移動し、minute_traffic テーブルに対してクエリを実行します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 分単位でサイト トラフィックを集計し、パイプラインを実行する

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。