Stay organized with collections
Save and categorize content based on your preferences.
The AstraDB to BigQuery template is a batch pipeline that reads records
from AstraDB and writes them to BigQuery.
If the destination table doesn't exist in BigQuery, the pipeline creates
a table with the following values:
The Dataset ID, which is inherited from the Cassandra keyspace.
The Table ID, which is inherited from the Cassandra table.
The schema of the destination table is inferred from the source Cassandra table.
List and Set are mapped to BigQuery REPEATED fields.
Map are mapped to BigQuery RECORD fields.
All other types are mapped to BigQuery fields with the corresponding types.
Cassandra user-defined types (UDTs) and tuple data types are not supported.
Pipeline requirements
AstraDB account with a token
Template parameters
Required parameters
astraToken : The token value or secret resource ID. (Example: AstraCS:abcdefghij).
astraDatabaseId : The database unique identifier (UUID). (Example: cf7af129-d33a-498f-ad06-d97a6ee6eb7).
astraKeyspace : The name of the Cassandra keyspace inside of the Astra database.
astraTable : The name of the table inside of the Cassandra database. (Example: my_table).
Optional parameters
astraQuery : The query to use to filter rows instead of reading the whole table.
astraDatabaseRegion : If not provided, a default is chosen, which is useful with multi-region databases.
minTokenRangesCount : The minimal number of splits to use to distribute the query.
outputTableSpec : The BigQuery table location to write the output to. Use the format <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>. The table's schema must match the input objects.
the version name, like 2023-09-12-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates-REGION_NAME/
REGION_NAME:
the region where you want to
deploy your Dataflow job—for example, us-central1
ASTRA_TOKEN: the Astra token
ASTRA_DATABASE_ID: the database identifier
ASTRA_KEYSPACE: the Cassandra keyspace
ASTRA_TABLE: the Cassandra table
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch.
the version name, like 2023-09-12-00_RC00, to use a specific version of the
template, which can be found nested in the respective dated parent folder in the bucket—
gs://dataflow-templates-REGION_NAME/
LOCATION:
the region where you want to
deploy your Dataflow job—for example, us-central1
ASTRA_TOKEN: the Astra token
ASTRA_DATABASE_ID: the database identifier
ASTRA_KEYSPACE: the Cassandra keyspace
ASTRA_TABLE: the Cassandra table
Template source code
Java
/*
* Copyright (C) 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.astradb.templates;
import com.datastax.oss.driver.api.core.CqlSession;
import com.dtsx.astra.sdk.db.DatabaseClient;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.astradb.options.AstraDbToBigQueryOptions;
import com.google.cloud.teleport.v2.astradb.transforms.AstraDbToBigQueryMappingFn;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.utils.SecretManagerUtils;
import java.util.AbstractMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.astra.db.AstraDbIO;
import org.apache.beam.sdk.io.astra.db.CqlSessionHolder;
import org.apache.beam.sdk.io.astra.db.mapping.AstraDbMapper;
import org.apache.beam.sdk.io.astra.db.mapping.BeamRowDbMapperFactoryFn;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link AstraDbToBigQuery} pipeline is a batch pipeline which ingests data from AstraDB and
* outputs the resulting records to BigQuery.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/astradb-to-bigquery/README_AstraDB_to_BigQuery.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "AstraDB_To_BigQuery",
category = TemplateCategory.BATCH,
displayName = "AstraDB to BigQuery",
description = {
"The AstraDB to BigQuery template is a batch pipeline that reads records from AstraDB and writes them to BigQuery.",
"If the destination table doesn't exist in BigQuery, the pipeline creates a table with the following values:\n"
+ "- The `Dataset ID` is inherited from the Cassandra keyspace.\n"
+ "- The `Table ID` is inherited from the Cassandra table.\n",
"The schema of the destination table is inferred from the source Cassandra table.\n"
+ "- `List` and `Set` are mapped to BigQuery `REPEATED` fields.\n"
+ "- `Map` are mapped to BigQuery `RECORD` fields.\n"
+ "- All other types are mapped to BigQuery fields with the corresponding types.\n"
+ "- Cassandra user-defined types (UDTs) and tuple data types are not supported."
},
optionsClass = AstraDbToBigQuery.Options.class,
flexContainerName = "astradb-to-bigquery",
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/astradb-to-bigquery",
contactInformation = "https://cloud.google.com/support",
preview = true,
requirements = {"AstraDB account with a token"})
public class AstraDbToBigQuery {
/** Logger for the class. */
private static final Logger LOGGER = LoggerFactory.getLogger(AstraDbToBigQuery.class);
/** If not provided, it is the default token range value. */
public static final int DEFAULT_TOKEN_RANGE = 18;
/**
* Options for the sample
*
* <p>Inherits standard configuration options.
*/
public interface Options
extends PipelineOptions,
AstraDbToBigQueryOptions.AstraDbSourceOptions,
AstraDbToBigQueryOptions.BigQueryWriteOptions {}
/** Main operations. */
public static void main(String[] args) {
UncaughtExceptionLogger.register();
LOGGER.info("Starting pipeline");
try {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
LOGGER.debug("Pipeline Arguments (options) validated");
// --------------------------------
// AstraDbIO.Read<Row>
// --------------------------------
// Credentials are read from secrets manager
AbstractMap.SimpleImmutableEntry<String, byte[]> astraCredentials =
parseAstraCredentials(options);
LOGGER.debug("Astra Credentials parsed");
// Map Cassandra Table Schema into BigQuery Table Schema
SerializableFunction<AstraDbIO.Read<?>, TableSchema> bigQuerySchemaFactory =
new AstraDbToBigQueryMappingFn(options.getAstraKeyspace(), options.getAstraTable());
LOGGER.debug("Schema Mapper has been initialized");
// Map Cassandra Rows into (Apache) Beam Rows (DATA)
SerializableFunction<CqlSession, AstraDbMapper<Row>> beamRowMapperFactory =
new BeamRowDbMapperFactoryFn(options.getAstraKeyspace(), options.getAstraTable());
LOGGER.debug("Row Mapper has been initialized");
// Distribute reads across all available Cassandra nodes
int minimalTokenRangesCount =
(options.getMinTokenRangesCount() == null)
? DEFAULT_TOKEN_RANGE
: options.getMinTokenRangesCount();
// Source: AstraDb
AstraDbIO.Read<Row> astraSource =
AstraDbIO.<Row>read()
.withToken(astraCredentials.getKey())
.withSecureConnectBundle(astraCredentials.getValue())
.withKeyspace(options.getAstraKeyspace())
.withTable(options.getAstraTable())
.withMinNumberOfSplits(minimalTokenRangesCount)
.withMapperFactoryFn(beamRowMapperFactory)
.withCoder(SerializableCoder.of(Row.class))
.withEntity(Row.class);
LOGGER.debug("AstraDb Source initialization [OK]");
// --------------------------------
// BigQueryIO.Write<Row>
// --------------------------------
TableReference bqTableRef = parseBigQueryDestinationTable(options);
createBigQueryDestinationTableIfNotExist(options, bqTableRef);
LOGGER.debug("BigQuery Sink Table has been initialized");
// Sink: BigQuery
BigQueryIO.Write<Row> bigQuerySink =
BigQueryIO.<Row>write()
.to(bqTableRef)
// Specialized function reading cassandra source table and mapping to BigQuery Schema
.withSchema(bigQuerySchemaFactory.apply(astraSource))
// Provided by google, convert a Beam Row to a BigQuery TableRow
.withFormatFunction(row -> row != null ? BigQueryUtils.toTableRow(row) : null)
// Table Will be created if not exist
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND);
LOGGER.debug("BigQuery Sink initialization [OK]");
// --------------------------------
// Pipeline
// --------------------------------
Pipeline astraDbToBigQueryPipeline = Pipeline.create(options);
astraDbToBigQueryPipeline
.apply("Read From Astra", astraSource)
.apply("Write To BigQuery", bigQuerySink);
astraDbToBigQueryPipeline.run();
} finally {
// Cassandra Connection is stateful and needs to be closed
CqlSessionHolder.cleanup();
}
}
/**
* Parse Astra Credentials from secrets in secret Manager. - SecretManagerUtils is not used as
* only applied to String secrets
*
* @param options pipeline options
* @return a pair with the token and the secure bundle
*/
private static AbstractMap.SimpleImmutableEntry<String, byte[]> parseAstraCredentials(
Options options) {
String astraToken = options.getAstraToken();
if (!astraToken.startsWith("AstraCS")) {
astraToken = SecretManagerUtils.getSecret(options.getAstraToken());
}
LOGGER.info("Astra Token is parsed, value={}", astraToken.substring(0, 10) + "...");
/*
* Accessing the devops Api to retrieve the secure bundle.
*/
DatabaseClient astraDbClient = new DatabaseClient(astraToken, options.getAstraDatabaseId());
if (!astraDbClient.exist()) {
throw new RuntimeException(
"Astra Database does not exist, please check your Astra Token and Database ID");
}
byte[] astraSecureBundle = astraDbClient.downloadDefaultSecureConnectBundle();
if (!StringUtils.isEmpty(options.getAstraDatabaseRegion())) {
astraSecureBundle =
astraDbClient.downloadSecureConnectBundle(options.getAstraDatabaseRegion());
}
LOGGER.info("Astra Bundle is parsed, length={}", astraSecureBundle.length);
return new AbstractMap.SimpleImmutableEntry<>(astraToken, astraSecureBundle);
}
/**
* Create the Bog Query table Reference (provided or based on Cassandra table name).
*
* @param options pipeline options
* @return the big query table reference
*/
private static TableReference parseBigQueryDestinationTable(Options options) {
/*
* bigQueryOutputTableSpec argument is the Big Query table specification. This is parameter
* is optional. If not set, the table specification is built from the cassandra source table
* attributes: keyspace=dataset name, table=table name.
*/
String bigQueryOutputTableSpec = options.getOutputTableSpec();
if (StringUtils.isEmpty(bigQueryOutputTableSpec)) {
bigQueryOutputTableSpec =
options.getProject() + ":" + options.getAstraKeyspace() + "." + options.getAstraTable();
}
TableReference bigQueryTableReference = BigQueryUtils.toTableReference(bigQueryOutputTableSpec);
LOGGER.info("Big Query table spec has been set to {}", bigQueryOutputTableSpec);
return bigQueryTableReference;
}
/**
* Create destination dataset and tables if needed (schema mapped from Cassandra).
*
* @param options pipeline options
* @param bqTableRef big query table reference
*/
private static void createBigQueryDestinationTableIfNotExist(
Options options, TableReference bqTableRef) {
BigQuery bigquery =
BigQueryOptions.newBuilder().setProjectId(options.getProject()).build().getService();
if (null
== bigquery.getDataset(
DatasetId.of(bqTableRef.getProjectId(), bqTableRef.getDatasetId()))) {
LOGGER.info(
"Dataset was not found: creating DataSet {} in region {}",
bqTableRef.getDatasetId(),
options.getWorkerRegion());
bigquery.create(
DatasetInfo.newBuilder(bqTableRef.getDatasetId())
.setLocation(options.getWorkerRegion())
.build());
LOGGER.debug("Dataset has been created [OK]");
} else {
LOGGER.info("Dataset {} already exist", bqTableRef.getDatasetId());
}
}
}
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],["Last updated 2024-10-30 UTC."],[],[]]