arrow_back

Cloud Composer の概要

ログイン 参加
700 以上のラボとコースにアクセス

Cloud Composer の概要

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 入門
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud Platform(GCP)には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。

このラボでは、Google Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルが存在することを確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、Cloud Dataproc クラスタを削除します。

演習内容

  • Google Cloud コンソールを使用して Cloud Composer 環境を作成する

  • Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する

  • 保存されたワードカウント ジョブの結果を表示する

設定と要件

ラボの設定

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Cloud Shell をアクティブにする

Cloud Shell は、開発ツールが組み込まれた仮想マシンです。5 GB の永続ホーム ディレクトリを提供し、Google Cloud 上で実行されます。Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。gcloud は Google Cloud のコマンドライン ツールで、Cloud Shell にプリインストールされており、Tab キーによる入力補完がサポートされています。

  1. Google Cloud Console のナビゲーション パネルで、「Cloud Shell をアクティブにする」アイコン(Cloud Shell アイコン)をクリックします。

  2. [次へ] をクリックします。
    環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続の際に認証も行われ、プロジェクトは現在のプロジェクト ID に設定されます。次に例を示します。

Cloud Shell ターミナル

サンプル コマンド

  • 有効なアカウント名前を一覧表示する:

gcloud auth list

(出力)

Credentialed accounts: - <myaccount>@<mydomain>.com (active)

(出力例)

Credentialed accounts: - google1623327_student@qwiklabs.net
  • プロジェクト ID を一覧表示する:

gcloud config list project

(出力)

[core] project = <プロジェクト ID>

(出力例)

[core] project = qwiklabs-gcp-44776a13dea667a6

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] をクリックします。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] から確認できます。

デフォルトのコンピューティング サービス アカウント

アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。

  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] をクリックします。

  2. [プロジェクト情報] カードからプロジェクト番号をコピーします。

  3. ナビゲーション メニューで、[IAM と管理] > [IAM] をクリックします。

  4. IAM ページの上部にある [追加] をクリックします。

  5. 新しいプリンシパルの場合は、次のように入力します。

{project-number}-compute@developer.gserviceaccount.com

{project-number} はプロジェクト番号に置き換えてください。

  1. [ロールを選択] で、[基本](または [Project])> [編集者] を選択します。

  2. [保存] をクリックします。

タスク 1. Kubernetes Engine API が有効になっていることを確認する

必要な API にアクセスできることを確認するには、Kubernetes Engine API への接続をリセットします。

  1. Google Cloud コンソールの [プロジェクト情報] の中にプロジェクト番号が表示されているので、それを記録します。

  2. コンソールの上部の検索バーに「Kubernetes Engine API」と入力します。検索結果の「Kubernetes Engine API」をクリックします。

  3. [管理] をクリックします。

  4. [API を無効にする] をクリックします。

    • 確認を求められたら、[無効にする] をクリックします。

    • [Kubernetes Engine API とその依存 API を無効にしますか?] と表示されたら、[無効にする] をクリックします。

  5. [有効にする] をクリックします。

API が再度有効になると、ページに無効にするオプションが表示されます。

タスク 2. Cloud Composer API が有効になっていることを確認する

Cloud Composer API への接続をリセットします。前のステップで Kubernetes Engine API を再起動したので、Cloud Composer API が無効になっています。

  1. Google Cloud コンソールの上部の検索バーに「Cloud Composer API」と入力します。検索結果の [Cloud Composer API] をクリックします。

  2. [有効にする] をクリックします。

API が再度有効になると、ページに無効にするオプションが表示されます。

タスク 3. Cloud Composer 環境を作成する

このセクションでは、Cloud Composer 環境を作成します。

注: この先のステップに進む前に、ここまでの手順を実行済みで、必要な API が有効になっていることを確認してください。まだの場合はここで実行してください。実行しない場合、Cloud Composer 環境の作成に失敗します。
  1. Google Cloud コンソールのタイトルバーにある [検索] フィールドに「cloud composer」と入力し、[Composer] をクリックします。

  2. [環境の作成] をクリックして [Composer 3] を選択します。

  3. 環境を以下のように設定します。

プロパティ
名前 highcpu
ロケーション
イメージのバージョン composer-3-airflow-n.n.n-build.n(注: 利用可能なイメージの中で最も大きい番号のイメージを選択)

その他の設定はすべてデフォルトのままにします。

  1. [環境リソース] で [Small] を選択します。

  2. [作成] をクリックします。

Google Cloud コンソールの [環境] ページで環境の名前の左側に緑色のチェックマークが表示されれば、環境作成プロセスは完了しています。

設定プロセスが完了するまで 10~20 分かかることがあります。その間にラボの作業を進めてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Composer 環境を作成する

Cloud Storage バケットを作成する

プロジェクトの Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力に使用されます。

  1. ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[+ 作成] をクリックします。

  2. バケットにユニバーサルに一意な名前を付けてから、[作成] をクリックします。「公開アクセスの防止」というメッセージが表示されたら、[確認] をクリックします。

この Cloud Storage バケット名は後ほど Airflow 変数として使用するため、覚えておいてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Storage バケットを作成する

タスク 4. Airflow と主要なコンセプト

Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しましょう。

Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。

Airflow を使用して、ワークフローをタスクの有向非循環グラフ(DAG)として作成します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。

基本コンセプト

DAG: 有向非巡回グラフとは、実行するすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。

オペレーター: 単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドの実行に使用されます。

タスク: オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。

タスク インスタンス: タスクの特定の実行です。DAG、タスク、特定の時点を表し、実行中、成功、失敗、スキップなどのステータスを示します。

コンセプトの詳細については、コンセプトのドキュメントをご覧ください。

タスク 5. ワークフローの定義

これから使用するワークフローについて説明します。Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。DAG の定義には標準 Python ファイルを使用し、これらのファイルは Airflow の DAG_FOLDER に配置されます。Airflow では各ファイル内のコードを実行して動的に DAG オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローは 1 対 1 で対応している必要があります。

以下に、hadoop_tutorial.py ワークフロー コード(DAG ともいいます)を示します。

"""Airflow DAG の例。Cloud Dataproc クラスタを作成し、Hadoop の ワードカウントの例を実行してクラスタを削除します。 この DAG は 3 つの Airflow 変数に依存します https://airflow.apache.org/concepts.html#variables * gcp_project - Cloud Dataproc クラスタで使用する Google Cloud プロジェクト * gce_zone - Cloud Dataproc クラスタを作成する Google Compute Engine ゾーン * gcs_bucket - Dataproc の Hadoop ジョブの出力として使用される Google Cloud Storage バケット バケットの作成方法については https://cloud.google.com/storage/docs/creating-buckets を参照 """ import datetime import os from airflow import models from airflow.contrib.operators import dataproc_operator from airflow.utils import trigger_rule # Cloud Dataproc ジョブの出力ファイル output_file = os.path.join( models.Variable.get('gcs_bucket'), 'wordcount', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep # すべての Dataproc クラスタで利用可能な、Hadoop のワードカウントの例へのパス WORDCOUNT_JAR = ( 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar' ) # Cloud Dataproc ジョブに渡す引数 wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file] yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_dag_args = { # 開始日を前日にすると、Cloud Storage バケットで # 検出され次第 DAG が実行されます 'start_date': yesterday, # 失敗時や再試行時にメールを送信するには、「email」引数を # 自分のアドレスに設定してメールを有効にします 'email_on_failure': False, 'email_on_retry': False, # タスクの失敗時に、5 分以上待ってから 1 回再試行します 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), 'project_id': models.Variable.get('gcp_project') } with models.DAG( 'composer_sample_quickstart', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag: # Cloud Dataproc クラスタを作成します create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator( task_id='create_dataproc_cluster', # 設定された日付を加えてクラスタに一意の名前を付けます # https://airflow.apache.org/code.html#default-variables を参照 cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', num_workers=2, region=models.Variable.get('gce_region'), zone=models.Variable.get('gce_zone'), image_version='2.0', master_machine_type='n1-standard-2', worker_machine_type='n1-standard-2') # Cloud Dataproc クラスタのマスターノードにインストール # されている Hadoop のワードカウントの例を実行します run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator( task_id='run_dataproc_hadoop', region=models.Variable.get('gce_region'), main_jar=WORDCOUNT_JAR, cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', arguments=wordcount_args) # Cloud Dataproc クラスタを削除します delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator( task_id='delete_dataproc_cluster', region=models.Variable.get('gce_region'), cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', # trigger_rule を ALL_DONE に設定すると、 # Dataproc ジョブが失敗した場合でもクラスタは削除されます trigger_rule=trigger_rule.TriggerRule.ALL_DONE) # DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

3 つのワークフロー タスクをオーケストレーションするために、DAG は次のオペレーターをインポートします。

  1. DataprocClusterCreateOperator: Cloud Dataproc クラスタを作成します。
  2. DataProcHadoopOperator: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。
  3. DataprocClusterDeleteOperator: クラスタを削除して、Compute Engine の利用料金が発生しないようにします。

タスクは順番に実行されます。ファイルの次の部分で確認できます。

# DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

DAG の名前は quickstart で、DAG は 1 日 1 回実行されます。

