Use the BigQuery connector with Dataproc Serverless for Spark

Use the spark-bigquery-connector with Apache Spark to read data from and write data to BigQuery. This tutorial demonstrates a PySpark application that uses the spark-bigquery-connector.

Use the BigQuery connector with your workload

See Dataproc Serverless for Spark runtime releases to determine the BigQuery connector version that is installed in your batch workload runtime version. If the connector is not listed, see the next section for instructions on how to make the connector available to applications.

How to use the connector with Spark runtime version 2.0

The BigQuery connector is not installed in Spark runtime version 2.0. When using Spark runtime version 2.0, you can make the connector available to your application in one of the following ways:

  • Use the jars parameter to point to a connector jar file when you submit your Dataproc Serverless for Spark batch workload. The following example specifies a connector jar file (see the GoogleCloudDataproc/spark-bigquery-connector repository on GitHub for a list of available connector jar files).
    • Google Cloud CLI example:
      gcloud dataproc batches submit pyspark \
          --region=region \
          --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.13-version.jar \
          ... other args
      
  • Include the connector jar file in your Spark application as a dependency (see Compiling against the connector).
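If you submit batches from a Python script instead of typing the command, the gcloud example above can be assembled as an argument list. This is a minimal sketch: build_submit_command is a hypothetical helper, the jar URI keeps the placeholder "version" from the example (substitute a real connector release), and the --version flag is used here to select Spark runtime 2.0 for the batch.

```python
import shlex


def build_submit_command(script, region, connector_jar,
                         runtime_version="2.0", extra_args=()):
    """Return the gcloud argv for a PySpark batch that adds the connector jar.

    Hypothetical helper: it only builds the command, it does not run it.
    """
    return [
        "gcloud", "dataproc", "batches", "submit", "pyspark", script,
        f"--region={region}",
        f"--version={runtime_version}",  # runtime 2.0 has no bundled connector
        f"--jars={connector_jar}",       # make the connector jar available
        *extra_args,                     # any other args you normally pass
    ]


if __name__ == "__main__":
    cmd = build_submit_command(
        "wordcount.py",
        region="us-central1",  # example region; use your own
        connector_jar=("gs://spark-lib/bigquery/"
                       "spark-bigquery-with-dependencies_2.13-version.jar"),
    )
    # Inspect the command first; to submit, pass cmd to
    # subprocess.run(cmd, check=True).
    print(shlex.join(cmd))
```

Returning a list rather than a single string avoids shell-quoting problems when the command is later passed to subprocess.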

Calculate costs

This tutorial uses billable components of Google Cloud, including:

  • Dataproc Serverless
  • BigQuery
  • Cloud Storage

Use the Pricing Calculator to generate a cost estimate based on your projected usage. New Google Cloud users might be eligible for a free trial.

BigQuery I/O

This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.

The connector writes the wordcount output to BigQuery by:

  1. Buffering the data into temporary files in your Cloud Storage bucket

  2. Copying the data in one operation from your Cloud Storage bucket into BigQuery

  3. Deleting the temporary files in Cloud Storage after the BigQuery load operation completes (temporary files are also deleted after the Spark application terminates). If deletion fails, you must delete any unwanted temporary Cloud Storage files yourself. They are typically placed in gs://your-bucket/.spark-bigquery-jobid-UUID.
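If you do need to clean up after a failed deletion, the sketch below picks the temporary directories out of a bucket listing and turns them into gcloud storage rm commands. It assumes the naming pattern described above (gs://your-bucket/.spark-bigquery-jobid-UUID), which may differ between connector versions; leftover_temp_dirs and cleanup_commands are hypothetical helpers, so review the generated commands before running them, and only run them when no Spark job is writing to the bucket.

```python
import shlex

# Assumed prefix of the connector's temporary directories (see above).
TEMP_PREFIX = ".spark-bigquery-"


def leftover_temp_dirs(bucket, object_uris):
    """Return sorted gs:// URIs of top-level temporary directories in bucket."""
    root = f"gs://{bucket}/"
    dirs = set()
    for uri in object_uris:
        if not uri.startswith(root):
            continue  # ignore objects from other buckets
        # First path component below the bucket, e.g. ".spark-bigquery-...".
        top = uri[len(root):].split("/", 1)[0]
        if top.startswith(TEMP_PREFIX):
            dirs.add(root + top)
    return sorted(dirs)


def cleanup_commands(dirs):
    """Build one recursive delete command per temporary directory."""
    return [shlex.join(["gcloud", "storage", "rm", "--recursive", d])
            for d in dirs]


if __name__ == "__main__":
    # Example listing, e.g. the lines printed by `gcloud storage ls`.
    listing = [
        "gs://your-bucket/.spark-bigquery-app-1234-abcd/",
        "gs://your-bucket/data/input.csv",
    ]
    for command in cleanup_commands(leftover_temp_dirs("your-bucket", listing)):
        print(command)
```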

Configure billing

By default, the project associated with the credentials or service account is billed for API usage. To bill a different project, set the following configuration: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").

You can also set it on an individual read or write operation: .option("parentProject", "<BILLED-GCP-PROJECT>").
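To keep the per-operation billing setting in one place, you could build the data source options with a small helper. This is a sketch: bigquery_options is a hypothetical function, and it adds parentProject only when a billing project is given, so that otherwise the session-level setting (or the default project) applies.

```python
def bigquery_options(table, parent_project=None):
    """Build per-operation options for the BigQuery data source.

    Hypothetical helper. parentProject is included only when a billing
    project is supplied; otherwise the session-level or default billing
    project is used.
    """
    opts = {"table": table}
    if parent_project:
        opts["parentProject"] = parent_project
    return opts


if __name__ == "__main__":
    print(bigquery_options("bigquery-public-data:samples.shakespeare",
                           parent_project="my-billing-project"))
```

In a PySpark job you would pass the dictionary with DataFrameReader.options or DataFrameWriter.options, for example spark.read.format('bigquery').options(**bigquery_options('bigquery-public-data:samples.shakespeare', 'my-billing-project')).load(), where my-billing-project is a placeholder.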

Submit a PySpark wordcount batch workload

  1. Create the wordcount_dataset with the bq command-line tool in a local terminal or in Cloud Shell.
    bq mk wordcount_dataset
    
  2. Create a Cloud Storage bucket with the Google Cloud CLI in a local terminal or in Cloud Shell.
    gcloud storage buckets create gs://your-bucket
    
  3. Examine the code.
    #!/usr/bin/python
    """BigQuery I/O PySpark example."""
    from pyspark.sql import SparkSession
    
    spark = SparkSession \
      .builder \
      .appName('spark-bigquery-demo') \
      .getOrCreate()
    
    # Cloud Storage bucket that the connector uses to buffer temporary data
    # before loading it into BigQuery.
    bucket = "[your-bucket-name]"
    spark.conf.set('temporaryGcsBucket', bucket)
    
    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')
    
    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()
    
    # Saving the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
  4. Create wordcount.py locally in a text editor by copying the PySpark code from the listing above. Replace the [your-bucket-name] placeholder with the name of the Cloud Storage bucket you created.
  5. Submit the PySpark batch workload:
    gcloud dataproc batches submit pyspark wordcount.py \
        --region=region \
        --deps-bucket=your-bucket
    
    Sample terminal output:
    ...
    +---------+----------+
    |     word|word_count|
    +---------+----------+
    |     XVII|         2|
    |    spoil|        28|
    |    Drink|         7|
    |forgetful|         5|
    |   Cannot|        46|
    |    cures|        10|
    |   harder|        13|
    |  tresses|         3|
    |      few|        62|
    |  steel'd|         5|
    | tripping|         7|
    |   travel|        35|
    |   ransom|        55|
    |     hope|       366|
    |       By|       816|
    |     some|      1169|
    |    those|       508|
    |    still|       567|
    |      art|       893|
    |    feign|        10|
    +---------+----------+
    only showing top 20 rows
    
    root
     |-- word: string (nullable = false)
     |-- word_count: long (nullable = true)
    

    To preview the output table in the Google Cloud console, open your project's BigQuery page, select the wordcount_output table, and then click Preview.
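To see what the SQL aggregation in wordcount.py computes, here is a plain-Python equivalent that runs without Spark. It is only a sketch: the sample rows are made up, and it assumes each row carries word and word_count fields like the samples.shakespeare table, which stores one row per word per corpus (work), so the sum totals each word across all the works. As in Spark SQL, grouping is case-sensitive, which is why By appears as its own row in the sample output.

```python
from collections import Counter


def word_count(rows):
    """Sum word_count per word, mirroring the tutorial's query:

    SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word
    """
    totals = Counter()
    for row in rows:
        totals[row["word"]] += row["word_count"]  # case-sensitive key
    return dict(totals)


if __name__ == "__main__":
    # Hypothetical rows shaped like samples.shakespeare (one row per corpus).
    sample = [
        {"word": "hope", "word_count": 3, "corpus": "hamlet"},
        {"word": "hope", "word_count": 5, "corpus": "kinglear"},
        {"word": "By", "word_count": 2, "corpus": "hamlet"},
        {"word": "by", "word_count": 4, "corpus": "hamlet"},
    ]
    print(word_count(sample))
```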

For more information