
Serverless Data Processing with Dataflow - Using Dataflow SQL for Batch Analytics (Python)


Overview

In this lab, you:

  • Write a pipeline that uses SQL to aggregate site traffic by user.
  • Write a pipeline that uses SQL to aggregate site traffic by minute.

Setup and requirements

Before you click the Start Lab button

Note: Read these instructions.

Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This Qwiklabs hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

What you need

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
  • Time to complete the lab.
Note: If you already have your own personal Google Cloud account or project, do not use it for this lab.
Note: If you are using a Pixelbook, open an Incognito window to run this lab.

How to start your lab and sign in to the Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is a panel populated with the temporary credentials that you must use for this lab.

    Credentials panel

  2. Copy the username, and then click Open Google Console. The lab spins up resources, and then opens another tab that shows the Choose an account page.

    Note: Open the tabs in separate windows, side-by-side.
  3. On the Choose an account page, click Use Another Account. The Sign in page opens.

    Choose an account dialog box with Use Another Account option highlighted

  4. Paste the username that you copied from the Connection Details panel. Then copy and paste the password.

Note: You must use the credentials from the Connection Details panel. Do not use your Google Cloud Skills Boost credentials. If you have your own Google Cloud account, do not use it for this lab; this avoids incurring charges to your account.
  5. Click through the subsequent pages:
  • Accept the terms and conditions.
  • Do not add recovery options or two-factor authentication (because this is a temporary account).
  • Do not sign up for free trials.

After a few moments, the Cloud console opens in this tab.

Note: You can view the menu with a list of Google Cloud products and services by clicking the Navigation menu at the top-left.

Jupyter notebook-based development environment setup

For this lab, you will be running all commands in a terminal from your notebook.

  1. In the Google Cloud Console, on the Navigation Menu, click Vertex AI > Workbench.

  2. Click Enable Notebooks API.

  3. On the Workbench page, select USER-MANAGED NOTEBOOKS and click CREATE NEW.

  4. In the New instance dialog box that appears, set the region to and zone to .

  5. For Environment, select Apache Beam.

  6. Click CREATE at the bottom of the dialog box.

Note: The environment may take 3 - 5 minutes to be fully provisioned. Please wait until the step is complete.
Note: Click Enable Notebooks API to enable the Notebooks API.
  7. Once the environment is ready, click the OPEN JUPYTERLAB link next to your Notebook name. This will open up your environment in a new tab in your browser.


  8. Next, click Terminal. This will open up a terminal where you can run all the commands in this lab.

Open Terminal

Download Code Repository

Next you will download a code repository for use in this lab.

  1. In the terminal you just opened, enter the following:
git clone https://github.com/GoogleCloudPlatform/training-data-analyst
cd /home/jupyter/training-data-analyst/quests/dataflow_python/
  2. On the left panel of your notebook environment, in the file browser, you will notice the training-data-analyst repo added.

  3. Navigate into the cloned repo /training-data-analyst/quests/dataflow_python/. You will see a folder for each lab, which is further divided into a lab sub-folder with code to be completed by you, and a solution sub-folder with a fully workable example to reference if you get stuck.

Explorer option highlighted in the expanded View menu

Note: To open a file for editing purposes, simply navigate to the file and click on it. This will open the file, where you can add or modify code.

Click Check my progress to verify the objective. Create notebook instance and clone course repo

Part 1: Aggregating site traffic by user with SQL

In this part of the lab, you rewrite your previous BatchUserTraffic pipeline so that it performs the following:

  • Reads the day’s traffic from a file in Cloud Storage.
  • Converts each event into a CommonLog object.
  • Uses SQL instead of Python transforms to sum the number of hits for each unique user ID and perform additional aggregations.
  • Writes the resulting data to BigQuery.
  • Has an additional branch that writes the raw data to BigQuery for later analysis.

Task 1. Generate synthetic data

As in the prior labs, the first step is to generate data for the pipeline to process. You will open the lab environment and generate the data as before:

Open the appropriate lab

  • In the terminal in your IDE, change to the directory you will use for this lab:
# Change directory into the lab
cd 4_SQL_Batch_Analytics/lab
export BASE_DIR=$(pwd)

Setting up dependencies

Before you can begin editing the actual pipeline code, you need to ensure that you have installed the necessary dependencies.

  1. In your terminal, execute the following to install the packages you will need to execute your pipeline:
python3 -m pip install -q --upgrade pip setuptools wheel
python3 -m pip install apache-beam[gcp]
  2. Ensure that the Dataflow and Data Catalog APIs are enabled:
gcloud services enable dataflow.googleapis.com
gcloud services enable datacatalog.googleapis.com

Set up the data environment

# Create GCS buckets and BQ dataset
cd $BASE_DIR/../..
source create_batch_sinks.sh
# Generate event dataflow
source generate_batch_events.sh
# Change to the directory containing the practice version of the code
cd $BASE_DIR

The script creates a file called events.json containing lines resembling the following:

{"user_id": "-6434255326544341291", "ip": "192.175.49.116", "timestamp": "2019-06-19T16:06:45.118306Z", "http_request": "\"GET eucharya.html HTTP/1.0\"", "lat": 37.751, "lng": -97.822, "http_response": 200, "user_agent": "Mozilla/5.0 (compatible; MSIE 7.0; Windows NT 5.01; Trident/5.1)", "num_bytes": 182}

It then automatically copies this file to your Google Cloud Storage bucket at .

  • Navigate to Google Cloud Storage and confirm that your storage bucket contains a file called events.json.

Click Check my progress to verify the objective. Set up the data environment

Task 2. Add SQL dependencies

  1. In your File Explorer, navigate to training-data-analyst/quests/dataflow_python/4_SQL_Batch_Analytics/lab/ and open the batch_user_traffic_SQL_pipeline.py file.

This pipeline already contains the necessary code to accept command-line options for the input path and one output table name, as well as code to read in events from Google Cloud Storage, parse those events, and write results to BigQuery. However, some important parts are missing.

As in the previous lab, the next step in the pipeline is to aggregate the events by each unique user_id and count pageviews for each. This time, however, you will perform the aggregation using SQL, using SqlTransform instead of Python-based transforms.
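Before you fill in the individual TODOs, it may help to see the overall shape the finished pipeline takes. The following is only a minimal sketch, not the lab's starter file: the CommonLog fields mirror the sample events.json record above, and names such as parse_json and the single-aggregate query are illustrative stand-ins for the code you complete in this task.

import json
import typing

import apache_beam as beam
from apache_beam.transforms.sql import SqlTransform


class CommonLog(typing.NamedTuple):
    user_id: str
    ip: str
    timestamp: str
    http_request: str
    http_response: int
    lat: float
    lng: float
    user_agent: str
    num_bytes: int


# Registering a RowCoder gives CommonLog elements a schema, which SqlTransform requires.
beam.coders.registry.register_coder(CommonLog, beam.coders.RowCoder)


def parse_json(line: str) -> CommonLog:
    """Turn one JSON log line into a schema-aware CommonLog row."""
    return CommonLog(**json.loads(line))


query = """
    SELECT user_id, COUNT(*) AS page_views
    FROM PCOLLECTION
    GROUP BY user_id
"""

with beam.Pipeline() as pipeline:
    logs = (pipeline
            | 'ReadFromGCS' >> beam.io.ReadFromText('events.json')
            | 'ParseJson' >> beam.Map(parse_json).with_output_types(CommonLog))

    # Branch 1 (raw): logs | 'WriteRawToBQ' >> beam.io.WriteToBigQuery(...)
    # Branch 2 (aggregated): inside the SQL, PCOLLECTION refers to `logs`.
    per_user = logs | 'PerUserAggregations' >> SqlTransform(query, dialect='zetasql')

The key point is that SqlTransform only accepts schema-aware PCollections, which is why the parsed elements carry a registered RowCoder and an explicit output type.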

  1. In batch_user_traffic_SQL_pipeline.py, add the following import statement:
from apache_beam.transforms.sql import SqlTransform
  2. Next, add the following SQL query as the value of the query variable:
SELECT user_id,
  COUNT(*) AS page_views,
  SUM(num_bytes) AS total_bytes,
  MAX(num_bytes) AS max_bytes,
  MIN(num_bytes) AS min_bytes
FROM PCOLLECTION
GROUP BY user_id
  3. Next, fill out the following #TODO with this code, which adds a transform that writes the raw data to BigQuery:
logs | 'WriteRawToBQ' >> beam.io.WriteToBigQuery(
    raw_table_name,
    schema=raw_table_schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    )

Beam SQL supports two dialects: Apache Calcite (the default) and ZetaSQL. Dataflow can execute both, but this lab uses ZetaSQL because it closely resembles the SQL used in BigQuery and is also the dialect implemented in 'Dataflow SQL', which lets you author SQL queries directly in the Dataflow UI.
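As a quick illustration (illustrative only; logs stands in for any schema-aware PCollection such as the one built in this pipeline), the dialect is chosen per transform:

from apache_beam.transforms.sql import SqlTransform

query = "SELECT user_id, COUNT(*) AS page_views FROM PCOLLECTION GROUP BY user_id"

results_calcite = logs | 'AggregateCalcite' >> SqlTransform(query)                     # Apache Calcite (default)
results_zetasql = logs | 'AggregateZetaSql' >> SqlTransform(query, dialect='zetasql')  # ZetaSQL (used in this lab)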

  4. Fill out the last #TODO by applying a SqlTransform that uses the ZetaSQL dialect:
SqlTransform(query, dialect='zetasql')
  5. Save the changes you made to the file.

Task 3. Run your pipeline

  1. Return to the terminal and execute the following code to run your pipeline:
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{project_0.startup_script.lab_region|Region}}}
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.user_traffic
export AGGREGATE_TABLE_NAME=${PROJECT_ID}:logs.user_traffic
export RAW_TABLE_NAME=${PROJECT_ID}:logs.raw
python3 batch_user_traffic_SQL_pipeline.py \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --staging_location=${PIPELINE_FOLDER}/staging \
  --temp_location=${PIPELINE_FOLDER}/temp \
  --runner=${RUNNER} \
  --experiments=use_runner_v2 \
  --input_path=${INPUT_PATH} \
  --agg_table_name=${AGGREGATE_TABLE_NAME} \
  --raw_table_name=${RAW_TABLE_NAME}
  2. Navigate to Navigation Menu > Dataflow to see the status of your pipeline.

  3. Once your pipeline has finished, go to the BigQuery UI to query the two resulting tables.

  4. Make sure that logs.raw exists and has data populated, as you will need that later in the lab.

Click Check my progress to verify the objective. Aggregating site traffic by user with SQL

Part 2: Aggregating site traffic by minute with SQL

In this part of the lab, you rewrite your previous BatchMinuteTraffic pipeline so that it performs the following:

  • Reads the day’s traffic from a file in Cloud Storage.
  • Converts each event into a CommonLog object and then adds a timestamp attribute to the object.
  • Uses SQL instead of Python transforms to window and sum the total number of hits per minute.
  • Writes the resulting data to BigQuery.

Task 1. Add timestamp field to CommonLog row

In this task, you will add a timestamp field to your CommonLog object, implicitly converting it to a generic Row object.

The appropriate imports and options have already been set to use ZetaSQL. A pipeline has been built with steps to ingest and write the data, but not to transform or aggregate it. In the file explorer of the IDE, navigate to training-data-analyst/quests/dataflow_python/4_SQL_Batch_Analytics/lab/ and open the file batch_minute_traffic_SQL_pipeline.py.

  1. For the first #TODO, add the formatted timestamp as a string:
ts = datetime.strptime(element.ts[:-8], "%Y-%m-%dT%H:%M:%S")
ts = datetime.strftime(ts, "%Y-%m-%d %H:%M:%S")

Note that when using the Python SDK, we currently cannot directly pass in objects of type datetime to a SqlTransform. Instead, we will convert the object into a string using strftime and then in SQL use the TIMESTAMP function.
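For a concrete picture of this workaround, here is a minimal sketch of the string round-trip; element.ts is assumed to hold an ISO-8601 string like the sample events.json timestamp, and the function name is purely illustrative.

from datetime import datetime


def to_sql_timestamp_string(element):
    """Reformat the ISO timestamp into a string that ZetaSQL's TIMESTAMP() accepts."""
    # Strip the fractional seconds and trailing 'Z' (the last 8 characters),
    # then re-emit as "YYYY-MM-DD HH:MM:SS".
    ts = datetime.strptime(element.ts[:-8], "%Y-%m-%dT%H:%M:%S")
    return datetime.strftime(ts, "%Y-%m-%d %H:%M:%S")


# Example: "2019-06-19T16:06:45.118306Z" becomes "2019-06-19 16:06:45".
# Inside the SQL query, TIMESTAMP(ts) converts the string back to a timestamp,
# and STRING(window_start) converts the window boundary back to a string before
# the results return to Python transforms.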

  2. Next, add the following SQL query:
SELECT COUNT(*) AS page_views, STRING(window_start) AS start_time
FROM TUMBLE(
  (SELECT TIMESTAMP(ts) AS ts FROM PCOLLECTION),
  DESCRIPTOR(ts),
  'INTERVAL 1 MINUTE')
GROUP BY window_start

Note: In this SQL query, we convert our field ts to type TIMESTAMP and use it as the event timestamp for our fixed one-minute windows. The window_start field is generated by TUMBLE and is of type TIMESTAMP as well. Due to the issue we discussed earlier with the Python SDK, we must convert this field to a STRING before sending the resulting PCollection back to Python transforms.
  3. For the next #TODO, apply a SqlTransform that uses the ZetaSQL dialect:
SqlTransform(query, dialect='zetasql')

Task 2. Run your pipeline

  1. Back in your terminal, execute the following code to run your pipeline:
export PROJECT_ID=$(gcloud config get-value project)
export REGION={{{project_0.startup_script.lab_region|Region}}}
export BUCKET=gs://${PROJECT_ID}
export PIPELINE_FOLDER=${BUCKET}
export RUNNER=DataflowRunner
export INPUT_PATH=${PIPELINE_FOLDER}/events.json
export TABLE_NAME=${PROJECT_ID}:logs.minute_traffic
python3 batch_minute_traffic_SQL_pipeline.py \
  --project=${PROJECT_ID} \
  --region=${REGION} \
  --staging_location=${PIPELINE_FOLDER}/staging \
  --temp_location=${PIPELINE_FOLDER}/temp \
  --runner=${RUNNER} \
  --input_path=${INPUT_PATH} \
  --table_name=${TABLE_NAME} \
  --experiments=use_runner_v2
  2. In the Cloud Console, navigate to Navigation Menu > Dataflow to see the status of your pipeline.

  3. Once your pipeline has finished, go to the BigQuery UI to query the resulting logs.minute_traffic table. Alternatively, you can query from the terminal:

bq head -n 10 $PROJECT_ID:logs.minute_traffic

Click Check my progress to verify the objective. Aggregating site traffic by minute with SQL

End your lab

When you have completed your lab, click End Lab. Google Cloud Skills Boost removes the resources you’ve used and cleans the account for you.

You will be given an opportunity to rate the lab experience. Select the applicable number of stars, type a comment, and then click Submit.

The number of stars indicates the following:

  • 1 star = Very dissatisfied
  • 2 stars = Dissatisfied
  • 3 stars = Neutral
  • 4 stars = Satisfied
  • 5 stars = Very satisfied

You can close the dialog box if you don't want to provide feedback.

For feedback, suggestions, or corrections, please use the Support tab.

Copyright 2022 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.
