Bayes Classifier on Dataproc


1 hour 30 minutes 9 Credits




In this lab, you explore how to create a two-variable Bayesian model to decide whether to cancel a meeting based on the likely arrival delay of a flight. You quantize the two variables, create a conditional probability lookup table, and examine the on-time arrival percentage in each bin. You carry out both the quantization (using histogram equalization) and the on-time arrival percentage computations in Spark.

Dataproc is a managed Spark and Hadoop service that lets you take advantage of open source data tools for batch processing, querying, streaming, and machine learning. Dataproc automation helps you create clusters quickly, manage them easily, and save money by turning clusters off when you don't need them. With less time and money spent on administration, you can focus on your jobs and your data.

This lab uses a set of code samples and scripts developed for Data Science on the Google Cloud Platform, 2nd Edition from O'Reilly Media, Inc.

What you'll learn

In this lab, you'll use Dataproc to do the following:

  • Create a Dataproc cluster

  • Quantize the variables using Spark SQL

  • Create a Bayes classification model

  • Evaluate the model

Setup and requirements

Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
Note: Use an Incognito or private browser window to run this lab. This prevents conflicts between your personal account and the Student account, which could cause extra charges to be incurred on your personal account.
  • Time to complete the lab. Remember, once you start, you cannot pause a lab.
Note: If you already have your own personal Google Cloud account or project, do not use it for this lab to avoid extra charges to your account.

How to start your lab and sign in to the Google Cloud Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is the Lab Details panel with the following:

    • The Open Google Console button
    • Time remaining
    • The temporary credentials that you must use for this lab
    • Other information, if needed, to step through this lab
  2. Click Open Google Console. The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Arrange the tabs in separate windows, side-by-side.

    Note: If you see the Choose an account dialog, click Use Another Account.
  3. If necessary, copy the Username from the Lab Details panel and paste it into the Sign in dialog. Click Next.

  4. Copy the Password from the Lab Details panel and paste it into the Welcome dialog. Click Next.

    Important: You must use the credentials from the left panel. Do not use your Google Cloud Skills Boost credentials. Note: Using your own Google Cloud account for this lab may incur extra charges.
  5. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Cloud Console opens in this tab.

Note: You can view the menu with a list of Google Cloud Products and Services by clicking the Navigation menu at the top-left.

Task 1. Create a Dataproc cluster

In this section you create a Dataproc cluster on which you run the Spark scripts.

  1. In the Cloud Console, on the Navigation menu, click Compute Engine > VM instances.

  2. Click the SSH button in line with the startup-vm VM to launch a terminal and connect.

  3. Click Connect to confirm the VM launch.

  4. In the terminal window, run the following command on startup-vm to clone the data-science-on-gcp repository, and navigate to the directory 06_dataproc:


git clone
cd ~/data-science-on-gcp/06_dataproc

  5. Set the project and bucket variables using the following code:

export PROJECT_ID=$(gcloud info --format='value(config.project)')
export BUCKET_NAME=$PROJECT_ID-dsongcp

  6. Before using the file to create the Dataproc cluster, view the file using the cat command:

cat ~/data-science-on-gcp/06_dataproc/

  7. Create a Dataproc cluster to run jobs on, specifying the name of your bucket and the region that the bucket is in.

./ $BUCKET_NAME "{{{project_0.startup_script.project_region}}}"

Note: If you perform this outside this lab, the compute zone and the bucket must be in the same region to avoid network egress charges.

Task 2. Quantization using Spark SQL

You can use one variable in your dataset, departure delay, to predict the arrival delay of a flight. However, a second variable would make predictions more accurate. The longer the flight, the more likely it is that small departure delays can be made up in the air, so the second variable is the distance to be traveled.

The statistical model you build in this lab uses two variables: the departure delay and the distance to be traveled.

JupyterLab on Dataproc

The Jupyter notebook provides a Python kernel to run Spark code and a PySpark kernel. The Jupyter component in Dataproc is a Web-based notebook for interactive data analytics and supports the JupyterLab Web UI. Jupyter notebooks are widely used for exploratory data analysis and building machine learning models as they allow you to interactively run your code and immediately see your results.

Developing the Bayesian classification from scratch requires interactive development, so you use Jupyter notebooks for this lab.

To launch the notebook:

  1. In the Cloud Console, on the Navigation menu, click Dataproc.

  2. In the Cluster list, click on the cluster name to view cluster details.

  3. Click the Web Interfaces tab and then click JupyterLab.

  4. In the Launcher dialog, click the Python 3 tile under Notebook.

Set up environment variables

In this section you set up environment variables (for example, PROJECT, BUCKET and REGION) inside the notebook session. With this notebook you interact with the Dataproc cluster created in Task 1.

  1. Set up PROJECT, BUCKET and REGION using the following command in the notebook cell.

PROJECT=!gcloud config get-value project
PROJECT=PROJECT[0]
import os
BUCKET = '{}-dsongcp'.format(PROJECT)
REGION = "{{{project_0.startup_script.project_region}}}"
os.environ['BUCKET'] = BUCKET

  2. Run the cell by either pressing Shift + Enter, or clicking the triangle on the Notebook top menu to Run selected cells and advance.

Note: After pasting commands into the Jupyter notebook cell, you run the cell to execute the command and then advance to the next cell.

Exploration using BigQuery

  1. First, import the basic data science libraries and the Google BigQuery API client library, then create a BigQuery client object bq:

import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
import numpy as np
import google.cloud.bigquery as bigquery
bq = bigquery.Client()

  2. Enter the code below in a new cell and run the cell:

sql = """
SELECT DISTANCE, DEP_DELAY
FROM dsongcp.flights_tzcorr
WHERE RAND() < 0.001 AND dep_delay > -20 AND dep_delay < 30 AND distance < 2000
"""
df = bq.query(sql).to_dataframe()

The query randomly samples about 1/1,000 of the rows in the flights_tzcorr table, keeping only flights whose distance and departure delay lie within reasonable ranges, and loads the DISTANCE and DEP_DELAY fields into a Pandas dataframe.
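To see why a filter such as RAND() < 0.001 keeps roughly one row in a thousand, here is a minimal sketch in plain NumPy rather than BigQuery. The row count, the seed, and the variable names are assumptions made for illustration. Each row is kept independently with probability 0.001, so the sample size varies from run to run around its expected value.

```python
import numpy as np

# Hypothetical table size; the real flights_tzcorr row count is not assumed here.
n_rows = 1_000_000
keep_probability = 0.001

rng = np.random.default_rng(seed=42)  # fixed seed only so the sketch is repeatable

# Analogue of WHERE RAND() < 0.001: draw one uniform number per row and
# keep the row when the draw falls below the sampling probability.
keep = rng.random(n_rows) < keep_probability

sample_size = int(keep.sum())
expected_size = n_rows * keep_probability
print(sample_size, expected_size)
```

Because the sample is random, rerunning the query in the lab returns a different set of rows each time.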

  3. Use the seaborn library to draw a "hex" kind of plot using jointplot:

sns.set_style("whitegrid")
g = sns.jointplot(x=df['DISTANCE'], y=df['DEP_DELAY'], kind="hex", height=10, joint_kws={'gridsize':20})

The distribution plots at the top and right of the center panel of the graph show how the distance and departure delay values are distributed.

Start a Spark session

  1. Type and run the following code in a new cell to create a Spark session:

from pyspark.sql import SparkSession
spark = SparkSession \
    .builder \
    .appName("Bayes classification using Spark") \
    .getOrCreate()

  2. Read the time-corrected JSON files from the Cloud Storage bucket:

inputs = 'gs://{}/flights/tzcorr/all_flights-*'.format(BUCKET)
flights = spark.read.json(inputs)

  3. Employ SQL on the dataframe by creating a temporary view (it is available only within this Spark session):

flights.createOrReplaceTempView('flights')

  4. Employ SQL to query the flights view, for example by using this command:

results = spark.sql('SELECT COUNT(*) FROM flights WHERE dep_delay > -20 AND CAST(distance AS FLOAT) < 2000')

Restrict to train days

  1. Create a CSV file from the training days BigQuery table and save it to the Cloud Storage bucket. Run the %%bash lines in a separate cell:

sql = """
SELECT *
FROM dsongcp.trainday
"""
df = bq.query(sql).to_dataframe()
df.to_csv('trainday.csv', index=False)

%%bash
gsutil cp trainday.csv gs://${BUCKET}/flights/trainday.csv

  2. Create the traindays dataframe from the CSV file trainday.csv using the following code:

from pyspark.sql.types import StructType, StructField, StringType, BooleanType

schema = StructType([
    StructField('FL_DATE', StringType(), True),
    StructField('is_train_day', BooleanType(), True)
])
traindays = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv('gs://{}/flights/trainday.csv'.format(BUCKET))
traindays.createOrReplaceTempView('traindays')

  3. Now restrict the flights dataframe to contain only training days using an SQL join operation:

statement = """
SELECT
  f.FL_DATE AS date,
  CAST(distance AS FLOAT) AS distance,
  dep_delay,
  IF(arr_delay < 15, 1, 0) AS ontime
FROM flights f
JOIN traindays t
ON f.FL_DATE == t.FL_DATE
WHERE
  t.is_train_day AND
  f.dep_delay IS NOT NULL
ORDER BY
  f.dep_delay DESC
"""
flights = spark.sql(statement)

  4. Create a hexbin plot using Spark (a repeat of what you did in BigQuery, except that you now restrict to train days only):

df = flights[(flights['distance'] < 2000) & (flights['dep_delay'] > -20) & (flights['dep_delay'] < 30)]
pdf = df.sample(False, 0.02, 20).toPandas()  # to 100,000 rows approx on complete dataset
g = sns.jointplot(x=pdf['distance'], y=pdf['dep_delay'], kind="hex", height=10, joint_kws={'gridsize':20})

  5. Finding thresholds that make the two quantized variables uniformly distributed is straightforward using the approximate quantiles method:

distthresh = flights.approxQuantile('distance', list(np.arange(0, 1.0, 0.2)), 0.02)
distthresh[-1] = float('inf')
print(distthresh)

  6. You can similarly find thresholds that quantize the departure delay into equally populated bins:

delaythresh = flights.approxQuantile('dep_delay', list(np.arange(0, 1.0, 0.2)), 0.05)
delaythresh[-1] = float('inf')
print(delaythresh)
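
The idea behind quantile-based thresholds (histogram equalization) can be illustrated without a cluster. The following is a minimal sketch using NumPy on synthetic, skewed data. The data, the seed, and all variable names are assumptions, and np.quantile is exact where Spark's approxQuantile is approximate. The sketch uses the same quantile levels as above (0, 0.2, 0.4, 0.6, 0.8) but appends infinity as an extra upper edge, which gives five bins of roughly equal population. The lab code above instead overwrites the last threshold with infinity, so its top bin spans everything above the 0.6 quantile.

```python
import numpy as np

rng = np.random.default_rng(seed=0)
# Synthetic stand-in for flight distances (skewed on purpose); not lab data.
distances = rng.exponential(scale=800.0, size=100_000)

# Same quantile levels as the lab: 0, 0.2, 0.4, 0.6, 0.8.
levels = np.arange(0, 1.0, 0.2)
lower_edges = np.quantile(distances, levels)

# Assumption for this sketch: append an open upper edge instead of
# overwriting the last threshold, so every quantile range keeps its own bin.
edges = np.append(lower_edges, np.inf)

# np.digitize returns i such that edges[i-1] <= x < edges[i].
bin_index = np.digitize(distances, edges) - 1
counts = np.bincount(bin_index, minlength=len(levels))
fractions = counts / counts.sum()
print(np.round(fractions, 3))
```

Equal-width bins on the same data would put most values in the first bin or two, which is why quantiles are used to choose the edges.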

Task 3. Bayes classification

Now that you have the quantization thresholds, you need to determine the recommendation (whether to cancel the meeting) for each bin, based on whether at least 70% of the flights in that bin are on time.
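
As a rough illustration of what the conditional probability lookup table contains, the sketch below builds one with pandas on a handful of made-up flights. The column names mirror the lab's distance, dep_delay, and ontime fields, but the rows and the bin edges are assumptions chosen for illustration, not values from the dataset. Each cell is the fraction of flights in that (distance bin, delay bin) pair that arrived on time, and a fraction below 0.70 means "cancel the meeting".

```python
import numpy as np
import pandas as pd

# Made-up flights: distance in miles, departure delay in minutes, ontime flag.
flights = pd.DataFrame({
    'distance':  [300, 320, 350, 310, 1500, 1550, 1600, 1700, 1650, 1800],
    'dep_delay': [  5,   8,  25,  30,    5,   10,   25,   20,   30,   40],
    'ontime':    [  1,   1,   0,   0,    1,    1,    1,    1,    1,    0],
})

# Hypothetical bin edges; the lab derives its edges from quantiles instead.
dist_edges = [0, 1000, np.inf]
delay_edges = [-np.inf, 15, np.inf]

# right=False gives half-open bins [low, high), like the >= and < tests in the lab.
flights['dist_bin'] = pd.cut(flights['distance'], dist_edges, right=False)
flights['delay_bin'] = pd.cut(flights['dep_delay'], delay_edges, right=False)

# Fraction of on-time flights in every (distance bin, delay bin) cell.
table = (flights
         .groupby(['dist_bin', 'delay_bin'], observed=True)['ontime']
         .mean()
         .rename('frac_ontime')
         .reset_index())
table['cancel'] = table['frac_ontime'] < 0.70
print(table)
```

In this toy table, a late departure leads to a cancel recommendation only for the short flights, which matches the intuition that longer flights can make up small delays in the air.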

  1. Find the flights that belong to the mth distance bin and nth delay bin by slicing the full set of flights:

import pyspark.sql.functions as F
import pandas as pd

df = pd.DataFrame(columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
for m in range(0, 2):
    for n in range(0, len(delaythresh)-1):
        bdf = flights[(flights['distance'] >= distthresh[m])
                      & (flights['distance'] < distthresh[m+1])
                      & (flights['dep_delay'] >= delaythresh[n])
                      & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print(m, n, ontime_frac)
        df = df.append({
            'dist_thresh': distthresh[m],
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        }, ignore_index=True)

The on-time fraction is nearly 100% for all the delay bins except the one with the largest value of n. This makes sense because only the last departure delay bin has any delayed flights.

Here, you get close to the 70% threshold only on the last bin. You have to fix this. One way to do so is to hand-select the departure delay bins. Because you previously looked at thresholding the departure delay, you know that the interesting range is between 10 and 20 minutes and that departure delays are reported in integer minutes. So, you try delay thresholds of 10, 11, 12, …, 20 minutes.

  2. Fine-tune the delay threshold around the decision boundary:

delaythresh = range(10, 20)
df = pd.DataFrame(columns=['dist_thresh', 'delay_thresh', 'frac_ontime'])
for m in range(0, len(distthresh)-1):
    for n in range(0, len(delaythresh)-1):
        bdf = flights[(flights['distance'] >= distthresh[m])
                      & (flights['distance'] < distthresh[m+1])
                      & (flights['dep_delay'] >= delaythresh[n])
                      & (flights['dep_delay'] < delaythresh[n+1])]
        ontime_frac = bdf.agg(F.sum('ontime')).collect()[0][0] / bdf.agg(F.count('ontime')).collect()[0][0]
        print(m, n, ontime_frac)
        df = df.append({
            'dist_thresh': distthresh[m],
            'delay_thresh': delaythresh[n],
            'frac_ontime': ontime_frac
        }, ignore_index=True)

Note: It may take 10-12 minutes to complete.

  3. To find the delay threshold for each distance threshold where the value is closest to the 0.70 decision boundary, run the following code:

df['score'] = abs(df['frac_ontime'] - 0.7)
bayes = df.sort_values(['score']).groupby('dist_thresh').head(1).sort_values('dist_thresh')
print(bayes)

If the departure delay is greater than the threshold corresponding to how far the flight is, you cancel the meeting because you expect the flight to be late.
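
The rule above amounts to a threshold lookup per distance bin. The following is a minimal sketch, assuming a table shaped like bayes (one dist_thresh and one delay_thresh per distance bin). The threshold values and the function name should_cancel are hypothetical and chosen only for illustration. A delay equal to the threshold is treated as a cancel here, following the dep_delay >= threshold comparison used in the evaluation query in Task 4.

```python
import bisect

# Hypothetical lookup table: lower edge of each distance bin (miles) and the
# departure-delay threshold (minutes) chosen for that bin. Not lab results.
dist_thresh = [30.0, 370.0, 620.0, 1000.0]
delay_thresh = [15, 16, 17, 18]

def should_cancel(distance, dep_delay):
    """Return True when the delay reaches the threshold for the flight's distance bin."""
    # bisect_right counts how many bin edges are <= distance; subtracting 1
    # gives the bin index. Distances below the first edge fall into bin 0.
    bin_index = max(bisect.bisect_right(dist_thresh, distance) - 1, 0)
    return dep_delay >= delay_thresh[bin_index]

print(should_cancel(500.0, 16))   # 370-620 mile bin, threshold 16
print(should_cancel(1500.0, 16))  # 1000+ mile bin, threshold 18
```

The same 16-minute departure delay leads to different recommendations depending on how far the flight is going.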

  4. Write out the table bayes as a CSV file to the Cloud Storage bucket:

bayes.to_csv('gs://{}/flights/bayes.csv'.format(BUCKET), index=False)
!gsutil cat gs://{BUCKET}/flights/bayes.csv

Task 4. Evaluate the model

To evaluate the model you created, look at the flights data that was not used in creating the model.

  • Enter the following code into a new cell and run the cell.

distthresh[-1] = 100000
for m in range(0, len(distthresh)-1):
    statement = """
    SELECT
      '{0:.0f}-{1:.0f} miles' AS bin,
      ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS correct_nocancel,
      ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay < 15, 1, 0))/COUNT(*), 2) AS false_positive,
      ROUND(SUM(IF(dep_delay < {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS false_negative,
      ROUND(SUM(IF(dep_delay >= {2:f} AND arr_delay >= 15, 1, 0))/COUNT(*), 2) AS correct_cancel,
      COUNT(*) AS total_flights
    FROM flights f
    JOIN traindays t
    ON f.FL_DATE == t.FL_DATE
    WHERE
      t.is_train_day == 'False' AND
      f.distance >= {0:f} AND f.distance < {1:f}
    """.format(
        distthresh[m], distthresh[m+1],
        bayes[ bayes['dist_thresh'] == distthresh[m] ]['delay_thresh'].values[0]
    )
    eval_flights = spark.sql(statement)
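
The four fractions in the query are the cells of a confusion matrix, each divided by the number of flights in the distance bin. As a minimal sketch of the same arithmetic, the following uses pandas on a few made-up held-out flights. The rows and the 16-minute threshold are assumptions for illustration. "Positive" here means "cancel the meeting", following the query's column names.

```python
import pandas as pd

# Made-up held-out flights for one distance bin (delays in minutes).
held_out = pd.DataFrame({
    'dep_delay': [5, 10, 20, 30, 12, 18, 40,  3],
    'arr_delay': [0,  5, 10, 45, 20, 25, 60, -5],
})
delay_threshold = 16  # hypothetical threshold for this distance bin

cancel = held_out['dep_delay'] >= delay_threshold  # model recommends cancelling
late = held_out['arr_delay'] >= 15                 # flight actually arrived late

total = len(held_out)
metrics = {
    'correct_nocancel': (~cancel & ~late).sum() / total,
    'false_positive':   (cancel & ~late).sum() / total,   # cancelled, but on time
    'false_negative':   (~cancel & late).sum() / total,   # not cancelled, but late
    'correct_cancel':   (cancel & late).sum() / total,
}
print(metrics)
```

The model's overall accuracy for the bin is the sum of correct_nocancel and correct_cancel.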

Task 5. Delete the Dataproc cluster

You created and evaluated your model. Now clean up by deleting the Dataproc cluster.

  1. Return to the startup-vm terminal and delete the Dataproc cluster using the following command:

cd ~/data-science-on-gcp/06_dataproc/
./ "{{{project_0.startup_script.project_region}}}"

  2. Enter Y when prompted to confirm.


You created a Bayesian model to predict the likely arrival delay of a flight. You did this through an integrated workflow that involved BigQuery and Spark SQL.

Finish your quest

This self-paced lab is part of the Data Science on Google Cloud: Machine Learning quest. A quest is a series of related labs that form a learning path. Completing a quest earns you a badge to recognize your achievement. You can make your badge or badges public and link to them in your online resume or social media account. Enroll in this quest and get immediate completion credit if you've taken this lab. See the Google Cloud Skills Boost catalog for all available quests.

Take your next lab

Continue your quest with Machine Learning with Spark on Google Cloud Dataproc.


Manual Last Updated August 16, 2022

Lab Last Tested March 31, 2022

Copyright 2023 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.