Using Beam ML to catch Toxicity in Gaming

Lab · 1 hour 30 minutes · 1 Credit · Intermediate

Note: This lab may incorporate AI tools to support your learning.

Overview

Toxicity is a real problem in the gaming industry. In this lab, you build a real-time pipeline that catches offenders as they chat (and lets you be the hero, in real time!).

Apache Beam is an open-source, unified programming model for batch and streaming data processing pipelines that simplifies large-scale data processing. Google Cloud Dataflow is a managed service for running a wide variety of data processing patterns with Beam.

Real-time intelligence lets you act on your data instantaneously. With Beam ML, you can run inference with your model directly inside the pipeline and work with the predictions as they arrive.

Toxicity can happen in many ways, and chat is one of them. The model in this pipeline is trained on chat data to identify toxic messages. The neat thing about this setup is that you can apply the same steps to other applications, such as fraud detection or supply chain monitoring. Just swap out the model, and off you go.

What you'll learn

  • How to build an Apache Beam Python pipeline
  • How to run inference on a model with Beam's ML model handler
  • How to compare the results of two models within the same Beam pipeline

Prerequisites

This lab is at an intermediate level. You should be familiar with Python and the Beam model; however, if needed, you can reference the guide and fully written code samples along the way.

Setup

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Qwiklabs using an incognito window.

  2. Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
    There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.

  5. Click Open Google Console.

  6. Click Use another account and copy/paste credentials for this lab into the prompts.
    If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Activate Cloud Shell

Cloud Shell is a virtual machine that contains development tools. It offers a persistent 5-GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources. gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab completion.

  1. Click the Activate Cloud Shell button (Activate Cloud Shell icon) at the top right of the console.

  2. Click Continue.
    It takes a few moments to provision and connect to the environment. When you are connected, you are also authenticated, and the project is set to your PROJECT_ID.

Sample commands

  • List the active account name:
gcloud auth list

(Output)

Credentialed accounts:
- <myaccount>@<mydomain>.com (active)

(Example output)

Credentialed accounts:
- google1623327_student@qwiklabs.net
  • List the project ID:
gcloud config list project

(Output)

[core]
project = <project_ID>

(Example output)

[core]
project = qwiklabs-gcp-44776a13dea667a6

Note: Full documentation of gcloud is available in the gcloud CLI overview guide.

Task 1: Clone pipeline code and exercise files

  1. In Cloud Shell, run the commands below to clone the full pipeline code that is available on GitHub:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst/
  2. Review the pipeline code files:
cd training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming
ls

The full code is in part2.py; it has been broken down into part1.py and part2.py to match the lab tasks.

  3. Navigate to the exercise files.
cd exercises

You will see the template versions of the part1.py and part2.py code, which means there is missing code (labeled with # TODO) that you need to build by following the steps in the rest of this lab.

If you get stuck at any point, feel free to reference the complete code as indicated in step 2 above.

Task 2: Set up your environment

  1. In Cloud Shell, run the following command to set an environment variable for your project ID.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
  2. In Cloud Shell, run the following commands to copy the two models into the current directory.
gsutil cp -r gs://toxicity-with-beam-ml-run-inference/gaming_trained/ .
gsutil cp -r gs://toxicity-with-beam-ml-run-inference/movie_trained/ .
  3. Install the Beam and TensorFlow libraries. The tensorflow_text library is necessary for the particular BERT model we use in this lab.
pip install apache-beam[gcp]
pip install tensorflow
pip install tensorflow_text
  4. In Cloud Shell, run the commands below to create two Pub/Sub topics: tox-input and tox-output.
gcloud pubsub topics create tox-input
gcloud pubsub topics create tox-output

You need an input topic to ingest messages that will be evaluated. You also need an output topic to send messages that the model predicts as toxic.
  5. In Cloud Shell, run the following command in the terminal to create your BigQuery dataset. Later, your pipeline code will write results into this dataset.
bq --location=US mk --dataset demo

Click Check my progress to verify the objective. Create the Pub/Sub topics and BigQuery dataset

Task 3: Create a Beam Python pipeline with Beam ML

In this task, you create a Beam Python pipeline that reads a message submitted to Pub/Sub and predicts whether or not the message is toxic.

  1. Click on Open Editor on the top of your Cloud Shell window to launch the Cloud Shell Editor. You will use this editor to create your pipeline code.

  2. Once the editor is launched, use the file explorer navigation on the left to navigate to training-data-analyst > quests > getting_started_apache_beam > beam_ml_toxicity_in_gaming > exercises.

  3. Click on the file part1.py to open it in the editor. Now you are ready to add code to the file to build your pipeline.

Note: The remaining steps in this task guide you through what to add to the code to complete your pipeline. If you get stuck at any point, feel free to reference the full code in training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/part1.py
  4. Create a Beam Python pipeline object that reads from the Google Cloud Pub/Sub input topic.
beam.io.ReadFromPubSub(topic=input_topic,with_attributes=True)

For simplicity, we use the built-in source, ReadFromPubSub.

If you use the topic option, you don't need to create a subscription, because the built-in source creates the necessary subscription at run time. Setting the with_attributes option to True lets you use attributes to enhance your messages.
  5. Window the incoming element

Because this is a streaming pipeline, and we join elements back together in the second half of the lab, we need to apply a window to the incoming messages.

Again, for simplicity, we use a fixed window.

| "Window data" >> beam.WindowInto(beam.window.FixedWindows(0.1)) The window here is only necessary for joining the element back together in the A/B test portion of the piperline.
  6. Tag your element with the key

We want to tag the element with a key so it can be identified as needed. We use the userid attribute in the example.

Essentially, you want to output a tuple (key, element).

(element.attributes["userid"],(element.data).decode('UTF-8'))

In the example, we decode the bytes into a string. If you plan to use the data as a string, you need to decode it; otherwise, leave it in binary form.

  7. At this point, save your DAG into a variable so that you can use it to easily and intuitively fork the pipeline.
read_from_pubsub = ( p | ... )
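
Assembling the snippets from the previous steps, the fork point might look like this (a sketch; transform labels other than "Window data" are ours):

read_from_pubsub = (
    p
    | "Read from Pub/Sub" >> beam.io.ReadFromPubSub(topic=input_topic, with_attributes=True)
    | "Window data" >> beam.WindowInto(beam.window.FixedWindows(0.1))
    | "Key by user" >> beam.Map(
        lambda element: (element.attributes["userid"], element.data.decode('UTF-8')))
)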
  8. Create the model handler

You need to create a model handler to instantiate your model. We use a KeyedModelHandler, because we have a tuple with a key.

The KeyedModelHandler automatically helps you handle the key, so you don't need to worry about parsing it.

gaming_model_handler = KeyedModelHandler(TFModelHandlerTensor(gaming_model_location))

You can either hardcode the model path (./gaming_trained) or provide a variable that you pass in as an argument later.
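
These classes come from Beam's ML inference modules. A typical set of imports for this pipeline (module paths are current as of recent Beam releases; verify against part1.py):

import apache_beam as beam
import tensorflow_text  # noqa: F401 -- registers the custom ops the BERT model needs
from apache_beam.ml.inference.base import KeyedModelHandler, RunInference
from apache_beam.ml.inference.tensorflow_inference import TFModelHandlerTensor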
  9. Submit the input to the model for a result

Submit your message to the model for your prediction. There's really nothing extra that you need to do here as long as your model takes in the expected input.

gaming_inference = ( read_from_pubsub | "Perform gaming inference" >> RunInference(gaming_model_handler) )

This saves this step of the pipeline in a variable. We need the variable when we do the A/B test in a later task.
  10. Parse your results from the prediction.

Parse the prediction output from the model using the PredictionResult object.

tox_level = element[1][1].numpy().item()
if tox_level > -0.5:
    yield ("not", element)
else:
    yield ("nice", element)

In this example, we use a simple method to illustrate that the results are plain objects that you can manipulate and use as necessary. We compare against an arbitrary threshold to decide whether a message is toxic, tagging the results with not for a not-nice message and nice for a nice message.

The toxicity value is heavily dependent on your model. Our value was chosen for demonstration purposes only.
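
In context, this parsing logic lives in a DoFn applied with ParDo; a minimal sketch (the class name ToxicOrNot and the transform label are ours):

class ToxicOrNot(beam.DoFn):
    def process(self, element):
        # element is (key, PredictionResult); index 1 of the result holds the inference tensor
        tox_level = element[1][1].numpy().item()
        if tox_level > -0.5:
            yield ("not", element)  # potentially toxic
        else:
            yield ("nice", element)

nice_or_not = gaming_inference | "Nice or not" >> beam.ParDo(ToxicOrNot())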
  11. Do a simple Map and print the results of the prediction to the standard output to see if the model worked.
nice_or_not | beam.Map(print)

This method isn't the recommended way to log messages, but we use it for brevity.
  12. Filter your data on the result key

We want to submit only the toxic messages to the output topic for possible additional action.

Use the Beam primitive Filter to get all of the not keys.

Remember, these messages are the ones that are potentially mean or toxic.
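
A minimal sketch of the filter, assuming the (key, element) tuples produced in the previous steps (the variable name toxic_only is ours):

toxic_only = nice_or_not | "Keep toxic" >> beam.Filter(lambda element: element[0] == "not")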
  13. Submit the messages to Pub/Sub for further action

Submit the flagged messages to Pub/Sub.

In this exercise, we don't build a consumer to act on these messages; however, you can build a Cloud Run service or a Cloud Function to receive the message and perform an action, like muting the misbehaving offender.
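
One way to write the flagged messages out (a sketch; WriteToPubSub expects bytes, and output_topic is assumed to hold the full path of the tox-output topic):

(toxic_only
    | "To bytes" >> beam.Map(lambda element: str(element).encode('utf-8'))
    | "Write to Pub/Sub" >> beam.io.WriteToPubSub(topic=output_topic))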

Task 4: Run your pipeline

  1. Use the Direct Runner to run your pipeline.

You can provide arguments for the pipeline to parse, or you can hardcode them into your pipeline.

parser = argparse.ArgumentParser()
parser.add_argument(
    '--project_id',
    dest='project_id',
    required=True,
    help='project id')
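
The run command in the next step also passes --gaming, --movie, and two Beam flags; a sketch of how those might be wired up (the exact parsing in part1.py may differ):

from apache_beam.options.pipeline_options import PipelineOptions

parser.add_argument('--gaming', dest='gaming', required=True, help='path to the gaming model')
parser.add_argument('--movie', dest='movie', required=True, help='path to the movie model')
known_args, pipeline_args = parser.parse_known_args()

# Unparsed flags (--streaming, --pickle_library) become Beam pipeline options.
pipeline_options = PipelineOptions(pipeline_args)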
  2. Run the pipeline with the following command in the terminal in Cloud Shell. If you receive an error, ensure you are in the correct directory (~/training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/exercises). Note that you should not expect any output after this step:
python part1.py --project_id $PROJECT_ID \
  --gaming "./gaming_trained" \
  --movie "./movie_trained" \
  --streaming \
  --pickle_library=cloudpickle
  3. Submit two messages to test your pipeline

Use the + sign at the top of the console to open another terminal. In the new terminal, submit the messages.

PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="that was awesome" --attribute="userid=3"
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="You are all a bunch of losers" --attribute="userid=3"

The model may be inaccurate in its predictions, but you can always replace it with a better performing model.
  4. In the original terminal (the one running the pipeline), stop your program. Press Ctrl+C to send an interrupt; pressing it twice returns you to the command line.
Note: Messages will expire in 10 seconds. So if you are not getting your progress score, please send the messages again.

Click Check my progress to verify the objective. Create a Beam Python Pipeline with Beam ML and Run the Pipeline.

Task 5: Add another model for A/B testing

Open the part2.py file in your exercises folder and use it as a starting point to build the Python pipeline. The individual steps are labeled in the comments. Some steps have multiple snippets of code; ensure you reference them all.

In this task, you add another model to compare model results. You can test multiple models and see which is best suited for your needs. You also send the results to BigQuery, where you can compare them.

  1. Create the model handler

Create another model handler, like we did earlier in the lab.

Use the KeyedModelHandler to either hardcode the path (./movie_trained) or provide a variable.
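
Following the same pattern as the gaming handler (movie_model_location mirrors the earlier gaming_model_location variable):

movie_model_handler = KeyedModelHandler(TFModelHandlerTensor(movie_model_location))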

  2. Submit the input into the model for a result

Submit the input to the new model.

Remember that you forked the pipeline, so reuse that object to continue your pipeline.

movie_inference = ( read_from_pubsub | "Perform movie inference" >> RunInference(movie_model_handler) )
  3. Join your results together

We want to compare the results eventually, so we need to combine the results.

To join your results together, use a CoGroupByKey. You need to collate the results of the two PCollections (from movie and gaming) together.
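
A sketch of the join, collating the two inference PCollections (the variable and label names are ours):

joined = (
    (gaming_inference, movie_inference)
    | "Join results" >> beam.CoGroupByKey())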

  4. Transform your joined results into a string

In a real-world scenario, you want to parse the results and store them properly with a schema.

To make this lab shorter, we're going to take the entire joined result, cast it into a giant string, and store it in BigQuery.
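
A minimal sketch, assuming a single STRING column (the column name data and the variable names are our choices):

stringified = joined | "To string" >> beam.Map(lambda element: {"data": str(element)})

WriteToBigQuery, used in the next step, expects each element as a dictionary keyed by column name, which is why the string is wrapped in a dict.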

  5. Write your results to BigQuery

To write your results to BigQuery in the code, you can use the built-in I/O connector WriteToBigQuery.

Use a write method, such as STREAMING_INSERTS, to write into BigQuery.
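
A sketch of the write, assuming the demo.tox table queried later in this lab and the single-column schema from the previous step (known_args.project_id comes from the argparse sketch earlier):

stringified | "Write to BigQuery" >> beam.io.WriteToBigQuery(
    table=f"{known_args.project_id}:demo.tox",
    schema="data:STRING",
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
    method=beam.io.WriteToBigQuery.Method.STREAMING_INSERTS)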

Task 6: Run your pipeline

  1. Use the Direct Runner to run your pipeline. First, in Cloud Shell, run the following command to set an environment variable for your project ID.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)

Then, run the pipeline with the following command in the terminal in Cloud Shell. If you receive an error, ensure you are in the correct directory (~/training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/exercises). Note that you should not expect any output after this step:

python part2.py --project_id $PROJECT_ID \
  --gaming "./gaming_trained" \
  --movie "./movie_trained" \
  --streaming \
  --pickle_library=cloudpickle
  2. Submit two messages to test your pipeline

You can test the pipeline by submitting some more messages.

We reuse the messages from above.

Use the + sign at the top of the console to open another terminal. In the new terminal, submit the messages.

PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="that was awesome" --attribute="userid=3"
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="You are all a bunch of losers" --attribute="userid=3"
  3. Check the comparisons in BigQuery

Use the following query to pull the data from BigQuery and check the comparison. Note that you may have to wait a few minutes for the BigQuery table to populate.

bq query --use_legacy_sql=false "SELECT * FROM demo.tox;"

Click Check my progress to verify the objective. Add another model for A/B testing, run the pipeline, and check the comparisons in BigQuery

End your lab

When you have completed your lab, click End Lab. Qwiklabs removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Congratulations!

Congratulations, you've now learned how to use ML in Beam.

You can now customize your pipelines to best suit your needs.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
