This document describes how to read data from BigQuery to Dataflow.
Overview
For most use cases, consider using Managed I/O to read from BigQuery. Managed I/O provides features such as automatic upgrades and a consistent configuration API. When reading from BigQuery, Managed I/O performs direct table reads, which offer the best read performance.
If you need more advanced performance tuning, consider using the BigQueryIO connector. The BigQueryIO connector supports both direct table reads and reading from BigQuery export jobs. It also offers more fine-grained control over the deserialization of table records. For more information, see Use the BigQueryIO connector in this document.
Column projection and filtering
To reduce the volume of data that your pipeline reads from BigQuery, you can use the following techniques:
- Column projection specifies a subset of columns to read from the table. Use column projection when your table has a large number of columns and you only need to read a subset of them.
- Row filtering specifies a predicate to apply to the table. The BigQuery read operation only returns rows that match the filter, which can reduce the total amount of data ingested by the pipeline.
The following example reads the "user_name" and "age" columns from a table and filters out rows that don't match the predicate "age > 18". This example uses Managed I/O.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
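The following is a minimal sketch of the configuration that drives this kind of read. It builds the map that would be passed to `Managed.read(Managed.BIGQUERY).withConfig(...)` in a Beam pipeline. Assumptions: the key names `table`, `fields`, and `row_restriction` follow the Managed BigQuery source configuration, so verify them against your Beam version, and the table name `my-project.my_dataset.users` is hypothetical. The sketch uses only the JDK, so it runs without Beam on the classpath.

```java
import java.util.List;
import java.util.Map;

/** Builds a Managed I/O BigQuery read configuration with column projection and row filtering. */
final class ProjectionAndFilterConfig {

  static Map<String, Object> readConfig(String table) {
    // Assumption: key names follow the Managed BigQuery source configuration.
    return Map.<String, Object>of(
        // Table to read (hypothetical name supplied by the caller).
        "table", table,
        // Column projection: only these columns are read from BigQuery.
        "fields", List.of("user_name", "age"),
        // Row filtering: BigQuery returns only rows that match this predicate.
        "row_restriction", "age > 18");
  }

  public static void main(String[] args) {
    Map<String, Object> config = readConfig("my-project.my_dataset.users");
    // In a pipeline, this map would be passed to
    // Managed.read(Managed.BIGQUERY).withConfig(config).
    System.out.println(config.get("fields") + " where " + config.get("row_restriction"));
  }
}
```

The filter and projection run inside BigQuery, so rows and columns that are excluded here are never transferred to the pipeline.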
Read from a query result
The following example uses Managed I/O to read the result of a SQL query. It runs a query against a BigQuery public dataset. You can also use SQL queries to read from a BigQuery view or materialized view.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
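As a hedged sketch, reading a query result changes only the configuration: a `query` entry takes the place of `table`. Assumptions: the `query` key name follows the Managed BigQuery source configuration, and the SQL statement is an illustrative choice against the public `bigquery-public-data.samples.shakespeare` table, not necessarily the query used in the official sample.

```java
import java.util.Map;

/** Builds a Managed I/O BigQuery read configuration that reads the result of a SQL query. */
final class QueryReadConfig {

  // Illustrative query against a BigQuery public dataset.
  static final String QUERY =
      "SELECT word, SUM(word_count) AS total_count "
          + "FROM `bigquery-public-data.samples.shakespeare` "
          + "GROUP BY word";

  static Map<String, Object> readConfig(String query) {
    // Assumption: the Managed BigQuery source accepts "query" instead of "table".
    return Map.<String, Object>of("query", query);
  }

  public static void main(String[] args) {
    // In a pipeline: Managed.read(Managed.BIGQUERY).withConfig(readConfig(QUERY))
    System.out.println(readConfig(QUERY));
  }
}
```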
Use the BigQueryIO connector
The BigQueryIO connector supports the following serialization methods:
- Read the data as Avro-formatted records. Using this method, you provide a function that parses the Avro records into a custom data type.
- Read the data as TableRow objects. This method is convenient because it doesn't require a custom data type. However, it generally has lower performance than reading Avro-formatted records.
The connector supports two options for reading data:
- Export job. By default, the BigQueryIO connector runs a BigQuery export job that writes the table data to Cloud Storage. The connector then reads the data from Cloud Storage.
- Direct table reads. This option is faster than export jobs, because it uses the BigQuery Storage Read API and skips the export step. To use direct table reads, call withMethod(Method.DIRECT_READ) when you build the pipeline.
When choosing which option to use, consider the following points:
- In general, we recommend using direct table reads. The Storage Read API is better suited to data pipelines than export jobs, because it doesn't need the intermediate step of exporting data.
- If you use direct reads, you are charged for Storage Read API usage. See Data extraction pricing on the BigQuery pricing page.
- There is no additional cost for export jobs. However, export jobs are subject to limits. For large data movement, where timeliness is a priority and cost is adjustable, direct reads are recommended.
- The Storage Read API has quota limits. Use Google Cloud metrics to monitor your quota usage.
- If you use export jobs, set the --tempLocation pipeline option to specify a Cloud Storage bucket for the exported files.
- When using the Storage Read API, you might see lease expiration and session timeout errors in the logs, such as:
  - DEADLINE_EXCEEDED
  - Server Unresponsive
  - StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
  These errors can occur when an operation takes longer than the timeout, usually in pipelines that run for longer than 6 hours. To mitigate this issue, switch to export jobs.
The degree of parallelism depends on the read method:
- Direct reads: The I/O connector produces a dynamic number of streams, based on the size of the read request. It reads these streams directly from BigQuery in parallel.
- Export jobs: BigQuery determines how many files to write to Cloud Storage. The number of files depends on the query and the volume of data. The I/O connector reads the exported files in parallel.
The following table shows performance metrics for various BigQuery I/O read options. The workloads were run on one e2-standard-2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.
Workload (100 M records, 1 kB, 1 column) | Throughput (bytes) | Throughput (elements) |
---|---|---|
Storage Read | 120 MBps | 88,000 elements per second |
Avro Export | 105 MBps | 78,000 elements per second |
JSON Export | 110 MBps | 81,000 elements per second |
These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.
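To put the single-worker numbers in perspective, the following sketch converts the element throughput from the table into an approximate read time for the 100 M-record workload, using time = records / elements per second. It only restates the table's figures. It isn't a prediction for other pipelines, worker types, or SDKs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Rough read-time estimates derived from the benchmark table (one e2-standard-2 worker). */
final class ReadTimeEstimate {
  static final long RECORDS = 100_000_000L;

  /** Whole seconds needed to read RECORDS at the given rate (integer division, rounded down). */
  static long seconds(long elementsPerSecond) {
    return RECORDS / elementsPerSecond;
  }

  public static void main(String[] args) {
    // Element throughput values copied from the table.
    Map<String, Long> rates = new LinkedHashMap<>();
    rates.put("Storage Read", 88_000L);
    rates.put("Avro Export", 78_000L);
    rates.put("JSON Export", 81_000L);
    rates.forEach((method, rate) -> {
      long s = seconds(rate);
      System.out.printf("%s: ~%d s (~%d min)%n", method, s, Math.round(s / 60.0));
    });
  }
}
```

For example, Storage Read works out to about 1,136 seconds, or roughly 19 minutes. Avro export takes about 1,282 seconds.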
Examples
The following code examples use the BigQueryIO connector with direct table reads. To use an export job instead, omit the call to withMethod.
Read Avro-formatted records
This example shows how to use the BigQueryIO connector to read Avro-formatted records.
To read BigQuery data into Avro-formatted records, use the read(SerializableFunction) method. This method takes an application-defined function that parses SchemaAndRecord objects and returns a custom data type. The output from the connector is a PCollection of your custom data type.
The following code reads a PCollection<MyData> from a BigQuery table, where MyData is an application-defined class.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
The read method takes a SerializableFunction<SchemaAndRecord, T>, which defines a function to convert from Avro records to a custom data class. In the previous code example, the MyData.apply method implements this conversion function. The example function parses the name and age fields from the Avro record and returns a MyData instance.
To specify which BigQuery table to read, call the from method, as shown in the previous example. For more information, see Table names in the BigQuery I/O connector documentation.
Read TableRow objects
This example shows how to use the BigQueryIO connector to read TableRow objects.
The readTableRows method reads BigQuery data into a PCollection of TableRow objects. Each TableRow is a map of key-value pairs that holds a single row of table data. Specify the BigQuery table to read by calling the from method.
The following code reads a PCollection<TableRow> from a BigQuery table.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
This example also shows how to access the values from the TableRow map. Integer values are encoded as strings to match BigQuery's exported JSON format.
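Because integer columns arrive as strings, code that reads them from a TableRow needs an explicit parse. The following helper is a hedged sketch of that step. It accepts any `Map<String, Object>`, and TableRow implements that interface through its `GenericJson` superclass, so a TableRow can be passed in directly. The helper names and the sample row are hypothetical.

```java
import java.util.Map;

/** Hypothetical helpers for reading typed values from a TableRow (any Map<String, Object>). */
final class TableRowValues {

  /** Returns an INT64 column value, which arrives encoded as a string such as "42". */
  static long getLong(Map<String, Object> row, String field) {
    Object value = row.get(field);
    if (value == null) {
      throw new IllegalArgumentException("Missing or NULL field: " + field);
    }
    // toString() also tolerates values that are already numeric.
    return Long.parseLong(value.toString());
  }

  /** Returns a STRING column value, or null if the field is absent. */
  static String getString(Map<String, Object> row, String field) {
    Object value = row.get(field);
    return value == null ? null : value.toString();
  }

  public static void main(String[] args) {
    // Hypothetical row; a real TableRow from readTableRows() can be passed the same way.
    Map<String, Object> row = Map.<String, Object>of("name", "alice", "age", "42");
    System.out.println(getString(row, "name") + " is " + getLong(row, "age"));
  }
}
```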