
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Create Vertex AI Platform Notebooks instance and clone course repo
/ 25
Generate synthetic data
/ 25
Aggregating site traffic by user and run your pipeline
/ 25
Aggregating site traffic by minute and run the pipeline
/ 25
このラボの内容:
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
このラボでは、すべてのコマンドを Workbench インスタンス ノートブックのターミナルで実行します。
Google Cloud コンソールのナビゲーション メニュー()で [Vertex AI] を選択するか、Vertex AI ダッシュボードに移動します。
[すべての推奨 API を有効化] をクリックします。次に、Notebook API が有効になっていることを確認します。
ナビゲーション メニューで [ワークベンチ] をクリックします。
[ワークベンチ] ページの上部で、[インスタンス] ビューになっていることを確認します。
[新規作成] をクリックします。
インスタンスの構成:
名前 | リージョン | ゾーン | 詳細オプション(省略可能) |
---|---|---|---|
lab-workbench | 必要に応じて [詳細オプション] をクリックして、さらにカスタマイズします(マシンタイプ、ディスクサイズなど)。 |
インスタンスが作成されるまで数分かかります。作成が終了するとインスタンスの名前の横に緑色のチェックマークが付きます。
インスタンスの名前の横に表示されている [JupyterLab を開く] をクリックして JupyterLab インターフェースを起動します。ブラウザで新しいタブが開きます。
次に、[ターミナル] をクリックします。これにより、このラボのすべてのコマンドを実行できるターミナルが開きます。
このラボで使用するコード リポジトリをダウンロードします。
ノートブック環境の左側パネルのファイル ブラウザに、training-data-analyst リポジトリが追加されます。
クローン リポジトリ /training-data-analyst/quests/dataflow_python/
に移動します。ラボごとに、1 つのフォルダが表示されます。このフォルダはさらに、完成させるコードが格納される lab
サブフォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
サブフォルダとに分けられています。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボのこのパートでは、次のようなパイプラインを作成します。
CommonLog
オブジェクトに変換する。以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。
実際のパイプライン コードの編集を開始する前に、必要な依存関係がインストールされていることを確認する必要があります。
このスクリプトにより、次のような行が含まれている events.json
というファイルが作成されます。
このファイルは
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
このタスクには 2 つのミニチャレンジがあります。解決方法は、ソリューションで確認できます。
ファイル エクスプローラで以下のパスに移動し、batch_user_traffic_pipeline.py
ファイルを開きます。
このパイプラインには、入力パスと出力テーブル名のコマンドライン オプションを受け取るために必要なコードと、Google Cloud Storage からイベントを読み取り、それらのイベントを解析して、結果を BigQuery に書き込むコードがすでに含まれています。ただし、いくつかの重要な部分が欠けています。それらの部分は #TODO
でマークされています。
これはデータ モデリングのタスクです。このステップでは、すべてのログイベントをユーザーごとにグループ化した後、各ユーザーについてどのような情報を計算したいかを検討する必要があります。
集計結果を保持する構造(スキーマ)を定義する必要があります。集計に使用できるフィールドを調べるには、CommonLog クラスを確認する必要があります。
キーを特定する: クラス名は PerUserAggregation
であるため、保持する必要がある主要な情報は user_id
です。
計算する指標を選択する: ユーザーの CommonLog
エントリのコレクションから何を計算できますか。
例:
これは Apache Beam フレームワークの技術要件です。このチャレンジでは、Beam がカスタムデータ型を処理する仕組みについての知識をテストします。
多くの場合、Apache Beam がパイプラインを実行する際は、異なる複数のコンピュータ(ワーカーと呼ばれる)間でデータを送信する必要があります。そのためには、Python オブジェクト(PerUserAggregation インスタンス
など)をバイト ストリームにシリアル化してから、ネットワーク経由で送信し、送信先でオブジェクトに逆シリアル化する必要があります。Coder
は、このシリアル化と逆シリアル化を実行する方法を Beam に指示するオブジェクトです。
カスタムクラス PerUserAggregation
のエンコード / デコード方法を Beam に指示しなければ、パイプラインはエラーで失敗します。
解決方法は、#TODO
の上の行に示されています。Beam には、NamedTuple クラスと完全に連携する RowCoder
が用意されています。CommonLog
の場合と同様に、新しい PerUserAggregation クラスの RowCoder
を登録するだけで済みます。
ターミナルに戻り、次のコマンドを実行して Cloud Dataflow サービスを使用したパイプラインを実行します。問題が発生した場合は DirectRunner で実行するか、ソリューションを参照してください。
ENTER_REGION_ID
フィールドと ENTER_ZONE_ID
フィールドを下の表に従って置き換えます。リージョン | ゾーン | |
---|---|---|
送信したジョブは Dataflow ダッシュボードで確認できます。
ジョブ ステータスが「Successful」になったら、BigQuery で結果を確認します。このタスクを完了するには、パイプラインが完了するまで数分待ってから BigQuery に移動し、user_traffic
テーブルに対してクエリを実行します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボのこのパートでは、batch_minute_traffic
という新しいパイプラインを作成します。このパイプラインは batch_user_traffic
で使用されたバッチ分析の基本原則を拡張して、バッチ全体でユーザー別に集計するのではなく、イベントが発生した時間ごとに集計します。ここにも複数の #TODO
があります。基本的には自分で修正する必要がありますが、ソリューションを参考にすることもできます。
IDE で以下のパスに移動して、batch_minute_traffic_pipeline.py
ファイルを開きます。
これらの #TODO
項目は、Apache Beam で従来の時系列集計パイプラインを構築する手順を説明するものです。それぞれが、バッチ処理とストリーム処理のコアコンセプトを表しています。
このパイプラインの目標は、ウェブイベントの JSON ファイル(events.json)を処理して、1 分あたりのイベント発生件数をカウントし、これらの分単位のカウントを BigQuery テーブルに書き込むことです。
パイプラインのフローは次のようになります。
テキストを読み取る
-> CommonLog に解析する
-> TODO
-> BigQuery に書き込む
#TODO
はパイプラインのコアコンセプトであり、ここで実際の集計ロジックが実行されます。
CommonLog オブジェクトのコレクションがあります。パイプラインの次のステップでは、これらのイベントを時間別にグループ化します(WindowByMinute)。Apache Beam がこれを行うには、各データのイベント時間を把握していなければなりません。ここでの課題は、このタイムスタンプを CommonLog
オブジェクト内で見つける方法を Beam に指示することです。
add_timestamp
関数(pipeline_utils.py で定義)は、各ログレコードに含まれるタイムスタンプ文字列を解析し、ウィンドウ処理に必要となる適切な Beam タイムスタンプとして当該要素に付加します。
例:
スクリプトの 2 つ目のタスクに取り掛かりましょう。ここでは、チャレンジに従って要素のグループ変換を行います。
この変換では、イベントのタイムスタンプに基づいて、固定サイズで互いに重なり合わない 60 秒(1 分)のウィンドウに要素をグループ化します。
例:
スクリプトの 3 つ目のタスクに進んで、ウィンドウごとのイベント数をカウントしましょう。
この combiner は、1 分間の各ウィンドウに含まれる要素の数をカウントします。.without_defaults()
を使用して、空のウィンドウに対して出力が生成されないようにします。
例:
スクリプトの最後のタスクに進みましょう。ここでは、行に戻してタイムスタンプを追加します。
GetTimestampFn
(pipeline_utils.py で定義)は、各ウィンドウの整数カウントを取得し、それを辞書形式に変換して、BigQuery スキーマに一致するようにウィンドウの開始時間を文字列として追加します。
このタスクを完了するには、int 型の要素を受け取り、ウィンドウ情報にアクセスするための追加パラメータを渡す ParDo 関数
を作成します。BigQuery テーブル スキーマのタイムスタンプ フィールドは STRING であり、タイムスタンプを文字列に変換する必要があるので注意してください。
例:
コーディングが完了したら、以下のコマンドを使用してパイプラインを実行します。コードをテストするときは、RUNNER 環境変数を DirectRunner に変更すると、パイプラインがローカルで実行されるため、はるかに高速になります。ここでは、Dataflow
を使用してパイプラインを実行します。
リージョン | ||
---|---|---|
minute_traffic
テーブルに対してクエリを実行します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください