arrow_back

Cloud Composer 3 の概要

ログイン 参加
700 以上のラボとコースにアクセス

Cloud Composer 3 の概要

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 入門
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

ワークフローはデータ分析における一般的なテーマのひとつで、データの取り込み、変換、分析によってデータから有益な情報を見つけるために使用されます。Google Cloud には、ワークフローをホストするためのツールとして Cloud Composer が用意されています。これは、よく利用されているオープンソース ワークフロー ツールの Apache Airflow をホスト型にしたものです。

このラボでは、Google Cloud コンソールを使用して Cloud Composer 環境を作成し、Cloud Composer を使ってシンプルなワークフローを実行します。このワークフローは、データファイルが存在することを確認し、Cloud Dataproc クラスタを作成して Apache Hadoop ワードカウント ジョブを実行した後、Cloud Dataproc クラスタを削除します。

演習内容

  • Google Cloud コンソールを使用して Cloud Composer 環境を作成する

  • Airflow ウェブ インターフェースで DAG(有向非巡回グラフ)を表示して実行する

  • 保存されたワードカウント ジョブの結果を表示する

設定と要件

ラボの設定

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

タスク 1. 環境のタスク

  1. Google Cloud コンソールのタイトルバーで、[Cloud Shell をアクティブにする] をクリックします。プロンプトが表示されたら、[続行] をクリックします。

  2. 以下のコマンドを実行して、Compute デベロッパー サービス アカウントに Composer ワーカー ロールを割り当てます。

export PROJECT_ID=$(gcloud config get-value project) export PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID --format="value(projectNumber)") gcloud projects add-iam-policy-binding {{{project_0.project_id}}} \ --member=serviceAccount:$PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=roles/composer.worker
  1. 次のコマンドを実行して、プロジェクトで必要な API がスムーズに有効化されるようにします。
gcloud services disable composer.googleapis.com gcloud services disable artifactregistry.googleapis.com gcloud services disable container.googleapis.com gcloud services enable artifactregistry.googleapis.com gcloud services enable container.googleapis.com gcloud services enable composer.googleapis.com

タスク 2. Cloud Composer 環境を作成する

このセクションでは、Cloud Composer 環境を作成します。

注: この先のステップに進む前に、ここまでの手順を実行済みで、必要な API が有効になっていることを確認してください。まだの場合はここで実行してください。そうしないと、Cloud Composer 環境の作成に失敗します。
  1. Google Cloud コンソールのタイトルバーにある検索フィールドに「Composer」と入力し、次に [プロダクトとページ] セクションの [Composer] をクリックします。

  2. [環境を作成] をクリックして [Composer 3] を選択します。環境を以下のように設定します。

プロパティ
名前 highcpu
ロケーション
イメージのバージョン composer-3-airflow-n.n.n-build.n(注: 利用可能なイメージの中で最も大きい番号のイメージを選択)
  1. [環境リソース] で [Small] を選択します。

その他の設定はすべてデフォルトのままにします。

  1. [作成] をクリックします。

コンソールの [環境] ページで、環境の名前の左側に緑色のチェックマークが表示されていれば、環境作成プロセスは完了しています。

設定プロセスが完了するまでに 15~30 分かかることがあります。その間にラボの作業を進めてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Composer 環境を作成する

Cloud Storage バケットを作成する

プロジェクトの Cloud Storage バケットを作成します。このバケットは、Dataproc の Hadoop ジョブの出力に使用されます。

  1. ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、[+ 作成] をクリックします。

  2. バケットにユニバーサルに一意な名前(プロジェクト ID など)を付けてから、[作成] をクリックします。「公開アクセスの防止」というメッセージが表示されたら、[確認] をクリックします。

この Cloud Storage バケット名は後ほど Airflow 変数として使用するため、覚えておいてください。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

Cloud Storage バケットを作成する

タスク 3. Airflow と主要なコンセプト

Composer 環境が作成されるまでの間に、Airflow で使用される用語を確認しましょう。

Airflow とは、ワークフローの作成、スケジューリング、モニタリングをプログラマティックに行うためのプラットフォームです。

Airflow を使用して、ワークフローをタスクの有向非循環グラフ(DAG)として作成します。Airflow スケジューラは、指定された依存関係に従って一連のワーカーでタスクを実行します。

基本コンセプト

DAG

有向非巡回グラフとは、実行するすべてのタスクの集まりであり、それらの関係や依存状態を反映するように編成されます。

オペレーター

単一のタスクを記述したもので、通常はアトミックなタスクです。たとえば、BashOperator は bash コマンドの実行に使用されます。

タスク

オペレーターのパラメータ化されたインスタンスであり、DAG 内のノードです。

タスク インスタンス

特定のタスクの実行です。DAG、タスク、ある時点を表し、実行中、成功、失敗、スキップなどのステータスを示します。

コンセプトの詳細については、Concepts のドキュメントをご覧ください。

タスク 4. ワークフローを定義する

これから使用するワークフローについて説明します。Cloud Composer ワークフローは DAG(Directed Acyclic Graph、有向非巡回グラフ)で構成されます。DAG の定義には標準 Python ファイルを使用し、これらのファイルは Airflow の DAG_FOLDER に配置されます。Airflow では各ファイル内のコードを実行して動的に DAG オブジェクトをビルドします。任意の数のタスクを記述した DAG を必要なだけ作成できます。一般に、DAG と論理ワークフローは 1 対 1 で対応している必要があります。

以下に、hadoop_tutorial.py ワークフロー コード(DAG ともいいます)を示します。

"""Airflow DAG の例。Cloud Dataproc クラスタを作成し、Hadoop の ワードカウントの例を実行してクラスタを削除します。 この DAG は 3 つの Airflow 変数に依存します https://airflow.apache.org/concepts.html#variables * gcp_project - Cloud Dataproc クラスタで使用する Google Cloud プロジェクト * gce_zone - Cloud Dataproc クラスタを作成する Google Compute Engine ゾーン * gce_region - Cloud Dataproc クラスタを作成する Google Compute Engine リージョン。 * gcs_bucket - Dataproc の Hadoop ジョブの出力として使用される Google Cloud Storage バケット バケットの作成方法については https://cloud.google.com/storage/docs/creating-buckets を参照 """ import datetime import os from airflow import models from airflow.contrib.operators import dataproc_operator from airflow.utils import trigger_rule # Cloud Dataproc ジョブの出力ファイル output_file = os.path.join( models.Variable.get('gcs_bucket'), 'wordcount', datetime.datetime.now().strftime('%Y%m%d-%H%M%S')) + os.sep # すべての Dataproc クラスタで利用可能な、Hadoop のワードカウントの例へのパス WORDCOUNT_JAR = ( 'file:///usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar' ) # Cloud Dataproc ジョブに渡す引数 wordcount_args = ['wordcount', 'gs://pub/shakespeare/rose.txt', output_file] yesterday = datetime.datetime.combine( datetime.datetime.today() - datetime.timedelta(1), datetime.datetime.min.time()) default_dag_args = { # 開始日を前日にすると、Cloud Storage バケットで # 検出され次第 DAG が実行されます 'start_date': yesterday, # 失敗時や再試行時にメールを送信するには、「email」引数を # 自分のアドレスに設定してメールを有効にします 'email_on_failure': False, 'email_on_retry': False, # タスクの失敗時には、5 分以上待ってから 1 回再試行します 'retries': 1, 'retry_delay': datetime.timedelta(minutes=5), 'project_id': models.Variable.get('gcp_project') } with models.DAG( 'composer_hadoop_tutorial', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag: # Cloud Dataproc クラスタを作成します create_dataproc_cluster = dataproc_operator.DataprocClusterCreateOperator( task_id='create_dataproc_cluster', # 設定された日付を付加してクラスタに一意の名前を付けます # https://airflow.apache.org/code.html#default-variables を参照 cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', num_workers=2, region=models.Variable.get('gce_region'), zone=models.Variable.get('gce_zone'), image_version='2.0', master_machine_type='e2-standard-2', worker_machine_type='e2-standard-2') # Cloud Dataproc クラスタのマスターノードにインストール # されている Hadoop のワードカウントの例を実行します run_dataproc_hadoop = dataproc_operator.DataProcHadoopOperator( task_id='run_dataproc_hadoop', region=models.Variable.get('gce_region'), main_jar=WORDCOUNT_JAR, cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', arguments=wordcount_args) # Cloud Dataproc クラスタを削除します delete_dataproc_cluster = dataproc_operator.DataprocClusterDeleteOperator( task_id='delete_dataproc_cluster', region=models.Variable.get('gce_region'), cluster_name='composer-hadoop-tutorial-cluster-{{ ds_nodash }}', # trigger_rule を ALL_DONE に設定すると、 # Dataproc ジョブが失敗した場合でもクラスタは削除されます trigger_rule=trigger_rule.TriggerRule.ALL_DONE) # DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

