arrow_back

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用した高度なストリーミング分析パイプライン(Java)

ログイン 参加
700 以上のラボとコースにアクセス

Dataflow を使用したサーバーレスのデータ処理 - Dataflow を使用した高度なストリーミング分析パイプライン(Java)

ラボ 1時間 30分 universal_currency_alt クレジット: 7 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボの内容:

  • 遅延データに対処する。
  • 不正な形式のデータに以下の方法で対処する。
    1. コードのモジュール性を強化するために複合変換を書き込む。
    2. 多種多様な出力を生成する変換を書き込む。
    3. 不正な形式のデータを収集して、後で調査できるよう保管場所に書き込む。

前回のラボの最後に、リアルタイム型パイプラインが対処すべきある種の課題に触れました。イベントが発生したタイミングと処理されたタイミングの間に生じる隔たりで、ラグとも呼ばれます。このラボでは、Apache Beam のコンセプトをいくつか紹介します。これを理解すると、パイプラインを作成するときに、ラグに正式に対処するにはどうすればよいかを指定できるようになります。

また、ストリーミング環境でパイプラインを使用するときに発生しうる問題は、ラグだけではありません。システムの外部から入力を受け付ける場合、入力がどこか不正な形式である可能性が常にあります。このラボでは、そうした入力に対処するための手法についても紹介します。

このラボで作成する最終的なパイプラインは、以下の画像のようなものになります。ブランチを含んでいることに注意してください。

最終的なパイプライン アーキテクチャの図

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。

  1. Theia IDE にアクセスするには、Google Cloud Skills Boost に表示されたリンクをコピーして新しいタブに貼り付けます。
注: URL が表示された後も、環境が完全にプロビジョニングされるまで 3~5 分待つ必要がある場合があります。その間はブラウザにエラーが表示されます。

ide_url が表示されている認証情報ペイン

ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution フォルダに分けられています。

  1. ファイル エクスプローラ ボタンをクリックして確認します。

展開されたファイル エクスプローラ メニューで、labs フォルダがハイライト表示されている

Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。

[ターミナル] メニューでハイライト表示されている [新しいターミナル] オプション

提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list を実行すれば、以下を確認できます。

gcloud auth list コマンドが表示されたターミナル

環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。

[VM インスタンス] ページで、リセットボタンと VM インスタンス名の両方がハイライト表示されている

ラボパート 1遅延データに対処する

これまでのラボでは、要素をイベント時間に基づき固定幅のウィンドウに分割するコードを、以下のようなコードを使用して記述してきました。

commonLogs .apply("WindowCommonLogs", Window.into( FixedWindows.of( Duration.standardMinutes( options.getWindowDuration())))) .apply("CountEventsPerWindow", Combine.globally( Count.<CommonLog>combineFn()).withoutDefaults());

ですが、前回の非 SQL ラボの終わりにご確認いただいたように、データ ストリームにラグが起こることは珍しくありません。ウィンドウ処理に(処理時間ではなく)イベント時間を使用する場合は、ラグが問題となります。イベント時間のある特定の時点ですべてのイベントが届いているかどうか、確実にはわからないからです。

そのため、結果を出力するには、パイプライン内にこの点に関する判断を記述する必要があります。その目的で使用するのが、ウォーターマークというコンセプトです。ウォーターマークは、イベント時間のある時点までにすべてのデータがパイプラインに届いていると見なすタイミングをシステムがヒューリスティックに基づいて判断するという考え方です。ウォーターマークがウィンドウの枠を超えたら、以後、そのウィンドウ内のタイムスタンプで届いた要素は、遅延データと見なされ、ただ単にドロップされます。ウィンドウ処理は、デフォルトではすべてのデータが届いているとシステムが高い確度で判断した場合にできれば完全な形で結果を 1 つだけ出力するというように動作します。

Apache Beam では、数多くのヒューリスティックを使用して、何をウォーターマークにするかを経験則から導き出しますが、それでもヒューリスティックであることに変わりはありません。さらに重要なのは、こうしたヒューリスティックは汎用的なものであり、すべてのユースケースに適しているわけではないということです。パイプラインの設計担当者は、汎用的なヒューリスティックを使用するのではなく、以下の質問をじっくりと検討してうまくトレードオフを図る必要があります。

  • 完全性: 結果を計算する前にすべてのデータが揃っていることはどれほど重要か?
  • レイテンシ: どのくらいの時間までデータの到着を待つか?たとえば、すべてのデータが揃ったと判断できるまで待つのか、データが届くたびに処理していくのかということです。
  • 費用: レイテンシ低減のために費やしてかまわないと考える演算能力と費用はどれくらいか?

こうした疑問への答えに基づくと、Apache Beam の形式手法に従って、妥当なトレードオフを実現するコードを記述できます。

許容遅延

許容遅延を使用すると、ウィンドウの状態が維持される期間を制御できます。ウォーターマークが許容遅延の期間に達すると、すべての状態がドロップされます。すべての永続状態を期間の最後までずっと維持できればよいのですが、現実にはそうはいきません。無限データソースに対処する場合、特定のウィンドウの状態を無期限に維持するのは実用的ではありません。そんなことをすれば、ディスク容量を使い切ってしまいます。

そのため、現実世界のあらゆる順不同処理システムで、処理対象のウィンドウの存続期間を制限するなんらかの方法が必要になります。これを実現するシンプルかつ簡明な方法の一つが、システム内で許容される遅延の範囲を定義することです。つまり、ウォーターマークに基づきレコードの遅延に限度を設け、この範囲を超過して到着したデータはシステムによって処理されず、単純にドロップされるよう設定することが考えられます。各データに許容する遅延の範囲を設定すると、ウィンドウの状態を維持するべき期間も正確に定義されます(ウィンドウの終了時間に設定された遅延範囲をウォーターマークが超過するまで)。

タスク 1. 環境を準備する

以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。

適切なラボを開く

  1. IDE 環境に新しいターミナルをまだ作成していない場合は作成し、次のコマンドをコピーして貼り付けます。
# ディレクトリをラボに変更する cd 7_Advanced_Streaming_Analytics/labs # 依存関係をダウンロードする mvn clean dependency:resolve export BASE_DIR=$(pwd)
  1. データ環境を設定します。
# GCS バケット、BQ データセット、Pub/Sub トピックを作成する cd $BASE_DIR/../.. source create_streaming_sinks.sh # 練習用コードが含まれているディレクトリに移動する cd $BASE_DIR

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 環境を準備する

タスク 2. 許容遅延を設定する

  1. IDE で、7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline にある StreamingMinuteTrafficPipeline.java を開きます。

Apache Beam では、以下の例のように withAllowedLateness() メソッドを使用して許容遅延を設定します。

PCollection<String> items = ...; PCollection<String> windowed_items = items.apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) .withAllowedLateness(Duration.standardDays(1)));
  1. このタスクの仕上げとして、ウィンドウ処理変換を調べて .withAllowedLateness() の呼び出し処理を追加し、適切なコマンドライン パラメータから構築した有効な Duration に渡します。妥当な値を決め、適切な数を反映するようコマンドラインを更新してください。

トリガー

パイプラインの設計者は、暫定的な結果を出力するタイミングを自ら決定することもできます。たとえば、ウィンドウの終了時間に設定されたウォーターマークには達していないものの、想定データの 75% がすでに到着している場合を考えてみましょう。多くの場合、このようなサンプルは全体の傾向を表し、エンドユーザーに紹介する価値があるものと見なされます。

トリガーは、処理時間中のどの時点で具体的な結果が出力されるかを決定するものです。ウィンドウの各出力は、ウィンドウのペインと呼ばれます。トリガーの条件が満たされると、トリガーによりペインが出力されます。Apache Beam では、ウォーターマークの進行状況、処理時間の進行状況(どのくらいのデータが実際に到着したかにかかわらず均一に進行)、要素数のカウント(例: 新規データの到着が特定数に達した場合)、ファイルの末尾に到達した場合などのデータ依存トリガーなどが、これらの条件に含まれます。

トリガーの条件によっては、特定のペインが何度も発生する可能性があります。そのため、出力結果を累積する方法も指定する必要があります。Apache Beam は現在、2 つの累積モードをサポートしています。一つは結果を全部まとめて累積するモードで、もう一つはペインが最後に発生してから新たに結果に含まれた部分のみを返すモードです。

タスク 3. トリガーを設定する

Window 変換を使用して PCollection のウィンドウ関数を設定するときに、トリガーも指定できます。