with models.DAG( 'composer_sample_quickstart', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:

default_dag_args に渡される start_dateyesterday に設定されているので、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。

タスク 6. 環境情報を表示する

  1. Composer に戻り、環境のステータスを確認します。

  2. 環境が作成されたら、環境の名前(highcpu)をクリックして詳細を確認します。

[環境の構成] タブで、Airflow ウェブ UI の URL、GKE クラスタ、バケットに保存されている DAG フォルダへのリンクなどの情報を確認できます。

注: Cloud Composer がスケジュール設定を行うのは、/dags フォルダ内のワークフローのみです。

タスク 7. Airflow UI の使用

Google Cloud コンソールで Airflow ウェブ インターフェースにアクセスするには:

  1. [環境] ページに戻ります。
  2. 環境の [Airflow ウェブサーバー] 列で、[Airflow] をクリックします。
  3. ラボの認証情報をクリックします。
  4. 新しいウィンドウに Airflow ウェブ インターフェースが表示されます。

タスク 8. Airflow 変数を設定する

Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。

  1. Airflow インターフェースのメニューバーで [Admin] > [Variables] を選択します。

  2. + アイコンをクリックして新しいレコードを追加します。

変数の追加

  1. gcp_projectgcs_bucketgce_zone の 3 つの Airflow 変数を作成します。
Key(キー) Val(値) Details(詳細)
gcp_project このラボで使用している Google Cloud Platform プロジェクト。
gcs_bucket gs://<my-bucket> <my-bucket> を、すでに作成済みの Cloud Storage バケットの名前に置き換えます。このバケットに Dataproc の Hadoop ジョブの出力が保存されます。
gce_zone Cloud Dataproc クラスタを作成する Compute Engine ゾーン。
gce_region Cloud Dataproc クラスタを作成する Compute Engine リージョン。
  1. [Save] をクリックします。1 つ目の変数を追加したら、2 つ目と 3 つ目の変数についても同じ手順を繰り返します。完了すると、Variables のテーブルは次のようになります。

[Key] 列見出しと [Val] 列見出しがある表が表示されている [変数リスト] タブページ

タスク 9. DAG を Cloud Storage にアップロードする

DAG をアップロードするには:

  1. Cloud Shell で次のコマンドを実行し、hadoop_tutorial.py ファイルのコピーを環境の作成時に自動的に作成された Cloud Storage バケットにアップロードします。

  2. 次のコマンドの <DAGs_folder_path> を DAG フォルダへのパスに置き換えます。

gsutil cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py <DAGs_folder_path>
  • このパスは Composer で確認できます。
  • 前のステップで作成した環境をクリックし、[Environment Configuration] タブをクリックして環境の詳細を確認します。
  • [DAGs folder] を探し、パスをコピーします。

Cloud Composer 環境で DAG フォルダ名がハイライト表示されている

変更後、ファイルをアップロードするコマンドは次のようになります。

gsutil cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py gs://{{{project_0.default_region|REGION}}}-highcpu-0682d8c0-bucket/dags
  1. ファイルが DAG ディレクトリにアップロードされたら、バケットで dags フォルダを開き、バケットの詳細の [オブジェクト] タブを開きます。

[バケットの詳細] ページの [オブジェクト] タブで hadoop_tutorial.py ファイルがハイライト表示されている

DAG ファイルが DAG フォルダに追加されると、Cloud Composer によって DAG が Airflow に追加され、自動的にスケジュールされます。DAG の変更は 3~5 分以内に行われます。

composer_hadoop_tutorial の DAG タスクのステータスは、Airflow ウェブ インターフェースで確認できます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG を Cloud Storage にアップロードする

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。

Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。

  1. Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。

  2. Airflow で [composer_hadoop_tutorial] をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が示されています。

  3. ツールバーで [Graph] をクリックします。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

  4. [Refresh] をクリックして最新情報を表示すると、プロセスを囲む線の色がステータスに応じて変化するのを確認できます。

注: Dataproc クラスタがすでに存在している場合は、[create_dataproc_cluster] グラフィックをクリックしてもう一度ワークフローを実行し、「success」ステータスになってから、[Clear] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。
  1. create_dataproc_cluster のステータスが「Running」に変わったら、ナビゲーション メニュー > [Dataproc] に移動し、次の操作を行います。

    • [Clusters] をクリックして、クラスタの作成と削除をモニタリングします。ワークフローによって作成されたクラスタは一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクで削除されます。
    • [Jobs] をクリックして、Apache Hadoop ワードカウント ジョブをモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  2. Dataproc のステータスが「Running」になったら Airflow に戻って [Refresh] をクリックすると、クラスタが完成したことを確認できます。

run_dataproc_hadoop プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount フォルダでワードカウントの結果を確認できます。

  1. DAG ですべてのステップが完了すると、各ステップを囲む線の色が濃い緑になります。また、作成した Dataproc クラスタは削除されました。

お疲れさまでした

Cloud Composer ワークフローを実行しました。

次のステップ

ラボを終了する

ラボでの学習が完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Qwiklabs から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバック、ご提案、修正が必要な箇所については、[サポート] タブからお知らせください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。