arrow_back

Dataflow アカデミー(Java) - ラボ 2 - 分岐するパイプラインとカスタム Dataflow Flex テンプレート

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow アカデミー(Java) - ラボ 2 - 分岐するパイプラインとカスタム Dataflow Flex テンプレート

ラボ 2時間 universal_currency_alt クレジット: 1 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • ブランチがあるパイプラインを実装する
  • 書き込む前にデータをフィルタする
  • データを <Row> オブジェクトとして処理する
  • カスタム コマンドライン パラメータをパイプラインに追加する
  • カスタム パイプラインをカスタム Dataflow Flex テンプレートに変換する
  • Dataflow Flex テンプレートを実行する

前提条件:

  • Java に関する基本的な知識

前のラボでは、基本的な抽出、変換、読み込みパイプラインを作成し、対応する Dataflow テンプレートを使用して Google Cloud Storage のバッチ データ ストレージを取り込みました。このパイプラインは、次の変換シーケンスで構成されています。

alt_text

ところが、多くのパイプラインはこのような単純な構造ではありません。このラボでは、より高度な連続しないパイプラインを構築します。

今回のユースケースはリソース消費量の最適化です。プロダクトはリソースの利用状況によって変わります。また、ビジネスにおいてすべてのデータが同じように使われるわけではありません。分析ワークロードなどで定期的にクエリされるデータもあれば、復元にのみ使用されるデータもあります。このラボでは、最初のラボで作成したパイプラインのリソース消費量を最適化するために、アナリストが使用するデータのみを BigQuery に保存して、他のデータは低コストで耐久性の高いストレージ サービスである Google Cloud Storage の Coldline Storage にアーカイブします。

設定と要件

Qwiklabs の設定

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセス権を付与] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

IDE の設定

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

Theia IDE にアクセスするには、Qwiklabs に表示されたリンクをコピーして新しいタブに貼り付けます。

注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。ファイル エクスプローラ ボタンをクリックして確認します。

file_explorer

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

new_terminal

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud_auth

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

gce_reset

パート 1: 分岐するパイプラインの作成

ここでは、Google Cloud Storage と BigQuery の両方にデータを書き込む、分岐するパイプラインを作成します。

複数の変換が同じ PCollection を処理する

分岐するパイプラインを作成する方法の一つは、2 つの異なる変換を同じ PCollection に適用することにより、2 つの異なる PCollection を作成することです。

[PCollection1] = [最初の入力 PCollection].apply([変換]) [PCollection2] = [最初の入力 PCollection].apply([別の変換])

分岐するパイプラインを実装する

このセクションや後のセクションでヒントが必要な場合は、こちらのソリューションをご利用ください。

タスク 1: Cloud Storage への書き込みを行うブランチを追加する

このタスクを完了するには、Cloud Storage への書き込みを行うブランチを追加して既存のパイプラインを変更します。

alt_text

適切なラボを開く

IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。

# ディレクトリをラボに変更する cd 2_Branching_Pipelines/labs # 依存関係をダウンロードする mvn clean dependency:resolve export BASE_DIR=$(pwd)

データ環境を設定する

# GCS バケットと BQ データセットを作成する cd $BASE_DIR/../.. source create_batch_sinks.sh # イベント データフローを生成する source generate_batch_events.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 データ環境を設定する

IDE で 2_Branching_Pipelines/labs/src/main/java/com/mypackage/pipeline にある MyPipeline.java を開きます。パイプラインの本体が定義されている run() メソッドまで下にスクロールします。現在は次のような内容です。

pipeline.apply("ReadFromGCS", TextIO.read().from(input)) .apply("ParseJson", ParDo.of(new JsonToCommonLog())) .apply("WriteToBQ", BigQueryIO.<CommonLog>write().to(output).useBeamSchema() .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));

各要素が JSON から <CommonLog> に変換される前に、TextIO.write() を使用して Cloud Storage への書き込みを行う新しい分岐変換を追加することで、このコードを変更します

このセクションや後のセクションでヒントが必要な場合は、こちらにあるソリューションをご覧ください。

スキーマを使用する理由

