Receive messages from the Cloud Pub/Sub service.
This class receives messages from a given subscription, using a fixed configuration such as credentials and background threads. Applications that receive messages from multiple subscriptions need to create separate instances of this class. Applications that want to receive messages with different configuration parameters also need to create separate instances.
See Also
https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.
Example: subscriber quickstart
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Performance
Subscriber objects are relatively cheap to create, copy, and move. However, each Subscriber object must be created with a std::shared_ptr<SubscriberConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakeSubscriberConnection() function and the SubscriberConnection interface for more details.
Thread Safety
Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Accessing these copies from multiple threads, with each thread using its own copy, is guaranteed to work. Two threads operating on the same instance of this class are not guaranteed to work.
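The rule above can be modeled with a small standalone sketch. It does not use the Pub/Sub library: `FakeConnection` and `FakeClient` are hypothetical stand-ins for an expensive, internally synchronized connection and a cheap-to-copy client that shares it. The assumption illustrated is that each thread works on its own copy of the client while all copies share one connection.

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an expensive, internally synchronized connection.
class FakeConnection {
 public:
  void Record() {
    std::lock_guard<std::mutex> lk(mu_);
    ++calls_;
  }
  int calls() const {
    std::lock_guard<std::mutex> lk(mu_);
    return calls_;
  }

 private:
  mutable std::mutex mu_;
  int calls_ = 0;
};

// Hypothetical stand-in for a cheap-to-copy client. Copies share the
// connection, but the per-instance state is NOT synchronized.
class FakeClient {
 public:
  explicit FakeClient(std::shared_ptr<FakeConnection> connection)
      : connection_(std::move(connection)) {
    assert(connection_ != nullptr);
  }
  void DoWork() {
    ++local_calls_;  // safe only when one thread uses this instance
    connection_->Record();
  }

 private:
  std::shared_ptr<FakeConnection> connection_;
  int local_calls_ = 0;
};

// Gives every thread its own copy of the client; returns the total number of
// calls seen by the single shared connection.
int RunWithOneCopyPerThread(int thread_count, int calls_per_thread) {
  auto connection = std::make_shared<FakeConnection>();
  FakeClient client(connection);
  std::vector<std::thread> threads;
  for (int i = 0; i != thread_count; ++i) {
    // Capturing `client` by value creates the per-thread copy.
    threads.emplace_back([client, calls_per_thread]() mutable {
      for (int j = 0; j != calls_per_thread; ++j) client.DoWork();
    });
  }
  for (auto& t : threads) t.join();
  return connection->calls();
}
```

Calling `RunWithOneCopyPerThread(4, 100)` creates one connection, four client copies, and records 400 calls on the shared connection. Capturing the client by reference instead would make all threads share one instance, which is the case the text says is not guaranteed to work.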
Background Threads
This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.
Example: using a custom thread pool
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcCompletionQueueOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string subscription_id) {
  // Create our own completion queue to run the background activity.
  google::cloud::CompletionQueue cq;
  // Set up one or more threads to service this completion queue. These must
  // remain running until all the work is done.
  std::vector<std::thread> tasks;
  std::generate_n(std::back_inserter(tasks), 4, [&cq] {
    return std::thread([cq]() mutable { cq.Run(); });
  });
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}.set<GrpcCompletionQueueOption>(cq)));
  // Because this is an example we want to exit eventually, use a mutex and
  // condition variable to notify the current thread and stop the example.
  std::mutex mu;
  std::condition_variable cv;
  int count = 0;
  auto await_count = [&] {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] { return count >= 4; });
  };
  auto increase_count = [&] {
    std::unique_lock<std::mutex> lk(mu);
    if (++count >= 4) cv.notify_one();
  };
  // Receive messages in the previously allocated thread pool.
  auto session = subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        increase_count();
        std::move(h).ack();
      });
  await_count();
  session.cancel();
  // Report any final status, blocking until the subscription loop completes,
  // either with a failure or because it was canceled.
  auto status = session.get();
  std::cout << "Message count=" << count << ", status=" << status << "\n";
  // Shut down the completion queue and join the threads.
  cq.Shutdown();
  for (auto& t : tasks) t.join();
}
Asynchronous Functions
Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return type of the attached function. See the google::cloud::future documentation for more details.
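To make the relationship between the callback and the resulting future<U> concrete, here is a minimal sketch built only on std::future. `Then` is a hypothetical free function, not part of google::cloud::future. It assumes, as in the exactly-once example in this document, that the callback receives the future itself and calls `.get()` on it. Unlike the real `.then()`, which invokes the callback when the future is satisfied, this simplified version runs the callback lazily when the returned future is queried.

```cpp
#include <cassert>
#include <future>
#include <string>
#include <utility>

// Hypothetical helper, NOT the library's implementation. Attaches `fn` to
// `f` and returns a std::future<U>, where U is the return type of `fn`.
template <typename T, typename F>
auto Then(std::future<T> f, F fn) {
  assert(f.valid());
  // std::launch::deferred creates no thread: the continuation runs in the
  // thread that calls get() or wait() on the returned future.
  return std::async(std::launch::deferred,
                    [f = std::move(f), fn = std::move(fn)]() mutable {
                      return fn(std::move(f));
                    });
}
```

For example, attaching a callback that returns a std::string to a std::future<int> produces a std::future<std::string>: with a promise that is later set to 41, `Then(p.get_future(), [](std::future<int> f) { return std::to_string(f.get() + 1); })` yields a future whose `get()` returns "42".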
Error Handling
This class uses StatusOr<T> to report errors. When an operation fails to perform its work the returned StatusOr<T> contains the error details. If the ok() member function in the StatusOr<T> returns true then it contains the expected result. Please consult the google::cloud::StatusOr documentation for more details.
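The check-before-use pattern described above can be sketched without the library. `MiniStatus` and `MiniStatusOr` below are hypothetical, heavily simplified stand-ins for the real types, and `HalveEven` is an invented operation that can fail. The sketch only illustrates the calling convention: test `ok()` first, then read either the value or the error details.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for a status: code 0 means success.
struct MiniStatus {
  int code = 0;
  std::string message;
  bool ok() const { return code == 0; }
};

// Hypothetical stand-in for StatusOr<T>: holds a value or an error.
template <typename T>
class MiniStatusOr {
 public:
  MiniStatusOr(T value) : value_(std::move(value)) {}
  MiniStatusOr(MiniStatus status) : status_(std::move(status)) {
    assert(!status_.ok());  // an error result must carry an error status
  }
  bool ok() const { return status_.ok(); }
  MiniStatus const& status() const { return status_; }
  T const& operator*() const {
    assert(ok());  // dereferencing is only valid after ok() returned true
    return value_;
  }

 private:
  MiniStatus status_;
  T value_{};
};

// Invented operation that fails for odd inputs.
MiniStatusOr<int> HalveEven(int n) {
  if (n % 2 != 0) return MiniStatus{3, "odd input"};
  return n / 2;
}

// The caller checks ok() before touching the value.
std::string Describe(MiniStatusOr<int> const& result) {
  if (!result.ok()) return "error: " + result.status().message;
  return "value: " + std::to_string(*result);
}
```

With these definitions, `Describe(HalveEven(8))` returns "value: 4" and `Describe(HalveEven(7))` returns "error: odd input".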
Changing Retry Parameters Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // By default a subscriber will retry for 60 seconds, with an initial
  // backoff of 100ms, a maximum backoff of 60 seconds, and the backoff will
  // grow by 30% after each attempt. This changes those defaults.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(1))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(10),
                  /*scaling=*/2.0)
                  .clone())));
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::move(h).ack();
        std::cout << "Received message " << m << "\n";
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};
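As a rough illustration of what the backoff parameters in this example imply, the standalone helper below computes the nominal delay before each retry. `NominalBackoffDelays` is a hypothetical name, not a library function. It assumes the delay starts at the initial value, is multiplied by the scaling factor after each attempt, and is capped at the maximum. The library's policy may randomize the actual delays, which this sketch ignores.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

// Hypothetical helper: nominal (non-randomized) delays for the first
// `attempts` retries under a capped exponential backoff.
std::vector<std::chrono::milliseconds> NominalBackoffDelays(
    std::chrono::milliseconds initial, std::chrono::milliseconds maximum,
    double scaling, int attempts) {
  assert(scaling >= 1.0);  // the delay should never shrink between attempts
  auto const cap = static_cast<double>(maximum.count());
  auto current = std::min(static_cast<double>(initial.count()), cap);
  std::vector<std::chrono::milliseconds> delays;
  for (int i = 0; i < attempts; ++i) {
    delays.emplace_back(
        static_cast<std::chrono::milliseconds::rep>(current));
    // Grow the delay for the next attempt, never exceeding the cap.
    current = std::min(current * scaling, cap);
  }
  return delays;
}
```

With the values from the example (200ms initial delay, 10s maximum delay, scaling 2.0), the first eight nominal delays are 200, 400, 800, 1600, 3200, 6400, 10000, and 10000 milliseconds: the cap is reached on the seventh attempt.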
Constructors
Subscriber(std::shared_ptr< SubscriberConnection >, Options)
Parameters | |
---|---|
Name | Description |
connection | std::shared_ptr< SubscriberConnection > |
opts | Options |
Functions
Subscribe(ApplicationCallback, Options)
Creates a new session to receive messages from subscription.
Idempotency
This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the retry policies as described earlier.
Note that AckHandler::ack() and AckHandler::nack() are handled differently with respect to retrying. Check the documentation of these functions for details.
Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Parameters | |
---|---|
Name | Description |
cb | ApplicationCallback: the callable invoked when messages are received. |
opts | Options: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the |
Returns | |
---|---|
Type | Description |
future< Status > | a future that is satisfied when the session will no longer receive messages. For example, because there was an unrecoverable error trying to receive data. Calling |
Subscribe(ExactlyOnceApplicationCallback, Options)
Creates a new session to receive messages from subscription using exactly-once delivery.
Idempotency
This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the retry policies as described earlier.
Note that ExactlyOnceAckHandler::ack() and ExactlyOnceAckHandler::nack() have their own rules with respect to retrying. Check the documentation of these functions for details.
Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Parameters | |
---|---|
Name | Description |
cb | ExactlyOnceApplicationCallback: the callable invoked when messages are received. |
opts | Options: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the |
Returns | |
---|---|
Type | Description |
future< Status > | a future that is satisfied when the session will no longer receive messages. For example, because there was an unrecoverable error trying to receive data. Calling |
Pull(Options)
Pulls one message from subscription.
Idempotency
This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to pull a message from the service, subject to the retry policies configured in the SubscriberConnection.
Note that PullAckHandler::ack() and PullAckHandler::nack() have their own rules with respect to retrying.
Example
[](google::cloud::pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";
  std::move(response->handler).ack();
}
Parameter | |
---|---|
Name | Description |
opts | Options: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the |
Returns | |
---|---|
Type | Description |
StatusOr< PullResponse > | a response including the message and a |