次のようにメソッド「.triggering()」を Window.into() 変換の結果に対して呼び出すことで、PCollection のトリガーを設定します。Window.triggering() はトリガーを引数として受け取ります。Apache Beam では、以下のようにいくつものトリガーが用意されています。

  • AfterWatermark: ウィンドウの終了時間またはペイン内の最初の要素の到着に基づいて定めたタイムスタンプをウォーターマークが通過した際に起動させます。
  • AfterProcessingTime: 一定の処理時間が経過した後で起動させます(一般的には、ペイン内の最初の要素が到着してから)。
  • AfterPane: 現在のペイン内の要素に関する特性(例: 現在のペインに割り当てられている要素の数)が成立してから起動させます。

以下のコードサンプルでは PCollection に時間ベースのトリガーを設定しています。このトリガーは、ウィンドウ内で最初の要素が処理されて 1 分後に結果を出力します。コードサンプルの最終行にある .discardingFiredPanes() は、ウィンドウの蓄積モードを設定します。

PCollection<String> pc = ...; pc.apply(Window.<String>into(FixedWindows.of(1, TimeUnit.MINUTES)) .triggering(AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardMinutes(1)) .discardingFiredPanes());
  • このタスクの仕上げとして、ウィンドウ処理変換に Window.triggering() の呼び出し処理を追加し、有効な Trigger を渡します。トリガーを設計する際には、データが複数の 1 分間ウィンドウに表示され、到着の遅延が許容されるこのユースケースを忘れないようにしてください。

トリガーの例が必要な方は、こちらのソリューションをご覧ください。

ラボのパート 2不正な形式のデータに対処する

Trigger の設定方法によっては、今回のパイプラインをすぐに実行して以前のラボから取得したパイプラインと比較した際に、新しいパイプラインの方が結果表示が早いと思われるかもしれません。また、ヒューリスティックがストリーミング動作をうまく予想できず、許容される遅延の方が効果的な場合、今回のパイプラインから得た結果の方が正確なこともあり得ます。

ただ、今回のパイプラインは遅延に対して高い堅牢性を発揮しますが、不正な形式のデータに対しては依然として脆弱です。今回のパイプラインを実行してパブリッシュしたメッセージに CommonLog に解析できる適切な JSON 形式以外の文字列が含まれていた場合は、パイプラインでエラーが発生します。こうしたエラーは、Cloud Logging のようなツールを使用すると簡単に確認できますが、事前に定義した場所に保存して後で調べられるようにパイプラインをうまく設計することもできます。

このセクションでは、モジュール性と堅牢性の両方を高めるコンポーネントをパイプラインに追加します。

タスク 1. 不正な形式のデータを収集する

不正な形式のデータに対する堅牢性を強化するには、パイプラインがこの種のデータをフィルタリングして違う方法で処理できるよう分岐させる方法が必要です。パイプラインにブランチを組み込む方法の一つは、すでにご紹介しています。それは、多重変換用の入力である PCollection を 1 つ作成することです。

この分岐形態は強力です。ユースケースによっては処理が非効率になることもあります。たとえば、同じ PCollection に対して 2 種類のサブセットを作成したいとします。複数変換の方法では、サブセットごとにフィルタ変換を 1 つ作成して、その両方を元の PCollection に適用します。しかし、これでは各要素を 2 回処理することになります。

パイプラインに分岐を生成するにはもう一つ、入力の PCollection を 1 回処理する間に単一の変換で複数の出力を生成するという方法もあります。このタスクでは、多重出力を生成する変換を記述します。この出力の一方は元の入力ストリームから得た正しい形式のデータから取得した結果であり、もう一方は不正な形式の要素です。

PCollection を 1 つだけ作成しながら複数の結果を出力するために、Apache Beam は PCollectionTuple と呼ばれるクラスを使用します。PCollectionTuple は、異なる型を含む PCollection の不変タプルであり、TupleTag を「キー」とします。

2 種類の PCollection を使用して PCollectionTuple をインスタンス化する例を以下に示します。その後、これらの PCollectionPCollectionTuple.get() メソッドを使用して取得されます。

PCollection<String> pc1 = ...; PCollection<Integer> pc2 = ...; TupleTag<String> tag1 = new TupleTag<>(); TupleTag<Integer> tag2 = new TupleTag<>(); PCollectionTuple pcs = PCollectionTuple.of(tag1, pc1) .and(tag2, pc2); PCollection<Integer> pcX = pcs.get(tag1); PCollection<String> pcY = pcs.get(tag2);

PTransform のコンテキストでこのメソッドを使用するには、以下の例にあるようなコードを記述します。この例では、要素の内容に基づいて TupleTag を要素に割り当てています。

