arrow_back

Cloud Dataproc で Apache Spark ジョブを実行する

ログイン 参加
700 以上のラボとコースにアクセス

Cloud Dataproc で Apache Spark ジョブを実行する

ラボ 1時間 30分 universal_currency_alt クレジット: 5 show_chart 上級
info このラボでは、学習をサポートする AI ツールが組み込まれている場合があります。
700 以上のラボとコースにアクセス

概要

このラボでは、Apache Spark のコードを Cloud Dataproc に移行する方法を学びます。一連のステップに従って段階的に、ジョブの各コンポーネントを Google Cloud のサービスに移行します。

  • 元の Spark コードを Cloud Dataproc で実行する(リフト&シフト)
  • HDFS を Cloud Storage に置き換える(クラウドネイティブ)
  • ジョブ固有のクラスタで実行されるように、すべての処理を自動化する(クラウド最適化)

学習内容

このラボでは、次の方法について学びます。

  • 既存の Spark ジョブを Cloud Dataproc に移行する
  • HDFS ではなく Cloud Storage を使用するように Spark ジョブを変更する
  • ジョブ固有のクラスタで実行するために Spark ジョブを最適化する

必要なもの

  • Cloud Dataproc
  • Apache Spark

設定と要件

各ラボでは、新しい Google Cloud プロジェクトとリソースセットを一定時間無料で利用できます。

  1. Qwiklabs にシークレット ウィンドウでログインします。

  2. ラボのアクセス時間(例: 1:15:00)に注意し、時間内に完了できるようにしてください。
    一時停止機能はありません。必要な場合はやり直せますが、最初からになります。

  3. 準備ができたら、[ラボを開始] をクリックします。

  4. ラボの認証情報(ユーザー名パスワード)をメモしておきます。この情報は、Google Cloud Console にログインする際に使用します。

  5. [Google Console を開く] をクリックします。

  6. [別のアカウントを使用] をクリックし、このラボの認証情報をコピーしてプロンプトに貼り付けます。
    他の認証情報を使用すると、エラーが発生したり、料金の請求が発生したりします。

  7. 利用規約に同意し、再設定用のリソースページをスキップします。

Google Cloud Shell の有効化

Google Cloud Shell は、開発ツールと一緒に読み込まれる仮想マシンです。5 GB の永続ホーム ディレクトリが用意されており、Google Cloud で稼働します。

Google Cloud Shell を使用すると、コマンドラインで Google Cloud リソースにアクセスできます。

  1. Google Cloud コンソールで、右上のツールバーにある [Cloud Shell をアクティブにする] ボタンをクリックします。

    ハイライト表示された Cloud Shell アイコン

  2. [続行] をクリックします。

環境がプロビジョニングされ、接続されるまでしばらく待ちます。接続した時点で認証が完了しており、プロジェクトに各自のプロジェクト ID が設定されます。次に例を示します。

Cloud Shell ターミナルでハイライト表示されたプロジェクト ID

gcloud は Google Cloud のコマンドライン ツールです。このツールは、Cloud Shell にプリインストールされており、タブ補完がサポートされています。

  • 次のコマンドを使用すると、有効なアカウント名を一覧表示できます。
gcloud auth list

出力:

Credentialed accounts: - @.com (active)

出力例:

Credentialed accounts: - google1623327_student@qwiklabs.net
  • 次のコマンドを使用すると、プロジェクト ID を一覧表示できます。
gcloud config list project

出力:

[core] project =

出力例:

[core] project = qwiklabs-gcp-44776a13dea667a6 注: gcloud ドキュメントの全文については、 gcloud CLI の概要ガイド をご覧ください。

プロジェクトの権限を確認する

Google Cloud で作業を開始する前に、Identity and Access Management(IAM)内で適切な権限がプロジェクトに付与されていることを確認する必要があります。

  1. Google Cloud コンソールのナビゲーション メニューナビゲーション メニュー アイコン)で、[IAM と管理] > [IAM] を選択します。

  2. Compute Engine のデフォルトのサービス アカウント {project-number}-compute@developer.gserviceaccount.com が存在し、編集者のロールが割り当てられていることを確認します。アカウントの接頭辞はプロジェクト番号で、ナビゲーション メニュー > [Cloud の概要] > [ダッシュボード] から確認できます。

Compute Engine のデフォルトのサービス アカウント名と編集者のステータスがハイライト表示された [権限] タブページ

注: アカウントが IAM に存在しない場合やアカウントに編集者のロールがない場合は、以下の手順に沿って必要なロールを割り当てます。
  1. Google Cloud コンソールのナビゲーション メニューで、[Cloud の概要] > [ダッシュボード] をクリックします。
  2. プロジェクト番号(例: 729328892908)をコピーします。
  3. ナビゲーション メニューで、[IAM と管理] > [IAM] を選択します。
  4. ロールの表の上部で、[プリンシパル別に表示] の下にある [アクセスを許可] をクリックします。
  5. [新しいプリンシパル] に次のように入力します。
{project-number}-compute@developer.gserviceaccount.com
  1. {project-number} はプロジェクト番号に置き換えてください。
  2. [ロール] で、[Project](または [基本])> [編集者] を選択します。
  3. [保存] をクリックします。

シナリオ

既存の Spark ワークロードを Cloud Dataproc に移行した後、Google Cloud のネイティブ機能とサービスを使用するように、Spark コードを段階的に変更します。

タスク 1. リフト&シフト

既存の Spark ジョブを Cloud Dataproc に移行する

新しい Cloud Dataproc クラスタを作成した後、インポートした Jupyter ノートブックを実行します。このノートブックでは、クラスタのデフォルトのローカル Hadoop 分散ファイル システム(HDFS)を使用してソースデータを格納し、Spark を使用する Hadoop クラスタの場合と同様に、そのデータを処理します。これにより、Spark コードが含まれる Jupyter ノートブックなど、既存の分析ワークロードの多くが、Cloud Dataproc 環境に移行しても変更の必要がないことを確認することができます。

Cloud Dataproc クラスタを構成して開始する

  1. Google Cloud コンソールのナビゲーション メニューで、[分析] セクションの [Dataproc] をクリックします。

  2. [CREATE CLUSTER] をクリックします。

  3. [Compute Engine 上のクラスタ] の [作成] をクリックします。

  4. [クラスタ名] に「sparktodp」と入力します。

  5. リージョンを [] に設定し、ゾーンを [] に設定します。

  6. [バージョニング] セクションで、[変更] をクリックし、[2.1 (Debian 11, Hadoop 3.3, Spark 3.3)] を選択します。

このバージョンには、このラボで使用するサンプルコードで必要となる Python3 が含まれています。

  1. [選択] をクリックします。

  2. [コンポーネント] > [コンポーネント ゲートウェイ] セクションで、[コンポーネント ゲートウェイを有効にする] を選択します。

  3. [オプション コンポーネント] で [Jupyter Notebook] を選択します。

  4. 左側のリストの [クラスタの設定] の下の [ノードの構成(省略可)] をクリックします。

  5. [マネージャー ノード] で、[シリーズ] を [E2] に、[マシンタイプ] を [e2-standard-2(2 vCPU、8 GB メモリ)] に変更し、[プライマリ ディスク サイズ] を [30] に設定します。

  6. [ワーカーノード] で、[シリーズ] を [E2]、[マシンタイプ] を [e2-standard-2(2 vCPU、8 GB メモリ)] に変更し、[プライマリ ディスク サイズ] を [30] に設定します。

  7. [作成] をクリックします。

クラスタは数分で起動します。Cloud Dataproc クラスタが完全にデプロイされるまで待ってから次のステップに進んでください

ラボ用にソース リポジトリのクローンを作成する

Cloud Shell で、ラボ用に Git リポジトリのクローンを作成し、Cloud Dataproc によって Jupyter ノートブックのホーム ディレクトリとして使用される Cloud Storage バケットに、必要なノートブック ファイルをコピーします。

  1. ラボ用に Git リポジトリのクローンを作成するには、Cloud Shell で次のコマンドを入力します。
git -C ~ clone https://github.com/GoogleCloudPlatform/training-data-analyst
  1. Cloud Dataproc によって使用されるデフォルトの Cloud Storage バケットを見つけるには、Cloud Shell で次のコマンドを入力します。
export DP_STORAGE="gs://$(gcloud dataproc clusters describe sparktodp --region={{{project_0.default_region | REGION }}} --format=json | jq -r '.config.configBucket')"
  1. Jupyter 作業フォルダにサンプル ノートブックをコピーするには、Cloud Shell で次のコマンドを入力します。
gcloud storage cp ~/training-data-analyst/quests/sparktobq/*.ipynb $DP_STORAGE/notebooks/jupyter

Jupyter Notebook にログインする

クラスタが完全に起動したら、すぐにウェブ インターフェースに接続できます。更新ボタンをクリックして、このステージに至るまでに完全にデプロイされたかどうかを確認します。

  1. Dataproc の [クラスタ] ページで、クラスタの起動が完了するまで待ち、該当するクラスタの名前をクリックして [クラスタの詳細] ページを開きます。

  2. [ウェブ インターフェース] をクリックします。

  3. Jupyter のリンクをクリックして、ブラウザに新しい Jupyter のタブを開きます。

Jupyter のホームページが開きます。このページには Cloud Storage の /notebooks/jupyter ディレクトリの内容が表示され、このラボで使用するサンプルの Jupyter ノートブックが含まれるようになったことがわかります。

  1. [Files] タブで、GCS フォルダをクリックした後、01_spark.ipynb ノートブックをクリックして開きます。

  2. [Cell]、[Run All] を順にクリックして、ノートブック内のすべてのセルを実行します。

  3. ノートブックの一番上まで戻ります。各セルの実行が完了したら、その下に結果が出力されることを確認します。

一連のセルの処理に伴い、順を追って各セルのコードを確認することで、ノートブックによって行われていることを把握できます。特に、データがどこで保存され、どこから処理されるかに注意するようにしてください。

  • 最初のコードセルではソースデータ ファイルを取得します。このファイルは 1999 年の Knowledge, Discovery, and Data(KDD)会議の KDD Cup コンペティションから引用したものです。このデータはコンピュータ侵入検知のイベントに関連します。
!wget https://storage.googleapis.com/cloud-training/dataengineering/lab_assets/sparklab/kddcup.data_10_percent.gz
  • 2 番目のコードセルでは、ソースデータがデフォルト(ローカル)の Hadoop ファイル システムにコピーされます。
!hadoop fs -put kddcup* /
  • 3 番目のコードセルでは、コマンドによって、クラスタの HDFS ファイル システムのデフォルト ディレクトリの内容がリストされます。
!hadoop fs -ls /

データの読み取り

データは、gzip で圧縮された CSV ファイルです。Spark ではこれらのデータを textFile メソッドを使用して直接読み取り、各行をカンマで区切ることで解析できます。

Python Spark コードはセル In[4] から始まります。

  • このセルでは、Spark SQL が初期化され、Spark を使用してソースデータがテキストデータとして読み取られて、最初の 5 行が戻されます。
from pyspark.sql import SparkSession, SQLContext, Row spark = SparkSession.builder.appName("kdd").getOrCreate() sc = spark.sparkContext data_file = "hdfs:///kddcup.data_10_percent.gz" raw_rdd = sc.textFile(data_file).cache() raw_rdd.take(5)
  • セル In [5] では、各行が「,」という文字で区切られ、コード内に準備されたインライン スキーマを使用して解析されます。
csv_rdd = raw_rdd.map(lambda row: row.split(",")) parsed_rdd = csv_rdd.map(lambda r: Row( duration=int(r[0]), protocol_type=r[1], service=r[2], flag=r[3], src_bytes=int(r[4]), dst_bytes=int(r[5]), wrong_fragment=int(r[7]), urgent=int(r[8]), hot=int(r[9]), num_failed_logins=int(r[10]), num_compromised=int(r[12]), su_attempted=r[14], num_root=int(r[15]), num_file_creations=int(r[16]), label=r[-1] ) ) parsed_rdd.take(5)

Spark 分析

セル In [6] では、Spark SQL コンテキストが作成され、そのコンテキストを使用する Spark データフレームが、前のステージの解析済み入力データを使用して作成されます。

  1. 行データは、データフレームの .show() メソッド(選択されたフィールドの数を要約したビューを出力する)を使用して選択および表示できます。
sqlContext = SQLContext(sc) df = sqlContext.createDataFrame(parsed_rdd) connections_by_protocol = df.groupBy('protocol_type').count().orderBy('count', ascending=False) connections_by_protocol.show()

.show() メソッドによって、次のような出力テーブルが生成されます。

+-------------+------+ |protocol_type| count| +-------------+------+ | icmp|283602| | tcp|190065| | udp| 20354| +-------------+------+

SparkSQL を使用して、データフレームに格納された解析済みデータのクエリを実行することもできます。

  1. セル In [7] では、一時テーブル(connections)が登録されます。これはその後、後続の SparkSQL SQL クエリ ステートメント内で参照されます。
df.registerTempTable("connections") attack_stats = sqlContext.sql(""" SELECT protocol_type, CASE label WHEN 'normal.' THEN 'no attack' ELSE 'attack' END AS state, COUNT(*) as total_freq, ROUND(AVG(src_bytes), 2) as mean_src_bytes, ROUND(AVG(dst_bytes), 2) as mean_dst_bytes, ROUND(AVG(duration), 2) as mean_duration, SUM(num_failed_logins) as total_failed_logins, SUM(num_compromised) as total_compromised, SUM(num_file_creations) as total_file_creations, SUM(su_attempted) as total_root_attempts, SUM(num_root) as total_root_acceses FROM connections GROUP BY protocol_type, state ORDER BY 3 DESC """) attack_stats.show()

クエリが終了すると、この例(簡略化したもの)のような出力が表示されます。

+-------------+---------+----------+--------------+-- |protocol_type| state|total_freq|mean_src_bytes| +-------------+---------+----------+--------------+-- | icmp| attack| 282314| 932.14| | tcp| attack| 113252| 9880.38| | tcp|no attack| 76813| 1439.31| ... ... | udp| attack| 1177| 27.5| +-------------+---------+----------+--------------+--

このデータは棒グラフを使用して表示することもできます。

  1. 最後のセル In [8] では、単にデータを変数にダンプするのではなく、%matplotlib inline Jupyter マジック関数を使用して matplotlib をリダイレクトし、ノートブックにインラインで視覚的なグラフをレンダリングしています。このセルには、前のステップの attack_stats クエリを使用した棒グラフが表示されます。
%matplotlib inline ax = attack_stats.toPandas().plot.bar(x='protocol_type', subplots=True, figsize=(10,25))

ノートブックのすべてのセルが正常に実行されると、出力の最初の部分は、次のグラフのようになります。ノートブックを下までスクロールすれば、出力グラフ全体を参照できます。

棒グラフ

タスク 2. コンピューティングとストレージを分ける

HDFS ではなく Cloud Storage を使用するように Spark ジョブを変更する

元の「リフト&シフト」のサンプル ノートブックからコピーを作成し、ジョブのストレージ要件をコンピューティング要件から切り離します。このケースでは、Hadoop ファイル システム呼び出しを Cloud Storage 呼び出しに置き換えることのみが必要となります。具体的には、コード内の hdfs:// ストレージ参照を gs:// 参照に置き換え、必要に応じてフォルダ名を修正します。

最初に、Cloud Shell を使用して、新しい Cloud Storage バケットにソースデータのコピーを配置します。

  1. Cloud Shell で、ソースデータのための新しいストレージ バケットを作成します。
export PROJECT_ID=$(gcloud info --format='value(config.project)') gcloud storage buckets create gs://$PROJECT_ID
  1. Cloud Shell で、ソースデータをバケットにコピーします。
wget https://storage.googleapis.com/cloud-training/dataengineering/lab_assets/sparklab/kddcup.data_10_percent.gz gcloud storage cp kddcup.data_10_percent.gz gs://$PROJECT_ID/

最後のコマンドが完了し、新しいストレージ バケットにファイルがコピーされていることを確認します。

  1. ブラウザの Jupyter Notebook の 01_spark のタブに戻ります。

  2. [File] をクリックした後、[Make a Copy] を選択します。

  3. コピーが開いたら、01_spark-Copy1 というタイトルをクリックし、名前を De-couple-storage に変更します。

  4. Jupyter の 01_spark のタブを開きます。

  5. [File]、[Save and Checkpoint] の順にクリックして、ノートブックを保存します。

  6. [File]、[Close and Halt] の順にクリックして、ノートブックをシャットダウンします。

  • ノートブックを閉じるかどうかを確認するプロンプトが表示されたら、[Leave] か [Cancel] をクリックします。
  1. 必要に応じて、ブラウザの Jupyter Notebook の De-couple-storage のタブに戻ります。

データをダウンロードしてクラスタの内部的な HDFS ファイル システムにコピーするセルは必要なくなったため、最初にこれらのセルを削除します。

セルを削除するには、該当するセル内をクリックして選択した後、ノートブックのツールバーの cut selected cells アイコン(はさみ)をクリックします。

  1. 最初のコメントセルと、最初の 3 つのコードセル(In [1]In [2]In [3])を削除します。これにより、ノートブックの先頭は [Reading in data] になります。

次に、最初のセル(ノートブックを再実行していない場合はまだ In[4] という名前です)のコードを変更します。このセルではデータファイルのソースの場所を定義し、ソースデータを読み込みます。現在、セルには次のコードが含まれています。

from pyspark.sql import SparkSession, SQLContext, Row spark = SparkSession.builder.appName("kdd").getOrCreate() sc = spark.sparkContext data_file = "hdfs:///kddcup.data_10_percent.gz" raw_rdd = sc.textFile(data_file).cache() raw_rdd.take(5)
  1. セル In [4] の内容を次のコードに置き換えます。ここで加える変更は 1 つだけです。Cloud Storage バケット名を格納する変数を作成した後、data_file が、Cloud Storage にソースデータを格納するために使用していたバケットをポイントするようにします。
from pyspark.sql import SparkSession, SQLContext, Row gcs_bucket='[Your-Bucket-Name]' spark = SparkSession.builder.appName("kdd").getOrCreate() sc = spark.sparkContext data_file = "gs://"+gcs_bucket+"//kddcup.data_10_percent.gz" raw_rdd = sc.textFile(data_file).cache() raw_rdd.take(5)

コードを置き換えると、最初のセルは次のようになります。バケット名は実際のラボ プロジェクト ID になります。

gcs_bucket='[Your-Bucket-Name]'

  1. 先ほど更新したセルで、プレースホルダ [Your-Bucket-Name] を、このセクションの最初のステップで作成したストレージ バケットの名前に置き換えます。このバケットは、プロジェクト ID を名前にして作成しました。プロジェクト ID はこちらの画面の左側にある、Qwiklabs ラボのログイン情報パネルからコピーできます。すべてのプレースホルダ テキストを、角かっこ [] も含めて置き換えます。

  2. [Cell]、[Run All] を順にクリックして、ノートブック内のすべてのセルを実行します。

表示される出力は、内部クラスタ ストレージからファイルがロードされ、実行されたときとまったく同一になります。ストレージ ソース参照のポイント先を hdfs:// から gs:// に変更するだけで、ソースデータのファイルを Cloud Storage に移動できます。

タスク 3. Spark ジョブをデプロイする

ジョブ固有のクラスタで実行するために Spark ジョブを最適化する

次に、このノートブックと同じ機能を実行し、Cloud Dataproc ジョブとしてデプロイできるスタンドアロンの Python ファイルを作成します。そのためには、このノートブックのコピーの Python セルに、セルの内容をファイルに書き出すマジック コマンドを追加します。また、Python スクリプトが呼び出されたときにストレージ バケットの場所を設定する入力パラメータ ハンドラも追加することで、コードのポータビリティを高めます。

  1. Jupyter Notebook メニューの De-couple-storage で、[File] をクリックして [Make a Copy] を選択します。

  2. コピーが開いたら、De-couple-storage-Copy1 をクリックし、名前を PySpark-analysis-file に変更します。

  3. Jupyter の De-couple-storage のタブを開きます。

  4. [File]、[Save and Checkpoint] の順にクリックして、ノートブックを保存します。

  5. [File]、[Close and Halt] の順にクリックして、ノートブックをシャットダウンします。

  • ノートブックを閉じるかどうかを確認するプロンプトが表示されたら、[Leave] か [Cancel] をクリックします。
  1. 必要に応じて、ブラウザの Jupyter Notebook の PySpark-analysis-file のタブに戻ります。

  2. ノートブック上部の最初のセルをクリックします。

  3. [Insert] をクリックし、[Insert Cell Above] を選択します。

  4. 次のライブラリ インポートとパラメータ処理コードを新しい最初のコードセルに貼り付けます。

%%writefile spark_analysis.py import matplotlib matplotlib.use('agg') import argparse parser = argparse.ArgumentParser() parser.add_argument("--bucket", help="bucket for input and output") args = parser.parse_args() BUCKET = args.bucket

%%writefile spark_analysis.py という Jupyter マジック コマンドによって、スタンドアロンの Python スクリプトを含む新しい出力ファイルが作成されます。残りの各セルには、このマジック コマンド(「-a」付き)を追加し、各セルの内容をこのスタンドアロンのスクリプト ファイルに追加します。

さらに、このコードでは、matplotlib モジュールをインポートし、matplotlib.use('agg') を介して明示的にデフォルトのプロット バックエンドを設定します。これにより、プロットコードが Jupyter ノートブックの外部で実行されるようになります。

  1. 残りのセルについて、各 Python コードセルの先頭に %%writefile -a spark_analysis.py を挿入します。これらは In [x] というラベルの付いた 5 つのセルです。
%%writefile -a spark_analysis.py

たとえば、次のセルは、以下のようになります。

%%writefile -a spark_analysis.py from pyspark.sql import SparkSession, SQLContext, Row spark = SparkSession.builder.appName("kdd").getOrCreate() sc = spark.sparkContext data_file = "gs://{}/kddcup.data_10_percent.gz".format(BUCKET) raw_rdd = sc.textFile(data_file).cache() #raw_rdd.take(5)
  1. このステップを繰り返して、最後まで、各コードセルの先頭に %%writefile -a spark_analysis.py を挿入します。

  2. 最後のセル(Pandas 棒グラフがプロットされる)から、%matplotlib inline マジック コマンドを削除します。

注: このインラインの matplotlib Jupyter マジック ディレクティブを削除しておかないと、スクリプトを実行したときにエラーが発生します。
  1. ノートブックの最後のコードセルを選択した状態で、メニューバーで [Insert] をクリックし、[Insert Cell Below] を選択します。

  2. 新しいセルに次のコードを貼り付けます。

%%writefile -a spark_analysis.py ax[0].get_figure().savefig('report.png');
  1. ノートブックの末尾に新しいセルを追加し、次の内容を貼り付けます。
%%writefile -a spark_analysis.py import google.cloud.storage as gcs bucket = gcs.Client().get_bucket(BUCKET) for blob in bucket.list_blobs(prefix='sparktodp/'): blob.delete() bucket.blob('sparktodp/report.png').upload_from_filename('report.png')
  1. ノートブックの末尾に新しいセルを追加し、次の内容を貼り付けます。
%%writefile -a spark_analysis.py connections_by_protocol.write.format("csv").mode("overwrite").save( "gs://{}/sparktodp/connections_by_protocol".format(BUCKET))

テストの自動化

ここでは、ノートブック内からローカルコピーを呼び出し、このジョブの入力データを格納するストレージ バケット(作成済み)を識別するためのパラメータを受け渡すことによって、PySpark コードがファイルとして正常に実行されるかどうかをテストします。このバケットは、スクリプトによって生成されたレポートデータ ファイルを格納するためにも使用されます。

  1. PySpark-analysis-file ノートブックの末尾に、新しいセルを追加して、次の内容を貼り付けます。
BUCKET_list = !gcloud info --format='value(config.project)' BUCKET=BUCKET_list[0] print('Writing to {}'.format(BUCKET)) !/opt/conda/miniconda3/bin/python spark_analysis.py --bucket=$BUCKET

このコードは、ここまでに示した手順のとおり、ストレージ バケット名としてラボのプロジェクト ID を使用して Cloud Storage バケットを作成していることを前提としています。別の名前を使用した場合は、BUCKET 変数に該当する名前に設定するように、このコードを変更します。

  1. ノートブックの末尾に新しいセルを追加し、次の内容を貼り付けます。
!gcloud storage ls gs://$BUCKET/sparktodp/**

Cloud Storage バケットに保存されているスクリプト出力ファイルが一覧表示されます。

  1. Python ファイルのコピーを永続ストレージに保存するために、新しいセルを追加して、次の内容を貼り付けます。
!gcloud storage cp spark_analysis.py gs://$BUCKET/sparktodp/spark_analysis.py
  1. [Cell]、[Run All] を順にクリックして、ノートブック内のすべてのセルを実行します。

正常に Python ファイルが作成されて実行された場合は、最後の 2 つのセルについて、次のような出力が表示されます。これは、スクリプトの実行が完了し、このラボで前に作成した Cloud Storage バケットに、出力が保存されたことを示します。

出力

注: このステージでエラーが発生する最も一般的な原因は、In [7] の matplotlib ディレクティブを削除していないことです。ここまで、手順のとおりにすべてのセルを変更したことと、スキップしたステップがないことを再度確認してください。

Cloud Shell から分析ジョブを実行する

  1. Cloud Shell に再び切り替え、Python スクリプトを Cloud Storage からコピーして、Cloud Dataproc ジョブとして実行できるようにします。
gcloud storage cp gs://$PROJECT_ID/sparktodp/spark_analysis.py spark_analysis.py
  1. 起動スクリプトを作成します。
nano submit_onejob.sh
  1. スクリプトに次の内容を貼り付けます。
#!/bin/bash gcloud dataproc jobs submit pyspark \ --cluster sparktodp \ --region {{{project_0.default_region | REGION }}} \ spark_analysis.py \ -- --bucket=$1
  1. Ctrl+X キー、Y キー、Enter キーの順に押して保存して終了します。

  2. スクリプトを実行可能にします。

chmod +x submit_onejob.sh
  1. PySpark 分析ジョブを起動します。
./submit_onejob.sh $PROJECT_ID
  1. Cloud コンソールのタブで、[Dataproc] > [クラスタ] ページを開きます(まだ開いていない場合)。

  2. [ジョブ] をクリックします。

  3. リストされているジョブの名前をクリックします。進行状況は、ここでも、Cloud Shell からでも確認することができます。ジョブが正常に完了するまで待ちます。

  4. ストレージ バケットに移動し、出力レポート /sparktodp/report.png のタイムスタンプが更新されている、つまり、スタンドアロンのジョブが正常に完了したことを確認します。

このジョブによって入出力のデータ ストレージとして使用されたストレージ バケットは、プロジェクト ID が名前として使用されているものとなります。

  1. [Dataproc] > [クラスタ] ページに戻ります。

  2. sparktodp クラスタを選択し、[削除] をクリックします(この後は必要ないため)。

  3. [削除] をクリックします。

  4. ブラウザの Jupyter のタブをすべて閉じます。

ラボを終了する

ラボが完了したら、[ラボを終了] をクリックします。ラボで使用したリソースが Google Cloud Skills Boost から削除され、アカウントの情報も消去されます。

ラボの評価を求めるダイアログが表示されたら、星の数を選択してコメントを入力し、[送信] をクリックします。

星の数は、それぞれ次の評価を表します。

  • 星 1 つ = 非常に不満
  • 星 2 つ = 不満
  • 星 3 つ = どちらともいえない
  • 星 4 つ = 満足
  • 星 5 つ = 非常に満足

フィードバックを送信しない場合は、ダイアログ ボックスを閉じてください。

フィードバックやご提案の送信、修正が必要な箇所をご報告いただく際は、[サポート] タブをご利用ください。

Copyright 2020 Google LLC All rights reserved. Google および Google のロゴは Google LLC の商標です。その他すべての企業名および商品名はそれぞれ各社の商標または登録商標です。

始める前に

  1. ラボでは、Google Cloud プロジェクトとリソースを一定の時間利用します
  2. ラボには時間制限があり、一時停止機能はありません。ラボを終了した場合は、最初からやり直す必要があります。
  3. 画面左上の [ラボを開始] をクリックして開始します

シークレット ブラウジングを使用する

  1. ラボで使用するユーザー名パスワードをコピーします
  2. プライベート モードで [コンソールを開く] をクリックします

コンソールにログインする

    ラボの認証情報を使用して
  1. ログインします。他の認証情報を使用すると、エラーが発生したり、料金が発生したりする可能性があります。
  2. 利用規約に同意し、再設定用のリソースページをスキップします
  3. ラボを終了する場合や最初からやり直す場合を除き、[ラボを終了] はクリックしないでください。クリックすると、作業内容がクリアされ、プロジェクトが削除されます

このコンテンツは現在ご利用いただけません

利用可能になりましたら、メールでお知らせいたします

ありがとうございます。

利用可能になりましたら、メールでご連絡いたします

1 回に 1 つのラボ

既存のラボをすべて終了して、このラボを開始することを確認してください

シークレット ブラウジングを使用してラボを実行する

このラボの実行には、シークレット モードまたはシークレット ブラウジング ウィンドウを使用してください。これにより、個人アカウントと受講者アカウントの競合を防ぎ、個人アカウントに追加料金が発生することを防ぎます。