
始める前に
- ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
- ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
- 画面左上の [ラボを開始] をクリックして開始します
Create Cloud Composer environment
/ 5
Create a Cloud Storage bucket
/ 5
Uploading the DAG to Cloud Storage
/ 5
ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud Platform(GCP)には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。
このラボでは、Google Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルが存在することを確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、Cloud Dataproc クラスタを削除します。
Google Cloud コンソールを使用して Cloud Composer 環境を作成する
Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する
保存されたワードカウント ジョブの結果を表示する
各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。
Qwiklabs にシークレット ウィンドウでログインします。
ラボのアクセス時間(例: 1:15:00
)に注意し、時間内に完了できるようにしてください。
一時停止機能はありません。必要な場合はやり直せますが、最初からになります。
準備ができたら、[ラボを開始] をクリックします。
ラボの認証情報(ユーザー名とパスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。
[Google Console を開く] をクリックします。
[別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。
利用規約に同意し、再設定用のリソースページをスキップします。
Cloud Shell は、開発ツールが組み込まれた仮想マシンです。5 GB の永続ホーム ディレクトリを提供し、Google Cloud 上で実行されます。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。gcloud
は Google Cloud のコマンドライン ツールで、Cloud Shell にプリインストールされており、Tab キーによる入力補完がサポートされています。
Google Cloud Console のナビゲーション パネルで、「Cloud Shell をアクティブにする」アイコン()をクリックします。
[次へ] をクリックします。
環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続の際に認証も行われ、プロジェクトは現在のプロジェクト ID に設定されます。次に例を示します。
有効なアカウント名前を一覧表示する:
(出力)
(出力例)
プロジェクト ID を一覧表示する:
(出力)
(出力例)
Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。
Google Cloud コンソールのナビゲーション メニュー()で、[IAM と管理] > [IAM] をクリックします。
Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com
が存在し、編集者
のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] から確認できます。
アカウントが IAM に存在しない場合やアカウントに編集者
のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] をクリックします。
[プロジェクト情報] カードからプロジェクト番号をコピーします。
ナビゲーション メニューで、[IAM と管理] > [IAM] をクリックします。
IAM ページの上部にある [追加] をクリックします。
新しいプリンシパルの場合は、次のように入力します。
{project-number}
はプロジェクト番号に置き換えてください。
[ロールを選択] で、[基本](または [Project])> [編集者] を選択します。
[保存] をクリックします。
必要な API にアクセスできることを確認するには、Kubernetes Engine API への接続をリセットします。
Google Cloud コンソールの [プロジェクト情報] の中にプロジェクト番号が表示されているので、それを記録します。
コンソールの上部の検索バーに「Kubernetes Engine API」と入力します。検索結果の「Kubernetes Engine API」をクリックします。
[管理] をクリックします。
[API を無効にする] をクリックします。
確認を求められたら、[無効にする] をクリックします。
[Kubernetes Engine API とその依存 API を無効にしますか?
] と表示されたら、[無効にする] をクリックします。
[有効にする] をクリックします。
API が再度有効になると、ページに無効にするオプションが表示されます。
Cloud Composer API への接続をリセットします。前のステップで Kubernetes Engine API を再起動したので、Cloud Composer API が無効になっています。
Google Cloud コンソールの上部の検索バーに「Cloud Composer API」と入力します。検索結果の [Cloud Composer API] をクリックします。
[有効にする] をクリックします。
API が再度有効になると、ページに無効にするオプションが表示されます。
このセクションでは、Cloud Composer 環境を作成します。
Google Cloud コンソールのタイトルバーにある [検索] フィールドに「cloud composer」と入力し、[Composer] をクリックします。
[環境の作成] をクリックして [Composer 3] を選択します。
環境を以下のように設定します。
プロパティ | 値 |
---|---|
名前 | highcpu |
ロケーション | |
イメージのバージョン | composer-3-airflow-n.n.n-build.n(注: 利用可能なイメージの中で最も大きい番号のイメージを選択) |
その他の設定はすべてデフォルトのままにします。
[環境リソース] で [Small] を選択します。
[作成] をクリックします。
Google Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されれば、環境作成プロセスは完了しています。
設定プロセスが完了するまで 10~20 分かかることがあります。その間にラボの作業を進めてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
プロジェクトの Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力に使用されます。
ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[+ 作成] をクリックします。
バケットにユニバーサルに一意な名前を付けてから、[作成] をクリックします。「公開アクセスの防止
」というメッセージが表示されたら、[確認] をクリックします。
この Cloud Storage バケット名は後ほど Airflow 変数として使用するため、覚えておいてください。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しましょう。
Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。
Airflow を使用して、ワークフローをタスクの有向非循環グラフ(DAG)として作成します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。
DAG: 有向非巡回グラフとは、実行するすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。
オペレーター: 単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドの実行に使用されます。
タスク: オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。
タスク インスタンス: タスクの特定の実行です。DAG、タスク、特定の時点を表し、実行中、成功、失敗、スキップなどのステータスを示します。
コンセプトの詳細については、コンセプトのドキュメントをご覧ください。
これから使用するワークフローについて説明します。Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。DAG の定義には標準 Python ファイルを使用し、これらのファイルは Airflow の DAG_FOLDER
に配置されます。Airflow では各ファイル内のコードを実行して動的に DAG
オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローは 1 対 1 で対応している必要があります。
以下に、hadoop_tutorial.py
ワークフロー コード(DAG ともいいます)を示します。
3 つのワークフロー タスクをオーケストレーションするために、DAG は次のオペレーターをインポートします。
DataprocClusterCreateOperator
: Cloud Dataproc クラスタを作成します。DataProcHadoopOperator
: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。DataprocClusterDeleteOperator
: クラスタを削除して、Compute Engine の利用料金が発生しないようにします。タスクは順番に実行されます。ファイルの次の部分で確認できます。
DAG の名前は quickstart
で、DAG は 1 日 1 回実行されます。
default_dag_args
に渡される start_date
は yesterday
に設定されているので、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。
Composer に戻り、環境のステータスを確認します。
環境が作成されたら、環境の名前(highcpu)をクリックして詳細を確認します。
[環境の構成] タブで、Airflow ウェブ UI の URL、GKE クラスタ、バケットに保存されている DAG フォルダへのリンクなどの情報を確認できます。
/dags
フォルダ内のワークフローのみです。Google Cloud コンソールで Airflow ウェブ インターフェースにアクセスするには:
Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。
Airflow インターフェースのメニューバーで [Admin] > [Variables] を選択します。
+ アイコンをクリックして新しいレコードを追加します。
gcp_project
、gcs_bucket
、gce_zone
の 3 つの Airflow 変数を作成します。Key(キー) | Val(値) | Details(詳細) |
---|---|---|
gcp_project |
このラボで使用している Google Cloud Platform プロジェクト。 | |
gcs_bucket |
gs://<my-bucket> | <my-bucket> を、すでに作成済みの Cloud Storage バケットの名前に置き換えます。このバケットに Dataproc の Hadoop ジョブの出力が保存されます。 |
gce_zone |
Cloud Dataproc クラスタを作成する Compute Engine ゾーン。 | |
gce_region |
Cloud Dataproc クラスタを作成する Compute Engine リージョン。 |
DAG をアップロードするには:
Cloud Shell で次のコマンドを実行し、hadoop_tutorial.py
ファイルのコピーを環境の作成時に自動的に作成された Cloud Storage バケットにアップロードします。
次のコマンドの <DAGs_folder_path>
を DAG フォルダへのパスに置き換えます。
DAGs folder
] を探し、パスをコピーします。変更後、ファイルをアップロードするコマンドは次のようになります。
dags
フォルダを開き、バケットの詳細の [オブジェクト] タブを開きます。DAG ファイルが DAG フォルダに追加されると、Cloud Composer によって DAG が Airflow に追加され、自動的にスケジュールされます。DAG の変更は 3~5 分以内に行われます。
composer_hadoop_tutorial
の DAG タスクのステータスは、Airflow ウェブ インターフェースで確認できます。
[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。
DAG ファイルを Cloud Storage の dags
フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。
Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。
Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。
Airflow で [composer_hadoop_tutorial] をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が示されています。
ツールバーで [Graph] をクリックします。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。
[Refresh] をクリックして最新情報を表示すると、プロセスを囲む線の色がステータスに応じて変化するのを確認できます。
create_dataproc_cluster のステータスが「Running」に変わったら、ナビゲーション メニュー > [Dataproc] に移動し、次の操作を行います。
Dataproc のステータスが「Running」になったら Airflow に戻って [Refresh] をクリックすると、クラスタが完成したことを確認できます。
run_dataproc_hadoop
プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount
フォルダでワードカウントの結果を確認できます。
Cloud Composer ワークフローを実行しました。
ラボでの学習が完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Qwiklabs から削除され、アカウントの情報も消去されます。
ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。
星の数は、それぞれ次の評価を表します。
フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。
フィードバック、ご提案、修正が必要な箇所については、[サポート] タブからお知らせください。
Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。
このコンテンツは現在ご利用いただけません
利用可能になりましたら、メールでお知らせいたします
ありがとうございます。
利用可能になりましたら、メールでご連絡いたします
1 回に 1 つのラボ
既存のラボをすべて終了して、このラボを開始することを確認してください