final TupleTag<String> aTag = new TupleTag<String>(){}; final TupleTag<String> bTag = new TupleTag<String>(){}; PCollectionTuple mixedCollection = input.apply(ParDo .of(new DoFn<String, String>() { @ProcessElement public void processElement(ProcessContext c) { if (c.element().startsWith("A")) { // aTag タグを付与された出力であるメイン出力に送る。 c.output(c.element()); } else if(c.element().startsWith("B")) { // bTag タグを付与された出力に送る。 c.output(bTag, c.element()); } } }) // メイン出力を指定する。 この例では、startsWithATag タグを付与された出力のこと。.withOutputTags(aTag, // bTag タグを付与された出力を TupleTagList として指定する。TupleTagList.of(bTag))); // bTag タグを付与された出力のサブセットを取得する。 mixedCollection.get(aTag).apply(...); // startsWithBTag タグを付与された出力のサブセットを取得する。 mixedCollection.get(bTag).apply(...);
  • このタスクの仕上げとして、2 つの TupleTag 定数をクラスの冒頭で宣言します。そして、PCollectionTuple を返し、未解析の要素に片方のタグを付与して解析済みの要素にもう片方のタグを付与するように、JsonToCommonLog 変換を変更します。if / then / else 型の代わりに、try / catch 文を使用します。

タスク 2. 複合変換を使用してコードのモジュール性を強化する

変換は、複雑な変換がよりシンプルな変換(複数の ParDoCombineGroupByKey、あるいは他の複合変換など)を複数実行するネスト構造にすることができます。これらの変換は複合変換と呼ばれます。1 つの複合変換内で複数の変換をネストすると、コードのモジュール性を高めて、わかりやすくすることができます。

  1. 独自の複合変換を作成する場合は、PTransform クラスのサブクラスを作成し、実際の処理ロジックを指定するように expand() メソッドをオーバーライドします。PTransform クラスの型パラメータについては、変換が入力として受け取り、出力として生成する PCollection 型を渡します。

次のコードサンプルは、文字列の PCollection を入力として受け付け、整数型の PCollection を出力する PTransform の宣言方法を示したものです。

#TODO: JsonToRow

static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { ... }
  1. PTransform サブクラス内では、expand() メソッドをオーバーライドする必要があります。expand() メソッドでは、PTransform の処理ロジックを追加します。expand() メソッドのオーバーライドでは、適切なタイプの入力「PCollection」をパラメータとして受け付け、出力「PCollection」を戻り値として指定する必要があります。
static class MyCompositeTransform extends PTransform<PCollection<String>, PCollection<Integer>> { @Override public PCollection<Integer> expand(PCollection<String>) { ... // 変換ロジックはここに挿入 ... } }
  1. 変換を呼び出すには、PCollection に対して PCollection.apply() を使用し、複合変換のインスタンスを渡します。
PCollection<Integer> i = stringPColl.apply("CompositeTransform", new MyCompositeTransform());
  1. このタスクの仕上げとして、先ほど変更したばかりの JsonToCommonLog 変換を複合変換に変えます。この処理によって、CommonLog のインスタンスを想定している現在の書き込み変換に問題が発生します。複合変換の結果を新規の PCollectionTuple に保存してから、.get() を使用して書き込み変換が想定している PCollection を取得します。

タスク 3. 後で分析できるように不正な形式のデータを書き込む

不正な形式のデータを生成しているアップストリームの問題を解決するためには、不正な形式のデータを分析できることが重要です。そのためには、どこかにその具体的なデータを出力する必要があります。このタスクでは、Google Cloud Storage に不正な形式のデータを書き込みます。このような方法を、デッドレター ストレージの使用と呼びます。

これまでのラボでは、TextIO.write() を使用して制限付きソース(バッチ)を Cloud Storage に直接書き込んできました。ですが、制限なしソース(ストリーミング)からの書き込みでは、この手法に少し手を加える必要があります。

まずは書き込み変換のアップストリームで、Trigger を使用して、処理時間のどのタイミングで書き込むのかを指定します。これを指定せずにデフォルトのままにしておくと、書き込みは行われません。デフォルトでは、すべてのイベントがグローバル ウィンドウに属しています。この場合、データセット全体の内容を実行時に把握できるため、一括して操作するときはデフォルトのままでかまいません。一方、無限ソースの場合、データセット全体のサイズが不明であるため、グローバル ウィンドウのペインが発生することはなく、したがって完了することもありません。

トリガーを使用しているため、ウィンドウも使用する必要があります。ただし、必ずしもウィンドウの変更が必要になるとは限りません。これまでのラボとタスクでは、ウィンドウ処理変換を使用して、グローバル ウィンドウをイベント時間内に期間が固定されたウィンドウに置き換えてきました。この場合、有用な方法かつ実用的な速度で具体的な結果を出力することが重要であり、それに比べればどの要素をグループ化するかはそれほど重要ではありません。

以下の例では、ウィンドウは処理時間 10 秒ごとにグローバル ウィンドウのペインを出力しますが、新規イベントのみを書き込みます。

pCollection.apply("FireEvery10s", Window.<String>configure() .triggering(Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)))) .discardingFiredPanes())

Trigger を設定したら、書き込みを実行するように TextIO.write() の呼び出し処理を変更します。ウィンドウ処理変換のダウンストリームに書き込む際は、withWindowedWrites() の呼び出し処理をチェーンして、書き込みが並列処理されるように複数のシャードを指定します。

fixedWindowedItems.apply( "WriteWindowedPCollection", TextIO .write() .to("gs://path/to/somewhere") .withWindowedWrites() .withNumShards(NUM_SHARDS));
  • このタスクの仕上げとして、PCollectionTuple に対して .get() を使用して新規の変換を作成し、不正な形式のデータを取得します。トリガーに関する知識と判断力を働かせて、このトリガーに対して適切な起動条件を設定しましょう。

タスク 4. パイプラインを実行する

  1. パイプラインを実行するには、以下の例に類似したコマンドを作成します。この例は、記述に含めたコマンドライン オプションの名前を反映するように修正が必要です。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID} export PIPELINE_FOLDER=${BUCKET} export MAIN_CLASS_NAME=com.mypackage.pipeline.StreamingMinuteTrafficPipeline export RUNNER=DataflowRunner export PUBSUB_TOPIC=projects/${PROJECT_ID}/topics/my_topic export WINDOW_DURATION=60 export ALLOWED_LATENESS=1 export OUTPUT_TABLE_NAME=${PROJECT_ID}:logs.minute_traffic export DEADLETTER_BUCKET=${BUCKET} cd $BASE_DIR mvn compile exec:java \ -Dexec.mainClass=${MAIN_CLASS_NAME} \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=${PROJECT_ID} \ --region=${REGION} \ --stagingLocation=${PIPELINE_FOLDER}/staging \ --tempLocation=${PIPELINE_FOLDER}/temp \ --runner=${RUNNER} \ --inputTopic=${PUBSUB_TOPIC} \ --windowDuration=${WINDOW_DURATION} \ --allowedLateness=${ALLOWED_LATENESS} \ --outputTableName=${OUTPUT_TABLE_NAME} \ --deadletterBucket=${DEADLETTER_BUCKET}"

このクエストのコードには、JSON イベントを Pub/Sub でパブリッシュするためのスクリプトが含まれています。

  1. このタスクを完了してメッセージのパブリッシュを開始するには、現在のターミナルと並べて新しいターミナルを開き、次のスクリプトを実行します。このスクリプトは停止されるまでメッセージをパブリッシュし続けます。training-data-analyst/quests/dataflow フォルダが作業ディレクトリになっていることを確認します。
注: true フラグはストリームに遅延イベントを追加します。 bash generate_streaming_events.sh true

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。パイプラインを実行する

タスク 5. パイプラインをテストする

  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Pub/Sub」と入力し、[プロダクトとページ] セクションの [Pub/Sub] をクリックします。

  2. [トピック] をクリックし、トピック「my_topic」をクリックします。

  3. [メッセージ] タブ > [メッセージをパブリッシュ] ボタンの順にクリックします。

  4. その後のページで、配信するメッセージを入力します。

CommonLog JSON 仕様に対して完璧に適合しない限り、メッセージは短時間のうちにデッドレター Cloud Storage バケットに到着するはずです。パイプラインのモニタリング ウィンドウに戻り、未解析メッセージの処理を担当するブランチに存在するノードをクリックすることで、パイプラインを通過する経路を追跡できます。

  1. この分岐に要素が追加されたら、Cloud Storage に移動して、メッセージがディスクに書き込まれたことを確認できます。
export PROJECT_ID=$(gcloud config get-value project) export REGION={{{ project_0.default_region | "REGION" }}} export BUCKET=gs://${PROJECT_ID}/deadletter gsutil ls $BUCKET gsutil cat $BUCKET/*/*

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。 パイプラインをテストする

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。