
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Create Cloud Composer environment
/ 5
Create a Cloud Storage bucket
/ 5
Uploading the DAG to Cloud Storage
/ 5
ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。
このラボでは、Google Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルが存在することを確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、Cloud Dataproc クラスタを削除します。
Google Cloud コンソールを使用して Cloud Composer 環境を作成する
Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する
保存されたワードカウント ジョブの結果を表示する
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。
Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。
Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。
[続行] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。
gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。
出力:
出力例:
出力:
出力例:
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] を選択します。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。
編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。729328892908
)をコピーします。{project-number}
はプロジェクト番号に置き換えてください。Google Cloud コンソールのタイトルバーで、[Cloud Shell をアクティブにする] をクリックします。プロンプトが表示されたら、[続行] をクリックします。
以下のコマンドを実行して、Compute デベロッパー サービス アカウントに Composer ワーカー ロールを割り当てます。
このセクションでは、Cloud Composer 環境を作成します。
Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、次に [プロダクトとページ] セクションの [Composer] をクリックします。
[環境を作成] をクリックして [Composer 3] を選択します。環境を以下のように設定します。
プロパティ | 値 |
---|---|
名前 | highcpu |
ロケーション | |
イメージのバージョン | composer-3-airflow-n.n.n-build.n(注: 利用可能なイメージの中で最も大きい番号のイメージを選択) |
その他の設定はすべてデフォルトのままにします。
コンソールの [環境] ページで、環境の名前の左側に緑色のチェックマークが表示されていれば、環境作成プロセスは完了しています。
設定プロセスが完了するまでに 15~30 分かかることがあります。その間にラボの作業を進めてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
プロジェクトの Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力に使用されます。
ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[+ 作成] をクリックします。
バケットにユニバーサルに一意な名前(プロジェクト ID 公開アクセスの防止
」というメッセージが表示されたら、[確認] をクリックします。
この Cloud Storage バケット名は後ほど Airflow 変数として使用するため、覚えておいてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しましょう。
Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。
Airflow を使用して、ワークフローをタスクの有向非循環グラフ(DAG)として作成します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。
有向非巡回グラフとは、実行するすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。
単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドの実行に使用されます。
オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。
特定のタスクの実行です。DAG、タスク、ある時点を表し、実行中、成功、失敗、スキップなどのステータスを示します。
コンセプトの詳細については、Concepts のドキュメントをご覧ください。
これから使用するワークフローについて説明します。Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。DAG の定義には標準 Python ファイルを使用し、これらのファイルは Airflow の DAG_FOLDER
に配置されます。Airflow では各ファイル内のコードを実行して動的に DAG
オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローは 1 対 1 で対応している必要があります。
以下に、hadoop_tutorial.py
ワークフロー コード(DAG ともいいます)を示します。
3 つのワークフロー タスクをオーケストレーションするために、DAG は次のオペレーターをインポートします。
DataprocClusterCreateOperator
: Cloud Dataproc クラスタを作成します。DataProcHadoopOperator
: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。DataprocClusterDeleteOperator
: クラスタを削除して、Compute Engine の利用料金が発生しないようにします。タスクは順番に実行されます。ファイルの次の部分で確認できます。
この DAG の名前は composer_hadoop_tutorial で、1 日 1 回実行されます。
default_dag_args
に渡される start_date
が yesterday
に設定されているので、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。
Composer に戻り、環境のステータスを確認します。
環境が作成されたら、環境の名前(highcpu)をクリックして詳細を確認します。
[環境の構成] タブで、Airflow ウェブ UI の URL、GKE クラスタ、バケットに保存されている DAG フォルダへのリンクなどの情報を確認できます。
/dags
フォルダ内のワークフローのみです。コンソールで Airflow ウェブ インターフェースにアクセスするには:
Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。
Airflow インターフェースのメニューバーで [Admin] > [Variables] を選択します。
+ アイコンをクリックして新しいレコードを追加します。
Key(キー) | Val(値) | Details(詳細) |
---|---|---|
gcp_project |
このラボで使用している Google Cloud Platform プロジェクト。 | |
gcs_bucket |
gs://<my-bucket> | <my-bucket> を、すでに作成済みの Cloud Storage バケットの名前に置き換えます。このバケットに Dataproc の Hadoop ジョブの出力が保存されます。 |
gce_zone |
Cloud Dataproc クラスタを作成する Compute Engine ゾーン。 | |
gce_region |
Cloud Dataproc クラスタを作成する Compute Engine リージョン。 |
[Save] をクリックします。1 つ目の変数を追加したら、2 つ目と 3 つ目の変数についても同じ手順を繰り返します。完了すると、Variables のテーブルは次のようになります。
DAG をアップロードするには:
Cloud Shell で次のコマンドを実行し、hadoop_tutorial.py
ファイルのコピーを環境の作成時に自動的に作成された Cloud Storage バケットにアップロードします。
次のコマンドの <DAGs_folder_path>
を DAG フォルダへのパスに置き換えます。
DAGs folder
] を探し、パスをコピーします。変更後、ファイルをアップロードするコマンドは次のようになります。
dags
フォルダを開き、バケットの詳細の [Objects] タブを開きます。DAG ファイルが DAG フォルダに追加されると、Cloud Composer によって DAG が Airflow に追加され、自動的にスケジュールされます。DAG の変更は 3~5 分以内に行われます。
composer_hadoop_tutorial
の DAG タスクのステータスは、Airflow ウェブ インターフェースで確認できます。
注: インターフェースに「The scheduler does not appear to be running...」などのメッセージが表示されても無視してかまいません。 Airflow ウェブ インターフェースは、DAG の進行状況に応じて更新されます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
DAG ファイルを Cloud Storage の dags
フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。
Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。
Airflow で [composer_hadoop_tutorial] をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が示されています。
ツールバーで [Graph] をクリックします。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。
[Refresh] をクリックして最新情報を表示すると、プロセスを囲む線の色がステータスに応じて変化するのを確認できます。
create_dataproc_cluster のステータスが「Running」に変わったら、ナビゲーション メニュー > [Dataproc] に移動し、次の操作を行います。
Dataproc のステータスが「Running」になったら Airflow に戻って [Refresh] をクリックすると、クラスタが完成したことを確認できます。
run_dataproc_hadoop
プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount
フォルダでワードカウントの結果を確認できます。
Cloud Composer ワークフローを実行しました。
ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください