Stream Processing with Cloud Pub/Sub and Dataflow: Qwik Start

30 minutes 1 Credit




Google Cloud Pub/Sub is a messaging service for exchanging event data among applications and services. A producer of data publishes messages to a Cloud Pub/Sub topic. A consumer creates a subscription to that topic. Subscribers either pull messages from a subscription or are configured as webhooks for push subscriptions. Every subscriber must acknowledge each message within a configurable window of time.
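The acknowledgement window described above can be pictured with a small in-memory model. This is only a toy sketch under stated assumptions: `ToySubscription`, its methods, and the 10-second deadline are hypothetical names and values chosen for illustration, and none of this is the Pub/Sub client library API.

```python
from dataclasses import dataclass, field


@dataclass
class ToySubscription:
    """In-memory stand-in for a pull subscription (hypothetical, not the Pub/Sub API)."""

    ack_deadline: float = 10.0  # seconds a subscriber has to acknowledge a message
    _backlog: list = field(default_factory=list)      # (msg_id, data) awaiting delivery
    _outstanding: dict = field(default_factory=dict)  # msg_id -> (data, deadline)
    _next_id: int = 0

    def publish(self, data: str) -> int:
        """Queue a message and return its ID."""
        msg_id = self._next_id
        self._next_id += 1
        self._backlog.append((msg_id, data))
        return msg_id

    def pull(self, now: float):
        """Deliver the next message, first re-queuing any whose deadline has passed."""
        expired = [
            (mid, data)
            for mid, (data, deadline) in self._outstanding.items()
            if deadline <= now
        ]
        for mid, data in expired:
            del self._outstanding[mid]
            self._backlog.append((mid, data))
        if not self._backlog:
            return None
        msg_id, data = self._backlog.pop(0)
        self._outstanding[msg_id] = (data, now + self.ack_deadline)
        return msg_id, data

    def ack(self, msg_id: int, now: float) -> bool:
        """Acknowledge a message; returns False if it is unknown or the deadline passed."""
        entry = self._outstanding.get(msg_id)
        if entry is None or entry[1] <= now:
            return False
        del self._outstanding[msg_id]
        return True
```

In this model, a message that is pulled but not acknowledged before its deadline is handed out again on a later pull, which is the behavior that leads to at-least-once delivery.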

Dataflow is a fully-managed service for transforming and enriching data in stream (real-time) and batch modes with equal reliability and expressiveness. It provides a simplified pipeline development environment using the Apache Beam SDK, which has a rich set of windowing and session analysis primitives as well as an ecosystem of source and sink connectors.

Pub/Sub is a scalable, durable event ingestion and delivery system. Dataflow complements Pub/Sub's scalable, at-least-once delivery model with message deduplication and exactly-once, in-order processing if you use windows and buffering.
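To see why at-least-once delivery calls for deduplication, consider a minimal sketch that drops repeated deliveries by message ID. The `deduplicate` function and the sample IDs are hypothetical, and this is not how Dataflow implements deduplication internally; it only illustrates the idea.

```python
def deduplicate(messages):
    """Yield each (message_id, payload) pair once, in first-seen order."""
    seen = set()  # IDs already emitted; unbounded here, for illustration only
    for message_id, payload in messages:
        if message_id in seen:
            continue  # a redelivery of a message we already processed
        seen.add(message_id)
        yield message_id, payload


# "m1" is delivered twice, as can happen under at-least-once delivery.
deliveries = [("m1", "Hello!"), ("m2", "Hello!"), ("m1", "Hello!")]
unique = list(deduplicate(deliveries))
```

A real streaming system would bound the set of remembered IDs, for example by discarding IDs once their window has closed.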

What you'll do

  1. Read messages published to a Pub/Sub topic
  2. Window (or group) the messages by timestamp
  3. Write the messages to Cloud Storage


Before you click the Start Lab button

Read these instructions. Labs are timed and you cannot pause them. The timer, which starts when you click Start Lab, shows how long Google Cloud resources will be made available to you.

This hands-on lab lets you do the lab activities yourself in a real cloud environment, not in a simulation or demo environment. It does so by giving you new, temporary credentials that you use to sign in and access Google Cloud for the duration of the lab.

To complete this lab, you need:

  • Access to a standard internet browser (Chrome browser recommended).
Note: Use an Incognito or private browser window to run this lab. This prevents conflicts between your personal account and the Student account, which could result in extra charges to your personal account.
  • Time to complete the lab. Remember, once you start, you cannot pause a lab.
Note: If you already have your own personal Google Cloud account or project, do not use it for this lab to avoid extra charges to your account.

How to start your lab and sign in to the Google Cloud Console

  1. Click the Start Lab button. If you need to pay for the lab, a pop-up opens for you to select your payment method. On the left is the Lab Details panel with the following:

    • The Open Google Console button
    • Time remaining
    • The temporary credentials that you must use for this lab
    • Other information, if needed, to step through this lab
  2. Click Open Google Console. The lab spins up resources, and then opens another tab that shows the Sign in page.

    Tip: Arrange the tabs in separate windows, side-by-side.

    Note: If you see the Choose an account dialog, click Use Another Account.
  3. If necessary, copy the Username from the Lab Details panel and paste it into the Sign in dialog. Click Next.

  4. Copy the Password from the Lab Details panel and paste it into the Welcome dialog. Click Next.

    Important: You must use the credentials from the left panel. Do not use your Google Cloud Skills Boost credentials.
    Note: Using your own Google Cloud account for this lab may incur extra charges.
  5. Click through the subsequent pages:

    • Accept the terms and conditions.
    • Do not add recovery options or two-factor authentication (because this is a temporary account).
    • Do not sign up for free trials.

After a few moments, the Cloud Console opens in this tab.

Note: You can view the menu with a list of Google Cloud Products and Services by clicking the Navigation menu at the top-left.

Activate Cloud Shell

Cloud Shell is a virtual machine that is loaded with development tools. It offers a persistent 5GB home directory and runs on Google Cloud. Cloud Shell provides command-line access to your Google Cloud resources.

  1. Click Activate Cloud Shell at the top of the Google Cloud console.

When you are connected, you are already authenticated, and the project is set to your PROJECT_ID. The output contains a line that declares the PROJECT_ID for this session:

Your Cloud Platform project in this session is set to YOUR_PROJECT_ID

gcloud is the command-line tool for Google Cloud. It comes pre-installed on Cloud Shell and supports tab-completion.

  2. (Optional) You can list the active account name with this command:
gcloud auth list
  3. Click Authorize.

  4. Your output should now look like this:

ACTIVE: *
ACCOUNT:
To set the active account, run:
    $ gcloud config set account `ACCOUNT`
  5. (Optional) You can list the project ID with this command:
gcloud config list project

Output:

[core]
project = <project_ID>

Example output:

[core]
project = qwiklabs-gcp-44776a13dea667a6
Note: For full documentation of gcloud, in Google Cloud, refer to the gcloud CLI overview guide.
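The `[core]` output shown above is INI-style text, so a script that captures it could read the project ID with Python's standard `configparser` module. This is a small sketch; the helper name `project_from_config_output` is hypothetical, and the sample text simply reuses the example output from this lab.

```python
import configparser


def project_from_config_output(text):
    """Return the project ID from captured `gcloud config list project` output."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser["core"]["project"]


# Sample text copied from the example output in this lab.
sample = "[core]\nproject = qwiklabs-gcp-44776a13dea667a6\n"
project_id = project_from_config_output(sample)
```

For interactive use, `gcloud config get-value project`, which this lab uses in Task 1, returns the same value directly.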

Set the region

  • In Cloud Shell, run the following command to set the project region for this lab:
gcloud config set compute/region {{{project_0.default_region | "REGION"}}}

Ensure that the Dataflow API is successfully enabled

To ensure access to the necessary API, restart the connection to the Dataflow API.

  1. In the Cloud Console, enter "Dataflow API" in the top search bar. Click on the result for Dataflow API.

  2. Click Manage.

  3. Click Disable API.

If asked to confirm, click Disable.

  4. Click Enable.

When the API has been enabled again, the page will show the option to disable.

Task 1. Create project resources

  1. In Cloud Shell, create variables for your bucket, project, and region.
PROJECT_ID=$(gcloud config get-value project)
BUCKET_NAME="${PROJECT_ID}-bucket"
TOPIC_ID=my-id
REGION={{{project_0.default_region | "filled in at lab start"}}}
  1. Set your App Engine region in the AE_REGION variable, which the gcloud app create command later in this task uses. Note: For regions other than us-central1, set AE_REGION to the same value as your assigned region. If you are assigned us-central1, set AE_REGION to us-central.

You can refer to the App Engine locations for more information.

  1. Create a Cloud Storage bucket owned by this project:
gsutil mb gs://$BUCKET_NAME
Note: Cloud Storage bucket names must be globally unique. Your Qwiklabs Project ID is always unique, so this lab uses it in your bucket name.
  1. Create a Pub/Sub topic in this project:
gcloud pubsub topics create $TOPIC_ID
  1. Create an App Engine app for your project:
gcloud app create --region=$AE_REGION
  1. Create a Cloud Scheduler job in this project. The job publishes a message to a Pub/Sub topic at one-minute intervals:
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \
    --topic=$TOPIC_ID --message-body="Hello!"
  1. If prompted to enable the Cloud Scheduler API, enter y and press Enter.

Click Check my progress to verify the objective. Create Project Resources

  1. Start the job:
gcloud scheduler jobs run publisher-job
  1. Use the following commands to clone the quickstart repository and navigate to the sample code directory:
Java:
git clone
cd java-docs-samples/pubsub/streaming-analytics

Python:
docker run -it -e DEVSHELL_PROJECT_ID=$DEVSHELL_PROJECT_ID python:3.7 /bin/bash
git clone
cd python-docs-samples/pubsub/streaming-analytics
pip install -U -r requirements.txt  # Install Apache Beam dependencies
Note: Execute the Python commands individually.

Click Check my progress to verify the objective. Start the cloud scheduler job

Task 2. Review code to stream messages from Pub/Sub to Cloud Storage

Code sample

Review the following sample code, which uses Dataflow to:

  • Read Pub/Sub messages.
  • Window (or group) messages into fixed-size intervals by publish timestamps.
  • Write the messages in each window to files in Cloud Storage.

Java:

import java.io.IOException;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.joda.time.Duration;

public class PubSubToGcs {
  /*
   * Define your own configuration options. Add your own arguments to be processed
   * by the command-line parser, and specify default values for them.
   */
  public interface PubSubToGcsOptions extends PipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    String getInputTopic();

    void setInputTopic(String value);

    @Description("Output file's window size in number of minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Path of the output file including its filename prefix.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  public static void main(String[] args) throws IOException {
    // The maximum number of shards when writing output.
    int numShards = 1;

    PubSubToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToGcsOptions.class);

    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        // 1) Read string messages from a Pub/Sub topic.
        .apply("Read PubSub Messages", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
        // 2) Group the messages into fixed-sized minute intervals.
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        // 3) Write one file to GCS for every window of messages.
        .apply("Write Files to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
  }
}

Python:

import argparse
from datetime import datetime
import logging
import random

from apache_beam import DoFn, GroupByKey, io, ParDo, Pipeline, PTransform, WindowInto, WithKeys
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.window import FixedWindows


class GroupMessagesByFixedWindows(PTransform):
    """A composite transform that groups Pub/Sub messages based on publish time
    and outputs a list of tuples, each containing a message and its publish time.
    """

    def __init__(self, window_size, num_shards=5):
        # Convert the window size from minutes to seconds.
        self.window_size = int(window_size * 60)
        self.num_shards = num_shards

    def expand(self, pcoll):
        return (
            pcoll
            # Bind window info to each element using element timestamp (or publish time).
            | "Window into fixed intervals" >> WindowInto(FixedWindows(self.window_size))
            | "Add timestamp to windowed elements" >> ParDo(AddTimestamp())
            # Assign a random key to each windowed element based on the number of shards.
            | "Add key" >> WithKeys(lambda _: random.randint(0, self.num_shards - 1))
            # Group windowed elements by key. All the elements in the same window must fit
            # memory for this. If not, you need to use `beam.util.BatchElements`.
            | "Group by key" >> GroupByKey()
        )


class AddTimestamp(DoFn):
    def process(self, element, publish_time=DoFn.TimestampParam):
        """Processes each windowed element by extracting the message body and its
        publish time into a tuple.
        """
        yield (
            element.decode("utf-8"),
            datetime.utcfromtimestamp(float(publish_time)).strftime(
                "%Y-%m-%d %H:%M:%S.%f"
            ),
        )


class WriteToGCS(DoFn):
    def __init__(self, output_path):
        self.output_path = output_path

    def process(self, key_value, window=DoFn.WindowParam):
        """Write messages in a batch to Google Cloud Storage."""

        ts_format = "%H:%M"
        window_start = window.start.to_utc_datetime().strftime(ts_format)
        window_end = window.end.to_utc_datetime().strftime(ts_format)
        shard_id, batch = key_value
        filename = "-".join([self.output_path, window_start, window_end, str(shard_id)])

        with io.gcsio.GcsIO().open(filename=filename, mode="w") as f:
            for message_body, publish_time in batch:
                f.write(f"{message_body},{publish_time}\n".encode("utf-8"))


def run(input_topic, output_path, window_size=1.0, num_shards=5, pipeline_args=None):
    # Set `save_main_session` to True so DoFns can access globally imported modules.
    pipeline_options = PipelineOptions(
        pipeline_args, streaming=True, save_main_session=True
    )

    with Pipeline(options=pipeline_options) as pipeline:
        (
            pipeline
            # Because `timestamp_attribute` is unspecified in `ReadFromPubSub`, Beam
            # binds the publish time returned by the Pub/Sub server for each message
            # to the element's timestamp parameter, accessible via `DoFn.TimestampParam`.
            #
            | "Read from Pub/Sub" >> io.ReadFromPubSub(topic=input_topic)
            | "Window into" >> GroupMessagesByFixedWindows(window_size, num_shards)
            | "Write to GCS" >> ParDo(WriteToGCS(output_path))
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input_topic",
        help="The Cloud Pub/Sub topic to read from."
        '"projects//topics/".',
    )
    parser.add_argument(
        "--window_size",
        type=float,
        default=1.0,
        help="Output file's window size in minutes.",
    )
    parser.add_argument(
        "--output_path",
        help="Path of the output GCS file including the prefix.",
    )
    parser.add_argument(
        "--num_shards",
        type=int,
        default=5,
        help="Number of shards to use when writing windowed elements to GCS.",
    )
    known_args, pipeline_args = parser.parse_known_args()

    run(
        known_args.input_topic,
        known_args.output_path,
        known_args.window_size,
        known_args.num_shards,
        pipeline_args,
    )

Note: To explore the sample code further, visit the respective java-docs-samples and python-docs-samples GitHub pages.
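The fixed-window grouping used in the sample code can be sketched without Beam. The following is a simplified illustration, assuming non-negative timestamps in epoch seconds and windows aligned to the epoch; `window_bounds` and `group_by_window` are hypothetical helper names, not Beam functions.

```python
from collections import defaultdict


def window_bounds(timestamp, window_size_minutes):
    """Return the (start, end) epoch seconds of the fixed window holding timestamp."""
    size = int(window_size_minutes * 60)  # minutes to seconds, as in the Python sample
    start = int(timestamp) - int(timestamp) % size
    return start, start + size


def group_by_window(events, window_size_minutes=1.0):
    """Group (timestamp, body) pairs by the fixed window they fall into."""
    windows = defaultdict(list)
    for timestamp, body in events:
        windows[window_bounds(timestamp, window_size_minutes)].append(body)
    return dict(windows)


events = [(0, "a"), (59.5, "b"), (60, "c"), (125, "d")]
grouped = group_by_window(events, window_size_minutes=1.0)
# grouped == {(0, 60): ["a", "b"], (60, 120): ["c"], (120, 180): ["d"]}
```

Each window covers a half-open interval: an event stamped exactly at 60 seconds belongs to the window that starts at 60, not the one that ends there.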

Task 3. Start the pipeline

  1. To start the pipeline, run the following command:
Java:
mvn compile exec:java \
  -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \
  -Dexec.cleanupDaemonThreads=false \
  -Dexec.args=" \
    --project=$PROJECT_ID \
    --region=$REGION \
    --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \
    --output=gs://$BUCKET_NAME/samples/output \
    --runner=DataflowRunner \
    --windowSize=2"

Python:
python \
  --project=project_id \
  --region=region \
  --input_topic=projects/project_id/topics/my-id \
  --output_path=gs://bucket_name/samples/output \
  --runner=DataflowRunner \
  --window_size=2 \
  --num_shards=2 \
  --temp_location=gs://bucket_name/temp
Note: When executing the Python command, replace project_id, bucket_name, and region with your project ID, bucket name, and assigned lab region.

You may have to wait around 10 minutes for the pipeline to appear in the Dataflow console.

The preceding command runs locally and launches a Dataflow job that runs in the cloud.

  1. When the command output shows "Workers have started successfully", exit the local program using Ctrl+C.
Note: If you receive a warning regarding StaticLoggerBinder, you can safely ignore it and continue with the lab without exiting the local program.
Note: To exit your Python development environment, type exit and press Enter.

Click Check my progress to verify the objective. Start the pipeline and launch dataflow job
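Because the Python launch command in this task asks you to substitute project_id, bucket_name, and region by hand, a small helper can assemble the same flags from variables. This is only a sketch: `build_pipeline_args` is a hypothetical function, the sample values passed to it are placeholders, and it builds the flag list only, not the full command.

```python
def build_pipeline_args(project_id, region, topic_id, bucket_name,
                        window_size=2, num_shards=2):
    """Return the flags from the Task 3 Python command with values filled in."""
    return [
        f"--project={project_id}",
        f"--region={region}",
        f"--input_topic=projects/{project_id}/topics/{topic_id}",
        f"--output_path=gs://{bucket_name}/samples/output",
        "--runner=DataflowRunner",
        f"--window_size={window_size}",
        f"--num_shards={num_shards}",
        f"--temp_location=gs://{bucket_name}/temp",
    ]


# Placeholder values for illustration; use your own lab project, region, and bucket.
args = build_pipeline_args("my-project", "us-central1", "my-id", "my-project-bucket")
```

Building the topic path and bucket URIs in one place keeps the three substitutions consistent, which is where hand-edited commands tend to go wrong.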

Task 4. Observe job and pipeline progress

  1. You can observe the job's progress in the Dataflow console.

Go to the Dataflow console.

Dataflow page displaying the information of the pubsubtogcs 0815172250-75a99ab8 job

  1. Open the job details view to see:
  • Job structure
  • Job logs
  • Stage metrics

Job page displaying the Job summary information

You may have to wait a few minutes to see the output files in Cloud Storage.

  1. You can see the output files by navigating to Navigation menu > Cloud Storage. Click your bucket name, and then click the samples folder:

Bucket details page displaying the output file information

  1. Exit the application using CTRL+C if not already done. Alternatively, use the command line below to check which files have been written out:
gsutil ls gs://${BUCKET_NAME}/samples/

The output should look like the following:

Java:
gs://${BUCKET_NAME}/samples/output-22:30-22:32-0-of-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0-of-1
gs://${BUCKET_NAME}/samples/output-22:34-22:36-0-of-1
gs://${BUCKET_NAME}/samples/output-22:36-22:38-0-of-1

Python:
gs://${BUCKET_NAME}/samples/output-22:30-22:32-0
gs://${BUCKET_NAME}/samples/output-22:30-22:32-1
gs://${BUCKET_NAME}/samples/output-22:32-22:34-0
gs://${BUCKET_NAME}/samples/output-22:32-22:34-1
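The Python file names in the listing follow the pattern built in the sample's `WriteToGCS.process` method: output path, window start, window end, and shard ID joined with hyphens. The sketch below reproduces that naming outside Beam, assuming the window bounds are given as epoch seconds; `output_filename` is a hypothetical helper and the bucket name is a placeholder.

```python
from datetime import datetime, timezone


def output_filename(output_path, window_start, window_end, shard_id):
    """Build a file name the way the Python sample does for one window and shard."""
    ts_format = "%H:%M"
    start = datetime.fromtimestamp(window_start, tz=timezone.utc).strftime(ts_format)
    end = datetime.fromtimestamp(window_end, tz=timezone.utc).strftime(ts_format)
    return "-".join([output_path, start, end, str(shard_id)])


# 81000 seconds after midnight UTC is 22:30; a 2-minute window ends at 22:32.
name = output_filename("gs://bucket_name/samples/output", 81000, 81120, 0)
# name == "gs://bucket_name/samples/output-22:30-22:32-0"
```

Because the name carries only hours and minutes, windows from different days map to the same file names, so a long-running job would overwrite earlier output under this scheme.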

Task 5. Cleanup

  1. Delete the Cloud Scheduler job:
gcloud scheduler jobs delete publisher-job
  1. If prompted with Do you want to continue, enter Y and press Enter.
  1. In the Dataflow console, stop the job.

  2. With your job selected from the Dataflow Console, press the Stop button.

  3. Click Stop Job > Cancel to cancel the pipeline without draining.

  4. Delete the topic:

gcloud pubsub topics delete $TOPIC_ID
  1. Delete the files created by the pipeline:
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*"
gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
  1. Remove the Cloud Storage bucket:
gsutil rb gs://${BUCKET_NAME}


You created a Dataflow pipeline that read messages from your Pub/Sub topic, windowed them by timestamp, and wrote them to your Cloud Storage bucket.

Manual Last Updated September 20, 2023

Lab Last Tested September 20, 2023

Copyright 2023 Google LLC All rights reserved. Google and the Google logo are trademarks of Google LLC. All other company and product names may be trademarks of the respective companies with which they are associated.