Toxicity is a real problem in the gaming industry. In this lab, you build a real-time pipeline that catches toxic messages as they happen.
Apache Beam is an open-source, unified programming model for batch and streaming data processing pipelines that simplifies large-scale data processing. Google Cloud Dataflow is a managed service for running a wide variety of data processing patterns with Beam.
Real-time intelligence lets you act on your data instantaneously. With Beam ML, you can run inference with your model directly inside the pipeline and act on the predictions.
Toxicity can happen in many ways, and chat is one of them. This pipeline is trained on chat data to identify toxic messages. The neat thing about this setup is that you can apply the same steps to different applications, such as fraud, supply chain, and so on. Just swap out the model, and off you go.
What you'll learn
How to build an Apache Beam Python pipeline
How to infer a model with Beam's ML model handler
How to compare the results of another model within the same Beam pipeline
Prerequisites
This lab is at an intermediate level. You should be familiar with Python and the Beam model; however, if needed, you can reference the guide and fully written code samples along the way.
Setup
For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.
Sign in to Qwiklabs using an incognito window.
Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
There is no pause feature. You can restart if needed, but you have to start at the beginning.
When ready, click Start lab.
Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.
Click Open Google Console.
Click Use another account and copy/paste credentials for this lab into the prompts.
If you use other credentials, you'll receive errors or incur charges.
Accept the terms and skip the recovery resource page.
Activate Cloud Shell
Cloud Shell is a virtual machine that contains development tools. It offers a persistent 5-GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources. gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab completion.
Click the Activate Cloud Shell button () at the top right of the console.
Click Continue.
It takes a few moments to provision and connect to the environment. When you are connected, you are also authenticated, and the project is set to your PROJECT_ID.
cd training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming
ls
The full code is in part2.py. We have broken it down into part1.py and part2.py as per the lab tasks.
Navigate to the exercise files.
cd exercises
You will see the template versions of the part1.py and part2.py code, which means there is missing code (labeled with # TODO) that you need to build by following the steps in the rest of this lab.
If you get stuck at any point, feel free to reference the complete code as indicated in step 2 above.
Task 2: Set up your environment
In Cloud Shell, run the following command to set an environment variable for your project ID.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
In Cloud Shell, run the following commands to copy the two models into the current directory.
In Cloud Shell, run the following commands to create two Pub/Sub topics: tox-input and tox-output.
gcloud pubsub topics create tox-input
gcloud pubsub topics create tox-output
You need an input topic to ingest messages that will be evaluated. You also need an output topic to send messages that the model predicts as toxic.
In Cloud Shell, run the following command in the terminal to create your BigQuery dataset. Later, your pipeline code will write results into this dataset.
bq --location=US mk --dataset demo
Click Check my progress to verify the objective.
Create the Pub/Sub topics and BigQuery dataset
Task 3: Create a Beam Python pipeline with Beam ML
In this task, you create a Python Beam Pipeline that reads in a message submitted to Pub/Sub and makes a prediction on whether the message is toxic or not.
Click on Open Editor on the top of your Cloud Shell window to launch the Cloud Shell Editor. You will use this editor to create your pipeline code.
Once the editor is launched, use the file explorer navigation on the left to navigate to training_data_analyst > quests > getting_started_apache_beam > beam_ml_toxicity_in_gaming > exercises
Click the file part1.py to open it in the editor. You are now ready to add code to the file to build your pipeline.
Note: The remaining steps in this task will guide you on what to add to the code to complete your pipeline.
If you get stuck at any point, feel free to reference the full code in training_data_analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/part1.py
Create a Beam Python pipeline object that reads from Google Cloud Pub/Sub input topic.
For simplicity, we use the built-in source, ReadFromPubSub.
If you use the topic option, you don't need to create a subscription, because the built-in source creates the necessary subscription at run time.
Setting the with_attributes option to True lets you use attributes to enhance your messages.
Window the incoming element
We want to join elements in the second half of the lab, so we need to apply a window to the incoming messages because this is a streaming pipeline.
Again, for simplicity, we use a fixed window.
| "Window data" >> beam.WindowInto(beam.window.FixedWindows(0.1))
The window here is only necessary for joining the elements back together in the A/B test portion of the pipeline.
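Conceptually, a fixed window of size `s` assigns each element to the window whose start is the element's timestamp rounded down to a multiple of `s`. A minimal sketch of that assignment logic (an illustration, not Beam's internal implementation):

```python
def fixed_window_bounds(timestamp, size):
    """Return the (start, end) of the fixed window of the given size
    that contains the given timestamp. Units are seconds."""
    start = timestamp - (timestamp % size)
    return (start, start + size)

# Elements whose timestamps fall in the same window are grouped together.
print(fixed_window_bounds(7, 5))   # → (5, 10)
print(fixed_window_bounds(10, 5))  # → (10, 15)
```

With `FixedWindows(0.1)`, each group covers a tenth of a second, which is short enough to keep the later join responsive in a streaming context.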
Tag your element with the key
We want to tag the element with a key so it can be identified as needed. In this example, we use the userid message attribute as the key.
Essentially, you want to output a tuple (key, element).
In the example, we decode the payload bytes into a string. If you plan to use the payload as a string, you must decode it; otherwise, leave it in binary form.
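As a sketch, the mapping function you wrap in `beam.Map` might look like the following. With `with_attributes=True`, `ReadFromPubSub` emits `PubsubMessage` objects with `.data` (bytes) and `.attributes` (dict); the `userid` attribute name and the stand-in message class are illustrative assumptions:

```python
def key_by_user(msg):
    """Turn a Pub/Sub message into a (key, text) tuple.

    msg.attributes is a dict of string attributes and msg.data is the
    raw payload in bytes; we decode it because the model expects text.
    """
    return (msg.attributes["userid"], msg.data.decode("utf-8"))

# Stand-in for a PubsubMessage, for illustration only.
class FakeMsg:
    def __init__(self, data, attributes):
        self.data = data
        self.attributes = attributes

print(key_by_user(FakeMsg(b"gg well played", {"userid": "3"})))
# → ('3', 'gg well played')
```

In the pipeline, this would appear as a step like `| "Key by user" >> beam.Map(key_by_user)`.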
At this point, save your DAG into a variable so that you can use it to easily and intuitively fork the pipeline.
read_from_pubsub = (
p
| ...
)
Create the model handler
You need to create a model handler to instantiate your model. We use a KeyedModelHandler, because we have a tuple with a key.
The KeyedModelHandler automatically helps you handle the key, so you don't need to worry about parsing it.
gaming_model_handler = KeyedModelHandler(TFModelHandlerTensor(gaming_model_location))
You can either hardcode the model path (./gaming_trained), or provide a variable that you pass in as an argument later.
Submit the input to the model for a result
Submit your message to the model for your prediction. There's really nothing extra that you need to do here as long as your model takes in the expected input.
gaming_inference = (
read_from_pubsub
| "Perform gaming inference" >> RunInference(gaming_model_handler)
)
This saves this stage of the pipeline in a variable. We need this variable when we do the A/B test in a later task.
Parse your results from the prediction.
Parse the prediction output from the model using the PredictionResult object.
In this example, we use a simple method to illustrate that the results are simply objects that you can manipulate and use as necessary. Assign an arbitrary threshold to determine whether a message is toxic. Tag the results with not for a not-nice message and nice for a nice message.
The toxicity value is heavily dependent on your model. Our value was chosen for demonstration purposes only.
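A minimal sketch of the tagging logic. With a `KeyedModelHandler`, `RunInference` emits `(key, PredictionResult)` tuples, where `PredictionResult.inference` holds the model output; the 0.5 threshold and the scalar-score conversion here are illustrative assumptions, not values from the lab's model:

```python
from collections import namedtuple

def tag_nice_or_not(keyed_result, threshold=0.5):
    """Map (key, PredictionResult) to ('not' | 'nice', key)."""
    key, result = keyed_result
    score = float(result.inference)  # assumed scalar toxicity score
    return ("not" if score > threshold else "nice", key)

# Stand-in for Beam's PredictionResult, for illustration only.
FakeResult = namedtuple("FakeResult", ["example", "inference"])

print(tag_nice_or_not(("user3", FakeResult("text", 0.9))))
# → ('not', 'user3')
```

In the pipeline, this would be applied with a step like `| "Tag results" >> beam.Map(tag_nice_or_not)`.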
Do a simple Map and print the results of the prediction to the standard output to see whether the model worked.
nice_or_not | beam.Map(print)
This method isn't the recommended way to log messages, but we use it for brevity.
Filter your data on the result key
We want to submit only the toxic messages to the output topic, for possible additional action.
Use the Beam primitive Filter to get all of the not keys.
Remember, these messages are the ones that are potentially mean or toxic.
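The filter predicate is just a function that returns True for the elements to keep. Assuming the elements are ('not' | 'nice', message) tuples as tagged in the previous step, a sketch:

```python
def is_toxic(tagged):
    """Keep only elements tagged 'not' (potentially toxic)."""
    return tagged[0] == "not"

# In the pipeline this would be used as:
#   toxic_only = nice_or_not | "Filter toxic" >> beam.Filter(is_toxic)
elements = [("not", "You are all a bunch of losers"), ("nice", "that was awesome")]
print(list(filter(is_toxic, elements)))
# → [('not', 'You are all a bunch of losers')]
```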
Submit the messages to Pub/Sub for further action
Submit the flagged messages to Pub/Sub.
In this exercise, we don't build a consumer to act on these messages; however, you could build a Cloud Run service or Cloud Function to receive the message and perform an action, like muting the misbehaving offender.
Task 4: Run your pipeline
Use the Direct Runner to run your pipeline.
You can also provide arguments to parse, or you can code them into your pipeline.
Run the pipeline with the following command in the Cloud Shell terminal. If you receive an error, ensure you are in the correct directory (~/training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/exercises). Note that you should not expect any output after this step.
Use the + sign at the top of the console to open another terminal. In the new terminal, submit the messages.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="that was awesome" --attribute="userid=3"
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="You are all a bunch of losers" --attribute="userid=3"
The model may be inaccurate in its predictions, but you can always replace it with a better performing model.
In the original terminal (the one running the pipeline), stop your program. (Press Ctrl+C to send an interrupt; pressing it twice should return you to the command line.)
Note: Messages expire in 10 seconds, so if you are not getting your progress score, send the messages again.
Click Check my progress to verify the objective.
Create a Beam Python Pipeline with Beam ML and Run the Pipeline.
Task 5: Add another model for A/B testing
Open the part2.py file in your exercises folder and use it as a starting point to build the Python pipeline. The individual steps are labeled in the comments. Some steps have multiple snippets of code; make sure you reference them all.
In this task, you add another model to compare model results. You can test multiple models and see which is best suited for your needs. You also send the results to BigQuery, where you can compare them.
Create the model handler
Create another model handler, like we did earlier in the lab.
Use the KeyedModelHandler to either hardcode the path (./movie_trained) or provide a variable.
Submit the input into the model for a result
Submit the input to the new model.
Remember that you forked the pipeline, so reuse that object to continue your pipeline.
We want to compare the results eventually, so we need to combine the results.
To join your results together, use a CoGroupByKey. You need to collate the results of the two PCollections (from movie and gaming) together.
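CoGroupByKey joins keyed PCollections by key, yielding one (key, {tag: [values...]}) pair per key per window. A plain-Python emulation of its semantics, purely for illustration (the tags 'gaming' and 'movie' are illustrative):

```python
def cogroup(gaming, movie):
    """Emulate CoGroupByKey over two lists of (key, value) pairs.

    Real Beam output is (key, {'gaming': [...], 'movie': [...]}),
    collating every value seen for that key in the window.
    """
    out = {}
    for tag, pairs in (("gaming", gaming), ("movie", movie)):
        for key, value in pairs:
            out.setdefault(key, {"gaming": [], "movie": []})[tag].append(value)
    return sorted(out.items())

print(cogroup([("3", "not")], [("3", "nice")]))
# → [('3', {'gaming': ['not'], 'movie': ['nice']})]
```

In the pipeline, the equivalent step would look like `({'gaming': gaming_results, 'movie': movie_results}) | beam.CoGroupByKey()`, which is why the fixed window from earlier matters: it bounds which elements get collated together.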
Transform your joined results into a string
In a real-world scenario, you want to parse the results and store them properly with a schema.
To make this lab shorter, we're going to take the entire joined result, cast it into a giant string, and store it in BigQuery.
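A sketch of that transformation: WriteToBigQuery expects dict rows keyed by column name, so we cast the whole joined element into a single string column. The column name `data` here is an illustrative assumption; it must match whatever schema you pass to WriteToBigQuery.

```python
def to_bq_row(joined):
    """Cast a joined (key, {...}) element into a single-column dict row.

    The column name 'data' is illustrative; it must match the schema
    passed to WriteToBigQuery.
    """
    return {"data": str(joined)}

row = to_bq_row(("3", {"gaming": ["not"], "movie": ["nice"]}))
print(row)
```

In the pipeline, this would be applied as a step like `| "To row" >> beam.Map(to_bq_row)` just before the BigQuery write.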
Write your results to BigQuery
In the code, you can use the built-in I/O connector WriteToBigQuery.
Use a write method, such as STREAMING_INSERTS, to write into BigQuery.
Task 6: Run your pipeline
Use the Direct Runner to run your pipeline. First, in Cloud Shell, run the following command to set an environment variable for your project ID.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
Then, run the pipeline with the following command in the Cloud Shell terminal. If you receive an error, ensure you are in the correct directory (~/training-data-analyst/quests/getting_started_apache_beam/beam_ml_toxicity_in_gaming/exercises). Note that you should not expect any output after this step.
You can test the pipeline by submitting some more messages.
We reuse the messages from above.
Use the + sign at the top of the console to open another terminal. In the new terminal, submit the messages.
PROJECT_ID=$(gcloud config list --format=json | jq -r .core.project)
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="that was awesome" --attribute="userid=3"
gcloud pubsub topics publish projects/$PROJECT_ID/topics/tox-input --message="You are all a bunch of losers" --attribute="userid=3"
Check the comparisons in BigQuery
Use the following query to pull the data from BigQuery and check the comparison. Note that you may have to wait a few minutes for the BigQuery table to populate.
bq query --use_legacy_sql=false "SELECT * FROM demo.tox;"
Click Check my progress to verify the objective.
Add another model for A/B testing, run the pipeline, and check the comparisons in BigQuery.
End your lab
When you have completed your lab, click End Lab. Qwiklabs removes the resources you’ve used and cleans the account for you.
You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.
The number of stars indicates the following:
1 star = Very dissatisfied
2 stars = Dissatisfied
3 stars = Neutral
4 stars = Satisfied
5 stars = Very satisfied
You can close the dialog box if you don't want to provide feedback.
For feedback, suggestions, or corrections, please use the Support tab.
Congratulations!
Congratulations, you've now learned how to use ML in Beam.
You can now customize your pipelines to best suit your needs.
Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.