This document describes how to integrate Apache Kafka and Pub/Sub Lite by using the Pub/Sub Group Kafka Connector.
About the Pub/Sub Group Kafka Connector
Apache Kafka is an open source platform for streaming events. It is commonly used in distributed architectures to enable communication between loosely coupled components. Pub/Sub Lite is a managed service for sending and receiving messages asynchronously. As with Kafka, you can use Pub/Sub Lite to communicate between components in your cloud architecture.
The Pub/Sub Group Kafka Connector allows you to integrate these two systems. The following connectors are packaged in the Connector JAR:
- The sink connector reads records from one or more Kafka topics and publishes them to Pub/Sub Lite.
- The source connector reads messages from a Pub/Sub Lite topic and publishes them to Kafka.
Here are some scenarios in which you might use the Pub/Sub Group Kafka Connector:
- You are migrating a Kafka-based architecture to Google Cloud.
- You have a frontend system that stores events in Kafka outside of Google Cloud, but you also use Google Cloud to run some of your backend services, which need to receive the Kafka events.
- You collect logs from an on-premises Kafka solution and send them to Google Cloud for data analytics.
- You have a frontend system that uses Google Cloud, but you also store data on-premises using Kafka.
The connector requires Kafka Connect, which is a framework for streaming data between Kafka and other systems. To use the connector, you must run Kafka Connect alongside your Kafka cluster.
This document assumes that you are familiar with both Kafka and Pub/Sub Lite. To get started with Pub/Sub Lite, see Publish and receive messages in Pub/Sub Lite by using the Google Cloud console.
Get started with the Pub/Sub Group Kafka Connector
This section walks you through the following tasks:
- Configure the Pub/Sub Group Kafka Connector.
- Send events from Kafka to Pub/Sub Lite.
- Send messages from Pub/Sub Lite to Kafka.
Prerequisites
Install Kafka
Follow the Apache Kafka quickstart to install a single-node Kafka on your local machine. Complete these steps in the quickstart:
- Download the latest Kafka release and extract it.
- Start the Kafka environment.
- Create a Kafka topic.
Authenticate
The Pub/Sub Group Kafka Connector must authenticate with Pub/Sub in order to send and receive Pub/Sub messages. To set up authentication, perform the following steps:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
    gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
      gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
      gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Create local authentication credentials for your user account:
    gcloud auth application-default login
- Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsublite.admin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
  - Replace PROJECT_ID with your project ID.
  - Replace USER_IDENTIFIER with the identifier for your user account. For example, myemail@example.com.
  - Replace ROLE with each individual role.
Download the connector JAR
Download the connector JAR file to your local machine. For more information, see Acquire the connector in the GitHub readme.
Copy the connector configuration files
Clone or download the GitHub repository for the connector.
git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
cd java-pubsub-group-kafka-connector
Copy the contents of the config directory into the config subdirectory of your Kafka installation.
cp config/* [path to Kafka installation]/config/
These files contain configuration settings for the connector.
Update your Kafka Connect configuration
- Navigate to the directory that contains the Kafka Connect binary that you downloaded.
- In the Kafka Connect binary directory, open the file named config/connect-standalone.properties in a text editor.
- If the plugin.path property is commented out, uncomment it.
- Update the plugin.path property to include the path to the connector JAR. Example:
    plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
- Set the offset.storage.file.filename property to a local file name. In standalone mode, Kafka uses this file to store offset data. Example:
    offset.storage.file.filename=/tmp/connect.offsets
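If you prefer to script these edits rather than make them by hand, the following is a minimal sketch in Python. The helper name `set_property` and the sample file contents are hypothetical and are not part of Kafka or the connector. The sketch assumes the file uses simple `key=value` lines with `#` comments.

```python
def set_property(text: str, key: str, value: str) -> str:
    """Return properties text with `key` set to `value`.

    The first assignment of `key` (active or commented out with '#') is
    replaced, later duplicates are dropped, and a new line is appended
    if the key is not present at all.
    """
    out = []
    done = False
    for line in text.splitlines():
        # Strip a leading comment marker so "#plugin.path=..." also matches.
        candidate = line.lstrip().lstrip("#").lstrip()
        name, sep, _ = candidate.partition("=")
        if sep and name.strip() == key:
            if not done:
                out.append(f"{key}={value}")
                done = True
            continue  # skip the old assignment and any duplicates
        out.append(line)
    if not done:
        out.append(f"{key}={value}")
    return "\n".join(out) + "\n"


# Hypothetical starting contents of config/connect-standalone.properties.
sample = (
    "bootstrap.servers=localhost:9092\n"
    "offset.storage.file.filename=/tmp/old.offsets\n"
    "#plugin.path=\n"
)
updated = set_property(
    sample,
    "plugin.path",
    "/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar",
)
updated = set_property(updated, "offset.storage.file.filename", "/tmp/connect.offsets")
print(updated)
```

To apply the result, write `updated` back to the properties file after reviewing it.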
Forward events from Kafka to Pub/Sub Lite
This section describes how to start the sink connector, publish events to Kafka, and then read the forwarded messages from Pub/Sub Lite.
Use the Google Cloud CLI to create a Pub/Sub Lite reservation.
gcloud pubsub lite-reservations create RESERVATION_NAME \
  --location=LOCATION \
  --throughput-capacity=4
Replace the following:
- RESERVATION_NAME: The name of the Pub/Sub Lite reservation.
- LOCATION: The location of the reservation.
Use the Google Cloud CLI to create a Pub/Sub Lite topic with a subscription.
gcloud pubsub lite-topics create LITE_TOPIC \
  --location=LOCATION \
  --partitions=2 \
  --per-partition-bytes=30GiB \
  --throughput-reservation=RESERVATION_NAME
gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
  --location=LOCATION \
  --topic=LITE_TOPIC
Replace the following:
- LITE_TOPIC: The name of the Pub/Sub Lite topic to receive messages from Kafka.
- LOCATION: The location of the topic. The value must match the location of the reservation.
- RESERVATION_NAME: The name of the Pub/Sub Lite reservation.
- LITE_SUBSCRIPTION: The name of a Pub/Sub Lite subscription for the topic.
Open the file /config/pubsub-lite-sink-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:
topics=KAFKA_TOPICS
pubsublite.project=PROJECT_ID
pubsublite.location=LOCATION
pubsublite.topic=LITE_TOPIC
Replace the following:
- KAFKA_TOPICS: A comma-separated list of Kafka topics to read from.
- PROJECT_ID: The Google Cloud project that contains your Pub/Sub Lite topic.
- LOCATION: The location of the Pub/Sub Lite topic.
- LITE_TOPIC: The Pub/Sub Lite topic to receive messages from Kafka.
From the Kafka directory, run the following command:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/pubsub-lite-sink-connector.properties
Follow the steps in the Apache Kafka quickstart to write some events to your Kafka topic.
Subscribe to the Pub/Sub Lite subscription by using any of the methods shown in Receiving messages from Lite subscriptions.
Forward messages from Pub/Sub Lite to Kafka
This section describes how to start the source connector, publish messages to Pub/Sub Lite, and read the forwarded messages from Kafka.
Use the Google Cloud CLI to create a Pub/Sub Lite reservation.
gcloud pubsub lite-reservations create RESERVATION_NAME \
  --location=LOCATION \
  --throughput-capacity=4
Replace the following:
- RESERVATION_NAME: The name of the Pub/Sub Lite reservation.
- LOCATION: The location of the reservation.
Use the Google Cloud CLI to create a Pub/Sub Lite topic with a subscription.
gcloud pubsub lite-topics create LITE_TOPIC \
  --location=LOCATION \
  --partitions=2 \
  --per-partition-bytes=30GiB \
  --throughput-reservation=RESERVATION_NAME
gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION \
  --location=LOCATION \
  --topic=LITE_TOPIC
Replace the following:
- LITE_TOPIC: The name of the Pub/Sub Lite topic.
- LOCATION: The location of the topic. The value must match the location of the reservation.
- RESERVATION_NAME: The name of the Pub/Sub Lite reservation.
- LITE_SUBSCRIPTION: The name of a Pub/Sub Lite subscription for the topic.
Open the file named /config/pubsub-lite-source-connector.properties in a text editor. Add values for the following properties, which are marked "TODO" in the comments:
kafka.topic=KAFKA_TOPIC
pubsublite.project=PROJECT_ID
pubsublite.location=LOCATION
pubsublite.subscription=LITE_SUBSCRIPTION
Replace the following:
- KAFKA_TOPIC: The Kafka topic that receives the Pub/Sub Lite messages.
- PROJECT_ID: The Google Cloud project that contains your Pub/Sub Lite topic.
- LOCATION: The location of the Pub/Sub Lite topic.
- LITE_SUBSCRIPTION: The Pub/Sub Lite subscription to read messages from.
From the Kafka directory, run the following command:
bin/connect-standalone.sh \
  config/connect-standalone.properties \
  config/pubsub-lite-source-connector.properties
Publish messages to the Pub/Sub Lite topic by using any of the methods shown in Publishing messages to Lite topics.
Read the messages from Kafka by following the steps in the Apache Kafka quickstart for reading events from a Kafka topic.
Message conversion
A Kafka record contains a key and a value, which are variable-length byte arrays. Optionally, a Kafka record can also have headers, which are key-value pairs. A Pub/Sub Lite message has the following fields:
- key: Message key (bytes)
- data: Message data (bytes)
- attributes: Zero or more attributes. Each attribute is a (key, values[]) map. A single attribute can have multiple values.
- event_time: An optional user-provided event timestamp.
Kafka Connect uses converters to serialize keys and values to and from Kafka. To control the serialization, set the following properties in the connector configuration files:
- key.converter: The converter used to serialize record keys.
- value.converter: The converter used to serialize record values.
Conversion from Kafka to Pub/Sub Lite
The sink connector converts Kafka records to Pub/Sub Lite messages as follows.
Kafka record (SinkRecord) | Pub/Sub Lite message |
---|---|
Key | key |
Value | data |
Headers | attributes |
Timestamp | event_time |
Timestamp type | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |
Topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
Partition | attributes["x-goog-pubsublite-source-kafka-partition"] |
Offset | attributes["x-goog-pubsublite-source-kafka-offset"] |
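The field mapping can be illustrated with a short Python sketch. This is not the connector's code: the dict layouts, the function name `sink_record_to_message`, and the sample values are hypothetical. The sketch assumes that the partition and offset attributes are named after the fields they carry, and that numeric metadata is stored as decimal text.

```python
from typing import Dict, List

ATTR_PREFIX = "x-goog-pubsublite-source-kafka-"


def sink_record_to_message(record: dict) -> dict:
    """Map a Kafka record (a plain dict here) to a Pub/Sub Lite message dict."""
    attributes: Dict[str, List[bytes]] = {}
    # Each Kafka header becomes an attribute; repeated names accumulate values.
    for name, value in record.get("headers", []):
        attributes.setdefault(name, []).append(value)
    # Kafka metadata travels in reserved attributes. Encoding the numbers as
    # decimal strings is an assumption of this sketch.
    attributes[ATTR_PREFIX + "event-time-type"] = [record["timestamp_type"].encode("utf-8")]
    attributes[ATTR_PREFIX + "topic"] = [record["topic"].encode("utf-8")]
    attributes[ATTR_PREFIX + "partition"] = [str(record["partition"]).encode("utf-8")]
    attributes[ATTR_PREFIX + "offset"] = [str(record["offset"]).encode("utf-8")]
    return {
        "key": record["key"],
        "data": record["value"],
        "attributes": attributes,
        "event_time": record["timestamp"],
    }


# Hypothetical record, with the timestamp in milliseconds since the epoch.
record = {
    "topic": "orders",
    "partition": 1,
    "offset": 42,
    "key": b"order-7",
    "value": b"payload",
    "headers": [("trace", b"abc"), ("trace", b"def")],
    "timestamp": 1700000000000,
    "timestamp_type": "CreateTime",
}
message = sink_record_to_message(record)
print(message["attributes"])
```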
Keys, values, and headers are encoded as follows:
- Null schemas are treated as string schemas.
- Bytes payloads are written directly with no conversion.
- String, integer, and floating-point payloads are encoded into a sequence of UTF-8 bytes.
- All other payloads are encoded into a Protocol buffer Value type and then converted to a byte string.
  - Nested string fields are encoded into a protobuf Value.
  - Nested bytes fields are encoded to a protobuf Value holding the base64-encoded bytes.
  - Nested numeric fields are encoded as a double into a protobuf Value.
  - Maps with array, map, or struct keys are not supported.
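The encoding rules above can be approximated in Python as follows. This sketch dispatches on Python types rather than Kafka Connect schemas, and it uses plain Python values as a stand-in for a protobuf Value instead of producing real protobuf bytes. Both function names are hypothetical, and rendering numbers with `str()` and map keys as strings are assumptions, so output can differ in detail from what the connector emits.

```python
import base64
from typing import Any


def encode_payload(value: Any) -> bytes:
    """Encode a top-level key, value, or header payload to bytes."""
    if isinstance(value, bytes):
        return value  # bytes are written directly, with no conversion
    if isinstance(value, (str, int, float)) and not isinstance(value, bool):
        return str(value).encode("utf-8")  # text form, as UTF-8
    # Structured payloads go through a protobuf Value in the connector;
    # that serialization is not reproduced here.
    raise TypeError(f"structured payload of type {type(value).__name__} not handled")


def to_value_like(field: Any) -> Any:
    """Approximate how a nested field is represented inside a protobuf Value."""
    if isinstance(field, bool):
        return field
    if isinstance(field, bytes):
        return base64.b64encode(field).decode("ascii")  # base64 text
    if isinstance(field, (int, float)):
        return float(field)  # every numeric becomes a double
    if isinstance(field, str):
        return field
    if isinstance(field, list):
        return [to_value_like(item) for item in field]
    if isinstance(field, dict):
        converted = {}
        for key, item in field.items():
            if not isinstance(key, (str, int, float)):
                raise ValueError("maps with non-primitive keys are not supported")
            converted[str(key)] = to_value_like(item)
        return converted
    raise TypeError(f"unsupported nested type {type(field).__name__}")


print(encode_payload(42))
print(to_value_like({"id": 7, "raw": b"\x01\x02", "name": "a"}))
```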
Conversion from Pub/Sub Lite to Kafka
The source connector converts Pub/Sub Lite messages to Kafka records as follows:
Pub/Sub Lite message | Kafka record (SourceRecord) |
---|---|
key | Key |
data | Value |
attributes | Headers |
event_time | Timestamp. If event_time is not present, the publish time is used. |
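The following Python sketch illustrates this mapping, including the timestamp fallback. The dict layouts, the `publish_time` field name, and the function name `message_to_source_record` are hypothetical. Emitting one Kafka header per attribute value is an assumption about how multi-valued attributes are carried.

```python
def message_to_source_record(message: dict, kafka_topic: str) -> dict:
    """Map a Pub/Sub Lite message (a plain dict here) to a Kafka record dict."""
    # Prefer the user-provided event time; fall back to the publish time.
    event_time = message.get("event_time")
    timestamp = event_time if event_time is not None else message["publish_time"]
    # Assumption: each value of a multi-valued attribute becomes its own header.
    headers = [
        (name, value)
        for name, values in message.get("attributes", {}).items()
        for value in values
    ]
    return {
        "topic": kafka_topic,
        "key": message["key"],
        "value": message["data"],
        "headers": headers,
        "timestamp": timestamp,
    }


# Hypothetical message without an event time.
lite_message = {
    "key": b"order-7",
    "data": b"payload",
    "attributes": {"trace": [b"abc", b"def"]},
    "event_time": None,
    "publish_time": 1700000005000,
}
source_record = message_to_source_record(lite_message, "my-kafka-topic")
print(source_record["timestamp"])
```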
Configuration options
In addition to the configurations provided by the Kafka Connect API, the connector supports the following Pub/Sub Lite configurations.
Sink connector configuration options
The sink connector supports the following configuration options.
Setting | Data type | Description |
---|---|---|
connector.class | String | Required. The Java class for the connector. For the Pub/Sub Lite sink connector, the value must be com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector. |
gcp.credentials.file.path | String | Optional. The path to a file that stores Google Cloud credentials for authenticating Pub/Sub Lite. |
gcp.credentials.json | String | Optional. A JSON blob that contains Google Cloud credentials for authenticating Pub/Sub Lite. |
pubsublite.location | String | Required. The location of the Pub/Sub Lite topic. |
pubsublite.project | String | Required. The Google Cloud project that contains the Pub/Sub Lite topic. |
pubsublite.topic | String | Required. The Pub/Sub Lite topic to publish Kafka records to. |
topics | String | Required. A comma-separated list of Kafka topics to read from. |
Source connector configuration options
The source connector supports the following configuration options.
Setting | Data type | Description |
---|---|---|
connector.class | String | Required. The Java class for the connector. For the Pub/Sub Lite source connector, the value must be com.google.pubsublite.kafka.source.PubSubLiteSourceConnector. |
gcp.credentials.file.path | String | Optional. The path to a file that stores Google Cloud credentials for authenticating Pub/Sub Lite. |
gcp.credentials.json | String | Optional. A JSON blob that contains Google Cloud credentials for authenticating Pub/Sub Lite. |
kafka.topic | String | Required. The Kafka topic that receives messages from Pub/Sub Lite. |
pubsublite.location | String | Required. The location of the Pub/Sub Lite topic. |
pubsublite.partition_flow_control.bytes | Long | The maximum number of outstanding bytes per Pub/Sub Lite partition. Default: 20,000,000 |
pubsublite.partition_flow_control.messages | Long | The maximum number of outstanding messages per Pub/Sub Lite partition. Default: |
pubsublite.project | String | Required. The Google Cloud project that contains the Pub/Sub Lite topic. |
pubsublite.subscription | String | Required. The name of the Pub/Sub Lite subscription to pull messages from. |
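Before you start Kafka Connect, it can help to check that a connector properties file sets every option marked Required in the tables above. The following Python sketch does this for simple `key=value` files. The helper names and the sample configuration values are hypothetical, and the check is not part of the connector.

```python
# Required settings, taken from the sink and source tables above.
SINK_REQUIRED = {
    "connector.class",
    "pubsublite.location",
    "pubsublite.project",
    "pubsublite.topic",
    "topics",
}
SOURCE_REQUIRED = {
    "connector.class",
    "kafka.topic",
    "pubsublite.location",
    "pubsublite.project",
    "pubsublite.subscription",
}


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_settings(props: dict, required: set) -> list:
    """Return the required settings that are absent or empty, sorted by name."""
    return sorted(key for key in required if not props.get(key))


# Hypothetical source connector file that omits the subscription.
source_text = """
connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
kafka.topic=my-kafka-topic
pubsublite.project=my-project
pubsublite.location=us-central1-a
# pubsublite.subscription=TODO
pubsublite.partition_flow_control.bytes=20000000
"""
source_props = parse_properties(source_text)
print(missing_settings(source_props, SOURCE_REQUIRED))
```

Run the same check with `SINK_REQUIRED` against the sink connector properties file.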
What's next
- Understand the differences between Kafka and Pub/Sub.
- Learn more about the Pub/Sub Group Kafka Connector.
- See the Pub/Sub Group Kafka Connector GitHub repository.