A Subscription object will give you access to your Cloud Pub/Sub subscription.
Subscriptions can be retrieved with PubSub#getSubscriptions or Topic#getSubscriptions.
Subscription objects may be created directly with Topic#createSubscription or Topic#subscription.
All Subscription objects are instances of an [EventEmitter](http://nodejs.org/api/events.html). The subscription pulls messages automatically as long as at least one listener is assigned to the message event. Available events:
Upon receipt of a message: on(event: 'message', listener: (message: Message) => void): this;
Upon receipt of an error: on(event: 'error', listener: (error: Error) => void): this;
Upon the closing of the subscriber: on(event: 'close', listener: Function): this;
By default, Subscription objects allow you to process 100 messages at the same time. You can fine-tune this value by adjusting the options.flowControl.maxMessages option.
If your subscription is seeing more redeliveries than you would like, try increasing the options.ackDeadline value or decreasing the options.streamingOptions.maxStreams value.
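As a rough illustration of the tuning advice above, the options object below raises the ack deadline and lowers the stream count. The numbers are arbitrary assumptions, and `ackDeadline` is assumed to be expressed in seconds; check both against the client library version you use.

```javascript
// Hypothetical tuning values; choose numbers that match your workload.
const subscriberOptions = {
  // Assumed to be in seconds: allows more processing time before redelivery.
  ackDeadline: 60,
  streamingOptions: {
    // Fewer open streams for this Subscription.
    maxStreams: 2,
  },
};

// Intended to be passed wherever subscription options are accepted, e.g.:
// const subscription = topic.subscription('my-subscription', subscriberOptions);
```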
Subscription objects handle ack management by automatically extending the ack deadline while a message is being processed, then issuing the ack or nack for that message once processing is done. **Note:** message redelivery is still possible.
By default, each PubSub instance can handle 100 open streams. With default options, this translates to fewer than 20 Subscriptions per PubSub instance. If you wish to create more Subscriptions than that, you can either create multiple PubSub instances or lower the options.streamingOptions.maxStreams value on each Subscription object.
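The arithmetic behind that limit can be sketched as follows. The helper name is hypothetical, and the default of 5 streams per Subscription is an assumption not stated on this page; the result is an upper bound, consistent with the "fewer than 20" figure above.

```javascript
// Upper bound on Subscriptions per PubSub instance for a given stream budget.
function maxSubscriptionsPerClient(streamLimit, maxStreamsPerSubscription) {
  return Math.floor(streamLimit / maxStreamsPerSubscription);
}

// Assumed default of 5 streams per Subscription.
console.log(maxSubscriptionsPerClient(100, 5)); // 20
// Lowering maxStreams to 2 raises the bound.
console.log(maxSubscriptionsPerClient(100, 2)); // 50
```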
Inheritance
EventEmitter > Subscription
Package
@google-cloud/pubsub
Examples
From PubSub#getSubscriptions:
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
pubsub.getSubscriptions((err, subscriptions) => {
// `subscriptions` is an array of Subscription objects.
});
From Topic#getSubscriptions:
const topic = pubsub.topic('my-topic');
topic.getSubscriptions((err, subscriptions) => {
// `subscriptions` is an array of Subscription objects.
});
const topic = pubsub.topic('my-topic');
topic.createSubscription('new-subscription', (err, subscription) => {
// `subscription` is a Subscription object.
});
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
// `subscription` is a Subscription object.
Once you have obtained a subscription object, you may begin to register listeners. This will automatically trigger pulling for messages.
// Register an error handler.
subscription.on('error', (err) => {});
// Register a close handler in case the subscriber closes unexpectedly
subscription.on('close', () => {});
// Register a listener for `message` events.
function onMessage(message) {
// Called every time a message is received.
// message.id = ID of the message.
// message.ackId = ID used to acknowledge receipt of the message.
// message.data = Contents of the message.
// message.attributes = Attributes of the message.
// message.publishTime = Date when Pub/Sub received the message.
// Ack the message:
// message.ack();
// This doesn't ack the message, but allows more messages to be retrieved
// if your limit was hit or if you don't want to ack the message.
// message.nack();
}
subscription.on('message', onMessage);
// Remove the listener from receiving `message` events.
subscription.removeListener('message', onMessage);
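The register-then-remove pattern above can be wrapped in a Promise when only a fixed number of messages is needed. This is a minimal sketch: `collectMessages` is a hypothetical helper, not part of the library, and it relies only on the `on`, `removeListener`, and `message.ack()` calls shown above.

```javascript
// Resolves with the first `count` messages, acking each one, then removes
// its own listeners so they no longer keep the subscription pulling.
function collectMessages(subscription, count) {
  return new Promise((resolve, reject) => {
    if (count <= 0) {
      resolve([]);
      return;
    }
    const received = [];
    const cleanup = () => {
      subscription.removeListener('message', onMessage);
      subscription.removeListener('error', onError);
    };
    function onMessage(message) {
      message.ack();
      received.push(message);
      if (received.length >= count) {
        cleanup();
        resolve(received);
      }
    }
    function onError(err) {
      cleanup();
      reject(err);
    }
    subscription.on('error', onError);
    subscription.on('message', onMessage);
  });
}
```

For example, `const messages = await collectMessages(subscription, 10);` would wait for ten messages. Acking inside the helper is a design choice for brevity; a real consumer would usually ack only after its own processing succeeds.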
To apply a fine level of flow control, consider the following configuration:
const subscription = topic.subscription('my-sub', {
flowControl: {
maxMessages: 1,
// this tells the client to manage and lock any excess messages
allowExcessMessages: false
}
});
Constructors
(constructor)(pubsub, name, options)
constructor(pubsub: PubSub, name: string, options?: SubscriptionOptions);
Constructs a new instance of the Subscription class.
Parameters
Properties
iam
Property Value
isOpen
Indicates if the Subscription is open and receiving messages.
{boolean}
Property Value
metadata?: google.pubsub.v1.ISubscription;
Property Value
Type | Description |
google.pubsub.v1.ISubscription | |
name
Property Value
projectId
Property Value
pubsub
Property Value
request
request: typeof PubSub.prototype.request;
Property Value
Type | Description |
typeof PubSub#request | |
topic
Property Value
Type | Description |
Topic | string | |
Methods
close()
Closes the Subscription. Once this is called, you will no longer receive message events unless you call Subscription#open or add new message listeners.
Returns
Type | Description |
Promise<void> | |
Example
subscription.close(err => {
if (err) {
// Error handling omitted.
}
});
// If the callback is omitted a Promise will be returned.
subscription.close().then(() => {});
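The Promise form of `close()` also works with `async`/`await`. The sketch below uses a hypothetical helper, `closeSubscription`, which reports failure instead of throwing; that behaviour is an illustrative choice, not something the library prescribes.

```javascript
// Closes the subscription and returns true on success, false on failure.
async function closeSubscription(subscription) {
  try {
    await subscription.close();
    return true;
  } catch (err) {
    // Error handling is up to the caller; here the failure is only logged.
    console.error('Failed to close subscription:', err);
    return false;
  }
}
```

A typical call site would be a shutdown hook, for example `await closeSubscription(subscription);` before the process exits.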
close(callback)
close(callback: SubscriptionCloseCallback): void;
Parameter
Returns
create(options)
create(options?: CreateSubscriptionOptions): Promise<CreateSubscriptionResponse>;
Parameter
Returns
Examples
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('newMessages');
const callback = function(err, subscription, apiResponse) {};
subscription.create(callback);
With options
subscription.create({
ackDeadlineSeconds: 90
}, callback);
If the callback is omitted, we'll return a Promise.
const [sub, apiResponse] = await subscription.create();
create(callback)
create(callback: CreateSubscriptionCallback): void;
Parameter
Returns
create(options, callback)
create(options: CreateSubscriptionOptions, callback: CreateSubscriptionCallback): void;
Parameters
Returns
createSnapshot(name, gaxOpts)
createSnapshot(name: string, gaxOpts?: CallOptions): Promise<CreateSnapshotResponse>;
Create a snapshot with the given name.
Parameters
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
const callback = (err, snapshot, apiResponse) => {
if (!err) {
// The snapshot was created successfully.
}
};
subscription.createSnapshot('my-snapshot', callback);
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.createSnapshot('my-snapshot').then((data) => {
const snapshot = data[0];
const apiResponse = data[1];
});
createSnapshot(name, callback)
createSnapshot(name: string, callback: CreateSnapshotCallback): void;
Parameters
Returns
createSnapshot(name, gaxOpts, callback)
createSnapshot(name: string, gaxOpts: CallOptions, callback: CreateSnapshotCallback): void;
Parameters
Returns
delete(gaxOpts)
delete(gaxOpts?: CallOptions): Promise<EmptyResponse>;
Delete the subscription. Pull requests from the current subscription will return an error once the deletion is complete.
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
subscription.delete((err, apiResponse) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.delete().then((data) => {
const apiResponse = data[0];
});
delete(callback)
delete(callback: EmptyCallback): void;
Parameter
Returns
delete(gaxOpts, callback)
delete(gaxOpts: CallOptions, callback: EmptyCallback): void;
Parameters
Returns
detached()
detached(): Promise<DetachedResponse>;
Check if a subscription is detached.
Returns
Type | Description |
Promise<DetachedResponse> | |
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
subscription.detached((err, detached) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.detached().then((data) => {
const detached = data[0];
});
detached(callback)
detached(callback: DetachedCallback): void;
Parameter
Name | Description |
callback | DetachedCallback |
Returns
exists()
exists(): Promise<ExistsResponse>;
Check if a subscription exists.
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
subscription.exists((err, exists) => {});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.exists().then((data) => {
const exists = data[0];
});
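`exists()` combines naturally with `create()` when a subscription should be created only if it is missing. The helper name `ensureSubscription` is hypothetical; the sketch relies only on the Promise forms of `exists()` and `create(options)` documented on this page.

```javascript
// Creates the subscription only if it does not already exist.
// Returns true when a create call was made, false otherwise.
async function ensureSubscription(subscription, options = {}) {
  const [exists] = await subscription.exists();
  if (exists) {
    return false;
  }
  await subscription.create(options);
  return true;
}
```

Note that the check and the create are two separate calls, so another process could create the subscription in between; callers may want to handle an error from `create()` accordingly.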
exists(callback)
exists(callback: ExistsCallback): void;
Parameter
Returns
static formatMetadata_(metadata: SubscriptionMetadata): google.pubsub.v1.ISubscription;
Parameter
Returns
Type | Description |
google.pubsub.v1.ISubscription | |
static formatName_(projectId: string, name: string): string;
Parameters
Name | Description |
projectId | string |
name | string |
Returns
get(gaxOpts)
get(gaxOpts?: GetSubscriptionOptions): Promise<GetSubscriptionResponse>;
Get a subscription if it exists.
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
subscription.get((err, subscription, apiResponse) => {
// The `subscription` data has been populated.
});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.get().then((data) => {
const subscription = data[0];
const apiResponse = data[1];
});
get(callback)
get(callback: GetSubscriptionCallback): void;
Parameter
Returns
get(gaxOpts, callback)
get(gaxOpts: GetSubscriptionOptions, callback: GetSubscriptionCallback): void;
Parameters
Returns
getMetadata(gaxOpts?: CallOptions): Promise<GetSubscriptionMetadataResponse>;
Fetches the subscription's metadata.
Parameter
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
subscription.getMetadata((err, apiResponse) => {
if (err) {
// Error handling omitted.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.getMetadata().then((data) => {
const apiResponse = data[0];
});
getMetadata(callback: GetSubscriptionMetadataCallback): void;
Parameter
Returns
getMetadata(gaxOpts: CallOptions, callback: GetSubscriptionMetadataCallback): void;
Parameters
Returns
modifyPushConfig(config, gaxOpts)
modifyPushConfig(config: PushConfig, gaxOpts?: CallOptions): Promise<EmptyResponse>;
Modify the push config for the subscription.
Parameters
Returns
Example
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-topic');
const subscription = topic.subscription('my-subscription');
const pushConfig = {
pushEndpoint: 'https://mydomain.com/push',
attributes: {
key: 'value'
},
oidcToken: {
serviceAccountEmail: 'myproject@appspot.gserviceaccount.com',
audience: 'myaudience'
}
};
subscription.modifyPushConfig(pushConfig, (err, apiResponse) => {
if (err) {
// Error handling omitted.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.modifyPushConfig(pushConfig).then((data) => {
const apiResponse = data[0];
});
modifyPushConfig(config, callback)
modifyPushConfig(config: PushConfig, callback: EmptyCallback): void;
Parameters
Returns
modifyPushConfig(config, gaxOpts, callback)
modifyPushConfig(config: PushConfig, gaxOpts: CallOptions, callback: EmptyCallback): void;
Parameters
Returns
open()
Opens the Subscription to receive messages. In general this method shouldn't need to be called, unless you wish to receive messages after calling Subscription#close. Alternatively, you can assign a new message event listener, which will also re-open the Subscription.
Returns
Example
subscription.on('message', message => message.ack());
// Close the subscription.
subscription.close(err => {
if (err) {
// Error handling omitted.
}
// The subscription has been closed and messages will no longer be received.
});
// Resume receiving messages.
subscription.open();
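The close-then-open sequence can be packaged as a helper that pauses delivery while some other work runs. `pauseWhile` is a hypothetical name, and the sketch assumes `close()` returns a Promise when called without a callback, as described for `close()` above.

```javascript
// Closes the subscription, runs `task`, then re-opens the subscription
// whether or not `task` succeeded.
async function pauseWhile(subscription, task) {
  await subscription.close();
  try {
    return await task();
  } finally {
    subscription.open();
  }
}
```

For example, `await pauseWhile(subscription, () => runMaintenance());` where `runMaintenance` stands in for whatever work should happen while no messages are delivered.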
seek(snapshot, gaxOpts)
seek(snapshot: string | Date, gaxOpts?: CallOptions): Promise<SeekResponse>;
Seeks an existing subscription to a point in time or a given snapshot.
Parameters
Returns
Example
const callback = (err, resp) => {
if (!err) {
// Seek was successful.
}
};
subscription.seek('my-snapshot', callback);
//-
// Alternatively, to specify a certain point in time, you can provide a Date
// object.
//-
const date = new Date('October 21 2015');
subscription.seek(date, callback);
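Seeking to a relative point in time, such as "30 minutes ago", only requires computing a Date first. The helper below is a hypothetical sketch that uses the Promise form of `seek()`; the `now` parameter exists only to make the calculation easy to test.

```javascript
// Seeks the subscription to `minutes` before `now` and returns the target Date.
async function seekMinutesBack(subscription, minutes, now = new Date()) {
  const target = new Date(now.getTime() - minutes * 60 * 1000);
  await subscription.seek(target);
  return target;
}
```

For example, `await seekMinutesBack(subscription, 30);` would seek the subscription to half an hour before the current time.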
seek(snapshot, callback)
seek(snapshot: string | Date, callback: SeekCallback): void;
Parameters
Returns
seek(snapshot, gaxOpts, callback)
seek(snapshot: string | Date, gaxOpts: CallOptions, callback: SeekCallback): void;
Parameters
Name | Description |
snapshot | string | Date |
gaxOpts | CallOptions |
callback | SeekCallback |
Returns
setMetadata(metadata: SubscriptionMetadata, gaxOpts?: CallOptions): Promise<SetSubscriptionMetadataResponse>;
Update the subscription object.
Parameters
Returns
Example
const metadata = {
key: 'value'
};
subscription.setMetadata(metadata, (err, apiResponse) => {
if (err) {
// Error handling omitted.
}
});
//-
// If the callback is omitted, we'll return a Promise.
//-
subscription.setMetadata(metadata).then((data) => {
const apiResponse = data[0];
});
setMetadata(metadata: SubscriptionMetadata, callback: SetSubscriptionMetadataCallback): void;
Parameters
Returns
setMetadata(metadata: SubscriptionMetadata, gaxOpts: CallOptions, callback: SetSubscriptionMetadataCallback): void;
Parameters
Returns
setOptions(options)
setOptions(options: SubscriberOptions): void;
Sets the Subscription options.
Parameter
Returns
snapshot(name)
snapshot(name: string): Snapshot;
Create a Snapshot object. See Subscription#createSnapshot to create a snapshot.
Parameter
Name | Description |
name | string The name of the snapshot. |
Returns
Example
const snapshot = subscription.snapshot('my-snapshot');
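Snapshots and `seek()` are often used together: take a snapshot before a risky change, then seek back to it if something goes wrong. The following is a minimal sketch of that flow under stated assumptions; `withSnapshotRollback` is a hypothetical helper, and it uses only the Promise forms of `createSnapshot(name)` and `seek(snapshot)` documented on this page.

```javascript
// Takes a snapshot, runs `task`, and seeks back to the snapshot if `task`
// throws. The original error is re-thrown after the seek.
async function withSnapshotRollback(subscription, snapshotName, task) {
  await subscription.createSnapshot(snapshotName);
  try {
    return await task();
  } catch (err) {
    await subscription.seek(snapshotName);
    throw err;
  }
}
```

The snapshot is left in place after the call, so cleaning it up is the caller's responsibility.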