
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Prepare Environment
/ 15
Run your pipeline
/ 15
Test your pipeline
/ 15
このラボの内容:
前回のラボの最後に、リアルタイム型パイプラインが対処すべきある種の課題に触れました。イベントが発生したタイミングと処理されたタイミングの間に生じる隔たりで、ラグとも呼ばれます。このラボでは、Apache Beam のコンセプトをいくつか紹介します。これを理解すると、パイプラインを作成するときに、ラグに正式に対処するにはどうすればよいかを指定できるようになります。
また、ストリーミング環境でパイプラインを使用するときに発生しうる問題は、ラグだけではありません。システムの外部から入力を受け付ける場合、入力がどこか不正な形式である可能性が常にあります。このラボでは、そうした入力に対処するための手法についても紹介します。
このラボで作成する最終的なパイプラインは、以下の画像のようなものになります。ブランチを含んでいることに注意してください。
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。729328892908
)をコピーします。{project-number}
はプロジェクト番号に置き換えてください。このラボでは、Google Compute Engine でホストされる Theia Web IDE を主に使用します。これには、事前にクローンが作成されたラボリポジトリが含まれます。Java 言語サーバーがサポートされているとともに、Cloud Shell に似た仕組みで、gcloud
コマンドライン ツールを通じて Google Cloud API へのプログラムによるアクセスが可能なターミナルも使用できます。
ラボリポジトリのクローンが環境に作成されました。各ラボは、完成させるコードが格納される labs
フォルダと、ヒントが必要な場合に完全に機能するサンプルを参照できる solution
フォルダに分けられています。
ファイル エクスプローラ
ボタンをクリックして確認します。Cloud Shell で行うように、この環境で複数のターミナルを作成することも可能です。
提供されたサービス アカウント(ラボのユーザー アカウントとまったく同じ権限がある)でログインしたターミナルで gcloud auth list
を実行すれば、以下を確認できます。
環境が機能しなくなった場合は、IDE をホストしている VM を GCE コンソールから次のようにリセットしてみてください。
これまでのラボでは、要素をイベント時間に基づき固定幅のウィンドウに分割するコードを、以下のようなコードを使用して記述してきました。
ですが、前回の非 SQL ラボの終わりにご確認いただいたように、データ ストリームにラグが起こることは珍しくありません。ウィンドウ処理に(処理時間ではなく)イベント時間を使用する場合は、ラグが問題となります。イベント時間のある特定の時点ですべてのイベントが届いているかどうか、確実にはわからないからです。
そのため、結果を出力するには、パイプライン内にこの点に関する判断を記述する必要があります。その目的で使用するのが、ウォーターマークというコンセプトです。ウォーターマークは、イベント時間のある時点までにすべてのデータがパイプラインに届いていると見なすタイミングをシステムがヒューリスティックに基づいて判断するという考え方です。ウォーターマークがウィンドウの枠を超えたら、以後、そのウィンドウ内のタイムスタンプで届いた要素は、遅延データと見なされ、ただ単にドロップされます。ウィンドウ処理は、デフォルトではすべてのデータが届いているとシステムが高い確度で判断した場合にできれば完全な形で結果を 1 つだけ出力するというように動作します。
Apache Beam では、数多くのヒューリスティックを使用して、何をウォーターマークにするかを経験則から導き出しますが、それでもヒューリスティックであることに変わりはありません。さらに重要なのは、こうしたヒューリスティックは汎用的なものであり、すべてのユースケースに適しているわけではないということです。パイプラインの設計担当者は、汎用的なヒューリスティックを使用するのではなく、以下の質問をじっくりと検討してうまくトレードオフを図る必要があります。
こうした疑問への答えに基づくと、Apache Beam の形式手法に従って、妥当なトレードオフを実現するコードを記述できます。
許容遅延を使用すると、ウィンドウの状態が維持される期間を制御できます。ウォーターマークが許容遅延の期間に達すると、すべての状態がドロップされます。すべての永続状態を期間の最後までずっと維持できればよいのですが、現実にはそうはいきません。無限データソースに対処する場合、特定のウィンドウの状態を無期限に維持するのは実用的ではありません。そんなことをすれば、ディスク容量を使い切ってしまいます。
そのため、現実世界のあらゆる順不同処理システムで、処理対象のウィンドウの存続期間を制限するなんらかの方法が必要になります。これを実現するシンプルかつ簡明な方法の一つが、システム内で許容される遅延の範囲を定義することです。つまり、ウォーターマークに基づきレコードの遅延に限度を設け、この範囲を超過して到着したデータはシステムによって処理されず、単純にドロップされるよう設定することが考えられます。各データに許容する遅延の範囲を設定すると、ウィンドウの状態を維持するべき期間も正確に定義されます(ウィンドウの終了時間に設定された遅延範囲をウォーターマークが超過するまで)。
以前のラボと同様に、最初のステップはパイプラインで処理するデータを生成することです。ラボ環境を開いて、以前と同じようにデータを生成します。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
7_Advanced_Streaming_Analytics/labs/src/main/java/com/mypackage/pipeline
にある StreamingMinuteTrafficPipeline.java
を開きます。Apache Beam では、以下の例のように withAllowedLateness() メソッドを使用して許容遅延を設定します。
.withAllowedLateness()
の呼び出し処理を追加し、適切なコマンドライン パラメータから構築した有効な Duration
に渡します。妥当な値を決め、適切な数を反映するようコマンドラインを更新してください。パイプラインの設計者は、暫定的な結果を出力するタイミングを自ら決定することもできます。たとえば、ウィンドウの終了時間に設定されたウォーターマークには達していないものの、想定データの 75% がすでに到着している場合を考えてみましょう。多くの場合、このようなサンプルは全体の傾向を表し、エンドユーザーに紹介する価値があるものと見なされます。
トリガー
は、処理時間中のどの時点で具体的な結果が出力されるかを決定するものです。ウィンドウの各出力は、ウィンドウのペインと呼ばれます。トリガーの条件が満たされると、トリガーによりペインが出力されます。Apache Beam では、ウォーターマークの進行状況、処理時間の進行状況(どのくらいのデータが実際に到着したかにかかわらず均一に進行)、要素数のカウント(例: 新規データの到着が特定数に達した場合)、ファイルの末尾に到達した場合などのデータ依存トリガーなどが、これらの条件に含まれます。
トリガーの条件によっては、特定のペインが何度も発生する可能性があります。そのため、出力結果を累積する方法も指定する必要があります。Apache Beam は現在、2 つの累積モードをサポートしています。一つは結果を全部まとめて累積するモードで、もう一つはペインが最後に発生してから新たに結果に含まれた部分のみを返すモードです。
Window
変換を使用して PCollection
のウィンドウ関数を設定するときに、トリガーも指定できます。
次のようにメソッド「.triggering()
」を Window.into()
変換の結果に対して呼び出すことで、PCollection
のトリガーを設定します。Window.triggering() はトリガーを引数として受け取ります。Apache Beam では、以下のようにいくつものトリガーが用意されています。
以下のコードサンプルでは PCollection
に時間ベースのトリガーを設定しています。このトリガーは、ウィンドウ内で最初の要素が処理されて 1 分後に結果を出力します。コードサンプルの最終行にある .discardingFiredPanes()
は、ウィンドウの蓄積モードを設定します。
Window.triggering()
の呼び出し処理を追加し、有効な Trigger
を渡します。トリガーを設計する際には、データが複数の 1 分間ウィンドウに表示され、到着の遅延が許容されるこのユースケースを忘れないようにしてください。トリガーの例が必要な方は、こちらのソリューションをご覧ください。
Trigger
の設定方法によっては、今回のパイプラインをすぐに実行して以前のラボから取得したパイプラインと比較した際に、新しいパイプラインの方が結果表示が早いと思われるかもしれません。また、ヒューリスティックがストリーミング動作をうまく予想できず、許容される遅延の方が効果的な場合、今回のパイプラインから得た結果の方が正確なこともあり得ます。
ただ、今回のパイプラインは遅延に対して高い堅牢性を発揮しますが、不正な形式のデータに対しては依然として脆弱です。今回のパイプラインを実行してパブリッシュしたメッセージに CommonLog
に解析できる適切な JSON 形式以外の文字列が含まれていた場合は、パイプラインでエラーが発生します。こうしたエラーは、Cloud Logging のようなツールを使用すると簡単に確認できますが、事前に定義した場所に保存して後で調べられるようにパイプラインをうまく設計することもできます。
このセクションでは、モジュール性と堅牢性の両方を高めるコンポーネントをパイプラインに追加します。
不正な形式のデータに対する堅牢性を強化するには、パイプラインがこの種のデータをフィルタリングして違う方法で処理できるよう分岐させる方法が必要です。パイプラインにブランチを組み込む方法の一つは、すでにご紹介しています。それは、多重変換用の入力である PCollection
を 1 つ作成することです。
この分岐形態は強力です。ユースケースによっては処理が非効率になることもあります。たとえば、同じ PCollection
に対して 2 種類のサブセットを作成したいとします。複数変換の方法では、サブセットごとにフィルタ変換を 1 つ作成して、その両方を元の PCollection
に適用します。しかし、これでは各要素を 2 回処理することになります。
パイプラインに分岐を生成するにはもう一つ、入力の PCollection
を 1 回処理する間に単一の変換で複数の出力を生成するという方法もあります。このタスクでは、多重出力を生成する変換を記述します。この出力の一方は元の入力ストリームから得た正しい形式のデータから取得した結果であり、もう一方は不正な形式の要素です。
PCollection
を 1 つだけ作成しながら複数の結果を出力するために、Apache Beam は PCollectionTuple
と呼ばれるクラスを使用します。PCollectionTuple は、異なる型を含む PCollection
の不変タプルであり、TupleTag
を「キー」とします。
2 種類の PCollection を使用して PCollectionTuple
をインスタンス化する例を以下に示します。その後、これらの PCollection
は PCollectionTuple.get()
メソッドを使用して取得されます。
PTransform
のコンテキストでこのメソッドを使用するには、以下の例にあるようなコードを記述します。この例では、要素の内容に基づいて TupleTag
を要素に割り当てています。
TupleTag
定数をクラスの冒頭で宣言します。そして、PCollectionTuple
を返し、未解析の要素に片方のタグを付与して解析済みの要素にもう片方のタグを付与するように、JsonToCommonLog
変換を変更します。if / then / else
型の代わりに、try / catch
文を使用します。変換は、複雑な変換がよりシンプルな変換(複数の ParDo
、Combine
、GroupByKey
、あるいは他の複合変換など)を複数実行するネスト構造にすることができます。これらの変換は複合変換と呼ばれます。1 つの複合変換内で複数の変換をネストすると、コードのモジュール性を高めて、わかりやすくすることができます。
PTransform
クラスのサブクラスを作成し、実際の処理ロジックを指定するように expand() メソッドをオーバーライドします。PTransform
クラスの型パラメータについては、変換が入力として受け取り、出力として生成する PCollection
型を渡します。次のコードサンプルは、文字列の PCollection
を入力として受け付け、整数型の PCollection
を出力する PTransform
の宣言方法を示したものです。
#TODO: JsonToRow
PTransform
サブクラス内では、expand() メソッドをオーバーライドする必要があります。expand() メソッドでは、PTransform
の処理ロジックを追加します。expand() メソッドのオーバーライドでは、適切なタイプの入力「PCollection
」をパラメータとして受け付け、出力「PCollection
」を戻り値として指定する必要があります。PCollection
に対して PCollection.apply()
を使用し、複合変換のインスタンスを渡します。JsonToCommonLog
変換を複合変換に変えます。この処理によって、CommonLog
のインスタンスを想定している現在の書き込み変換に問題が発生します。複合変換の結果を新規の PCollectionTuple
に保存してから、.get()
を使用して書き込み変換が想定している PCollection
を取得します。不正な形式のデータを生成しているアップストリームの問題を解決するためには、不正な形式のデータを分析できることが重要です。そのためには、どこかにその具体的なデータを出力する必要があります。このタスクでは、Google Cloud Storage に不正な形式のデータを書き込みます。このような方法を、デッドレター ストレージの使用と呼びます。
これまでのラボでは、TextIO.write()
を使用して制限付きソース(バッチ)を Cloud Storage に直接書き込んできました。ですが、制限なしソース(ストリーミング)からの書き込みでは、この手法に少し手を加える必要があります。
まずは書き込み変換のアップストリームで、Trigger
を使用して、処理時間のどのタイミングで書き込むのかを指定します。これを指定せずにデフォルトのままにしておくと、書き込みは行われません。デフォルトでは、すべてのイベントがグローバル ウィンドウに属しています。この場合、データセット全体の内容を実行時に把握できるため、一括して操作するときはデフォルトのままでかまいません。一方、無限ソースの場合、データセット全体のサイズが不明であるため、グローバル ウィンドウのペインが発生することはなく、したがって完了することもありません。
トリガー
を使用しているため、ウィンドウ
も使用する必要があります。ただし、必ずしもウィンドウの変更が必要になるとは限りません。これまでのラボとタスクでは、ウィンドウ処理変換を使用して、グローバル ウィンドウをイベント時間内に期間が固定されたウィンドウに置き換えてきました。この場合、有用な方法かつ実用的な速度で具体的な結果を出力することが重要であり、それに比べればどの要素をグループ化するかはそれほど重要ではありません。
以下の例では、ウィンドウは処理時間 10 秒ごとにグローバル ウィンドウのペインを出力しますが、新規イベントのみを書き込みます。
Trigger
を設定したら、書き込みを実行するように TextIO.write()
の呼び出し処理を変更します。ウィンドウ処理変換のダウンストリームに書き込む際は、withWindowedWrites()
の呼び出し処理をチェーンして、書き込みが並列処理されるように複数のシャードを指定します。
PCollectionTuple
に対して .get()
を使用して新規の変換を作成し、不正な形式のデータを取得します。トリガーに関する知識と判断力を働かせて、このトリガーに対して適切な起動条件を設定しましょう。このクエストのコードには、JSON イベントを Pub/Sub でパブリッシュするためのスクリプトが含まれています。
training-data-analyst/quests/dataflow
フォルダが作業ディレクトリになっていることを確認します。true
フラグはストリームに遅延イベントを追加します。[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「Pub/Sub」と入力し、[プロダクトとページ] セクションの [Pub/Sub] をクリックします。
[トピック] をクリックし、トピック「my_topic
」をクリックします。
[メッセージ] タブ > [メッセージをパブリッシュ] ボタンの順にクリックします。
その後のページで、配信するメッセージを入力します。
CommonLog
JSON 仕様に対して完璧に適合しない限り、メッセージは短時間のうちにデッドレター Cloud Storage バケットに到着するはずです。パイプラインのモニタリング ウィンドウに戻り、未解析メッセージの処理を担当するブランチに存在するノードをクリックすることで、パイプラインを通過する経路を追跡できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください