Create continuous queries

This document describes how to run a continuous query in BigQuery.

BigQuery continuous queries are SQL statements that run continuously. Continuous queries let you analyze incoming data in BigQuery in real time, and then either export the results to Bigtable or Pub/Sub, or write the results to a BigQuery table.

Choose an account type

You can create and run a continuous query job by using a user account, or you can create a continuous query job by using a user account and then run it by using a service account. You must use a service account to run a continuous query that exports results to a Pub/Sub topic.

When you use a user account, a continuous query runs for two days. When you use a service account, a continuous query runs until it is explicitly canceled. For more information, see Authorization.

Required permissions

This section describes the permissions that you need to create and run a continuous query. As an alternative to the Identity and Access Management (IAM) roles mentioned, you could get the required permissions through custom roles.

Permissions when using a user account

This section provides information about the roles and permissions required to create and run a continuous query by using a user account.

To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:

To export data from a BigQuery table, the user account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:

To update data in a BigQuery table, the user account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:

If the user account must enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.

Permissions when using a service account

This section provides information about the roles and permissions required by the user account that creates the continuous query, and the service account that runs the continuous query.

User account permissions

To create a job in BigQuery, the user account must have the bigquery.jobs.create IAM permission. Each of the following IAM roles grants the bigquery.jobs.create permission:

To submit a job that runs using a service account, the user account must have the Service Account User (roles/iam.serviceAccountUser) role. If you are using the same user account to create the service account, then the user account must also have the Service Account Admin (roles/iam.serviceAccountAdmin) role. For information on how to limit a user's access to a single service account, rather than to all service accounts within a project, see Grant a single role.

If the user account must enable the APIs required for your continuous query use case, the user account must have the Service Usage Admin (roles/serviceusage.serviceUsageAdmin) role.

Service account permissions

To export data from a BigQuery table, the service account must have the bigquery.tables.export IAM permission. Each of the following IAM roles grants the bigquery.tables.export permission:

To update data in a BigQuery table, the service account must have the bigquery.tables.updateData IAM permission. Each of the following IAM roles grants the bigquery.tables.updateData permission:

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

Create a reservation

Create an Enterprise or Enterprise Plus edition reservation, and then create a reservation assignment with a CONTINUOUS job type.

Export to Pub/Sub

Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Pub/Sub. For more information, see Export to Pub/Sub.

Embed custom attributes as metadata in Pub/Sub messages

You can use Pub/Sub attributes to provide additional information about the message, such as its priority, origin, destination, or additional metadata. You can also use attributes to filter messages on the subscription.

Within a continuous query result, if a column is named _ATTRIBUTES, then its values are copied to the Pub/Sub message attributes. The provided fields within _ATTRIBUTES are used as attribute keys.

The _ATTRIBUTES column must be of JSON type, in the format ARRAY<STRUCT<STRING, STRING>> or STRUCT<STRING>.

For an example, see Export data to a Pub/Sub topic.

Export to Bigtable

Additional APIs, IAM permissions, and Google Cloud resources are required to export data to Bigtable. For more information, see Export to Bigtable.

Write data to a BigQuery table

You can write data to a BigQuery table by using an INSERT statement.

Use AI functions

Additional APIs, IAM permissions, and Google Cloud resources are required to use a supported AI function in a continuous query. For more information, see one of the following topics, based on your use case:

When you use an AI function in a continuous query, consider whether the query output will remain within the quota for the function. If you exceed the quota, you might have to separately handle the records that don't get processed.
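As a rough way to reason about the quota concern above, the following Python sketch compares an expected row rate with a function quota. It assumes the AI function is called once per row and that the quota is expressed in requests per minute; the function names and the numbers are hypothetical, so substitute the real quota for the function you use.

```python
def fits_within_quota(rows_per_minute: float, quota_requests_per_minute: float) -> bool:
    """Return True if every incoming row could be sent to the AI function.

    Assumption: one function call per row, quota measured per minute.
    """
    return rows_per_minute <= quota_requests_per_minute


def unprocessed_rows_per_minute(rows_per_minute: float, quota_requests_per_minute: float) -> float:
    """Estimate how many rows per minute would exceed the quota."""
    return max(0.0, float(rows_per_minute) - float(quota_requests_per_minute))


# Hypothetical numbers: 90 rows arrive per minute, the quota allows 60 calls.
print(fits_within_quota(90, 60))
print(unprocessed_rows_per_minute(90, 60))
```

If the second value is greater than zero, plan for a separate path that handles the rows that don't get processed.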

Run a continuous query by using a user account

This section describes how to run a continuous query by using a user account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution.

Follow these steps to run a continuous query:

Console

  1. In the Google Cloud console, go to the BigQuery page.

  2. In the query editor, click More.

  3. In the Choose query mode section, choose Continuous query.

  4. Click Confirm.

  5. In the query editor, type in the SQL statement for the continuous query. The SQL statement must only contain supported operations.

  6. Click Run.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. In Cloud Shell, run the continuous query by using the bq query command with the --continuous flag:

    bq query --use_legacy_sql=false --continuous=true \
    'QUERY'

    Replace QUERY with the SQL statement for the continuous query. The SQL statement must only contain supported operations.

API

Run the continuous query by calling the jobs.insert method. You must set the continuous field to true in the JobConfigurationQuery of the Job resource that you pass in.

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth application-default print-access-token)" \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{"configuration":{"query":{"continuous":true,"useLegacySql":false,"query":"QUERY"}}}' \
  --compressed

Replace the following:

  • PROJECT_ID: your project ID.
  • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
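If you build the jobs.insert request in code rather than by hand, a minimal Python sketch of the request body might look like the following. It assumes the JobConfigurationQuery is the "query" object nested under "configuration" in the Job resource; the helper name build_continuous_query_job is hypothetical, and sending the request (authentication, HTTP client) is left out.

```python
import json


def build_continuous_query_job(query: str) -> dict:
    """Build a jobs.insert request body for a continuous query.

    The continuous flag is set inside the query configuration, next to
    the SQL text and the useLegacySql setting.
    """
    return {
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": False,
                "continuous": True,
            }
        }
    }


# "QUERY" is a placeholder for the continuous query SQL statement.
body = build_continuous_query_job("QUERY")
print(json.dumps(body))
```

Serializing with json.dumps avoids hand-written quoting and bracket mistakes in the payload.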

Run a continuous query by using a service account

This section describes how to run a continuous query by using a service account. After the continuous query is running, you can close the Google Cloud console, terminal window, or application without interrupting query execution.

Follow these steps to use a service account to run a continuous query:

Console

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. In the Google Cloud console, go to the BigQuery page.

  4. In the query editor, click More.

  5. In the Choose query mode section, choose Continuous query.

  6. Click Confirm.

  7. In the query editor, click More > Query settings.

  8. In the Continuous query section, use the Service account box to select the service account that you created.

  9. Click Save.

  10. In the query editor, type in the SQL statement for the continuous query. The SQL statement must only contain supported operations.

  11. Click Run.

bq

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. In the Google Cloud console, activate Cloud Shell.

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. On the command line, run the continuous query by using the bq query command with the following flags:

    • Set the --continuous flag to true to make the query continuous.
    • Use the --connection_property flag to specify a service account to use.

    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Replace the following:

    • PROJECT_ID: your project ID.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email from the Service accounts page of the Google Cloud console.
    • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
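When the bq command is launched from a script, passing the arguments as a list sidesteps shell quoting problems with the SQL text. The following Python sketch only assembles the argument list from the flags described above; the helper name build_bq_command and the example service account email are hypothetical, and actually running the command (for example with subprocess.run) is an assumption left to the reader.

```python
import shlex
from typing import List, Optional


def build_bq_command(
    project_id: str,
    query: str,
    service_account_email: Optional[str] = None,
) -> List[str]:
    """Assemble the bq query arguments for a continuous query."""
    args = [
        "bq",
        "query",
        f"--project_id={project_id}",
        "--use_legacy_sql=false",
        "--continuous=true",
    ]
    if service_account_email:
        # Run the job as the given service account.
        args.append(
            f"--connection_property=service_account={service_account_email}"
        )
    args.append(query)
    return args


cmd = build_bq_command(
    "myproject",
    "QUERY",  # placeholder for the continuous query SQL statement
    "my-sa@myproject.iam.gserviceaccount.com",  # hypothetical email
)
# shlex.join shows the command as it could be typed in a shell.
print(shlex.join(cmd))
```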

API

  1. Create a service account.
  2. Grant the required permissions to the service account.
  3. Run the continuous query by calling the jobs.insert method. Set the following fields in the JobConfigurationQuery resource of the Job resource that you pass in:

    • Set the continuous field to true to make the query continuous.
    • Use the connectionProperties field to specify a service account to use.

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    Replace the following:

    • PROJECT_ID: your project ID.
    • QUERY: the SQL statement for the continuous query. The SQL statement must only contain supported operations.
    • SERVICE_ACCOUNT_EMAIL: the service account email. You can get the service account email on the Service accounts page of the Google Cloud console.
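The service account is passed as a key-value pair in a list of connection properties. As an illustration, the following Python sketch adds that pair to an existing request body without modifying the original. It assumes the connection properties belong inside the query configuration object; the helper name add_service_account and the email address are hypothetical.

```python
import copy
import json


def add_service_account(job_body: dict, service_account_email: str) -> dict:
    """Return a copy of job_body that runs as the given service account."""
    updated = copy.deepcopy(job_body)
    query_config = updated["configuration"]["query"]
    # Each connection property is an object with "key" and "value" fields.
    query_config.setdefault("connectionProperties", []).append(
        {"key": "service_account", "value": service_account_email}
    )
    return updated


base = {
    "configuration": {
        "query": {
            "query": "QUERY",  # placeholder for the continuous query SQL
            "useLegacySql": False,
            "continuous": True,
        }
    }
}
with_sa = add_service_account(base, "my-sa@myproject.iam.gserviceaccount.com")
print(json.dumps(with_sa, indent=2))
```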

Examples

The following SQL examples show common use cases for continuous queries.

Export data to a Pub/Sub topic

The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and publishes the data to a Pub/Sub topic in real time with message attributes:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);
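To make the split between message body and attributes concrete, the following Python sketch mimics, for one row, what the query above produces: a JSON string in the message column and a set of string key-value pairs from _ATTRIBUTES. This is an approximation for illustration only. The row values are made up, the helper name to_pubsub_message is hypothetical, and the exact JSON formatting produced by BigQuery may differ.

```python
import json
from typing import Dict, Tuple


def to_pubsub_message(row: dict) -> Tuple[str, Dict[str, str]]:
    """Split one row into a JSON message body and string attributes."""
    payload_fields = ("ride_id", "timestamp", "latitude", "longitude")
    # Mirrors TO_JSON_STRING(STRUCT(...)) AS message.
    message = json.dumps(
        {name: row[name] for name in payload_fields}, separators=(",", ":")
    )
    # Mirrors the _ATTRIBUTES column: field names become attribute keys,
    # and values are cast to strings.
    attributes = {"passenger_count": str(row["passenger_count"])}
    return message, attributes


# Hypothetical row for illustration.
row = {
    "ride_id": "ride-001",
    "timestamp": "2024-06-12T01:23:03Z",
    "latitude": 40.7,
    "longitude": -74.0,
    "passenger_count": 2,
    "ride_status": "enroute",
}
message, attributes = to_pubsub_message(row)
print(message)
print(attributes)
```

A subscriber could then filter on the passenger_count attribute without parsing the message body.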

Export data to a Bigtable table

The following example shows a continuous query that filters data from a BigQuery table that is receiving streaming taxi ride information, and exports the data into a Bigtable table in real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Write data to a BigQuery table

The following example shows a continuous query that filters and transforms data from a BigQuery table that is receiving streaming taxi ride information, and then writes the data to another BigQuery table in real time. This makes the data available for further downstream analysis.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
  SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Process data by using a Vertex AI model

The following example shows a continuous query that uses a Vertex AI model to generate an advertisement for taxi riders based on their current latitude and longitude, and then exports the results into a Pub/Sub topic in real time:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Start a continuous query from a particular point in time

When you start a continuous query, it processes all of the rows in the table that you are selecting from, and then processes new rows as they come in. If you want to skip processing some or all of the existing data, you can use the APPENDS change history function to start processing from a particular point in time.

The following example shows how to start a continuous query from a particular point in time by using the APPENDS function:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');
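If the start point comes from application code, the timestamp literal can be generated instead of typed by hand. The following Python sketch formats a timezone-aware datetime in the same style as the literal used above and builds the APPENDS call. The helper name appends_clause is hypothetical, and the sketch only produces a string; it does not run the query.

```python
from datetime import datetime, timezone


def appends_clause(table: str, start: datetime) -> str:
    """Build an APPENDS(...) call that starts at the given point in time."""
    if start.tzinfo is None:
        # A naive datetime would be ambiguous, so require an explicit zone.
        raise ValueError("start must be timezone-aware")
    start_utc = start.astimezone(timezone.utc)
    literal = start_utc.strftime("%Y-%m-%d %H:%M:%S.%f UTC")
    # NULL as the end argument leaves the range open-ended.
    return f"APPENDS(TABLE `{table}`, '{literal}', NULL)"


start = datetime(2024, 6, 12, 1, 23, 3, 652423, tzinfo=timezone.utc)
clause = appends_clause("myproject.real_time_taxi_streaming.taxi_rides", start)
print(clause)
```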

Modify the SQL of a continuous query

You can't update the SQL used in a continuous query while the continuous query job is running. You must cancel the continuous query job, modify the SQL, and then start a new continuous query job from the point where you stopped the original continuous query job.

Follow these steps to modify the SQL used in a continuous query:

  1. View the job details for the continuous query job that you want to update, and note the job ID.
  2. If possible, pause collection of upstream data. If you can't do this, you might get some data duplication when the continuous query is restarted.
  3. Cancel the continuous query that you want to modify.
  4. Get the end_time value for the original continuous query job by using the INFORMATION_SCHEMA JOBS view:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Replace the following:

    • PROJECT_ID: your project ID.
    • REGION: the region used by your project.
    • JOB_ID: the continuous query job ID that you identified in Step 1.

  5. Modify the continuous query SQL statement to start the continuous query from a particular point in time, using the end_time value that you retrieved in Step 4 as the starting point.

  6. Modify the continuous query SQL statement to reflect your needed changes.

  7. Run the modified continuous query.
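When this procedure is scripted, the end_time lookup can be generated from the project, region, and job ID. The following Python sketch fills in the placeholders of the lookup query shown in the steps above. The helper name end_time_lookup_sql and the example values are hypothetical, and plain string substitution is used only for illustration; for untrusted input, query parameters are the safer choice.

```python
def end_time_lookup_sql(project_id: str, region: str, job_id: str) -> str:
    """Return the SQL that looks up end_time for a canceled job."""
    for value in (project_id, region, job_id):
        # Minimal guard against breaking out of the quoted identifiers.
        if "'" in value or "`" in value:
            raise ValueError(f"unexpected quote character in {value!r}")
    return (
        "SELECT end_time\n"
        f"FROM `{project_id}.region-{region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT\n"
        "WHERE\n"
        "  EXTRACT(DATE FROM creation_time) = current_date()\n"
        "  AND error_result.reason = 'stopped'\n"
        f"  AND job_id = '{job_id}';"
    )


# Hypothetical project, region, and job ID.
sql = end_time_lookup_sql("myproject", "us", "bquxjob_example_123")
print(sql)
```

The returned end_time can then be used as the start timestamp in the APPENDS function of the modified query.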

Cancel a continuous query

You can cancel a continuous query job just like any other job. It might take up to a minute for the query to stop running after the job is canceled.

What's next