Publish and consume messages with the CLI
Learn how to create a Google Cloud Managed Service for Apache Kafka cluster using the CLI and configure producer and consumer clients to connect to the cluster. The following guide creates a relatively small cluster designed only for testing.
Before you begin
Follow these guidelines for the quickstart:
- Create the Google Cloud project in the us-central1 region.
Perform these steps:
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project that you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Managed Kafka, Compute Engine, and Cloud DNS APIs:
  gcloud services enable managedkafka.googleapis.com compute.googleapis.com dns.googleapis.com
Create a cluster
A Managed Service for Apache Kafka cluster is located in a specific Google Cloud project and region. It can be accessed using a set of IP addresses within one or more subnets in any Virtual Private Cloud (VPC). The cluster's size is determined by the number of CPUs and total RAM that you allocate to it. Disk and broker configuration are automatic and cannot be adjusted.
1. Get a subnet identifier by using the gcloud compute networks subnets describe command. The following command retrieves the self link of the default subnet of your Google Cloud project in the us-central1 region:
   gcloud compute networks subnets describe default --region=us-central1 \
     --format='value(selfLink)' | sed 's|.*/compute/v1/||'
   The output of the command is similar to the following:
   projects/managed-kafka-experimental/regions/us-central1/subnetworks/default
   Here, managed-kafka-experimental is the project ID and us-central1 is the location. Record these values from your command output, because you need them later.
2. To create a cluster, run the gcloud beta managed-kafka clusters create command:
   gcloud beta managed-kafka clusters create test-cluster \
     --location=us-central1 \
     --cpu=3 \
     --memory=3GiB \
     --subnets=SUBNET_ID \
     --async
   Replace SUBNET_ID with the value of the subnet ID obtained from step 1.
   You get a response similar to the following:
   Create request issued for: [test-cluster]
   Check operation [projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID] for status.
3. Store the OPERATION_ID to track progress. Creating a cluster usually takes 20-30 minutes. The gcloud beta managed-kafka clusters create command returns a long-running operation (LRO), which you can monitor with the following command:
   curl -X GET \
     -H "Authorization: Bearer $(gcloud auth print-access-token)" \
     "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID"
   Replace the following:
   - OPERATION_ID: the operation ID from step 2.
   - PROJECT_ID: the project for your Kafka cluster.
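If you prefer not to rerun that command by hand, you can wrap it in a small polling loop. The following is a minimal sketch, assuming the jq tool is installed on the machine where you run it and using the same PROJECT_ID and OPERATION_ID placeholders:
while true; do
  # Query the LRO and extract its "done" field.
  DONE=$(curl -s \
    -H "Authorization: Bearer $(gcloud auth print-access-token)" \
    "https://managedkafka.googleapis.com/v1/projects/PROJECT_ID/locations/us-central1/operations/OPERATION_ID" \
    | jq -r '.done')
  if [ "$DONE" = "true" ]; then
    echo "Cluster creation finished."
    break
  fi
  echo "Cluster still creating; checking again in 60 seconds..."
  sleep 60
done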
Describe a cluster
After the cluster is created, to get the details of the cluster, run the gcloud beta managed-kafka clusters describe command:
gcloud beta managed-kafka clusters describe test-cluster \
  --location=us-central1
The output of the command is similar to the following:
bootstrapAddress: bootstrap.test-cluster.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
capacityConfig:
  memoryBytes: '3221225472'
  vcpuCount: '3'
createTime: '2024-05-28T04:32:08.671168869Z'
gcpConfig:
  accessConfig:
    networkConfigs:
    - subnet: projects/PROJECT_NUMBER/regions/us-central1/subnetworks/default
name: projects/PROJECT_ID/locations/us-central1/clusters/test-cluster
rebalanceConfig:
  mode: AUTO_REBALANCE_ON_SCALE_UP
state: CREATING
updateTime: '2024-05-28T04:32:08.671168869Z'
The bootstrap URL is the primary way in which Kafka clients identify a server. You can retrieve the bootstrap URL using the following command:
gcloud beta managed-kafka clusters describe test-cluster \
  --location=us-central1 \
  --format="value(bootstrapAddress)"
Create a topic
Create a topic in the cluster by running the gcloud beta managed-kafka topics create command. Here, the name of the topic is t1.
gcloud beta managed-kafka topics create t1 \
  --cluster=test-cluster --location=us-central1 --partitions=10 \
  --replication-factor=3
You get an output similar to the following:
Created topic [t1]
Describe a topic
After the topic is created, to get more details about the topic, run the gcloud beta managed-kafka topics describe command:
gcloud beta managed-kafka topics describe t1 \
  --cluster=test-cluster --location=us-central1
You get an output similar to the following:
name: projects/PROJECT_ID/locations/us-central1/clusters/test-cluster/topics/t1
partitionCount: 10
replicationFactor: 3
Set up a client machine
Set up a client on a Compute Engine instance that can access the VPC containing the default subnet where the Kafka cluster is reachable.
Create a Compute Engine instance in a zone that is in the same region as the Kafka cluster. The instance must also be in a VPC containing the subnet that you used in the cluster configuration. For example:
gcloud compute instances create test-instance \
  --scopes=https://www.googleapis.com/auth/cloud-platform \
  --subnet=SUBNET_ID \
  --zone=us-central1-f
Replace SUBNET_ID with the value of the subnet ID obtained from step 1 of Create a cluster. For more information about creating a VM, see Create a VM instance in a specific subnet.
Use SSH to connect to the VM that you just created in the previous step. For more information about connecting using SSH, see About SSH connections.
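For example, assuming the instance name and zone used earlier, you can connect with the gcloud CLI:
gcloud compute ssh test-instance --zone=us-central1-f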
Install the Kafka command line tools on the VM.
Here is an example for installing Kafka version 3.6.2 in a Debian environment.
sudo apt-get install default-jdk jq
mkdir ~/Downloads
wget -O ~/Downloads/kafka_2.13-3.6.2.tgz \
  https://downloads.apache.org/kafka/3.6.2/kafka_2.13-3.6.2.tgz
mkdir ~/kafka_home
tar xfz ~/Downloads/kafka_2.13-3.6.2.tgz -C ~/kafka_home
Set up authentication on your VM with a service account.
a. Authenticate to Google Cloud CLI on your VM.
gcloud auth login
b. Create a service account from either your VM or another machine.
gcloud iam service-accounts create quickstart-sa --project=SERVICE_ACCOUNT_PROJECT_ID
Replace SERVICE_ACCOUNT_PROJECT_ID with the project ID where you want to create the service account. This does not need to be the same project where your cluster is located.
c. Grant the roles/managedkafka.client IAM role to the service account on your cluster project, from either your VM or another machine.
gcloud projects add-iam-policy-binding CLUSTER_PROJECT_ID \
  --member="serviceAccount:quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com" \
  --role=roles/managedkafka.client
Replace the following:
- CLUSTER_PROJECT_ID: the project ID where the cluster is located.
- SERVICE_ACCOUNT_PROJECT_ID: the project ID where the service account is located.
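To confirm that the binding took effect, one option (a sketch, assuming jq is available on the machine where you run it) is to filter the project's IAM policy for the role:
gcloud projects get-iam-policy CLUSTER_PROJECT_ID --format=json \
  | jq '.bindings[] | select(.role == "roles/managedkafka.client")'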
d. Grant the required role to the principal that will attach the service account to other resources. Run this step from either your VM or another machine.
gcloud iam service-accounts add-iam-policy-binding \
  quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com \
  --project=SERVICE_ACCOUNT_PROJECT_ID \
  --member="user:USER_EMAIL" \
  --role=roles/iam.serviceAccountUser
Replace the following:
- SERVICE_ACCOUNT_PROJECT_ID: the project ID where the service account is located.
- USER_EMAIL: the email address of the principal that you want to allow to attach the service account.
e. Generate a service account key on your VM. This command saves the key to a file called sa.key.json in your current directory.
gcloud iam service-accounts keys create sa.key.json \
  --iam-account=quickstart-sa@SERVICE_ACCOUNT_PROJECT_ID.iam.gserviceaccount.com
Replace SERVICE_ACCOUNT_PROJECT_ID with the project ID where the service account is located.
Set up Kafka client credentials. The SASL_SSL security protocol with the PLAIN authentication mechanism (username/password) is supported. The username is the service account email address, and the password is the base64-encoded service account key file.
Run this command to create a file called client.properties:
cat <<EOF >> client.properties
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \\
  username="$(jq -r '.client_email' sa.key.json)" \\
  password="$(cat sa.key.json | base64 -w 0)";
EOF
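Both sa.key.json and client.properties contain credentials. As a precaution beyond the original steps, you can restrict their file permissions:
chmod 600 sa.key.json client.properties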
Include the Kafka /bin/ directory in the PATH environment variable. This configuration lets you run Kafka commands from your terminal.
export PATH=$PATH:~/kafka_home/kafka_2.13-3.6.2/bin
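This export applies only to the current shell session. To keep it across sessions, you can append the same line to your shell profile (assuming bash; adjust the file name for your shell):
echo 'export PATH=$PATH:~/kafka_home/kafka_2.13-3.6.2/bin' >> ~/.bashrc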
Use the Kafka command line tools
Run these commands on the client machine.
Set up the bootstrap address as the BOOTSTRAP environment variable. You can fetch this address by describing the cluster that you created.
export BOOTSTRAP=bootstrap.test-cluster.us-central1.managedkafka.PROJECT_ID.cloud.goog:9092
Replace PROJECT_ID with the project ID for your Kafka cluster.
List the topics in the cluster.
kafka-topics.sh --list \
  --bootstrap-server $BOOTSTRAP \
  --command-config client.properties
Write a message to the topic t1.
echo "hello world" | kafka-console-producer.sh --topic t1 \
  --bootstrap-server $BOOTSTRAP --producer.config client.properties
Consume the message from topic t1.
kafka-console-consumer.sh --topic t1 --from-beginning \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties
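The console consumer runs until you interrupt it with Ctrl+C. If you want it to exit on its own once messages stop arriving, one option (a sketch using the tool's --timeout-ms flag) is:
kafka-console-consumer.sh --topic t1 --from-beginning \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties \
  --timeout-ms 10000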
Run a simple producer performance test.
kafka-producer-perf-test.sh --topic t1 --num-records 1000000 \
  --throughput -1 --print-metrics --record-size 1024 \
  --producer-props bootstrap.servers=$BOOTSTRAP \
  --producer.config client.properties
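To exercise the read path as well, you can run a matching consumer performance test. A minimal sketch using the standard kafka-consumer-perf-test.sh tool that ships with Kafka:
kafka-consumer-perf-test.sh --topic t1 --messages 1000000 \
  --bootstrap-server $BOOTSTRAP --consumer.config client.properties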
View the monitoring charts
In the Google Cloud console, go to the Clusters page.
The cluster that you created is listed.
- Click the cluster.
  The cluster details page is displayed. In the Resources tab, the topic t1 is listed.
- Click the topic t1.
  The topic details page is displayed.
- Click the Monitoring tab.
This tab provides visual charts that display key metrics related to the topic's activity and performance.
These time-series charts include the following:
- Byte count: Rate at which bytes are produced to the topic. This indicates the volume of data written to the topic over time. The corresponding metric is managedkafka.googleapis.com/byte_in_count.
- Request count: Rate of requests made to the topic. It reflects the overall activity and usage of the topic. The related metric is managedkafka.googleapis.com/topic_request_count.
- Log segments by partition: Number of active log segments for each partition within the topic. Log segments are the physical files on disk where Kafka stores the topic data. The relevant metric is managedkafka.googleapis.com/log_segments.
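You can also read these metrics programmatically through the Cloud Monitoring API. The following is a minimal sketch of a timeSeries.list request for the byte count metric; the project ID and the time window are placeholders to replace with your own values:
curl -s -G \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  "https://monitoring.googleapis.com/v3/projects/PROJECT_ID/timeSeries" \
  --data-urlencode 'filter=metric.type="managedkafka.googleapis.com/byte_in_count"' \
  --data-urlencode 'interval.startTime=2024-05-28T00:00:00Z' \
  --data-urlencode 'interval.endTime=2024-05-28T01:00:00Z'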
Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project with the resources.
To delete the cluster, run the gcloud beta managed-kafka clusters delete command:
gcloud beta managed-kafka clusters delete test-cluster --location=us-central1
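If you created a project only for this quickstart, the simplest cleanup is to delete the whole project, which removes every resource in it:
gcloud projects delete PROJECT_ID
Replace PROJECT_ID with your Google Cloud project ID.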