スキーマは、プログラミング言語の特定の種類に依存しない Beam レコードの型システムを提供します。複数の Java クラスのすべてに同じスキーマが含まれている可能性があり(プロトコル バッファ クラスや POJO クラスなど)、Beam によってこれらの型をシームレスに変換できます。スキーマを使えば、さまざまなプログラミング言語 API で簡単に型を推測できます。 スキーマを含む PCollection では、Beam がスキーマ行のエンコードとデコードの方法を認識するため、コーダーを指定する必要がありません。Beam はスキーマタイプのエンコードに特別なコーダーを使用します。スキーマ API を導入する前に、Beam がパイプラインのすべてのオブジェクトをエンコードする方法を認識している必要があります。

タスク 2: フィールドでデータをフィルタする

現状ではすべてのデータが 2 回保存されるため、新しいパイプラインでもリソースの消費量は減りません。リソースの消費量を改善するには、重複するデータの量を減らす必要があります。Google Cloud Storage バケットの目的はアーカイブおよびバックアップ ストレージとして機能することなので、すべてのデータをそこに保存する必要があります。ただし、必ずしもすべてのデータを BigQuery に送信する必要はありません。

たとえば、データ アナリストが頻繁に確認する対象が、ウェブサイトでユーザーがアクセスするリソースや、地域と時間に応じたアクセス パターンの違いである場合、必要なフィールドはごく一部です。

各オブジェクトを変換して一部のフィールドのみを返す DoFn を作成することもできますが、Apache Beam にはスキーマを含む PCollection 用にさまざまなリレーショナル変換が用意されています。各レコードは名前付きフィールドで構成されているので、SQL 式での集計と同様に、フィールドを名前で参照するシンプルでわかりやすい集計が可能になります。

Select および DropFields 変換はこのうちの 2 つです。

PCollection<MyClass> pCollection = ...; pCollection.apply("SelectUserState", Select.fieldNames("state"))); PCollection<MyClass> pCollection = ...; pCollection.apply("DropPII", DropFields.fields("ssn", "state"));

注: これらの各例は、PCollection<MyClass> ではなく PCollection<Row> を返します。Row クラスはあらゆるスキーマに対応できる、汎用スキーマ化されたオブジェクトと考えることができます。スキーマを含む PCollection を行の PCollection にキャストできます。上の 2 つの変換はフィールドを削除するため、どちらも完全な CommonLog オブジェクトを返しません。その結果、Row を返す変換に戻ります。新しい名前付きスキーマを作成するか、中間の POJO スキーマを登録することもできますが、当面は Row を使用する方が簡単です。

このタスクを完了するには、次の import を追加して BigQuery に保存される一連のフィールドを変更し、いずれかの変換をパイプラインに追加することでアナリストが使用するフィールドのみが送信されるようにします。

import org.apache.beam.sdk.values.Row; import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.schemas.transforms.DropFields;

注: すでにメソッド チェーンで BigQueryIO.<CommonLog>write() メソッドを追加している場合は、新しいタイプなので <Row> に変更する必要があります。

タスク 3: 要素でデータをフィルタする

Apache Beam にはフィルタリングの方法が数多くあります。前のタスクでスキーマ変換を使用した方法を説明しました。この実装では、各要素の一部を除外した結果、スキーマと残りのフィールドのサブセットを含む新しい Row オブジェクトが返されました。以下の例のように、簡単にすべての要素を除外できます。

purchases.apply(Filter.<MyObject>create() .whereFieldName(“costCents”, (Long c) -> c > 100 * 20) .whereFieldName(“shippingAddress.country”, (String c) -> c.equals(“de”)); 注: この Filter 変換(org.apache.beam.sdk.schemas.transforms.Filter)とスキーマを含まない古い Filter 関数(org.apache.beam.sdk.transforms.Filter)を混同しないよう注意してください。

このタスクを完了するには、まず次の import ステートメントをコードに追加してから、Filter 変換をパイプラインに追加します。あらゆる条件でフィルタできます。lambda 関数への型ヒントの追加が必要になる場合があります(例: (Integer c) -> c > 100)。

import org.apache.beam.sdk.schemas.transforms.Filter;

タスク 4: カスタム コマンドライン パラメータを追加する

パイプラインには現在、BigQuery テーブルの入力と場所へのパスなど、多くのパラメータがハードコードされていますが、パイプラインで Cloud Storage の JSON ファイルを読み取ることができれば、さらに便利になります。この機能を追加するには、一連のコマンドライン パラメータに追加する必要があります。

現在パイプラインでは PipelineOptionsFactory を使用して Options というカスタムクラスのインスタンスが生成されていますが、このクラスは PipelineOptions クラスと何も変わらないので、実質的には PipelineOptions のインスタンスです。

public interface Options extends PipelineOptions { } public static void main(String[] args) { Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); run(options); }

