This document describes how to write text data from Dataflow to Pub/Sub by using the Apache Beam PubsubIO I/O connector.
Overview
To write data to Pub/Sub, use the PubsubIO connector. The input elements can be either Pub/Sub messages or just the message data. If the input elements are Pub/Sub messages, you can optionally set attributes or an ordering key on each message.
You can use the Java, Python, or Go version of the PubsubIO connector, as follows:
Java
To write to a single topic, call the PubsubIO.writeMessages method. This method takes an input collection of PubsubMessage objects. The connector also defines convenience methods for writing strings, binary-encoded Avro messages, or binary-encoded protobuf messages. These methods convert the input collection into Pub/Sub messages.
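For example, the following is a minimal sketch that uses the writeStrings convenience method to publish a small in-memory collection. The project and topic names are placeholders:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.transforms.Create;

public class WriteStringsExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        // Create a small in-memory collection of strings.
        .apply(Create.of("message 1", "message 2"))
        // writeStrings converts each string into a Pub/Sub message payload.
        .apply(PubsubIO.writeStrings()
            .to("projects/my-project/topics/my-topic")); // placeholder topic

    pipeline.run();
  }
}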
To write to a dynamic set of topics based on the input data, call writeMessagesDynamic. Specify the destination topic for each message by calling PubsubMessage.withTopic on the message. For example, you can route messages to different topics based on the value of a particular field in your input data, as in the sketch that follows.
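In this sketch, the routing rule, project, and topic names are hypothetical; the coder is set explicitly so that the per-message topic survives encoding:

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithTopicCoder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class DynamicTopicsExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    pipeline
        .apply(Create.of("order:1", "click:2"))
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((String record) -> {
              // Hypothetical routing rule: choose the topic from a field prefix.
              String topic = record.startsWith("order")
                  ? "projects/my-project/topics/orders"
                  : "projects/my-project/topics/events";
              return new PubsubMessage(
                      record.getBytes(StandardCharsets.UTF_8),
                      Collections.emptyMap())
                  .withTopic(topic);
            }))
        // Preserve the topic set by withTopic when elements are encoded.
        .setCoder(PubsubMessageWithTopicCoder.of())
        .apply(PubsubIO.writeMessagesDynamic());

    pipeline.run();
  }
}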
For more information, see the PubsubIO reference documentation.
Python
Call the pubsub.WriteToPubSub method. By default, this method takes an input collection of type bytes, representing the message payload. If the with_attributes parameter is True, the method takes a collection of PubsubMessage objects.
For more information, see the pubsub module reference documentation.
Go
To write data to Pub/Sub, call the pubsubio.Write method. This method takes an input collection of either PubSubMessage objects or byte slices that contain the message payloads.
For more information, see the pubsubio package reference documentation.
For more information about Pub/Sub messages, see Message format in the Pub/Sub documentation.
Timestamps
Pub/Sub sets a timestamp on each message. This timestamp represents the time when the message is published to Pub/Sub. In a streaming scenario, you might also care about the event timestamp, which is the time when the message data was generated. You can use the Apache Beam element timestamp to represent event time. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to the event time.
For Java and Python, the Pub/Sub I/O connector can write each element's timestamp as a Pub/Sub message attribute. Message consumers can use this attribute to get the event timestamp.
Java
Call PubsubIO.Write<T>.withTimestampAttribute and specify the name of the attribute.
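For example, a sketch that assumes messages is a PCollection<PubsubMessage> and uses a hypothetical attribute name:

// Write each element's Beam timestamp into an attribute named "event_time"
// (the attribute name here is a hypothetical choice).
messages.apply(PubsubIO.writeMessages()
    .to("projects/my-project/topics/my-topic") // placeholder topic
    .withTimestampAttribute("event_time"));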
Python
Specify the timestamp_attribute parameter when you call WriteToPubSub.
Message delivery
Dataflow supports exactly-once processing of messages within a pipeline. However, the Pub/Sub I/O connector can't guarantee exactly-once delivery of messages through Pub/Sub.
For Java and Python, you can configure the Pub/Sub I/O connector to write each element's unique ID as a message attribute. Message consumers can then use this attribute to deduplicate messages.
Java
Call PubsubIO.Write<T>.withIdAttribute and specify the name of the attribute.
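For example, a sketch that assumes messages is a PCollection<PubsubMessage> and uses a hypothetical attribute name:

// Write a unique ID for each element into an attribute named "unique_id"
// (the attribute name here is a hypothetical choice).
messages.apply(PubsubIO.writeMessages()
    .to("projects/my-project/topics/my-topic") // placeholder topic
    .withIdAttribute("unique_id"));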
Python
Specify the id_label parameter when you call WriteToPubSub.
Direct output
If you enable at-least-once streaming mode in your pipeline, then the I/O connector uses direct output. In this mode, the connector doesn't checkpoint messages, which enables faster writes. However, retries in this mode might cause duplicate messages with different message IDs, possibly making it harder for message consumers to deduplicate the messages.
For pipelines that use exactly-once mode, you can enable direct output by setting the streaming_enable_pubsub_direct_output service option. Direct output reduces write latency and results in more efficient processing. Consider this option if your message consumers can handle duplicate messages with non-unique message IDs.
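In Java, one way to set a service option is through the dataflowServiceOptions pipeline option, either on the command line (--dataflowServiceOptions=streaming_enable_pubsub_direct_output) or programmatically, as in this sketch:

import java.util.Arrays;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
// Enable direct output for Pub/Sub writes in exactly-once mode.
options.setDataflowServiceOptions(
    Arrays.asList("streaming_enable_pubsub_direct_output"));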
Examples
The following example creates a PCollection of Pub/Sub messages and writes them to a Pub/Sub topic. The topic is specified as a pipeline option. Each message contains payload data and a set of attributes.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
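The following is a minimal Java sketch of such a pipeline. The Options interface, attribute names, and message payloads are hypothetical:

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class WriteMessagesWithAttributes {
  // Hypothetical custom option that supplies the destination topic.
  public interface Options extends PipelineOptions {
    @Description("Pub/Sub topic, in the form projects/PROJECT/topics/TOPIC")
    String getTopic();
    void setTopic(String value);
  }

  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).as(Options.class);
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(Create.of("Hello", "World"))
        // Convert each string into a Pub/Sub message with attributes.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((String payload) -> {
              Map<String, String> attributes = new HashMap<>();
              attributes.put("source", "dataflow"); // hypothetical attribute
              return new PubsubMessage(
                  payload.getBytes(StandardCharsets.UTF_8), attributes);
            }))
        .apply(PubsubIO.writeMessages().to(options.getTopic()));

    pipeline.run();
  }
}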
Python