arrow_back

Dataflow によるサーバーレス データ処理 - ストリーミング分析に Dataflow SQL を使用する(Java)

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow によるサーバーレス データ処理 - ストリーミング分析に Dataflow SQL を使用する(Java)

ラボ 2時間 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • SQL を使用してユーザー別のサイト トラフィックを集計するパイプラインを作成する。

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

IDE の設定

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

  1. Theia IDE にアクセスするには、Google Cloud Skills Boost に表示されたリンクをコピーして新しいタブに貼り付けます。
注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url が表示されている認証情報ペイン

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。

  1. ファイル エクスプローラ ボタンをクリックして確認します。

展開されたファイル エクスプローラ メニューで、labs フォルダがハイライト表示されている

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

[ターミナル] メニューでハイライト表示されている [新しいターミナル] オプション

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud auth list コマンドが表示されたターミナル

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

[VM インスタンス] ページで、リセットボタンと VM インスタンス名の両方がハイライト表示されている

SQL を使用してユーザー別のサイト トラフィックを集計する

このラボでは、以下を実行するように以前の BatchUserTraffic パイプラインを書き換えます。

  1. Cloud Storage 内のファイルからその日のトラフィックを読み取る。
  2. 各イベントを CommonLog オブジェクトに変換する。
  3. Java 変換ではなく SQL を使用して、一意のユーザー ID 別にヒット数を合計し、追加の集計を実行する。
  4. 結果として得られるデータを BigQuery に書き込む。
  5. 元データを後で分析できるように、追加のブランチで BigQuery に書き込む。

タスク 1. 環境を準備する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

適切なラボを開く

  1. IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリを変更してラボに移動する cd 4_SQL_Batch_Analytics/labs # 依存関係をダウンロードする mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. データ環境を設定します。
# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習バージョンのコードを含むディレクトリに移動する cd $BASE_DIR
  1. Data Catalog API が有効になっていることを確認します。
gcloud services enable datacatalog.googleapis.com

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

タスク 2. パイプラインに SQL 依存関係を追加する

  1. IDE で BatchUserTrafficSQLPipeline.java を開きます。これは 4_SQL_Batch_Analytics/labs/src/main/java/com/mypackage/pipeline にあります。

このパイプラインには、入力パスと 1 つの出力テーブル名のコマンドライン オプションを受け取るために必要なコードと、Google Cloud Storage からイベントを読み取り、それらのイベントを解析して、結果を BigQuery に書き込むコードがすでに含まれています。ただし、いくつかの重要な部分が欠けています。

以前のラボと同様に、パイプラインの次のステップでは、一意の user_id 別にイベントを集計し、それぞれのページビューをカウントします。ただし、今回は、Java ベースの変換ではなく SqlTransform を使用して、SQL で集計を実行します。

これを実装する前に、SQL 依存関係をパイプラインに追加する必要があります。

  1. このタスクを完了するには、4_SQL_Batch_Analytics/labs/ にあるこのパイプラインの pom.xml ファイルを開いて、次の依存関係を追加します。
<dependency> <groupId>org.apache.beam</groupId> <artifactId>beam-sdks-java-extensions-sql</artifactId> <version>${beam.version}</version> </dependency>
  1. IDE のターミナルで次のコマンドを実行し、Maven を通じて依存関係をダウンロードします。
mvn clean dependency:resolve
  1. BatchUserTrafficSQLPipeline.java で、次の import がファイルの先頭にあることを確認します。
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlPipelineOptions; import org.apache.beam.sdk.extensions.sql.SqlTransform;

タスク 3. ユーザー別のサイト トラフィックを集計する SQL ステートメントを作成する

Beam SQL は Apache Calcite 言語で実装できます。

SqlTransform.query(queryString) メソッドは、SQL クエリの文字列表現から PTransform を作成する唯一の API です。この PTransform は、単一の PCollection か、複数の PCollection を保持する 1 つの PCollectionTuple のいずれかに適用できます(PCollectionTuple については後で詳しく説明します)。

この PTransform は、単一の PCollection に適用されている場合、クエリ内の PCOLLECTION という名前のテーブルを介して参照できます。

PCollection<Row> filteredNames = testApps.apply( SqlTransform.query( "SELECT appId, description, rowtime " + "FROM PCOLLECTION " + "WHERE id=1"));

結果として得られる出力は関連スキーマを含む Row オブジェクトであり、SQL Transform または他の Java PTransform でさらに変更することやシンクに保存することができます。

  • このタスクを完了するには、SQLTransform をパイプラインに追加し、user_id 別にヒット数を集計する SQL 文字列を設定して「pageviews」という名前を付けます。

また、必要に応じて追加の集計を実行することもできます。ちなみに、これは以前に使用した PTransform です。

aggregateField("user_id", Count.combineFn(), "pageviews") .aggregateField("num_bytes", Sum.ofIntegers(), "total_bytes") .aggregateField("num_bytes", Max.ofIntegers(), "max_num_bytes") .aggregateField("num_bytes", Min.ofIntegers(), "min_num_bytes"))

役に立つ SQL 集計関数がいくつかあります。

COUNT(*) MAX(field_to_find_max_of) SUM(field_to_sum) MIN(field_to_find_min_of)

ヒントが必要な場合は、こちらのソリューションをご覧ください。

元データを保存するブランチを実装する

後から UI で SQL 集計を行うために、生の結果をすべて BigQuery に保存したい場合があります。

  • このタスクを完了するには、生の CommonLog オブジェクトを直接 BigQuery に書き出すブランチを使用して、パイプラインを再構成します。書き込み先は、コマンドライン オプション rawTableName で参照されるテーブル名です。

これを行うには、以前と同様に、パイプラインの最初のブランチをセミコロンで終了し、各ブランチを logs.apply(); で開始します。この新しいコマンドラインを、inputPath および aggregateTableName とともにパイプライン オプションに追加することを忘れないでください。また、BigQueryIO.<Object>write() の型ヒントも必ず変更してください。

タスク 4. パイプラインを実行する

  1. ターミナルに戻ってパイプラインを実行します。
# 環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) export REGION='{{{project_0.default_region|Region}}}' export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.BatchUserTrafficSQLPipeline export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.user_traffic export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --aggregateTableName=${AGGREGATE_TABLE_NAME} \ --rawTableName=${RAW_TABLE_NAME}"
  1. [ナビゲーション メニュー] > [Dataflow] に移動して、パイプラインのステータスを確認します。

  2. パイプラインが終了したら、BigQuery UI に移動して、結果として得られる 2 つのテーブルに対してクエリを実行します。logs.raw が存在し、データが入力されていることを確認してください。これは後ほどラボで必要になります。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 SQL を使用してユーザー別のサイト トラフィックを集計する

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。