Skip to content

Commit 6395212

Browse files
samples: migrate subscriber samples and add tests (part 2) (#238)
* add subscriber examples * take out sync with lease example * add sync pull with lease sample, fix some nits * kvg's suggestions: lambda to replace anonymous classes
1 parent cdb7ae7 commit 6395212

9 files changed

Lines changed: 776 additions & 0 deletions
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_quickstart_subscriber]
20+
// [START pubsub_subscriber_async_pull]
21+
22+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
23+
import com.google.cloud.pubsub.v1.MessageReceiver;
24+
import com.google.cloud.pubsub.v1.Subscriber;
25+
import com.google.pubsub.v1.ProjectSubscriptionName;
26+
import com.google.pubsub.v1.PubsubMessage;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.TimeoutException;
29+
30+
public class SubscribeAsyncExample {
31+
public static void main(String... args) throws Exception {
32+
// TODO(developer): Replace these variables before running the sample.
33+
String projectId = "your-project-id";
34+
String subscriptionId = "your-subscription-id";
35+
36+
subscribeAsyncExample(projectId, subscriptionId);
37+
}
38+
39+
public static void subscribeAsyncExample(String projectId, String subscriptionId) {
40+
ProjectSubscriptionName subscriptionName =
41+
ProjectSubscriptionName.of(projectId, subscriptionId);
42+
43+
// Instantiate an asynchronous message receiver.
44+
MessageReceiver receiver =
45+
(PubsubMessage message, AckReplyConsumer consumer) -> {
46+
// Handle incoming message, then ack the received message.
47+
System.out.println("Id: " + message.getMessageId());
48+
System.out.println("Data: " + message.getData().toStringUtf8());
49+
consumer.ack();
50+
};
51+
52+
Subscriber subscriber = null;
53+
try {
54+
subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
55+
// Start the subscriber.
56+
subscriber.startAsync().awaitRunning();
57+
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
58+
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
59+
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
60+
} catch (TimeoutException timeoutException) {
61+
// Shut down the subscriber after 30s. Stop receiving messages.
62+
subscriber.stopAsync();
63+
}
64+
}
65+
}
66+
// [END pubsub_subscriber_async_pull]
67+
// [END pubsub_quickstart_subscriber]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_subscriber_sync_pull]
20+
21+
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
22+
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
23+
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
24+
import com.google.pubsub.v1.AcknowledgeRequest;
25+
import com.google.pubsub.v1.ProjectSubscriptionName;
26+
import com.google.pubsub.v1.PullRequest;
27+
import com.google.pubsub.v1.PullResponse;
28+
import com.google.pubsub.v1.ReceivedMessage;
29+
import java.io.IOException;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
public class SubscribeSyncExample {
34+
public static void main(String... args) throws Exception {
35+
// TODO(developer): Replace these variables before running the sample.
36+
String projectId = "your-project-id";
37+
String subscriptionId = "your-subscription-id";
38+
Integer numOfMessages = 10;
39+
40+
subscribeSyncExample(projectId, subscriptionId, numOfMessages);
41+
}
42+
43+
public static void subscribeSyncExample(
44+
String projectId, String subscriptionId, Integer numOfMessages) throws IOException {
45+
SubscriberStubSettings subscriberStubSettings =
46+
SubscriberStubSettings.newBuilder()
47+
.setTransportChannelProvider(
48+
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
49+
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
50+
.build())
51+
.build();
52+
53+
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
54+
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
55+
PullRequest pullRequest =
56+
PullRequest.newBuilder()
57+
.setMaxMessages(numOfMessages)
58+
.setSubscription(subscriptionName)
59+
.build();
60+
61+
// Use pullCallable().futureCall to asynchronously perform this operation.
62+
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
63+
List<String> ackIds = new ArrayList<>();
64+
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
65+
// Handle received message
66+
// ...
67+
ackIds.add(message.getAckId());
68+
}
69+
// Acknowledge received messages.
70+
AcknowledgeRequest acknowledgeRequest =
71+
AcknowledgeRequest.newBuilder()
72+
.setSubscription(subscriptionName)
73+
.addAllAckIds(ackIds)
74+
.build();
75+
76+
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
77+
subscriber.acknowledgeCallable().call(acknowledgeRequest);
78+
System.out.println(pullResponse.getReceivedMessagesList());
79+
}
80+
}
81+
}
82+
// [END pubsub_subscriber_sync_pull]
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_subscriber_sync_pull_with_lease]
20+
21+
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
22+
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
23+
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
24+
import com.google.pubsub.v1.AcknowledgeRequest;
25+
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
26+
import com.google.pubsub.v1.ProjectSubscriptionName;
27+
import com.google.pubsub.v1.PullRequest;
28+
import com.google.pubsub.v1.PullResponse;
29+
import com.google.pubsub.v1.ReceivedMessage;
30+
import java.io.IOException;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
34+
public class SubscribeSyncWithLeaseExample {
35+
public static void main(String... args) throws Exception {
36+
// TODO(developer): Replace these variables before running the sample.
37+
String projectId = "your-project-id";
38+
String subscriptionId = "your-subscription-id";
39+
Integer numOfMessages = 10;
40+
41+
projectId = "tz-playground-bigdata";
42+
subscriptionId = "uno";
43+
44+
subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
45+
}
46+
47+
public static void subscribeSyncWithLeaseExample(
48+
String projectId, String subscriptionId, Integer numOfMessages)
49+
throws IOException, InterruptedException {
50+
SubscriberStubSettings subscriberStubSettings =
51+
SubscriberStubSettings.newBuilder()
52+
.setTransportChannelProvider(
53+
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
54+
.setMaxInboundMessageSize(20 << 20) // 20 MB
55+
.build())
56+
.build();
57+
58+
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {
59+
60+
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
61+
62+
PullRequest pullRequest =
63+
PullRequest.newBuilder()
64+
.setMaxMessages(numOfMessages)
65+
.setSubscription(subscriptionName)
66+
.build();
67+
68+
// Use pullCallable().futureCall to asynchronously perform this operation.
69+
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
70+
71+
List<String> ackIds = new ArrayList<>();
72+
73+
for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
74+
ackIds.add(message.getAckId());
75+
76+
// Modify the ack deadline of each received message from the default 10 seconds to 30.
77+
// This prevents the server from redelivering the message after the default 10 seconds
78+
// have passed.
79+
ModifyAckDeadlineRequest modifyAckDeadlineRequest =
80+
ModifyAckDeadlineRequest.newBuilder()
81+
.setSubscription(subscriptionName)
82+
.addAckIds(message.getAckId())
83+
.setAckDeadlineSeconds(30)
84+
.build();
85+
86+
subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
87+
}
88+
89+
// Acknowledge received messages.
90+
AcknowledgeRequest acknowledgeRequest =
91+
AcknowledgeRequest.newBuilder()
92+
.setSubscription(subscriptionName)
93+
.addAllAckIds(ackIds)
94+
.build();
95+
96+
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
97+
subscriber.acknowledgeCallable().call(acknowledgeRequest);
98+
System.out.println(pullResponse.getReceivedMessagesList());
99+
}
100+
}
101+
}
102+
// [END pubsub_subscriber_sync_pull_with_lease]
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package pubsub;
18+
19+
// [START pubsub_subscriber_concurrency_control]
20+
21+
import com.google.api.gax.core.ExecutorProvider;
22+
import com.google.api.gax.core.InstantiatingExecutorProvider;
23+
import com.google.cloud.pubsub.v1.AckReplyConsumer;
24+
import com.google.cloud.pubsub.v1.MessageReceiver;
25+
import com.google.cloud.pubsub.v1.Subscriber;
26+
import com.google.pubsub.v1.ProjectSubscriptionName;
27+
import com.google.pubsub.v1.PubsubMessage;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
30+
31+
public class SubscribeWithConcurrencyControlExample {
32+
public static void main(String... args) throws Exception {
33+
// TODO(developer): Replace these variables before running the sample.
34+
String projectId = "your-project-id";
35+
String subscriptionId = "your-subscription-id";
36+
37+
subscribeWithConcurrencyControlExample(projectId, subscriptionId);
38+
}
39+
40+
public static void subscribeWithConcurrencyControlExample(
41+
String projectId, String subscriptionId) {
42+
ProjectSubscriptionName subscriptionName =
43+
ProjectSubscriptionName.of(projectId, subscriptionId);
44+
45+
// Instantiate an asynchronous message receiver.
46+
MessageReceiver receiver =
47+
(PubsubMessage message, AckReplyConsumer consumer) -> {
48+
// Handle incoming message, then ack the received message.
49+
System.out.println("Id: " + message.getMessageId());
50+
System.out.println("Data: " + message.getData().toStringUtf8());
51+
consumer.ack();
52+
};
53+
54+
Subscriber subscriber = null;
55+
try {
56+
// Provides an executor service for processing messages. The default `executorProvider` used
57+
// by the subscriber has a default thread count of 5.
58+
ExecutorProvider executorProvider =
59+
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();
60+
61+
// `setParallelPullCount` determines how many StreamingPull streams the subscriber will open
62+
// to receive message. It defaults to 1. `setExecutorProvider` configures an executor for the
63+
// subscriber to process messages. Here, the subscriber is configured to open 2 streams for
64+
// receiving messages, each stream creates a new executor with 4 threads to help process the
65+
// message callbacks. In total 2x4=8 threads are used for message processing.
66+
subscriber =
67+
Subscriber.newBuilder(subscriptionName, receiver)
68+
.setParallelPullCount(2)
69+
.setExecutorProvider(executorProvider)
70+
.build();
71+
72+
// Start the subscriber.
73+
subscriber.startAsync().awaitRunning();
74+
System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
75+
// Allow the subscriber to run for 30s unless an unrecoverable error occurs.
76+
subscriber.awaitTerminated(30, TimeUnit.SECONDS);
77+
} catch (TimeoutException timeoutException) {
78+
// Shut down the subscriber after 30s. Stop receiving messages.
79+
subscriber.stopAsync();
80+
}
81+
}
82+
}
83+
// [END pubsub_subscriber_concurrency_control]

0 commit comments

Comments
 (0)