3 つのワークフロー タスクをオーケストレーションするために、DAG は次のオペレーターをインポートします。

  1. DataprocClusterCreateOperator: Cloud Dataproc クラスタを作成します。
  2. DataProcHadoopOperator: Hadoop ワードカウント ジョブを送信し、結果を Cloud Storage バケットに書き込みます。
  3. DataprocClusterDeleteOperator: クラスタを削除して、Compute Engine の利用料金が発生しないようにします。

タスクは順番に実行されます。ファイルの次の部分で確認できます。

# DAG の依存関係を定義します create_dataproc_cluster >> run_dataproc_hadoop >> delete_dataproc_cluster

この DAG の名前は composer_hadoop_tutorial で、1 日 1 回実行されます。

with models.DAG( 'composer_hadoop_tutorial', # DAG を 1 日に 1 回、継続して実行します schedule_interval=datetime.timedelta(days=1), default_args=default_dag_args) as dag:

default_dag_args に渡される start_dateyesterday に設定されているので、Cloud Composer ではワークフローの開始が DAG のアップロード直後に設定されます。

タスク 5. 環境情報を表示する

  1. Composer に戻り、環境のステータスを確認します。

  2. 環境が作成されたら、環境の名前(highcpu)をクリックして詳細を確認します。

[環境の構成] タブで、Airflow ウェブ UI の URL、GKE クラスタ、バケットに保存されている DAG フォルダへのリンクなどの情報を確認できます。

注: Cloud Composer がスケジュール設定を行うのは、/dags フォルダ内のワークフローのみです。

タスク 6. Airflow UI を使用する

コンソールで Airflow ウェブ インターフェースにアクセスするには:

  1. [環境] ページに戻ります。
  2. 環境の [Airflow ウェブサーバー] 列で、[Airflow] をクリックします。
  3. ラボの認証情報をクリックします。
  4. 新しいブラウザ ウィンドウに Airflow ウェブ インターフェースが表示されます。

タスク 7. Airflow 変数を設定する

Airflow 変数は Airflow 固有のコンセプトであり、環境変数とは異なります。

  1. Airflow インターフェースのメニューバーで [Admin] > [Variables] を選択します。

  2. + アイコンをクリックして新しいレコードを追加します。

変数の追加

  1. gcp_projectgcs_bucketgce_zonegce_region の 4 つの Airflow 変数を作成します。変数を 1 つ作成するごとに [Save] をクリックします。
Key(キー) Val(値) Details(詳細)
gcp_project このラボで使用している Google Cloud Platform プロジェクト。
gcs_bucket gs://<my-bucket> <my-bucket> を、すでに作成済みの Cloud Storage バケットの名前に置き換えます。このバケットに Dataproc の Hadoop ジョブの出力が保存されます。
gce_zone Cloud Dataproc クラスタを作成する Compute Engine ゾーン。
gce_region Cloud Dataproc クラスタを作成する Compute Engine リージョン。

[Save] をクリックします。1 つ目の変数を追加したら、2 つ目と 3 つ目の変数についても同じ手順を繰り返します。完了すると、Variables のテーブルは次のようになります。

変数リスト

タスク 8. DAG を Cloud Storage にアップロードする

DAG をアップロードするには:

  1. Cloud Shell で次のコマンドを実行し、hadoop_tutorial.py ファイルのコピーを環境の作成時に自動的に作成された Cloud Storage バケットにアップロードします。

  2. 次のコマンドの <DAGs_folder_path> を DAG フォルダへのパスに置き換えます。

gcloud storage cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py <DAGs_folder_path>
  • このパスは Composer で確認できます。
  • 前のステップで作成した環境をクリックし、[Environment Configuration] タブをクリックして環境の詳細を確認します。
  • [DAGs folder] を探し、パスをコピーします。

DAGS フォルダのパス

変更後、ファイルをアップロードするコマンドは次のようになります。

gcloud storage cp gs://cloud-training/datawarehousing/lab_assets/hadoop_tutorial.py gs://{{{project_0.default_region|REGION}}}-highcpu-0682d8c0-bucket/dags
  1. ファイルが DAG ディレクトリにアップロードされたら、バケットで dags フォルダを開き、バケットの詳細の [Objects] タブを開きます。

バケットの詳細

DAG ファイルが DAG フォルダに追加されると、Cloud Composer によって DAG が Airflow に追加され、自動的にスケジュールされます。DAG の変更は 3~5 分以内に行われます。

composer_hadoop_tutorial の DAG タスクのステータスは、Airflow ウェブ インターフェースで確認できます。

注: インターフェースに「The scheduler does not appear to be running...」などのメッセージが表示されても無視してかまいません。 Airflow ウェブ インターフェースは、DAG の進行状況に応じて更新されます。

[進行状況を確認] をクリックして、目標に沿って進んでいることを確認します。

DAG を Cloud Storage にアップロードする

DAG の実行状況を確認する

DAG ファイルを Cloud Storage の dags フォルダにアップロードすると、ファイルが Cloud Composer によって解析されます。エラーが見つからなければ、このワークフローの名前が DAG のリストに表示され、即時実行されるようキューに登録されます。

  1. Airflow ウェブ インターフェースの [DAGs] タブが表示されていることを確認してください。このプロセスが完了するまで数分かかります。ブラウザの画面を更新して、最新情報が表示されるか確認します。

  2. Airflow で [composer_hadoop_tutorial] をクリックして DAG の詳細ページを開きます。このページでは、ワークフローのタスクと依存関係が示されています。

  3. ツールバーで [Graph] をクリックします。各タスクのグラフィックにカーソルを合わせると、そのタスクのステータスが表示されます。各タスクを囲む線の色もステータスを表しています(緑は実行中、赤は失敗など)。

  4. [Refresh] をクリックして最新情報を表示すると、プロセスを囲む線の色がステータスに応じて変化するのを確認できます。

注: Dataproc クラスタがすでに存在している場合は、[create_dataproc_cluster] グラフィックをクリックしてもう一度ワークフローを実行し、「success」ステータスになってから、[Clear] をクリックして 3 つのタスクをリセットし、[OK] をクリックして確定します。
  1. create_dataproc_cluster のステータスが「Running」に変わったら、ナビゲーション メニュー > [Dataproc] に移動し、次の操作を行います。

    • [Clusters] をクリックして、クラスタの作成と削除をモニタリングします。ワークフローによって作成されたクラスタは一時的なものです。ワークフローの実行中にのみ存在し、最後のワークフロー タスクで削除されます。
    • [Jobs] をクリックして、Apache Hadoop ワードカウント ジョブをモニタリングします。ジョブ ID をクリックすると、ジョブのログ出力を確認できます。
  2. Dataproc のステータスが「Running」になったら Airflow に戻って [Refresh] をクリックすると、クラスタが完成したことを確認できます。

run_dataproc_hadoop プロセスが完了したら、ナビゲーション メニュー > [Cloud Storage] > [バケット] に移動し、バケット名をクリックして wordcount フォルダでワードカウントの結果を確認できます。

  1. DAG ですべてのステップが完了すると、各ステップを囲む線の色が濃い緑になります。また、作成した Dataproc クラスタは削除されました。

お疲れさまでした

Cloud Composer ワークフローを実行しました。

次のステップ

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。