Pub/Sub to BigQuery template

The Pub/Sub to BigQuery template is a streaming pipeline that reads JSON-formatted messages from Pub/Sub and writes them to a BigQuery table. Optionally, you can provide a user-defined function (UDF) written in JavaScript to process the incoming messages.

Pipeline requirements

  • The BigQuery table must exist and have a schema.
  • The Pub/Sub message data must use JSON format, or you must provide a UDF that converts the message data to JSON. The JSON data must match the BigQuery table schema. For example, if the JSON payloads are formatted as {"k1":"v1", "k2":"v2"}, the BigQuery table must have two string columns named k1 and k2.
  • Specify the inputSubscription or inputTopic parameter, but not both.
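
As a rough illustration of the schema requirement above, the following sketch compares the keys of a JSON payload with a list of column names. The function name compareToColumns and the idea of passing the column names as a plain array are assumptions made for this example and are not part of the template. The check covers names only, not column types or modes.

```javascript
// Hypothetical helper (not part of the template): reports payload keys that
// have no matching column, and columns that the payload does not supply.
function compareToColumns(jsonPayload, columnNames) {
  const keys = Object.keys(JSON.parse(jsonPayload));
  return {
    unknownKeys: keys.filter((k) => !columnNames.includes(k)),
    missingColumns: columnNames.filter((c) => !keys.includes(c)),
  };
}

// Usage: the payload from the requirement above against columns k1 and k2.
const report = compareToColumns('{"k1":"v1", "k2":"v2"}', ["k1", "k2"]);
console.log(report);
```

A payload with keys that appear in unknownKeys is a likely candidate for the dead-letter table; whether an entry in missingColumns matters depends on how the table's columns are defined.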

Template parameters

Required parameters

  • outputTableSpec : The BigQuery table to write to, formatted as "PROJECT_ID:DATASET_NAME.TABLE_NAME".

Optional parameters

  • inputTopic : The Pub/Sub topic to read from, formatted as "projects/<PROJECT_ID>/topics/<TOPIC_NAME>".
  • inputSubscription : The Pub/Sub subscription to read from, formatted as "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>".
  • outputDeadletterTable : The BigQuery table to use for messages that failed to reach the output table, formatted as "PROJECT_ID:DATASET_NAME.TABLE_NAME". If the table doesn't exist, it is created when the pipeline runs. If this parameter is not specified, the value "OUTPUT_TABLE_SPEC_error_records" is used instead.
  • useStorageWriteApiAtLeastOnce : When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to true. To use exactly-once semantics, set the parameter to false. This parameter applies only when useStorageWriteApi is true. The default value is false.
  • useStorageWriteApi : If true, the pipeline uses the BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). The default value is false. For more information, see Using the Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams : When using the Storage Write API, specifies the number of write streams. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter. The default value is 0.
  • storageWriteApiTriggeringFrequencySec : When using the Storage Write API, specifies the triggering frequency, in seconds. If useStorageWriteApi is true and useStorageWriteApiAtLeastOnce is false, then you must set this parameter.
  • javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes : Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.
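
The constraints between these parameters can be checked before you launch a job. The following is a minimal sketch, not an official validator: the function name checkTemplateParameters is hypothetical, and it assumes that "must set this parameter" means a value greater than 0.

```javascript
// Hypothetical pre-launch check for the parameter rules described above.
// Returns a list of problems; an empty list means none were found.
function checkTemplateParameters(params) {
  const problems = [];

  // outputTableSpec is the only required parameter.
  if (!params.outputTableSpec) {
    problems.push("outputTableSpec is required");
  }

  // Specify inputSubscription or inputTopic, but not both.
  const hasTopic = Boolean(params.inputTopic);
  const hasSubscription = Boolean(params.inputSubscription);
  if (hasTopic === hasSubscription) {
    problems.push("set exactly one of inputTopic or inputSubscription");
  }

  // Storage Write API with exactly-once semantics needs both tuning values.
  // Assumption: "set" is interpreted here as a number greater than 0.
  const exactlyOnce =
    params.useStorageWriteApi === true &&
    params.useStorageWriteApiAtLeastOnce !== true;
  if (exactlyOnce) {
    if (!(params.numStorageWriteApiStreams > 0)) {
      problems.push("numStorageWriteApiStreams must be set");
    }
    if (!(params.storageWriteApiTriggeringFrequencySec > 0)) {
      problems.push("storageWriteApiTriggeringFrequencySec must be set");
    }
  }
  return problems;
}

// Usage: exactly-once Storage Write API mode without the tuning values.
console.log(
  checkTemplateParameters({
    outputTableSpec: "PROJECT_ID:DATASET_NAME.TABLE_NAME",
    inputTopic: "projects/PROJECT_ID/topics/TOPIC_NAME",
    useStorageWriteApi: true,
  })
);
```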

User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: the Pub/Sub message data field, serialized as a JSON string.
  • Output: a JSON string that matches the schema of the BigQuery destination table.
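
A UDF that follows this specification might look like the sketch below. It assumes a destination table with two string columns named k1 and k2, as in the pipeline requirements example. The function name myTransform and the decision to drop any other fields are illustrative choices, not behavior defined by the template.

```javascript
/**
 * Illustrative UDF: receives the Pub/Sub message data as a JSON string and
 * returns a JSON string shaped for a table with string columns k1 and k2.
 * The column names are assumptions for this sketch.
 */
function myTransform(inJson) {
  var message = JSON.parse(inJson);

  // Keep only the fields that the table defines, coerced to strings.
  // A missing or null k2 is written as null.
  var row = {
    k1: String(message.k1),
    k2: message.k2 === undefined || message.k2 === null ? null : String(message.k2)
  };

  return JSON.stringify(row);
}
```

To use a function like this, upload the file to Cloud Storage and set javascriptTextTransformGcsPath to its URI and javascriptTextTransformFunctionName to myTransform.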

Run the template

    Console

    1. Go to the Dataflow Create job from template page.
    2. In the Job name field, enter a unique job name.
    3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

      For a list of regions where you can run a Dataflow job, see Dataflow locations.

    4. From the Dataflow template drop-down menu, select the Pub/Sub to BigQuery template.
    5. In the provided parameter fields, enter your parameter values.
    6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
    7. Click Run job.

    gcloud

    In your shell or terminal, run the template:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Flex \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME
    

    Replace the following:

    • JOB_NAME: a unique job name of your choice
    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • REGION_NAME: the region where you want to deploy your Dataflow job (for example, us-central1)
    • VERSION: the version of the template that you want to use
    • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
    • TOPIC_NAME: your Pub/Sub topic name
    • DATASET: your BigQuery dataset
    • TABLE_NAME: your BigQuery table name

    API

    To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
             "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME",
             "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Flex"
       }
    }
    

    Replace the following:

    • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
    • JOB_NAME: a unique job name of your choice
    • LOCATION: the region where you want to deploy your Dataflow job (for example, us-central1)
    • VERSION: the version of the template that you want to use
    • TOPIC_NAME: your Pub/Sub topic name
    • DATASET: your BigQuery dataset
    • TABLE_NAME: your BigQuery table name
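
If you assemble the request in code, a helper along the following lines can fill in the placeholders. This is a sketch under stated assumptions: the function name buildLaunchRequest and the shape of its options object are hypothetical, and it only builds the URL and JSON body. Authentication and the HTTP call itself are left out.

```javascript
// Hypothetical helper: fills in the placeholders of the launch request.
// It returns the URL and a JSON body string; it does not send anything.
function buildLaunchRequest(opts) {
  const url =
    "https://dataflow.googleapis.com/v1b3/projects/" +
    encodeURIComponent(opts.projectId) +
    "/locations/" +
    encodeURIComponent(opts.location) +
    "/flexTemplates:launch";

  const body = {
    launch_parameter: {
      jobName: opts.jobName,
      parameters: {
        inputTopic: `projects/${opts.projectId}/topics/${opts.topicName}`,
        outputTableSpec: `${opts.projectId}:${opts.dataset}.${opts.tableName}`,
      },
      containerSpecGcsPath:
        `gs://dataflow-templates-${opts.location}/${opts.version}/flex/PubSub_to_BigQuery_Flex`,
    },
  };

  return { url, body: JSON.stringify(body) };
}

// Usage with the same placeholder values as the request above.
const request = buildLaunchRequest({
  projectId: "PROJECT_ID",
  location: "LOCATION",
  jobName: "JOB_NAME",
  topicName: "TOPIC_NAME",
  dataset: "DATASET",
  tableName: "TABLE_NAME",
  version: "VERSION",
});
console.log(request.url);
```

The returned body can be sent as the POST payload with any HTTP client, together with an authorization header for an account that is allowed to launch Dataflow jobs.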

    What's next