Serverless Data Processing with Dataflow - Batch Analytics Pipelines with Dataflow (Python)

Lab · 2 hours · 5 credits · Advanced

Note: This lab may include integrated AI tools to support your learning.

Overview

In this lab, you:

  • Write a pipeline that aggregates site traffic by user.
  • Write a pipeline that aggregates site traffic by minute.
  • Implement windowing on time series data.

Prerequisites

Note: This lab is at the advanced level, and working knowledge of Python is required.

Setup and requirements

For each lab, you get a new Google Cloud project and set of resources for a fixed time at no cost.

  1. Sign in to Qwiklabs using an incognito window.

  2. Note the lab's access time (for example, 1:15:00), and make sure you can finish within that time.
    There is no pause feature. You can restart if needed, but you have to start at the beginning.

  3. When ready, click Start lab.

  4. Note your lab credentials (Username and Password). You will use them to sign in to the Google Cloud Console.

  5. Click Open Google Console.

  6. Click Use another account and copy/paste credentials for this lab into the prompts.
    If you use other credentials, you'll receive errors or incur charges.

  7. Accept the terms and skip the recovery resource page.

Part A. Workbench Instances development environment setup

For this lab, you will be running all commands in a terminal from your Workbench instance notebook.

  1. In the Google Cloud console, from the Navigation menu, select Vertex AI, or go directly to the Vertex AI dashboard.

  2. Click Enable All Recommended APIs, then confirm that the Notebook API is enabled.

  3. In the Navigation menu, click Workbench.

    At the top of the Workbench page, ensure you are in the Instances view.

  4. Click Create New.

  5. Configure the Instance:

    Name: lab-workbench
    Region: use the region specified for your lab
    Zone: use the zone specified for your lab
    Advanced Options (Optional): if needed, click "Advanced Options" for further customization (e.g., machine type, disk size)

  6. Click Create.

This will take a few minutes to create the instance. A green checkmark will appear next to its name when it's ready.

  7. Click Open JupyterLab next to the instance name to launch the JupyterLab interface. This will open a new tab in your browser.

  8. Next, click Terminal. This will open a terminal where you can run all the commands in this lab.

Download Code Repository

Next you will download a code repository for use in this lab.

  1. In the terminal you just opened, enter the following:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  2. On the left panel of your notebook environment, in the file browser, you will see that the training-data-analyst repo has been added.

  3. Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code for you to complete and a solution sub-folder with a fully working example to reference if you get stuck.

Note: To open a file for editing purposes, simply navigate to the file and click on it. This will open the file, where you can add or modify code.

Click Check my progress to verify the objective. Create notebook instance and clone course repo

Task 1. Aggregating site traffic by user

In this part of the lab, you write a pipeline that (see the sketch after this list):

  1. Reads the day’s traffic from a file in Cloud Storage.
  2. Converts each event into a CommonLog object.
  3. Sums the number of hits for each unique user by grouping the objects by user ID and combining the values.
  4. Performs additional aggregations on each user.
  5. Writes the resulting data to BigQuery.
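
The transforms themselves are what you write in the tasks that follow, but a minimal, locally runnable sketch of the aggregation at the heart of this pipeline could look like the code below. Here CommonLog is a trimmed-down, hypothetical stand-in for the class defined in the lab code, the sample data is made up, and total_bytes is an illustrative extra metric; the real pipeline reads events.json from Cloud Storage and writes to BigQuery instead of printing.

import typing
import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn

# Trimmed, hypothetical stand-in for the lab's CommonLog class.
class CommonLog(typing.NamedTuple):
    user_id: str
    num_bytes: int

beam.coders.registry.register_coder(CommonLog, beam.coders.RowCoder)

with beam.Pipeline() as p:
    (p
     | 'CreateSampleEvents' >> beam.Create([
           CommonLog('alice', 100),
           CommonLog('alice', 250),
           CommonLog('bob', 50),
       ]).with_output_types(CommonLog)
     # Group by user_id, then aggregate: a count of events per user and the
     # sum of bytes they downloaded.
     | 'PerUserAggregations' >> beam.GroupBy('user_id')
           .aggregate_field('user_id', CountCombineFn(), 'page_views')
           .aggregate_field('num_bytes', sum, 'total_bytes')
     | 'Print' >> beam.Map(print))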

Task 2. Generate synthetic data

As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:

Open the appropriate lab

  • In the terminal in your IDE environment, run the following commands:
# Change directory into the lab
cd 3_Batch_Analytics/lab
export BASE_DIR=$(pwd)

Setting up virtual environment and dependencies

Before you can begin editing the actual pipeline code, you need to ensure that you have installed the necessary dependencies.

  1. Execute the following to create a virtual environment for your work in this lab:
sudo apt-get update && sudo apt-get install -y python3-venv

# Create and activate virtual environment
python3 -m venv df-env
source df-env/bin/activate
  2. Next, install the packages you will need to execute your pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
  3. Ensure that the Dataflow API is enabled:
gcloud services enable dataflow.googleapis.com

Set up the data environment

# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_batch_sinks.sh

# Generate event data
source generate_batch_events.sh

# Change to the directory containing the practice version of the code
cd $BASE_DIR

The script creates a file called events.json containing lines resembling the following:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

It then automatically copies this file to your Google Cloud Storage bucket.

  • Navigate to Google Cloud Storage and confirm that your storage bucket contains a file called events.json.

Click Check my progress to verify the objective. Generate synthetic data

Task 3. Sum page views per user

This task has two mini challenges. You can refer to the solution if you get stuck.

In the file explorer, navigate to the path below and open the batch_user_traffic_pipeline.py file.

training-data-analyst/quests/dataflow_python/3_Batch_Analytics/lab

Mini Challenge - #TODO 1:

This pipeline already contains the necessary code to accept command-line options for the input path and the output table name, as well as code to read in events from Google Cloud Storage, parse those events, and write results to BigQuery. However, some important parts are missing and are marked as #TODOs.

This is a data modeling task. Consider: after you group all the log events by user, what information do you want to calculate for each user?

You need to define the structure (the schema) that will hold the results of your aggregation. Look at the CommonLog class to see which fields are available to aggregate.

How to solve it:

  1. Identify the Key: The class name is PerUserAggregation, so the primary piece of information you need to keep is the user_id.

  2. Choose Metrics to Calculate: What can you calculate from a user's collection of CommonLog entries?

    • A count: How many times did the user access the server?
    • A sum: What is the total number of bytes (num_bytes) they downloaded?
    • A min/max: What was their first and last activity timestamp?
  3. For example (a fuller sketch follows below):

user_id: str
page_views: int
...
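
A fuller sketch of such a schema class might look like this; only user_id and page_views come from the hints above, while total_bytes, max_num_bytes, and min_num_bytes are illustrative choices of additional per-user metrics.

import typing

# A sketch of a possible PerUserAggregation schema. Only user_id and
# page_views follow the hints above; the remaining fields are illustrative
# aggregations over num_bytes.
class PerUserAggregation(typing.NamedTuple):
    user_id: str
    page_views: int
    total_bytes: int
    max_num_bytes: int
    min_num_bytes: int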

Mini Challenge - #TODO 2:

This is a technical requirement of the Apache Beam framework. The challenge is testing your knowledge of how Beam handles custom data types.

When Apache Beam runs a pipeline, it often needs to send data between different computers (called workers). To do this, it must serialize your Python object (like a PerUserAggregation instance) into a stream of bytes, send it over the network, and then deserialize it back into an object on the other side. A Coder is the object that tells Beam how to perform this serialization and deserialization.

If you don't tell Beam how to encode/decode your custom PerUserAggregation class, the pipeline will fail with an error.

How to solve it:

The solution is right in the line above the #TODOs. Beam provides a RowCoder that works perfectly with NamedTuple classes. You simply need to register a RowCoder for your new PerUserAggregation class, just as it was done for CommonLog.

  1. For example:
beam.coders.registry.register_coder(PerUserAggregation, ...)
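
Mirroring the existing registration for CommonLog in the starter code, the completed line would plausibly read:

# Register RowCoder for the new NamedTuple class, just as the starter code
# already does for CommonLog.
beam.coders.registry.register_coder(PerUserAggregation, beam.coders.RowCoder)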

Task 4. Run your pipeline

Return to the terminal and execute the following command to run your pipeline using the Cloud Dataflow service. You can run it with DirectRunner if you're having trouble, or refer to the solution if you get stuck.

  • In the code snippet below, replace ENTER_REGION_ID and ENTER_ZONE_ID with the regional and zonal values specified for your lab.
# 1. Set all environment variables
export PROJECT_ID=$(gcloud config get-value project)
export REGION=ENTER_REGION_ID
export ZONE=ENTER_ZONE_ID
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.user_traffic

# 2. Double-check the input file exists
# This command should NOT return an error.
echo "Verifying input file exists at ${INPUT_PATH}..."
gcloud storage ls ${INPUT_PATH}

# 3. Execute the pipeline script
echo "Running the Dataflow pipeline..."
python3 batch_user_traffic_pipeline.py \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --worker_zone=${ZONE} \
  --staging_location=${PIPELINE_FOLDER}/staging \
  --temp_location=${PIPELINE_FOLDER}/temp \
  --runner=${RUNNER} \
  --input_path=${INPUT_PATH} \
  --table_name=${TABLE_NAME}
  • You can check your submitted job in the Dataflow dashboard.

  • Once the job status shows it has succeeded, verify the results in BigQuery: wait a few minutes for the pipeline to complete, then navigate to BigQuery and query the user_traffic table.

Click Check my progress to verify the objective. Aggregating site traffic by user and running your pipeline

Part B. Aggregating site traffic by minute

In this part of the lab, you create a new pipeline called batch_minute_traffic. It expands on the basic batch analysis principles used in batch_user_traffic and, instead of aggregating by user across the entire batch, aggregates by when events occurred. Again, there are multiple #TODOs for you to complete; refer to the solution if you get stuck.

In the IDE, navigate to the path below and open the file batch_minute_traffic_pipeline.py.

3_Batch_Analytics/lab

Note: Before you start working with the main script (batch_minute_traffic_pipeline.py), read through the two helper files, pipeline_utils.py and setup.py. The Dataflow jobs rely on these helpers: pipeline_utils.py holds custom Python logic (classes and functions) in a separate module that can be packaged and sent to remote Dataflow workers, while setup.py acts as the instruction manual for Dataflow, telling it exactly how to bundle pipeline_utils.py into a proper package to be installed on every worker.
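
For reference, a minimal setup.py that packages a single helper module in this way typically looks like the sketch below; the file in the repo may declare a different name, version, or extra dependencies.

# A minimal sketch of a setup.py that bundles pipeline_utils.py for Dataflow
# workers. The package name and version here are hypothetical.
import setuptools

setuptools.setup(
    name='batch-minute-traffic-pipeline',   # hypothetical package name
    version='0.0.1',
    py_modules=['pipeline_utils'],          # ship the helper module to every worker
    install_requires=[],                    # add worker-side dependencies here if needed
)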

Task 5. Add timestamps to each element

These #TODO items guide you through building a classic time-series aggregation pipeline in Apache Beam. Each one represents a core concept in batch and stream processing. The goal of this pipeline is to process a JSON file of web events (events.json), count how many events occurred every minute, and write these minute-by-minute counts to a BigQuery table.

The pipeline flow looks like this: Read Text -> Parse to CommonLog -> TODOs -> Write to BigQuery

The #TODOs are the core of the pipeline, where the actual aggregation logic happens.

Mini Challenge - #TODO 1:

You have a collection of CommonLog objects. The next step in the pipeline is to group these events by time (WindowByMinute). Apache Beam cannot do this unless it knows the event time for each piece of data. Your challenge is to tell Beam how to find this timestamp within your CommonLog object.

How to solve it:

  1. The add_timestamp function (defined in pipeline_utils.py) parses the timestamp string from each log record and attaches it to the element as a proper Beam timestamp, which is required for windowing.

  2. For Example:

| 'AddEventTimestamp' >> beam.Map(...)
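
A sketch of what a helper like add_timestamp typically looks like is shown below; the real implementation lives in pipeline_utils.py and may differ in details such as the timestamp format string.

# A sketch of an add_timestamp helper (the real one is in pipeline_utils.py).
# It parses the ISO-8601 timestamp string on a CommonLog element and attaches
# it as the element's Beam event timestamp.
from datetime import datetime, timezone

from apache_beam.transforms.window import TimestampedValue


def add_timestamp(element):
    dt = datetime.strptime(element.timestamp, '%Y-%m-%dT%H:%M:%S.%fZ')
    dt = dt.replace(tzinfo=timezone.utc)
    return TimestampedValue(element, dt.timestamp())

# Used in the pipeline as:
# | 'AddEventTimestamp' >> beam.Map(add_timestamp)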

Task 6. Window into one-minute windows

Let's move to the second task of the script, where a transform groups elements into windows.

Mini Challenge - #TODO 2:

  1. This transform groups elements into fixed-size, non-overlapping windows of 60 seconds (one minute) based on their event timestamps.

  2. For Example:

| "WindowByMinute" >> beam.WindowInto(beam.window.FixedWindows(...))
  3. To learn more about other types of windowing, read the Apache Beam documentation, Section 8.2 (Provided windowing functions).
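
As a locally runnable illustration of fixed windows (not the lab code), the sketch below assigns three made-up timestamped elements to one-minute windows and prints the window each element lands in.

# A tiny local illustration of FixedWindows(60): three timestamped elements
# fall into two different one-minute windows. All values here are made up.
import apache_beam as beam
from apache_beam.transforms.window import FixedWindows, TimestampedValue


class PrintWindow(beam.DoFn):
    def process(self, element, win=beam.DoFn.WindowParam):
        # win is the one-minute window the element was assigned to.
        print(element, '->', win.start.to_utc_datetime(), 'to', win.end.to_utc_datetime())


with beam.Pipeline() as p:
    (p
     | beam.Create([('event-a', 0), ('event-b', 59), ('event-c', 61)])
     | 'AddTimestamps' >> beam.MapTuple(lambda name, ts: TimestampedValue(name, ts))
     | 'WindowByMinute' >> beam.WindowInto(FixedWindows(60))
     | beam.ParDo(PrintWindow()))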

Task 7. Count events per window

Let's move to the third task of the script where we count events per window.

Mini Challenge - #TODO 3:

  1. This combiner counts the number of elements within each one-minute window. Use .without_defaults() to make sure that no output is generated for empty windows.

  2. For Example:

| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn())...()
  3. To complete this task, add a transform that counts all the elements in each window. Remember to refer to the solution if you get stuck.
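
Given the hint about .without_defaults() above, the completed transform plausibly reads as follows; CountCombineFn is the combiner already referenced in the example line.

# Count the elements in each window; without_defaults() suppresses output
# for windows that contain no elements.
| "CountPerMinute" >> beam.CombineGlobally(CountCombineFn()).without_defaults()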

Task 8. Convert back to a row and add timestamp

Let's move to the final task of the script, where we convert the count back to a row and add a timestamp.

Mini Challenge - #TODO 4:

  1. The GetTimestampFn (defined in pipeline_utils.py) takes the integer count for each window and formats it into a dictionary, adding the window's start time as a string to match the BigQuery schema.

  2. To complete this task, write a ParDo function that accepts elements of type int and passes in the additional parameter needed to access window information. Note that the timestamp field in the BigQuery table schema is a STRING, so you will have to convert the timestamp to a string.

  3. For Example:

| "AddWindowTimestamp" >> beam.ParDo(...()) | 'WriteToBQ' >> beam.io.WriteToBigQuery( table_name, schema=table_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE )

Task 9. Run the pipeline

Once you’ve finished coding, run the pipeline using the command below. Keep in mind that, while testing your code, it will be much faster to change the RUNNER environment variable to DirectRunner, which will run the pipeline locally. For now, we run the pipeline using Dataflow.

  • Replace ENTER_REGION_ID with the region specified for your lab.
export PROJECT_ID=$(gcloud config get-value project)
export REGION=ENTER_REGION_ID
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic

python3 batch_minute_traffic_pipeline.py \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --staging_location=${PIPELINE_FOLDER}/staging \
  --temp_location=${PIPELINE_FOLDER}/temp \
  --runner=${RUNNER} \
  --input_path=${INPUT_PATH} \
  --table_name=${TABLE_NAME} \
  --setup_file=./setup.py

Task 10. Verify the results

  • To complete this task, wait a few minutes for the pipeline to execute, then navigate to BigQuery and query the minute_traffic table.

Click Check my progress to verify the objective. Aggregating site traffic by minute and running the pipeline

End your lab

When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