PipelineOptions クラスは、次の形式のコマンドライン引数を処理します。

--<option>=<value>

ただし、ごく一部の定義済みパラメータに限られる場合があります。get- 関数はこちらで確認できます。カスタム パラメータを追加するには、2 つの手順を行います。まずは、以下の例のように状態変数を Options クラスに追加します。

public interface Options extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }

2 つ目の手順として、main() メソッド内に PipelineOptionsFactory でインターフェースを登録し、PipelineOptions オブジェクトの作成時にインターフェースを渡します。PipelineOptionsFactory でインターフェースを登録する場合、--help でカスタム オプション インターフェースを検索し、--help コマンドの出力に追加できます。PipelineOptionsFactory は、カスタム オプションが他のすべての登録済みオプションと互換であることも検証します。

次のコード例は、PipelineOptionsFactory でカスタム オプション インターフェースを登録する方法を示しています。

PipelineOptionsFactory.register(Options.class); Options options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(Options.class);

コード内のコマンドライン パラメータには、パラメータの get 関数を呼び出すだけでアクセスできます。

String myCustomOption = option.getMyCustomOption();

このタスクを完了するには、まず次の import ステートメントを追加してから、入力パス、Google Cloud Storage 出力パス、BigQuery テーブル名のコマンドライン パラメータを追加し、定数ではなくこれらのパラメータにアクセスするようにパイプライン コードを更新します。

import org.apache.beam.sdk.options.Default; import org.apache.beam.sdk.options.Description;

タスク 5: パイプラインに NULLABLE フィールドを追加する

すでにお気づきかもしれませんが、前回のラボで作成した BigQuery テーブルに、すべてのフィールドが REQUIRED に設定された次のようなスキーマがありました。

BigQuery ログのスキーマ

パイプラインの実行自体とこれを反映するスキーマで構成される BigQuery テーブルの両方に対して、データがない NULLABLE フィールドを持つ Apache Beam スキーマを作成することをおすすめします。

Javax 表記をクラス定義に追加できます。これは次のように Apache Beam スキーマに組み込まれます。

@DefaultSchema(JavaFieldSchema.class) class MyClass { int field1; @javax.annotation.Nullable String field2; }

このタスクを完了するには、クラス定義で lat および lon フィールドを null 可能としてマークします。

タスク 6: コマンドラインでパイプラインを実行する

このタスクを完了するには、コマンドラインでパイプラインを実行して適切なパラメータを渡します。生成される BigQuery スキーマの NULLABLE フィールドを忘れずにメモしておいてください。コードは次のようになります。

# 環境変数を設定する export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export BUCKET=gs://${PROJECT_ID} export COLDLINE_BUCKET=${BUCKET}-coldline export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.MyPipeline export RUNNER=DataflowRunner export INPUT_PATH=${PIPELINE_FOLDER}/events.json export OUTPUT_PATH=${PIPELINE_FOLDER}-coldline export TABLE_NAME=${PROJECT_ID}:logs.logs_filtered cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputPath=${INPUT_PATH} \ --outputPath=${OUTPUT_PATH} \ --tableName=${TABLE_NAME}" パイプラインの構築に問題がなくても、コードや Dataflow サービスの構成ミスのために多くのエラーが発生する場合は、RUNNER の設定を「DirectRunner」に戻してローカルで実行し、迅速にフィードバックを受け取ることができます。今回のケースはデータセットが小規模で、DirectRunner がサポートしている機能のみを使用しているため、この手法が有効です。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 コマンドラインでパイプラインを実行する

タスク 7: パイプラインの結果を確認する

