Stream from Pub/Sub to BigQuery


This tutorial uses the Pub/Sub Subscription to BigQuery template to create and run a Dataflow template job using the Google Cloud console or Google Cloud CLI. The tutorial walks you through a streaming pipeline example that reads JSON-encoded messages from Pub/Sub, uses a User-Defined Function (UDF) to extend the Google-provided streaming template, transforms message data with the Apache Beam SDK, and writes the results to a BigQuery table.

Streaming analytics and data integration pipelines use Pub/Sub to ingest and distribute data. Pub/Sub enables you to create systems of event producers and consumers, called publishers and subscribers. Publishers send events to the Pub/Sub service asynchronously, and Pub/Sub delivers the events to all services that need to react to them.

Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes. It provides a simplified pipeline development environment that uses the Apache Beam SDK to transform incoming data and then output the transformed data.

The benefit of this workflow is that you can use UDFs to transform the message data before it is written to BigQuery. Another option is to use a BigQuery subscription, which writes Pub/Sub messages directly to BigQuery, without using Dataflow. This option only supports at-least-once delivery; it does not support exactly-once processing.
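
For comparison, you can create a BigQuery subscription directly from the gcloud CLI. The following is a minimal sketch, assuming the topic and destination table already exist; the placeholder names and the exact table-path format may vary with your gcloud CLI version.

gcloud pubsub subscriptions create SUBSCRIPTION_ID \
    --topic=TOPIC_ID \
    --bigquery-table=PROJECT_ID:DATASET_ID.TABLE_ID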

Objectives

  • Create a Pub/Sub topic.
  • Create a BigQuery dataset with a table and schema.
  • Use a Google-provided streaming template to stream data from your Pub/Sub subscription to BigQuery by using Dataflow.
  • Create a User-Defined Function (UDF) to extend the Google-provided streaming template.

Costs

In this document, you use the following billable components of Google Cloud:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • BigQuery

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

This section shows you how to select a project, enable APIs, and grant the appropriate roles to your user account and to the worker service account.

Console

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs.

    Enable the APIs

  5. To complete the steps in this tutorial, your user account must have the Service Account User role. The Compute Engine default service account must have the following roles: Dataflow Worker, Dataflow Admin, Pub/Sub Editor, Storage Object Admin, and BigQuery Data Editor. To add the required roles in the Google Cloud console:

    1. In the Google Cloud console, go to the IAM page.

      Go to IAM
    2. Select your project.
    3. In the row containing your user account, click Edit principal, and then click Add another role.
    4. In the drop-down list, select the role Service Account User.
    5. In the row containing the Compute Engine default service account, click Edit principal, and then click Add another role.
    6. In the drop-down list, select the role Dataflow Worker.
    7. Repeat for the Dataflow Admin, the Pub/Sub Editor, the Storage Object Admin, and the BigQuery Data Editor roles, and then click Save.

      For more information about granting roles, see Grant an IAM role by using the console.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com
  7. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

  8. Grant the Service Account User role (roles/iam.serviceAccountUser) to your user account:

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=roles/iam.serviceAccountUser
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.
  9. Grant roles to your Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Replace the following:

    • PROJECT_ID: your project ID.
    • PROJECT_NUMBER: your project number. To find your project number, use the gcloud projects describe command, as shown in the example after this list.
    • SERVICE_ACCOUNT_ROLE: each individual role.
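
    For example, the following command prints only the project number. This is a quick lookup sketch; it assumes that the gcloud CLI is already initialized for your project.

    gcloud projects describe PROJECT_ID --format="value(projectNumber)"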

Create the example source and sink

This section explains how to create the following:

  • A streaming source of data using Pub/Sub
  • A dataset to load the data into BigQuery

Create a Cloud Storage bucket

Begin by creating a Cloud Storage bucket using the Google Cloud console or Google Cloud CLI. The Dataflow pipeline uses this bucket as a temporary storage location.

Console

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click Create bucket.

  3. On the Create a bucket page, for Name your bucket, enter a name that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique. Don't select the other options.

  4. Click Create.

gcloud

Use the gcloud storage buckets create command:

gcloud storage buckets create gs://BUCKET_NAME

Replace BUCKET_NAME with a name for your Cloud Storage bucket that meets the bucket naming requirements. Cloud Storage bucket names must be globally unique.
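
Optionally, to confirm that the bucket exists before you continue, you can describe it. This check isn't required for the tutorial.

gcloud storage buckets describe gs://BUCKET_NAME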

Create a Pub/Sub topic and subscription

Create a Pub/Sub topic and then create a subscription to that topic.

Console

To create a topic, complete the following steps.

  1. In the Google Cloud console, go to the Pub/Sub Topics page.

    Go to Topics

  2. Click Create topic.

  3. In the Topic ID field, enter an ID for your topic. For information about how to name a topic, see Guidelines to name a topic or a subscription.

  4. Retain the option Add a default subscription. Don't select the other options.

  5. Click Create topic.

gcloud

To create a topic, run the gcloud pubsub topics create command. For information about how to name a topic, see Guidelines to name a topic or a subscription.

gcloud pubsub topics create TOPIC_ID

Replace TOPIC_ID with a name for your Pub/Sub topic.

To create a subscription to your topic, run the gcloud pubsub subscriptions create command:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Replace SUBSCRIPTION_ID with a name for your Pub/Sub subscription.
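
Optionally, to verify that the topic and subscription are connected, publish a test message and pull it back. This is a quick sanity check; the message body is only an example, and the pulled message is acknowledged, so it isn't redelivered later.

gcloud pubsub topics publish TOPIC_ID \
  --message='{"url": "https://beam.apache.org/", "review": "positive"}'

gcloud pubsub subscriptions pull SUBSCRIPTION_ID --auto-ack --limit=1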

Create and run Cloud Scheduler jobs

Create and run two Cloud Scheduler jobs, one that publishes positive ratings and a second that publishes negative ratings to your Pub/Sub topic.

Console

Create a Cloud Scheduler job for positive ratings.

  1. Visit the Cloud Scheduler page in the console.

    Go to Cloud Scheduler

  2. Click the Create a job button.

  3. Enter the name positive-ratings-publisher.

  4. Select a region for your job. Choose a Dataflow region close to where you run the other steps in this tutorial. For more information about regions and locations, see Dataflow locations.

  5. Specify the frequency for your job, using the unix-cron format: * * * * *

    See Configuring Cron Job Schedules for more information.

  6. Select your time zone.

  7. Click Continue.

  8. In the Target list, select Pub/Sub.

  9. Select your Topic name from the list.

  10. Add the following Message string to be sent to your target: {"url": "https://beam.apache.org/", "review": "positive"}

  11. Click Create.

You now have a cron job that sends a message with a positive rating to your Pub/Sub topic every minute. The Dataflow pipeline that you create later reads these messages from the subscription to that topic.
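
For reference, the two unix-cron schedules used in this tutorial break down as follows; the field order is minute, hour, day of month, month, and day of week:

# "* * * * *"    runs every minute (positive-ratings-publisher)
# "*/2 * * * *"  runs every two minutes (negative-ratings-publisher)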

Create a Cloud Scheduler job for negative ratings.

  1. On the Cloud Scheduler page in the console, click the Create a job button.

  2. Enter the name negative-ratings-publisher.

  3. Select a region for your job to run in.

  4. Specify the frequency for your job, using the unix-cron format: */2 * * * *

    See Configuring Cron Job Schedules for more information.

  5. Select your time zone.

  6. Click Continue.

  7. In the Target list, select Pub/Sub.

  8. Select your Topic name from the list.

  9. Add the following Message string to be sent to your target: {"url": "https://beam.apache.org/", "review": "negative"}

  10. Click Create.

You now have a cron job that sends a message with a negative rating to your Pub/Sub topic every two minutes. The Dataflow pipeline that you create later reads these messages from the subscription to that topic.

gcloud

  1. To create a Cloud Scheduler job for this tutorial, use the gcloud scheduler jobs create command. This step creates a publisher for "positive ratings" that publishes one message per minute.

    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --location=DATAFLOW_REGION \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    

    Replace DATAFLOW_REGION with a valid region name for the region where you want to deploy your Dataflow job. Choose a Dataflow region close to where you run the commands in this tutorial. Replace TOPIC_ID with the name of your Pub/Sub topic.

  2. To start the Cloud Scheduler job, use the gcloud scheduler jobs run command.

    gcloud scheduler jobs run --location=DATAFLOW_REGION positive-ratings-publisher
    
  3. Create and run a similar publisher for "negative ratings" that publishes one message every two minutes.

    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --location=DATAFLOW_REGION  \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
  4. Start the second Cloud Scheduler job.

    gcloud scheduler jobs run --location=DATAFLOW_REGION negative-ratings-publisher
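
  5. Optional: To confirm that both publisher jobs were created and are enabled, list them. This step isn't required for the tutorial.

    gcloud scheduler jobs list --location=DATAFLOW_REGION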
    

Create a BigQuery dataset

Create a BigQuery dataset and table with the appropriate schema for your Pub/Sub topic.

Console

Create a BigQuery dataset.

  1. Open the BigQuery page in the Google Cloud console.

    Go to the BigQuery page

  2. In the Explorer panel, select the project where you want to create the dataset.

  3. Expand the Actions option and click Create dataset.

  4. On the Create dataset page:

    • For Dataset ID, enter tutorial_dataset.
    • For Data location, choose a geographic location for the dataset. After a dataset is created, the location can't be changed.

    • Don't select the other options.

    • Click Create dataset.

Create a BigQuery table with a schema.

  1. In the Explorer panel, expand your project and select your tutorial_dataset dataset.

  2. Expand the Actions option and click Open.

  3. In the details panel, click Create table.

  4. On the Create table page, in the Source section, select Empty table.

  5. On the Create table page, in the Destination section:

    • Verify that Dataset name is set to tutorial_dataset.
    • In the Table name field, enter tutorial.
    • Verify that Table type is set to Native table.
  6. In the Schema section, enter the schema definition. Enable Edit as text and enter the following table schema as a JSON array.

    [
      {
        "mode": "NULLABLE",
        "name": "url",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "review",
        "type": "STRING"
      }
    ]
    
  7. For Partition and cluster settings leave the default value: No partitioning.

  8. In the Advanced options section, for Encryption leave the default value: Google-managed key. By default, BigQuery encrypts customer content stored at rest.

  9. Click Create table.

gcloud

Use the bq mk command to create the dataset.

bq --location=DATAFLOW_REGION mk \
    PROJECT_ID:tutorial_dataset

Replace PROJECT_ID with the project ID of your project.

Use the bq mk command with the --table or -t flag to create a table in your dataset.

bq mk \
    --table \
    PROJECT_ID:tutorial_dataset.tutorial \
    url:STRING,review:STRING
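
Optionally, to verify that the table was created with the expected schema, inspect it with the bq show command:

bq show --schema --format=prettyjson PROJECT_ID:tutorial_dataset.tutorial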

Create a User-Defined Function (UDF)

You can optionally create a JavaScript UDF to extend the Google-provided Pub/Sub Subscription to BigQuery template. UDFs let you define data transformations that aren't built into the template and inject them without modifying the template code.

The following UDF validates the URLs of the incoming ratings. Ratings with missing or unrecognized URLs are forwarded to a separate output table suffixed with _error_records, also known as a dead-letter table, in the same project and dataset.

JavaScript

/**
 * User-defined function (UDF) to transform events
 * as part of a Dataflow template job.
 *
 * @param {string} inJson input Pub/Sub JSON message (stringified)
 * @return {string} the transformed JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);
  // If the full Pub/Sub message (with data and attributes) was passed in,
  // unwrap the payload; otherwise the object is already the payload.
  const includePubsubMessage = obj.data && obj.attributes;
  const data = includePubsubMessage ? obj.data : obj;

  // Reject ratings with missing or unrecognized URLs. The template routes
  // messages that throw an error to the dead-letter table.
  if (!data.hasOwnProperty('url')) {
    throw new Error("No url found");
  } else if (data.url !== "https://beam.apache.org/") {
    throw new Error("Unrecognized url");
  }

  return JSON.stringify(obj);
}

Save this JavaScript snippet to a file, and then upload the file to the Cloud Storage bucket that you created earlier.
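
For example, if you save the file locally as dataflow_udf_transform.js (the file name assumed later in this tutorial), you can upload it with the gcloud CLI:

gcloud storage cp dataflow_udf_transform.js gs://BUCKET_NAME/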

Run the pipeline

Run a streaming pipeline using the Google-provided Pub/Sub Subscription to BigQuery template. The pipeline reads incoming messages from your Pub/Sub subscription and writes them to your BigQuery table.

Console

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

  2. Click Create job from template.

  3. Enter a Job name for your Dataflow job.

  4. For Regional endpoint, select a region for your Dataflow job.

  5. For Dataflow template, select the Pub/Sub Subscription to BigQuery template.

  6. For BigQuery output table, enter the following:

    PROJECT_ID:tutorial_dataset.tutorial
    
  7. For Pub/Sub input subscription, enter the following:

    projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
    

    Replace PROJECT_ID with the project ID of the project where you created your BigQuery dataset and SUBSCRIPTION_ID with the name of your Pub/Sub subscription.

  8. For Temporary location, enter the following:

    gs://BUCKET_NAME/temp/
    

    Replace BUCKET_NAME with the name of your Cloud Storage bucket. The temp folder stores temporary files, like the staged pipeline job.

  9. Optional: to include a UDF for the job, expand Optional parameters.

    1. For JavaScript UDF path in Cloud Storage, enter the following:

      gs://BUCKET_NAME/dataflow_udf_transform.js
      
    2. For JavaScript UDF name, enter the following:

      process
      
  10. Click Run job.

To check whether the template can forward messages to the dead-letter table, publish some ratings with missing or unrecognized URLs.

  1. Go to the Pub/Sub Topics page.

  2. Click your TOPIC_ID.

  3. Go to the Messages section.

  4. Click Publish message.

  5. Enter some ratings with missing or unrecognized URLs in Message body. For example:

    {"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}
    
  6. Click Publish.

gcloud

To run the template in your shell or terminal, use the gcloud dataflow jobs run command.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial

Replace JOB_NAME with a unique name of your choice.

Optionally, to run the template with the UDF, use the following command:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial,\
javascriptTextTransformGcsPath=gs://BUCKET_NAME/dataflow_udf_transform.js,\
javascriptTextTransformFunctionName=process

To check whether the template can forward messages to the dead-letter table, publish some ratings with missing or unrecognized URLs. For example:

gcloud pubsub topics publish TOPIC_ID \
  --message='{"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}'

View your results

View the data written to your BigQuery tables.

Console

  1. In the Google Cloud console, go to the BigQuery page.
    Go to the BigQuery page

  2. In the query editor, run the following query:

    SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial`
    LIMIT 1000
    

    It can take up to a minute for data to start appearing in your table.

    The query returns rows that have been added to your table in the past 24 hours. You can also run queries using standard SQL.

    If you expect some error records to be written to your dead-letter table, query the tutorial_error_records table instead. For example:

    SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records`
    LIMIT 1000
    

gcloud

Check the results in BigQuery by running the following query:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial`'

While the pipeline is running, you can see new rows appended to the BigQuery table every minute.

If you expect some error records to be written to your dead-letter table, query the tutorial_error_records table instead. For example:

SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records`
LIMIT 1000
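
As with the main table, you can run this query with the bq command-line tool. For example:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records` LIMIT 1000'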

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the tutorial.

Console

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Delete the individual resources

If you want to reuse the project later, you can keep the project but delete the resources that you created during the tutorial.

Stop the Dataflow pipeline

Console

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    Go to Jobs

  2. Click the job that you want to stop.

    To stop a job, its status must be Running.

  3. In the job details page, click Stop.

  4. Click Cancel.

  5. To confirm your choice, click Stop Job.

gcloud

To cancel your Dataflow job, use the gcloud dataflow jobs list and gcloud dataflow jobs cancel commands. The following command finds the running job by name and cancels it:

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
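
If you prefer to let in-flight data finish processing instead of cancelling immediately, Dataflow also supports draining a streaming job. A minimal sketch, where JOB_ID is the job ID returned by the list command above:

gcloud dataflow jobs drain JOB_ID --region "DATAFLOW_REGION"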

Clean up Google Cloud project resources

Console

  1. Delete the Cloud Scheduler jobs.

    1. Go to the Cloud Scheduler page in the Google Cloud console.

      Go to Cloud Scheduler

    2. Select your jobs.

    3. Click the Delete button at the top of the page and confirm your delete.

  2. Delete the Pub/Sub topic and subscription.

    1. Go to the Pub/Sub Topics page in the Google Cloud console.

      Go to Topics

    2. Select the topic that you created.

    3. Click Delete to permanently delete the topic.

    4. Go to the Pub/Sub Subscriptions page in the Google Cloud console.

      Go to Subscriptions

    5. Select the subscription created with your topic.

    6. Click Delete to permanently delete the subscription.

  3. Delete the BigQuery table and dataset.

    1. In the Google Cloud console, go to the BigQuery page.

      Go to BigQuery

    2. In the Explorer panel, expand your project.

    3. Next to the dataset that you want to delete, click View actions, and then click Delete.

  4. Delete the Cloud Storage bucket.

    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets

    2. Select the bucket that you want to delete, click Delete, and then follow the instructions.

gcloud

  1. To delete the Cloud Scheduler jobs, use the gcloud scheduler jobs delete command.

    gcloud scheduler jobs delete negative-ratings-publisher --location=DATAFLOW_REGION
    
    gcloud scheduler jobs delete positive-ratings-publisher --location=DATAFLOW_REGION
    
  2. To delete the Pub/Sub subscription and topic, use the gcloud pubsub subscriptions delete and the gcloud pubsub topics delete commands.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  3. To delete the BigQuery table, use the bq rm command.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  4. Delete the BigQuery dataset. The dataset alone does not incur any charges.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  5. To delete the Cloud Storage bucket, use the gcloud storage rm command. The bucket alone does not incur any charges.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Revoke credentials

Console

If you keep your project, revoke the roles that you granted to the Compute Engine default service account.

  1. In the Google Cloud console, go to the IAM page.

    Go to IAM

  2. Select a project, folder, or organization.

  3. Find the row containing the principal whose access you want to revoke. In that row, click Edit principal.

  4. Click the Delete button for each role you want to revoke, and then click Save.

gcloud

  • If you keep your project, revoke the roles that you granted to the Compute Engine default service account. Run the following command one time for each of the following IAM roles:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=ROLE
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

What's next