Cloud Dataflow の [ジョブ] ページに移動して、実行中のジョブを確認します。次のようなグラフが表示されます。

alt_text

Filter 関数を表すノード(上の図では FilterFn)をクリックします。右側に表示されたパネルで、出力として書き込まれた要素よりも入力として追加された要素が多いことを確認します。

次は Cloud Storage への書き込みを表すノードをクリックします。すべての要素が書き込まれているので、この数字は Filter 関数への入力にある要素の数と一致する必要があります。

パイプラインが終了したら、テーブルに対してクエリを実行して BigQuery の結果を確認します。テーブル内のレコード数が Filter 関数で出力された要素の数と一致する必要があります。

パート 2: カスタム Dataflow テンプレート

コマンドライン パラメータを受け入れるパイプラインは、ハードコードされたパラメータを使うパイプラインよりもはるかに便利ですが、そのようなパイプラインを実行するには、開発環境を作成する必要があります。さまざまなユーザーによる再実行や、さまざまな状況での再実行が想定されるパイプラインには、Dataflow テンプレートを使う方が適しています。

Google Cloud Platform には多数の Dataflow テンプレートがすでに作成されており、こちらで確認できます。その中にこのラボのパイプラインと同じ動作をするテンプレートはありませんが、このパートでパイプラインを(従来のカスタム テンプレートではなく)新しいカスタム Dataflow Flex テンプレートに変換できます。

パイプラインをカスタム Dataflow Flex テンプレートに変換するには、コードだけでなく依存関係もパッケージ化する Uber JAR、ビルド対象のコードを記述する Dockerfile、実際のジョブを作成するためにランタイムで実行される基盤コンテナをビルドする Cloud Build、ジョブ パラメータを記述するメタデータ ファイルを使用する必要があります。

タスク 1: カスタム Dataflow Flex テンプレートのコンテナ イメージを作成する

このタスクを完了するには、最初に次のプラグインpom.xml ファイルに追加して、Uber JAR をビルドできるようにします。まず、プロパティタグに以下を追加します。

<maven-shade-plugin.version>3.2.3</maven-shade-plugin.version>

次に、ビルド プラグイン タグに以下を追加します。

<plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>${maven-shade-plugin.version}</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> </transformers> <filters> <filter> <artifact>*:*</artifact> <excludes> <exclude>META-INF/*.SF</exclude> <exclude>META-INF/*.DSA</exclude> <exclude>META-INF/*.RSA</exclude> </excludes> </filter> </filters> </configuration> </execution> </executions> </plugin>

これで、次のコマンドを使用して Uber JAR ファイルをビルドできます。

cd $BASE_DIR mvn clean package

サイズに注意してください。この Uber JAR ファイルにはすべての依存関係が含まれます。このファイルは、他のライブラリで外部依存関係のないスタンドアロン アプリケーションとして実行できます。

ls -lh target/*.jar

pom.xml と同じディレクトリに、次のテキストを含む Dockerfile というファイルを作成します。FLEX_TEMPLATE_JAVA_MAIN_CLASS には完全なクラス名、YOUR_JAR_HERE には作成した Uber JAR を設定してください。

FROM gcr.io/dataflow-templates-base/java11-template-launcher-base:latest # Dataflow Flex テンプレートに必要な Java コマンド オプションを定義します。 ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="YOUR-CLASS-HERE" ENV FLEX_TEMPLATE_JAVA_CLASSPATH="/template/pipeline.jar" # すべての依存関係を含む uber-jar としてパッケージ化します。 COPY target/YOUR-JAR-HERE.jar ${FLEX_TEMPLATE_JAVA_CLASSPATH}

次は、このコンテナをビルドしますが、ローカルでビルドするのではなく、Cloud Build を使用してビルドをオフロードします。まず、今後のビルド時間を短縮するためにキャッシュを有効にします。

gcloud config set builds/use_kaniko True

次に、実際のビルドを実行します。これで、実際のビルド内容についての指示が記述された Dockerfile を含むディレクトリ全体が 1 つにまとめられてサービスにアップロードされます。さらに、コンテナがビルドされてプロジェクトの Google Container Registry に push され、今後使用できるようになります。

export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/my-pipeline:latest" gcloud builds submit --tag $TEMPLATE_IMAGE .

Cloud Build UI でビルドのステータスをモニタリングすることもできます。また、ビルドされたコンテナが Google Container Registry にアップロードされたことも確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 カスタム Dataflow Flex テンプレートのコンテナ イメージを作成する

タスク 2: Flex テンプレートを作成してステージングする

テンプレートを実行するには、SDK 情報やメタデータなど、ジョブの実行に必要なすべての情報を含むテンプレート仕様ファイルを Cloud Storage に作成する必要があります。

このタスクを完了するには、パイプラインで予期されるすべての入力パラメータを考慮した次の形式で、metadata.json ファイルを作成します。必要な場合は、こちらでソリューションを参照してください。独自の正規表現チェックを記述する必要があります。おすすめの方法ではありませんが、".*" はあらゆる入力に一致します。

{ "name": "Your pipeline name", "description": "Your pipeline description", "parameters": [ { "name": "inputSubscription", "label": "Pub/Sub input subscription.", "helpText": "Pub/Sub subscription to read from.", "regexes": [ "[-_.a-zA-Z0-9]+" ] }, { "name": "outputTable", "label": "BigQuery output table", "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.", "is_optional": true, "regexes": [ "[^:]+:[^.]+[.].+" ] } ] }

次に、実際のテンプレートをビルドしてステージングします。

export TEMPLATE_PATH="gs://${PROJECT_ID}/templates/mytemplate.json" # テンプレートをビルドして GCS にアップロードする # gcloud のベータ版機能のオプトインが必要な可能性があります gcloud beta dataflow flex-template build $TEMPLATE_PATH \ --image "$TEMPLATE_IMAGE" \ --sdk-language "JAVA" \ --metadata-file "metadata.json"

ファイルが Cloud Storage のテンプレート用の場所にアップロードされていることを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 Flex テンプレートを作成してステージングする

タスク 3: UI でテンプレートを実行する

このタスクを完了するには、以下の手順に沿って操作します。

  1. Google Cloud コンソールの Cloud Dataflow ページに移動します。
  2. [テンプレートからジョブを作成] をクリックします。
  3. [ジョブ名] に有効なジョブ名を入力します。
  4. [Cloud Dataflow テンプレート] プルダウン メニューから [カスタム テンプレート] を選択します。
  5. テンプレートの Cloud Storage パスの欄に、テンプレート ファイルへの Cloud Storage パスを入力します。
  6. [必須パラメータ] に適切な項目を入力します。
  7. [ジョブを実行] をクリックします。
注: ステージング バケットを指定する必要はありません。`gs://dataflow-staging-us-central1-/staging` のような、プロジェクト番号を使用した限定公開のバケットが DataFlow によってプロジェクトに作成されます。

Compute Engine コンソールを確認すると、コンテナを実行して指定のパラメータでパイプラインを開始するために一時的なランチャー VM が作成されているのがわかります。

タスク 4: gcloud を使用してテンプレートを実行する

Dataflow テンプレートを使用する利点の一つは、開発環境以外のさまざまな場面で実行できることです。それを確認するために、gcloud を使用してコマンドラインで Dataflow テンプレートを実行します。

このタスクを完了するには、以下のコマンドを、適宜パラメータを変更してターミナルで実行します。

export PROJECT_ID=$(gcloud config get-value project) export REGION='us-central1' export JOB_NAME=mytemplate-$(date +%Y%m%H%M$S) export TEMPLATE_LOC=gs://${PROJECT_ID}/templates/mytemplate.json export INPUT_PATH=gs://${PROJECT_ID}/events.json export OUTPUT_PATH=gs://${PROJECT_ID}-coldline/ export BQ_TABLE=${PROJECT_ID}:logs.logs_filtered gcloud beta dataflow flex-template run ${JOB_NAME} \ --region=$REGION \ --template-file-gcs-location ${TEMPLATE_LOC} \ --parameters "inputPath=${INPUT_PATH},outputPath=${OUTPUT_PATH},tableName=${BQ_TABLE}"

パイプラインが正常に完了することを確認します。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 UI と gcloud を使用してテンプレートを実行する

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。